Sunday, March 5, 2017

MR Lab9

MR Lab9 : Mapper Only Functionalities

Mapper Only functionality.
--------------------------

  Row filter:

    ex: select * from emp where sex = 'm';

  For this, a reducer is not required.

  We suppress the reduce phase by setting the number of reduce tasks to zero:
   ex:
        j.setNumReduceTasks(0);
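
  Note: with zero reduce tasks, the map output is written directly to HDFS,
  so the output files are named part-m-NNNNN instead of part-r-NNNNN
  (as seen in the directory listings below).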

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilterMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from emp where sex ="m";
      String line = v.toString();
      String[] w = line.split(",");
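      // emp columns: id, name, sal, sex, dno  ->  sex is w[3]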
      String sex = w[3];
      if(sex.matches("m"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "rowfilter");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilterMap.class);
  // j.setReducerClass(RedForNoDupeKeys.class);
  j.setNumReduceTasks(0);
  // j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
}

}

[training@localhost ~]$ hadoop fs -cat mrlab/emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/males

[training@localhost ~]$ hadoop fs -ls mrlab/males
Found 3 items
-rw-r--r--   1 training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_SUCCESS
drwxr-xr-x   - training supergroup          0 2016-09-27 06:57 /user/training/mrlab/males/_logs
-rw-r--r--   1 training supergroup         60 2016-09-27 06:57 /user/training/mrlab/males/part-m-00000
[training@localhost ~]$ hadoop fs -cat mrlab/males/part-m-00000
101,vino,26000,m,11
103,mohan,13000,m,13
105,naga,6000,m,13
[training@localhost ~]$

-------------------------------------------
RowFilter 2:

   on unstructured Text:

[training@localhost ~]$ cat > news
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData           
Pakistan Jurking India
BigData is a hipe or real
[training@localhost ~]$ hadoop fs -copyFromLocal news mrlab
[training@localhost ~]$

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilter2 extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select * from News where
     //   contains(upcase(line),'BIGDATA');
     
      String line = v.toString().toUpperCase();
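      // match on the uppercased copy, but emit the original line v unchanged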
     
      if(line.contains("BIGDATA") ||
               line.contains("BIG DATA"))
         con.write( v , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "rowfilter2");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(RowFilter2.class);
  // j.setReducerClass(RedForNoDupeKeys.class);
  j.setNumReduceTasks(0);
  // j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
}

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/news  mrlab/bignews

[training@localhost ~]$ hadoop fs -cat mrlab/bignews/part-m-00000
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData
BigData is a hipe or real
[training@localhost ~]$
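
Note: the line "Pakistan Jurking India" is filtered out because it contains neither "BIGDATA" nor "BIG DATA".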

--------------------------------

Column Filter.

ex:
     select name, sal, dno  from emp;

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnFilter extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   { // select name,sal,dno from emp;
      String line = v.toString();
      String[] w = line.split(",");
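      // keep only name (w[1]), sal (w[2]) and dno (w[4])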
      String newLine = w[1]+","+
                       w[2]+","+w[4];
      con.write( new Text(newLine) , NullWritable.get());
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "colfilter");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(ColumnFilter.class);
  // j.setReducerClass(RedForNoDupeKeys.class);
  j.setNumReduceTasks(0);
  // j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
}

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/cfilter

[training@localhost ~]$ hadoop fs -cat mrlab/cfilter/part-m-00000
vino,26000,11
Sri,25000,11
mohan,13000,13
lokitha,8000,12
naga,6000,13
janaki,10000,12
[training@localhost ~]$
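
If the input might contain short or malformed rows, accessing w[4] can throw
ArrayIndexOutOfBoundsException. A minimal defensive sketch (not part of the
original lab) simply skips such records:

  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String[] w = v.toString().split(",");
      if (w.length < 5)        // malformed record: fewer columns than expected
          return;              // skip it instead of failing the task
      con.write(new Text(w[1] + "," + w[2] + "," + w[4]), NullWritable.get());
   }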

-----------------------------------------

Generating new Fields

hive>
   select id, name, sal, sal*0.1 as tax,
            sal*0.2 as hra,
          sal-(sal*0.1)+(sal*0.2) as net,
        sex, dno from emp;

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GenerateNewFields extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
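      // tax = 10% of sal, hra = 20% of sal; integer division truncates any remainder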
      int tax = sal*10/100;
      int hra = sal*20/100;
      int net = sal-tax+hra;
      String newLine =w[0]+","+w[1]+","+sal+","+
                     tax+","+hra+","+net+","+
                      w[3]+","+w[4];
     
     con.write( new Text(newLine) , NullWritable.get());
        
   }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "newfields");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(GenerateNewFields.class);
  // j.setReducerClass(RedForNoDupeKeys.class);
  j.setNumReduceTasks(0);
  // j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
}

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/newFields

[training@localhost ~]$ hadoop fs -cat mrlab/newFields/part-m-00000
101,vino,26000,2600,5200,28600,m,11
102,Sri,25000,2500,5000,27500,f,11
103,mohan,13000,1300,2600,14300,m,13
104,lokitha,8000,800,1600,8800,f,12
105,naga,6000,600,1200,6600,m,13
101,janaki,10000,1000,2000,11000,f,12
[training@localhost ~]$
-----------------------------------
Transformations

hive> select id, name, sal ,
    >  if(sal>=70000,'A',
    >     if(sal>=50000,'B',
    >      if(sal>=30000,'C','D'))) as grade,
    > if(sex='m','Male','Female') as sex,
    > if(dno=11,'Marketing',
    >  if(dno=12,'hr',
    >   if(dno=13,'Finance','Other'))) as dname
    > from emp;

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Transform extends Mapper<LongWritable,Text,Text,NullWritable>
{
  public void map(LongWritable k, Text v, Context con)
   throws IOException, InterruptedException
   {
      String line = v.toString();
      String[] w = line.split(",");
      int sal = Integer.parseInt(w[2]);
      String sex = w[3];
      int dno = Integer.parseInt(w[4]);
      String grade;
      if(sal>=70000)
           grade="A";
      else if(sal>=50000)
          grade="B";
      else if(sal>=30000)
          grade="C";
      else grade="D";
     
      if(sex.matches("m"))
          sex="Male";
      else sex="Female";
      String dname;
      switch(dno)
      {
      case 11:
           dname="Marketing";
           break;
      case 12:
           dname="Hr";
           break;
      case 13:
           dname="Finance";
           break;
      default:
           dname="Other";
      }

      String newLine = w[0]+","+
      w[1]+","+sal+","+grade+","+sex+","+dname;
      con.write(new Text(newLine), NullWritable.get());
      }
}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{

public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "transform");
  j.setJarByClass(Driverx.class);

  j.setMapperClass(Transform.class);
  // j.setReducerClass(RedForNoDupeKeys.class);
  j.setNumReduceTasks(0);
  // j.setSortComparatorClass(SortComparator.class);
 
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
}

}

[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp  mrlab/transform

[training@localhost ~]$ hadoop fs -cat mrlab/transform/part-m-00000
101,vino,26000,D,Male,Marketing
102,Sri,25000,D,Female,Marketing
103,mohan,13000,D,Male,Finance
104,lokitha,8000,D,Female,Hr
105,naga,6000,D,Male,Finance
101,janaki,10000,D,Female,Hr
[training@localhost ~]$
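
Note: every row gets grade D because all salaries in the sample data are below 30000.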
