Tuesday, September 27, 2016

Spark Union, Intersection, sortByKey


Union -- merging two RDDs
scala> val a = sc.parallelize(1 to 5)
scala> val b = sc.parallelize(6 to 10)
scala> val c = a++b
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at $plus$plus at <console>:31

scala> c.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-------------------
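Union keeps duplicates (6, 7, 8 appear twice below); intersection, shown further down, returns only the common elements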
scala> val a = sc.parallelize(1 to 8)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> val b = sc.parallelize(6 to 10)

scala> val c = a++b
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[4] at $plus$plus at <console>:31

scala> c.collect()
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 6, 7, 8, 9, 10)
[cloudera@quickstart ~]$ cat abc.txt
Emp1,F,D1
Emp2,M,D2
Emp3,F,D3
Cat
Bat
Mat
[cloudera@quickstart ~]$ cat xyz.txt
Cat
Book
Bat


scala> val a = sc.textFile("/user1/abc.txt")
a: org.apache.spark.rdd.RDD[String] = /user1/abc.txt MapPartitionsRDD[33] at textFile at <console>:27

scala> val b =sc.textFile("/user1/xyz.txt")
b: org.apache.spark.rdd.RDD[String] = /user1/xyz.txt MapPartitionsRDD[35] at textFile at <console>:27

scala> val c = a++b
c: org.apache.spark.rdd.RDD[String] = UnionRDD[36] at $plus$plus at <console>:31

scala> c.collect()
res12: Array[String] = Array(Emp1,F,D1, Emp2,M,D2, Emp3,F,D3, Cat, Bat, Mat, Cat, Book, Bat)

scala> val c = a.intersection(b
     | )
c: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at intersection at <console>:31

scala> c.collect()
res13: Array[String] = Array(Bat, Cat)
----------------------------
sortByKey -- sorts an RDD of (key, value) pairs by key (ascending by default)

scala> val a  = List((1,"a"),(7,"b"),(5,"F"))
a: List[(Int, String)] = List((1,a), (7,b), (5,F))

scala> val b=sc.parallelize(a);
b: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:29

scala> val c=b.sortByKey()
c: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[46] at sortByKey at <console>:31

scala> c.collect()
res14: Array[(Int, String)] = Array((1,a), (5,F), (7,b))

------------
scala> val a = sc.parallelize(2 to 7)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> val b = sc.parallelize(5 to 9)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27

scala> val c = a.intersection(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at intersection at <console>:31

scala> c.collect()
res6: Array[Int] = Array(6, 7, 5)
--------------------

Sunday, September 4, 2016

Hive index

hive> create table sales(cid int, pid string, amt int)
     row format delimited
       fields terminated by ',';
OK
Time taken: 11.849 seconds
hive> load data local inpath 'sales' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=1, totalSize=192]
OK
Time taken: 1.142 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.508 seconds, Fetched: 16 row(s)
hive>

hive> create INDEX cid_index on TABLE
      sales(cid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     WITH deferred rebuild;
OK
Time taken: 0.351 seconds
hive>

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> show tables;
OK
damp
default__sales_cid_index__
mamp
mytab
ramp
sales

hive> describe default__sales_cid_index__;
OK
cid                     int                                        
_bucketname             string                                     
_offsets                array<bigint>                             

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]

[cloudera@quickstart ~]$ cat sales > sales001

hive> load data local inpath 'sales001' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=2, totalSize=384]
OK
Time taken: 0.248 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.073 seconds, Fetched: 32 row(s)
hive>

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.086 seconds, Fetched: 6 row(s)
hive>

[cloudera@quickstart ~]$ cat sales002
101,p1,1000
101,p1,1200
101,p3,5000
101,p4,6000
101,p7,9000
105,p1,9000
105,p4,10000
105,p6,9000
[cloudera@quickstart ~]$

hive> load data local inpath 'sales002' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=3, totalSize=481]
OK
Time taken: 0.233 seconds
hive> select * from sales;
OK
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
105    p1    9000
105    p4    10000
105    p6    9000
Time taken: 0.085 seconds, Fetched: 40 row(s)
hive>

-- in the above table, cid 105 is present only in the third data file (sales002).

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.074 seconds, Fetched: 6 row(s)
hive>

-- in the above output there is no information about the third file (sales002),
-- because the index has not been rebuilt since that file was loaded.

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> select * from default__sales_cid_index__;
OK
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [0,12,24,36,48]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
105    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [60,72,85]
Time taken: 0.081 seconds, Fetched: 8 row(s)
hive>

-- after rebuilding the index, the entries for the third file (sales002) are available.

hive> select * from sales where cid=105;

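-- now Hive can use the index to read only the third file (sales002) for cid=105
-- (index-based filtering must be enabled first: set hive.optimize.index.filter=true;).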

-----------------------------------------------------------------

----------------------------------------------------------------

Hive bucketed tables and indexes.
-----------------------------------------------------------------

hive> create table bucks_sales(cid int, pid string,
       amt int)
    > clustered by (pid)
    > into 4 buckets
    > row format delimited
    >   fields terminated by ',';
OK
Time taken: 0.077 seconds
hive>

hive> set hive.enforce.bucketing=true;
hive> insert overwrite table bucks_sales
    >   select * from sales;

[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/bucks_sales
Found 4 items
-rwxrwxrwx   1 cloudera supergroup         73 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000000_0
-rwxrwxrwx   1 cloudera supergroup        204 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000001_0
-rwxrwxrwx   1 cloudera supergroup         84 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000002_0
-rwxrwxrwx   1 cloudera supergroup        120 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000003_0
[cloudera@quickstart ~]$

-- now the data (all rows) is divided into 4 buckets.

[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000000_0
105,p4,10000
101,p4,6000
101,p4,6000
101,p4,6000
102,p4,6000
102,p4,6000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000001_0
101,p1,1000
105,p1,9000
101,p1,1200
101,p1,1000
101,p1,5000
103,p1,4000
103,p1,2000
103,p1,1000
101,p1,1200
102,p1,2000
101,p1,1000
101,p1,5000
103,p1,4000
103,p1,2000
103,p1,1000
101,p1,1200
102,p1,2000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000002_0
102,p2,4000
101,p2,3000
102,p2,4000
105,p6,9000
102,p2,4000
102,p2,4000
101,p2,3000
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000003_0
101,p3,5000
102,p3,5000
102,p3,5000
103,p3,4000
101,p7,9000
101,p7,9000
101,p7,9000
101,p3,5000
101,p3,5000
103,p3,4000
[cloudera@quickstart ~]$

-- in the above output:
all p4 rows are in bucket 1 (000000_0)

all p1 rows are in bucket 2 (000001_0)

all p2 and p6 rows are in bucket 3 (000002_0)

all p3 and p7 rows are in bucket 4 (000003_0)

hive> select * from bucks_sales where pid='p3';
-- to read the p3 rows, Hive will read all buckets of the table,
-- because Hive does not know which bucket holds the 'p3' rows.

That is why
  we create an index on the pid column of the bucks_sales table.

hive> create index pid_index on table bucks_sales(pid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     with deferred rebuild;

hive> show tables;
OK
bucks_sales
damp
default__bucks_sales_pid_index__
default__sales_cid_index__
mamp
mytab
ramp
sales
Time taken: 0.032 seconds, Fetched: 8 row(s)
hive> select * from default__bucks_sales_pid_index__;
OK
Time taken: 0.089 seconds
hive>

-- the index table is still empty, because the index has not been rebuilt yet (ALTER INDEX ... REBUILD).

hive> ALTER INDEX  pid_index  ON bucks_sales REBUILD;

hive> select * from default__bucks_sales_pid_index__;
OK
p1    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000001_0    [0,12,24,36,48,60,72,84,96,108,120,132,144,156,168,180,192]
p2    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [0,12,24,48,60,72]
p3    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [0,12,24,36,84,96,108]
p4    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000000_0    [0,13,25,37,49,61]
p6    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [36]
p7    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [48,60,72]
Time taken: 0.072 seconds, Fetched: 6 row(s)
hive>

hive> select * from bucks_sales where pid='p3';
OK
101    p3    5000
102    p3    5000
102    p3    5000
103    p3    4000
101    p3    5000
101    p3    5000
103    p3    4000
Time taken: 0.078 seconds, Fetched: 7 row(s)
hive>

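-- when you query the 'p3' rows, Hive can use the index to read only the 4th bucket (000003_0),
-- provided index-based filtering is enabled (set hive.optimize.index.filter=true;).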