Sunday, September 15, 2024

PySpark Fundamentals

By: Girish Gowda


1. Basic DataFrame Operations

1.      Creating a DataFrame: spark.createDataFrame(data)

2.      Reading a CSV File: spark.read.csv("path/to/file.csv")

3.      Reading a JSON File: spark.read.json("path/to/file.json")

4.      Reading a Parquet File: spark.read.parquet("path/to/file.parquet")

5.      Writing DataFrame to CSV: df.write.csv("path/to/output")

6.      Writing DataFrame to JSON: df.write.json("path/to/output")

7.      Writing DataFrame to Parquet: df.write.parquet("path/to/output")

8.      Show DataFrame: df.show()

9.      Print Schema: df.printSchema()

10. Column Selection: df.select("column1", "column2")

11. Filtering Rows: df.filter(df["column"] > value)

12. Adding a New Column: df.withColumn("newColumn", expression)

13. Renaming a Column: df.withColumnRenamed("oldName", "newName")

14. Dropping a Column: df.drop("column")

15. Describe Data: df.describe().show()

16. Counting Rows: df.count()

17. Aggregating Data: df.groupBy("column").agg({"column": "sum"})

18. Sorting Data: df.sort("column")

19. Distinct Values: df.select("column").distinct()

20. Sample Data: df.sample(withReplacement, fraction, seed)
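To make these entries concrete, here is a minimal, hedged sketch of the basics (the data and column names are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("basics").getOrCreate()

# Create a DataFrame from an in-memory list of tuples
df = spark.createDataFrame(
    [("alice", 34), ("bob", 45), ("carol", 29)],
    ["name", "age"],
)

df.printSchema()                                    # inspect the inferred schema
df.select("name").show()                            # column selection
df.filter(df["age"] > 30).show()                    # row filtering
df.withColumn("age_plus_1", df["age"] + 1).show()   # derived column
print(df.count())                                   # row count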

2. Advanced Data Manipulation

Joining DataFrames: df1.join(df2, "column")

Union DataFrames: df1.union(df2)

Pivot Table:

 df.groupBy("column").pivot("pivot_column").agg(count("another_column"))

Window Functions:

from pyspark.sql import Window;

 windowSpec = Window.partitionBy("column")

DataFrame to RDD Conversion: df.rdd

RDD to DataFrame Conversion: rdd.toDF()


Caching a DataFrame: df.cache()

Uncaching a DataFrame: df.unpersist()

Collecting Data to Driver: df.collect()

Broadcast Join:

from pyspark.sql.functions import broadcast;

 df1.join(broadcast(df2), "column")
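Tying a few of the entries above together, a hedged sketch (the tables, columns and running-total logic are illustrative; spark is an active SparkSession):

from pyspark.sql import Window
from pyspark.sql import functions as F

orders = spark.createDataFrame(
    [(1, "books", 20.0), (1, "games", 35.0), (2, "toys", 10.0)],
    ["cust_id", "category", "amount"],
)
customers = spark.createDataFrame([(1, "alice"), (2, "bob")], ["cust_id", "name"])

# Broadcast join: the small dimension table is shipped to every executor
joined = orders.join(F.broadcast(customers), "cust_id")

# Window spec actually applied: running total of amount per customer
w = Window.partitionBy("cust_id").orderBy("amount")
joined.withColumn("running_total", F.sum("amount").over(w)).show()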


3. Handling Missing and Duplicate Data

Dropping Rows with Null Values: df.na.drop()

Filling Null Values: df.na.fill(value)

Dropping Duplicate Rows: df.dropDuplicates()

Replacing Values: df.na.replace(["old_value"], ["new_value"])

4. Data Transformation

UDF (User Defined Function):

from pyspark.sql.functions import udf;

 udf_function = udf(lambda z: custom_function(z))

String Operations:

from pyspark.sql.functions import lower, upper;

df.select(upper(df["column"]))

Date and Time Functions:

from pyspark.sql.functions import current_date, current_timestamp;

df.select(current_date())

Numeric Functions:

from pyspark.sql.functions import abs, sqrt;

 df.select(abs(df["column"]))

Conditional Expressions:

from pyspark.sql.functions import when;

df.select(when(df["column"] > value, "true").otherwise("false"))

Type Casting: df.withColumn("column", df["column"].cast("new_type"))

Explode Function (Array to Rows):

from pyspark.sql.functions import explode;

df.withColumn("exploded_column", explode(df["array_column"]))

Pandas UDF:

from pyspark.sql.functions import pandas_udf;

@pandas_udf("return_type")

def pandas_function(col1, col2): return operation


Aggregating with Custom Functions:

 df.groupBy("column").agg(custom_agg_function(df["another_column"]))

Window Functions (Rank, Lead, Lag):

from pyspark.sql.functions import rank, lead, lag;

windowSpec = Window.orderBy("column");

df.withColumn("rank", rank().over(windowSpec));

Handling JSON Columns:

from pyspark.sql.functions import from_json;

df.withColumn("parsed_json", from_json(df["json_column"], json_schema))  # json_schema: a DDL string or StructType (schema_of_json can infer one from a sample JSON string)

5. Data Profiling

Column Value Counts: df.groupBy("column").count()

Summary Statistics for Numeric Columns: df.describe()

Correlation Between Columns: df.stat.corr("column1", "column2")

Crosstabulation and Contingency Tables:

df.stat.crosstab("column1","column2")

Frequent Items in Columns: df.stat.freqItems(["column1", "column2"])

Approximate Quantile Calculation:

df.approxQuantile("column", [0.25, 0.5,0.75], relativeError)

6. Data Visualization (Integration with other libraries)

Convert to Pandas for Visualization: df.toPandas().plot(kind='bar')

Histograms using Matplotlib: df.toPandas()["column"].hist()

Box Plots using Seaborn: import seaborn as sns; sns.boxplot(x=df.toPandas()["column"])

Scatter Plots using Matplotlib: df.toPandas().plot.scatter(x='col1', y='col2')

7. Data Import/Export

Reading Data from JDBC Sources:

spark.read.format("jdbc").options(url="jdbc_url", dbtable="table_name").load()

Writing Data to JDBC Sources:

df.write.format("jdbc").options(url="jdbc_url", dbtable="table_name").save()

Reading Data from HDFS: spark.read.text("hdfs://path/to/file")

Writing Data to HDFS: df.write.save("hdfs://path/to/output")

Creating DataFrames from Hive Tables: spark.table("hive_table_name")

8. Working with Large Data

Partitioning Data: df.repartition(numPartitions)

Coalesce Partitions: df.coalesce(numPartitions)

Reading Files Incrementally (Structured Streaming):

spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv("path/to/directory")

Optimizing Data for Skewed Joins: df.repartition("skewed_column")

Handling Data Skew in Joins: df1.join(df2.hint("broadcast"), "column")

9. Spark SQL

Running SQL Queries on DataFrames:

df.createOrReplaceTempView("table");

spark.sql("SELECT * FROM table")

Registering UDF for SQL Queries:

spark.udf.register("udf_name", lambda x: custom_function(x))

Using SQL Functions in DataFrames:

from pyspark.sql.functions import expr;

df.withColumn("new_column", expr("SQL_expression"))

10. Machine Learning and Advanced Analytics

VectorAssembler for Feature Vectors:

from pyspark.ml.feature import VectorAssembler;

assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="features")

StandardScaler for Feature Scaling:

from pyspark.ml.feature import StandardScaler;

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

Building a Machine Learning Pipeline:

from pyspark.ml import Pipeline; pipeline = Pipeline(stages=[assembler, scaler, ml_model])

Train-Test Split: train, test = df.randomSplit([0.7, 0.3])


Model Fitting and Predictions:

model = pipeline.fit(train);

predictions = model.transform(test)

Cross-Validation for Model Tuning:

from pyspark.ml.tuning import CrossValidator;

crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid)

Hyperparameter Tuning:

from pyspark.ml.tuning import ParamGridBuilder;

paramGrid = ParamGridBuilder().addGrid(model.param, [value1, value2]).build()
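A compact, hedged sketch of the pipeline flow above; the feature columns, the label column and the hyperparameter values are assumptions for illustration:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")
pipeline = Pipeline(stages=[assembler, scaler, lr])

train, test = df.randomSplit([0.7, 0.3], seed=42)

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="label"),
                          numFolds=3)

model = crossval.fit(train)          # fits the whole pipeline with cross-validation
predictions = model.transform(test)  # adds prediction columns to the test set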

11. Graph and Network Analysis

Creating a GraphFrame:

from graphframes import GraphFrame;

 g = GraphFrame(vertices_df, edges_df)

Running Graph Algorithms: results = g.pageRank(resetProbability=0.15, maxIter=10)

Subgraphs and Motif Finding: g.find("(a)-[e]->(b); (b)-[e2]->(a)")

12. Streaming Data Processing

Reading from a Stream: spark.readStream.format("source").load()

Writing to a Stream: query = df.writeStream.outputMode("append").format("console").start()

● Window Operations on Streaming Data:

df.groupBy(window(df["timestamp"], "1 hour")).count()

● Handling Late Data and Watermarks: df.withWatermark("timestamp", "2 hours")

● Triggering Streaming: df.writeStream.trigger(processingTime='1 hour').start()

● Streaming Aggregations: df.groupBy("group_column").agg({"value": "sum"})

● Reading from Kafka:

spark.readStream.format("kafka").option("kafka.bootstrap.servers",

"host:port").option("subscribe", "topic").load()

● Writing to Kafka:

df.writeStream.format("kafka").option("kafka.bootstrap.servers",

"host:port").option("topic", "topic").start()


13. Advanced DataFrame Transformations

● Handling Complex Data Types (Arrays, Maps):

df.selectExpr("explode(array_column) as value")

● Flattening Nested Structures: df.selectExpr("struct_col.*")

● Pivoting and Unpivoting Data:

df.groupBy("group_col").pivot("pivot_col").sum()

● Creating Buckets or Bins:

from pyspark.ml.feature import Bucketizer;

bucketizer = Bucketizer(splits=[0, 10, 20], inputCol="feature", outputCol="bucketed_feature")

● Normalization of Data:

from pyspark.ml.feature import Normalizer;

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
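A small sketch of the complex-type entries in this section (the schema and values are invented):

from pyspark.sql import Row
from pyspark.sql import functions as F

nested = spark.createDataFrame([
    Row(id="x", scores=[1, 2, 3], address=Row(city="Austin", zip="78701")),
    Row(id="y", scores=[4, 5], address=Row(city="Boston", zip="02108")),
])

nested.select("id", F.explode("scores").alias("score")).show()   # array -> one row per element
nested.selectExpr("id", "address.*").show()                      # flatten the nested struct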

14. Data Quality and Validation

● Data Integrity Checks: df.checkpoint()

● Schema Validation: df.schema == expected_schema

● Data Completeness Validation: df.count() == expected_count

● Column Value Range Validation: df.filter((df["column"] >= lower_bound) & (df["column"] <= upper_bound))

15. Integration with Other Data Systems

● Reading from Hive Tables: spark.sql("SELECT * FROM hive_table")

● Writing to Hive Tables: df.write.saveAsTable("hive_table")

● Connecting to External Databases: df.write.jdbc(url, table, properties)

● Using Hadoop File System (HDFS): df.write.save("hdfs://path/to/save")


16. Performance Tuning and Optimization

● Broadcast Variables for Join Optimization:

from pyspark.sql.functions import broadcast;

df1.join(broadcast(df2), "join_column")

● Caching Intermediate Data: df.cache()

● Repartitioning for Parallel Processing: df.repartition(num_partitions)

● Avoiding Shuffle and Spill to Disk: df.coalesce(num_partitions)

● Tuning Spark Configuration and Parameters:

spark.conf.set("spark.executor.memory", "2g")

17. Exploratory Data Analysis Techniques

● Histograms for Exploring Distributions:

df.select('column').rdd.flatMap(lambda x: x).histogram(10)

● Quantile and Percentile Analysis:

df.approxQuantile("column", [0.25, 0.5,0.75], 0.0)

● Exploring Data with Spark SQL: spark.sql("SELECT * FROM df_table").show()

● Calculating Covariance and Correlation:

df.stat.cov("col1", "col2"), df.stat.corr("col1", "col2")

18. Dealing with Different Data Formats

● Handling Avro Files:

df.write.format("avro").save("path"),spark.read.format("avro").load("path")

● Dealing with ORC Files: df.write.orc("path"), spark.read.orc("path")

● Handling XML Data: (Using spark-xml library)

● Dealing with Binary Files: spark.read.format("binaryFile").load("path")


19. Handling Geospatial Data

● Using GeoSpark for Geospatial Operations: (integrating with the GeoSpark library)

● Geospatial Joins and Calculations: (using location-based joins and UDFs for distance calculations)

20. Time Series Data Handling

● Time Series Resampling and Aggregation:

 df.groupBy(window(df["timestamp"], "1 hour")).agg({"value": "mean"})

● Time Series Window Functions:

from pyspark.sql.functions import window;

df.groupBy(window("timestamp", "1 hour")).mean()

21. Advanced Machine Learning Operations

● Custom Machine Learning Models with MLlib:

from pyspark.ml.classification import LogisticRegression; lr = LogisticRegression()

● Text Analysis with MLlib:

from pyspark.ml.feature import Tokenizer;

tokenizer = Tokenizer(inputCol="text", outputCol="words")

● Model Evaluation and Metrics:

from pyspark.ml.evaluation import BinaryClassificationEvaluator;

evaluator = BinaryClassificationEvaluator()

● Model Persistence and Loading: model.save("path"), ModelType.load("path")

22. Graph Analysis with GraphFrames

● Creating GraphFrames for Network Analysis:

from graphframes import GraphFrame;

g = GraphFrame(vertices_df, edges_df)

● Running Graph Algorithms (e.g., PageRank, Shortest Paths):

g.pageRank(resetProbability=0.15, maxIter=10)


23. Custom Transformation and UDFs

● Applying Custom Transformations: df.withColumn("custom_col", custom_udf("column"))

● Vector Operations for ML Features (Vectors.dense is a plain Python constructor, not a column expression, so wrap it in a UDF):

from pyspark.ml.linalg import Vectors, VectorUDT;
from pyspark.sql.functions import udf;

to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT());

df.withColumn("vector_col", to_vector("array_column"))

24. Logging and Monitoring

● Logging Operations in Spark: spark.sparkContext.setLogLevel("WARN")

25. Best Practices and Patterns

● Following Data Partitioning Best Practices: (optimizing the partition strategy for data size and operations)

● Efficient Data Serialization: (using Kryo serialization for performance)

● Optimizing Data Locality: (ensuring data is close to computation resources)

● Error Handling and Recovery Strategies: (implementing try/except logic and checkpointing)

26. Security and Compliance

● Data Encryption and Security: (configuring Spark with encryption and security features)

● GDPR Compliance and Data Anonymization: (implementing data masking and anonymization)

27. Advanced Data Science Techniques

● Deep Learning Integration (e.g., with TensorFlow): (using Spark with TensorFlow for distributed deep learning)

● Complex Event Processing in Streams: (using structured streaming for event pattern detection)


28. Cloud Integration

● Running Spark on Cloud Platforms (e.g., AWS, Azure, GCP): (setting up Spark clusters on cloud services)

● Integrating with Cloud Storage Services: (reading and writing data to cloud storage like S3, ADLS, GCS)


Friday, September 13, 2024

Spark Tips for setting

 **🔥 Cracking the Code: Mastering Databricks for 1TB Data Processing with Pro-Level Performance Tuning! 🚀** 

Ready to take on the challenge of processing 1TB of data in Databricks like a true expert? 💪 It's not just about having the right configurations—it's about mastering the nuances and tuning your cluster to perfection. Let’s dive deep into advanced strategies to ensure your Spark jobs are lightning-fast, efficient, and cost-effective! 

### 🧠 **Step 1: Intelligent Partitioning for Massive Data Sets**

✅ **What’s the Deal?** For 1TB of data, partitioning isn’t just important, it’s critical! With 128MB as the default partition size:

✅ **Calculation**: 1TB ≈ 1,048,576MB ➡️ 1,048,576MB / 128MB ≈ **8,192 partitions** (roughly 8,000).

✅ **Optimization Alert 🚨**: Aim for ~200MB per partition for better parallelism. Adjust the partition size using `spark.sql.files.maxPartitionBytes` for more granular control and enhanced performance.

✅ 🔥🔥 **Pro Tip 💡**: Avoid the small-files syndrome: combine smaller files to reduce overhead and improve processing speed!

 ### 🔥 **Step 2: Optimizing Executor Cores—Beyond the Basics** 

✅**Don’t Get Stuck!** The common mistake? Overloading executors with too many tasks! Start with 4–5 cores per executor and monitor for **task queue delays**. Too many cores = memory contention; too few = underutilized CPUs. 

✅- **Optimal Config**: For 8,000 partitions, 1,600 executors with 5 cores each strike a good balance. 

✅**High-Impact Tip**: Use **Dynamic Resource Allocation** to automatically scale executor numbers based on the workload. Set `spark.dynamicAllocation.enabled` to `true` to let Spark adjust resources on the fly. 

### 💾 **Step 3: Supercharging Executor Memory for Heavy Lifting** 

✅**Memory Management 101**: For large-scale processing, consider the rule of thumb: 

✅- **Memory Per Core**: Allocate 512MB per core as a baseline but bump it up based on shuffle intensity. 

✅- **Total Memory per Executor**: With 5 cores, you’re looking at 2.5GB minimum per executor. For 1,600 executors, you need a total of **4TB of memory**. 

✅ **Avoid Memory Pitfalls**: Enable **Memory Overhead** to handle large shuffle operations and avoid out-of-memory errors. Set `spark.executor.memoryOverhead` to ~10% of executor memory. 🌟

### 🚀 **Step 4: Advanced Performance Tuning—Go Beyond Default Settings!**

🌟 1. **Adaptive Query Execution (AQE) 🛠**: Turn on AQE (`spark.sql.adaptive.enabled`) to allow Spark to optimize its query plan at runtime, especially helpful for skewed data. 

2. **Broadcast Joins 🌐**: For joining massive datasets, use broadcast joins where appropriate. Broadcast smaller datasets to all executors with `spark.sql.autoBroadcastJoinThreshold`. 

3. **Shuffle Optimization 🌀**: Adjust `spark.sql.shuffle.partitions`—bump it up from the default (200) to something more suitable like 1,000+ for 1TB data.

 4. **Caching & Persistence 📥**: Use `.persist()` strategically to cache intermediate results that are reused, reducing redundant computation. 

### 💡 **Final Thought: Driver Memory—Keep It in Check!** 

- **Driver Memory Tip**: Unless you’re collecting massive results back to the driver, keep driver memory reasonable—2–3x the executor memory. Avoid the `collect()` trap with large datasets unless absolutely necessary!

### **Your Call to Action: Unlock the Power of Databricks Today! 🌟**

By optimizing partitioning, carefully configuring executors, and leveraging advanced features like AQE and broadcast joins, you’re not just processing 1TB of data, you’re **mastering** it.

🚀 Was this insightful? If you found value in this deep dive, hit that 👍 and share with your network! Let’s transform how we handle big data!

🌟 Here are some key Spark configurations you can use for optimizing performance, particularly when processing large datasets like 1TB. Each configuration is explained with its use case and impact:

 ### **Essential Spark Configurations for Optimizing Performance**

 ✅1. **`spark.executor.memory`**: 

- **Purpose**: Sets the amount of memory allocated to each executor. 

- **Usage**: `spark.executor.memory = 8g` (8 GB per executor)

 - **Benefit**: Ensures executors have sufficient memory to handle tasks, reducing the risk of OutOfMemory errors and improving performance for memory-intensive operations.

 ✅2. **`spark.executor.cores`**:

 - **Purpose**: Specifies the number of cores allocated to each executor. 

- **Usage**: `spark.executor.cores = 4` 

- **Benefit**: Determines the parallelism within each executor. More cores mean more tasks can be processed simultaneously within each executor, enhancing parallel processing capabilities.

 ✅3. **`spark.sql.shuffle.partitions`**:

- **Purpose**: Sets the number of partitions to use when shuffling data for joins or aggregations. 

- **Usage**: `spark.sql.shuffle.partitions = 1000`

 - **Benefit**: Controls the size of shuffle partitions. A higher number of partitions can improve parallelism and avoid bottlenecks, but setting it too high can cause overhead. Finding the right balance based on your data size is crucial. 

✅4. **`spark.sql.autoBroadcastJoinThreshold`**: 

- **Purpose**: Sets the threshold for broadcasting small tables in joins. 

- **Usage**: `spark.sql.autoBroadcastJoinThreshold = 10MB`

 - **Benefit**: Automatically broadcasts smaller tables to all nodes to speed up join operations. Useful for optimizing performance when dealing with smaller datasets that can fit into memory.

 ✅5. **`spark.sql.adaptive.enabled`**:

- **Purpose**: Enables Adaptive Query Execution (AQE) to optimize query plans dynamically.

- **Usage**: `spark.sql.adaptive.enabled = true`

- **Benefit**: Adjusts query execution plans based on runtime statistics, improving performance by optimizing joins, aggregations, and data partitions dynamically.

 ✅6. **`spark.sql.files.maxPartitionBytes`**: 

- **Purpose**: Defines the maximum size of a partition when reading files. 

- **Usage**: `spark.sql.files.maxPartitionBytes = 128MB`

- **Benefit**: Controls the size of each partition. Smaller partitions can reduce shuffle sizes and improve parallelism, but too small can lead to excessive overhead. 

✅7. **`spark.sql.files.openCostInBytes`**:

- **Purpose**: Sets the cost of opening a file for reading, in bytes.

- **Usage**: `spark.sql.files.openCostInBytes = 4MB` 

- **Benefit**: Helps Spark decide whether to combine smaller files into a single partition or not. Helps in optimizing read performance for large numbers of small files. 

✅8. **`spark.dynamicAllocation.enabled`**: 

- **Purpose**: Enables dynamic allocation of executors based on workload. 

- **Usage**: `spark.dynamicAllocation.enabled = true` 

- **Benefit**: Adjusts the number of executors dynamically based on the workload, reducing resource wastage and optimizing cluster usage. 

✅9. **`spark.executor.memoryOverhead`**: 

- **Purpose**: Sets additional memory for each executor to handle overhead operations. 

- **Usage**: `spark.executor.memoryOverhead = 1g` 

- **Benefit**: Allocates extra memory for non-heap operations like garbage collection and network communication, reducing the risk of out-of-memory errors.

 ### **How These Configurations Help**

 - **Memory Management**: `spark.executor.memory` and `spark.executor.memoryOverhead` ensure that each executor has enough memory for processing and overhead tasks, reducing errors and improving stability.

 - **Parallelism**: `spark.executor.cores` and `spark.sql.shuffle.partitions` enhance parallel processing, speeding up data processing tasks by leveraging more cores and optimized partitioning. 

- **Adaptive Optimization**: `spark.sql.adaptive.enabled` dynamically adjusts query plans based on real-time data, improving execution efficiency and query performance.

 - **Efficient Joins**: `spark.sql.autoBroadcastJoinThreshold` helps in optimizing join operations by broadcasting smaller tables, which can significantly reduce the time taken for joins.

 - **File Handling**: `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` optimize how data files are read and partitioned, improving read performance and managing large numbers of small files. 

- **Resource Utilization**: `spark.dynamicAllocation.enabled` adjusts resources based on the current workload, improving resource utilization and cost-effectiveness.

Implementing these configurations can greatly enhance Spark job performance, particularly for large-scale data processing tasks. Adjust these settings based on your specific workload and cluster resources for more efficient and faster data processing.
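As a rough illustration of wiring these settings up in PySpark (the values are the ones discussed above, not universal recommendations; executor-level settings normally have to be supplied at submit or cluster-creation time rather than from a running notebook):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("large-job-tuning-example")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memoryOverhead", "1g")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.shuffle.partitions", "1000")
    .config("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))   # 10 MB
    .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))     # 128 MB
    .getOrCreate()
)

# SQL-level settings can also be adjusted on an existing session at runtime
spark.conf.set("spark.sql.shuffle.partitions", "1000")
spark.conf.set("spark.sql.adaptive.enabled", "true")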

Monday, February 20, 2023

git_

 -bash-4.2$ cat git_post_automation.py

#!/usr/bin/python3
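# Builds an HTML report of deployment artefact listings (oozie jar, customer and
# partner oozie workflows, properties, python and shell script directories) for
# the chosen environment (itg/dr/prod) and emails it via SMTP.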

from subprocess import Popen,PIPE

from email.mime.multipart import MIMEMultipart

import smtplib

from datetime import datetime

from email.mime.base import MIMEBase

from email import encoders

from email.mime.text import MIMEText

import sys

import os


node = ""

server_name = ""

try :

    env=sys.argv[1]

    if env.lower() == "dr":

        print("DR")

        node = "[sps_eap_cust360_hdr@hnr01n02-d]$"

        server_name = "sps_eap_cust360_hdr"

    elif env.lower() == "itg":

        print("itg")

        node = "[srvc_nextgen_hitg@hnr02n04~]$"

        server_name = "srvc_nextgen_hitg"

    elif env.lower() == "prod":

        print("prod")

        node = "[srvc_eap_cust360_hpro@hnr04n01~]$"

        server_name = "srvc_eap_cust360_hpro"

    else:

        print("usage: python git_post_automation.py <itg/dr/prod>")

        sys.exit(1)

except IndexError:

    print("usage: python git_post_automation.py <itg/dr/prod>")

    sys.exit(1)

print(node)


sender = "shravan.narisetty@hpe.com"  # the sender's email address 'hpe-it-eap-cp360_support@hpe.com'

receiver = "shravan.narisetty@hpe.com" # the recipient's email address 'hpe-it-eap-cp360_support@hpe.com'

msg = MIMEMultipart('alternative')

msg['From'] = sender

msg['To'] = receiver


text = "Hi Team,<br/><br/>Please find the status of daily job logs as of " + datetime.today().strftime(


            "%d-%b-%Y %H:%M:%S GMT") + " <br/>"

html = """

        <html>

            <head>

                <style type="text/css" media="screen"> th, td {padding: 1px;} </style>

            </head>

            <body"> """

##########################################################################

############check jar file ###############################################

##########################################################################

html = html + """<p>Taking screenshot JAR File</p>"""

dr_jar="hadoop fs -ls /user/"+server_name+"/cp360/oozie/hpe-eap-oozie-cp360/jar_main/"

t1 = os.system(dr_jar+"|grep -i EAP_[CP]360-1.0.jar>a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_jar + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f + """</br> """+"""</p1>"""


##########################################################################

############check oozie file ###############################################

##########################################################################

html = html + """<p>Taking screenshot oozie customer side</p>"""

dr_oozie_cust="hadoop fs -ls /user/"+server_name+"/cp360/oozie/hpe-eap-oozie-cp360/customer/"

t1 = os.system(dr_oozie_cust+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_oozie_cust + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""


##########################################################################

############check oozie partner side###############################################

##########################################################################

html = html + """<p>Taking screenshot partner side</p>"""

dr="[sps_eap_cust360_hdr@hnr01n02-d]$"

dr_oozie_prnt="hadoop fs -ls /user/"+server_name+"/cp360/oozie/hpe-eap-oozie-cp360/partner"

t1 = os.system(dr_oozie_prnt+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_oozie_prnt + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""


##########################################################################

############check properties file ###############################################

##########################################################################

html = html + """<p>Taking screenshot properties side</p>"""

dr_properties="hadoop fs -ls /user/"+server_name+"/cp360/properties/"

t1 = os.system(dr_properties+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_properties + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""

##########################################################################

############check python script ###############################################

##########################################################################

html = html + """<p>Taking screenshot python script</p>"""

dr_python="ls -ltr /home/"+server_name+"/cp360/python_scripts"

t1 = os.system(dr_python+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_python + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""


##########################################################################

############check shell script ###############################################

##########################################################################

html = html + """<p>Taking screenshot shell script</p>"""

dr_shell="ls -ltr /home/"+server_name+"/cp360/shell_scripts"

t1 = os.system(dr_shell+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_shell + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""


##########################################################################

############check local properties ###############################################

##########################################################################

html = html + """<p>Taking screenshot shell script script</p>"""

dr_prop="ls -ltr /home/"+server_name+"/cp360/properties"

t1 = os.system(dr_prop+">a.txt")

html = html + """<p style="background-color:black;color:white">""" + node + dr_prop + """</br> """

with open("a.txt") as file1:

    for w_f in file1:

        print(w_f)

        html =html+ w_f +"""</br> """+ """</p1>"""



html =html+"""   </body>

                 <p><font face="Calibri"><br/>Thank You,<br/>CP360 Support.</font></p>

         </html>"""

msg['Subject'] = "CP360 - Pre $ Post Screenslot in "+ env.upper() +" "+ datetime.today().strftime('%d-%m-%Y %H:%M:%S GMT')

part1 = MIMEText(html, 'html')

msg.attach(part1)

s = smtplib.SMTP('smtp3.hpe.com')

s.sendmail(sender, receiver.split(","), msg.as_string())

s.quit()


Wednesday, December 28, 2022

spark_array_columns_array_list


Here we have created two DataFrames df and full_df which contain two columns and three columns respectively.
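The DataFrame-creation code was shown only as an image in the original post; a minimal sketch that reproduces the data used in the examples below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("array-functions").getOrCreate()

full_df = spark.createDataFrame(
    [
        ("x", [4, 6, 7, 9, 2], [1, 2, 3, 7, 7]),
        ("z", [7, 5, 1, 4, 7, 1], [3, 2, 8, 9, 4, 9]),
        ("a", [3, 8, 5, 3], [4, 5, 2, 8]),
    ],
    ["col1", "array_col1", "array_col2"],
)

df = full_df.select("col1", "array_col2")

# NOTE: Python ints are inferred as long; the post's schema shows integer elements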

Check schema

Let’s check the schema of the above DataFrame full_df

The output shows that col1 is string type and array_col1 and array_col2 are array type:

root
|-- col1: string (nullable = true)
|-- array_col1: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- array_col2: array (nullable = true)
| |-- element: integer (containsNull = true)

Similarly, for DataFrame df the schema looks like this:

root
|-- col1: string (nullable = true)
|-- array_col2: array (nullable = true)
| |-- element: integer (containsNull = true)

In the examples that follow we will use df for functions that take a single array as input and full_df for functions that take two arrays as input.

Array functions

array_contains

If we need to find a particular element is present in array, we can use array_contains function. This function returns true if the value is present in array and false otherwise.
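The call itself is not shown in the post; a sketch that should reproduce the output below:

from pyspark.sql.functions import array_contains

df.withColumn("result", array_contains(df["array_col2"], 3)).show()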

Output:

+----+------------------+------+
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| true|
| z|[3, 2, 8, 9, 4, 9]| true|
| a| [4, 5, 2, 8]| false|
+----+------------------+------+

We are looking for value ‘3’ in array column array_col2 and get true for the first two rows, where ‘3’ is present, and false for the last row because ‘3’ is not present.

array_distinct

This function returns only distinct values from an array and removes duplicate values.
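A likely call (missing from the post):

from pyspark.sql.functions import array_distinct

df.withColumn("result", array_distinct(df["array_col2"])).show()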

Output:

+----+------------------+---------------+
|col1| array_col2| result|
+----+------------------+---------------+
| x| [1, 2, 3, 7, 7]| [1, 2, 3, 7]|
| z|[3, 2, 8, 9, 4, 9]|[3, 2, 8, 9, 4]|
| a| [4, 5, 2, 8]| [4, 5, 2, 8]|
+----+------------------+---------------+

Duplicate values got removed and only distinct values are present from array column result.

array_except

This function returns the elements from first array which are not present in second array. This is logically equivalent to set subtract operation.
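Roughly, using the two-array DataFrame:

from pyspark.sql.functions import array_except

full_df.withColumn("result", array_except(full_df["array_col1"], full_df["array_col2"])).show()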

Output:

+----+------------------+------------------+---------+
|col1| array_col1| array_col2| result|
+----+------------------+------------------+---------+
| x| [4, 6, 7, 9, 2]| [1, 2, 3, 7, 7]|[4, 6, 9]|
| z|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|[7, 5, 1]|
| a| [3, 8, 5, 3]| [4, 5, 2, 8]| [3]|
+----+------------------+------------------+---------+

Column result contains elements that are only present in array_col1 but not in array_col2. For example, in the first row the result column contains [4, 6, 9] because these elements are present in array_col1 but not in array_col2.

array_intersect

This function returns common elements from both arrays. This is logically equivalent to set intersection operation.
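The corresponding call (not shown in the post):

from pyspark.sql.functions import array_intersect

full_df.withColumn("result", array_intersect(full_df["array_col1"], full_df["array_col2"])).show()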

Output:

+----+------------------+------------------+------+
|col1| array_col1| array_col2|result|
+----+------------------+------------------+------+
| x| [4, 6, 7, 9, 2]| [1, 2, 3, 7, 7]|[7, 2]|
| z|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]| [4]|
| a| [3, 8, 5, 3]| [4, 5, 2, 8]|[8, 5]|
+----+------------------+------------------+------+

Column result contains the elements that are common to both array columns (array_col1 and array_col2). For example, in the first row the result column contains [7, 2] because these elements are present in both array_col1 and array_col2.

array_join

This Function joins all the array elements based on delimiter defined as the second argument.
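A sketch of the missing call, using a comma as the delimiter:

from pyspark.sql.functions import array_join

df.withColumn("result", array_join(df["array_col2"], ",")).show()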

Output:

+----+------------------+-----------+
|col1| array_col2| result|
+----+------------------+-----------+
| x| [1, 2, 3, 7, 7]| 1,2,3,7,7|
| z|[3, 2, 8, 9, 4, 9]|3,2,8,9,4,9|
| a| [4, 5, 2, 8]| 4,5,2,8|
+----+------------------+-----------+

Column result contains a string which is a concatenation of all the array elements using a specified delimiter (comma in this example).

Note: if there are any null values then we can replace with third argument (nullReplacement) with any string value.

array_max

This function returns the maximum value from an array.
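Roughly:

from pyspark.sql.functions import array_max

df.withColumn("result", array_max(df["array_col2"])).show()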

Output:

+----+------------------+------+
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| 7|
| z|[3, 2, 8, 9, 4, 9]| 9|
| a| [4, 5, 2, 8]| 8|
+----+------------------+------+

Column result contains the maximum value from each array in a row. For example, in the first row the result column contains ‘7’ because this is the maximum element in array [1, 2, 3, 7, 7].

array_min

This function returns the minimum value from an array.
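Roughly:

from pyspark.sql.functions import array_min

df.withColumn("result", array_min(df["array_col2"])).show()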

Output:

+----+------------------+------+
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| 1|
| z|[3, 2, 8, 9, 4, 9]| 2|
| a| [4, 5, 2, 8]| 2|
+----+------------------+------+

Column result contains the minimum value from each array in a row. For example, in the first row the result column contains ‘1’ because this is the minimum element in array [1, 2, 3, 7, 7].

array_position

This function returns the position of first occurrence of a specified element. If the element is not present it returns 0.

Let’s try to find the position of element say ‘7’ from column array_col2 .
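A call along these lines should produce the output below:

from pyspark.sql.functions import array_position

df.withColumn("result", array_position(df["array_col2"], 7)).show()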

Output:

+----+------------------+------+
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| 4|
| z|[3, 2, 8, 9, 4, 9]| 0|
| a| [4, 5, 2, 8]| 0|
+----+------------------+------+

In the first row we get position ‘4’ because ‘7’ occurs the first time in position four. For the rest of the rows, we get ‘0’ because ‘7’ is not present.

array_remove

This function removes all the occurrences of an element from an array.

Let’s remove the element ‘7’ from column array_col2.
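The missing call, roughly:

from pyspark.sql.functions import array_remove

df.withColumn("result", array_remove(df["array_col2"], 7)).show()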

Output:

+----+------------------+------------------+
|col1| array_col2| result|
+----+------------------+------------------+
| x| [1, 2, 3, 7, 7]| [1, 2, 3]|
| z|[3, 2, 8, 9, 4, 9]|[3, 2, 8, 9, 4, 9]|
| a| [4, 5, 2, 8]| [4, 5, 2, 8]|
+----+------------------+------------------+

All occurrences of element ‘7’ are removed from array.

array_repeat

This function creates an array that is repeated as specified by second argument.
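A sketch repeating each array twice, matching the output below:

from pyspark.sql.functions import array_repeat

df.withColumn("result", array_repeat(df["array_col2"], 2)).show(truncate=False)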

Output:

+----+------------------+----------------------------------------+
|col1|array_col2        |result                                  |
+----+------------------+----------------------------------------+
|x   |[1, 2, 3, 7, 7]   |[[1, 2, 3, 7, 7], [1, 2, 3, 7, 7]]      |
|z   |[3, 2, 8, 9, 4, 9]|[[3, 2, 8, 9, 4, 9], [3, 2, 8, 9, 4, 9]]|
|a   |[4, 5, 2, 8]      |[[4, 5, 2, 8], [4, 5, 2, 8]]            |
+----+------------------+----------------------------------------+

Array from array_col2 got repeated 2 times in result column. For example, in the first row the result column contains the array [1, 2, 3, 7, 7] twice.

array_sort

This function sorts the elements of an array in ascending order.
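Roughly:

from pyspark.sql.functions import array_sort

df.withColumn("result", array_sort(df["array_col2"])).show()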

Output:

+----+------------------+------------------+
|col1| array_col2| result|
+----+------------------+------------------+
| x| [1, 2, 3, 7, 7]| [1, 2, 3, 7, 7]|
| z|[3, 2, 8, 9, 4, 9]|[2, 3, 4, 8, 9, 9]|
| a| [4, 5, 2, 8]| [2, 4, 5, 8]|
+----+------------------+------------------+

Array in column result got sorted in ascending order. For example, in the last row the column result contains [2, 4, 5, 8] which is sorted in ascending order.

array_union

This function returns the union of all elements from the input arrays.
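A likely call (col1 is dropped to match the output shown below):

from pyspark.sql.functions import array_union

full_df.select("array_col1", "array_col2",
               array_union("array_col1", "array_col2").alias("result")).show(truncate=False)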

Output:

+------------------+------------------+------------------------+
|array_col1        |array_col2        |result                  |
+------------------+------------------+------------------------+
|[4, 6, 7, 9, 2]   |[1, 2, 3, 7, 7]   |[4, 6, 7, 9, 2, 1, 3]   |
|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|[7, 5, 1, 4, 3, 2, 8, 9]|
|[3, 8, 5, 3]      |[4, 5, 2, 8]      |[3, 8, 5, 4, 2]         |
+------------------+------------------+------------------------+

Note: dropped the col1 to fit the result here in code block.

Column result contains the union of arrays from column array_col1 and array_col2 and contains distinct values only.

arrays_overlap

This function checks if at least one element is common/overlapping in arrays. It returns true if at least one element is common in both array and false otherwise. It returns null if at least one of the arrays is null.
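Roughly:

from pyspark.sql.functions import arrays_overlap

full_df.withColumn("result", arrays_overlap(full_df["array_col1"], full_df["array_col2"])).show(truncate=False)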

Output:

+----+------------------+------------------+------+
|col1|array_col1 |array_col2 |result|
+----+------------------+------------------+------+
|x |[4, 6, 7, 9, 2] |[1, 2, 3, 7, 7] |true |
|z |[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|true |
|a |[3, 8, 5, 3] |[4, 5, 2, 8] |true |
+----+------------------+------------------+------+

All the values in result column are true because we have at-least one element common in array_col1 and array_col2 for all rows. For example, in the first row the result column is true because the elements ‘2’ and ‘7’ are present in both columns array_col1 and array_col2.

arrays_zip

This function merges the i-th elements of the input arrays into structs and returns an array<struct>.

Since both the array columns have same numbers of values, let’s remove some values from one array column and see how it behaves with different values in the array with zip operation.

First, we will remove element ‘2’ from array column array_col2 and then try to zip column array_col1 with newly created column new_array_col
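A sketch of those two steps (the original code is not shown in the post):

from pyspark.sql.functions import array_remove, arrays_zip

zip_df = full_df.withColumn("new_array_col", array_remove("array_col2", 2))
zip_df.select("array_col1", "new_array_col",
              arrays_zip("array_col1", "new_array_col").alias("result")).show(truncate=False)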

Output:

+------------------+---------------+-------------------------------+
|array_col1 |new_array_col |result |
+------------------+---------------+-------------------------------+
|[4, 6, 7, 9, 2] |[1, 3, 7, 7] |[[4, 1], [6, 3] ..., [2,]] |
|[7, 5, 1, 4, 7, 1]|[3, 8, 9, 4, 9]|[[7, 3], [5, 8] ..., [1,]] |
|[3, 8, 5, 3] |[4, 5, 8] |[[3, 4], ... [3,]] |
+------------------+---------------+-------------------------------+

In the first row, the first element of the result column is [4, 1], which zips the first element of array_col1 (4) with the first element of new_array_col (1). The last element of the result column is [2,] (the zip of the 5th elements), where the second value is null because there is no 5th element in the first row of new_array_col.

Let’s check the type of the result columns as well.

Output:

root
|-- result: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- array_col1: integer (nullable = true)
| | |-- new_array_col: integer (nullable = true)

Type of the result column is array<struct>.

concat

This function concatenates all the elements of both arrays into a single one.
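Roughly:

from pyspark.sql.functions import concat

full_df.select("array_col1", "array_col2",
               concat("array_col1", "array_col2").alias("result")).show(truncate=False)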

Output:

+------------------+------------------+----------------------------+
|array_col1 |array_col2 |result |
+------------------+------------------+----------------------------+
|[4, 6, 7, 9, 2] |[1, 2, 3, 7, 7] |[4, 6, 7, 9, ..., 3, 7, 7] |
|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|[7, 5, 1, 4, ..., 9, 4, 9] |
|[3, 8, 5, 3] |[4, 5, 2, 8] |[3, 8, 5, 3, 4, 5, 2, 8] |
+------------------+------------------+----------------------------+

Column result contains the array which is a concatenation of arrays in columns array_col1 and array_col2.

Note: In order to fit the output we removed a few elements from result column from the display.

element_at

This function returns the element at a specified index.

Let’s try to get the first element from each array.
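A call along these lines (element_at uses 1-based indexing):

from pyspark.sql.functions import element_at

df.withColumn("result", element_at(df["array_col2"], 1)).show()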

Output:

+----+------------------+------+
|col1|array_col2 |result|
+----+------------------+------+
|x |[1, 2, 3, 7, 7] |1 |
|z |[3, 2, 8, 9, 4, 9]|3 |
|a |[4, 5, 2, 8] |4 |
+----+------------------+------+

Column result contains the first element from each array. For example, in the first row the result contains ‘1’ because this is first element in the array [1, 2, 3, 7, 7].

flatten

This function returns a single array from an array of arrays. If the array is nested more than two levels deep, only one level of nesting is removed.

Let’s first generate the nested array using the function array_repeat as discussed above and then flatten the nested array.
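A sketch of that approach (the exact input used for the post's screenshot may have differed slightly):

from pyspark.sql.functions import array_repeat, flatten

nested_df = df.withColumn("repeat", array_repeat("array_col2", 2))
nested_df.select("repeat", flatten("repeat").alias("result")).show(truncate=False)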

Output:

+-----------------------------------+------------------------------+
|repeat                             |result                        |
+-----------------------------------+------------------------------+
|[[1, 2, 3, 7, 7], [1, 2, 3, 7, 7]] |[1, 2, 3, 7, 7, 1, 2, 3, 7, 7]|
|[[3, 2, 8, 9, 4], [3, 2, 8, 9, 4]] |[3, 2, 8, 9, 4, 3, 2, 8, 9, 4]|
|[[4, 5, 2, 8], [4, 5, 2, 8]]       |[4, 5, 2, 8, 4, 5, 2, 8]      |
+-----------------------------------+------------------------------+

Column result contains all the values from an array of arrays from column repeat but in a single array.

map_from_arrays

This function creates a map column. Elements of the first column will be used for keys and second column will be used for values.
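Roughly (note the dedup setting is an assumption needed on Spark 3.x, where duplicate keys such as those in row "z" raise an error by default):

from pyspark.sql.functions import map_from_arrays

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")  # relax duplicate-key handling

full_df.select("array_col1", "array_col2",
               map_from_arrays("array_col1", "array_col2").alias("result")).show()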

Output:

+------------------+------------------+--------------------+ 
| array_col1| array_col2| result|
+------------------+------------------+--------------------+
| [4, 6, 7, 9, 2]| [1, 2, 3, 7, 7]|[4 -> 1, 6 -> 2, ...|
|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|[7 -> 3, 5 -> 2, ...|
| [3, 8, 5, 3]| [4, 5, 2, 8]|[3 -> 4, 8 -> 5, ...|
+------------------+------------------+--------------------+

Column result contains the map generated from both the input arrays. The first element in first row is 4 -> 1 where ‘4’ is a key which is the first element from first column array_col1 and ‘1’ is the key’s value which is the first element from second column array_col2.

reverse

This function reverses the order of elements in input array.
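Roughly:

from pyspark.sql.functions import reverse

df.withColumn("result", reverse(df["array_col2"])).show()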

Output:

+----+------------------+------------------+ 
|col1| array_col2| result|
+----+------------------+------------------+
| x| [1, 2, 3, 7, 7]| [7, 7, 3, 2, 1]|
| z|[3, 2, 8, 9, 4, 9]|[9, 4, 9, 8, 2, 3]|
| a| [4, 5, 2, 8]| [8, 2, 5, 4]|
+----+------------------+------------------+

Column result contains the reverse of the array in column array_col2. For example, in the first row result contains [7, 7, 3, 2, 1], which is the reverse of [1, 2, 3, 7, 7] from column array_col2.

shuffle

This function shuffles the elements of array randomly.
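Roughly (the result is non-deterministic, so your ordering will differ):

from pyspark.sql.functions import shuffle

df.withColumn("result", shuffle(df["array_col2"])).show()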

Output:

+----+------------------+------------------+ 
|col1| array_col2| result|
+----+------------------+------------------+
| x| [1, 2, 3, 7, 7]| [2, 7, 1, 7, 3]|
| z|[3, 2, 8, 9, 4, 9]|[3, 8, 9, 4, 9, 2]|
| a| [4, 5, 2, 8]| [8, 4, 2, 5]|
+----+------------------+------------------+

Column result contains shuffled elements from column array_col2. In other words, order of elements in result column is random. For example, in the first row the result column contains [2, 7, 1, 7, 3] which is the shuffled output of array [1, 2, 3, 7, 7] from column array_col2.

Note: Try executing the shuffle function multiple times. The order of values in the result column will be different for each execution.

size

This function returns a number of elements in an array or map.
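Roughly:

from pyspark.sql.functions import size

df.withColumn("result", size(df["array_col2"])).show()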

Output:

+----+------------------+------+ 
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| 5|
| z|[3, 2, 8, 9, 4, 9]| 6|
| a| [4, 5, 2, 8]| 4|
+----+------------------+------+

Column result contains the size (number of elements) of the array in column array_col2. For example, in the first row the result column contains ‘5’ because the number of elements in [1, 2, 3, 7, 7] is 5.

slice

This function slices the array into a sub-array. We can specify the start of the index as second argument and number of elements as third argument.

Note: Array indices in Spark start at 1. Negative indexing is also supported to access elements from the end.

Let’s try to create a sub-array of 3 elements starting from index 2.
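The corresponding call (start index 2, length 3):

from pyspark.sql.functions import slice

df.withColumn("result", slice(df["array_col2"], 2, 3)).show()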

Output:

+----+------------------+---------+ 
|col1| array_col2| result|
+----+------------------+---------+
| x| [1, 2, 3, 7, 7]|[2, 3, 7]|
| z|[3, 2, 8, 9, 4, 9]|[2, 8, 9]|
| a| [4, 5, 2, 8]|[5, 2, 8]|
+----+------------------+---------+

In first row, result contains sub-array [2, 3, 7] which is created with 3 elements from index 2 in [1, 2, 3, 7, 7].

sort_array

This function sorts the array in ascending order by default. However, we can sort in descending order by passing asc=False as the second argument.
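A sketch of the descending sort that matches the output below:

from pyspark.sql.functions import sort_array

df.withColumn("result", sort_array(df["array_col2"], asc=False)).show()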

Output:

+----+------------------+------------------+ 
|col1| array_col2| result|
+----+------------------+------------------+
| x| [1, 2, 3, 7, 7]| [7, 7, 3, 2, 1]|
| z|[3, 2, 8, 9, 4, 9]|[9, 9, 8, 4, 3, 2]|
| a| [4, 5, 2, 8]| [8, 5, 4, 2]|
+----+------------------+------------------+

Column result is sorted in descending order. For example, in the first row the result column contains [7, 7, 3, 2, 1] which is the descending sorted result of array[1, 2, 3, 7, 7] from column array_col2 .

explode

This function creates a new row for each element of an array or map.

Let’s first create a new column with fewer values to explode.
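Roughly, taking the first two elements of each array with slice:

from pyspark.sql.functions import slice

sliced = df.select("col1", slice("array_col2", 1, 2).alias("slice_col"))
sliced.show()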

Output:

+----+---------+ 
|col1|slice_col|
+----+---------+
| x| [1, 2]|
| z| [3, 2]|
| a| [4, 5]|
+----+---------+

slice_col contains 2 elements in an array. So upon explode, this generates 2 rows for each array.

Now let's try to explode the column slice_col.
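The missing call, roughly:

from pyspark.sql.functions import explode

sliced.withColumn("result", explode("slice_col")).show()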

Output:

+----+---------+------+ 
|col1|slice_col|result|
+----+---------+------+
| x| [1, 2]| 1|
| x| [1, 2]| 2|
| z| [3, 2]| 3|
| z| [3, 2]| 2|
| a| [4, 5]| 4|
| a| [4, 5]| 5|
+----+---------+------+

Upon explode, 2 rows are generated for each element of an array in column slice_col.

posexplode

This function creates a new row for each element with position of an array or map.

Let’s reuse the slice_col column created above.

Now let’s try to explode the slice_col with a position as well.
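A call along these lines should produce the output below:

from pyspark.sql.functions import posexplode

sliced.select("col1", "slice_col", posexplode("slice_col")).show(truncate=False)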

Output:

+----+---------+---+---+ 
|col1|slice_col|pos|col|
+----+---------+---+---+
|x |[1, 2] |0 |1 |
|x |[1, 2] |1 |2 |
|z |[3, 2] |0 |3 |
|z |[3, 2] |1 |2 |
|a |[4, 5] |0 |4 |
|a |[4, 5] |1 |5 |
+----+---------+---+---+

Here two more columns are generated: pos (the element's position) and col (the element's value).