Sunday, December 22, 2019

10. SPARK : LOCAL SETUP - Eclipse and Deploy JAR




Env/LAB Setup
##################

Editor - Scala IDE:

Scala IDE download -> http://scala-ide.org/download/sdk.html
Download the Windows version
Unzip under Spark_Scala/NewDev/scala-SDK-4.7.0-vfinal-2.12-win32.win32.x86_64

Eclipse :
Spark_Scala/NewDev/scala-SDK-4.7.0-vfinal-2.12-win32.win32.x86_64/eclipse -> click on eclipse, open it and choose a workspace



MAVEN : Integrate with Maven (Dependency manager)

Create folder spark_Scala/Repository
Download Maven : https://maven.apache.org/download.cgi
Unzip under Spark_Scala\Development\apache-maven-3.6.3-src



Create First Project :

Open Eclipse -> New -> Maven Project -> "Practice"

        Change Scala Compiler -> RtClk -> Properties -> Scala Compiler ->  Latest 2.10.6

        Maven settings file change -> provide Repository path where to download all jars -> apache-maven-3.6.3-src -> settings.xml -> then go back to Eclipse -> Window -> Preferences -> Maven -> User Settings -> Browse for Repository path -> Apply and Close

        Maven pom.xml change -> Add spark core & spark sql dependencies
     
        Eclipse -> Create "Scala Object" -> "Demo.FruitCount" -> Write code -> Rtclk -> Export -> JAR file -> venkat.jar

        FTP -> upload "venkat.jar"

        Webconsole -> spark-submit --class Demo.FruitCount Desktop/Jars/venkat.jar
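The Demo.FruitCount code itself is not shown in these notes; a minimal sketch of what such an object might look like (the object name comes from the notes, but the input path data/fruit.txt and the overall body are assumptions, not the actual exported code):

```scala
package Demo

import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the FruitCount object exported as venkat.jar;
// the input path "data/fruit.txt" is an assumption
object FruitCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FruitCount").getOrCreate()
    val counts = spark.sparkContext
      .textFile("data/fruit.txt")   // one line may contain several fruits
      .flatMap(_.split(" "))        // split each line into individual fruits
      .countByValue()               // action: Map(fruit -> count)
    counts.foreach(println)
    spark.stop()
  }
}
```

This is the shape spark-submit expects: a class with a main method, referenced via --class Demo.FruitCount.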
























9. SPARK : COMMANDS : DATAFRAME

 

Reading From CSV Files

Automatic Schema Inference

  1. Start spark shell
spark-shell --master local[1]

2. Observe people_with_header.csv from Spark\Spark SQL\Labs\CSV. Upload this file to HDFS. Please check the VM guide for more details

(base) [hands-on@localhost demos]$ hdfs dfs -put people_with_header.csv
(base) [hands-on@localhost demos]$ hdfs dfs -ls
-rw-r--r--   1 hands-on hadoop       people_with_header.csv

3. Please execute the below commands one by one in the spark shell created in #1

val empcrm = spark.read.
                     format("csv"). // default is parquet: the file would be loaded assuming parquet format and might fail if it is not parquet
                     //format("com.databricks.spark.csv").
                     option("header", "true"). // otherwise the header is treated as a record, and columns get default names like _c0, _c1 ...
                     option("inferSchema", "true"). // if true, Spark determines the column data types; otherwise "age" is read as string
                     load("people_with_header.csv")

4. Describe the schema

scala> empcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
|      Erin|  Shannon|     F| 42|
|    Norman| Lockwood|     M| 81|
|    Miguel|     Ruiz|     M| 64|
|  Rosalita|  Ramirez|     F| 14|
|      Ally|   Garcia|     F| 39|
|    Claire|  McBride|     F| 23|
|   Abigail| Cottrell|     F| 75|
|       Jos|   Rivera|     M| 59|
|      Ravi| Dasgupta|     M| 25|
|      Ravi|  Shannon|     F| 42|
|   Shannon|  McBride|     F| 23|
|   Abigail|  McBride|     F| 23|
+----------+---------+------+---+


scala> empcrm.printSchema
root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)

5. Retrieve the columns

scala> empcrm.select($"first_name", $"age").show(5)
+----------+---+
|first_name|age|
+----------+---+
|      Erin| 42|
|    Norman| 81|
|    Miguel| 64|
|  Rosalita| 14|
|      Ally| 39|
+----------+---+
only showing top 5 rows

// aliases

scala> empcrm.select($"first_name",
     | $"age",
     | ($"age" > 60),
     |   ($"age" > 60).alias("older"),
     | ($"age" + 10).alias("inc10")).show(5)
+----------+---+----------+-----+-----+
|first_name|age|(age > 60)|older|inc10|
+----------+---+----------+-----+-----+
|      Erin| 42|     false|false|   52|
|    Norman| 81|      true| true|   91|
|    Miguel| 64|      true| true|   74|
|  Rosalita| 14|     false|false|   24|
|      Ally| 39|     false|false|   49|
+----------+---+----------+-----+-----+
only showing top 5 rows

6. Register a temp table so that we can access it using SQL queries.

// register temp table
scala> empcrm.createOrReplaceTempView("EMPCRM")

scala> spark.sql("SELECT first_name, age, age> 60 as older FROM EMPCRM ").show(false)
+----------+---+-----+
|first_name|age|older|
+----------+---+-----+
|Erin      |42 |false|
|Norman    |81 |true |
|Miguel    |64 |true |
|Rosalita  |14 |false|
|Ally      |39 |false|
|Claire    |23 |false|
|Abigail   |75 |true |
|Jos       |59 |false|
|Ravi      |25 |false|
|Ravi      |42 |false|
|Shannon   |23 |false|
|Abigail   |23 |false|
+----------+---+-----+


7. Filter the rows

//filter
//These are different syntactic forms that do the same thing.
scala> empcrm.filter($"age" > 60).select("first_name","age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+

//or

scala> empcrm.filter($"age" > 60).select($"first_name",$"age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+

8. Alternatively, filter using a SQL query

scala> spark.sql("SELECT first_name, age FROM EMPCRM WHERE age > 60").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+

9. Order by age (asc)

scala> :paste
// Entering paste mode (ctrl-D to finish)

empcrm.filter(empcrm("age") > 49)
  .select(empcrm("first_name"), empcrm("age"))
  //.orderBy(empcrm("age"), empcrm("first_name"))//ascending order
  .orderBy(empcrm("age"))
  .show(false)

// Exiting paste mode, now interpreting.

+----------+---+
|first_name|age|
+----------+---+
|Jos       |59 |
|Miguel    |64 |
|Abigail   |75 |
|Norman    |81 |
+----------+---+

scala> :paste
// Entering paste mode (ctrl-D to finish)

empcrm.filter(empcrm("age") > 49)
  .select(empcrm("first_name"), empcrm("age"))
  //.orderBy(empcrm("age"), empcrm("first_name")) // order by multiple columns
  .orderBy(empcrm("age").desc)
  .show(false)

// Exiting paste mode, now interpreting.

+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Abigail   |75 |
|Miguel    |64 |
|Jos       |59 |
+----------+---+

10. Group by age

scala> empcrm.groupBy($"age").count().show // creates a structure with (age,count)
+---+-----+
|age|count|
+---+-----+
| 81|    1|
| 64|    1|
| 59|    1|
| 39|    1|
| 23|    3|
| 25|    1|
| 75|    1|
| 14|    1|
| 42|    2|
+---+-----+

11. Group by using SQL query

scala> spark.sql("SELECT age, count(age) FROM EMPCRM  GROUP BY age").show
+---+----------+
|age|count(age)|
+---+----------+
| 81|         1|
| 64|         1|
| 59|         1|
| 39|         1|
| 23|         3|
| 25|         1|
| 75|         1|
| 14|         1|
| 42|         2|
+---+----------+

12. Write the results to hdfs in parquet format

//finally store the results in parquet (a popular Hadoop storage format)
scala> empcrm.write.parquet("empcrm")

//execute the following command in a different shell
(base) [hands-on@localhost demos]$ hdfs dfs -ls empcrm

-rw-r--r--   1 hands-on hadoop         07:05 empcrm/_SUCCESS
-rw-r--r--   1 hands-on hadoop        empcrm/part-00000-227ffb4a-242b-4e42-9c29-cdf43830484d-c000.snappy.parquet

13. Read from parquet files

scala> val parquetempcrm = spark.read.parquet("empcrm")
parquetempcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]

scala> parquetempcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
|      Erin|  Shannon|     F| 42|
|    Norman| Lockwood|     M| 81|
|    Miguel|     Ruiz|     M| 64|
|  Rosalita|  Ramirez|     F| 14|
|      Ally|   Garcia|     F| 39|
|    Claire|  McBride|     F| 23|
|   Abigail| Cottrell|     F| 75|
|       Jos|   Rivera|     M| 59|
|      Ravi| Dasgupta|     M| 25|
|      Ravi|  Shannon|     F| 42|
|   Shannon|  McBride|     F| 23|
|   Abigail|  McBride|     F| 23|
+----------+---------+------+---+

Schema Inference programmatically

  1. Start spark shell as above.
    spark-shell --master local[1]
  2. Define the schema. Enter the commands in (:paste) mode.
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    // schema inference programmatically
    val schema = "first_name STRING, last_name STRING, gender STRING, age INT" // give the schema in the same order as the columns appear in the file
    val empcrm = spark.read
                      .format("csv") // default is parquet; the load might fail if the file is not parquet
                      .option("header", "true") // otherwise the header is treated as a record, and columns get default names like _c0, _c1 ...
                      //.option("inferSchema", "true") // not needed here; we supply the schema explicitly
                      .schema(schema)
                      .load("people_with_header.csv")
    
    // Exiting paste mode, now interpreting.
    
    schema: String = first_name STRING, last_name STRING, gender STRING , age INT
    empcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
  3. Repeat the commands from step #4 above.
    scala> empcrm.show
    +----------+---------+------+---+
    |first_name|last_name|gender|age|
    +----------+---------+------+---+
    |      Erin|  Shannon|     F| 42|
    |    Norman| Lockwood|     M| 81|
    |    Miguel|     Ruiz|     M| 64|
    |  Rosalita|  Ramirez|     F| 14|
    |      Ally|   Garcia|     F| 39|
    |    Claire|  McBride|     F| 23|
    |   Abigail| Cottrell|     F| 75|
    |       Jos|   Rivera|     M| 59|
    |      Ravi| Dasgupta|     M| 25|
    |      Ravi|  Shannon|     F| 42|
    |   Shannon|  McBride|     F| 23|
    |   Abigail|  McBride|     F| 23|
    +----------+---------+------+---+

Reading From Text Files

Automatic Schema Inference

  1. Start spark shell
    spark-shell --master local[*]
  2. Observe customers.txt from Spark SQL\Labs\TextFile and upload it to HDFS.
    (base) [hands-on@localhost demos]$ hdfs dfs -put customers.txt
    
    (base) [hands-on@localhost demos]$ hdfs dfs -ls
    
    Found 4 items
    drwxr-xr-x   - hands-on hadoop         .sparkStaging
    -rw-r--r--   1 hands-on hadoop      customers.txt
    -rw-r--r--   1 hands-on hadoop      hamlet.txt
    -rw-r--r--   1 hands-on hadoop     u.data
  3. Create the SQLContext first from the existing Spark Context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  4. Run the following commands
    // Import statement to implicitly convert an RDD to a DataFrame (as the text file did not have a structure)
    import sqlContext.implicits._
    // Create a custom class to represent the Customer
    case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String)
    // Create a DataFrame of Customer objects from the data set text file.
    val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF()
    dfCustomers.show
    // Register DataFrame as a table.
    dfCustomers.registerTempTable("customers")
    // Display the content of DataFrame
    dfCustomers.show()
    // Print the DF schema
    dfCustomers.printSchema()
    // Select customer name column
    dfCustomers.select("name").show()
    // Select customer name and city columns
    dfCustomers.select("name", "city").show()
    // Select a customer by id
    dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
    // Count the customers by zip code
    dfCustomers.groupBy("zip_code").count().show()
    
    
    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String)
    defined class Customer
    
    scala> val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF()
    dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: int, name: string ... 3 more fields]
    
    scala> dfCustomers.show
    +-----------+---------------+------------+-----+--------+
    |customer_id|           name|        city|state|zip_code|
    +-----------+---------------+------------+-----+--------+
    |        100|     John Smith|      Austin|   TX|   78727|
    |        200|    Joe Johnson|      Dallas|   TX|   75201|
    |        300|      Bob Jones|     Houston|   TX|   77028|
    |        400|     Andy Davis| San Antonio|   TX|   78227|
    |        500| James Williams|      Austin|   TX|   78727|
    +-----------+---------------+------------+-----+--------+
    
    scala> dfCustomers.registerTempTable("customers")
    warning: there was one deprecation warning; re-run with -deprecation for details
    
    scala> dfCustomers.show()
    +-----------+---------------+------------+-----+--------+
    |customer_id|           name|        city|state|zip_code|
    +-----------+---------------+------------+-----+--------+
    |        100|     John Smith|      Austin|   TX|   78727|
    |        200|    Joe Johnson|      Dallas|   TX|   75201|
    |        300|      Bob Jones|     Houston|   TX|   77028|
    |        400|     Andy Davis| San Antonio|   TX|   78227|
    |        500| James Williams|      Austin|   TX|   78727|
    +-----------+---------------+------------+-----+--------+
    
    
    scala> dfCustomers.printSchema()
    root
     |-- customer_id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- city: string (nullable = true)
     |-- state: string (nullable = true)
     |-- zip_code: string (nullable = true)
    
    
    scala> dfCustomers.select("name").show()
    +---------------+
    |           name|
    +---------------+
    |     John Smith|
    |    Joe Johnson|
    |      Bob Jones|
    |     Andy Davis|
    | James Williams|
    +---------------+
    
    
    scala> dfCustomers.select("name", "city").show()
    +---------------+------------+
    |           name|        city|
    +---------------+------------+
    |     John Smith|      Austin|
    |    Joe Johnson|      Dallas|
    |      Bob Jones|     Houston|
    |     Andy Davis| San Antonio|
    | James Williams|      Austin|
    +---------------+------------+
    
    
    scala> dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
    +-----------+---------------+-------+-----+--------+
    |customer_id|           name|   city|state|zip_code|
    +-----------+---------------+-------+-----+--------+
    |        500| James Williams| Austin|   TX|   78727|
    +-----------+---------------+-------+-----+--------+
    
    
    scala> dfCustomers.groupBy("zip_code").count().show()
    +--------+-----+
    |zip_code|count|
    +--------+-----+
    |   75201|    1|
    |   78227|    1|
    |   78727|    2|
    |   77028|    1|
    +--------+-----+
    
    
    scala>

Reading from a file with the schema (manual - Programmatically Specifying the Schema)

In the above example, the schema is inferred using reflection. We can also programmatically specify the schema of the dataset. This is useful when custom classes cannot be defined ahead of time because the structure of the data is encoded in a string. The following code example shows how to specify the schema using the data type classes StructType, StringType, and StructField.

  1. Repeat steps #1, #2, #3 if not already done.
  2. Execute the commands
    //
    // Programmatically Specifying the Schema
    //
    // Create SQLContext from the existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Create an RDD
    val rddCustomers = sc.textFile("customers.txt")
    // The schema is encoded in a string
    val schemaString = "customer_id name city state zip_code"
    // Import Spark SQL data types and Row.
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    // Generate the schema based on the string of schema
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Convert records of the RDD (rddCustomers) to Rows.
    val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4)))
    // Apply the schema to the RDD.
    val dfCustomers = sqlContext.createDataFrame(rowRDD,schema)
    // Register the DataFrames as a table.
    dfCustomers.registerTempTable("customers")
    // SQL statements can be run by using the sql methods against the "customers" table provided by sqlContext.
    val custNames = sqlContext.sql("SELECT name FROM customers")
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    custNames.map(t => "Name: " + t(0)).collect().foreach(println)
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
    
    
    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@41d1bea3
    
    scala> val rddCustomers = sc.textFile("customers.txt")
    rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[39] at textFile at <console>:30
    
    
    
    scala> rddCustomers.collect
    res19: Array[String] = Array(100, John Smith, Austin, TX, 78727, 200, Joe Johnson, Dallas, TX, 75201, 300, Bob Jones, Houston, TX, 77028, 400, Andy Davis, San Antonio, TX, 78227, 500, James Williams, Austin, TX, 78727)
    
    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@57484e17
    
    scala> val rddCustomers = sc.textFile("customers.txt")
    rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[41] at textFile at <console>:30
    
    scala> val schemaString = "customer_id name city state zip_code"
    schemaString: String = customer_id name city state zip_code
    
    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._
    
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    
    scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(zip_code,StringType,true))
    
    scala> val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4)))
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at map at <console>:37
    
    scala> val dfCustomers = sqlContext.createDataFrame(rowRDD,schema)
    dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: string, name: string ... 3 more fields]
    
    scala> dfCustomers.registerTempTable("customers")
    warning: there was one deprecation warning; re-run with -deprecation for details
    
    scala> val custNames = sqlContext.sql("SELECT name FROM customers")
    custNames: org.apache.spark.sql.DataFrame = [name: string]
    
    scala> custNames.map(t => "Name: " + t(0)).collect().foreach(println)
    Name:  John Smith
    Name:  Joe Johnson
    Name:  Bob Jones
    Name:  Andy Davis
    Name:  James Williams
    
    scala> val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
    customersByCity: org.apache.spark.sql.DataFrame = [name: string, zip_code: string]
    
    scala> customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
     Joe Johnson, 75201
     Bob Jones, 77028
     Andy Davis, 78227
     John Smith, 78727
     James Williams, 78727

With Databricks

  1. Create the Cluster.

    Select the runtime as 6.6 and give a cluster name. Finally, click on Create Cluster.

    Confirm on the "Clusters" page.

  2. Upload the file. Click on "Data", then "Add Data", and drop "people_with_header.csv" in "Drop files to upload". It will be uploaded to the path "/FileStore/tables/people_with_header.csv" (please make note of the path).

  3. Import sparkSQL Demo.html. Click on "Workspace" and import from there.


     



 

8. SPARK : COMMANDS : PRACTICE LAB : RDD



PRACTICE 1: Combine 2 RDDs and apply logic on the result

Logic : if the right table has matching content, return it; otherwise print -1.

Create 2 RDDs & then operate on them
-------------------------------------------

val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
val c = List((3,7),(3,4),(4,9),(5,12))
val d = sc.parallelize(c)

b.collect
d.collect

b.leftOuterJoin(d)   //RDD[(Int, (Int, Option[Int]))]
b.leftOuterJoin(d).collect
//contains e.g. (2,(3,None))

b.leftOuterJoin(d).map{ x =>
  x._2._2 match {
    case Some(rightTableContent) =>
      (x._1, (x._2._1, rightTableContent))
    case None =>
      (x._1, (x._2._1, -1))
  }
}.collect
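The same -1 substitution can be written more compactly (a sketch, using the same b and d as above) with mapValues and Option.getOrElse:

```scala
// mapValues keeps the key untouched; getOrElse(-1) fills in the
// default when the right side has no matching record
b.leftOuterJoin(d)
 .mapValues { case (left, rightOpt) => (left, rightOpt.getOrElse(-1)) }
 .collect
```

mapValues also has the advantage of preserving the partitioner, as noted in the Practice 3 section below.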










PRACTICE 2: We want to process the server logs


Requirement: 
- client has 40 - 50 thousand servers
- want to monitor the servers, only for Errors and the source of each error
- Error can come from a Java application or anywhere
Log -> Info/Warn/Error
       If Error -> Source of error

Analyze (pseudocode):
var slrdd = sc.textFile("server log path")

var erdd = slrdd.filter(Error)
erdd.cache  //Transformation

var jerdd = erdd.filter(javaError)
jerdd.count   //performs the DAG cycle: (slrdd -> erdd -> cache -> jerdd -> count)

var sqlerdd = erdd.filter(SQLError)
sqlerdd.count //performs the DAG cycle: (erdd -> sqlerdd -> count)

######################################################################

hdfs dfs -cat data/serverlogs.log | head
hdfs dfs -cat data/serverlogs.log | grep -i warn | head
hdfs dfs -cat data/serverlogs.log | grep -i error | head




var slrdd = sc.textFile("data/serverlogs.log")
var erdd = slrdd.filter(x => x.startsWith("Error"))
erdd.cache

var jerdd = erdd.filter(x => x.split(":")(1).equals("Java"))
sc.setLogLevel("info")
jerdd.count  //52 Million and took 44 secs
slrdd.count  //total it has 253 Million records




hdfs dfs -ls data/serverlogs.log
//file size is : 17664 MB (17 GB)
//(0 + 0) / 138
//file will be divided into 138 blocks of 128 MB each


var sqlerdd = erdd.filter(x => x.split(":")(1).equals("Sql"))
sqlerdd.count   //12 secs






//file is kept below
data_scientist_sarvesh_gm83dc@ip-10-0-1-10    -  data/serverlogs.log
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/serverlogs.log .

######################################################################

Error can be from anywhere in serverlogs

OS
DB
Application



PRACTICE 3 : Advanced performance improvement





var c = List((3,7), (3,4), (4,9), (5,12))
var d = sc.parallelize(c)
d.partitioner  //None  - Hash partitioning is not done
d.partitions.size  //29 - the default number of partitions; no hash partitioning here

//How to do hash partitioning
var e = d.partitionBy(new org.apache.spark.HashPartitioner(3))
e.partitioner  //Some(org.apache.spark.HashPartitioner@3)
//caching is important here
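The "caching is important here" note can be sketched as follows (assuming the same d as above); without cache, the shuffle done by partitionBy would be repeated for every subsequent action:

```scala
// Cache the hash-partitioned RDD so the partitionBy shuffle is paid only once
var e = d.partitionBy(new org.apache.spark.HashPartitioner(3)).cache()
e.count   // first action materialises and caches the partitioned data
e.count   // later actions reuse the cached partitions
```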




//MAP : will not keep parent partitioning information

//apply a transformation like map - the resulting RDD will not keep the parent partitioning information
var f = e.map(x => x)  //VERY IMP : here 'e' is hash-partitioned, but when we apply map, f will not have the hash partitioning info.
f.partitioner //None (NOT MAINTAINED)
e.map(x => x._2)
e.map(x => (x._1 + 1, x._2))


//MAPVALUES : will maintain parent partitioning info

//the solution to the above problem is mapValues
var g = e.mapValues(x => x + 1)  //it maintains the partitioning information: e is hash-partitioned, so g will also be hash-partitioned
g.partitioner   //Some(org.apache.spark.HashPartitioner@3)






Tuesday, December 17, 2019

7. SPARK : COMMANDS : RDD



RDD can be created using the below 2 options:

1. var frdd = sc.parallelize(1 to 100)

2. var frdd = sc.textFile("data/fruit.txt")

###################################################################

webconsole
spark2-shell
sc

//RDD: create RDD & Rdd operations ###########################################
var myfrdd = sc.parallelize(1 to 100)
myfrdd.partitions.size   //2
var myfrdd = sc.parallelize(1 to 100, 3)  //want to create  rdd with 3 partitions
myfrdd.partitions.size   //3
myfrdd.glom   //Transformation, we get another RDD
myfrdd.glom.collect  //processing happens 
    //it will show content of each partition
    //1st partition = 1 to 33
    //2nd partition = till 66
    //3rd partition = till 100

glom -> displays the content of each partition.




//help command
myfrdd.  //press tab
 ###########################################################################

//EVEN RDD : create even rdd ################################################################
//filter is Transformation and HOF
//We know that (RDD -> TRANSFORMATION -> RDD)
myfrdd.filter(x => x%2 == 0)   //it will give another RDD
var erdd = myfrdd.filter(x => x%2 == 0)  //just transformation and store that in another rdd
erdd.collect   //now processing happens and gives the even numbers

erdd.partitions.size  //3 it will maintain same 3 partitions.
erdd.glom.collect

###########################################################################


https://drive.google.com/drive/folders/1JaXdI37xCkYsJHiou5-WK3g03LuVSVLD

//READ hdfs file : and split based on " " #######################################
hdfs dfs -cat data/fruit.txt | head

sc.textFile("data/fruit.txt")  //this gives an rdd of String because the file contains Strings
var frdd = sc.textFile("data/fruit.txt")
frdd.take(3)  //take the first 3 elements of the rdd



We can get the file from the below location and put it into your HDFS folder:
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/fruit.txt .
hdfs dfs -put fruit.txt /user/pavankumarmadhu123_gmail

//Note if file is in user/pavankumarmadhu123_gmail/fruit.txt  then we need to give sc.textFile("fruit.txt")


var ifrdd = frdd.flatMap(x => x.split(" "))  //gives an rdd in which each element is 1 fruit
ifrdd.take(5)
ifrdd.count  //file has 57 million fruits and took 8 secs (in-memory computation, distributed...)
ifrdd.countByValue

OR 

//how compact Scala is : a single line of code, same as above
sc.textFile("data/fruit.txt").flatMap(x => x.split(" ")).countByValue  //not storing in any variable




##############################################################################

//REDUCE demo - (HOF) - passing anonymous function #####################################
var myfrdd = sc.parallelize( 1 to 10, 3)
myfrdd.reduce((x,y) => x + y)   
//55




##########################################################################

//MULTIPLE RDDS : load data and create multiple rdds out of it
We can get the file from the below location and put it into your HDFS folder:
Get the file from : hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/sample.txt .
hdfs dfs -put sample.txt /user/pavankumarmadhu123_gmail
########################################
hdfs dfs -cat data/sample.txt | head
var frdd = sc.textFile("data/sample.txt")   //returns rdd of string
//Note if file is in user/pavankumarmadhu123_gmail/sample.txt  then we need to give sc.textFile("sample.txt")

//all the lines which are having "are"
var arerdd = frdd.filter(x => x.contains("are") )

//all the lines which are having "is"
var isrdd = frdd.filter(x => x.contains("is") )

frdd.count  //2 crore
arerdd.count  //1 crore 40 lakhs
isrdd.count  //84 lakh




arerdd.union(isrdd)  //Transformation will return rdd again
arerdd.union(isrdd).count   //Processing  2 crore

EXPENSIVE operations (intersection & distinct)
------------------------------------------------------------

arerdd.intersection(isrdd)  //Transformation will return rdd again
arerdd.intersection(isrdd).count    //Processing  0

arerdd.distinct.collect  //Procrssing 5
isrdd.distinct.collect   //Processing 3

//this means all the partitions have to be compared, so data has to travel between them - we call this shuffling
//DISTINCT : shuffle-intensive operation

//INTERSECTION : every record of one partition has to be compared with every other partition - again shuffling






arerdd.subtract(isrdd)  //records which are common to both rdds are removed
arerdd.subtract(isrdd).count  //since there is nothing common in this example, it returns the arerdd count itself

################################################################################

Cartesian : //cross product, possible between 2 rdds###########################
----------------------
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).count   //25
sc.parallelize(1 to 5).cartesian(sc.parallelize(6 to 10)).collect




##########################################################################################

//MAP :################################################################

val a = sc.parallelize(1 to 10)
a.map(x => x*x).collect

a.flatMap(x => 1 to x).collect   //1 12  123  1234  12345
//1,1,2,1,2,3,1,2,3,4,1,2,3,4,5.....




################################################################################################

Local setup
----------------
https://www.cloudera.com/downloads.html


Aggregate: (is an action and a HOF - it takes 2 functions as input)
#################################################################################


Reduce : Limitation : the output format has to be the same as the input
--------------------------------
Requirement : along with the total (15), we want the count as well:
tuple (15,5)  (sum, count)
input is an integer
output is a tuple

Aggregate: 
--------------------------------
function1 - works on each partition and gives a per-partition output
function2 - merges the function1 outputs

//a.aggregate((0,0))(function1, function2)
a.aggregate((0,0))((x,y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))
//(15,5) for an RDD of 1 to 5




####################################################################################
Pair RDD ###########################################################################
An rdd of tuples is a pair rdd

eg: (2,5)
    (2,5).getClass   //Tuple2$mc...

val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
b.collect
b.map(x => (x._1, x._2*x._2)).collect  //keep the key the same and square the value
b.keys.collect  //1 2 1 2 3
b.values.collect   //2 3 5 5 4

############################################################################################

//ReduceByKey : (Transformation; does the same as Reduce but key by key)###################
--------------------------
b.reduceByKey((x,y) => x + y)   //transformation
b.reduceByKey((x,y) => x + y).collect   //(2,8) (1,7) (3,4)  adds the values per key




############################################################################


GroupByKey (Transformation) : #########################################################
-------------------
multiple aggregation operations: key, max, min, count ....
better to have them in one place and do all the above operations

b.groupByKey  //transformation - returns an RDD of (key, iterable of values) tuples
b.groupByKey.collect





b.groupByKey.map(x => (x._1, x._2.sum, x._2.max, x._2.min, x._2.size)).collect




###############################################################################################

CombineByKey (HOF - accepts 3 functions) : #########################################################
------------------
Instead of transferring everything over the network,
do a local aggregation within each partition first,
and only then shuffle the data.

Everything is done per key, as the name CombineByKey tells.

Func1 (createCombiner) : runs on the first value of a key - returns the tuple (value, 1)
Func2 (mergeValue)     : merges further values into the combiner within a partition
Func3 (mergeCombiners) : merges the per-partition combiners across partitions


var a = sc.parallelize(Array((1,3), (1,6), (1,8), (2,3), (2,10), (3,9)))
a.collect
a.combineByKey(x => (x,1), (x:(Int,Int), y) => (x._1+y, x._2+1), (x:(Int,Int), y:(Int,Int)) => (x._1 + y._1, x._2+y._2)).collect
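The (sum, count) per key that combineByKey builds here can be sketched with plain Scala collections (no Spark required); this single-machine sketch cannot show the per-partition merging, only the end result:

```scala
// Plain-Scala sketch of the result of
// combineByKey(x => (x,1), (acc,v) => (acc._1+v, acc._2+1), merge):
// per key, a (sum, count) pair.
val a = List((1, 3), (1, 6), (1, 8), (2, 3), (2, 10), (3, 9))

val sumCount = a
  .groupBy(_._1)
  .map { case (k, vs) =>
    // fold each key's values into (sum, count), like createCombiner + mergeValue
    (k, vs.map(_._2).foldLeft((0, 0))((acc, v) => (acc._1 + v, acc._2 + 1)))
  }

println(sumCount)  // 1 -> (17,3), 2 -> (13,2), 3 -> (9,1)
```

From (sum, count) the average per key is then just `sum.toDouble / count`.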





var b = sc.parallelize(Array((1,3), (4,6), (4,8), (5,3), (5,10), (3,4)))
b.collect


a.union(b).collect
  //we get 12 elements (union keeps duplicates)
a.union(b).count    //12

a.intersection(b).collect  //only (1,3) - the one pair common to both

a.subtract(b).collect   //5 elements remain - the common pair (1,3) is removed

a.distinct.collect    //shows unique

a.join(b).collect    //inner join, based on key
//RDD[(Int, (Int, Int))]
//(key, (leftValue, rightValue))

a.leftOuterJoin(b).collect
//RDD[(Int, (Int, Option[Int]))]

a.rightOuterJoin(b).collect
//RDD[(Int, (Option[Int], Int))]
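The join result types can be sketched with plain Scala Maps (no Spark required) - the key point is that an inner join keeps only keys present on both sides, while leftOuterJoin wraps the right value in Option:

```scala
// Plain-Scala sketch of join vs leftOuterJoin semantics.
val left  = Map(1 -> 3, 2 -> 10)
val right = Map(1 -> 6)

// inner join: (key, (leftValue, rightValue)) - only keys on both sides
val inner = left.flatMap { case (k, lv) => right.get(k).map(rv => (k, (lv, rv))) }

// left outer join: (key, (leftValue, Option[rightValue])) - all left keys kept
val leftOuter = left.map { case (k, lv) => (k, (lv, right.get(k))) }

println(inner)      // Map(1 -> (3,6))
println(leftOuter)  // Map(1 -> (3,Some(6)), 2 -> (10,None))
```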




###################
############################################################################



6. SPARK : ARCHITECTURE













5. SCALA : MORE....


Sunday, December 15, 2019

4. SCALA : Functional Programming




Higher order function : if one function takes another function as a parameter, we call it a higher-order function.


webconsole
spark2-shell

//pass function at run time, runtime we will decide
def sum(fun: Int => Int, a: Int, b: Int): Int = {

  if (a == b) {
    fun(a)
  }
  else {
    fun(a) + sum(fun, a + 1, b)
  }

}


def firstPower(input:Int):Int= {input}


sum(firstPower,1,3)

// O/p = 6

def secondPower(input : Int):Int={input * input}

sum(secondPower,1,3)

// o/p = 14
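Put together, the recursive sum and the two plug-in functions run as an ordinary Scala script:

```scala
// Recursive higher-order sum: applies fun to each integer in [a, b].
def sum(fun: Int => Int, a: Int, b: Int): Int =
  if (a == b) fun(a) else fun(a) + sum(fun, a + 1, b)

def firstPower(input: Int): Int = input
def secondPower(input: Int): Int = input * input

println(sum(firstPower, 1, 3))      // 1 + 2 + 3 = 6
println(sum(secondPower, 1, 3))     // 1 + 4 + 9 = 14
println(sum(x => x * x * x, 1, 3))  // 1 + 8 + 27 = 36
```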

Anonymous function (Same as above but its Anonymous)
-----------------------


sum(x => x, 1,3)

sum(x => x * x, 1,3)

sum(x => x * x * x, 1,3)

Higher order function examples

map (higher order function - it takes a function as an input parameter)
--------------------------------
var arr = Array("pavan","madhu","amar")

arr.map(x => x.length() ).foreach(println)
arr.map(x => x.trim() ).foreach(println)
arr.map(x => x.replace(" ","") ).foreach(println)

//depending true/false it will filter
Array(1,2,3,4,5).filter(x => x%2 == 0).foreach(println)
2
4


Array(1,2,3,4,5).map(x => x*x).foreach(println)
1 4 9 16 25


Reduce - HOF (Again higher order function - means takes function as input parameter)
1 2 3 4 5
x y x+y

1  2 = 3
3  3 = 6
6  4 = 10
10 5 = 15

println(Array(1,2,3,4,5).reduce((x,y) => x+y))   //15
println(Array(1,2,3,4,5).reduce((x,y) => scala.math.max(x,y)))
5


FlatMap (HOF)
#####################

Takes a function as input
returns a flattened collection

FlatMap - flattens each element's result into individual items

var arr = Array("pavan","madhu","amar")
arr.flatMap(x => x).foreach(println)
p
a
v
a
n

m
a
d
h
u

a
m
a
r

var arr = Array("pavan malatkar","madhu hiraskar","amar ganapathi")
arr.flatMap(x => x.split(" ")).foreach(println)

pavan
malatkar
madhu
hiraskar
amar
ganapathi


arr.flatMap(x => x).foreach(println)  -> here x is returned as-is, but flatMap flattens each String into individual chars
arr.flatMap(x => x.split(" ")).foreach(println)  -> here we do not simply return x, we override the logic to split, so flatMap flattens into individual words
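Both flatMap behaviours run as a plain Scala script:

```scala
// flatMap flattens each element's result into one collection:
// identity on a String yields its characters; split(" ") yields its words.
val names = Array("pavan malatkar", "madhu hiraskar")

val chars = Array("pavan").flatMap(x => x)    // individual characters
val words = names.flatMap(x => x.split(" "))  // individual words

println(chars.mkString)       // pavan
println(words.mkString(","))  // pavan,malatkar,madhu,hiraskar
```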















3. SCALA : OOPS



SCALA as OOP###########################


Rule1 : the class name and the file name do not have to match

MainClass.scala
------------------------------------------------------------------------------------------------------------------------------
class SecondClass{

var a = 0
var b = 100

def getA():Int=
{
   a
}

def setA(input:Int)=
{
   a = input
}

def getB():Int=
{
   b
}

def setB(input:Int)=
{
   b = input
}

}

object xyz extends App{
var newObj = new SecondClass
println(newObj.getA)
println(newObj.getB)
newObj.setA(1000)
newObj.setB(2000)

println(newObj.getA)
println(newObj.getB)
}

------------------------------------------------------------------------------------------------------------


Rule2: Scala automatically generates a getter and setter for every var field

class SecondClass{

var a = 0
var b = 100
}

var x = new SecondClass
println(x.a)    //internally it calls the generated getter
x.a = 100  //internally it calls the generated setter

Rule3: private var c = 10  //the variable is anyhow private, but the generated getter and setter also become private

//if we define a plain var
 -> the underlying field is private
 -> we get a public getter & setter by default

//if we define a private var
 -> the field is anyhow private
 -> the generated getter & setter are also private (we have to define our own public getter and setter to access it)


Constructors-------------

class Abc {
  var a = 0
  var b = 0
}
//usage: var a = new Abc()   //calls the primary constructor

1. Primary constructor  2. Auxiliary constructor

Rules for Auxiliary:
-----------------------
- defined with def this(...)
- its first statement must call the primary constructor OR an already defined auxiliary constructor
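A small runnable sketch of both rules, using an illustrative class with two fields:

```scala
// Auxiliary constructors use `def this(...)` and must delegate first.
class Abc(var a: Int, var b: Int) {   // primary constructor
  def this(a: Int) = this(a, 0)       // auxiliary: calls the primary
  def this() = this(0)                // auxiliary: calls the auxiliary above
}

val x = new Abc(1, 2)  // primary
val y = new Abc(5)     // auxiliary, b defaults to 0
val z = new Abc()      // auxiliary chain, a and b default to 0
println((x.a, x.b, y.a, y.b, z.a, z.b))  // (1,2,5,0,0,0)
```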








SingleTon Class#################################

object MySingleton {
  def concat(str1:String, str2:String):String = { str1 + str2 }
}

MySingleton.concat("Hello","Pavan")   //valid only because MySingleton is an object (a singleton)










Companion objets#######################

A class and an object with the same name in the same source file - the object holds the static-like members, the class holds the instance-level members
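A sketch with a hypothetical Counter class: the object part is shared (static-like), while each instance reads from it.

```scala
// The object Counter is one shared copy ("static" side);
// each class instance pulls its id from it.
class Counter(val name: String) {
  val id = Counter.nextId()   // instance-level, uses the shared state
}

object Counter {
  private var lastId = 0
  def nextId(): Int = { lastId += 1; lastId }
}

val c1 = new Counter("first")
val c2 = new Counter("second")
println((c1.id, c2.id))  // (1,2)
```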


Inheritance####################





Trait########################

-> like an Interface in Java
-> if we want to enforce a method -> only the signature, no body (abstract method)
-> if we have a method with a body -> then it is a concrete method
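A minimal sketch with a hypothetical Greeter trait mixing both kinds of methods:

```scala
// A trait can mix abstract methods (signature only) and concrete methods.
trait Greeter {
  def name: String                       // abstract: implementers must define it
  def greet(): String = s"Hello, $name"  // concrete: inherited as-is
}

class EnglishGreeter extends Greeter {
  def name: String = "Pavan"             // fulfils the abstract member
}

println(new EnglishGreeter().greet())  // Hello, Pavan
```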


Case class##################

used when we define a row of a table - an immutable record with auto-generated equals, toString and copy
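An illustrative Person case class showing the generated conveniences:

```scala
// A case class models an immutable record (e.g. a table row):
// structural equality, toString and copy come for free.
case class Person(name: String, age: Int)

val p1 = Person("madhu", 30)
val p2 = p1.copy(age = 31)   // new record, only age changed

println(p1)                          // Person(madhu,30)
println(p1 == Person("madhu", 30))   // true (structural equality)
println(p2.age)                      // 31
```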