Quotes

Sunday, March 2, 2025

Distributed Computing: A Guide to Comparing Data Between Hive Tables Using Spark

In big data environments, efficient data comparison is essential for ensuring data integrity and validating data migrations. Apache Spark, with its in-memory processing capabilities, offers a powerful way to compare large datasets stored in Hive tables. In this blog post, we'll explore how to compare data between two Hive tables using Spark, focusing on practical implementation and key considerations.

Setting Up the Spark Environment

To get started, we’ll need to set up a Spark session and read the data from our Hive tables. We’ll use the following code snippet to achieve this:

scala

import org.apache.spark.sql.{Column, SaveMode, SparkSession}

// Initialize Spark session

val spark = SparkSession.builder.appName("HiveTableComparison").enableHiveSupport().getOrCreate()

// Load data from Hive tables

val df1 = spark.read.format("orc").load("s3a://path/to/table1")

val df2 = spark.read.format("orc").load("s3a://path/to/table2")
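
Because the session was created with enableHiveSupport, the same data can also be read through the Hive metastore rather than by pointing at the underlying files. A minimal alternative, assuming the tables are registered in the metastore under the placeholder names schema.table1 and schema.table2:

scala

// Read directly through the Hive metastore (table names are placeholders)
val df1 = spark.table("schema.table1")
val df2 = spark.table("schema.table2")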

Dropping Unnecessary Columns

In our comparison, we want to exclude certain columns that are not relevant. For example, columns like insert_datetime_utc and extract_datetime might be auto-generated and not useful for our comparison. We’ll drop these columns as follows:

scala

// Define columns to drop

val dropColLst = "insert_datetime_utc,extract_datetime"

val dropColumns = dropColLst.split(",")

// Drop unnecessary columns

val leftTableFilteredDF = df1.select(df1.columns.filter(colName => !dropColumns.contains(colName)).map(colName => new Column(colName)): _*)

val rightTableFilteredDF = df2.select(df2.columns.filter(colName => !dropColumns.contains(colName)).map(colName => new Column(colName)): _*)
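
Equivalently, the DataFrame API's built-in drop method accepts a varargs list of column names (and silently ignores any that don't exist), which collapses the same filtering into one line per table:

scala

// Same result using DataFrame.drop
val leftTableFilteredDF = df1.drop(dropColumns: _*)
val rightTableFilteredDF = df2.drop(dropColumns: _*)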

Comparing the Data

With our filtered DataFrames ready, we can proceed to compare the data. We’ll use the except function to identify discrepancies between the two tables:

scala

// Compare data between the two tables

val df12 = leftTableFilteredDF.except(rightTableFilteredDF)

val df21 = rightTableFilteredDF.except(leftTableFilteredDF)

The except function returns the rows that appear in one DataFrame but not in the other. Running it in both directions surfaces every discrepancy between the two tables. Note that except follows SQL's EXCEPT DISTINCT semantics, so duplicate rows are collapsed; if duplicate counts matter, exceptAll (available since Spark 2.4) preserves multiplicity.
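
To turn the comparison into a quick pass/fail check, we can count the differences and, when they are non-empty, persist them for inspection. A minimal sketch that reuses the SaveMode import from earlier; the output paths are placeholders:

scala

// Zero counts in both directions mean the two tables contain the same distinct rows
println(s"In table1 but not table2: ${df12.count()} rows")
println(s"In table2 but not table1: ${df21.count()} rows")

// Persist the discrepancies for later inspection (paths are placeholders)
df12.write.mode(SaveMode.Overwrite).format("orc").save("s3a://path/to/diff/left_only")
df21.write.mode(SaveMode.Overwrite).format("orc").save("s3a://path/to/diff/right_only")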

Conclusion

Comparing data between Hive tables is a critical task for maintaining data integrity and validating ETL processes. Apache Spark provides an efficient and scalable solution for this purpose. By leveraging Spark’s powerful in-memory processing capabilities, we can compare large datasets stored in Hive tables with ease.


Sunday, March 28, 2021

A month as a product manager on B2B and B2C products

Did well:


Need to improve/practice:

When someone says something, think about what point they are trying to make.


1st week:

High-level overview of GM&E.

Roadmap of the product

Meeting with the Content team

Meeting with the Marketing team

2nd week:

Wireframes for the HH login

H Link email confirmation once the H Link page is created

Shoulder dates


3rd week:

Wrote user stories for redirects

RFP redesign

4th Week:

H Link user stories

H Link confirmation email

From ID details


5th Week:


June 12, 2020:

1. Group Attendee Website

Analytics tickets for tracking/dashboard

Confirmation email, pages, etc.

Rebranding content updates.


2. Redirects: Hilton to G360 P.L


3. Search Component for M.H.com migration to events.hilton.com


4. S.Widget for GM&E


5. Seamless RFP

Friday, November 13, 2020

Hive, Scala snippets

import org.apache.spark.sql.{Column, SaveMode, SparkSession}

val df1 = spark.read.format("orc").load("s3a://path/to/table1")

val df2 = spark.read.format("orc").load("s3a://path/to/table2")

val dropColLst = "insert_datetime_utc,extract_datetime"

val dropColumns = dropColLst.split(",")

val leftTableFilteredDF = df1.select(df1.columns.filter(colName => !dropColumns.contains(colName)).map(colName => new Column(colName)): _*)

val rightTableFilteredDF = df2.select(df2.columns.filter(colName => !dropColumns.contains(colName)).map(colName => new Column(colName)): _*)

val df12 = leftTableFilteredDF.except(rightTableFilteredDF)

val df21 = rightTableFilteredDF.except(leftTableFilteredDF)


# Select Spark 2 where multiple Spark versions are installed (e.g., on HDP clusters)
export SPARK_MAJOR_VERSION=2

-- List a table's partitions
SHOW PARTITIONS schema.tablename;

-- Drop a single monthly partition if it exists
ALTER TABLE schema.tablename DROP IF EXISTS PARTITION (partition_by_month_id=202010);
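
The same partition maintenance can also be driven from a Spark session with Hive support enabled; a minimal sketch using the placeholder table name above:

// Inspect partitions, then drop one month, from Spark
spark.sql("SHOW PARTITIONS schema.tablename").show(false)
spark.sql("ALTER TABLE schema.tablename DROP IF EXISTS PARTITION (partition_by_month_id=202010)")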