Spark Lab2
scala> val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/emp")
scala> val earray = emp.map(x=> x.split(","))
earray: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14
scala> earray.collect
Array[Array[String]] = Array(Array(101, aa, 20000, m, 11), Array(102, bb, 30000, f, 12), Array(103, cc, 40000, m, 11), Array(104, ddd, 50000, f, 12), Array(105, ee, 60000, m, 12), Array(106, dd, 90000, f, 11))
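Positional indexes such as x(4) and x(2) are easy to mix up. A minimal sketch of parsing each line into a named record instead, assuming the column order is id, name, sal, sex, dno (inferred from the sample rows and the queries later in this lab). The names Emp, parseEmp, and epairNamed are hypothetical, and the rows are loaded with sc.parallelize so no HDFS file is needed.

```scala
// Runs in spark-shell, where `sc` is already defined.
// Emp, parseEmp and epairNamed are illustrative names, not part of the lab.
case class Emp(id: Int, name: String, sal: Int, sex: String, dno: String)

def parseEmp(line: String): Emp = {
  val f = line.split(",").map(_.trim)   // trim guards against stray spaces
  Emp(f(0).toInt, f(1), f(2).toInt, f(3), f(4))
}

// Sample rows copied from the collect output above.
val lines = sc.parallelize(Seq(
  "101,aa,20000,m,11", "102,bb,30000,f,12", "103,cc,40000,m,11",
  "104,ddd,50000,f,12", "105,ee,60000,m,12", "106,dd,90000,f,11"))

val emps = lines.map(parseEmp)

// Same shape as the (dno, sal) pair RDD built by index in this lab.
val epairNamed = emps.map(e => (e.dno, e.sal))
```

With named fields, a grouping key such as (e.dno, e.sex) reads the same way as the SQL it mirrors.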
scala> val epair = earray.map( x =>
| (x(4), x(2).toInt))
scala> val ressum = epair.reduceByKey(_+_)
scala> val resmax = epair.reduceByKey(Math.max(_,_))
scala> val resmin = epair.reduceByKey(Math.min(_,_))
scala> ressum.collect.foreach(println)
(12,140000)
(11,150000)
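The three reduceByKey calls above each run as a separate job over the same pairs. One possible alternative, sketched here under the assumption that all three results are wanted together, is to carry a (sum, max, min) triple per key and merge triples in a single reduceByKey. The names salByDno and stats are hypothetical.

```scala
// Runs in spark-shell (`sc` predefined). Sample pairs mirror epair above.
val salByDno = sc.parallelize(Seq(
  ("11", 20000), ("12", 30000), ("11", 40000),
  ("12", 50000), ("12", 60000), ("11", 90000)))

// Seed each salary as (sum, max, min), then merge two triples at a time.
val stats = salByDno
  .mapValues(s => (s, s, s))
  .reduceByKey { (a, b) =>
    (a._1 + b._1, math.max(a._2, b._2), math.min(a._3, b._3))
  }

stats.collect.foreach(println)
```

For department 11 this yields the triple (150000, 90000, 20000), and for department 12 (140000, 60000, 30000); the order of the printed lines is not guaranteed.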
scala> val grpByDno =
epair.groupByKey()
scala> grpByDno.collect
Array[(String, Iterable[Int])] = Array((12,CompactBuffer(30000, 50000, 60000)), (11,CompactBuffer(20000, 40000, 90000)))
scala> val resall = grpByDno.map(x =>
x._1+"\t"+
x._2.sum+"\t"+
x._2.size+"\t"+
x._2.sum/x._2.size+"\t"+
x._2.max+"\t"+
x._2.min )
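scala> resall.collect.foreach(println)
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
scala> resall.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/today1")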
[cloudera@quickstart ~]$ hadoop fs -cat spark/today1/part-00000
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
[cloudera@quickstart ~]$
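Two things are worth noting about the groupByKey report above: it holds every salary of a department in memory at once, and the average is an Int division, which is why 140000/3 prints as 46666. A hedged sketch of the same report with aggregateByKey, which keeps only a small accumulator per key and computes the average as a Double. The names salPairs, zero, agg, and report are hypothetical.

```scala
// Runs in spark-shell (`sc` predefined). Sample pairs mirror epair above.
val salPairs = sc.parallelize(Seq(
  ("11", 20000), ("12", 30000), ("11", 40000),
  ("12", 50000), ("12", 60000), ("11", 90000)))

// Accumulator layout: (sum, count, max, min)
val zero = (0L, 0, Int.MinValue, Int.MaxValue)

val agg = salPairs.aggregateByKey(zero)(
  // fold one salary into a partition-local accumulator
  (acc, s) => (acc._1 + s, acc._2 + 1, math.max(acc._3, s), math.min(acc._4, s)),
  // merge accumulators coming from different partitions
  (a, b) => (a._1 + b._1, a._2 + b._2, math.max(a._3, b._3), math.min(a._4, b._4)))

// Same column order as resall: dno, sum, count, avg, max, min
val report = agg.map { case (dno, (sum, cnt, mx, mn)) =>
  val avg = sum.toDouble / cnt          // floating-point average, not truncated
  f"$dno\t$sum\t$cnt\t$avg%.2f\t$mx\t$mn"
}

report.collect.foreach(println)
```

The sum is kept as a Long here so that large salary totals do not overflow an Int; that is a design choice of this sketch, not something the lab requires.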
____________________________________
Aggregation grouped by multiple columns.
Example: the equivalent SQL/HQL query.
select dno, sex , sum(sal) from emp
group by dno, sex;
---
scala> val DnoSexSalPair = earray.map(
| x => ((x(4),x(3)),x(2).toInt) )
scala> DnoSexSalPair.collect.foreach(println)
((11,m),20000)
((12,f),30000)
((11,m),40000)
((12,f),50000)
((12,m),60000)
((11,f),90000)
scala> val rsum = DnoSexSalPair.reduceByKey(_+_)
scala> rsum.collect.foreach(println)
((11,f),90000)
((12,f),80000)
((12,m),60000)
((11,m),60000)
scala> val rs = rsum.map( x =>
x._1._1+"\t"+x._1._2+"\t"+
x._2 )
scala> rs.collect.foreach(println)
11 f 90000
12 f 80000
12 m 60000
11 m 60000
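The rows above come back in no particular order, because reduceByKey makes no ordering promise. If a stable, sorted report is wanted, one option is sortByKey on the composite key; tuples of strings sort by dno first, then sex. A minimal sketch with hypothetical names dnoSexSal and sorted:

```scala
// Runs in spark-shell (`sc` predefined). Sample data mirrors DnoSexSalPair above.
val dnoSexSal = sc.parallelize(Seq(
  (("11", "m"), 20000), (("12", "f"), 30000), (("11", "m"), 40000),
  (("12", "f"), 50000), (("12", "m"), 60000), (("11", "f"), 90000)))

// Sum per (dno, sex), then sort by the (dno, sex) tuple.
val sorted = dnoSexSal.reduceByKey(_ + _).sortByKey()

sorted
  .map { case ((dno, sex), total) => s"$dno\t$sex\t$total" }
  .collect
  .foreach(println)
```

Note that dno is a String in this lab, so the sort is lexicographic; department numbers with different digit counts would need to be converted to Int first.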
_______________________________________
Grouping by multiple columns with multiple aggregations.
Assignment: write the RDD equivalent of this query.
select dno, sex, sum(sal), max(sal) ,
min(sal), count(*), avg(sal)
from emp group by dno, sex;
val grpDnoSex =
DnoSexSalPair.groupByKey();
val r = grpDnoSex.map( x =>
x._1._1+"\t"+
x._1._2+"\t"+
x._2.sum+"\t"+
x._2.max+"\t"+
x._2.min+"\t"+
x._2.size+"\t"+
x._2.sum/x._2.size )
r.collect.foreach(println)
11 f 90000 90000 90000 1 90000
12 f 80000 50000 30000 2 40000
12 m 60000 60000 60000 1 60000
11 m 60000 40000 20000 2 30000
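For comparison, the assignment query can also be run almost verbatim through Spark SQL. This sketch assumes a spark-shell on Spark 2.0 or later, where `spark` (a SparkSession) is predefined; the transcript in this lab appears to come from an older release, so treat this as an aside rather than part of the lab. The names empDF and bySql and the column aliases are hypothetical.

```scala
// Assumes spark-shell on Spark 2.0+, where `spark` is already defined.
import spark.implicits._

// Sample rows from this lab; column names are assumed from the queries.
val empDF = Seq(
  (101, "aa", 20000, "m", "11"), (102, "bb", 30000, "f", "12"),
  (103, "cc", 40000, "m", "11"), (104, "ddd", 50000, "f", "12"),
  (105, "ee", 60000, "m", "12"), (106, "dd", 90000, "f", "11")
).toDF("id", "name", "sal", "sex", "dno")

// Register the DataFrame so it can be queried by name.
empDF.createOrReplaceTempView("emp")

val bySql = spark.sql("""
  select dno, sex, sum(sal) as total, max(sal) as max_sal,
         min(sal) as min_sal, count(*) as cnt, avg(sal) as avg_sal
  from emp
  group by dno, sex
""")

bySql.show()
```

Unlike the Int division in the RDD version, avg(sal) here returns a double.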