Tuesday, December 17, 2019

7. SPARK : COMMANDS : RDD



An RDD can be created using one of the 2 options below:

1. var frdd = sc.parallelize(1 to 100)

2. var frdd = sc.textFile("data/fruit.txt")

###################################################################

webconsole
spark2-shell
sc

//RDD: create RDD & Rdd operations ###########################################
var myfrdd = sc.parallelize(1 to 100)
myfrdd.partitions.size   //2
var myfrdd = sc.parallelize(1 to 100, 3)  //want to create  rdd with 3 partitions
myfrdd.partitions.size   //3
myfrdd.glom   //Transformation, we get another RDD (one array per partition)
myfrdd.glom.collect  //processing happens 
    //it will show content of each partition
    //1st partition = 1 to 33
    //2nd partition = till 66
    //3rd partition = till 100

glom -> displays the content of each partition.
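The 33/66/100 boundaries above are not random: parallelize slices the range proportionally across the partitions. A local, plain-Scala sketch (no Spark needed; the slicing formula mirrors how Spark splits a range):

```scala
// Local sketch of how parallelize(1 to 100, 3) slices the range into partitions,
// using proportional integer boundaries per partition index.
val data = (1 to 100).toVector
val numPartitions = 3
val partitions = (0 until numPartitions).map { i =>
  val start = (i * data.length) / numPartitions
  val end   = ((i + 1) * data.length) / numPartitions
  data.slice(start, end)
}
partitions.map(p => (p.head, p.last))  // Vector((1,33), (34,66), (67,100))
```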




//help command
myfrdd.  //press tab
 ###########################################################################

//EVEN RDD : create even rdd ################################################################
//filter is Transformation and HOF
//We know that (RDD -> TRANSFORMATION -> RDD)
myfrdd.filter(x => x%2 == 0)   //it will give another RDD
var erdd = myfrdd.filter(x => x%2 == 0)  //just transformation and store that in another rdd
erdd.collect   //now processing happens and give even result

erdd.partitions.size  //3 it will maintain same 3 partitions.
erdd.glom.collect
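filter behaves the same way on a plain Scala collection, which makes a quick local sanity check easy (no Spark needed):

```scala
// Local analogue of the even-RDD filter above.
val nums  = (1 to 100).toList
val evens = nums.filter(x => x % 2 == 0)
evens.take(3)   // List(2, 4, 6)
evens.size      // 50
```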

###########################################################################


https://drive.google.com/drive/folders/1JaXdI37xCkYsJHiou5-WK3g03LuVSVLD

//READ HDFS file : and split based on " " #######################################
hdfs dfs -cat data/fruit.txt | head

sc.textFile("data/fruit.txt")  //this will give an rdd of String because the file contains Strings
var frdd = sc.textFile("data/fruit.txt")
frdd.take(3)  //take the first 3 elements of the rdd



We can get the file from the location below and put it into our own HDFS folder:
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/fruit.txt .
hdfs dfs -put fruit.txt /user/pavankumarmadhu123_gmail

//Note: if the file is in /user/pavankumarmadhu123_gmail/fruit.txt then we need to give sc.textFile("fruit.txt")


var ifrdd = frdd.flatMap(x => x.split(" "))  //gives an rdd in which each element is one fruit
ifrdd.take(5)
ifrdd.count  //file has 57 million fruits and it took 8 secs (in-memory computation, distributed...)
ifrdd.countByValue

OR 

//how compact Scala is: a single line of code doing the same as above
sc.textFile("data/fruit.txt").flatMap(x => x.split(" ")).countByValue  //not storing anything in a variable
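The same flatMap + countByValue pipeline can be checked locally with Scala collections (the sample lines below are made up, not from the real fruit.txt):

```scala
// Local analogue of split-and-count: flatMap lines into words, then count by value.
val lines  = List("apple banana apple", "banana cherry")
val fruits = lines.flatMap(x => x.split(" "))
val counts = fruits.groupBy(identity).map { case (w, ws) => (w, ws.size) }
// counts: Map(apple -> 2, banana -> 2, cherry -> 1)
```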




##############################################################################

//REDUCE demo - (HOF) - passing anonymous function #####################################
var myfrdd = sc.parallelize( 1 to 10, 3)
myfrdd.reduce((x,y) => x + y)   
//55
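The same reduce works on a plain Scala collection, which is handy for checking the result locally:

```scala
// Local analogue of the reduce above: fold 1..10 into a sum.
val nums  = (1 to 10).toList
val total = nums.reduce((x, y) => x + y)  // 55
```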




##########################################################################

//MULTIPLE RDDS : load data and create multiple rdds out of it
We can get the file from the location below and put it into our own HDFS folder:
Get the file from : hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/sample.txt .
hdfs dfs -put sample.txt /user/pavankumarmadhu123_gmail
########################################
hdfs dfs -cat data/sample.txt | head
var frdd = sc.textFile("data/sample.txt")   //returns rdd of string
//Note if file is in user/pavankumarmadhu123_gmail/sample.txt  then we need to give sc.textFile("sample.txt")

//all the lines which are having "are"
var arerdd = frdd.filter(x => x.contains("are") )

//all the lines which are having "is"
var isrdd = frdd.filter(x => x.contains("is") )

frdd.count  //2 crore
arerdd.count  //1 crore 40 lakhs
isrdd.count  //84 lakh




arerdd.union(isrdd)  //Transformation will return rdd again
arerdd.union(isrdd).count   //Processing  2 crore

EXPENSIVE operations (intersection & distinct)
------------------------------------------------------------

arerdd.intersection(isrdd)  //Transformation will return rdd again
arerdd.intersection(isrdd).count    //Processing  0

arerdd.distinct.collect  //Processing 5
isrdd.distinct.collect   //Processing 3

//this means every partition has to be compared with all the other partitions - data has to travel across the network - we call this shuffling
//DISTINCT : shuffle-intensive operation

//INTERSECTION : every record of one partition has to be compared with every other partition - again shuffling






arerdd.subtract(isrdd)  //records which are common between both rdds will be removed
arerdd.subtract(isrdd).count  //since there is nothing common in this example, it returns arerdd's count itself
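All four set-style operations have plain-Scala analogues that make their semantics easy to reason about locally (the tiny sample lines below are made up, not from the real sample.txt):

```scala
// Local analogue of union / intersection / subtract / distinct.
val are = List("they are here", "we are fine", "you are ok")
val is  = List("he is here", "she is fine")
val union        = are ++ is                      // like RDD union: keeps duplicates
val intersection = are.toSet.intersect(is.toSet)  // nothing in common -> empty
val subtracted   = are.toSet -- is.toSet          // nothing common -> all of `are` survives
val uniques      = are.distinct                   // duplicates removed
```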

################################################################################

Cartesian : //cross product, possible between 2 rdds###########################
----------------------
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).count   //25
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).collect
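The 25 comes from 5 x 5; a local cross product (plain Scala, no Spark) shows the same thing:

```scala
// Local analogue of cartesian: every element of the left paired with
// every element of the right.
val left  = (1 to 5).toList
val right = (6 to 10).toList
val pairs = for (a <- left; b <- right) yield (a, b)
pairs.size  // 25 = 5 * 5
```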




##########################################################################################

//MAP :################################################################

val a = sc.parallelize(1 to 10)
a.map(x => x*x).collect

a.flatMap(x => 1 to x).collect   //1 12  123  1234  12345
//1,1,2,1,2,3,1,2,3,4,1,2,3,4,5.....
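map vs flatMap is easy to see on a small local collection (using 1 to 5 here so the output stays readable):

```scala
// map keeps one output per input; flatMap produces a collection per input
// and flattens the results into a single collection.
val a = (1 to 5).toList
val squares   = a.map(x => x * x)       // List(1, 4, 9, 16, 25)
val flattened = a.flatMap(x => 1 to x)  // List(1, 1,2, 1,2,3, 1,2,3,4, 1,2,3,4,5)
```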




################################################################################################

Local setup
----------------
https://www.cloudera.com/downloads.html


Aggregate: (is an action and a HOF - it takes 2 functions as input)
#################################################################################


Reduce : Limitation : the output type has to be the same as the input type
--------------------------------
Requirement : along with the total (15), we also want the count -
the tuple (15,5) = (total, count)
the input is an Int
the output is a tuple - so reduce alone cannot produce it

Aggregate: 
--------------------------------
function1 - works on each partition and gives a per-partition output
function2 - merges the function1 outputs of all partitions


//a.aggregate(zeroValue)(function1, function2)
val a = sc.parallelize(1 to 5)   //so that the total is 15 and the count is 5
a.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
//(15,5)
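What aggregate does can be simulated locally: fold each "partition" with function1, then merge the partial results with function2 (the two-partition split below is just for illustration):

```scala
// Local simulation of aggregate((0,0))(f1, f2) computing (sum, count) over 1..5.
val data  = (1 to 5).toList
val parts = List(data.take(2), data.drop(2))         // pretend 2 partitions
val f1 = (acc: (Int, Int), v: Int)      => (acc._1 + v, acc._2 + 1)
val f2 = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
val perPartition = parts.map(_.foldLeft((0, 0))(f1)) // List((3,2), (12,3))
val result       = perPartition.reduce(f2)           // (15, 5)
```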




####################################################################################
Pair RDD ###########################################################################
An rdd of tuples (key, value) is called a pair rdd

eg: (2,5)
    (2,5).getClass   //Tuple2$mc...

val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
b.collect
b.map(x => (x._1, x._2 * x._2)).collect  //keep the key the same and square the value
b.keys.collect    //1 2 1 2 3
b.values.collect  //2 3 5 5 4

############################################################################################

//ReduceByKey : (Transformation - does the same as Reduce, but key by key)###################
--------------------------
b.reduceByKey((x,y) => x + y)   //transformation
b.reduceByKey((x,y) => x + y).collect   //(2,8) (1,7) (3,4)  doing addition of value based on key
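reduceByKey's semantics can be mimicked locally with groupBy plus a per-key sum:

```scala
// Local analogue of reduceByKey((x, y) => x + y).
val pairs   = List((1,2), (2,3), (1,5), (2,5), (3,4))
val reduced = pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }
// reduced: Map(1 -> 7, 2 -> 8, 3 -> 4)
```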




############################################################################


GroupByKey (Transformation) : #########################################################
-------------------
multiple aggregation operations per key: max, min, sum, count ....
better to group the values in one place and then do all the operations above

b.groupByKey  //transformation returns rdd of tuple
b.groupByKey.collect





b.groupByKey.map(x => (x._1, x._2.sum, x._2.max, x._2.min, x._2.size)).collect
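The same multi-aggregation per key, done locally:

```scala
// Local analogue of groupByKey followed by sum/max/min/size per key.
val pairs = List((1,2), (2,3), (1,5), (2,5), (3,4))
val stats = pairs.groupBy(_._1).toList.map { case (k, kvs) =>
  val vs = kvs.map(_._2)
  (k, vs.sum, vs.max, vs.min, vs.size)
}
// e.g. key 1 -> (1, 7, 5, 2, 2)
```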




###############################################################################################

CombineByKey (HOF & accept 3 functions) : #########################################################
------------------
Instead of transferring everything,
do local aggregation first
and only then shuffle the data

Everything will be based on the key, as the name tells - CombineByKey

Func1 (createCombiner) : runs on the first value seen for a key - returns the tuple (value, 1)
Func2 (mergeValue)     : merges each further value within a partition into that tuple
Func3 (mergeCombiners) : merges the Func2 outputs of all partitions


var a = sc.parallelize(Array((1,3), (1,6), (1,8), (2,3), (2,10), (3,9)))
a.collect
a.combineByKey(x => (x,1), (x:(Int,Int), y) => (x._1+y, x._2+1), (x:(Int,Int), y:(Int,Int)) => (x._1 + y._1, x._2+y._2)).collect
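What combineByKey is doing under the hood can be simulated locally: run createCombiner/mergeValue inside each "partition", then mergeCombiners across partitions (the two-partition split below is just for illustration):

```scala
// Local simulation of the combineByKey call above, computing (sum, count) per key.
val parts = List(List((1,3), (1,6), (1,8)), List((2,3), (2,10), (3,9)))  // pretend 2 partitions
val createCombiner = (v: Int)                       => (v, 1)
val mergeValue     = (c: (Int, Int), v: Int)        => (c._1 + v, c._2 + 1)
val mergeCombiners = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
// within a partition: first value for a key -> createCombiner, later ones -> mergeValue
val local = parts.map(_.foldLeft(Map.empty[Int, (Int, Int)]) { case (m, (k, v)) =>
  m.updated(k, m.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
})
// across partitions: mergeCombiners per key
val combined = local.reduce { (m1, m2) =>
  (m1.keySet ++ m2.keySet).map { k =>
    k -> List(m1.get(k), m2.get(k)).flatten.reduce(mergeCombiners)
  }.toMap
}
// combined: Map(1 -> (17,3), 2 -> (13,2), 3 -> (9,1))
```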





var b = sc.parallelize(Array((1,3), (4,6), (4,8), (5,3), (5,10), (3,4)))
b.collect


a.union(b).collect
  //we get 12 elements (union keeps duplicates)
a.union(b).count    //12

a.intersection(b).collect  //we get 1 element because (1,3) is common between them

a.subtract(b).collect   //5 elements remain; the common element (1,3) is removed

a.distinct.collect    //shows the unique elements

a.join(b).collect    //inner join, based on key
//RDD[(Int, (Int, Int))]
//i.e. (key, (value from a, value from b))

a.leftOuterJoin(b).collect
//(Int, (Int, Option[Int]))

a.rightOuterJoin(b).collect
//(Int, (Option[Int], Int))
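The join result types above can be reproduced with a local sketch of how an inner and a left outer join work key by key:

```scala
// Local analogue of a.join(b) and a.leftOuterJoin(b).
val a = List((1,3), (1,6), (1,8), (2,3), (2,10), (3,9))
val b = List((1,3), (4,6), (4,8), (5,3), (5,10), (3,4))
val bByKey = b.groupBy(_._1)
// inner join: only keys present on both sides survive
val inner = a.flatMap { case (k, v) =>
  bByKey.getOrElse(k, Nil).map { case (_, w) => (k, (v, w)) }
}
// inner: (1,(3,3)), (1,(6,3)), (1,(8,3)), (3,(9,4))
// left outer join: every record of `a` survives; the right side becomes an Option
val leftOuter = a.flatMap { case (k, v) =>
  bByKey.get(k) match {
    case Some(ws) => ws.map { case (_, w) => (k, (v, Option(w))) }
    case None     => List((k, (v, Option.empty[Int])))
  }
}
```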




###################
############################################################################


