8. SPARK : COMMANDS : PRACTICE LAB : RDD
PRACTICE 1: Combine 2 RDDs and apply logic to the result
Logic: if the right table has a matching entry, return it; otherwise emit -1.
Create 2 RDDs and then operate on them.
-------------------------------------------
val a = List((1,2),(2,3),(1,5),(2,5),(3,4))
val b = sc.parallelize(a)
val c = List((3,7),(3,4),(4,9),(5,12))
val d = sc.parallelize(c)
b.collect
d.collect
b.leftOuterJoin(d)
//res: org.apache.spark.rdd.RDD[(Int, (Int, Option[Int]))]
b.leftOuterJoin(d).collect
//sample element with no right-side match: (2,(3,None))
b.leftOuterJoin(d).map { x =>
  x._2._2 match {
    case Some(rightTableContent) => (x._1, (x._2._1, rightTableContent))
    case None                    => (x._1, (x._2._1, -1))
  }
}.collect
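The same left-outer-join logic can be sketched with plain Scala collections, so it runs without a Spark cluster. This is an illustrative stand-in, not Spark's implementation; the names `rightByKey` and `joined` are my own.

```scala
// Stand-in for the RDD logic above, using plain Scala collections.
val left  = List((1,2),(2,3),(1,5),(2,5),(3,4))
val right = List((3,7),(3,4),(4,9),(5,12))

// Group the right side by key, mirroring what leftOuterJoin does.
val rightByKey = right.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

// For every left pair, emit (key, (leftValue, rightValue)) per match,
// or (key, (leftValue, -1)) when the right side has no entry for the key.
val joined = left.flatMap { case (k, v) =>
  rightByKey.get(k) match {
    case Some(vs) => vs.map(rv => (k, (v, rv)))
    case None     => List((k, (v, -1)))
  }
}
```

Only key 3 exists on both sides, so it joins twice (right values 7 and 4); every other left pair gets -1, matching the `Some`/`None` cases in the RDD version.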
PRACTICE 2: Process the server logs
Requirement:
- the client has 40-50 thousand servers
- we want to monitor the servers, reacting only to Errors and the source of each error
- the Error may come from a Java application or anywhere else
Log levels -> Info/Warn/Error
If Error -> find the source of the error
Analyze (pseudocode):
var slrdd = sc.textFile("server log path")
var erdd = slrdd.filter(Error)
erdd.cache //lazy, like a transformation: erdd is cached on first computation
var jerdd = erdd.filter(javaError)
jerdd.count //runs the full DAG: slrdd -> erdd (cached) -> jerdd -> count
var sqlerdd = erdd.filter(SQLError)
sqlerdd.count //runs only: erdd (from cache) -> sqlerdd -> count
######################################################################
hdfs dfs -cat data/serverlogs.log | head
hdfs dfs -cat data/serverlogs.log | grep -i warn | head
hdfs dfs -cat data/serverlogs.log | grep -i error | head
var slrdd = sc.textFile("data/serverlogs.log")
var erdd = slrdd.filter(x => x.startsWith("Error"))
erdd.cache
var jerdd = erdd.filter(x => x.split(":")(1).equals("Java"))
sc.setLogLevel("info")
jerdd.count //~52 million Java-error records; took 44 secs
slrdd.count //the full log has 253 million records in total
hdfs dfs -ls data/serverlogs.log
//file size is 17664 MB (~17 GB)
//stage progress showed (0 + 0) / 138 tasks
//the file is divided into 138 blocks of 128 MB each
var sqlerdd = erdd.filter(x => x.split(":")(1).equals("Sql"))
sqlerdd.count //12 secs, since erdd is served from cache
//file is kept below
data_scientist_sarvesh_gm83dc@ip-10-0-1-10 - data/serverlogs.log
hdfs dfs -get /user/data_scientist_sarvesh_gm83dc/data/serverlogs.log .
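The filtering done above on the cluster can be sketched locally with plain Scala collections. The sample lines below are made up for illustration, assuming the log format the commands imply: lines shaped like "Error:Java:..." or "Error:Sql:...".

```scala
// Minimal local sketch of the server-log filtering, with hypothetical lines.
val lines = List(
  "Info:service started",
  "Error:Java:NullPointerException in OrderService",
  "Warn:disk usage at 80%",
  "Error:Sql:deadlock detected",
  "Error:Java:OutOfMemoryError"
)

// Same predicates as the RDD code: keep Error lines, then split on ":"
// and inspect the second field to find the source of the error.
val errors   = lines.filter(_.startsWith("Error"))
val javaErrs = errors.filter(_.split(":")(1) == "Java")
val sqlErrs  = errors.filter(_.split(":")(1) == "Sql")
```

On the cluster, `errors` would be the cached `erdd`, so both source-specific filters reuse one pass over the raw log.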
######################################################################
Error can be from anywhere in serverlogs
OS
DB
Application
PRACTICE 3: Advanced performance improvement
var c = List((3,7), (3,4), (4,9), (5,12))
var d = sc.parallelize(c)
d.partitioner //None - no hash partitioning has been done
d.partitions.size //29 here (default parallelism); this is unrelated to having a hash partitioner
//How to do hash partitioning
var e = d.partitionBy(new org.apache.spark.HashPartitioner(3))
e.partitioner //Some(org.apache.spark.HashPartitioner@3)
//caching is important here
//MAP: does not keep the parent's partitioning information
//apply a transformation like map, and the resulting RDD loses the parent's partitioner
var f = e.map(x => x) //VERY IMP: 'e' is hash-partitioned, but f will not carry the hash-partitioning info
f.partitioner //None (NOT MAINTAINED)
e.map(x => x._2)
e.map(x => (x._1 + 1, x._2))
//MAPVALUES: maintains the parent's partitioning info
//solution to the above problem: mapValues
var g = e.mapValues(x => x + 1) //keeps the partitioning information: e is hash-partitioned, so g is too
g.partitioner //Some(org.apache.spark.HashPartitioner@3)
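Why map drops the partitioner while mapValues keeps it can be illustrated locally. The sketch below assumes, for illustration, that a hash partitioner places a key in partition `key.hashCode % n` (Spark uses a non-negative variant of this); `partitionOf`, `samePartitions`, and `movedKeys` are my own names.

```scala
// Local illustration: map may rewrite keys, invalidating their placement;
// mapValues touches only values, so every key stays in its partition.
val numPartitions = 3
def partitionOf(key: Int): Int =
  ((key.hashCode % numPartitions) + numPartitions) % numPartitions

val pairs = List((3,7), (3,4), (4,9), (5,12))

// mapValues-style change: keys untouched, so partition placement is preserved.
val afterMapValues = pairs.map { case (k, v) => (k, v + 1) }
val samePartitions = pairs.zip(afterMapValues).forall {
  case ((k1, _), (k2, _)) => partitionOf(k1) == partitionOf(k2)
}

// map-style change: keys rewritten, so the old placement no longer holds.
val afterMap = pairs.map { case (k, v) => (k + 1, v) }
val movedKeys = pairs.zip(afterMap).exists {
  case ((k1, _), (k2, _)) => partitionOf(k1) != partitionOf(k2)
}
```

Since map is free to change the key, Spark cannot promise the old partitioning still holds and sets the partitioner to None; mapValues guarantees keys are untouched, so the partitioner is retained.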





