7. SPARK : COMMANDS : RDD
RDD can be created using below 2 options:
1. var frdd = sc.parallelize(1 to 100)
2. var frdd = sc.textFile("data/fruit.txt")
###################################################################
webconsole
spark2-shell
sc
//RDD: create RDD & Rdd operations ###########################################
var myfrdd = sc.parallelize(1 to 100)
myfrdd.partitions.size //2
var myfrdd = sc.parallelize(1 to 100, 3) //want to create rdd with 3 partitions
myfrdd.partitions.size //3
myfrdd.glom //Transformaiton, we get another RDD
myfrdd.glom.collect //processing happens
//it will show content of each partition
//1st partition = 1 to 33
//2nd partition = till 66
//3rd partition = till 100
glom -> is to display content in each partition.
//help command
myfrdd. //press tab
###########################################################################
//EVEN RDD : create even rdd ################################################################
//filter is Transformation and HOF
//We know that (RDD -> TRANSFORMATION -> RDD)
myfrdd.filter(x => x%2 == 0) //it will give another RDD
var erdd = myfrdd.filter(x => x%2 == 0) //just transformation and store that in another rdd
erdd.collect //now processing happens and give even result
erdd.partitions.size //3 it will maintain same 3 partitions.
erdd.glom.collect
###########################################################################
https://drive.google.com/drive/folders/1JaXdI37xCkYsJHiou5-WK3g03LuVSVLD
//READ hdfc file : and split based on " " #######################################
hdfs dfs -cat data/fruit.txt | head
sc.textFile("data/fruit.txt") //this will give rdd of string bez file has String
var frdd = sc.textFile("data/fruit.txt")
frdd.take(3) //take 1st 3 conent of rdd
we can get the file from below location and put it in to HDFS your folder:
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/fruit.txt .
hdfs dfs -put fruit.txt /user/pavankumarmadhu123_gmail
//Note if file is in user/pavankumarmadhu123_gmail/fruit.txt then we need to give sc.textFile("fruit.txt")
var ifrdd = frdd.fltMap(x => x.split(" ") ) //take rdd which each rdd has 1 fruit as content
ifrdd.take(5)
ifrdd.count //file has 57 million fruits and took 8 secs (in memory computation, distributed...)
ifrdd.countByValue
OR
//how scala is compact program : single line of code same as above
sc.textFile("data/fruit.txt").fltMap(x => x.split(" ") ).countByValue //not storing in any of the variable
##############################################################################
//REDUCE demo - (HOF) - passing anonymous function #####################################
var myfrdd = sc.parallelize( 1 to 10, 3)
myfrdd.reduce((x,y) => x + y)
//55
##########################################################################
//MULTIPLE RDDS : load data and create multiple rdds out of it
we can get the file from below location and put it in to HDFS your folder:
Get the file from : hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/sample.txt .
hdfs dfs -put sample.txt /user/pavankumarmadhu123_gmail
########################################
hdfs dfs -cat data/sample.txt | head
var frdd = sc.textFile("data/sample.txt") //returns rdd of string
//Note if file is in user/pavankumarmadhu123_gmail/sample.txt then we need to give sc.textFile("sample.txt")
//all the lines which are having "are"
var arerdd = frdd.filter(x => x.contains("are") )
//all the lines which are having "is"
var isrdd = frdd.filter(x => x.contains("is") )
frdd.count //2 crore
arerdd.count //1 crore 40 lakhs
isrdd.count //84 lakh
arerdd.union(isrdd) //Transformation will return rdd again
arerdd.union(isrdd).count //Processing 2 crore
EXPENSIVE operations (intersection & distinct)
------------------------------------------------------------
arerdd.intersection(isrdd) //Transformation will return rdd again
arerdd.intersection(isrdd).count //Processing 0
arerdd.distinct.collect //Procrssing 5
isrdd.distinct.collect //Processing 3
//means above it has to compare all the partitions and need to travel all - we call it as shuffeling
//DISTINCT : shuffle intensive operatoin -
//INTERSECTION : every record of one partition has to compare with another partion again its shuffelling
arerdd.subtract(isrdd) //records which are common between both rdds will be removed
arerdd.subtract(isrdd).count //since there is no common in this eg it will return arerdd count itself
################################################################################
Cartesian : //between cross products, even possible between 2 rdds###########################
----------------------
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).count //25
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).collect
##########################################################################################
//MAP :################################################################
val a = sc.pralellelize(1 to 10)
a.map(x => x*x).collect
a.flatMap(x => 1 to x).collect //1 12 123 1234 12345
//1,1,2,1,2,3,1,2,3,4,1,2,3,4,5.....
################################################################################################
Local setup
----------------
https://www.cloudera.com/downloads.html
Aggregate: (is a action and HOF - and it takes 2 functions as input)
#################################################################################
Reduce : Limitation : output format has to be same as input
--------------------------------
If Requirement : along with total- 15 want to get
tuple (15,5) (output, total)
input is integer
output is tuple
Aggregate:
--------------------------------
function1 - work on partitions and give the output
function2 - work on function 1 output
//a.aggregate((0,0))(function1, function2)
a.aggregate((0,0))((x,y) => (x._1 + y, x._2 +1), (x,y => (x._1 + y._1,x._2 + y._2) )
//(15,5)
####################################################################################
Pair RDD : ###########################################################################
Tuple rdd is a pair rdd
rdd of tuple is a pair rdd
eg: (2,5)
(2,5).getClass //Tuple2$mc...
val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
b.collect
b.map(x => (x._1,x._2*x._2)).collect //want to keep key same and squre of value
b.keys.collect //1 2 1 2 3
b.values.colect //2 3 5 5 4
############################################################################################
//ReduceByKey : (Transformation does same as Reduce but do key by key)###################
--------------------------
b.reduceByKey((x,y) => x + y) //tranformation
b.reduceByKey((x,y) => x + y).collect //(2,8) (1,7) (3,4) doing addition of value based on key
############################################################################
GroupByKey (Transormation) : #########################################################
-------------------
multiple aggregation operations: key, max, min, count ....
better to have it one place and do all above operation
b.groupByKey //transformation returns rdd of tuple
b.groupByKey.collect
b.groupByKey.map(x => (x._1, x_2.sum, x._2.max, x._2.min, x._2.size)).collect
###############################################################################################
CombineByKey (HOF & accept 3 functions) : #########################################################
------------------
Instead of tranferring every thing,
do local aggregation
and then do shuffle data
Every thing will be based on key as name tells - CombineByKey
Func1 : new key - work on value and return tuple (Value,1)
Func2 : work on partition
Func3 : output of func2 for all partition
var a = sc.parallelize(Array((1,3), (1,6), (1,8), (2,3), (2,10), (3,9)))
a.collect
a.combineByKey(x => (x,1), (x:(Int,Int), y) => (x._1+y, x._2+1), (x:(Int,Int), y:(Int,Int)) => (x._1 + y._1, x._2+y._2)).collect
var b = sc.parallelize(Array((1,3), (4,6), (4,8), (5,3), (5,10), (3,4)))
b.collect
a.union(b).collect //we get 12 combination
a.union(b).size //12
a.intersection(b).collect //we get 1 bez 1 is common between them
a.subtract(b).collect //5 components will come common will be removed
a.distinct.collect //shows unique
a.join(b).collect //inner join, based on key
//[Int, (Int, Int)]
//[key, tuple]
a.leftOuterJoin(b).collect
(Int, (Int, Option[Int]))
a.rightOuterJoin(b).collect
(Int, (Option[Int], Int))
###################
############################################################################
RDD can be created using below 2 options:
1. var frdd = sc.parallelize(1 to 100)
2. var frdd = sc.textFile("data/fruit.txt")
###################################################################
webconsole
spark2-shell
sc
//RDD: create RDD & Rdd operations ###########################################
var myfrdd = sc.parallelize(1 to 100)
myfrdd.partitions.size //2
var myfrdd = sc.parallelize(1 to 100, 3) //want to create rdd with 3 partitions
myfrdd.partitions.size //3
myfrdd.glom //Transformaiton, we get another RDD
myfrdd.glom.collect //processing happens
//it will show content of each partition
//1st partition = 1 to 33
//2nd partition = till 66
//3rd partition = till 100
glom -> is to display content in each partition.
//help command
myfrdd. //press tab
###########################################################################
//EVEN RDD : create even rdd ################################################################
//filter is Transformation and HOF
//We know that (RDD -> TRANSFORMATION -> RDD)
myfrdd.filter(x => x%2 == 0) //it will give another RDD
var erdd = myfrdd.filter(x => x%2 == 0) //just transformation and store that in another rdd
erdd.collect //now processing happens and give even result
erdd.partitions.size //3 it will maintain same 3 partitions.
erdd.glom.collect
###########################################################################
https://drive.google.com/drive/folders/1JaXdI37xCkYsJHiou5-WK3g03LuVSVLD
//READ hdfc file : and split based on " " #######################################
hdfs dfs -cat data/fruit.txt | head
sc.textFile("data/fruit.txt") //this will give rdd of string bez file has String
var frdd = sc.textFile("data/fruit.txt")
frdd.take(3) //take 1st 3 conent of rdd
we can get the file from below location and put it in to HDFS your folder:
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/fruit.txt .
hdfs dfs -put fruit.txt /user/pavankumarmadhu123_gmail
//Note if file is in user/pavankumarmadhu123_gmail/fruit.txt then we need to give sc.textFile("fruit.txt")
var ifrdd = frdd.fltMap(x => x.split(" ") ) //take rdd which each rdd has 1 fruit as content
ifrdd.take(5)
ifrdd.count //file has 57 million fruits and took 8 secs (in memory computation, distributed...)
ifrdd.countByValue
OR
//how scala is compact program : single line of code same as above
sc.textFile("data/fruit.txt").fltMap(x => x.split(" ") ).countByValue //not storing in any of the variable
##############################################################################
//REDUCE demo - (HOF) - passing anonymous function #####################################
var myfrdd = sc.parallelize( 1 to 10, 3)
myfrdd.reduce((x,y) => x + y)
//55
##########################################################################
//MULTIPLE RDDS : load data and create multiple rdds out of it
we can get the file from below location and put it in to HDFS your folder:
Get the file from : hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/sample.txt .
hdfs dfs -put sample.txt /user/pavankumarmadhu123_gmail
########################################
hdfs dfs -cat data/sample.txt | head
var frdd = sc.textFile("data/sample.txt") //returns rdd of string
//Note if file is in user/pavankumarmadhu123_gmail/sample.txt then we need to give sc.textFile("sample.txt")
//all the lines which are having "are"
var arerdd = frdd.filter(x => x.contains("are") )
//all the lines which are having "is"
var isrdd = frdd.filter(x => x.contains("is") )
frdd.count //2 crore
arerdd.count //1 crore 40 lakhs
isrdd.count //84 lakh
arerdd.union(isrdd) //Transformation will return rdd again
arerdd.union(isrdd).count //Processing 2 crore
EXPENSIVE operations (intersection & distinct)
------------------------------------------------------------
arerdd.intersection(isrdd) //Transformation will return rdd again
arerdd.intersection(isrdd).count //Processing 0
arerdd.distinct.collect //Procrssing 5
isrdd.distinct.collect //Processing 3
//means above it has to compare all the partitions and need to travel all - we call it as shuffeling
//DISTINCT : shuffle intensive operatoin -
//INTERSECTION : every record of one partition has to compare with another partion again its shuffelling
arerdd.subtract(isrdd) //records which are common between both rdds will be removed
arerdd.subtract(isrdd).count //since there is no common in this eg it will return arerdd count itself
################################################################################
Cartesian : //between cross products, even possible between 2 rdds###########################
----------------------
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).count //25
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).collect
##########################################################################################
//MAP :################################################################
val a = sc.pralellelize(1 to 10)
a.map(x => x*x).collect
a.flatMap(x => 1 to x).collect //1 12 123 1234 12345
//1,1,2,1,2,3,1,2,3,4,1,2,3,4,5.....
################################################################################################
Local setup
----------------
https://www.cloudera.com/downloads.html
Aggregate: (is a action and HOF - and it takes 2 functions as input)
#################################################################################
Reduce : Limitation : output format has to be same as input
--------------------------------
If Requirement : along with total- 15 want to get
tuple (15,5) (output, total)
input is integer
output is tuple
Aggregate:
--------------------------------
function1 - work on partitions and give the output
function2 - work on function 1 output
//a.aggregate((0,0))(function1, function2)
a.aggregate((0,0))((x,y) => (x._1 + y, x._2 +1), (x,y => (x._1 + y._1,x._2 + y._2) )
//(15,5)
####################################################################################
Pair RDD : ###########################################################################
Tuple rdd is a pair rdd
rdd of tuple is a pair rdd
eg: (2,5)
(2,5).getClass //Tuple2$mc...
val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
b.collect
b.map(x => (x._1,x._2*x._2)).collect //want to keep key same and squre of value
b.keys.collect //1 2 1 2 3
b.values.colect //2 3 5 5 4
############################################################################################
//ReduceByKey : (Transformation does same as Reduce but do key by key)###################
--------------------------
b.reduceByKey((x,y) => x + y) //tranformation
b.reduceByKey((x,y) => x + y).collect //(2,8) (1,7) (3,4) doing addition of value based on key
############################################################################
GroupByKey (Transormation) : #########################################################
-------------------
multiple aggregation operations: key, max, min, count ....
better to have it one place and do all above operation
b.groupByKey //transformation returns rdd of tuple
b.groupByKey.collect
b.groupByKey.map(x => (x._1, x_2.sum, x._2.max, x._2.min, x._2.size)).collect
###############################################################################################
CombineByKey (HOF & accept 3 functions) : #########################################################
------------------
Instead of tranferring every thing,
do local aggregation
and then do shuffle data
Every thing will be based on key as name tells - CombineByKey
Func1 : new key - work on value and return tuple (Value,1)
Func2 : work on partition
Func3 : output of func2 for all partition
var a = sc.parallelize(Array((1,3), (1,6), (1,8), (2,3), (2,10), (3,9)))
a.collect
a.combineByKey(x => (x,1), (x:(Int,Int), y) => (x._1+y, x._2+1), (x:(Int,Int), y:(Int,Int)) => (x._1 + y._1, x._2+y._2)).collect
var b = sc.parallelize(Array((1,3), (4,6), (4,8), (5,3), (5,10), (3,4)))
b.collect
a.union(b).collect //we get 12 combination
a.union(b).size //12
a.intersection(b).collect //we get 1 bez 1 is common between them
a.subtract(b).collect //5 components will come common will be removed
a.distinct.collect //shows unique
a.join(b).collect //inner join, based on key
//[Int, (Int, Int)]
//[key, tuple]
a.leftOuterJoin(b).collect
(Int, (Int, Option[Int]))
a.rightOuterJoin(b).collect
(Int, (Option[Int], Int))
###################
############################################################################














No comments:
Post a Comment