Wednesday, August 31, 2016

Spark lab2

Spark Lab2
scala> val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/emp")

scala> val earray = emp.map(x=> x.split(","))
earray: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> earray.collect

Array[Array[String]] = Array(Array(101, aa, 20000, m, 11), Array(102, bb, 30000, f, 12), Array(103, cc, 40000, m, 11), Array(104, ddd, 50000, f, 12), Array(105, ee, 60000, m, 12), Array(106, dd, 90000, f, 11))

scala> val epair = earray.map( x =>
     |   (x(4), x(2).toInt))

scala> val ressum = epair.reduceByKey(_+_)

scala> val resmax = epair.reduceByKey(Math.max(_,_))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> ressum.collect.foreach(println)
(12,140000)
(11,150000)

scala> val grpByDno =
      epair.groupByKey()

scala> grpByDno.collect
Array[(String, Iterable[Int])] = Array((12,CompactBuffer(30000, 50000, 60000)), (11,CompactBuffer(20000, 40000, 90000)))

scala> val resall = grpByDno.map(x =>
          x._1+"\t"+
          x._2.sum+"\t"+
         x._2.size+"\t"+
         x._2.sum/x._2.size+"\t"+
         x._2.max+"\t"+
         x._2.min  )

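scala> resall.collect.foreach(println)
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000

scala> resall.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/today1")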

[cloudera@quickstart ~]$ hadoop fs -cat spark/today1/part-00000
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
[cloudera@quickstart ~]$

____________________________________

Aggregations grouped by multiple columns.

Example: the equivalent SQL/HQL query.

select dno, sex , sum(sal) from emp
  group by dno, sex;
---
scala> val DnoSexSalPair = earray.map(
     |   x => ((x(4),x(3)),x(2).toInt) )
scala> DnoSexSalPair.collect.foreach(println)

((11,m),20000)
((12,f),30000)
((11,m),40000)
((12,f),50000)
((12,m),60000)
((11,f),90000)

scala> val rsum = DnoSexSalPair.reduceByKey(_+_)

scala> rsum.collect.foreach(println)

((11,f),90000)
((12,f),80000)
((12,m),60000)
((11,m),60000)

scala> val rs = rsum.map( x =>
     x._1._1+"\t"+x._1._2+"\t"+
                x._2 )
 
scala> rs.collect.foreach(println)

11 f 90000
12 f 80000
12 m 60000
11 m 60000

_______________________________________

Grouping by multiple columns, with multiple aggregations.

Assignment:

select dno, sex, sum(sal), max(sal) ,
  min(sal), count(*), avg(sal)
   from emp group by dno, sex;

// Collect every salary under its (dno, sex) key.
val grpDnoSex = DnoSexSalPair.groupByKey()

// Build one tab-separated line per group, in the column order of the SQL above:
// dno, sex, sum(sal), max(sal), min(sal), count(*), avg(sal).
// Salaries are Int, so the average is an integer division and drops any fraction.
val r = grpDnoSex.map { case ((dno, sex), sals) =>
  val total = sals.sum
  val count = sals.size
  s"$dno\t$sex\t$total\t${sals.max}\t${sals.min}\t$count\t${total / count}"
}

r.collect.foreach(println)

11 f 90000 90000 90000 1 90000
12 f 80000 50000 30000 2 40000
12 m 60000 60000 60000 1 60000
11 m 60000 40000 20000 2 30000
______________________________________

   
   
   

No comments:

Post a Comment