Thursday, November 5, 2020

Spark Create DataFrame with Examples

In Spark, the createDataFrame() and toDF() methods are used to create a DataFrame. Using these methods you can create a Spark DataFrame from already existing RDD, DataFrame, Dataset, List, and Seq data objects.

1. Spark Create DataFrame from RDD

First, create an RDD by parallelizing a Seq of tuples:

import spark.implicits._

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)

1.1 Using toDF() function

val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

By default, the columns are named _1, _2, and so on:

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

To assign column names, pass them to toDF():

val dfFromRDD1 = rdd.toDF("language", "users_count")
dfFromRDD1.printSchema()

Output:

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

1.2 Using Spark createDataFrame() from SparkSession

val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns: _*)

1.3 Using createDataFrame() with the Row type

createDataFrame() with an RDD of Row takes an explicit schema; note the required imports:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Array(
  StructField("language", StringType, true),
  StructField("users_count", StringType, true)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD, schema)
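2. Create Spark DataFrame from List and Seq collection

The same methods also work directly on a local List or Seq, without creating an RDD first. A minimal sketch, reusing the data and columns values defined above (both variants produce the same two string columns):

// toDF() works on a local Seq of tuples thanks to import spark.implicits._
val dfFromData1 = data.toDF(columns: _*)

// createDataFrame() also accepts a local Seq of tuples; rename columns with toDF()
val dfFromData2 = spark.createDataFrame(data).toDF(columns: _*)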
3. Creating Spark DataFrame from CSV

Here we will see how to create a DataFrame from a CSV file:

val df2 = spark.read.csv("/src/resources/file.csv")

4. Creating from text (TXT) file

Here we will see how to create a DataFrame from a TXT file:

val df2 = spark.read.text("/src/resources/file.txt")

5. Creating from JSON file

Here we will see how to create a DataFrame from a JSON file:

val df2 = spark.read.json("/src/resources/file.json")

6. Creating from an XML file

To create a DataFrame by parsing XML, use the "com.databricks.spark.xml" data source from the Databricks spark-xml API. Add the dependency to your pom.xml:

<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-xml_2.11</artifactId>
  <version>0.6.0</version>
</dependency>

val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "person")
  .xml("src/main/resources/persons.xml")

7. Creating from Hive

val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")

8. Creating from a Database table (RDBMS)

8.1 From MySQL table

Make sure you have the MySQL library as a dependency in your pom.xml file or the MySQL jars in your classpath.

val df_mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:port/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()

8.2 From DB2 table

Make sure you have the DB2 library as a dependency in your pom.xml file or the DB2 jars in your classpath.

val df_db2 = spark.read.format("jdbc")
  .option("url", "jdbc:db2://localhost:50000/dbname")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()

Similarly, we can create a DataFrame in Spark from most of the relational databases, which I have not covered here; a PostgreSQL sketch is shown after the HBase example below.

9. Create DataFrame from HBase table

To create a Spark DataFrame from an HBase table, use a DataSource defined in one of the Spark HBase connectors: for example, "org.apache.spark.sql.execution.datasources.hbase" from Hortonworks, or "org.apache.hadoop.hbase.spark" from the Apache HBase connector. (HBaseTableCatalog and the catalog JSON definition below come from the connector library.)

val hbaseDF = sparkSession.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

A detailed example is explained at Generating DataFrame from HBase table.
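As referenced in section 8, here is a minimal PostgreSQL sketch. It is an illustration only: it assumes the PostgreSQL JDBC driver (org.postgresql.Driver) is on the classpath, and the URL, table name, and credentials are placeholders.

// Hypothetical PostgreSQL read; url, dbtable, user, and password are placeholders
val df_pg = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/dbname")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()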
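Finally, the snippets above assume an existing SparkSession named spark. For completeness, a minimal self-contained sketch (assuming Spark 2.x or later on the classpath; the master and appName values are illustrative choices) that builds the session and creates a DataFrame both ways:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CreateDataFrameExample extends App {
  // Build a local SparkSession; master and appName are illustrative
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("CreateDataFrameExample")
    .getOrCreate()
  import spark.implicits._

  val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

  // 1) toDF() with explicit column names
  val df1 = data.toDF("language", "users_count")

  // 2) createDataFrame() with a Row RDD and an explicit schema
  val schema = StructType(Array(
    StructField("language", StringType, true),
    StructField("users_count", StringType, true)))
  val rowRDD = spark.sparkContext.parallelize(data).map(t => Row(t._1, t._2))
  val df2 = spark.createDataFrame(rowRDD, schema)

  df1.printSchema()
  df2.show(false)
  spark.stop()
}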
