Thursday, November 5, 2020

Spark DF Join

/*
 * Sample input data used by this example:
 *
 * $ cat input.txt
 * transaction_id,user_name,user_type,transaction_date
 * 1,user1,New,20-04-2018
 * 2,user2,Privileged,20-11-2019
 * 3,user3,Privileged,20-04-2018
 * 4,user4,New,22-05-2019
 * 5,user5,New,20-04-2019
 * 6,user6,New,25-06-2018
 * 7,user7,New,20-04-2018
 * 8,user8,Privileged,20-04-2019
 * 9,user9,Privileged,20-04-2018
 * 10,user10,New,20-04-2019
 *
 * $ cat vailadte.csv
 * val_date
 * 2019
 *
 * NOTE(review): the program below loads "input.csv" and "vailadtedate.csv" by default,
 * while the files shown above are "input.txt" and "vailadte.csv" — confirm the real names.
 */
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
 * Keeps only the transactions whose year (taken from `transaction_date`) appears in a
 * separate CSV of valid years (`val_date` column), and prints them.
 *
 * Usage: SparkDFJoin [inputCsv] [validYearsCsv]
 * Both arguments are optional; when omitted, the original hard-coded paths are used.
 */
object SparkDFJoin {

  private val DefaultInputPath: String =
    "C:\\Users\\shrav\\IdeaProjects\\test\\src\\main\\scala\\input.csv"

  private val DefaultValidYearsPath: String =
    "C:\\Users\\shrav\\IdeaProjects\\test\\src\\main\\scala\\vailadtedate.csv"

  /**
   * Entry point.
   *
   * @param args optional `args(0)` = transactions CSV path, `args(1)` = valid-years CSV path
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    // Command-line paths override the defaults; no arguments keeps the original behavior.
    val inputPath = args.lift(0).getOrElse(DefaultInputPath)
    val validYearsPath = args.lift(1).getOrElse(DefaultValidYearsPath)

    val spark: SparkSession = SparkSession.builder()
      .appName("SparkJoinDf")
      .master("local[1]")
      .getOrCreate()

    // Always release the session (and its local executor) even if a read or query fails.
    try {
      import spark.implicits._

      val df1 = spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(inputPath)

      // Assumes transaction_date is formatted dd-MM-yyyy (as in the sample data), so the
      // 4 characters starting at 1-based position 7 are the year.
      val addcol = df1.withColumn("year", $"transaction_date".substr(7, 4))
      addcol.createOrReplaceTempView("userdata")

      val df2 = spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(validYearsPath)
      df2.createOrReplaceTempView("valtable")

      // For reference, the plain left join and its output on the sample data:
      //spark.sqlContext.sql("select * from userdata a left join valtable b ON a.year=b.val_date").show()
      /*
      +--------------+---------+----------+----------------+----+--------+
      |transaction_id|user_name| user_type|transaction_date|year|val_date|
      +--------------+---------+----------+----------------+----+--------+
      |             1|    user1|       New|      20-04-2018|2018|    null|
      |             2|    user2|Privileged|      20-11-2019|2019|    2019|
      |             3|    user3|Privileged|      20-04-2018|2018|    null|
      |             4|    user4|       New|      22-05-2019|2019|    2019|
      |             5|    user5|       New|      20-04-2019|2019|    2019|
      |             6|    user6|       New|      25-06-2018|2018|    null|
      |             7|    user7|       New|      20-04-2018|2018|    null|
      |             8|    user8|Privileged|      20-04-2019|2019|    2019|
      |             9|    user9|Privileged|      20-04-2018|2018|    null|
      |            10|   user10|       New|      20-04-2019|2019|    2019|
      +--------------+---------+----------+----------------+----+--------+
      */

      // LEFT SEMI JOIN keeps each userdata row whose year has a match in valtable, once.
      // It replaces "left join ... where val_date is not null", which gives the same rows
      // here but would emit duplicates if valtable listed the same year more than once.
      spark.sql(
        """SELECT a.transaction_id, a.user_name, a.user_type, a.transaction_date
          |FROM userdata a
          |LEFT SEMI JOIN valtable b ON a.year = b.val_date""".stripMargin
      ).show()
      /*
      +--------------+---------+----------+----------------+
      |transaction_id|user_name| user_type|transaction_date|
      +--------------+---------+----------+----------------+
      |             2|    user2|Privileged|      20-11-2019|
      |             4|    user4|       New|      22-05-2019|
      |             5|    user5|       New|      20-04-2019|
      |             8|    user8|Privileged|      20-04-2019|
      |            10|   user10|       New|      20-04-2019|
      +--------------+---------+----------+----------------+
      */
    } finally {
      spark.stop()
    }
  }
}

No comments:

Post a Comment