Wednesday, August 31, 2016

Mr 3

MR Lab3
Grouping by multiple columns.

ex:

select dno, sex, sum(sal) from emp
   group by dno, sex;

DnoSexSalMap.java
--------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DnoSexSalMap extends
  Mapper<LongWritable,Text,Text,IntWritable>
{
     //  file : emp
     //  schema : id,name,sal,sex,dno
     //  delimiter : "," (comma)
     //  sample row : 101,amar,20000,m,11
     //  dno and sex together as key, sal as value.
    public void map(LongWritable k, Text v, Context con)
     throws IOException, InterruptedException
     {
        String line = v.toString();
        String[] w = line.split(",");
        String sex = w[3];
        String dno = w[4];
        String myKey = dno+"\t"+sex;
        int sal = Integer.parseInt(w[2]);
        con.write(new Text(myKey), new IntWritable(sal));
     }
}

----------------
Driver8.java
----------------

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver8
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"d8");
        j.setJarByClass(Driver8.class);
        j.setMapperClass(DnoSexSalMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        Path p1 = new Path(args[0]); //input
        Path p2 = new Path(args[1]); //output

        FileInputFormat.addInputPath(j,p1);
        FileOutputFormat.setOutputPath(j, p2);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
  }
}

--------------------------

submit:
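The jar submission itself is not shown in these notes; assuming the classes were exported into the same myapp.jar used in Lab1 and that the emp file was uploaded to mrlab/emp (both assumptions), the command would look like:

[training@localhost ~]$ hadoop jar Desktop/myapp.jar \
>  mr.analytics.Driver8 \
>  mrlab/emp \
>  mrlab/r8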

[training@localhost ~]$ hadoop fs -cat mrlab/r8/part-r-00000
11      f       25000
11      m       26000
12      f       18000
13      m       19000

______________________________

Spark lab1

spark lab1 : Spark Aggregations : map, flatMap, sc.textFile(), reduceByKey(), groupByKey()
spark Lab1:
___________
[cloudera@quickstart ~]$ cat > comment
i love hadoop
i love spark
i love hadoop and spark
[cloudera@quickstart ~]$ hadoop fs -mkdir spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal comment spark

Word Count using spark:

scala> val r1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/comment")

scala> r1.collect.foreach(println)

scala> val r2 = r1.map(x => x.split(" "))

scala> val r3 = r2.flatMap(x => x)

Instead of writing r2 and r3 separately, the split and the flatten can be combined into a single flatMap:

scala> val words  = r1.flatMap(x =>
     |    x.split(" ") )

scala> val wpair = words.map( x =>
     |    (x,1) )

scala> val wc = wpair.reduceByKey((x,y) => x+y)

scala> wc.collect

scala> val wcres = wc.map( x =>
     |     x._1+","+x._2 )

scala> wcres.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/results2")

[cloudera@quickstart ~]$ cat emp
101,aa,20000,m,11
102,bb,30000,f,12
103,cc,40000,m,11
104,ddd,50000,f,12
105,ee,60000,m,12
106,dd,90000,f,11
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spark
[cloudera@quickstart ~]$

scala> val e1 = sc.textFile("/user/cloudera/spark/emp")

scala> val e2 = e1.map(_.split(","))

scala> val epair = e2.map( x=>
     |   (x(3), x(2).toInt ) )

scala> val res = epair.reduceByKey(_+_)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:24

scala> res.collect.foreach(println)
(f,170000)
(m,120000)

scala> val resmax = epair.reduceByKey(
     |    (x,y) => Math.max(x,y))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> resmax.collect.foreach(println)
(f,90000)
(m,60000)

scala> resmin.collect.foreach(println)
(f,30000)
(m,20000)

scala> val grpd = epair.groupByKey()

scala> val resall = grpd.map(x =>
     |  (x._1, x._2.sum,x._2.size,x._2.max,x._2.min,x._2.sum/x._2.size) )
scala> resall.collect.foreach(println)

Spark lab2

Spark Lab2
scala> val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/emp")

scala> val earray = emp.map(x=> x.split(","))
earray: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> earray.collect

Array[Array[String]] = Array(Array(101, aa, 20000, m, 11), Array(102, bb, 30000, f, 12), Array(103, cc, 40000, m, 11), Array(104, ddd, 50000, f, 12), Array(105, ee, 60000, m, 12), Array(106, dd, 90000, f, 11))

scala> val epair = earray.map( x =>
     |   (x(4), x(2).toInt))

scala> val ressum = epair.reduceByKey(_+_)

scala> val resmax = epair.reduceByKey(Math.max(_,_))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> ressum.collect.foreach(println)
(12,140000)
(11,150000)

scala> val grpByDno = epair.groupByKey()

scala> grpByDno.collect
Array[(String, Iterable[Int])] = Array((12,CompactBuffer(30000, 50000, 60000)), (11,CompactBuffer(20000, 40000, 90000)))

scala> val resall = grpByDno.map(x =>
          x._1+"\t"+
          x._2.sum+"\t"+
         x._2.size+"\t"+
         x._2.sum/x._2.size+"\t"+
         x._2.max+"\t"+
         x._2.min  )
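
The steps that printed the report below and wrote it to spark/today1 are not in the notes; they would have been something like the following (the exact HDFS path is an assumption based on the -cat command further down):

scala> resall.collect.foreach(println)

scala> resall.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/today1")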

12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000

[cloudera@quickstart ~]$ hadoop fs -cat spark/today1/part-00000
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
[cloudera@quickstart ~]$

____________________________________

Aggregations with multiple grouping columns.

ex: equivalent SQL/HQL query.

select dno, sex , sum(sal) from emp
  group by dno, sex;
---
scala> val DnoSexSalPair = earray.map(
     |   x => ((x(4),x(3)),x(2).toInt) )
scala> DnoSexSalPair.collect.foreach(println)

((11,m),20000)
((12,f),30000)
((11,m),40000)
((12,f),50000)
((12,m),60000)
((11,f),90000)

scala> val rsum = DnoSexSalPair.reduceByKey(_+_)

scala> rsum.collect.foreach(println)

((11,f),90000)
((12,f),80000)
((12,m),60000)
((11,m),60000)

scala> val rs = rsum.map( x =>
     x._1._1+"\t"+x._1._2+"\t"+
                x._2 )
 
scala> rs.collect.foreach(println)

11 f 90000
12 f 80000
12 m 60000
11 m 60000

_______________________________________

grouping by multiple columns, and multiple aggregations.

Assignment:

select dno, sex, sum(sal), max(sal) ,
  min(sal), count(*), avg(sal)
   from emp group by dno, sex;

val grpDnoSex =
    DnoSexSalPair.groupByKey();

val r = grpDnoSex.map( x =>
    x._1._1+"\t"+
    x._1._2+"\t"+
    x._2.sum+"\t"+
    x._2.max+"\t"+
    x._2.min+"\t"+
    x._2.size+"\t"+
    x._2.sum/x._2.size )
r.collect.foreach(println)

11 f 90000 90000 90000 1 90000
12 f 80000 50000 30000 2 40000
12 m 60000 60000 60000 1 60000
11 m 60000 40000 20000 2 30000
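
The same report can also be produced without groupByKey by carrying (sum, max, min, count) through reduceByKey, which avoids buffering every salary per group. A sketch, not from the original notes (the val name stats is made up):

val stats = DnoSexSalPair
  .map { case (k, sal) => (k, (sal, sal, sal, 1)) }    // (sum, max, min, count)
  .reduceByKey { case ((s1, mx1, mn1, c1), (s2, mx2, mn2, c2)) =>
      (s1 + s2, Math.max(mx1, mx2), Math.min(mn1, mn2), c1 + c2) }
  .map { case ((dno, sex), (s, mx, mn, c)) =>
      dno + "\t" + sex + "\t" + s + "\t" + mx + "\t" + mn + "\t" + c + "\t" + s / c }

stats.collect.foreach(println)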
______________________________________

   
   
   

Mr lab1

MR Lab1 : WordCount

[training@localhost ~]$ cat > comment
i love hadoop
i love java
i hate coding
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal comment mrlab
[training@localhost ~]$ hadoop fs -cat mrlab/comment
i love hadoop
i love java
i hate coding
[training@localhost ~]$

___________________________

Eclipse steps :

create java project:

file--> new --> java project
  ex: MyMr

configure jar files.

MyMr--> src --> Build Path --> Configure build Path
      ---> Libraries ---> add external jar files.

  select the following jars.
   /usr/lib/hadoop-*.*/hadoop-core.jar
  /usr/lib/hadoop-*.*/lib/commons-cli-*.*.jar

create package :

MyMr --> src --> new --> package
ex: mr.analytics

create java class :

MyMr --> src --> mr.analytics --> new --> class
  ex: WordMap

__________________________________

WordMap.java:
is the mapper program, which writes each word as key and 1 as value.
----------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.util.StringTokenizer;
public class WordMap extends Mapper
<LongWritable,Text,Text,IntWritable>
{
     // i love hadoop
    public void map(LongWritable  k,Text v,
            Context  con)
     throws IOException, InterruptedException
     {
        String line = v.toString();
    StringTokenizer t = new StringTokenizer(line);
     while(t.hasMoreTokens())
     {
         String word = t.nextToken();
         con.write(new Text(word),new IntWritable(1));
     }
  }
}
---------------------
RedForSum.java
  is the reducer program, which performs the sum aggregation.

----------------------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForSum extends Reducer
<Text,IntWritable,Text,IntWritable>
{
    //   i   <1,1,1>   
   public void reduce(Text k,Iterable<IntWritable> vlist,
            Context con)
   throws IOException, InterruptedException
   {
       int tot=0;
       for(IntWritable v: vlist)
          tot+=v.get();
       con.write(k, new IntWritable(tot));
   }
}
---------------------------

Driver1.java

is the driver program, which wires the mapper and reducer into a job
-----------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver1
{
    public static void main(String[] args)
     throws Exception
     {
        Configuration c = new Configuration();
        Job j = new Job(c,"MyFirst");
        j.setJarByClass(Driver1.class);
        j.setMapperClass(WordMap.class);
        j.setReducerClass(RedForSum.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
         Path p1 = new Path(args[0]); //input
         Path p2 = new Path(args[1]); //output
       
FileInputFormat.addInputPath(j,p1);
FileOutputFormat.setOutputPath(j, p2);

System.exit(j.waitForCompletion(true) ? 0:1);
  }
}
---------------------

Now export all these 3 classes into a jar file.

MyMr --> Export --> Java --> JAR file
  ex: /home/training/Desktop/myapp.jar

-------------------------------

submitting mr job:

[training@localhost ~]$ hadoop jar Desktop/myapp.jar \
>  mr.analytics.Driver1 \
>  mrlab/comment \
>  mrlab/res1

[training@localhost ~]$ hadoop fs -ls mrlab/res1
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-08-30 05:27

/user/training/mrlab/res1/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-08-30 05:26

/user/training/mrlab/res1/_logs
-rw-r--r--   1 training supergroup         58 2016-08-30 05:27

/user/training/mrlab/res1/part-r-00000

[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
coding  1
hadoop  1
hate    1
i       3
java    1
love    2

Monday, August 29, 2016

Spark Word count

spark word count
[cloudera@quickstart ~]$ cat as.txt
i love hadoop
i love spark
o love hadoop and spark

scala> val rdd = sc.textFile("/user1/as.txt")
rdd: org.apache.spark.rdd.RDD[String] = /user1/as.txt MapPartitionsRDD[42] at textFile at <console>:31

scala> val rdd1 =rdd.flatMap(x=>x.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at flatMap at <console>:33

scala> val rdd2 =rdd1.map(x=>(x,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[44] at map at <console>:35

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:33

scala> val rdd4 = rdd3.map(x => x._1 + " " +x._2).collect()
rdd4: Array[String] = Array(love 3, spark 2, hadoop 2, i 2, o 1, and 1)
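
As a small extension (not in the original notes), the counts can be ordered by frequency before collecting:

scala> val top = rdd3.sortBy(x => x._2, false).collect()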

Tuesday, August 23, 2016

spark json

[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"shr","age":23,"loc":"hyd"}
{"name":"kumar","age" :24,"loc":"del"}

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@410438f6

scala> val df =sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string]

scala> df.show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 23|hyd|  shr|
| 24|del|kumar|
+---+---+-----+

scala> df.select("name").show()
+-----+
| name|
+-----+
|  shr|
|kumar|
+-----+


scala> df.select(df("age")+1).show()
+---------+
|(age + 1)|
+---------+
|       24|
|       25|
+---------+

scala> df.first()
res10: org.apache.spark.sql.Row = [23,hyd,shr]

scala> df.take(1)
res12: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr])

scala> df.take(2)
res13: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr], [24,del,kumar])

scala> df.sort().show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 23|hyd|  shr|
| 24|del|kumar|
+---+---+-----+


scala> df.filter(df("age")>23).show()
+---+---+-----+
|age|loc| name|
+---+---+-----+
| 24|del|kumar|
+---+---+-----+


scala> df.describe().show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|              23.5|
| stddev|0.7071067811865476|
|    min|                23|
|    max|                24|
+-------+------------------+

scala> df.agg(avg("age")).show()
+--------+
|avg(age)|
+--------+
|    23.5|
+--------+
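
The DataFrame can also be queried with SQL by registering it as a temporary table; a sketch using the Spark 1.x API shown above (the table name people is made up):

scala> df.registerTempTable("people")

scala> sql.sql("select loc, count(*) from people group by loc").show()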



{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :24,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :24,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :24,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :24,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :24,"loc":"hyd","sal":2000,"sex":"f"}




[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :34,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :25,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :21,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :44,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :30,"loc":"hyd","sal":2000,"sex":"f"}

[cloudera@quickstart ~]$ hadoop fs -put a.json /user1/


scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@60ae0f04

scala> val df =sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string, sal: bigint, sex: string]

scala> df.show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
+---+----+-------+----+---+


scala> df.select("age").show()
+---+
|age|
+---+
| 24|
| 34|
| 25|
| 21|
| 44|
| 30|
+---+

scala> df.filter(df("age")<30).show()
+---+----+-----+----+---+
|age| loc| name| sal|sex|
+---+----+-----+----+---+
| 24| del|kumar|4000|  f|
| 25| del| bbbb|1000|  m|
| 21|pune|ccccc|5000|  f|
+---+----+-----+----+---+

scala> df.groupBy("sex").count().show()
+---+-----+                                                                    
|sex|count|
+---+-----+
|  f|    3|
|  m|    3|
+---+-----+

scala> df.where(df("age")<25).show()
+---+----+-----+----+---+
|age| loc| name| sal|sex|
+---+----+-----+----+---+
| 24| del|kumar|4000|  f|
| 21|pune|ccccc|5000|  f|
+---+----+-----+----+---+


scala> val e= df.toJSON
e: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[269] at toJSON at <console>:38

scala> e.collect()
res73: Array[String] = Array({"age":24,"loc":"del","name":"kumar","sal":4000,"sex":"f"}, {"age":34,"loc":"wgl","name":"aaaa","sal":5000,"sex":"m"}, {"age":25,"loc":"del","name":"bbbb","sal":1000,"sex":"m"}, {"age":21,"loc":"pune","name":"ccccc","sal":5000,"sex":"f"}, {"age":44,"loc":"del","name":"ddddddd","sal":3000,"sex":"m"}, {"age":30,"loc":"hyd","name":"eeeee","sal":2000,"sex":"f"})
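
Since the toJSON result is just an RDD[String], it can be written back to HDFS like any other RDD; a sketch (the output path is made up):

scala> e.saveAsTextFile("/user1/a_json_out")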



scala> df.describe().show()
+-------+------------------+------------------+                                
|summary|               age|               sal|
+-------+------------------+------------------+
|  count|                 6|                 6|
|   mean|29.666666666666668|3333.3333333333335|
| stddev| 8.406346808612327|1632.9931618554522|
|    min|                21|              1000|
|    max|                44|              5000|
+-------+------------------+------------------+

scala> df.agg(max("sal")).show()
+--------+
|max(sal)|
+--------+
|    5000|
+--------+
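
groupBy can also be combined with aggregate functions for per-group summaries; a sketch, not in the original notes, assuming the Spark 1.x functions import:

scala> import org.apache.spark.sql.functions._

scala> df.groupBy("sex").agg(sum("sal"), avg("sal")).show()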

scala> df.sort().show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+


scala> df.sort("sal").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+

scala> df.take(2)
res62: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m])

scala> df.first()
res63: org.apache.spark.sql.Row = [24,del,kumar,4000,f]

scala> df.head(3)
res69: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m], [25,del,bbbb,1000,m])

scala> df.orderBy("age").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 21|pune|  ccccc|5000|  f|
| 24| del|  kumar|4000|  f|
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 34| wgl|   aaaa|5000|  m|
| 44| del|ddddddd|3000|  m|
+---+----+-------+----+---+


scala> df.orderBy("sal").show()
+---+----+-------+----+---+
|age| loc|   name| sal|sex|
+---+----+-------+----+---+
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|
+---+----+-------+----+---+

Sunday, August 21, 2016

Language neutralization

pig lab9 : Language neutralization
Language neutralization.

ex: hindi to english.

[training@localhost ~]$ gedit comments
[training@localhost ~]$ gedit dictionary
[training@localhost ~]$ hadoop fs -copyFromLocal comments pdemo
[training@localhost ~]$ hadoop fs -copyFromLocal dictionary pdemo
[training@localhost ~]$

grunt> comms = load 'pdemo/comments'
>>     as (line:chararray);
grunt> dict = load 'pdemo/dictionary'
>>     using PigStorage(',')
>>    as (hindi:chararray, eng:chararray);
grunt> words = foreach comms
>>    generate FLATTEN(TOKENIZE(line)) as word;
grunt>

grunt> describe words
words: {word: chararray}
grunt> describe dict
dict: {hindi: chararray,eng: chararray}
grunt> j = join words by word left outer,
>>    dict by hindi;
grunt> jj = foreach j generate words::word as word, dict::hindi as hindi, dict::eng as eng;
grunt> dump jj
(is,,)
(is,,)
(is,,)
(xyz,,)
(acha,acha,good)
(bacha,bacha,small)
(lucha,lucha,worst)
(hadoop,,)
(oracle,,)
____________________

grunt> rset = foreach jj generate
>>    (hindi is not null ? eng:word) as word;
grunt> dump rset

(is)
(is)
(is)
(xyz)
(good)
(small)
(worst)
(hadoop)
(oracle)

__________________________

Then we can apply sentiment analysis on this result set.

__________________________-

Quarterly sales

Pig Lab10 : Quarterly sales report and comparing sales with the previous quarter, using cross.
grunt> raw = load 'piglab/sales'
>>     using PigStorage(',')
>>     as (dt:chararray, amt:int);
grunt> msales = foreach raw generate 
>>      SUBSTRING(dt,0,2)  as m, amt ;
grunt> describe msales;
msales: {m: chararray,amt: int}
grunt> msales = foreach msales generate
>>         (int)m, amt;
grunt> describe msales;               
msales: {m: int,amt: int}
grunt> dump msales;

grunt> qsales = foreach msales generate
>>       (m<4 ? 1:
>>         (m<7 ? 2:
>>           (m<10 ? 3:4))) as q, amt ;
grunt> dump qsales

group qsales by quarter to build rep (this step is missing in the notes and is implied by the describe output below; it must have been something like this):

grunt> grp = group qsales by q;
grunt> rep = foreach grp generate group as q,
>>        SUM(qsales.amt) as tot;

grunt> rep2 = foreach rep generate *;
grunt> cr = cross rep, rep2;
grunt> describe cr
cr: {rep::q: int,rep::tot: long,rep2::q: int,rep2::tot: long}
grunt> cr = foreach cr generate
>>     rep::q as q1, rep2::q as q2,
>>     rep::tot as tot1, rep2::tot as tot2;

grunt> describe cr;
cr: {q1: int,q2: int,tot1: long,tot2: long}
grunt> dump cr;

grunt> fltd = filter cr by ((q1-q2)==1);
grunt> dump fltd;

grunt> res = foreach fltd generate *,
>>        ((tot1-tot2)*100)/tot2 as pgrowth;
grunt> dump res

(3,2,70000,355000,-80)
(2,1,355000,140000,153)
(4,3,290000,70000,314)

_________________________________

Executing scripts

Pig Lab 11: Executing Scripts and UDFs.

Executing scripts (Pig) :-
____________________________

   three commands (operators) are used to execute scripts.

  i) pig
  ii) exec. 
  iii) run.

i) Pig:

   to execute a script from the command line (operating system shell).

  $ pig  script1.pig
--> the script will be executed,
   but relation aliases are not available in the grunt shell, so we cannot reuse them.

ii) exec:

--> to execute a script from the grunt shell.
   Aliases will still not be available in grunt,
   so "no reuse".

grunt> exec  script1.pig

__________________________________

iii) run:

---> to execute a script from the grunt shell.
   Aliases will be available in grunt, so we can reuse them.

grunt> run script1.pig
______________________________

run:
  adv --> aliases will be available.
  disadv --> previous aliases with the same name get overridden.

exec:
   adv --> aliases will not be available,
    so there is no overriding.
  disadv --> no reusability.

pig :
  adv --->
      -- used for production operations.
     --- can be called from other environments, like shell scripts.

disadv --> aliases will not be reflected into grunt.

_________________________________

Pig Udfs:
_____________

   User defined functions.

   adv:
   i) Custom functionality.
   ii) Reusability.

Udf life cycle:

step 1) Develop UDF class
step 2) Export into jar file.
step 3) register jar file into pig.
step 4) create temporary function for  the  UDF class.
step 5) call the function.
__________________________

[training@localhost ~]$ cat > samp1
101,ravi
102,mani
103,Deva
104,Devi
105,AmAr
[training@localhost ~]$ hadoop fs -copyFromLocal samp1  piglab
[training@localhost ~]$

grunt> s = load 'piglab/samp1'
>>    using PigStorage(',')
>>   as (id:int, name:chararray);

eclipse navigations:
i) create java project.

file --> new --> java project.
  ex: PigDemo

ii) create package:

  pigDemo ---> new ---> package.

    ex:    pig.test

  iii) configure pig jar.

src -- build path --> configure build path --> libraries ---> add external jars.
 
  /usr/lib/pig/pig-core.jar

  iv) create java class

pig.test ---> new --> class

   FirstUpper

  v)  export into jar.

PigDemo ---> export --> Java --> JAR file -->
    /home/training/Desktop/pigudfs.jar

_____________________
package pig.test;
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class FirstUpper  extends EvalFunc<String>
{
  public String   exec(Tuple v) throws IOException
  {    //   raVI  --> Ravi

     String name = (String)v.get(0);
   String fc = name.substring(0,1).toUpperCase();
   String rc = name.substring(1).toLowerCase();
   String n = fc+rc;
   return n;
  }

}

grunt> register  Desktop/pigudfs.jar;

grunt> define cconvert pig.test.FirstUpper();

grunt> r = foreach s generate
>>       id, cconvert(name) as name;

grunt> dump r;

______________________________

[training@localhost ~]$ cat > f1
100     200     120
300     450     780
120     56      90
1000    3456    789
[training@localhost ~]$ hadoop fs -copyFromLocal f1 piglab
[training@localhost ~]$

task:
  write a UDF to find the max value in a row.

package pig.test;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class RowMax extends EvalFunc<Integer>
{
public Integer exec(Tuple v) throws IOException
{
  // assumes non-negative values, since big starts at 0
  int a = (Integer)v.get(0);
  int b = (Integer)v.get(1);
  int c = (Integer)v.get(2);
  int big = 0;
  if (a>big) big=a;
  if (b>big) big=b;
  if (c>big) big=c;
  return new Integer(big);
}

}

export into jar.

    /home/training/Desktop/pigudfs.jar

grunt> s1 = load 'piglab/f1'
>>     as (a:int, b:int, c:int);
grunt> register Desktop/pigudfs.jar;
grunt> define rowmax pig.test.RowMax();
grunt> r1 = foreach s1 generate  *,
>>           rowmax(*) as rmax;
grunt> dump r1
(100,200,120,200)
(300,450,780,780)
(120,56,90,120)
(1000,3456,789,3456)

[training@localhost ~]$ cat f2
-10,-30,-56,-23,-21,-5
1,2,3,45,67,9
[training@localhost ~]$ hadoop fs -copyFromLocal f2 piglab
[training@localhost ~]$

package pig.test;

import java.io.IOException;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class DynRowMax  extends EvalFunc<Integer>
{
public Integer exec(Tuple v) throws IOException
{
  List<Object> olist = v.getAll();   // all fields of the row
  int max = 0;
  int cnt = 0;
  for (Object o : olist) {
      cnt++;
      int val = (Integer)o;
      if (cnt==1) max = val;         // first field initializes max (handles all-negative rows)
      max = Math.max(val, max);
  }
  return new Integer(max);
}

}

export into jar   /home/training/Desktop/pigudfs.jar

grunt> register Desktop/pigudfs.jar;
grunt> define dynmax pig.test.DynRowMax();
grunt> ss = load 'piglab/f2'   
>>    using PigStorage(',')
>>   as (a:int, b:int, c:int, d:int,
>>     e:int, f:int);
grunt> define  rmax pig.test.RowMax();
grunt> rr = foreach ss generate *,
>>      dynmax(*) as max;
grunt> dump rr

Flume lab

Flume Lab
********************************
sreeram hadoop notes
sreeram flume notes for practice

conf/agent1.conf
_______________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1

# Describe sin
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/myFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

############################################
submit above flow using following command

[cloudera@quickstart ~]$ hadoop fs -mkdir myFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent1.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls myFlume

note: by default the sink will write in sequence file format.

The risk in the above flow is that if the channel fails or the channel system goes down,
data will be missed. To provide fault tolerance for the channel, use the following flow.

************************************

sreeram hadoop notes:
sreeram flume notes for practice:

conf/agent2.conf
___________________

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1 s2
a1.channels = c1 c2

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1

# Describe sin
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume

a1.sinks.s2.type = hdfs
a1.sinks.s2.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/urFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1200
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1 c2
a1.sinks.s1.channel = c1
a1.sinks.s2.channel = c2

##################################

[cloudera@quickstart ~]$ hadoop fs -mkdir urFlume
[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent2.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls urFlume

In the above flow, src1 is writing into both the c1 and c2 channels;
  if one channel fails, the data is still available in the other.
But if no failure happens, the data will be duplicated in the target hadoop directory,
so before processing the data we need to eliminate duplicate records.

*****************************************

sreeram hadoop notes:
sreeram flume notes for practice:
Task --> importing from Hive Table

conf/agent3.conf
______________________________
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=hive -e 'select * from mraw'

# Describe sin
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/ourFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

#############################################

[cloudera@quickstart ~]$ hive
hive> create table mraw(line string);
OK
Time taken: 2.218 seconds
hive> load data local inpath 'f1' into table mraw;
hive> select * from mraw limit 5;
OK
aaaaaaaaaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
Time taken: 0.341 seconds, Fetched: 5 row(s)
hive>

[cloudera@quickstart ~]$ hadoop fs -mkdir ourFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent3.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls ourFlume

In all the above cases, the output will be in sequence file format.

******************************************

sreeram hadoop notes:
sreeram flume notes for practice:

conf/agent4.conf
________________________________


# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = src1
a1.sinks = s1
a1.channels = c1

# Describe/configure source1
a1.sources.src1.type = exec
a1.sources.src1.shell = /bin/bash -c
a1.sources.src1.command=cat f1

# Describe sin
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.path=hdfs://quickstart.cloudera/user/cloudera/naFlume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1200
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

######################################

The above flow will write its output in text format.

[cloudera@quickstart ~]$ hadoop fs -mkdir naFlume

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file conf/agent4.conf --name a1 -Dflume.root.logger=INFO,console

[cloudera@quickstart ~]$ hadoop fs -ls naFlume

[cloudera@quickstart ~]$ hadoop fs -cat naFlume/FlumeData.1471351131067
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
cccccccccccccccccccccccccccccc
ddddddddddddddddddddddddddd
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
ddddddddddddddddddddddddddddd
ccccccccccccccccccccccccccccccc
cccccccccccccccccccccccccccccc
ccccccccccccccccccccccccccccccc

The above output is in text format.

******************************************

Spark SQL and hql

Spark Sql and Hql
[cloudera@quickstart ~]$ sudo find / -name 'hive-site.xml'

[cloudera@quickstart ~]$ sudo chmod -R 777 /usr/lib/spark/conf

[cloudera@quickstart ~]$ cp /etc/hive/conf.dist/hive-site.xml /usr/lib/spark/conf
_____________________________________
from hive-site.xml -->

hive.metastore.warehouse.dir

from Spark 2.0.0 onwards the above option is deprecated;
    use the following option instead:

------>   spark.sql.warehouse.dir
_____________________________________________

____  [ tested on Cloudera 5.8,
          Spark version 1.6.0 ]

[cloudera@quickstart ~]$ ls /usr/lib/hue/apps/beeswax/data/sample_07.csv

[cloudera@quickstart ~]$ head -n 2 /usr/lib/hue/apps/beeswax/data/sample_07.csv
_____________________
val hq = new org.apache.spark.sql.hive.HiveContext(sc)

hq.sql("create database sparkdb")

hq.sql("CREATE TABLE sample_07 (code string, description string, total_emp int, salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")

[cloudera@quickstart ~]$ hadoop fs -mkdir sparks

[cloudera@quickstart ~]$ hadoop fs -copyFromLocal /usr/lib/hue/apps/beeswax/data/sample_07.csv sparks

[cloudera@quickstart ~]$ hadoop fs -ls sparks

hq.sql("LOAD DATA INPATH '/user/cloudera/sparks/sample_07.csv' OVERWRITE INTO TABLE sample_07")

val df = hq.sql("SELECT * from sample_07")

__________________________________________
scala> df.filter(df("salary") > 150000).show()
+-------+--------------------+---------+------+
|   code|         description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011|    Chief executives|   299160|151370|
|29-1022|Oral and maxillof...|     5040|178440|
|29-1023|       Orthodontists|     5350|185340|
|29-1024|     Prosthodontists|      380|169360|
|29-1061|   Anesthesiologists|    31030|192780|
|29-1062|Family and genera...|   113250|153640|
|29-1063| Internists, general|    46260|167270|
|29-1064|Obstetricians and...|    21340|183600|
|29-1067|            Surgeons|    50260|191410|
|29-1069|Physicians and su...|   237400|155150|
+-------+--------------------+---------+------+
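
Any other HQL query can be run through the same HiveContext; a quick sketch (not in the original notes):

scala> hq.sql("select count(*), max(salary) from sample_07").show()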
____________________________________________

Tuesday, August 16, 2016

Flume

[cloudera@quickstart ~]$ cat flumeexample.conf
a1.sources= src1
a1.channels = c1
a1.sinks = s1

a1.sources.src1.type = exec
a1.sources.src1.command = cat f1.txt

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path= hdfs://quickstart.cloudera/user1

a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100

a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

[cloudera@quickstart ~]$ flume-ng agent     --conf conf      --conf-file flumeexample.conf      --name a1      -Dflume.root.logger=INFO,console


Monday, August 15, 2016

Flume


[cloudera@quickstart ~]$ cat flumeexample.conf
a1.sources= src1
a1.channels = c1
a1.sinks = s1

a1.sources.src1.type = exec
a1.sources.src1.command = cat f1.txt

a1.sinks.s1.type=logger

a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100

a1.sources.src1.channels = c1
a1.sinks.s1.channel = c1

[cloudera@quickstart ~]$ flume-ng agent     --conf conf      --conf-file flumeexample.conf      --name a1      -Dflume.root.logger=INFO,console