Thursday, November 5, 2020
Spark DF Join
cat input.csv
transaction_id,user_name,user_type,transaction_date
1,user1,New,20-04-2018
2,user2,Privileged,20-11-2019
3,user3,Privileged,20-04-2018
4,user4,New,22-05-2019
5,user5,New,20-04-2019
6,user6,New,25-06-2018
7,user7,New,20-04-2018
8,user8,Privileged,20-04-2019
9,user9,Privileged,20-04-2018
10,user10,New,20-04-2019
cat vailadtedate.csv
val_date
2019
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
 * Keeps only the transactions whose year appears in a validation table.
 *
 * Two headered CSV files are read:
 *  - the transactions file, with columns
 *    `transaction_id,user_name,user_type,transaction_date` (dates look like `20-04-2018`);
 *  - the validation file, with a single `val_date` column holding the accepted year(s).
 *
 * A `year` column is derived from `transaction_date`, the two tables are joined on
 * `year = val_date`, and the matching transactions are printed to stdout.
 */
object SparkDFJoin {

  // Locations used when no paths are supplied on the command line.
  private val DefaultInputPath: String =
    "C:\\Users\\shrav\\IdeaProjects\\test\\src\\main\\scala\\input.csv"
  private val DefaultValidationPath: String =
    "C:\\Users\\shrav\\IdeaProjects\\test\\src\\main\\scala\\vailadtedate.csv"

  /**
   * Program entry point.
   *
   * @param args optional overrides: `args(0)` is the transactions CSV path and
   *             `args(1)` is the validation CSV path. Any argument that is not
   *             supplied falls back to the built-in default location.
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's own logging so only the query result is printed.
    Logger.getLogger("org").setLevel(Level.OFF)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val validationPath = args.drop(1).headOption.getOrElse(DefaultValidationPath)

    val spark: SparkSession = SparkSession.builder()
      .appName("SparkJoinDf")
      .master("local[1]")
      .getOrCreate()

    // Always release the session, even if reading or querying fails.
    try {
      import spark.implicits._

      val transactions = spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(inputPath)

      // transaction_date is dd-MM-yyyy, so the 4-character year starts at (1-based) position 7.
      val withYear = transactions.withColumn("year", $"transaction_date".substr(7, 4))
      withYear.createOrReplaceTempView("userdata")

      val validYears = spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(validationPath)
      validYears.createOrReplaceTempView("valtable")

      // For reference, the plain left join keeps every transaction and leaves val_date
      // null for years that are not in the validation table (the 2018 rows in the sample):
      //   spark.sql("select * from userdata a left join valtable b ON a.year=b.val_date").show()

      // A left join followed by "val_date IS NOT NULL" keeps exactly the matched rows,
      // which is what an inner join produces, so the query is expressed as one directly.
      spark.sql(
        "select transaction_id,user_name,user_type,transaction_date " +
          "from userdata a join valtable b ON a.year=b.val_date"
      ).show()
      /* Output for the sample data (val_date = 2019):
      +--------------+---------+----------+----------------+
      |transaction_id|user_name| user_type|transaction_date|
      +--------------+---------+----------+----------------+
      |             2|    user2|Privileged|      20-11-2019|
      |             4|    user4|       New|      22-05-2019|
      |             5|    user5|       New|      20-04-2019|
      |             8|    user8|Privileged|      20-04-2019|
      |            10|   user10|       New|      20-04-2019|
      +--------------+---------+----------+----------------+
      */
    } finally {
      spark.stop()
    }
  }
}
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment