Spark with Scala
Thursday, January 9, 2020
Sunday, December 22, 2019
10. SPARK : LOCAL SETUP - Eclipse and Deploy JAR
10. SPARK : LOCAL SETUP - Eclipse and Deploy JAR
Env/LAB Setup
##################
Editor - Scala IDE:
Sacle IDE download -> http://scala-ide.org/download/sdk.htmlDownload for Windows version
Unzip under Spark_Scala/NewDev/scala-SDK-4.7.0-vfinal-2.12-win32.win32.x86_64
Ecliplse :
Spark_Scala/NewDev/scala-SDK-4.7.0-vfinal-2.12-win32.win32.x86_64/eclipse -> click on eclipse and open and give workspace
MAVEN : Integrate with Maven (Dependency manager)
Create folder spark_Scala/RepositoryDownload Maven : https://maven.apache.org/download.cgi
Unzip under Spark_Scala\Development\apache-maven-3.6.3-src
Create First Project :
Open Eclispse -> New -> Maven Project -> "Practice"Change Scala Compiler -> RtClk -> Properties -> Scala Compiler -> Latest 2.10.6
Maven settings file change -> provide Repository path where to download all jars -> apache-maven-3.6.3-src -> settings.xml -> then go back to Eclipse -> Window -> Preferences -> Maven -> User Settings -> Browse for Repository path -> Apply and Close
Maven pom.xml change -> Add spark core & spark sql dependencies
Eclipse -> Create "Scala Object" -> "Demo.FruitCount" -> Write code -> Rtclk -> Export -> JAR file -> venkat.jar
FTP -> upload "venkat.jar"
Webconsole -> spark-submit --class Demo.FruitCount Desktop/Jars/venkat.jar
9. SPARK : COMMANDS : DATAFRAME
Reading From CSV Files
Automatic Schema Inference
- Start spark shell
spark-shell --master local[1]2. Observe people_with_header.csv from Spark\Spark SQL\Labs\CSV. Upload this file to hdfs. Please check VM guide for more details
(base) [hands-on@localhost demos]$ hdfs dfs -put people_with_header.csv
(base) [hands-on@localhost demos]$ hdfs dfs -ls
-rw-r--r-- 1 hands-on hadoop people_with_header.csv3. Please execute the below commands one by one in the spark shell created in #1
val empcrm = spark.read.
format("csv"). // by default ,parquet. File will be loaded by assuming the parquet format and might fail if the file is not parquet
//format("com.databricks.spark.csv").
option("header", "true"). // otherwise, header will be considered as a record, it will add new header with names as _C1 etc ..
option("inferSchema", "true"). // If true, Spark will determine the column data types. Else , the "age" will be considered as string
load("people_with_header.csv")4. Describe the schema
scala> empcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
| Erin| Shannon| F| 42|
| Norman| Lockwood| M| 81|
| Miguel| Ruiz| M| 64|
| Rosalita| Ramirez| F| 14|
| Ally| Garcia| F| 39|
| Claire| McBride| F| 23|
| Abigail| Cottrell| F| 75|
| Jos| Rivera| M| 59|
| Ravi| Dasgupta| M| 25|
| Ravi| Shannon| F| 42|
| Shannon| McBride| F| 23|
| Abigail| McBride| F| 23|
+----------+---------+------+---+
scala> empcrm.printSchema
root
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: integer (nullable = true)5. Retrieve the columns
scala> empcrm.select($"first_name", $"age").show(5)
+----------+---+
|first_name|age|
+----------+---+
| Erin| 42|
| Norman| 81|
| Miguel| 64|
| Rosalita| 14|
| Ally| 39|
+----------+---+
only showing top 5 rows
// aliases
scala> empcrm.select($"first_name",
| $"age",
| ($"age" > 60),
| ($"age" > 60).alias("older"),
| ($"age" + 10).alias("inc10")).show(5)
+----------+---+----------+-----+-----+
|first_name|age|(age > 60)|older|inc10|
+----------+---+----------+-----+-----+
| Erin| 42| false|false| 52|
| Norman| 81| true| true| 91|
| Miguel| 64| true| true| 74|
| Rosalita| 14| false|false| 24|
| Ally| 39| false|false| 49|
+----------+---+----------+-----+-----+
only showing top 5 rows6. Register temp table so that we can access using SQL queries.
// register temp table
scala> empcrm.createOrReplaceTempView("EMPCRM")
scala> spark.sql("SELECT first_name, age, age> 60 as older FROM EMPCRM ").show(false)
+----------+---+-----+
|first_name|age|older|
+----------+---+-----+
|Erin |42 |false|
|Norman |81 |true |
|Miguel |64 |true |
|Rosalita |14 |false|
|Ally |39 |false|
|Claire |23 |false|
|Abigail |75 |true |
|Jos |59 |false|
|Ravi |25 |false|
|Ravi |42 |false|
|Shannon |23 |false|
|Abigail |23 |false|
+----------+---+-----+
7. Filter the rows
//filter
//These are different syntactic sugar do the same thing.
scala> empcrm.filter($"age" > 60).select("first_name","age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+
//or
scala> empcrm.filter($"age" > 60).select($"first_name",$"age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+8. Rather filter by using SQL query
scala> spark.sql("SELECT first_name, age FROM EMPCRM WHERE age > 60").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+9. Order by age ( asc)
scala> :paste
// Entering paste mode (ctrl-D to finish)
empcrm.filter(empcrm("age") > 49)
.select(empcrm("first_name"), empcrm("age"))
//.orderBy(empcrm("age"), empcrm("first_name"))//ascending order
.orderBy(empcrm("age"))
.show(false)
// Exiting paste mode, now interpreting.
+----------+---+
|first_name|age|
+----------+---+
|Jos |59 |
|Miguel |64 |
|Abigail |75 |
|Norman |81 |
+----------+---+
scala> :paste
// Entering paste mode (ctrl-D to finish)
empcrm.filter(empcrm("age") > 49)
.select(empcrm("first_name"), empcrm("age"))
//.orderBy(empcrm("age"), empcrm("first_name"))//descending order
.orderBy(empcrm("age").desc)
.show(false)
// Exiting paste mode, now interpreting.
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Abigail |75 |
|Miguel |64 |
|Jos |59 |
+----------+---+10. groupby age
scala> empcrm.groupBy($"age").count().show // creates a structure with (age,count)
+---+-----+
|age|count|
+---+-----+
| 81| 1|
| 64| 1|
| 59| 1|
| 39| 1|
| 23| 3|
| 25| 1|
| 75| 1|
| 14| 1|
| 42| 2|
+---+-----+11. Group by using SQL query
scala> spark.sql("SELECT age, count(age) FROM EMPCRM GROUP BY age").show
+---+----------+
|age|count(age)|
+---+----------+
| 81| 1|
| 64| 1|
| 59| 1|
| 39| 1|
| 23| 3|
| 25| 1|
| 75| 1|
| 14| 1|
| 42| 2|
+---+----------+12. Write the results to hdfs in parquet format
//finally store the results in parquet ( hadoop popular storage format)
scala> empcrm.write.parquet("empcrm")
//execute the following command in a different shell
(base) [hands-on@localhost demos]$ hdfs dfs -ls empcrm
-rw-r--r-- 1 hands-on hadoop 07:05 empcrm/_SUCCESS
-rw-r--r-- 1 hands-on hadoop empcrm/part-00000-227ffb4a-242b-4e42-9c29-cdf43830484d-c000.snappy.parquet13. Read from parquet files
scala> val parquetempcrm = spark.read.parquet("empcrm")
parquetempcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
scala> parquetempcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
| Erin| Shannon| F| 42|
| Norman| Lockwood| M| 81|
| Miguel| Ruiz| M| 64|
| Rosalita| Ramirez| F| 14|
| Ally| Garcia| F| 39|
| Claire| McBride| F| 23|
| Abigail| Cottrell| F| 75|
| Jos| Rivera| M| 59|
| Ravi| Dasgupta| M| 25|
| Ravi| Shannon| F| 42|
| Shannon| McBride| F| 23|
| Abigail| McBride| F| 23|
+----------+---------+------+---+Schema Inference programmatically
- Start spark shell as above.
spark-shell --master local[1]
- Define the schema . Enter the command with (:paste) mode or execute the command highlighted in bold.
scala> :paste // Entering paste mode (ctrl-D to finish) // schema inference programmtically val schema = "first_name STRING, last_name STRING, gender STRING , age INT" // please be careful in giving the schema in the same order as we expect it to be val empcrm = spark.read .format("csv") // by default ,parquet. File will be loaded by assuming the parquet format and might fail if the file is not parquet .option("header", "true") // otherwise, header will be considered as a record, it will add new header with names as _C1 etc .. //option("inferSchema", "true"). // If true, Spark will determine the column data types. Else , the "age" will be considered as string .schema(schema) .load("people_with_header.csv") // Exiting paste mode, now interpreting. schema: String = first_name STRING, last_name STRING, gender STRING , age INT empcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
- Repeat the above commands as above from step #4.
scala> empcrm.show +----------+---------+------+---+ |first_name|last_name|gender|age| +----------+---------+------+---+ | Erin| Shannon| F| 42| | Norman| Lockwood| M| 81| | Miguel| Ruiz| M| 64| | Rosalita| Ramirez| F| 14| | Ally| Garcia| F| 39| | Claire| McBride| F| 23| | Abigail| Cottrell| F| 75| | Jos| Rivera| M| 59| | Ravi| Dasgupta| M| 25| | Ravi| Shannon| F| 42| | Shannon| McBride| F| 23| | Abigail| McBride| F| 23| +----------+---------+------+---+
Reading From Text Files
Automatic Schema Inference
- start spark shell
spark-shell --master local[*]
- observe
customers.txtfromSpark SQL\Labs\TextFile. This file is already available in the hdfs.(base) [hands-on@localhost demos]$ hdfs dfs -put customers.txt (base) [hands-on@localhost demos]$ hdfs dfs -ls Found 4 items drwxr-xr-x - hands-on hadoop .sparkStaging -rw-r--r-- 1 hands-on hadoop customers.txt -rw-r--r-- 1 hands-on hadoop hamlet.txt -rw-r--r-- 1 hands-on hadoop u.data
- Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- Run the following commands
// Import statement to implicitly convert an RDD to a DataFrame ( as the text file didnot have a structure) import sqlContext.implicits._ // Create a custom class to represent the Customer case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String) // Create a DataFrame of Customer objects from the data set text file. val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF() dfCustomers.show // Register DataFrame as a table. dfCustomers.registerTempTable("customers") // Display the content of DataFrame dfCustomers.show() // Print the DF schema dfCustomers.printSchema() // Select customer name column dfCustomers.select("name").show() // Select customer name and city columns dfCustomers.select("name", "city").show() // Select a customer by id dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show() // Count the customers by zip code dfCustomers.groupBy("zip_code").count().show() scala> import sqlContext.implicits._ import sqlContext.implicits._ scala> case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String) defined class Customer scala> val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF() dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: int, name: string ... 3 more fields] scala> dfCustomers.show +-----------+---------------+------------+-----+--------+ |customer_id| name| city|state|zip_code| +-----------+---------------+------------+-----+--------+ | 100| John Smith| Austin| TX| 78727| | 200| Joe Johnson| Dallas| TX| 75201| | 300| Bob Jones| Houston| TX| 77028| | 400| Andy Davis| San Antonio| TX| 78227| | 500| James Williams| Austin| TX| 78727| +-----------+---------------+------------+-----+--------+ scala> dfCustomers.registerTempTable("customers") warning: there was one deprecation warning; re-run with -deprecation for details scala> dfCustomers.show() +-----------+---------------+------------+-----+--------+ |customer_id| name| city|state|zip_code| +-----------+---------------+------------+-----+--------+ | 100| John Smith| Austin| TX| 78727| | 200| Joe Johnson| Dallas| TX| 75201| | 300| Bob Jones| Houston| TX| 77028| | 400| Andy Davis| San Antonio| TX| 78227| | 500| James Williams| Austin| TX| 78727| +-----------+---------------+------------+-----+--------+ scala> dfCustomers.printSchema() root |-- customer_id: integer (nullable = false) |-- name: string (nullable = true) |-- city: string (nullable = true) |-- state: string (nullable = true) |-- zip_code: string (nullable = true) scala> dfCustomers.select("name").show() +---------------+ | name| +---------------+ | John Smith| | Joe Johnson| | Bob Jones| | Andy Davis| | James Williams| +---------------+ scala> dfCustomers.select("name", "city").show() +---------------+------------+ | name| city| +---------------+------------+ | John Smith| Austin| | Joe Johnson| Dallas| | Bob Jones| Houston| | Andy Davis| San Antonio| | James Williams| Austin| +---------------+------------+ scala> dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show() +-----------+---------------+-------+-----+--------+ |customer_id| name| city|state|zip_code| +-----------+---------------+-------+-----+--------+ | 500| James Williams| Austin| TX| 78727| +-----------+---------------+-------+-----+--------+ scala> dfCustomers.groupBy("zip_code").count().show() +--------+-----+ |zip_code|count| +--------+-----+ | 75201| 1| | 78227| 1| | 78727| 2| | 77028| 1| +--------+-----+ scala>
Reading from file with the schema ( manual - Programmatically Specifying the Schema)
In the above example, the schema is inferred using the reflection. We can
also programmatically specify the schema of the dataset. This is useful
when the custom classes cannot be defined ahead of time because the
structure of data is encoded in a string.
Following code example shows how to specify the schema using the new
data type classes StructType, StringType, and StructField.
- Repeat the steps #1, #2, #3 if not already.
- Execute the commands
// // Programmatically Specifying the Schema // // Create SQLContext from the existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create an RDD val rddCustomers = sc.textFile("customers.txt") // The schema is encoded in a string val schemaString = "customer_id name city state zip_code" // Import Spark SQL data types and Row. import org.apache.spark.sql._ import org.apache.spark.sql.types._ // Generate the schema based on the string of schema val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD (rddCustomers) to Rows. val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4))) // Apply the schema to the RDD. val dfCustomers = sqlContext.createDataFrame(rowRDD,schema) // Register the DataFrames as a table. dfCustomers.registerTempTable("customers") // SQL statements can be run by using the sql methods against the "customers" table provided by sqlContext. val custNames = sqlContext.sql("SELECT name FROM customers") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. custNames.map(t => "Name: " + t(0)).collect().foreach(println) // SQL statements can be run by using the sql methods provided by sqlContext. val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println) scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) warning: there was one deprecation warning; re-run with -deprecation for details sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@41d1bea3 scala> val rddCustomers = sc.textFile("customers.txt") rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[39] at textFile at <console>:30 scala> rddCustomers.collect res19: Array[String] = Array(100, John Smith, Austin, TX, 78727, 200, Joe Johnson, Dallas, TX, 75201, 300, Bob Jones, Houston, TX, 77028, 400, Andy Davis, San Antonio, TX, 78227, 500, James Williams, Austin, TX, 78727) scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) warning: there was one deprecation warning; re-run with -deprecation for details sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@57484e17 scala> val rddCustomers = sc.textFile("customers.txt") rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[41] at textFile at <console>:30 scala> val schemaString = "customer_id name city state zip_code" schemaString: String = customer_id name city state zip_code scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) schema: org.apache.spark.sql.types.StructType = StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(zip_code,StringType,true)) scala> val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4))) rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at map at <console>:37 scala> val dfCustomers = sqlContext.createDataFrame(rowRDD,schema) dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: string, name: string ... 3 more fields] scala> dfCustomers.registerTempTable("customers") warning: there was one deprecation warning; re-run with -deprecation for details scala> val custNames = sqlContext.sql("SELECT name FROM customers") custNames: org.apache.spark.sql.DataFrame = [name: string] scala> custNames.map(t => "Name: " + t(0)).collect().foreach(println) Name: John Smith Name: Joe Johnson Name: Bob Jones Name: Andy Davis Name: James Williams scala> val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code") customersByCity: org.apache.spark.sql.DataFrame = [name: string, zip_code: string] scala> customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println) Joe Johnson, 75201 Bob Jones, 77028 Andy Davis, 78227 John Smith, 78727 James Williams, 78727
With Databricks
- Create the Cluster.
Select the runtime as 6.6 and give a cluster name. Finally , click on Create Cluster.
confirm by clicking "clusters" page
- upload the file. upload file "people_with_header.csv" , it will be uploaded to the path "/FileStore/tables/people_with_header.csv" ( please make note of the path).Click on "Data" , and then "Add Data" . Drop the file "people_with_header.csv" in "Drop files to upload"
- Import sparkSQL Demo.html . Click on "workspace" and import from there.
8. SPARK : COMMANDS : PRACTICE LAB : RDD
8. SPARK : COMMANDS : PRACTICE LAB : RDD
PRACTICE 1: Combine 2 RDD and perform logic on that
logic is : if right table content is there then return or else print -1Create 2 RDD & then perform on that
-------------------------------------------
val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
val c = List((3,7),(3,4),(4,9),(5,12))
val d = sc.parallelize(c)
b.collect
d.collect
b.leftOuterJoin(d)
<Int, (Int, Option[Int]))]
b.leftOuterJoin(d).collect
(2,(3, None)
b.leftOuterJoin(d).map{
x =>
x._2._2 match{
case Some(rightTableContent) =>
{
(x_1, (x._2._1,rightTableContent))
{
}
}
case None =>
{
(x_1, (x._2._1,-1))
}
}.collect
PRACTICE 2: Want to process the serverlogs
Requirement:
- client has 40 - 50 thousend servers
- want monitor for servers and only on Error and source of error
- Error from java application or anywhere
Log -> Info/Warn/Error
If Error -> Source of error
Anaylyze:
var slrdd = sc.textFile("server log path")
var erdd = slrdd.filter(Error)
erdd.cache //Transformation
var jerdd = erdd.filter(javaError)
jerdd.count //perform DAG cycle: (slrdd -> erdd - cache - jerdd -> count)
var sqlerdd = erdd.filter(SQLError)
sqlerdd.count //perform DAG cycle: (erdd - sqlerdd -> count)
######################################################################
hdfs dfs -cat data/serverlogs.log | head
hdfs dfs -cat data/serverlogs.log | grep -i warn head
hdfs dfs -cat data/serverlogs.log | grep -i error head
var slrdd = sc.textFile("data/serverlogs.log")
var erdd = slrdd.filter(x => x.startsWith("Error"))
erdd.cache
var jerdd = erdd.filter(x => x.split(":")(1).equals("Java"))
sc.setLogLevel("info")
jerdd.count //52 Million and took 44 secs
slrdd.count //total it has 253 Million records
hdfs dfs -ls data/serverlogs.log
//file size is : 17664 MB ( 17 GB)
//(0 + 0) / 138
//file will be divided in to 138 blocks of each bolck - 128 MB
var sqlerdd = erdd.filter(x => x.split(":")(1).equals("Sql"))
sqlerdd .count //12 secs
//file is kept below
data_scientist_sarvesh_gm83dc@ip-10-0-1-10 - data/serverlogs.log
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/serverlogs.log .
######################################################################
Error can be from anywhere in serverlogs
OS
DB
Application
PRACTICE 3 :Advance performance improvement
var c = List((3,7), (3,4), (4,9), (5,12))
var d = sc.parallelize(c)
d.partitioner //None - Hash partitioning is not done
d.partitions.size //29, this is different than above hash-partitioner
//How to do hash partitioning
var e = d.partitionBy(new org.apache.spark.HashPartitioner(3))
e.partitioner //Some(org.apache.spark.HashPartitioner@3)
//caching is important here
//MAP : will not keep parent partitioning information
//apply trnsformation like map, resulting RDD will not keep parent partitioning informationvar f = e.map(x => x) //VERY IMP : here 'e' is hashPartitioned but when we apply map f will not have Hash partitioning info.
f.partitioner //None (NOT MAINTAINED)
e.map(x => x._2)
e.map(x => (x_1+1, x._2))
//MAPVALUE : will maintain parent partitioning info
//solution to above problem is : mapValuevar g = e.mapValues(x => x + 1)) //it will maintain the partitioning information, means e is hash-partitioned and even g also will have hash-partitioned
g.partitioner //Some(org.apache.spark.HashPartitioner@3)
Tuesday, December 17, 2019
7. SPARK : COMMANDS : RDD
RDD can be created using below 2 options:
1. var frdd = sc.parallelize(1 to 100)
2. var frdd = sc.textFile("data/fruit.txt")
###################################################################
webconsole
spark2-shell
sc
//RDD: create RDD & Rdd operations ###########################################
var myfrdd = sc.parallelize(1 to 100)
myfrdd.partitions.size //2
var myfrdd = sc.parallelize(1 to 100, 3) //want to create rdd with 3 partitions
myfrdd.partitions.size //3
myfrdd.glom //Transformaiton, we get another RDD
myfrdd.glom.collect //processing happens
//it will show content of each partition
//1st partition = 1 to 33
//2nd partition = till 66
//3rd partition = till 100
glom -> is to display content in each partition.
//help command
myfrdd. //press tab
###########################################################################
//EVEN RDD : create even rdd ################################################################
//filter is Transformation and HOF
//We know that (RDD -> TRANSFORMATION -> RDD)
myfrdd.filter(x => x%2 == 0) //it will give another RDD
var erdd = myfrdd.filter(x => x%2 == 0) //just transformation and store that in another rdd
erdd.collect //now processing happens and give even result
erdd.partitions.size //3 it will maintain same 3 partitions.
erdd.glom.collect
###########################################################################
https://drive.google.com/drive/folders/1JaXdI37xCkYsJHiou5-WK3g03LuVSVLD
//READ hdfc file : and split based on " " #######################################
hdfs dfs -cat data/fruit.txt | head
sc.textFile("data/fruit.txt") //this will give rdd of string bez file has String
var frdd = sc.textFile("data/fruit.txt")
frdd.take(3) //take 1st 3 conent of rdd
we can get the file from below location and put it in to HDFS your folder:
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/fruit.txt .
hdfs dfs -put fruit.txt /user/pavankumarmadhu123_gmail
//Note if file is in user/pavankumarmadhu123_gmail/fruit.txt then we need to give sc.textFile("fruit.txt")
var ifrdd = frdd.fltMap(x => x.split(" ") ) //take rdd which each rdd has 1 fruit as content
ifrdd.take(5)
ifrdd.count //file has 57 million fruits and took 8 secs (in memory computation, distributed...)
ifrdd.countByValue
OR
//how scala is compact program : single line of code same as above
sc.textFile("data/fruit.txt").fltMap(x => x.split(" ") ).countByValue //not storing in any of the variable
##############################################################################
//REDUCE demo - (HOF) - passing anonymous function #####################################
var myfrdd = sc.parallelize( 1 to 10, 3)
myfrdd.reduce((x,y) => x + y)
//55
##########################################################################
//MULTIPLE RDDS : load data and create multiple rdds out of it
we can get the file from below location and put it in to HDFS your folder:
Get the file from : hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/sample.txt .
hdfs dfs -put sample.txt /user/pavankumarmadhu123_gmail
########################################
hdfs dfs -cat data/sample.txt | head
var frdd = sc.textFile("data/sample.txt") //returns rdd of string
//Note if file is in user/pavankumarmadhu123_gmail/sample.txt then we need to give sc.textFile("sample.txt")
//all the lines which are having "are"
var arerdd = frdd.filter(x => x.contains("are") )
//all the lines which are having "is"
var isrdd = frdd.filter(x => x.contains("is") )
frdd.count //2 crore
arerdd.count //1 crore 40 lakhs
isrdd.count //84 lakh
arerdd.union(isrdd) //Transformation will return rdd again
arerdd.union(isrdd).count //Processing 2 crore
EXPENSIVE operations (intersection & distinct)
------------------------------------------------------------
arerdd.intersection(isrdd) //Transformation will return rdd again
arerdd.intersection(isrdd).count //Processing 0
arerdd.distinct.collect //Procrssing 5
isrdd.distinct.collect //Processing 3
//means above it has to compare all the partitions and need to travel all - we call it as shuffeling
//DISTINCT : shuffle intensive operatoin -
//INTERSECTION : every record of one partition has to compare with another partion again its shuffelling
arerdd.subtract(isrdd) //records which are common between both rdds will be removed
arerdd.subtract(isrdd).count //since there is no common in this eg it will return arerdd count itself
################################################################################
Cartesian : //between cross products, even possible between 2 rdds###########################
----------------------
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).count //25
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).collect
##########################################################################################
//MAP :################################################################
val a = sc.pralellelize(1 to 10)
a.map(x => x*x).collect
a.flatMap(x => 1 to x).collect //1 12 123 1234 12345
//1,1,2,1,2,3,1,2,3,4,1,2,3,4,5.....
################################################################################################
Local setup
----------------
https://www.cloudera.com/downloads.html
Aggregate: (is a action and HOF - and it takes 2 functions as input)
#################################################################################
Reduce : Limitation : output format has to be same as input
--------------------------------
If Requirement : along with total- 15 want to get
tuple (15,5) (output, total)
input is integer
output is tuple
Aggregate:
--------------------------------
function1 - work on partitions and give the output
function2 - work on function 1 output
//a.aggregate((0,0))(function1, function2)
a.aggregate((0,0))((x,y) => (x._1 + y, x._2 +1), (x,y => (x._1 + y._1,x._2 + y._2) )
//(15,5)
####################################################################################
Pair RDD : ###########################################################################
Tuple rdd is a pair rdd
rdd of tuple is a pair rdd
eg: (2,5)
(2,5).getClass //Tuple2$mc...
val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
b.collect
b.map(x => (x._1,x._2*x._2)).collect //want to keep key same and squre of value
b.keys.collect //1 2 1 2 3
b.values.colect //2 3 5 5 4
############################################################################################
//ReduceByKey : (Transformation does same as Reduce but do key by key)###################
--------------------------
b.reduceByKey((x,y) => x + y) //tranformation
b.reduceByKey((x,y) => x + y).collect //(2,8) (1,7) (3,4) doing addition of value based on key
############################################################################
GroupByKey (Transormation) : #########################################################
-------------------
multiple aggregation operations: key, max, min, count ....
better to have it one place and do all above operation
b.groupByKey //transformation returns rdd of tuple
b.groupByKey.collect
b.groupByKey.map(x => (x._1, x_2.sum, x._2.max, x._2.min, x._2.size)).collect
###############################################################################################
CombineByKey (HOF & accept 3 functions) : #########################################################
------------------
Instead of tranferring every thing,
do local aggregation
and then do shuffle data
Every thing will be based on key as name tells - CombineByKey
Func1 : new key - work on value and return tuple (Value,1)
Func2 : work on partition
Func3 : output of func2 for all partition
var a = sc.parallelize(Array((1,3), (1,6), (1,8), (2,3), (2,10), (3,9)))
a.collect
a.combineByKey(x => (x,1), (x:(Int,Int), y) => (x._1+y, x._2+1), (x:(Int,Int), y:(Int,Int)) => (x._1 + y._1, x._2+y._2)).collect
var b = sc.parallelize(Array((1,3), (4,6), (4,8), (5,3), (5,10), (3,4)))
b.collect
a.union(b).collect //we get 12 combination
a.union(b).size //12
a.intersection(b).collect //we get 1 bez 1 is common between them
a.subtract(b).collect //5 components will come common will be removed
a.distinct.collect //shows unique
a.join(b).collect //inner join, based on key
//[Int, (Int, Int)]
//[key, tuple]
a.leftOuterJoin(b).collect
(Int, (Int, Option[Int]))
a.rightOuterJoin(b).collect
(Int, (Option[Int], Int))
###################
############################################################################


































