Saturday, April 4, 2020

SQL basics, joins, agg functions



1) Why is data kept in different tables instead of a single table with all the data?

- Speed
- Fact tables (orders, sales, invoices), dimension tables (customer data)


Database Normalization

There are essentially three ideas that are aimed at database normalization:
  1. Are the tables storing logical groupings of the data?
  2. Can I make changes in a single location, rather than in many tables for the same information?
  3. Can I access and manipulate data quickly and efficiently?

HAVING clause: useful when you want a WHERE-like condition on an aggregated column, which is not possible with the WHERE keyword because WHERE filters rows before aggregation.

Wrong: WHERE SUM(room_nights) > 10

Correct: HAVING SUM(room_nights) > 10

SELECT COUNT(*) num_reps_above5
FROM(SELECT s.id, s.name, COUNT(*) num_accounts
     FROM accounts a
     JOIN sales_reps s
     ON s.id = a.sales_rep_id
     GROUP BY s.id, s.name
     HAVING COUNT(*) > 5
     ORDER BY num_accounts) AS Table1;
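
The WHERE vs HAVING difference can be tried out locally. The following is a minimal sketch using Python's built-in sqlite3 module; the bookings table and its rows are made up for illustration and are not part of the notes above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bookings (hotel TEXT, room_nights INTEGER);
    INSERT INTO bookings VALUES
        ('A', 6), ('A', 7), ('B', 3), ('B', 4), ('C', 12);
""")

# WHERE filters individual rows before grouping, so an aggregate is rejected there.
try:
    conn.execute(
        "SELECT hotel FROM bookings WHERE SUM(room_nights) > 10 GROUP BY hotel"
    )
    where_failed = False
except sqlite3.OperationalError:
    where_failed = True

# HAVING filters whole groups after the aggregation has been computed.
rows = conn.execute(
    "SELECT hotel, SUM(room_nights) AS total "
    "FROM bookings GROUP BY hotel "
    "HAVING SUM(room_nights) > 10 ORDER BY hotel"
).fetchall()
print(where_failed, rows)
```

Hotel B (7 nights in total) is dropped by HAVING, while A and C are kept.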



CASE statement examples:

SELECT a.name, SUM(total_amt_usd) total_spent, 
     CASE WHEN SUM(total_amt_usd) > 200000 THEN 'top'
     WHEN  SUM(total_amt_usd) > 100000 THEN 'middle'
     ELSE 'low' END AS customer_level
FROM orders o
JOIN accounts a
ON o.account_id = a.id
WHERE occurred_at > '2015-12-31' 
GROUP BY 1
ORDER BY 2 DESC;

DATE_TRUNC and DATE_PART functions

SELECT DATE_TRUNC('month', o.occurred_at) ord_date, SUM(o.gloss_amt_usd) tot_spent
FROM orders o 
JOIN accounts a
ON a.id = o.account_id
WHERE a.name = 'Walmart'
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1;



SELECT DATE_PART('year', occurred_at) ord_year,  SUM(total_amt_usd) total_spent
 FROM orders
 GROUP BY 1
 ORDER BY 2 DESC;


case query:

select case
when condition1 then ''
when condition2 then '' end
from table1;



select case
when condition* then case when condition1 then ''
                          when condition2 then '' end
else '' end
from table1;



select case
when a+b>c and a+c>b and b+c>a then
case 
when a=b and b=c then 'Equilateral'
when a=b or b=c or a=c then 'Isosceles'
else 'Scalene' end
else 'Not A Triangle' end
from triangles;
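
A nested CASE like the triangle query can be checked against a few sample rows. This is a rough sketch with sqlite3; the four sample triangles are assumptions chosen to hit each branch, and the outer condition tests all three side pairs so that rows such as (13, 14, 30) are rejected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE triangles (a INTEGER, b INTEGER, c INTEGER)")
conn.executemany(
    "INSERT INTO triangles VALUES (?, ?, ?)",
    [(20, 20, 20), (20, 20, 23), (20, 21, 22), (13, 14, 30)],
)

query = """
SELECT CASE
    WHEN a + b > c AND a + c > b AND b + c > a THEN
        CASE
            WHEN a = b AND b = c THEN 'Equilateral'
            WHEN a = b OR b = c OR a = c THEN 'Isosceles'
            ELSE 'Scalene'
        END
    ELSE 'Not A Triangle'
END
FROM triangles
ORDER BY rowid
"""
# One label per row, in insertion order.
kinds = [row[0] for row in conn.execute(query)]
print(kinds)
```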

SET @number = 21;

SELECT REPEAT('* ', @number := @number - 1) FROM information_schema.tables WHERE @number > 0;

Wednesday, April 1, 2020

Big Data EcoSystem and Spark


Parallel programming: multiple CPUs but one memory (shared memory).

Distributed programming: each node has its own memory and CPU.

Limitations: we can write big data programs in plain Java or Python, but on a single machine we run into out-of-memory issues or long processing times.


MapReduce is a programming technique; Hadoop MapReduce is a specific implementation of that technique.

It has 3 stages.

Map: reads the data and converts it to (key, value) pairs.
Example: (song1, 1), (song2, 1), (song1, 1)

Shuffle: collects the pairs with the same key and forms groups.

Group1
(song1,1)
(song1,1)

Group2
(song2,1)

Reduce: combines the values in each group into one result per key (here, by summing the counts).

(song1, 2)
(song2,1)
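
The three stages can be imitated on a single machine in plain Python. This is only a sketch of the idea on the song example above, not how Hadoop actually implements it.

```python
from collections import defaultdict

songs = ["song1", "song2", "song1"]

# Map: emit a (key, value) pair for every record.
mapped = [(song, 1) for song in songs]

# Shuffle: group the values that share a key.
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# Reduce: combine each group's values into one result per key.
reduced = {key: sum(values) for key, values in groups.items()}
print(reduced)
```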




1) Spark is crucial for processing huge volumes of data faster, thanks to its in-memory compute model.

Programming: high-level APIs

1) DataFrames
2) Spark SQL

Low level:

RDD: Resilient Distributed Dataset, which is fault tolerant.



A) Data wrangling using a functional programming language.


Spark is written in Scala, a functional language. Python is mostly procedural, so Spark code written in Python tends to run slower than the same code in Scala.

Hence PySpark was implemented with high-level APIs.


Lazy evaluation was introduced to avoid out-of-memory issues from copying the input data for each function.

DAG (Directed Acyclic Graph): lets Spark procrastinate, postponing the actual work until the last minute.

A DAG can contain multiple kinds of map functions.

These multi-step combos are called stages.

Instead of defining a new function with def to convert a string to lowercase using str.lower, an anonymous function can be written with lambda.

Maps:
In Spark, maps take data as input and then transform that data with whatever function you put in the map. They are like directions for the data telling how each input should get to the output.

distributed_song_log.map(convert_song_to_lowercase).collect()

Spark does not actually execute the map step unless it needs to.
"RDD" in the output refers to resilient distributed dataset. RDDs are exactly what they say they are: fault-tolerant datasets distributed across a cluster. This is how Spark stores data.
To get Spark to actually run the map step, you need to use an "action". One available action is the collect method. The collect() method takes the results from all of the nodes in the cluster and "collects" them into a single list on the master node.

You do not always have to write a custom function for the map step. You can also use anonymous (lambda) functions as well as built-in Python functions like str.lower().
Anonymous functions are actually a Python feature for writing functional style programs.
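
Plain Python's built-in map is lazy in a similar spirit, so it can serve as a rough single-machine analogy for a Spark map followed by collect(). The sketch below does not use Spark at all; song_log and the calls list are made up for illustration.

```python
song_log = ["Despacito", "HAVANA", "All The Stars"]
calls = []

def convert_song_to_lowercase(song):
    calls.append(song)  # record when the function really runs
    return song.lower()

# Building the map does no work yet, much like a Spark transformation.
lazy_result = map(convert_song_to_lowercase, song_log)
calls_before = len(calls)

# Consuming the iterator plays the role of an action such as collect().
collected = list(lazy_result)

# The same transformation with an anonymous function and with a built-in.
with_lambda = list(map(lambda song: song.lower(), song_log))
with_builtin = list(map(str.lower, song_log))
print(calls_before, collected)
```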

Data formats:
CSV, XML, JSON, HTML.
Hadoop Distributed File System (HDFS): splits data into blocks of 64 MB or 128 MB and stores them across the cluster to provide fault tolerance.

SparkContext: the entry point for the low-level (RDD) API.
SparkSession: the entry point for DataFrames and Spark SQL.
SparkConf: configures the app name and the master (setMaster with local or an IP address).
getOrCreate: returns the existing session if there is one (applying the configuration properties provided), otherwise creates a new one.

Reading and writing data with Spark DataFrames:
spark.sparkContext.getConf().getAll()
log = spark.read.json(path)
log.printSchema()
log.describe()
log.show(n=1)
log.take(5)

log.write.save(outputpath, format="csv", header=True)
csvlog = spark.read.csv(outputpath, header=True)

Imperative programming vs declarative programming
Declarative programming is a bit higher level: an abstraction over imperative programming.
Imperative programming spells out, step by step, how to calculate the sum:
a=1, b=1
c = a+b
return c
Declarative: give me the sum of these two numbers
sum(1,1)
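
In runnable Python the same contrast might look like the sketch below. Note that Python's built-in sum takes an iterable, so the two numbers are put in a list.

```python
numbers = [1, 1]

# Imperative: spell out each step of the computation.
total = 0
for n in numbers:
    total += n

# Declarative: state the result you want and let the built-in decide how.
declared_total = sum(numbers)
print(total, declared_total)
```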

log.describe("facilityid").show()
log.select("facilityid").dropDuplicates().sort("facilityid").show()

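User-defined function:

import datetime
from pyspark.sql.functions import udf

# x/1000 converts a millisecond timestamp to seconds
getHr = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).hour)

log = log.withColumn('hr', getHr(log.ts))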
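
What the lambda inside the UDF computes can be checked without Spark. This sketch assumes ts is a Unix timestamp in milliseconds and pins the timezone to UTC so the result does not depend on the machine's local time; the sample timestamp is made up.

```python
from datetime import datetime, timezone

def hour_of(ts_millis):
    """Return the hour of day (UTC) for a millisecond Unix timestamp."""
    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc).hour

# 2018-10-01 00:00:00 UTC in milliseconds, plus 13 hours and 5 minutes.
midnight_ms = 1538352000000
sample_ts = midnight_ms + 13 * 3600 * 1000 + 5 * 60 * 1000
sample_hour = hour_of(sample_ts)
print(sample_hour)
```

Without the tz argument, fromtimestamp uses the local timezone, as in the UDF above.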
DAG: Multiple Stages,
Stage: Multiple Tasks,
Tasks.

Accumulators are used because print statements execute on the worker nodes, so their output cannot be seen from the master node where the driver program runs.

s3a://
hdfs:///

port 22 for SSH
port 80 for HTTP
port 7077 is used by the master node to connect with worker nodes.

port 8080 for the master node web UI
port 8888 for Jupyter notebooks
port 4040 shows active Spark jobs

Pareto principle - 80% of the data comes from 20% of the users

In this lesson, we walked through various examples of issues you can debug based on error messages, loglines and stack traces.
We have also touched on another very common issue with Spark jobs that can be harder to address: everything working fine but just taking a very long time. So what do you do when your Spark job is (too) slow?

Insufficient resources

Often, while there are some possible ways of improvement, processing large data sets just takes a lot longer than processing smaller ones, even without any big problem in the code or job tuning. Using more resources, either by increasing the number of executors or using more powerful machines, might just not be possible. When you have a slow job it’s useful to understand
  • how much data you’re actually processing (compressed file formats can be tricky to interpret),
  • if you can decrease the amount of data to be processed by filtering or aggregating to lower cardinality,
  • and if resource utilization is reasonable.
There are many cases where different stages of a Spark job differ greatly in their resource needs: loading data is typically I/O heavy, some stages might require a lot of memory, others might need a lot of CPU. Understanding these differences might help to optimize the overall performance. Use the Spark UI and logs to collect information on these metrics.
If you run into out of memory errors you might consider increasing the number of partitions. If the memory errors occur over time you can look into why the size of certain objects is increasing too much during the run and if the size can be contained. Also, look for ways of freeing up resources if garbage collection metrics are high.
Certain algorithms (especially ML ones) use the driver to store data the workers share and update during the run. If you see memory issues on the driver check if the algorithm you’re using is pushing too much data there.

Data skew

If you drill down on the Spark UI to the task level you can see if certain partitions process significantly more data than others and if they are lagging behind. Such symptoms usually indicate a skewed data set. Consider implementing the techniques mentioned in this lesson:
  • add an intermediate data processing step with an alternative key
  • adjust the spark.sql.shuffle.partitions parameter if necessary
The problem with data skew is that it’s very specific to a data set. You might know ahead of time that certain customers or accounts are expected to generate a lot more activity, but the solution for dealing with the skew might strongly depend on what the data looks like. If you need to implement a more general solution (for example for an automated pipeline), it’s recommended to take a more conservative approach (so assume that your data will be skewed) and then monitor how bad the skew really is.
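
The "intermediate step with an alternative key" idea is often called salting. Below is a plain-Python sketch of it, not Spark code: NUM_SALTS, the customer names, and the round-robin salt are all assumptions for illustration.

```python
from collections import Counter

NUM_SALTS = 4  # hypothetical number of sub-keys per hot key

# Skewed input: one customer produces most of the events.
events = [("big_customer", 1)] * 1000 + [("small_customer", 1)] * 10

# Step 1: aggregate on an alternative (key, salt) key so the hot key
# is spread over several groups instead of landing in one.
partial = Counter()
for i, (key, value) in enumerate(events):
    salt = i % NUM_SALTS  # round-robin salt keeps the sketch deterministic
    partial[(key, salt)] += value

# Step 2: drop the salt and combine the much smaller partial results.
final = Counter()
for (key, _salt), value in partial.items():
    final[key] += value

largest_partial = max(partial.values())
print(largest_partial, dict(final))
```

The largest group in step 1 holds 250 records instead of 1000, and the totals come out the same.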

Inefficient queries

Once your Spark application works it’s worth spending some time to analyze the query it runs. You can use the Spark UI to check the DAG and the jobs and stages it’s built of.
Spark’s query optimizer is called Catalyst. While Catalyst is a powerful tool for turning Python code into an optimized query plan that can run on the JVM, it has some limitations when optimizing your code. It will, for example, push filters in a particular stage as early as possible in the plan but won’t move a filter across stages. It’s your job to make sure that, if early filtering is possible without compromising the business logic, then you perform this filtering where it’s more appropriate.
It also can’t decide for you how much data you’re shuffling across the cluster. Remember from the first lesson how expensive sending data through the network is. As much as possible try to avoid shuffling unnecessary data. In practice, this means that you need to perform joins and grouped aggregations as late as possible.
When it comes to joins there is more than one strategy to choose from. If one of your data frames is small, consider using a broadcast hash join instead of a hash join.
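
The idea behind a broadcast hash join can be sketched in plain Python: the small table is copied to every worker as a lookup, so the large table never has to be shuffled. The table contents and the join_partition helper below are hypothetical, and this is not Spark's actual implementation.

```python
# Small table: fits in memory, so every worker could hold a full copy.
accounts = {1: "Walmart", 2: "Apple"}  # hypothetical id -> name lookup

# Large table: would be partitioned across workers in a real cluster.
order_partitions = [
    [(1, 100.0), (2, 50.0)],
    [(1, 25.0), (3, 10.0)],
]

def join_partition(partition, broadcast_lookup):
    """Inner-join one partition against the broadcast copy, no shuffle needed."""
    return [
        (broadcast_lookup[account_id], amount)
        for account_id, amount in partition
        if account_id in broadcast_lookup
    ]

joined = [
    row
    for part in order_partitions
    for row in join_partition(part, accounts)
]
print(joined)
```

The order for account 3 has no match in the small table, so the inner join drops it.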

Further reading

Debugging and tuning your Spark application can be a daunting task. There is an ever growing community out there though always sharing new ideas and working on improving Spark itself and tooling that makes using Spark easier. So if you have a complicated issue don’t hesitate to reach out to others (via user mailing lists, forums, and Q&A sites).
You can find more information on tuning Spark and Spark SQL in the documentation.

Spark machine learning: similar to scikit-learn.
Spark DataFrames: similar to pandas DataFrames in Python or data frames in R.



Tuesday, February 4, 2020

JOIN

Table 1 − CUSTOMERS Table
+----+----------+-----+-----------+----------+
| ID | NAME     | AGE | ADDRESS   | SALARY   |
+----+----------+-----+-----------+----------+
|  1 | Ramesh   |  32 | Ahmedabad |  2000.00 |
|  2 | Khilan   |  25 | Delhi     |  1500.00 |
|  3 | kaushik  |  23 | Kota      |  2000.00 |
|  4 | Chaitali |  25 | Mumbai    |  6500.00 |
|  5 | Hardik   |  27 | Bhopal    |  8500.00 |
|  6 | Komal    |  22 | MP        |  4500.00 |
|  7 | Muffy    |  24 | Indore    | 10000.00 |
+----+----------+-----+-----------+----------+
Table 2 − ORDERS Table
+-----+---------------------+-------------+--------+
|OID  | DATE                | CUSTOMER_ID | AMOUNT |
+-----+---------------------+-------------+--------+
| 102 | 2009-10-08 00:00:00 |           3 |   3000 |
| 100 | 2009-10-08 00:00:00 |           3 |   1500 |
| 101 | 2009-11-20 00:00:00 |           2 |   1560 |
| 103 | 2008-05-20 00:00:00 |           4 |   2060 |
+-----+---------------------+-------------+--------+
 
 
SQL> SELECT ID, NAME, AGE, AMOUNT
   FROM CUSTOMERS, ORDERS
   WHERE  CUSTOMERS.ID = ORDERS.CUSTOMER_ID; 



This would produce the following result.

+----+----------+-----+--------+
| ID | NAME     | AGE | AMOUNT |
+----+----------+-----+--------+
|  3 | kaushik  |  23 |   3000 |
|  3 | kaushik  |  23 |   1500 |
|  2 | Khilan   |  25 |   1560 |
|  4 | Chaitali |  25 |   2060 |
+----+----------+-----+--------+

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
INNER JOIN

SQL> SELECT  ID, NAME, AMOUNT, DATE
   FROM CUSTOMERS
   INNER JOIN ORDERS
   ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID;



This would produce the following result.
+----+----------+--------+---------------------+
| ID | NAME     | AMOUNT | DATE                |
+----+----------+--------+---------------------+
|  3 | kaushik  |   3000 | 2009-10-08 00:00:00 |
|  3 | kaushik  |   1500 | 2009-10-08 00:00:00 |
|  2 | Khilan   |   1560 | 2009-11-20 00:00:00 |
|  4 | Chaitali |   2060 | 2008-05-20 00:00:00 |
+----+----------+--------+---------------------+
 
 
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ 
 
LEFT JOIN
 
SQL> SELECT  ID, NAME, AMOUNT, DATE
   FROM CUSTOMERS
   LEFT JOIN ORDERS
   ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID;

This would produce the following result.
+----+----------+--------+---------------------+
| ID | NAME     | AMOUNT | DATE                |
+----+----------+--------+---------------------+
|  1 | Ramesh   |   NULL | NULL                |
|  2 | Khilan   |   1560 | 2009-11-20 00:00:00 |
|  3 | kaushik  |   3000 | 2009-10-08 00:00:00 |
|  3 | kaushik  |   1500 | 2009-10-08 00:00:00 |
|  4 | Chaitali |   2060 | 2008-05-20 00:00:00 |
|  5 | Hardik   |   NULL | NULL                |
|  6 | Komal    |   NULL | NULL                |
|  7 | Muffy    |   NULL | NULL                |
+----+----------+--------+---------------------+
 
 
RIGHT JOIN

SQL> SELECT  ID, NAME, AMOUNT, DATE
   FROM CUSTOMERS
   RIGHT JOIN ORDERS
   ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID;

This would produce the following result.
+------+----------+--------+---------------------+
| ID   | NAME     | AMOUNT | DATE                |
+------+----------+--------+---------------------+
|    3 | kaushik  |   3000 | 2009-10-08 00:00:00 |
|    3 | kaushik  |   1500 | 2009-10-08 00:00:00 |
|    2 | Khilan   |   1560 | 2009-11-20 00:00:00 |
|    4 | Chaitali |   2060 | 2008-05-20 00:00:00 |
+------+----------+--------+---------------------+
 
 
 
FULL JOIN
 
SQL> SELECT  ID, NAME, AMOUNT, DATE
   FROM CUSTOMERS
   FULL JOIN ORDERS
   ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID;
This would produce the following result −
+------+----------+--------+---------------------+
| ID   | NAME     | AMOUNT | DATE                |
+------+----------+--------+---------------------+
|    1 | Ramesh   |   NULL | NULL                |
|    2 | Khilan   |   1560 | 2009-11-20 00:00:00 |
|    3 | kaushik  |   3000 | 2009-10-08 00:00:00 |
|    3 | kaushik  |   1500 | 2009-10-08 00:00:00 |
|    4 | Chaitali |   2060 | 2008-05-20 00:00:00 |
|    5 | Hardik   |   NULL | NULL                |
|    6 | Komal    |   NULL | NULL                |
|    7 | Muffy    |   NULL | NULL                |
+------+----------+--------+---------------------+
Every order here matches a customer, so FULL JOIN returns the same eight rows as LEFT JOIN.
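
The INNER and LEFT JOIN results can be reproduced with Python's built-in sqlite3 module. This is a sketch on a cut-down copy of the CUSTOMERS and ORDERS tables (only the columns needed here). RIGHT and FULL JOIN are left out because older SQLite versions do not support them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE CUSTOMERS (ID INTEGER, NAME TEXT);
    CREATE TABLE ORDERS (OID INTEGER, CUSTOMER_ID INTEGER, AMOUNT INTEGER);
    INSERT INTO CUSTOMERS VALUES
        (1, 'Ramesh'), (2, 'Khilan'), (3, 'kaushik'), (4, 'Chaitali'),
        (5, 'Hardik'), (6, 'Komal'), (7, 'Muffy');
    INSERT INTO ORDERS VALUES
        (102, 3, 3000), (100, 3, 1500), (101, 2, 1560), (103, 4, 2060);
""")

# INNER JOIN keeps only customers that have at least one order.
inner_rows = conn.execute("""
    SELECT ID, NAME, AMOUNT FROM CUSTOMERS
    INNER JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
    ORDER BY ID, AMOUNT
""").fetchall()

# LEFT JOIN keeps every customer; missing orders show up as NULL (None).
left_rows = conn.execute("""
    SELECT ID, NAME, AMOUNT FROM CUSTOMERS
    LEFT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
    ORDER BY ID, AMOUNT
""").fetchall()

customers_without_orders = [
    name for _id, name, amount in left_rows if amount is None
]
print(len(inner_rows), len(left_rows), customers_without_orders)
```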
 

 
 

Friday, December 20, 2019

HDFS commands

How can you debug Hadoop code?

First, we should check the list of MapReduce jobs currently running. Next, we need to check that there are no orphaned jobs running; if there are, we need to determine the location of the ResourceManager (RM) logs.
  1. Run:
    ps -ef | grep -i ResourceManager
    Then, look for the log directory in the displayed result. We have to find out the job ID from the displayed list and check if there is any error message associated with that job.
  2. On the basis of the RM logs, we need to identify the worker node that was involved in the execution of the task.
  3. Now, we will log in to that node and run the command below:
    ps -ef | grep -i NodeManager
  4. Then, we will examine the NodeManager log. The majority of errors come from the user-level logs for each MapReduce job.

How to configure Replication Factor in HDFS?

The hdfs-site.xml configuration file is used to configure HDFS. Changing the dfs.replication property in hdfs-site.xml will change the default replication for all the files placed in HDFS.
We can also modify the replication factor on a per-file basis using the Hadoop FS shell:
[training@localhost ~]$ hadoop fs -setrep -w 3 /my/file
Conversely, we can also change the replication factor of all the files under a directory:
[training@localhost ~]$ hadoop fs -setrep -R -w 3 /my/dir

The replication factor is the number of copies kept of each block, so that data is not lost if a block is deleted, corrupted, or its node fails.
 

How to compress a Mapper output not touching Reducer output?

To achieve this compression, we should set:
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);

What is the difference between Map-side Join and Reduce-side Join?

A map-side join is performed in the mapper, as the data reaches the map phase. It requires the input datasets to follow a strict structure. A reduce-side join (repartitioned join), on the other hand, is simpler because the input datasets need not be structured. However, it is less efficient, since the data has to go through the sort and shuffle phases, which adds network overhead.

How can you transfer data from Hive to HDFS?

By writing the query:
hive> insert overwrite directory '/' select * from emp;
We can write a query for the data we want to export from Hive to HDFS. The output will be stored in part files in the specified HDFS path.


 

 
 

Monday, November 25, 2019

AWS CLI copy files S3 to local

Copying a file to S3

$ aws s3 cp MyFile.txt s3://my-bucket/path/

Moving all files from S3

$ aws s3 mv s3://my-bucket/path ./MyDirectory --recursive

Removing a file from S3

$ aws s3 rm s3://my-bucket/path/MyFile.txt

Syncing the current directory with an S3 bucket

$ aws s3 sync . s3://my-bucket/path

Delete all contents of a bucket

$ aws s3 rm s3://my-bucket/path --recursive

List all buckets

$ aws s3 ls

List contents of a bucket

$ aws s3 ls s3://bucket-name

Friday, November 1, 2019

Python Misc

1) Class variables: defined inside the class and outside of any methods
Class variables in Python are defined just after the class definition and outside of any methods:
class SomeClass:
    variable_1 = "a class variable"
    variable_2 = 100   #this is also a class variable


2) Instance variables: defined inside instance methods
Unlike class variables, instance variables should be defined within methods:
class SomeClass:
    variable_1 = "a class variable"
    variable_2 = 100    #this is also a class variable.

    def __init__(self, param1, param2):
        self.instance_var1 = param1
        #instance_var1 is an instance variable
        self.instance_var2 = param2
        #instance_var2 is an instance variable
 
 
Let’s instantiate the above class and do some introspection on those instances and the class:
>>> obj1 = SomeClass("some thing", 18)
#creating an instance of SomeClass named obj1
>>> obj2 = SomeClass(28, 6)
#creating an instance of SomeClass named obj2

>>> obj1.variable_1
'a class variable'

>>> obj2.variable_1
'a class variable'



So as seen above, both obj1 and obj2 give the same value when variable_1 is accessed, which is the normal behavior that we should expect from a class variable. Let’s find out about instance variables:
>>> obj1.instance_var1
'some thing'
>>> obj2.instance_var1
28
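
One more difference worth seeing: a class variable is shared, but assigning through an instance creates a separate attribute on that object only. A small sketch follows, using a cut-down SomeClass with a single constructor parameter, which is an assumption made for brevity.

```python
class SomeClass:
    variable_1 = "a class variable"  # shared by every instance

    def __init__(self, param1):
        self.instance_var1 = param1  # stored on this instance only

obj1 = SomeClass("some thing")
obj2 = SomeClass(28)

# Rebinding the attribute on the class is visible through every instance.
SomeClass.variable_1 = "changed on the class"
seen_by_obj1 = obj1.variable_1
seen_by_obj2 = obj2.variable_1

# Assigning through an instance creates a new instance attribute that
# shadows the class variable for that object only.
obj1.variable_1 = "only on obj1"
print(obj1.variable_1, "|", obj2.variable_1)
```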

 
1) Class methods (cls): cls refers to the class itself; it is not used in the example below.
2) Instance methods (self, args): self is used to access the instance's values.

class SomeClass:    
    def create_arr(self): # An instance method
        self.arr = []
    
    def insert_to_arr(self, value):  #An instance method
        self.arr.append(value)
 
We can instantiate the above class as obj3 and investigate as follows:
>>> obj3 = SomeClass()
>>> obj3.create_arr()
>>> obj3.insert_to_arr(5)
>>> obj3.arr
[5]


class SomeClass:
    def create_arr(self): # An instance method
        self.arr = []
    
    def insert_to_arr(self, value):  #An instance method
        self.arr.append(value)
        
    @classmethod
    def class_method(cls):
        print("the class method was called")

Without even instantiating an object, we can access class methods as follows:
SomeClass.class_method()
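
A class method becomes more useful once it actually uses cls. The sketch below is a made-up example: the instances_created counter and how_many method are not from the notes above.

```python
class SomeClass:
    instances_created = 0  # class variable

    def __init__(self):
        SomeClass.instances_created += 1

    @classmethod
    def how_many(cls):
        # cls is the class itself, so class variables are reachable
        # without creating an instance first.
        return cls.instances_created

before = SomeClass.how_many()
SomeClass()
SomeClass()
after = SomeClass.how_many()
print(before, after)
```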

 
self:

class SomeClass:
    def __init__(self):
        self.arr = [] 
        #All SomeClass objects will have an array arr by default
    
    def insert_to_arr(self, value):
        self.arr.append(value)
So now let’s create two objects of SomeClass and append a value to one of their arrays:
obj1 = SomeClass()
obj2 = SomeClass()
obj1.insert_to_arr(6)

The call above is equivalent to passing the instance explicitly as self:
SomeClass.insert_to_arr(obj1, 6)
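
Put together as a runnable sketch: both call styles append to the same object's list, and the other object's list is untouched. The value 7 is just a second sample value.

```python
class SomeClass:
    def __init__(self):
        self.arr = []  # every instance gets its own list

    def insert_to_arr(self, value):
        self.arr.append(value)

obj1 = SomeClass()
obj2 = SomeClass()

obj1.insert_to_arr(6)             # Python passes obj1 as self
SomeClass.insert_to_arr(obj1, 7)  # the same kind of call with self written out
print(obj1.arr, obj2.arr)
```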

Friday, October 11, 2019

Kafka Commands

Apache Kafka Command Line Interface (CLI)

Here are some commands often used when working with the Apache Kafka command line interface (CLI).

1. Start the Kafka server

We need 2 steps:

1.1 Start the ZooKeeper

1.2. Start the Kafka server

2. List all topics

The output in my console:

3. Create a topic

The output is:
The above command will create a "hello-topic" topic, with replication-factor = 1 and 1 partition. Note that the replication-factor controls how many servers will replicate each message that is written; therefore, it should be less than or equal to the number of Kafka servers/brokers.

4. Describe a topic

The output is as below:

5. Publish messages to a topic

One of the most interesting commands.
After this command, you can add any messages to the console, line by line. For ex:
You can stop the console producer via Ctrl-C

6. Consume messages

Below command will consume messages from the topic: hello-topic
The output on my console is:
You can stop the console consumer via Ctrl-C

7. Alter Apache Kafka Topics

7.1. Add more partitions to the topic

The command below will add 10 more partitions to the hello-topic topic. Note that before, the topic had only 1 partition.
The output in my console:

7.2. Delete a topic

7.3. Add configurations to the Kafka topic

The general syntax is:
For example,  below command will set the max message size = 128000 bytes for the hello-topic topic.
Here is an example output:

7.4. Remove configurations from the Kafka topic

To remove the above overridden configuration, we can use the command:
For the list of configurations, please refer to the Apache Kafka documentation.
