MR Lab9 : Mapper-Only Functionalities
Mapper-only functionality
--------------------------
Row filter:
ex: select * from emp where sex = 'm';
A reducer is not required for this: each row is kept or dropped on its own, with no grouping or aggregation.
So the reduce phase is suspended in the driver:
ex:
j.setNumReduceTasks(0);
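With zero reduce tasks there is no sort or shuffle; whatever the mapper writes goes straight to HDFS as part-m-* files. The labs below use the older new Job(c, ...) constructor; on Hadoop 2.x and later the same map-only driver is usually written with Job.getInstance() (a sketch of just the relevant lines, not the lab's exact code):

Configuration c = new Configuration();
Job j = Job.getInstance(c, "rowfilter");   // newer factory method; new Job(c, "...") also works
j.setJarByClass(Driverx.class);
j.setMapperClass(RowFilterMap.class);
j.setNumReduceTasks(0);                    // suspend the reduce phase

The complete mapper and driver used in the lab: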
package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilterMap extends Mapper<LongWritable,Text,Text,NullWritable>
{
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {   // select * from emp where sex ="m";
        String line = v.toString();
        String[] w = line.split(",");
        String sex = w[3];
        if(sex.matches("m"))
            con.write(v, NullWritable.get());
    }
}
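RowFilterMap assumes every line has at least four comma-separated fields; a blank or short line would throw ArrayIndexOutOfBoundsException. A defensive variant of the map() body (the length guard is an addition, not part of the lab code):

public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
{
    String[] w = v.toString().split(",");
    if (w.length < 4)                 // malformed or blank line: skip it
        return;
    if (w[3].equals("m"))             // equals() is enough here; matches("m") treats "m" as a regex
        con.write(v, NullWritable.get());
}

Because there is no reduce phase, these Text/NullWritable pairs land directly in the part-m-00000 file shown in the run below.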
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "colaggr");
        j.setJarByClass(Driverx.class);
        j.setMapperClass(RowFilterMap.class);
        // j.setReducerClass(RedForNoDupeKeys.class);
        j.setNumReduceTasks(0);
        // j.setSortComparatorClass(SortComparator.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
[training@localhost ~]$ hadoop fs -cat mrlab/emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/males
[training@localhost ~]$ hadoop fs -ls mrlab/males
Found 3 items
-rw-r--r-- 1 training supergroup 0 2016-09-27 06:57 /user/training/mrlab/males/_SUCCESS
drwxr-xr-x - training supergroup 0 2016-09-27 06:57 /user/training/mrlab/males/_logs
-rw-r--r-- 1 training supergroup 60 2016-09-27 06:57 /user/training/mrlab/males/part-m-00000
[training@localhost ~]$ hadoop fs -cat mrlab/males/part-m-00000
101,vino,26000,m,11
103,mohan,13000,m,13
105,naga,6000,m,13
[training@localhost ~]$
-------------------------------------------
Row filter 2:
On unstructured text: keep only the news lines that mention BigData.
[training@localhost ~]$ cat > news
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData
Pakistan Jurking India
BigData is a hipe or real
[training@localhost ~]$ hadoop fs -copyFromLocal news mrlab
[training@localhost ~]$
package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowFilter2 extends Mapper<LongWritable,Text,Text,NullWritable>
{
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {   // select * from news where
        // contains(upcase(line),'BIGDATA');
        String line = v.toString().toUpperCase();
        if(line.contains("BIGDATA") ||
           line.contains("BIG DATA"))
            con.write(v, NullWritable.get());
    }
}
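Upper-casing the line first makes the contains() checks case-insensitive, so BigData, bigdata and BIG DATA all pass. The two contains() calls can also be collapsed into a single regular expression (an alternative sketch, not what the lab uses):

String line = v.toString().toUpperCase();
if (line.matches(".*BIG ?DATA.*"))    // "BIG ?DATA" matches both BIGDATA and BIG DATA
    con.write(v, NullWritable.get());

The driver is the same as before; only the mapper class changes: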
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "colaggr");
        j.setJarByClass(Driverx.class);
        j.setMapperClass(RowFilter2.class);
        // j.setReducerClass(RedForNoDupeKeys.class);
        j.setNumReduceTasks(0);
        // j.setSortComparatorClass(SortComparator.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/news mrlab/bignews
[training@localhost ~]$ hadoop fs -cat mrlab/bignews/part-m-00000
Mr Modi implementing BigData for the Govt data process
Rahul learning BigData
BigData is a hipe or real
[training@localhost ~]$
--------------------------------
Column Filter.
ex:
select name, sal, dno from emp;
package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnFilter extends Mapper<LongWritable,Text,Text,NullWritable>
{
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {   // select name,sal,dno from emp;
        String line = v.toString();
        String[] w = line.split(",");
        String newLine = w[1] + "," + w[2] + "," + w[4];
        con.write(new Text(newLine), NullWritable.get());
    }
}
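The projected column positions (1, 2, 4) are hard-coded in ColumnFilter. A common refinement is to pass the positions through the job Configuration so one mapper can project any columns; a sketch of that idea (the class name ColumnFilterConf and the filter.columns property are assumptions, not part of the lab):

package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnFilterConf extends Mapper<LongWritable,Text,Text,NullWritable>
{
    private int[] cols;

    @Override
    protected void setup(Context con)
    {
        // driver side would set: c.set("filter.columns", "1,2,4");
        String[] parts = con.getConfiguration()
                            .get("filter.columns", "1,2,4").split(",");
        cols = new int[parts.length];
        for (int i = 0; i < parts.length; i++)
            cols[i] = Integer.parseInt(parts[i]);
    }

    @Override
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {
        String[] w = v.toString().split(",");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < cols.length; i++)
        {
            if (i > 0) sb.append(",");
            sb.append(w[cols[i]]);
        }
        con.write(new Text(sb.toString()), NullWritable.get());
    }
}

The lab's driver for the hard-coded ColumnFilter: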
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "colaggr");
        j.setJarByClass(Driverx.class);
        j.setMapperClass(ColumnFilter.class);
        // j.setReducerClass(RedForNoDupeKeys.class);
        j.setNumReduceTasks(0);
        // j.setSortComparatorClass(SortComparator.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/cfilter
[training@localhost ~]$ hadoop fs -cat mrlab/cfilter/part-m-00000
vino,26000,11
Sri,25000,11
mohan,13000,13
lokitha,8000,12
naga,6000,13
janaki,10000,12
[training@localhost ~]$
-----------------------------------------
Generating new Fields
hive>
select id, name, sal, sal*0.1 as tax,
sal*0.2 as hra,
sal-(sal*0.1)+(sal*0.2) as net,
sex, dno from emp;
package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GenerateNewFields extends Mapper<LongWritable,Text,Text,NullWritable>
{
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]);
        int tax = sal*10/100;
        int hra = sal*20/100;
        int net = sal - tax + hra;
        String newLine = w[0] + "," + w[1] + "," + sal + "," +
                         tax + "," + hra + "," + net + "," +
                         w[3] + "," + w[4];
        con.write(new Text(newLine), NullWritable.get());
    }
}
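Here tax, hra and net are computed with int arithmetic, so any fraction is dropped; that is exact for these round salaries but would lose the decimal part for a salary like 10005. The same block after the split(), kept in double (an alternative sketch, reusing w from the mapper above):

double sal = Double.parseDouble(w[2]);
double tax = sal * 0.10;              // 10% tax
double hra = sal * 0.20;              // 20% hra
double net = sal - tax + hra;
String newLine = w[0] + "," + w[1] + "," + w[2] + ","
               + tax + "," + hra + "," + net + ","
               + w[3] + "," + w[4];

The driver again only swaps the mapper class: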
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "colaggr");
        j.setJarByClass(Driverx.class);
        j.setMapperClass(GenerateNewFields.class);
        // j.setReducerClass(RedForNoDupeKeys.class);
        j.setNumReduceTasks(0);
        // j.setSortComparatorClass(SortComparator.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/newFields
[training@localhost ~]$ hadoop fs -cat mrlab/newFields/part-m-00000
101,vino,26000,2600,5200,28600,m,11
102,Sri,25000,2500,5000,27500,f,11
103,mohan,13000,1300,2600,14300,m,13
104,lokitha,8000,800,1600,8800,f,12
105,naga,6000,600,1200,6600,m,13
101,janaki,10000,1000,2000,11000,f,12
[training@localhost ~]$
-----------------------------------
Transformations:
hive> select id, name, sal ,
> if(sal>=70000,'A',
> if(sal>=50000,'B',
> if(sal>=30000,'C','D'))) as grade,
> if(sex='m','Male','Female') as sex,
> if(dno=11,'Marketing',
> if(dno=12,'hr',
> if(dno=13,'Finance','Other'))) as dname
> from emp;
package mr.analytics;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Transform extends Mapper<LongWritable,Text,Text,NullWritable>
{
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
    {
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]);
        String sex = w[3];
        int dno = Integer.parseInt(w[4]);

        String grade;
        if(sal >= 70000)
            grade = "A";
        else if(sal >= 50000)
            grade = "B";
        else if(sal >= 30000)
            grade = "C";
        else
            grade = "D";

        if(sex.matches("m"))
            sex = "Male";
        else
            sex = "Female";

        String dname;
        switch(dno)
        {
            case 11:
                dname = "Marketing";
                break;
            case 12:
                dname = "Hr";
                break;
            case 13:
                dname = "Finance";
                break;
            default:
                dname = "Other";
        }

        String newLine = w[0] + "," + w[1] + "," + sal + "," +
                         grade + "," + sex + "," + dname;
        con.write(new Text(newLine), NullWritable.get());
    }
}
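The nested Hive if() calls map one-to-one onto the if/else chain and the switch above. The grading rule can also be pulled out into a small helper with the same behaviour (a refactoring sketch):

// same grading rule as the if/else chain in Transform.map()
private static String grade(int sal)
{
    return sal >= 70000 ? "A"
         : sal >= 50000 ? "B"
         : sal >= 30000 ? "C"
         : "D";
}

With the sample salaries (all below 30000) every employee falls into grade D, which matches the output of the run below.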
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driverx
{
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c, "colaggr");
        j.setJarByClass(Driverx.class);
        j.setMapperClass(Transform.class);
        // j.setReducerClass(RedForNoDupeKeys.class);
        j.setNumReduceTasks(0);
        // j.setSortComparatorClass(SortComparator.class);
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
[training@localhost ~]$ hadoop jar Desktop/myapp.jar mr.analytics.Driverx mrlab/emp mrlab/transform
[training@localhost ~]$ hadoop fs -cat mrlab/transform/part-m-00000
101,vino,26000,D,Male,Marketing
102,Sri,25000,D,Female,Marketing
103,mohan,13000,D,Male,Finance
104,lokitha,8000,D,Female,Hr
105,naga,6000,D,Male,Finance
101,janaki,10000,D,Female,Hr
[training@localhost ~]$