Tuesday, August 23, 2016

spark json

[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"shr","age":23,"loc":"hyd"}
{"name":"kumar","age" :24,"loc":"del"}

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@410438f6

scala> val df =sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string]

scala> df.show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 23|hyd|  shr|
| 24|del|kumar|
+---+---+-----+

scala> df.select("name").show()
+-----+
| name|
+-----+
|  shr|
|kumar|
+-----+


scala> df.select(df("age")+1).show()
+---------+
|(age + 1)|
+---------+
|       24|
|       25|
+---------+

scala> df.first()
res10: org.apache.spark.sql.Row = [23,hyd,shr]

scala> df.take(1)
res12: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr])

scala> df.take(2)
res13: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr], [24,del,kumar])

scala> df.sort().show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 23|hyd|  shr|
| 24|del|kumar|
+---+---+-----+


scala> df.filter(df("age")>23).show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 24|del|kumar|
+---+---+-----+


scala> df.describe().show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|              23.5|
| stddev|0.7071067811865476|
|    min|                23|
|    max|                24|
+-------+------------------+

scala> df.agg(avg("age")).show()
+--------+
|avg(age)|
+--------+
|    23.5|
+--------+



{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :24,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :24,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :24,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :24,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :24,"loc":"hyd","sal":2000,"sex":"f"}




[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :34,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :25,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :21,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :44,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :30,"loc":"hyd","sal":2000,"sex":"f"}

[cloudera@quickstart ~]$ hadoop fs -put a.json /user1/


scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@60ae0f04

scala> val df =sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string, sal: bigint, sex: string]

scala> df.show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
+---+----+-------+----+---+


scala> df.select("age").show()
+---+
|age|
+---+
| 24|
| 34|
| 25|
| 21|
| 44|
| 30|
+---+

scala> df.filter(df("age")<30).show()
+---+----+-----+----+---+
|age| loc| name| sal|sex|
+---+----+-----+----+---+
| 24| del|kumar|4000|  f|
| 25| del| bbbb|1000|  m|
| 21|pune|ccccc|5000|  f|
+---+----+-----+----+---+

scala> df.groupBy("sex").count().show()
+---+-----+                                                                    
|sex|count|
+---+-----+
|  f|    3|
|  m|    3|
+---+-----+

scala> df.where(df("age")<25).show()
+---+----+-----+----+---+
|age| loc| name| sal|sex|
+---+----+-----+----+---+
| 24| del|kumar|4000|  f|
| 21|pune|ccccc|5000|  f|
+---+----+-----+----+---+


scala> val e= df.toJSON
e: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[269] at toJSON at <console>:38

scala> e.collect()
res73: Array[String] = Array({"age":24,"loc":"del","name":"kumar","sal":4000,"sex":"f"}, {"age":34,"loc":"wgl","name":"aaaa","sal":5000,"sex":"m"}, {"age":25,"loc":"del","name":"bbbb","sal":1000,"sex":"m"}, {"age":21,"loc":"pune","name":"ccccc","sal":5000,"sex":"f"}, {"age":44,"loc":"del","name":"ddddddd","sal":3000,"sex":"m"}, {"age":30,"loc":"hyd","name":"eeeee","sal":2000,"sex":"f"})



scala> df.describe().show()
+-------+------------------+------------------+                                
|summary|               age|               sal|
+-------+------------------+------------------+
|  count|                 6|                 6|
|   mean|29.666666666666668|3333.3333333333335|
| stddev| 8.406346808612327|1632.9931618554522|
|    min|                21|              1000|
|    max|                44|              5000|
+-------+------------------+------------------+

scala> df.agg(max("sal")).show()
+--------+
|max(sal)|
+--------+
|    5000|
+--------+

scala> df.sort().show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+


scala> df.sort("sal").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+

scala> df.take(2)
res62: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m])

scala> df.first()
res63: org.apache.spark.sql.Row = [24,del,kumar,4000,f]

scala> df.head(3)
res69: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m], [25,del,bbbb,1000,m])

scala> df.orderBy("age").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 21|pune|  ccccc|5000|  f|
| 24| del|  kumar|4000|  f|
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 34| wgl|   aaaa|5000|  m|
| 44| del|ddddddd|3000|  m|
+---+----+-------+----+---+


scala> df.orderBy("sal").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+

No comments:

Post a Comment