Thursday, November 5, 2020
Spark Create DataFrame with Examples
In Spark, the createDataFrame() and toDF() methods are used to create a DataFrame. Using these methods, you can create a Spark DataFrame from an already existing RDD, DataFrame, Dataset, List, or Seq data object.
1. Creating Spark DataFrame from RDD
import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
1.1 Using toDF() function
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
Output: by default, toDF() creates the columns with names _1, _2, and so on:
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
Output:
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)
1.2 Using Spark createDataFrame() from SparkSession
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
1.3 Using createDataFrame() with the Row type
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val schema = StructType(Array(StructField("language", StringType, true),
StructField("users_count", StringType, true)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)
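2. Creating Spark DataFrame from List and Seq collection
toDF() and createDataFrame() also work directly on local collections such as List and Seq, which is a common shortcut when the data is small. A minimal sketch, reusing the data and columns values defined above:
// toDF() on a local Seq of tuples (requires import spark.implicits._)
val dfFromData1 = data.toDF(columns:_*)
// createDataFrame() on a local Seq of tuples
val dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)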
3. Creating Spark DataFrame from CSV
Here, we will see how to create a DataFrame from a CSV file.
val df2 = spark.read.csv("/src/resources/file.csv")
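If the file has a header row, you can, for example, pick up the column names from it and let Spark infer the column types (header and inferSchema are standard options of the CSV reader):
val df2 = spark.read
.option("header", "true") // use the first row as column names
.option("inferSchema", "true") // infer column types from the data
.csv("/src/resources/file.csv")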
4. Creating from text (TXT) file
Here, we will see how to create a DataFrame from a TXT file.
val df2 = spark.read
.text("/src/resources/file.txt")
5. Creating from JSON file
Here, we will see how to create a DataFrame from a JSON file.
val df2 = spark.read
.json("/src/resources/file.json")
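By default, spark.read.json expects one JSON object per line. If the file contains a single multi-line JSON document, enable the multiline option, for example:
val df2 = spark.read
.option("multiline", "true") // treat the whole file as one JSON document
.json("/src/resources/file.json")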
6. Creating from an XML file
To create a DataFrame by parsing XML, we should use the "com.databricks.spark.xml" DataSource from the Databricks spark-xml API. Add the dependency to your pom.xml:
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.6.0</version>
</dependency>
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "person")
.xml("src/main/resources/persons.xml")
7. Creating from Hive
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")
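Note that HiveContext is deprecated since Spark 2.0; in newer versions you enable Hive support on the SparkSession itself. A minimal sketch (the application name here is hypothetical):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.appName("SparkHiveExample") // hypothetical application name
.enableHiveSupport() // gives the session access to the Hive metastore
.getOrCreate()
val hiveDF = spark.sql("select * from emp")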
8. Creating from the Database table (RDBMS)
8.1 From MySQL table
Make sure you have MySQL library as a dependency in your pom.xml file or MySQL jars in your classpath.
val df_mysql = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:port/db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.load()
8.2 From DB2 table
Make sure you have DB2 library as a dependency in your pom.xml file or DB2 jars in your classpath.
val df_db2 = spark.read.format("jdbc")
.option("url", "jdbc:db2://localhost:50000/dbname")
.option("driver", "com.ibm.db2.jcc.DB2Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.load()
Similarly, we can create a DataFrame in Spark from most of the other relational databases, which I haven't covered here and will leave for you to explore.
9. Creating DataFrame from HBase table
To create a Spark DataFrame from an HBase table, we should use a DataSource defined in a Spark HBase connector: for example, the DataSource "org.apache.spark.sql.execution.datasources.hbase" from Hortonworks, or "org.apache.hadoop.hbase.spark" from the Apache HBase Spark connector.
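The catalog referenced in the code below is a JSON string that maps the HBase table and its column families to DataFrame columns. A minimal sketch (the table and column names here are hypothetical):
val catalog = """{
  "table":{"namespace":"default", "name":"employee"},
  "rowkey":"key",
  "columns":{
    "key":{"cf":"rowkey", "col":"key", "type":"string"},
    "name":{"cf":"person", "col":"name", "type":"string"}
  }
}"""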
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

val hbaseDF = sparkSession.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog)) // catalog defined above
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
A detailed example is explained at Generating DataFrame from HBase table.