Sunday, December 22, 2019

8. SPARK : COMMANDS : PRACTICE LAB : RDD



PRACTICE 1: Combine 2 RDDs and perform logic on them

Logic: for each left-side pair, if the right table has a matching key, return its value; otherwise return -1.

Create 2 RDDs & then operate on them
-------------------------------------------

val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
val c = List((3,7),(3,4),(4,9),(5,12))
val d = sc.parallelize(c)

b.collect
d.collect

b.leftOuterJoin(d)
//res: org.apache.spark.rdd.RDD[(Int, (Int, Option[Int]))]
b.leftOuterJoin(d).collect
//e.g. (2,(3,None)) -- key 2 has no match on the right side, so the right value is None

b.leftOuterJoin(d).map {
  x =>
    x._2._2 match {
      case Some(rightTableContent) =>
        (x._1, (x._2._1, rightTableContent))
      case None =>
        (x._1, (x._2._1, -1))
    }
}.collect
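The same if-present-else -1 logic can be checked without a cluster. This is a hypothetical pure-Scala sketch (plain Lists in place of RDDs) of what leftOuterJoin plus the match above produces on this data:

```scala
// Pure-Scala sketch (no Spark needed): simulate leftOuterJoin on the same pairs.
val left  = List((1, 2), (2, 3), (1, 5), (2, 5), (3, 4))
val right = List((3, 7), (3, 4), (4, 9), (5, 12))
val rightByKey = right.groupBy(_._1)  // key -> all right-side pairs with that key

// For each left pair: join against matching right values, or fall back to -1.
val joined = left.flatMap { case (k, v) =>
  rightByKey.get(k) match {
    case Some(matches) => matches.map { case (_, rv) => (k, (v, rv)) }
    case None          => List((k, (v, -1)))
  }
}
// key 3 matches twice -> (3,(4,7)) and (3,(4,4)); keys 1 and 2 fall back to -1
```

Unlike Spark, this sketch keeps the left-to-right order of the input; the RDD version gives no ordering guarantee after the shuffle.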










PRACTICE 2: Process the server logs


Requirement:
- the client has 40 - 50 thousand servers
- we want to monitor the servers and report only Errors, together with the source of each error
- the Error may come from a Java application or anywhere else
Log -> Info/Warn/Error
       If Error -> source of the error

Analyze:
var slrdd = sc.textFile("server log path")

var erdd = slrdd.filter(Error)
erdd.cache   // lazy, like a transformation: marks erdd for caching, nothing runs yet

var jerdd = erdd.filter(javaError)
jerdd.count   // first action, runs the full DAG: (slrdd -> erdd -> cache -> jerdd -> count)

var sqlerdd = erdd.filter(SQLError)
sqlerdd.count // reuses the cached erdd: (erdd -> sqlerdd -> count)
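What cache buys can be sketched without Spark. As a rough stand-in (assumed log format "Level: Source: message"), a Scala lazy val memoizes its body the way a cached RDD memoizes its partitions, so the expensive filter runs once and both later counts reuse the result:

```scala
// Sketch: a lazy val plays the role of the cached erdd -- its body runs
// exactly once, on first use, and later "actions" reuse the stored result.
var filterRuns = 0
val logs = List("Error: Java: NPE", "Info: all good", "Error: Sql: timeout")

lazy val errors = {            // like erdd.cache: computed on the first action only
  filterRuns += 1
  logs.filter(_.startsWith("Error"))
}

val javaErrors = errors.count(_.split(":")(1).trim == "Java")  // triggers the filter
val sqlErrors  = errors.count(_.split(":")(1).trim == "Sql")   // reuses the memoized list
```

The analogy is loose (Spark caches per partition, across executors), but it shows why the second count in the lab is so much cheaper than the first.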

######################################################################

hdfs dfs -cat data/serverlogs.log | head
hdfs dfs -cat data/serverlogs.log | grep -i warn | head
hdfs dfs -cat data/serverlogs.log | grep -i error | head




var slrdd = sc.textFile("data/serverlogs.log")
var erdd = slrdd.filter(x => x.startsWith("Error"))
erdd.cache

var jerdd = erdd.filter(x => x.split(":")(1).equals("Java"))
sc.setLogLevel("info")
jerdd.count  //52 million Java errors; took 44 secs
slrdd.count  //253 million records in total




hdfs dfs -ls data/serverlogs.log
//file size is: 17664 MB (~17 GB)
//console progress bar shows (0 + 0) / 138 tasks
//the file is divided into 138 blocks of 128 MB each
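The 138 tasks come straight from the block math; a quick sketch of the arithmetic, rounding up because a last partial block would still be its own block:

```scala
// 17664 MB file split into 128 MB HDFS blocks (one Spark task per block here).
val fileMB  = 17664
val blockMB = 128
val blocks  = math.ceil(fileMB.toDouble / blockMB).toInt  // 17664 / 128 = 138
```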


var sqlerdd = erdd.filter(x => x.split(":")(1).equals("Sql"))
sqlerdd.count   //12 secs -- faster because erdd is read from the cache






//the file is kept at:
data_scientist_sarvesh_gm83dc@ip-10-0-1-10    -  data/serverlogs.log
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/serverlogs.log .

######################################################################

Errors can come from anywhere in the server logs:

OS
DB
Application



PRACTICE 3: Advanced performance improvement





var c = List((3,7), (3,4), (4,9), (5,12))
var d = sc.parallelize(c)
d.partitioner      //None - no hash partitioning has been done
d.partitions.size  //29 here (the cluster's default parallelism); no partitioner is attached yet

//How to do hash partitioning
var e = d.partitionBy(new org.apache.spark.HashPartitioner(3))
e.partitioner  //Some(org.apache.spark.HashPartitioner@3)
//caching is important here: cache e so the shuffle done by partitionBy is not repeated
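Which partition each key lands in is easy to predict. This is a sketch of a hash partitioner's placement rule (a non-negative key.hashCode modulo the partition count) -- my own re-implementation for illustration, not Spark's source:

```scala
// Placement rule of a hash partitioner with n partitions (illustrative sketch).
def partitionOf(key: Any, n: Int): Int = {
  val m = key.hashCode % n
  if (m < 0) m + n else m  // keep the result non-negative
}

// With 3 partitions, the keys of d = (3,7),(3,4),(4,9),(5,12) land in:
//   key 3 -> partition 0, key 4 -> partition 1, key 5 -> partition 2
```

Note that both (3,7) and (3,4) land in the same partition -- that co-location of equal keys is what makes later joins and aggregations on e cheap.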




//MAP : does not keep the parent's partitioning information

//apply a transformation like map and the resulting RDD loses the parent's partitioning information
var f = e.map(x => x)  //VERY IMP: 'e' is hash-partitioned, but after map, f has no hash partitioning info
f.partitioner //None (NOT MAINTAINED)
e.map(x => x._2)
e.map(x => (x._1 + 1, x._2))  //map may change the key, so Spark cannot trust the old partitioning


//MAPVALUES : maintains the parent's partitioning info

//the solution to the above problem is mapValues
var g = e.mapValues(x => x + 1)  //keys are untouched, so the partitioning information is maintained: e is hash-partitioned and so is g
g.partitioner   //Some(org.apache.spark.HashPartitioner@3)
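The reason mapValues can keep the partitioner is that it never touches the key: same key means same hash placement. A pure-Scala sketch of the contrast, using an illustrative partitionOf helper (not Spark's source) for the hash rule:

```scala
// Sketch: a map that rewrites the key moves the pair; mapValues cannot.
def partitionOf(key: Int, n: Int): Int = ((key.hashCode % n) + n) % n

val pair   = (3, 7)
val before = partitionOf(pair._1, 3)          // where (3,7) lives with 3 partitions

val viaMap       = (pair._1 + 1, pair._2)     // map changed the key: 3 -> 4
val viaMapValues = (pair._1, pair._2 + 1)     // mapValues changed only the value
```

Here the map result hashes to a different partition while the mapValues result stays put -- which is exactly why Spark drops the partitioner after map but keeps it after mapValues.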





