Quotes

Wednesday, April 1, 2020

Big Data EcoSystem and Spark


Parallel programming: multiple CPUs share one memory (shared memory).

Distributed programming: each node has its own memory and CPU.

Limitations: We can write big data programs in plain Java or Python, but on a single machine they run out of memory or take too long to process.


MapReduce is a programming technique; Hadoop MapReduce is a specific implementation of that technique.

It has 3 stages.

Map: Reads the data and converts it to (key, value) pairs.
Example: (song1, 1), (song2, 1), (song1, 1)

Shuffle: Collects the pairs that have the same key and forms groups.

Group1
(song1,1)
(song1,1)

Group2
(song2,1)

Reduce: Combines the values in each group, here by summing the counts.

(song1, 2)
(song2,1)
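
The three stages above can be sketched in plain Python. This is only a single-machine illustration of the technique, not Hadoop's implementation; the function names map_phase, shuffle_phase and reduce_phase are made up for this example.

```python
from collections import defaultdict

def map_phase(records):
    """Map: emit a (key, 1) pair for every song play."""
    return [(song, 1) for song in records]

def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def reduce_phase(groups):
    """Reduce: combine the values of each group into one count."""
    return {key: sum(values) for key, values in groups.items()}

plays = ["song1", "song2", "song1"]
pairs = map_phase(plays)        # [('song1', 1), ('song2', 1), ('song1', 1)]
groups = shuffle_phase(pairs)   # {'song1': [1, 1], 'song2': [1]}
counts = reduce_phase(groups)   # {'song1': 2, 'song2': 1}
print(counts)
```

On a real cluster the map and reduce steps run on many nodes at once, and the shuffle is the step that moves data between them over the network.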




Spark processes huge volumes of data faster by using an in-memory compute model.

Programming: high-level APIs

1) DataFrames
2) Spark SQL

Low level:

RDD: Resilient Distributed Dataset, which is fault tolerant.



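A) Data wrangling using functional programming.


Spark is written in Scala, a functional programming language. Python is not a purely functional language, and plain Python code is slow for this kind of processing.

Hence PySpark was implemented, which exposes Spark's high-level APIs in Python.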


Lazy evaluation was introduced to avoid running out of memory from copying the input data for each function.

DAG (Directed Acyclic Graph): Spark builds a graph of the steps so that it can procrastinate/postpone the actual work until the last minute.

The DAG can contain multiple kinds of functions, such as maps.

These multi-step combos are called stages.
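
Lazy evaluation can be demonstrated without Spark, because Python's built-in map() is lazy as well. The sketch below is only an analogy: consuming the iterator with list() plays the role that an action plays in Spark, and the calls list is a helper added here to record when the function really runs.

```python
calls = []  # records every time to_lower actually executes

def to_lower(song):
    calls.append(song)
    return song.lower()

song_log = ["Despacito", "HAVANA", "All The Stars"]

# Building the pipeline does no work yet: map() returns a lazy iterator.
lazy_result = map(to_lower, song_log)
calls_before = len(calls)   # nothing has been processed so far

# Consuming the iterator forces the computation, like an action in Spark.
collected = list(lazy_result)
calls_after = len(calls)    # now every element has been processed

print(calls_before, calls_after, collected)
```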

Instead of defining a new function with def to convert a string to lowercase using str.lower, an anonymous function can be written with lambda.

Maps:
In Spark, maps take data as input and then transform that data with whatever function you put in the map. They are like directions for the data telling how each input should get to the output.

distributed_song_log.map(convert_song_to_lowercase).collect()

Spark does not actually execute the map step unless it needs to.
"RDD" in the output refers to resilient distributed dataset. RDDs are exactly what they say they are: fault-tolerant datasets distributed across a cluster. This is how Spark stores data.
To get Spark to actually run the map step, you need to use an "action". One available action is the collect method. The collect() method takes the results from all of the nodes in the cluster and "collects" them into a single list on the master node.

You do not always have to write a custom function for the map step. You can also use anonymous (lambda) functions as well as built-in Python functions like str.lower().
Anonymous functions are a Python feature for writing functional-style programs.
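
The three ways of supplying the function can be compared side by side. The sketch uses Python's built-in map() on a small, made-up song list instead of an RDD, so it runs without a cluster; with an RDD the same three callables could be passed to its map method.

```python
def convert_song_to_lowercase(song):
    """Named function, as in the map example above."""
    return song.lower()

song_log = ["Despacito", "HAVANA"]

with_named = list(map(convert_song_to_lowercase, song_log))
with_lambda = list(map(lambda song: song.lower(), song_log))
with_builtin = list(map(str.lower, song_log))

# All three produce the same result.
print(with_named, with_lambda, with_builtin)
```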

Data formats:
CSV, XML, JSON, HTML.
Hadoop Distributed File System (HDFS): Splits data into blocks (typically 64 MB or 128 MB) and stores replicated copies of them across the cluster to facilitate fault-tolerant behaviour.
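
As a quick back-of-the-envelope check, the number of blocks a file occupies is its size divided by the block size, rounded up. The helper block_count below is a made-up name for this arithmetic and not part of any Hadoop API.

```python
import math

def block_count(file_size_mb, block_size_mb=128):
    """Number of blocks needed to hold a file of the given size."""
    return math.ceil(file_size_mb / block_size_mb)

print(block_count(1024))      # 1 GB file with 128 MB blocks
print(block_count(1000, 64))  # 1000 MB file with 64 MB blocks
print(block_count(100))       # a small file still occupies one block
```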

SparkContext: the entry point for the lower-level (non-SQL) RDD API.
SparkSession: the entry point for DataFrames and Spark SQL.
SparkConf: configures the app name and the master (setMaster with "local" or an IP address).
getOrCreate: returns the existing session if there is one and applies the provided configuration properties to it; otherwise it creates a new session.

Reading and writing data with Spark DataFrames:
spark.sparkContext.getConf().getAll()
log = spark.read.json(path)
log.printSchema()
log.describe()
log.show(n=1)
log.take(5)

log.write.save(outputpath, format="csv", header=True)
csvlog = spark.read.csv(outputpath, header=True)

Imperative programming vs declarative programming
Declarative programming is a bit higher level: it is an abstraction over imperative programming.
Imperative: calculate the sum step by step
a = 1
b = 1
c = a + b
Declarative: give me the sum of these two numbers
sum([1, 1])

log.describe("facilityid").show()
log.select("facilityid").dropDuplicates().sort("facilityid").show()

User-defined function:

import datetime
from pyspark.sql.functions import udf

getHr = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000).hour)

log = log.withColumn('hr', getHr(log.ts))
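
The logic inside the UDF can be tested on its own before wrapping it with udf. The sketch below assumes the ts column holds milliseconds since the Unix epoch, which the division by 1000 suggests. It pins the conversion to UTC so the result does not depend on the machine's time zone, whereas the UDF above uses local time. The name get_hour is made up for this example.

```python
from datetime import datetime, timezone

def get_hour(ts_ms):
    """Return the hour of day (UTC) for a timestamp given in milliseconds."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).hour

# 13 hours after the epoch, expressed in milliseconds.
thirteen_hours_ms = 13 * 60 * 60 * 1000
print(get_hour(thirteen_hours_ms))
```
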
DAG: multiple stages.
Stage: multiple tasks.
Task: the smallest unit of work.

Accumulators are used because print statements inside transformations are executed on the worker nodes, so their output cannot be seen from the master node where the driver program runs.

s3a://
hdfs:///

Port 22 for SSH
Port 80 for HTTP
Port 7077 is used by the master node to communicate with the worker nodes.

Port 8080 for the master node's web UI
Port 8888 for Jupyter notebooks
Port 4040 shows active Spark jobs

Pareto principle: roughly 80% of the data comes from 20% of the users.
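
Whether a data set follows this pattern can be checked by measuring the share of events that comes from the most active users. The event counts below are invented for illustration, and top_share is a made-up helper name.

```python
def top_share(counts, top_fraction=0.2):
    """Fraction of all events produced by the most active users."""
    ordered = sorted(counts, reverse=True)
    top_n = max(1, round(len(ordered) * top_fraction))
    return sum(ordered[:top_n]) / sum(ordered)

# Hypothetical number of events for each of 10 users.
events_per_user = [400, 400, 50, 30, 30, 30, 20, 20, 10, 10]
share = top_share(events_per_user)
print(share)  # share of events coming from the top 20% of users
```

A share close to 0.8 is a hint that a few keys dominate the data, which is what causes the data skew discussed below.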

In this lesson, we walked through various examples of issues you can debug based on error messages, loglines and stack traces.
We have also touched on another very common issue with Spark jobs that can be harder to address: everything working fine but just taking a very long time. So what do you do when your Spark job is (too) slow?

Insufficient resources

Often, while there are some possible ways of improvement, processing large data sets just takes a lot longer than processing smaller ones, even without any big problem in the code or job tuning. Using more resources, either by increasing the number of executors or using more powerful machines, might just not be possible. When you have a slow job it’s useful to understand:
  • how much data you’re actually processing (compressed file formats can be tricky to interpret),
  • if you can decrease the amount of data to be processed by filtering or aggregating to lower cardinality,
  • and if resource utilization is reasonable.
There are many cases where different stages of a Spark job differ greatly in their resource needs: loading data is typically I/O heavy, some stages might require a lot of memory, others might need a lot of CPU. Understanding these differences might help to optimize the overall performance. Use the Spark UI and logs to collect information on these metrics.
If you run into out of memory errors you might consider increasing the number of partitions. If the memory errors occur over time you can look into why the size of certain objects is increasing too much during the run and if the size can be contained. Also, look for ways of freeing up resources if garbage collection metrics are high.
Certain algorithms (especially ML ones) use the driver to store data the workers share and update during the run. If you see memory issues on the driver check if the algorithm you’re using is pushing too much data there.

Data skew

If you drill down on the Spark UI to the task level you can see if certain partitions process significantly more data than others and if they are lagging behind. Such symptoms usually indicate a skewed data set. Consider implementing the techniques mentioned in this lesson:
  • add an intermediate data processing step with an alternative key
  • adjust the spark.sql.shuffle.partitions parameter if necessary
The problem with data skew is that it’s very specific to a data set. You might know ahead of time that certain customers or accounts are expected to generate a lot more activity, but the solution for dealing with the skew might strongly depend on what the data looks like. If you need to implement a more general solution (for example for an automated pipeline) it’s recommended to take a more conservative approach (so assume that your data will be skewed) and then monitor how bad the skew really is.
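
One common way to add an intermediate step with an alternative key is "salting": a random suffix is attached to the key, the data is aggregated on (key, salt) first, and the partial results are then combined per original key. The plain-Python sketch below only illustrates the idea on one machine. The customer names, the number of salts and the function name salted_count are assumptions made for this example.

```python
import random
from collections import Counter

def salted_count(keys, num_salts=4, seed=0):
    """Count keys in two stages so one hot key is spread over several groups."""
    rng = random.Random(seed)
    # Stage 1: aggregate on the alternative key (key, salt).
    partial = Counter((key, rng.randrange(num_salts)) for key in keys)
    # Stage 2: drop the salt and combine the partial counts per original key.
    final = Counter()
    for (key, _salt), count in partial.items():
        final[key] += count
    return partial, final

# Hypothetical skewed data: one customer produces almost all events.
events = ["big_customer"] * 1000 + ["small_customer"] * 10
partial, final = salted_count(events)
print(max(partial.values()))  # largest stage-1 group, smaller than 1000
print(dict(final))
```

The final counts are the same as without salting, but no single stage-1 group has to handle all 1000 events of the hot key.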

Inefficient queries

Once your Spark application works it’s worth spending some time to analyze the query it runs. You can use the Spark UI to check the DAG and the jobs and stages it’s built of.
Spark’s query optimizer is called Catalyst. While Catalyst is a powerful tool to turn Python code into an optimized query plan that can run on the JVM, it has some limitations when optimizing your code. It will, for example, push filters in a particular stage as early as possible in the plan but won’t move a filter across stages. It’s your job to make sure that if early filtering is possible without compromising the business logic, then you perform this filtering where it’s more appropriate.
It also can’t decide for you how much data you’re shuffling across the cluster. Remember from the first lesson how expensive sending data through the network is. As much as possible try to avoid shuffling unnecessary data. In practice, this means that you need to perform joins and grouped aggregations as late as possible.
When it comes to joins there is more than one strategy to choose from. If one of your data frames is small, consider using a broadcast hash join instead of a hash join.
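
The idea behind a broadcast hash join can be sketched in plain Python: the small table is turned into a lookup dictionary once and copied to every partition of the large table, so the large table never has to be shuffled. This is a single-machine illustration, not Spark's implementation, and the table contents and the function name are made up. In PySpark such a join is usually requested with the broadcast hint from pyspark.sql.functions.

```python
def broadcast_hash_join(large_partitions, small_table, key):
    """Inner join where the small table is hashed once and shared by all partitions."""
    # Build the hash table from the small side; this is what gets "broadcast".
    lookup = {row[key]: row for row in small_table}
    joined = []
    for partition in large_partitions:   # each partition is processed locally
        for row in partition:
            match = lookup.get(row[key])
            if match is not None:        # rows without a match are dropped
                joined.append({**row, **match})
    return joined

# Hypothetical data: a large play log in two partitions and a small song table.
plays = [
    [{"song_id": 1, "user": "a"}, {"song_id": 2, "user": "b"}],
    [{"song_id": 1, "user": "c"}, {"song_id": 3, "user": "d"}],
]
songs = [{"song_id": 1, "title": "song1"}, {"song_id": 2, "title": "song2"}]

result = broadcast_hash_join(plays, songs, "song_id")
print(result)
```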

Further reading

Debugging and tuning your Spark application can be a daunting task. There is an ever growing community out there though always sharing new ideas and working on improving Spark itself and tooling that makes using Spark easier. So if you have a complicated issue don’t hesitate to reach out to others (via user mailing lists, forums, and Q&A sites).
You can find more information on tuning Spark and Spark SQL in the documentation.

Spark machine learning: comparable to scikit-learn.
Spark DataFrames: comparable to Python (pandas) DataFrames or R data frames.


