Thursday, November 24, 2016


First we have to create an app on the Facebook platform. We will use this app to connect to the Facebook API. This lets you manage your connections well, but it also has some disadvantages. Unlike version 1 of the API, you can now only get information about friends who are also using the app. This is a big problem, because you can no longer build a friend network of all your friends.
To create a new app, go to the Facebook developer page.
Click on “Apps” and choose “Add a New App”. In the next window choose “Website” and give your app a fancy name.

After clicking on “Create a New App ID”, choose a category for your app in the next window and apply the changes with “Create App ID”.
You can then click on “Skip Quick Start” to get directly to the settings of your app.

Welcome to your first own Facebook app!


First we need to install the packages devtools and Rfacebook. We install Rfacebook from GitHub, as this is currently the most recent version.
library(devtools)
install_github("pablobarbera/Rfacebook/Rfacebook")

OK, now we need to connect our R session with our test app and authenticate it to our Facebook profile for data mining. Rfacebook offers a very easy function for that, fbOAuth.
Just copy your app ID and your app secret from your app settings on the Facebook developer page and pass them to that function.
The console will then print a message that contains a URL.
Copy the URL and go to the settings of your Facebook app. Click on the settings tab on the left side and then choose “+ Add Platform”.
Then add the URL in the field “Site URL” and save the changes.
Go back to your R session and hit enter. A browser window should then open, and you have to allow the app to access your Facebook account.
If everything worked, the browser shows a success message and your R console confirms it.
You can then save your fb_oauth object and reuse it next time.
save(fb_oauth, file="fb_oauth")

Analyze Facebook with R!

Now that everything is connected, we have access to Facebook. We will start by getting our own profile information.
As I mentioned before, because of Facebook's new API policies the information you can get is very limited compared to what apps using API version 1 were able to download.
So for now I will just show you how to get your own personal information; other posts will follow with new use cases for the new API version.
The getUsers function returns public information about one or more Facebook users. If we use “me” as the username argument, it returns our own profile info.
Save the result in a variable, here called “me”, and you can take a look at it.
An advantage of the new API version is that you can get more than 100 likes. You can get the things you liked with the getLikes function.
The getPage function fetches the posts of a public page:
axis_page <- getPage(page="axisbank", token=fb_oauth)
icici_page <- getPage(page="icicibank", token=fb_oauth)
fb_page <- getPage(page="facebook", token=fb_oauth)

The getFriends function lists your friends. Remember that it only returns friends who are also using the app:
my_friends <- getFriends(token=fb_oauth, simplify=TRUE)

Wednesday, November 23, 2016

Spark submit with SBT


This recipe assumes sbt is installed and that you have already gone through the MySQL with Spark recipe.
I am a big fan of the Spark shell. The biggest proof is the Spark Cookbook, in which every recipe is a collection of single commands on the Spark shell. That makes the recipes easy to understand and run, and you can see exactly what effect each command has.
Along similar lines, I am a big fan of Maven. Maven brought two disruptive changes to the build world that are here to stay:
  • Declarative dependency management
  • Convention over configuration
That being said, to deploy Spark applications on a cluster and do cluster-level optimizations, spark-shell is not enough and we have to use spark-submit. spark-submit expects the application logic to be bundled in a jar file. Creating this jar file with Maven is a lot of work, especially for a super simple project, and this is where the simplicity of sbt comes into the picture.
In this recipe we’ll build and deploy a simple application using sbt.
Create the directory people with src/main/scala inside it.
$ mkdir -p people/src/main/scala
Create the file Person.scala in src/main/scala with the following content.
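import org.apache.spark._
import org.apache.spark.sql._
object Person extends App {
  val sc = new SparkContext
  val sqlContext = new SQLContext(sc)
  val url="jdbc:mysql://localhost:3306/hadoopdb"
  // Connection properties; add "user" and "password" entries here if your database needs them.
  val prop = new java.util.Properties
  // Load the "person" table over JDBC as a DataFrame.
  val people = sqlContext.read.jdbc(url,"person",prop)
  // Print the first rows so the run produces visible output.
  people.show()
}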
Now cd to the people directory and create the build file build.sbt using the following command.
$ echo "libraryDependencies += \"org.apache.spark\" %% \"spark-sql\" % \"1.4.1\"" >> build.sbt
Now package the jar.
$ sbt clean package
Now run the application using spark-submit.
$ spark-submit --driver-class-path /home/hduser/thirdparty/mysql.jar target/scala-2.10/people*.jar
I hope you appreciate the beauty and simplicity of sbt now.
Now let’s do the same using cluster deploy mode. In this case we have to go to http://spark-master:8080 to see the results.
$ spark-submit --master spark://localhost:7077 --deploy-mode cluster  --driver-class-path /home/hduser/thirdparty/mysql.jar --jars /home/hduser/thirdparty/mysql.jar target/scala-2.10/people*.jar
One thing to notice: the documentation suggests that --jars should put the library on both the driver and the executors. My personal experience suggests that --driver-class-path puts the library on the driver and --jars puts it only on the executors.

Tuesday, November 8, 2016

HBase shell commands


As described in the HBase introduction, HBase provides an extensible JRuby-based (JIRB) shell for executing commands; each command represents one piece of functionality.
HBase shell commands fall into six main categories.
1) General HBase shell commands
status Show cluster status. Can be 'summary', 'simple', or 'detailed'. The
default is 'summary'.
hbase> status
hbase> status 'simple'
hbase> status 'summary'
hbase> status 'detailed'
version Output this HBase version. Usage:
hbase> version
whoami Show the current HBase user. Usage:
hbase> whoami
2) Tables Management commands
alter Alter column family schema; pass table name and a dictionary
specifying new column family schema. Dictionaries are described
on the main help command output. Dictionary must include name
of column family to alter.
For example, to change or add the 'f1' column family in table 't1' from
current value to keep a maximum of 5 cell VERSIONS, do:
hbase> alter 't1', NAME => 'f1', VERSIONS => 5
You can operate on several column families:
hbase> alter 't1', 'f1', {NAME => 'f2', IN_MEMORY => true}, {NAME => 'f3', VERSIONS => 5}
To delete the 'f1' column family in table 't1', use one of:
hbase> alter 't1', NAME => 'f1', METHOD => 'delete'
hbase> alter 't1', 'delete' => 'f1'
You can also change table-scope attributes like MAX_FILESIZE, READONLY,
MEMSTORE_FLUSHSIZE, DEFERRED_LOG_FLUSH, etc. These can be put at the end;
for example, to change the max size of a region to 128MB, do:
hbase> alter 't1', MAX_FILESIZE => '134217728'
You can add a table coprocessor by setting a table coprocessor attribute:
hbase> alter 't1', 'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
Since you can have multiple coprocessors configured for a table, a
sequence number will be automatically appended to the attribute name
to uniquely identify it.
The coprocessor attribute must match the pattern below in order for
the framework to understand how to load the coprocessor classes:
[coprocessor jar file location] | class name | [priority] | [arguments]
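To make the attribute pattern a bit more concrete, here is a minimal Python sketch that splits such a value into its four parts. The function name parse_coprocessor_spec and the sample value (jar path and class name) are made up for illustration; this is not how HBase itself parses the attribute.

```python
def parse_coprocessor_spec(spec):
    """Split 'jar | class | priority | arguments' into a dict.

    Only the class name is mandatory; empty optional parts become None.
    """
    # Split on the first three pipes only, so the arguments stay in one piece.
    parts = [p.strip() for p in spec.split("|", 3)]
    # Pad to four fields so trailing optional parts may be left out.
    parts += [""] * (4 - len(parts))
    jar, class_name, priority, arguments = parts
    if not class_name:
        raise ValueError("the class name is the only mandatory part")
    return {
        "jar": jar or None,
        "class": class_name,
        "priority": int(priority) if priority else None,
        "arguments": arguments or None,
    }


# Hypothetical sample value in the shape of the pattern above.
sample = "hdfs:///example.jar|com.example.MyObserver|1001|arg1=1,arg2=2"
print(parse_coprocessor_spec(sample))
```

The square brackets in the pattern mark the optional parts, which is why the sketch accepts a value such as "|com.example.MyObserver||" as well.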
You can also set configuration settings specific to this table or column family:
hbase> alter 't1', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}
hbase> alter 't1', {NAME => 'f2', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}
You can also remove a table-scope attribute:
hbase> alter 't1', METHOD => 'table_att_unset', NAME => 'MAX_FILESIZE'
hbase> alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
There could be more than one alteration in one command:
hbase> alter 't1', { NAME => 'f1', VERSIONS => 3 },
{ MAX_FILESIZE => '134217728' }, { METHOD => 'delete', NAME => 'f2' },
OWNER => 'johndoe', METADATA => { 'mykey' => 'myvalue' }
create Create table; pass table name, a dictionary of specifications per
column family, and optionally a dictionary of table configuration.
hbase> create 't1', {NAME => 'f1', VERSIONS => 5}
hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
hbase> # The above in shorthand would be the following:
hbase> create 't1', 'f1', 'f2', 'f3'
hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}
Table configuration options can be put at the end.
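The numeric attribute values in these examples are plain seconds (TTL) and bytes (MAX_FILESIZE), which is not obvious at first sight. A small Python sketch of the arithmetic follows; the helper names are my own and not part of HBase.

```python
SECONDS_PER_DAY = 24 * 60 * 60
BYTES_PER_MB = 1024 * 1024


def ttl_in_days(ttl_seconds):
    """Convert a TTL value given in seconds to days."""
    return ttl_seconds / SECONDS_PER_DAY


def size_in_mb(size_bytes):
    """Convert a size given in bytes to megabytes (1 MB = 1024 * 1024 bytes)."""
    return size_bytes / BYTES_PER_MB


print(ttl_in_days(2592000))   # the TTL from the create example: 30.0 days
print(size_in_mb(134217728))  # the MAX_FILESIZE from the alter examples: 128.0 MB
```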
describe Describe the named table.
hbase> describe 't1'
disable Start disable of the named table.
hbase> disable 't1'
disable_all Disable all tables matching the given regex.
hbase> disable_all 't.*'
is_disabled Check whether the named table is disabled.
hbase> is_disabled 't1'
drop Drop the named table. The table must first be disabled.
hbase> drop 't1'
drop_all Drop all tables matching the given regex.
hbase> drop_all 't.*'
enable Start enable of the named table.
hbase> enable 't1'
enable_all Enable all tables matching the given regex.
hbase> enable_all 't.*'
is_enabled Check whether the named table is enabled.
hbase> is_enabled 't1'
exists Check whether the named table exists.
hbase> exists 't1'
list List all tables in HBase. An optional regular expression parameter can
be used to filter the output.
hbase> list
hbase> list 'abc.*'
show_filters Show all the filters in HBase.
hbase> show_filters
alter_status Get the status of the alter command. Indicates the number of regions of the table that have received the updated schema. Pass table name.
hbase> alter_status 't1'
alter_async Alter column family schema, does not wait for all regions to receive the
schema changes. Pass table name and a dictionary specifying new column
family schema. Dictionaries are described on the main help command output.
Dictionary must include name of column family to alter.
To change or add the 'f1' column family in table 't1' from defaults
to instead keep a maximum of 5 cell VERSIONS, do:
hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5
To delete the 'f1' column family in table 't1', do:
hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete'
or a shorter version:
hbase> alter_async 't1', 'delete' => 'f1'
You can also change table-scope attributes like MAX_FILESIZE.
For example, to change the max size of a region to 128MB, do:
hbase> alter_async 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'
There could be more than one alteration in one command:
hbase> alter_async 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
To check if all the regions have been updated, use alter_status <table_name>
3) Data Manipulation commands  
count Count the number of rows in a table. Return value is the number of rows.
This operation may take a LONG time (Run '$HADOOP_HOME/bin/hadoop jar
hbase.jar rowcount' to run a counting mapreduce job). Current count is shown
every 1000 rows by default. Count interval may be optionally specified. Scan
caching is enabled on count scans by default. Default cache size is 10 rows.
If your rows are small in size, you may want to increase this
parameter. Examples:
hbase> count 't1'
hbase> count 't1', INTERVAL => 100000
hbase> count 't1', CACHE => 1000
hbase> count 't1', INTERVAL => 10, CACHE => 1000
The same commands also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding commands would be:
hbase> t.count
hbase> t.count INTERVAL => 100000
hbase> t.count CACHE => 1000
hbase> t.count INTERVAL => 10, CACHE => 1000
delete Put a delete cell value at specified table/row/column and optionally
timestamp coordinates. Deletes must match the deleted cell's
coordinates exactly. When scanning, a delete cell suppresses older
versions. To delete a cell from 't1' at row 'r1' under column 'c1'
marked with the time 'ts1', do:
hbase> delete 't1', 'r1', 'c1', ts1
The same command can also be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding command would be:
hbase> t.delete 'r1', 'c1', ts1
deleteall Delete all cells in a given row; pass a table name, row, and optionally
a column and timestamp. Examples:
hbase> deleteall 't1', 'r1'
hbase> deleteall 't1', 'r1', 'c1'
hbase> deleteall 't1', 'r1', 'c1', ts1
The same commands also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding commands would be:
hbase> t.deleteall 'r1'
hbase> t.deleteall 'r1', 'c1'
hbase> t.deleteall 'r1', 'c1', ts1
get Get row or cell contents; pass table name, row, and optionally
a dictionary of column(s), timestamp, timerange and versions. Examples:
hbase> get 't1', 'r1'
hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
hbase> get 't1', 'r1', {COLUMN => 'c1'}
hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
hbase> get 't1', 'r1', 'c1'
hbase> get 't1', 'r1', 'c1', 'c2'
hbase> get 't1', 'r1', ['c1', 'c2']
Besides the default 'toStringBinary' format, 'get' also supports custom formatting by
column. A user can define a FORMATTER by adding it to the column name in the get
specification. The FORMATTER can be stipulated:
1. either as a org.apache.hadoop.hbase.util.Bytes method name (e.g, toInt, toString)
2. or as a custom class followed by method name: e.g. 'c(MyFormatterClass).format'.
Example formatting cf:qualifier1 and cf:qualifier2 both as Integers:
hbase> get 't1', 'r1', {COLUMN => ['cf:qualifier1:toInt',
'cf:qualifier2:c(org.apache.hadoop.hbase.util.Bytes).toInt'] }
Note that you can specify a FORMATTER by column only (cf:qualifier). You cannot specify
a FORMATTER for all columns of a column family.
The same commands also can be run on a reference to a table (obtained via get_table or
create_table). Suppose you had a reference t to table 't1', the corresponding commands
would be:
hbase> t.get 'r1'
hbase> t.get 'r1', {TIMERANGE => [ts1, ts2]}
hbase> t.get 'r1', {COLUMN => 'c1'}
hbase> t.get 'r1', {COLUMN => ['c1', 'c2', 'c3']}
hbase> t.get 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
hbase> t.get 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
hbase> t.get 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
hbase> t.get 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
hbase> t.get 'r1', 'c1'
hbase> t.get 'r1', 'c1', 'c2'
hbase> t.get 'r1', ['c1', 'c2']
get_counter Return a counter cell value at specified table/row/column coordinates.
A counter cell should be managed with the atomic increment function of HBase
and the data should be binary encoded. Example:
hbase> get_counter 't1', 'r1', 'c1'
The same command also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding command would be:
hbase> t.get_counter 'r1', 'c1'
incr Increments a cell 'value' at specified table/row/column coordinates.
To increment a cell value in table 't1' at row 'r1' under column
'c1' by 1 (can be omitted) or 10 do:
hbase> incr 't1', 'r1', 'c1'
hbase> incr 't1', 'r1', 'c1', 1
hbase> incr 't1', 'r1', 'c1', 10
The same commands also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding commands would be:
hbase> t.incr 'r1', 'c1'
hbase> t.incr 'r1', 'c1', 1
hbase> t.incr 'r1', 'c1', 10
put Put a cell 'value' at specified table/row/column and optionally
timestamp coordinates. To put a cell value into table 't1' at
row 'r1' under column 'c1' marked with the time 'ts1', do:
hbase> put 't1', 'r1', 'c1', 'value', ts1
The same command also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding command would be:
hbase> t.put 'r1', 'c1', 'value', ts1
scan Scan a table; pass table name and optionally a dictionary of scanner
specifications. Scanner specifications may include one or more of:
TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
or COLUMNS, CACHE.
If no columns are specified, all columns will be scanned.
To scan all members of a column family, leave the qualifier empty as in
'col_family:'.
The filter can be specified in two ways:
1. Using a filterString; more information on this is available in the
Filter Language document attached to the HBASE-4176 JIRA
2. Using the entire package name of the filter.
Some examples:
hbase> scan '.META.'
hbase> scan '.META.', {COLUMNS => 'info:regioninfo'}
hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
hbase> scan 't1', {FILTER => "(PrefixFilter ('row2') AND
(QualifierFilter (>=, 'binary:xyz'))) AND (TimestampsFilter ( 123, 456))"}
hbase> scan 't1', {FILTER => org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
For experts, there is an additional option, CACHE_BLOCKS, which
switches block caching for the scanner on (true) or off (false). By
default it is enabled. Examples:
hbase> scan 't1', {COLUMNS => ['c1', 'c2'], CACHE_BLOCKS => false}
Also for experts, there is an advanced option, RAW, which instructs the
scanner to return all cells (including delete markers and uncollected deleted
cells). This option cannot be combined with requesting specific COLUMNS.
Disabled by default. Example:
hbase> scan 't1', {RAW => true, VERSIONS => 10}
Besides the default 'toStringBinary' format, 'scan' supports custom formatting
by column. A user can define a FORMATTER by adding it to the column name in
the scan specification. The FORMATTER can be stipulated:
1. either as a org.apache.hadoop.hbase.util.Bytes method name (e.g, toInt, toString)
2. or as a custom class followed by method name: e.g. 'c(MyFormatterClass).format'.
Example formatting cf:qualifier1 and cf:qualifier2 both as Integers:
hbase> scan 't1', {COLUMNS => ['cf:qualifier1:toInt',
'cf:qualifier2:c(org.apache.hadoop.hbase.util.Bytes).toInt'] }
Note that you can specify a FORMATTER by column only (cf:qualifier). You cannot
specify a FORMATTER for all columns of a column family.
Scan can also be used directly from a table, by first getting a reference to a
table, like this:
hbase> t = get_table 't'
hbase> t.scan
Note that in the above situation, you can still provide all the filtering, columns,
options, etc. as described above.
truncate Disables, drops and recreates the specified table.
hbase> truncate 't1'
4) HBase surgery tools
assign Assign a region. Use with caution. If region already assigned,
this command will do a force reassign. For experts only.
hbase> assign 'REGION_NAME'
balancer Trigger the cluster balancer. Returns true if balancer ran and was able to
tell the region servers to unassign all the regions to balance (the re-assignment itself is async).
Otherwise false (Will not run if regions in transition).
hbase> balancer
balance_switch Enable/Disable balancer. Returns previous balancer state.
hbase> balance_switch true
hbase> balance_switch false
close_region Close a single region. Ask the master to close a region out on the cluster
or if 'SERVER_NAME' is supplied, ask the designated hosting regionserver to
close the region directly. Closing a region, the master expects 'REGIONNAME'
to be a fully qualified region name. When asking the hosting regionserver to
directly close a region, you pass the region's encoded name only. A region
name looks like this:
TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.
The trailing period is part of the region name. A region's encoded name
is the hash at the end of a region name; e.g. 527db22f95c8a9e0116f0cc13c680396
(without the period). A 'SERVER_NAME' is its host, port plus startcode. For
example: host187.example.com,60020,1289493121758 (find servername in master ui
or when you do detailed status in shell). This command will end up running
close on the region hosting regionserver. The close is done without the
master's involvement (It will not know of the close). Once closed, region will
stay closed. Use assign to reopen/reassign. Use unassign or move to assign
the region elsewhere on cluster. Use with caution. For experts only.
Examples:
hbase> close_region 'REGIONNAME'
hbase> close_region 'REGIONNAME', 'SERVER_NAME'
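Since region names, encoded names and server names are easy to mix up, here is a small Python sketch that takes them apart. The two helper functions and the host name in the usage lines are hypothetical; the region name is the one from the description above.

```python
def encoded_region_name(region_name):
    """Return the hash suffix of a full region name.

    A full region name ends with a period; the encoded name is the text
    between the last two periods.
    """
    if not region_name.endswith("."):
        raise ValueError("a full region name ends with a period")
    return region_name[:-1].rsplit(".", 1)[1]


def parse_server_name(server_name):
    """Split 'host,port,startcode' into its three parts."""
    host, port, startcode = server_name.rsplit(",", 2)
    return host, int(port), int(startcode)


region = "TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396."
print(encoded_region_name(region))

# The host name below is made up for illustration.
print(parse_server_name("host.example.com,60020,1289493121758"))
```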
compact Compact all regions in passed table or pass a region row
to compact an individual region. You can also compact a single column
family within a region.
Compact all regions in a table:
hbase> compact 't1'
Compact an entire region:
hbase> compact 'r1'
Compact only a column family within a region:
hbase> compact 'r1', 'c1'
Compact a column family within a table:
hbase> compact 't1', 'c1'
flush Flush all regions in passed table or pass a region row to
flush an individual region. For example:
hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
major_compact Run major compaction on passed table or pass a region row
to major compact an individual region. To compact a single
column family within a region specify the region name
followed by the column family name.
Compact all regions in a table:
hbase> major_compact 't1'
Compact an entire region:
hbase> major_compact 'r1'
Compact a single column family within a region:
hbase> major_compact 'r1', 'c1'
Compact a single column family within a table:
hbase> major_compact 't1', 'c1'
move Move a region. Optionally specify target regionserver else we choose one
at random. NOTE: You pass the encoded region name, not the region name so
this command is a little different to the others. The encoded region name
is the hash suffix on region names: e.g. if the region name were
TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396. then
the encoded region name portion is 527db22f95c8a9e0116f0cc13c680396
A server name is its host, port plus startcode. For example: host187.example.com,60020,1289493121758
Examples:
hbase> move 'ENCODED_REGIONNAME'
split Split entire table or pass a region to split individual region. With the
second parameter, you can specify an explicit split key for the region.
split 'tableName'
split 'regionName' # format: 'tableName,startKey,id'
split 'tableName', 'splitKey'
split 'regionName', 'splitKey'
unassign Unassign a region. Unassign will close region in current location and then
reopen it again. Pass 'true' to force the unassignment ('force' will clear
all in-memory state in master before the reassign. If results in
double assignment use hbck -fix to resolve. To be used by experts).
Use with caution. For expert use only. Examples:
hbase> unassign 'REGIONNAME'
hbase> unassign 'REGIONNAME', true
hlog_roll Roll the log writer. That is, start writing log messages to a new file.
The name of the regionserver should be given as the parameter. A
'server_name' is the host, port plus startcode of a regionserver. For
example: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
zk_dump Dump status of HBase cluster as seen by ZooKeeper. Example:
hbase> zk_dump

5) Cluster replication tools
add_peer Add a peer cluster to replicate to, the id must be a short and
the cluster key is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
This gives a full path for HBase to connect to another cluster.
Examples:
hbase> add_peer '1', "server1.cie.com:2181:/hbase"
hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
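Judging from the second example, a cluster key has three colon-separated parts: the ZooKeeper quorum hosts, the client port and the parent znode. A minimal Python sketch that takes such a key apart looks like this; the function name and the dictionary keys are my own choice, not an HBase API.

```python
def parse_cluster_key(cluster_key):
    """Split 'quorum:port:znode' into its parts.

    The quorum part is itself a comma-separated list of ZooKeeper hosts.
    """
    quorum, port, znode_parent = cluster_key.split(":", 2)
    return {
        "quorum": quorum.split(","),
        "client_port": int(port),
        "znode_parent": znode_parent,
    }


print(parse_cluster_key("zk1,zk2,zk3:2182:/hbase-prod"))
```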
remove_peer Stops the specified replication stream and deletes all the meta
information kept about it. Examples:
hbase> remove_peer '1'
list_peers List all replication peer clusters.
hbase> list_peers
enable_peer Restarts the replication to the specified peer cluster,
continuing from where it was disabled. Examples:
hbase> enable_peer '1'
disable_peer Stops the replication stream to the specified cluster, but still
keeps track of new edits to replicate. Examples:
hbase> disable_peer '1'
start_replication Restarts all the replication features. The state in which each
stream starts in is undetermined.
start/stop replication is only meant to be used in critical load situations.
hbase> start_replication
stop_replication Stops all the replication features. The state in which each
stream stops in is undetermined.
start/stop replication is only meant to be used in critical load situations.
hbase> stop_replication
6) Security tools
grant Grant users specific rights.
Syntax : grant <user> <permissions> <table> <column family> <column qualifier>
permissions is either zero or more letters from the set "RWXCA".
READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A')
For example:
hbase> grant 'bobsmith', 'RWXCA'
hbase> grant 'bobsmith', 'RW', 't1', 'f1', 'col1'
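The permission string is just a sequence of letters from that set. As a quick illustration, this Python sketch expands such a string into the permission names listed above and rejects unknown letters. The function expand_permissions is a made-up helper, not part of HBase.

```python
# Letter-to-name mapping exactly as listed in the grant description.
PERMISSIONS = {
    "R": "READ",
    "W": "WRITE",
    "X": "EXEC",
    "C": "CREATE",
    "A": "ADMIN",
}


def expand_permissions(letters):
    """Translate a string such as 'RW' into ['READ', 'WRITE']."""
    unknown = [ch for ch in letters if ch not in PERMISSIONS]
    if unknown:
        raise ValueError("unknown permission letters: " + "".join(unknown))
    return [PERMISSIONS[ch] for ch in letters]


print(expand_permissions("RW"))
print(expand_permissions("RWXCA"))
```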
revoke Revoke a user's access rights.
Syntax : revoke <user> <table> <column family> <column qualifier>
For example:
hbase> revoke 'bobsmith', 't1', 'f1', 'col1'
user_permission Show all permissions for the particular user.
Syntax : user_permission <table>
For example:
hbase> user_permission
hbase> user_permission 'table1'

Monday, October 17, 2016

Spark Streaming word count

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val stc = new StreamingContext(sc, Seconds(5))
stc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@18f2d752

scala> val lines = stc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@346995e1

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@27058c08

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@de5d972

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@409f70ca

scala> wordCounts.print()

scala> wordCounts.foreachRDD { rdd => rdd.foreach(print) }

scala> stc.start()

scala> stc.awaitTermination()

In a new terminal, start netcat on port 9999 and type some words:
[cloudera@quickstart ~]$ nc -lk 9999
hello hello
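
To see what each 5-second batch computes, here is the same logic for a single batch in plain Python, with no Spark involved. It is only a sketch of the flatMap / map / reduceByKey steps above; the function name count_words is made up, and every batch is counted on its own, without carrying counts over from earlier batches.

```python
from collections import Counter


def count_words(batch_lines):
    """Word count for one batch of input lines."""
    counts = Counter()
    for line in batch_lines:
        # flatMap(_.split(" ")): one entry per space-separated word
        for word in line.split(" "):
            # map(word => (word, 1)) followed by reduceByKey(_ + _)
            counts[word] += 1
    return dict(counts)


# The line typed into netcat above.
print(count_words(["hello hello"]))
```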

Tuesday, September 27, 2016

Spark Union, Intersection, sortByKey

Union -- merging two RDDs
scala> val a = sc.parallelize(1 to 5)
scala> val b = sc.parallelize(6 to 10)
scala> val c = a++b
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at $plus$plus at <console>:31

scala> c.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val a = sc.parallelize(1 to 8)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> val b = sc.parallelize(6 to 10)

scala> val c = a++b
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[4] at $plus$plus at <console>:31

scala> c.collect()
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 6, 7, 8, 9, 10)
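
Note that the union keeps duplicates: 6, 7 and 8 show up twice in the result above. The following plain Python sketch (no Spark, the function name union is my own) mirrors that behaviour with lists, and shows how you would get distinct values if you needed them.

```python
def union(a, b):
    """Concatenate two collections and keep duplicates, like a ++ b on RDDs."""
    return list(a) + list(b)


a = range(1, 9)    # 1 to 8
b = range(6, 11)   # 6 to 10
c = union(a, b)

print(c)               # 6, 7 and 8 appear twice
print(sorted(set(c)))  # distinct values only
```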
[cloudera@quickstart ~]$ cat abc.txt
[cloudera@quickstart ~]$ cat xyz.txt

scala> val a = sc.textFile("/user1/abc.txt")
a: org.apache.spark.rdd.RDD[String] = /user1/abc.txt MapPartitionsRDD[33] at textFile at <console>:27

scala> val b =sc.textFile("/user1/xyz.txt")
b: org.apache.spark.rdd.RDD[String] = /user1/xyz.txt MapPartitionsRDD[35] at textFile at <console>:27

scala> val c = a++b
c: org.apache.spark.rdd.RDD[String] = UnionRDD[36] at $plus$plus at <console>:31

scala> c.collect()
res12: Array[String] = Array(Emp1,F,D1, Emp2,M,D2, Emp3,F,D3, Cat, Bat, Mat, Cat, Book, Bat)

Intersection -- elements common to both RDDs

scala> val c = a.intersection(b)
c: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at intersection at <console>:31

scala> c.collect()
res13: Array[String] = Array(Bat, Cat)
sortByKey -- sorts a pair RDD by its keys

scala> val a  = List((1,"a"),(7,"b"),(5,"F"))
a: List[(Int, String)] = List((1,a), (7,b), (5,F))

scala> val b=sc.parallelize(a);
b: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:29

scala> val c=b.sortByKey()
c: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[46] at sortByKey at <console>:31

scala> c.collect()
res14: Array[(Int, String)] = Array((1,a), (5,F), (7,b))
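
The same sort can be sketched in plain Python on a list of (key, value) tuples. This is only an illustration of the idea, not Spark code; the function name sort_by_key and its ascending parameter are my own.

```python
def sort_by_key(pairs, ascending=True):
    """Sort (key, value) tuples by key only; the values are not compared."""
    return sorted(pairs, key=lambda kv: kv[0], reverse=not ascending)


pairs = [(1, "a"), (7, "b"), (5, "F")]
print(sort_by_key(pairs))
print(sort_by_key(pairs, ascending=False))
```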

Intersection on numeric RDDs

scala> val a = sc.parallelize(2 to 7)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> val b = sc.parallelize(5 to 9)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27

scala> val c = a.intersection(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[17] at intersection at <console>:31

scala> c.collect()
res6: Array[Int] = Array(6, 7, 5)
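
As the result above shows, the order of the intersection is not sorted. The result also contains no duplicates, even if the inputs do. A plain Python sketch with sets (no Spark, the function name is my own) behaves the same way:

```python
def intersection(a, b):
    """Elements present in both collections, without duplicates."""
    return set(a) & set(b)


# 2 to 7 and 5 to 9, as in the shell session above.
print(sorted(intersection(range(2, 8), range(5, 10))))

# Duplicates in the input do not show up twice in the result.
print(sorted(intersection(["Cat", "Bat", "Cat"], ["Bat", "Cat", "Book"])))
```

I sort the sets before printing only to get a stable output.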

Sunday, September 4, 2016

Hive index

hive> create table sales(cid int, pid string, amt int)
     row format delimited
       fields terminated by ',';
Time taken: 11.849 seconds
hive> load data local inpath 'sales' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=1, totalSize=192]
Time taken: 1.142 seconds
hive> select * from sales;
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.508 seconds, Fetched: 16 row(s)

hive> create INDEX cid_index on TABLE sales(cid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     WITH deferred rebuild;
Time taken: 0.351 seconds

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> show tables;

hive> describe default__sales_cid_index__;
cid                     int                                        
_bucketname             string                                     
_offsets                array<bigint>                             

hive> select * from default__sales_cid_index__;
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
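
The _offsets column holds the byte offset of every row with that cid inside the file. To check this, the following Python sketch rebuilds the offsets by hand. It assumes the sales file holds the 16 rows shown above as comma-separated lines, each ending with a newline, which fits the reported totalSize of 192 bytes. The function name is made up, and this is of course not how Hive builds the index internally.

```python
from collections import defaultdict

# The 16 rows of the sales table, in file order.
ROWS = [
    (101, "p1", 1000), (102, "p1", 2000), (103, "p3", 4000), (101, "p1", 1200),
    (101, "p3", 5000), (101, "p4", 6000), (101, "p7", 9000), (102, "p2", 4000),
    (102, "p3", 5000), (102, "p4", 6000), (103, "p1", 1000), (103, "p1", 2000),
    (103, "p1", 4000), (102, "p2", 4000), (101, "p1", 5000), (101, "p2", 3000),
]


def build_offsets(rows):
    """Map each cid to the byte offsets of its rows; also return the file size."""
    index = defaultdict(list)
    offset = 0
    for cid, pid, amt in rows:
        line = f"{cid},{pid},{amt}\n"
        index[cid].append(offset)
        offset += len(line.encode("utf-8"))
    return dict(index), offset


index, total_size = build_offsets(ROWS)
for cid in sorted(index):
    print(cid, index[cid])
print("total size:", total_size)
```

Every row is 12 bytes long here, so the offsets are multiples of 12.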

[cloudera@quickstart ~]$ cat sales > sales001

hive> load data local inpath 'sales001' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=2, totalSize=384]
Time taken: 0.248 seconds
hive> select * from sales;
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
Time taken: 0.073 seconds, Fetched: 32 row(s)

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> select * from default__sales_cid_index__;
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.086 seconds, Fetched: 6 row(s)

[cloudera@quickstart ~]$ cat sales002
101,p1,1000
101,p1,1200
101,p3,5000
101,p4,6000
101,p7,9000
105,p1,9000
105,p4,10000
105,p6,9000
[cloudera@quickstart ~]$

hive> load data local inpath 'sales002' into table sales;
Loading data to table default.sales
Table default.sales stats: [numFiles=3, totalSize=481]
Time taken: 0.233 seconds
hive> select * from sales;
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
102    p1    2000
103    p3    4000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
102    p2    4000
102    p3    5000
102    p4    6000
103    p1    1000
103    p1    2000
103    p1    4000
102    p2    4000
101    p1    5000
101    p2    3000
101    p1    1000
101    p1    1200
101    p3    5000
101    p4    6000
101    p7    9000
105    p1    9000
105    p4    10000
105    p6    9000
Time taken: 0.085 seconds, Fetched: 40 row(s)

-- In the table above, cid 105 is present only in bucket3 (sales002).

hive> select * from default__sales_cid_index__;
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
Time taken: 0.074 seconds, Fetched: 6 row(s)

-- The output above has no entries for the 3rd bucket (sales002),
-- because the index has not been rebuilt since the load.

hive> ALTER INDEX  cid_index  ON sales REBUILD;

hive> select * from default__sales_cid_index__;
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [0,36,48,60,72,168,180]
101    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [0,12,24,36,48]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [12,84,96,108,156]
102    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [12,84,96,108,156]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales    [24,120,132,144]
103    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales001    [24,120,132,144]
105    hdfs://quickstart.cloudera:8020/user/hive/warehouse/sales/sales002    [60,72,85]
Time taken: 0.081 seconds, Fetched: 8 row(s)

-- After rebuilding the index, the entries for bucket3 (sales002) are available.

hive> select * from sales where cid=105;

-- Now Hive reads only bucket3 (sales002), because the index lists no other file for cid 105.
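
The effect of REBUILD can be pictured with a small sketch in plain Python (this is not Hive code). It assumes that the compact index is essentially a map from each cid to the files and byte offsets where rows with that cid start, and that sales002 is comma-delimited and holds the eight new rows seen in the select output above. The names build_index, SALES002, stale and rebuilt are made up for this illustration.

```python
from collections import defaultdict

# Assumed content of sales002: the eight new rows from the select output,
# comma-delimited, one row per line.
SALES002 = (
    "101,p1,1000\n101,p1,1200\n101,p3,5000\n101,p4,6000\n"
    "101,p7,9000\n105,p1,9000\n105,p4,10000\n105,p6,9000\n"
)

def build_index(files):
    """Map each cid to {file name: [byte offset of every row with that cid]}."""
    index = defaultdict(dict)
    for name, content in files.items():
        offset = 0
        for line in content.splitlines(keepends=True):
            cid = line.split(",")[0]
            index[cid].setdefault(name, []).append(offset)
            offset += len(line.encode("utf-8"))
    return index

# An index built before sales002 was loaded knows nothing about cid 105.
stale = build_index({})
# "Rebuilding" scans the files again, this time including the new one.
rebuilt = build_index({"sales002": SALES002})

print(stale.get("105"))   # None
print(rebuilt["101"])     # {'sales002': [0, 12, 24, 36, 48]}
print(rebuilt["105"])     # {'sales002': [60, 72, 85]}
```

Under these assumptions the offsets come out the same as the sales002 rows of the index table above, which suggests the index simply records where each matching row starts.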



Hive bucketed tables and indexes.

hive> create table bucks_sales(cid int, pid string,
       amt int)
    > clustered by (pid)
    > into 4 buckets
    > row format delimited
    >   fields terminated by ',';
Time taken: 0.077 seconds

hive> set hive.enforce.bucketing=true;
hive> insert overwrite table bucks_sales
    >   select * from sales;

[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/bucks_sales
Found 4 items
-rwxrwxrwx   1 cloudera supergroup         73 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000000_0
-rwxrwxrwx   1 cloudera supergroup        204 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000001_0
-rwxrwxrwx   1 cloudera supergroup         84 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000002_0
-rwxrwxrwx   1 cloudera supergroup        120 2016-09-02 11:11 /user/hive/warehouse/bucks_sales/000003_0
[cloudera@quickstart ~]$

-- Now all rows of the table are divided into 4 buckets.

[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000000_0
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000001_0
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000002_0
[cloudera@quickstart ~]$ hadoop fs -cat /user/hive/warehouse/bucks_sales/000003_0
[cloudera@quickstart ~]$

-- In the above output:
-- all p4 rows are in bucket1 (000000_0)
-- all p1 rows are in bucket2 (000001_0)
-- all p2 and p6 rows are in bucket3 (000002_0)
-- all p3 and p7 rows are in bucket4 (000003_0)
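
Why each pid lands in that particular file can be sketched in plain Python. The sketch assumes Hive hashes a string column the same way Java's String.hashCode does for ASCII text, and picks the bucket as the non-negative hash modulo the number of buckets. Treat that rule as an assumption; java_string_hash and bucket_for are hypothetical helper names.

```python
def java_string_hash(s):
    """Java-style String.hashCode for ASCII text, as a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def bucket_for(pid, num_buckets=4):
    """Assumed rule: (hash & Integer.MAX_VALUE) mod number of buckets."""
    return (java_string_hash(pid) & 0x7FFFFFFF) % num_buckets

for pid in ["p1", "p2", "p3", "p4", "p6", "p7"]:
    print(pid, "-> 00000%d_0" % bucket_for(pid))
```

With 4 buckets this puts p4 in 000000_0, p1 in 000001_0, p2 and p6 in 000002_0, and p3 and p7 in 000003_0, the same layout as listed above.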

hive> select * from bucks_sales where pid='p3';
-- To read the p3 rows, Hive reads all buckets of the table,
-- because it does not know which bucket holds the 'p3' rows.

-- That is why we create an index on column pid of the bucks_sales table.

hive> create index pid_index on table bucks_sales(pid)
     as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
     with deferred rebuild;

hive> show tables;
Time taken: 0.032 seconds, Fetched: 8 row(s)
hive> select * from default__bucks_sales_pid_index__;
Time taken: 0.089 seconds

-- The index table is empty for now, because the index has not been rebuilt yet.

hive> ALTER INDEX  pid_index  ON bucks_sales REBUILD;

hive> select * from default__bucks_sales_pid_index__;
p1    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000001_0    [0,12,24,36,48,60,72,84,96,108,120,132,144,156,168,180,192]
p2    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [0,12,24,48,60,72]
p3    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [0,12,24,36,84,96,108]
p4    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000000_0    [0,13,25,37,49,61]
p6    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000002_0    [36]
p7    hdfs://quickstart.cloudera:8020/user/hive/warehouse/bucks_sales/000003_0    [48,60,72]
Time taken: 0.072 seconds, Fetched: 6 row(s)

hive> select * from bucks_sales where pid='p3';
101    p3    5000
102    p3    5000
102    p3    5000
103    p3    4000
101    p3    5000
101    p3    5000
103    p3    4000
Time taken: 0.078 seconds, Fetched: 7 row(s)

-- When you query the 'p3' data, Hive reads only the 4th bucket (000003_0).

Wednesday, August 31, 2016

Mr 3

MR Lab3
Grouping by multiple columns.


select dno, sex, sum(sal) from emp
   group by dno, sex;


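import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DnoSexSalMap extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    // file       : emp
    // schema     : id,name,sal,sex,dno
    // delimiter  : "," (comma)
    // sample row : 101,amar,20000,m,11
    // key: dno + tab + sex, value: sal
    @Override
    public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException {
        String line = v.toString();
        String[] w = line.split(",");
        String sex = w[3];
        String dno = w[4];
        String myKey = dno + "\t" + sex;
        int sal = Integer.parseInt(w[2]);
        con.write(new Text(myKey), new IntWritable(sal));
    }
}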




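import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver8 {
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        Job j = new Job(c);
        j.setJarByClass(Driver8.class);
        j.setMapperClass(DnoSexSalMap.class);
        // Register the sum reducer for this lab here with j.setReducerClass(...).
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        Path p1 = new Path(args[0]); // input
        Path p2 = new Path(args[1]); // output
        FileInputFormat.addInputPath(j, p1);
        FileOutputFormat.setOutputPath(j, p2);

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}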




[training@localhost ~]$ hadoop fs -cat

11      f       25000
11      m       26000
12      f       18000
13      m       19000
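
The flow of this job (map, then shuffle/sort, then reduce) can be tried without a cluster using a rough plain-Python sketch. Only the first row below comes from the notes (the sample row in the mapper comments). The other rows, and the names map_phase and reduce_phase, are hypothetical and exist only for illustration.

```python
from itertools import groupby

# emp rows in the form id,name,sal,sex,dno.
# Only the first row is from the notes; the rest are made-up sample data.
ROWS = [
    "101,amar,20000,m,11",
    "102,bina,30000,f,12",
    "103,chand,40000,m,11",
    "104,devi,50000,f,12",
]

def map_phase(line):
    """Emit (dno<TAB>sex, sal), the same pair DnoSexSalMap writes."""
    w = line.split(",")
    return (w[4] + "\t" + w[3], int(w[2]))

def reduce_phase(key, values):
    """Sum all salaries that share one composite key."""
    return (key, sum(values))

pairs = [map_phase(r) for r in ROWS]
# Shuffle/sort: bring equal keys next to each other before reducing.
pairs.sort(key=lambda kv: kv[0])
result = [reduce_phase(k, [v for _, v in grp])
          for k, grp in groupby(pairs, key=lambda kv: kv[0])]

for key, total in result:
    print(key + "\t" + str(total))
```

Because dno and sex are joined into one key string, the framework groups on the pair without needing a custom key class.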


Spark lab1

Spark Lab1: Spark aggregations with map, flatMap, sc.textFile(), reduceByKey(), groupByKey()
[cloudera@quickstart ~]$ cat > comment
i love hadoop
i love spark
i love hadoop and spark
[cloudera@quickstart ~]$ hadoop fs -mkdir spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal comment spark

Word Count using spark:

scala> val r1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/comment")

scala> r1.collect.foreach(println)

scala> val r2 = r1.map(x => x.split(" "))

scala> val r3 = r2.flatMap(x => x)

Instead of writing r2 and r3 separately, flatMap splits and flattens in one step:

scala> val words  = r1.flatMap(x =>
     |    x.split(" ") )
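
The difference between map and flatMap can be seen in a tiny plain-Python sketch (list comprehensions stand in for the RDD operations, and the three lines are the ones from the comment file). The names lines, mapped and flat are made up for this illustration.

```python
lines = ["i love hadoop", "i love spark", "i love hadoop and spark"]

# map: one output element per input line, so the result is a list of lists.
mapped = [line.split(" ") for line in lines]

# flatMap: the per-line lists are flattened into a single list of words.
flat = [word for line in lines for word in line.split(" ")]

print(len(mapped))   # 3
print(len(flat))     # 11
```

So map keeps one array per line, while flatMap yields one element per word, which is the shape word count needs.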

scala> val wpair = words.map(x =>
     |    (x,1) )

scala> val wc = wpair.reduceByKey((x,y) => x+y)

scala> wc.collect

scala> val wcres = wc.map(x =>
     |     x._1+","+x._2 )

scala> wcres.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark/results2")
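
What reduceByKey does to the (word, 1) pairs can be sketched in plain Python. This only models the result: Spark also combines values inside each partition before shuffling, and it does not guarantee any output order. reduce_by_key is a hypothetical helper, not a Spark API.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Combine the values of each key pairwise with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# The words of the three lines in the comment file.
words = "i love hadoop i love spark i love hadoop and spark".split(" ")
wpair = [(w, 1) for w in words]
wc = reduce_by_key(wpair, lambda x, y: x + y)

# Same formatting step as wcres: "word,count".
wcres = [k + "," + str(v) for k, v in wc.items()]
print(wcres)
```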

[cloudera@quickstart ~]$ cat emp
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp spark
[cloudera@quickstart ~]$

scala> val e1 = sc.textFile("/user/cloudera/spark/emp")

scala> val e2 = e1.map(x => x.split(","))

scala> val epair = e2.map(x=>
     |   (x(3), x(2).toInt ) )

scala> val res = epair.reduceByKey(_+_)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:24

scala> res.collect.foreach(println)

scala> val resmax = epair.reduceByKey(
     |    (x,y) => Math.max(x,y))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> resmax.collect.foreach(println)

scala> resmin.collect.foreach(println)

scala> val grpd = epair.groupByKey()

scala> val resall = grpd.map(x =>
     |  (x._1, x._2.sum,x._2.size,x._2.max,x._2.min,x._2.sum/x._2.size) )
scala> resall.collect.foreach(println)

Spark lab2

Spark Lab2
scala> val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/spark/emp")

scala> val earray = emp.map(x => x.split(","))
earray: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> earray.collect

Array[Array[String]] = Array(Array(101, aa, 20000, m, 11), Array(102, bb, 30000, f, 12), Array(103, cc, 40000, m, 11), Array(104, ddd, 50000, f, 12), Array(105, ee, 60000, m, 12), Array(106, dd, 90000, f, 11))

scala> val epair = earray.map(x =>
     |   (x(4), x(2).toInt))

scala> val ressum = epair.reduceByKey(_+_)

scala> val resmax = epair.reduceByKey(Math.max(_,_))

scala> val resmin = epair.reduceByKey(Math.min(_,_))

scala> ressum.collect.foreach(println)

scala> val grpByDno = epair.groupByKey()

scala> grpByDno.collect
Array[(String, Iterable[Int])] = Array((12,CompactBuffer(30000, 50000, 60000)), (11,CompactBuffer(20000, 40000, 90000)))

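scala> val resall = grpByDno.map(x =>
     |   x._1+" "+x._2.sum+" "+x._2.size+" "+
     |   x._2.sum/x._2.size+" "+x._2.max+" "+
     |   x._2.min  )

scala> resall.collect.foreach(println)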

12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000

[cloudera@quickstart ~]$ hadoop fs -cat spark/today1/part-00000
12 140000 3 46666 60000 30000
11 150000 3 50000 90000 20000
[cloudera@quickstart ~]$
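
The groupByKey step followed by several aggregations per group can be checked with a small plain-Python sketch. It uses the six emp rows shown in the earray output above. group_by_key is a hypothetical helper standing in for the RDD operation, and integer division is used for the average to mirror the Int arithmetic in the shell.

```python
# The six emp rows from the earray output (id,name,sal,sex,dno).
EMP = [
    "101,aa,20000,m,11", "102,bb,30000,f,12", "103,cc,40000,m,11",
    "104,ddd,50000,f,12", "105,ee,60000,m,12", "106,dd,90000,f,11",
]

def group_by_key(pairs):
    """Collect all values of each key into one list."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return grouped

# (dno, sal) pairs, like epair.
epair = [(r.split(",")[4], int(r.split(",")[2])) for r in EMP]
grp = group_by_key(epair)

# dno -> (sum, count, avg, max, min)
resall = {dno: (sum(s), len(s), sum(s) // len(s), max(s), min(s))
          for dno, s in grp.items()}

for dno, stats in resall.items():
    print(dno, *stats)
```

The numbers agree with the two result rows above (140000 3 46666 60000 30000 for dno 12, and 150000 3 50000 90000 20000 for dno 11).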


Aggregations grouped by multiple columns.

Ex: the equivalent SQL/HQL query.

select dno, sex , sum(sal) from emp
  group by dno, sex;
scala> val DnoSexSalPair = earray.map(
     |   x => ((x(4),x(3)),x(2).toInt) )
scala> DnoSexSalPair.collect.foreach(println)


scala> val rsum = DnoSexSalPair.reduceByKey(_+_)

scala> rsum.collect.foreach(println)


scala> val rs = rsum.map(x =>
     |          x._1._1+" "+x._1._2+" "+
                x._2 )
scala> rs.collect.foreach(println)

11 f 90000
12 f 80000
12 m 60000
11 m 60000


grouping by multiple columns, and multiple aggregations.


select dno, sex, sum(sal), max(sal) ,
  min(sal), count(*), avg(sal)
   from emp group by dno, sex;

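val grpDnoSex = DnoSexSalPair.groupByKey()

val r = grpDnoSex.map(x =>
  x._1._1+" "+x._1._2+" "+x._2.sum+" "+x._2.max+" "+
  x._2.min+" "+x._2.size+" "+
x._2.sum/x._2.size  )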

11 f 90000 90000 90000 1 90000
12 f 80000 50000 30000 2 40000
12 m 60000 60000 60000 1 60000
11 m 60000 40000 20000 2 30000
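
As an alternative to collecting every salary per group first, the same figures can be kept as running totals per (dno, sex) key. The plain-Python sketch below shows that idea. It is not what the notes above do (they use groupByKey), and the stats layout is an assumption made for illustration.

```python
# The six emp rows from the earray output (id,name,sal,sex,dno).
EMP = [
    "101,aa,20000,m,11", "102,bb,30000,f,12", "103,cc,40000,m,11",
    "104,ddd,50000,f,12", "105,ee,60000,m,12", "106,dd,90000,f,11",
]

stats = {}   # (dno, sex) -> [sum, max, min, count]
for row in EMP:
    _id, _name, sal, sex, dno = row.split(",")
    sal = int(sal)
    key = (dno, sex)          # a tuple works as a composite key
    if key not in stats:
        stats[key] = [sal, sal, sal, 1]
    else:
        s = stats[key]
        stats[key] = [s[0] + sal, max(s[1], sal), min(s[2], sal), s[3] + 1]

# Same column order as the query: dno, sex, sum, max, min, count, avg.
for (dno, sex), (total, hi, lo, n) in sorted(stats.items()):
    print(dno, sex, total, hi, lo, n, total // n)
```

Only four numbers are held per group, whatever the group size, which is the usual reason to prefer this pattern on large data.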


Monday, August 29, 2016

Spark Word count

spark word count
[cloudera@quickstart ~]$ cat as.txt
i love hadoop
i love spark
o love hadoop and spark

scala> val rdd = sc.textFile("/user1/as.txt")
rdd: org.apache.spark.rdd.RDD[String] = /user1/as.txt MapPartitionsRDD[42] at textFile at <console>:31

scala> val rdd1 =rdd.flatMap(x=>x.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at flatMap at <console>:33

scala> val rdd2 = rdd1.map(x=>(x,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[44] at map at <console>:35

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:33

scala> val rdd4 = rdd3.map(x => x._1 + " " +x._2).collect()
rdd4: Array[String] = Array(love 3, spark 2, hadoop 2, i 2, o 1, and 1)

Tuesday, August 23, 2016

spark json

[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"kumar","age" :24,"loc":"del"}

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@410438f6

scala> val df = sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string]

scala> df.show()
|age|loc| name|
| 23|hyd|  shr|
| 24|del|kumar|

| name|
|  shr|

|(age + 1)|
|       24|
|       25|

scala> df.first()
res10: org.apache.spark.sql.Row = [23,hyd,shr]

scala> df.take(1)
res12: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr])

scala> df.take(2)
res13: Array[org.apache.spark.sql.Row] = Array([23,hyd,shr], [24,del,kumar])

scala> df.sort().show()
|age|loc| name|
| 23|hyd|  shr|
| 24|del|kumar|

scala> df.filter(df("age")>23).show()
|age|loc| name|
| 24|del|kumar|

scala> df.describe().show()
|summary|               age|
|  count|                 2|
|   mean|              23.5|
| stddev|0.7071067811865476|
|    min|                23|
|    max|                24|

scala> df.agg(avg("age")).show()
|    23.5|

{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :24,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :24,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :24,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :24,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :24,"loc":"hyd","sal":2000,"sex":"f"}

[cloudera@quickstart ~]$ vi a.json
[cloudera@quickstart ~]$ cat a.json
{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :34,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :25,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :21,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :44,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :30,"loc":"hyd","sal":2000,"sex":"f"}

[cloudera@quickstart ~]$ hadoop fs -put a.json /user1/

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sql= new SQLContext(sc)
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@60ae0f04

scala> val df = sql.read.json("/user1/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, loc: string, name: string, sal: bigint, sex: string]

scala> df.show()
|age| loc|   name| sal|sex|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
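
How one JSON object per line turns into rows and columns can be sketched in plain Python with the json module. The sketch assumes each line of a.json is a complete JSON object and that the column list is simply the sorted set of keys, which matches the column order printed above. JSON_LINES, rows and columns are made-up names.

```python
import json

# The six lines of a.json, one JSON object per line.
JSON_LINES = """{"name":"kumar","age" :24,"loc":"del","sal":4000,"sex":"f"}
{"name":"aaaa","age" :34,"loc":"wgl","sal":5000,"sex":"m"}
{"name":"bbbb","age" :25,"loc":"del","sal":1000,"sex":"m"}
{"name":"ccccc","age" :21,"loc":"pune","sal":5000,"sex":"f"}
{"name":"ddddddd","age" :44,"loc":"del","sal":3000,"sex":"m"}
{"name":"eeeee","age" :30,"loc":"hyd","sal":2000,"sex":"f"}"""

# Parse every line on its own; a pretty-printed multi-line file would not work here.
rows = [json.loads(line) for line in JSON_LINES.splitlines()]

# Collect the keys of all rows and sort them to get the column list.
columns = sorted(set().union(*(row.keys() for row in rows)))
print(columns)

# Print the rows in column order.
for row in rows:
    print([row[c] for c in columns])
```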

| 24|
| 34|
| 25|
| 21|
| 44|
| 30|

scala> df.filter(df("age")<30).show()
|age| loc| name| sal|sex|
| 24| del|kumar|4000|  f|
| 25| del| bbbb|1000|  m|
| 21|pune|ccccc|5000|  f|

scala> df.groupBy("sex").count().show()
|  f|    3|
|  m|    3|

scala> df.where(df("age")<25).show()
|age| loc| name| sal|sex|
| 24| del|kumar|4000|  f|
| 21|pune|ccccc|5000|  f|

scala> val e= df.toJSON
e: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[269] at toJSON at <console>:38

scala> e.collect()
res73: Array[String] = Array({"age":24,"loc":"del","name":"kumar","sal":4000,"sex":"f"}, {"age":34,"loc":"wgl","name":"aaaa","sal":5000,"sex":"m"}, {"age":25,"loc":"del","name":"bbbb","sal":1000,"sex":"m"}, {"age":21,"loc":"pune","name":"ccccc","sal":5000,"sex":"f"}, {"age":44,"loc":"del","name":"ddddddd","sal":3000,"sex":"m"}, {"age":30,"loc":"hyd","name":"eeeee","sal":2000,"sex":"f"})

scala> df.describe().show()
|summary|               age|               sal|
|  count|                 6|                 6|
|   mean|29.666666666666668|3333.3333333333335|
| stddev| 8.406346808612327|1632.9931618554522|
|    min|                21|              1000|
|    max|                44|              5000|
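
The describe() figures can be cross-checked by hand with Python's statistics module. The sketch assumes stddev here is the sample standard deviation (dividing by n - 1), which is what reproduces the numbers in the table above. describe below is a hypothetical helper, not the Spark method.

```python
import statistics

# age and sal columns of the six rows in a.json.
ages = [24, 34, 25, 21, 44, 30]
sals = [4000, 5000, 1000, 5000, 3000, 2000]

def describe(values):
    """Count, mean, sample standard deviation, min and max of a column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),   # divides by n - 1
        "min": min(values),
        "max": max(values),
    }

print(describe(ages))
print(describe(sals))
```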

scala> df.agg(max("sal")).show()
|    5000|

scala> df.sort().show()
|age| loc|   name| sal|sex|
| 44| del|ddddddd|3000|  m|
| 30| hyd|  eeeee|2000|  f|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 25| del|   bbbb|1000|  m|
| 21|pune|  ccccc|5000|  f|

scala> df.sort("sal").show()
|age| loc|   name| sal|sex|
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|

scala> df.take(2)
res62: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m])

scala> df.first()
res63: org.apache.spark.sql.Row = [24,del,kumar,4000,f]

scala> df.head(3)
res69: Array[org.apache.spark.sql.Row] = Array([24,del,kumar,4000,f], [34,wgl,aaaa,5000,m], [25,del,bbbb,1000,m])

scala> df.orderBy("age").show()
|age| loc|   name| sal|sex|
| 21|pune|  ccccc|5000|  f|
| 24| del|  kumar|4000|  f|
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 34| wgl|   aaaa|5000|  m|
| 44| del|ddddddd|3000|  m|

scala> df.orderBy("sal").show()
|age| loc|   name| sal|sex|
| 25| del|   bbbb|1000|  m|
| 30| hyd|  eeeee|2000|  f|
| 44| del|ddddddd|3000|  m|
| 24| del|  kumar|4000|  f|
| 34| wgl|   aaaa|5000|  m|
| 21|pune|  ccccc|5000|  f|