Data Processing

Apache Spark

  • When you deploy a compute cluster or SQL warehouse on Azure Databricks, Apache Spark is configured and deployed to virtual machines.

Data Frame

  • A dataset organized into named columns. You can think of a DataFrame like a spreadsheet or a SQL table, a two-dimensional labeled data structure of a series of records (similar to rows in a table) and columns of different types. DataFrames provide a rich set of functions (for example, select columns, filter, join, and aggregate) that allow you to perform common data manipulation and analysis tasks efficiently.

  • DataFrames are immutable. Because of this, after performing transformations, a new DataFrame is returned that has to be saved to a variable in order to access it in subsequent operations.
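A minimal PySpark sketch of this behavior (table and column names are illustrative; requires an active Spark session, e.g., on Databricks):

```python
# Hypothetical table name; spark is the active SparkSession.
df = spark.read.table("events")

# filter() does not modify df; it returns a brand-new DataFrame
# that must be captured in a variable.
adults = df.filter(df.age >= 18)

# df itself is unchanged; only the new variable sees the transformation.
adults.select("name", "age").show()
```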

Vs Pandas

  • Both pandas and Apache Spark are used for data manipulation and analysis.

| Feature | Pandas | Apache Spark (PySpark) |
|---|---|---|
| Scale | Single machine (local) | Distributed cluster (multiple machines) |
| Memory | Loads everything into RAM | Can process data larger than RAM |
| Execution | Eager (runs immediately) | Lazy (plans first, runs later) |
| Performance | Faster for small datasets (< 1 GB) | Faster for massive datasets (> 10 GB) |
| Complexity | Simple to set up and debug | Requires cluster management/overhead |

  • Use Pandas if: Your data fits comfortably in your computer's RAM, you are doing exploratory data analysis (EDA), or you are working on a local machine/laptop.

  • Use Spark if: Your data is "Big Data" (multi-gigabyte or terabyte scale), you are building production data pipelines in the cloud (like on Databricks), or you need to process streaming data in real time.
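The eager-vs-lazy distinction can be illustrated with a plain-Python analogy (this is not Spark itself, just a mental model): a list comprehension computes immediately, like pandas, while a generator only records a plan and runs when results are requested, like Spark.

```python
data = [1, 2, 3, 4]

# Eager (pandas-like): the doubled values are computed right away.
eager = [x * 2 for x in data]

# Lazy (Spark-like): nothing is computed yet; this is only a plan.
lazy = (x * 2 for x in data)

# Work happens only when an "action" asks for the results.
result = list(lazy)

print(eager)   # [2, 4, 6, 8]
print(result)  # [2, 4, 6, 8]
```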

Reading File

  • Create a DataFrame from a file.
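A hedged PySpark sketch (the path and options are illustrative; requires a Spark session):

```python
# Read a CSV file into a DataFrame; header/inferSchema are common options.
df = (spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("/path/to/file.csv"))
```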

Write

  • Write a DataFrame to a table or file.
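A sketch of both targets (table and path names are illustrative; assumes `df` from a previous read):

```python
# Write to a table, replacing any existing data.
df.write.mode("overwrite").saveAsTable("my_schema.orders")

# Or write files at a storage path, e.g., Parquet, appending to what exists.
df.write.mode("append").format("parquet").save("/path/to/output")
```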

SQL

Type of Table

Managed Table

  • Created under the database directory

  • Databricks manages the data's location and lifecycle

  • Dropping the table also deletes the underlying data files

External Table

  • Created outside the database directory

  • User-managed, stored outside the default location

  • Dropping the table does not delete the underlying data files
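A SQL sketch of both table types (names and the storage path are illustrative):

```sql
-- Managed: Databricks controls the storage location; DROP deletes the data.
CREATE TABLE sales_managed (id INT, amount DOUBLE);

-- External: the user supplies LOCATION; DROP removes only the metadata.
CREATE TABLE sales_external (id INT, amount DOUBLE)
LOCATION 'abfss://container@account.dfs.core.windows.net/sales';
```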

File Location

  • For Unity Catalog, managed table data is stored in the managed storage location configured for the metastore, catalog, or schema.

  • For the Hive metastore (legacy approach), tables reside in the DBFS default location.

Describe Command

1. Simple Describe

  • Shows the basic schema: column names, data types, and any comments associated with those columns.

Output Includes:

  • col_name: Name of the column.

  • data_type: The type (string, int, array, etc.).

  • comment: Any documentation written during table creation.


2. Describe Extended (Most Common)

  • This is usually what developers want. It provides the basic schema plus detailed metadata about the table's physical storage and configuration.

Output Includes:

  • Database/Schema name

  • Owner (Who created it)

  • Created Time

  • Location (The exact S3/ADLS path where the data lives)

  • Provider (e.g., delta, parquet, csv)

  • Table Properties (Any custom tags or Delta configurations like delta.autoOptimize)

  • Partition Information (If the table is partitioned)


3. Describe Detail (Delta Specific)

  • If you are using Delta Lake, this command provides even deeper technical specifics regarding the current state of the Delta table.

Output Includes:

  • Format: Always delta.

  • SizeInBytes: Total physical size on disk.

  • NumFiles: How many parquet files make up the table (great for checking "small file problems").

  • CreatedAt & LastModified: Timestamps for table lifecycle.

  • MinReaderVersion / MinWriterVersion: Protocol versions for Delta features.


4. Describe History (Audit Trail)

  • While not strictly for "schema," this is a vital related command for Delta tables. It shows the transaction log (who changed what and when).

  • Use case: Finding out when a table was last updated or identifying which user ran a specific DELETE or UPDATE command.
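The four variants side by side (table name is illustrative):

```sql
DESCRIBE TABLE sales;      -- basic schema: columns, types, comments
DESCRIBE EXTENDED sales;   -- schema + location, owner, provider, properties
DESCRIBE DETAIL sales;     -- Delta-specific: sizeInBytes, numFiles, versions
DESCRIBE HISTORY sales;    -- Delta transaction log / audit trail
```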

Array / JSON Operations

Json

| Function | Example input | Usage | Result |
|---|---|---|---|
| get_json_object | {"id": 10, "city": "NY"} | get_json_object(col, '$.city') | "NY" |
| from_json | {"rank": 1, "score": 95} | from_json(col, 'rank INT, score INT') | {rank: 1, score: 95} (struct) |
| to_json | {a: 1, b: 2} (struct) | to_json(col) | "{"a":1,"b":2}" (string) |
| json_tuple | {"user": "A", "age": 30} | json_tuple(col, 'user', 'age') | "A", 30 (multiple columns) |

Array

| Function | Input | Usage | Result |
|---|---|---|---|
| flatten | [[1, 2], [3]] | flatten(col) | [1, 2, 3] |
| array_distinct | [1, 1, 2] | array_distinct(col) | [1, 2] |
| array_sort | [3, 1, 2] | array_sort(col) | [1, 2, 3] |
| array_remove | [1, 2, 3] | array_remove(col, 2) | [1, 3] |
| slice | [A, B, C, D] | slice(col, 2, 2) | [B, C] |
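The same functions with literal arrays:

```sql
SELECT flatten(array(array(1, 2), array(3)));    -- [1, 2, 3]
SELECT array_distinct(array(1, 1, 2));           -- [1, 2]
SELECT array_sort(array(3, 1, 2));               -- [1, 2, 3]
SELECT array_remove(array(1, 2, 3), 2);          -- [1, 3]
SELECT slice(array('A', 'B', 'C', 'D'), 2, 2);   -- ["B", "C"]
```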

Explode

This is the most common array operation. It creates a new row for every element in the array.

  • Input row: user_id: 1, skills: ["SQL", "Spark"]

  • SQL: SELECT user_id, explode(skills) AS skill FROM table

  • Result: two rows, (1, "SQL") and (1, "Spark")

  • Similarly, exploding [1, 2, 3] produces three rows: 1, 2, 3

collect_list

Collects all values from a group into an array, including duplicates.

  • Input Rows: A, A, B

  • SQL: SELECT collect_list(col) FROM table

  • Result: ["A", "A", "B"]

collect_set

Collects only unique values from a group into an array.

  • Input Rows: A, A, B

  • SQL: SELECT collect_set(col) FROM table

  • Result: ["A", "B"]
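collect_list and collect_set in one grouped query (table and column names are illustrative):

```sql
-- Hypothetical orders table: one row per (customer_id, product).
SELECT customer_id,
       collect_list(product) AS all_products,     -- keeps duplicates
       collect_set(product)  AS unique_products   -- de-duplicated
FROM orders
GROUP BY customer_id;
```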

Higher-Order Functions

1. transform (The "Map" function)

This is used to modify every element in an array.

  • Example: Imagine you have an array of temperatures in Celsius and want to convert them to Fahrenheit.

2. filter

This removes elements from an array that do not meet a specific condition.

  • Example: Keeping only the "High Priority" IDs from a list.

3. exists

This returns a boolean (true/false) if at least one element in the array matches your criteria.

  • Example: Checking if a customer's order history contains a specific product category.

4. aggregate (The "Reduce" function)

This is the most powerful (and complex) higher-order function. It reduces an entire array down to a single value by applying a stateful calculation.

  • Example: Calculating a custom weighted sum of an array.
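All four higher-order functions on literal arrays (the aggregate example is a plain sum rather than a weighted one, for brevity):

```sql
-- transform: Celsius to Fahrenheit for every element.
SELECT transform(array(0, 10, 20), c -> c * 9 / 5 + 32);   -- [32.0, 50.0, 68.0]

-- filter: keep only the "high priority" IDs.
SELECT filter(array(1, 55, 120), x -> x > 50);             -- [55, 120]

-- exists: does the array contain a matching element?
SELECT exists(array('books', 'toys'), c -> c = 'toys');    -- true

-- aggregate(array, start, merge): reduce the array to one value.
SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x);  -- 6
```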

Advanced SQL Commands

CTE (Common Table Expression)

  • A CTE is a temporary result set that you define within the execution of a single query. It starts with the WITH keyword and exists only for the duration of that specific command.

  • Purpose: To make complex queries more readable by breaking them into logical steps.

  • Scope: Only the query immediately following it.

  • Storage: In-memory during execution; nothing is saved to the Databricks Catalog.
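A minimal sketch (table and column names are illustrative):

```sql
-- The CTE exists only for the duration of this one statement.
WITH regional_sales AS (
  SELECT region, SUM(amount) AS total
  FROM sales
  GROUP BY region
)
SELECT region, total
FROM regional_sales
WHERE total > 1000;
```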

Create OR Replace

  • Allows you to either create a new table or completely overwrite an existing one in a single atomic operation.

  • If it doesn't exist: It creates the table from scratch.

  • If it does exist: It "drops" the old version and replaces it with the new definition and data.

Insert Overwrite

  • Used to replace the data within a table without changing the table's metadata (like its schema, permissions, or generated columns)

  • Preserves all historical versions of the data, you can time travel to previous versions

Create OR Replace VS Insert Overwrite

Similarity

  • The old rows are gone, replaced with new data.

Difference

  • The new data must match the existing table schema when using INSERT OVERWRITE, while CREATE OR REPLACE TABLE can also redefine the schema.
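Both side by side (names are illustrative):

```sql
-- CREATE OR REPLACE: replaces the table definition and its data;
-- the schema may change.
CREATE OR REPLACE TABLE events AS
SELECT id, ts, payload FROM raw_events;

-- INSERT OVERWRITE: replaces rows only; the metadata stays, and the
-- incoming data must match the target schema.
INSERT OVERWRITE events
SELECT id, ts, payload FROM raw_events WHERE ts >= '2024-01-01';
```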

Merge INTO

  • Mainly used for atomic upserts (insert new rows and update existing ones in a single transaction), which is safe under concurrent writes.
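A typical upsert sketch (table and column names are illustrative):

```sql
MERGE INTO customers AS t
USING updates AS s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN
  UPDATE SET t.email = s.email
WHEN NOT MATCHED THEN
  INSERT (customer_id, email) VALUES (s.customer_id, s.email);
```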

Copying & Cloning

  • Create Table As Select (CTAS): creates a new table from the result of a query

  • Deep Clone: fully copies data + metadata from the source; re-running the clone syncs changes from the source

  • Shallow Clone: quickly creates a copy of a table by copying only the metadata; data files are referenced from the source, not copied
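All three approaches (table names are illustrative):

```sql
-- CTAS: new table materialized from a query result.
CREATE TABLE orders_copy AS SELECT * FROM orders;

-- Deep clone: copies data files and metadata; re-running syncs changes.
CREATE OR REPLACE TABLE orders_deep DEEP CLONE orders;

-- Shallow clone: copies only metadata; data files stay with the source.
CREATE OR REPLACE TABLE orders_shallow SHALLOW CLONE orders;
```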

Views

Querying File

  • A table created directly over a CSV file is an external table whose location is the CSV file's own path.

Copy Into

  • The COPY INTO command is a high-performance, retryable SQL command used to load data from a cloud storage location (like S3, ADLS, or GCS) into a Delta Lake table.

  • A simple one-off or incremental way to ingest raw data from a source; files that have already been loaded are skipped when the command is re-run.
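A sketch (catalog, path, and options are illustrative):

```sql
COPY INTO my_catalog.bronze.orders
FROM 'abfss://landing@account.dfs.core.windows.net/orders/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
```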

Delta Live Table (DLT)

  • A declarative framework for building reliable, maintainable, and testable data processing pipelines.

Change Data Capture (CDC)

  • CDC is a design pattern that identifies and captures only the changes (Inserts, Updates, and Deletes) made to a source database. Instead of copying the entire database every night, you just stream a "feed" of the rows that changed.

  • Here is an example of a CDC feed:

| user_id | name | city | op_type | sequence_num |
|---|---|---|---|---|
| 101 | Alice | London | INSERT | 001 |
| 101 | Alice | New York | UPDATE | 002 |
| 102 | Bob | Paris | DELETE | 003 |
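In DLT, a feed like this is typically applied with APPLY CHANGES INTO. A sketch, assuming the feed lands in a streaming source called users_cdc and the target users table has already been declared:

```sql
APPLY CHANGES INTO live.users
FROM stream(live.users_cdc)
KEYS (user_id)
APPLY AS DELETE WHEN op_type = 'DELETE'
SEQUENCE BY sequence_num
COLUMNS * EXCEPT (op_type, sequence_num)
STORED AS SCD TYPE 1;
```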

Validation / On Violation

| Action | SQL syntax | Python syntax | Result |
|---|---|---|---|
| Warn (default) | EXPECT | dp.expect | Invalid records are written to the target. |
| Drop | EXPECT ... ON VIOLATION DROP ROW | dp.expect_or_drop | Invalid records are dropped before data is written to the target. The count of dropped records is logged alongside other dataset metrics. |
| Fail | EXPECT ... ON VIOLATION FAIL UPDATE | dp.expect_or_fail | Invalid records prevent the update from succeeding. Manual intervention is required before reprocessing. This expectation causes a failure of a single flow and does not cause other flows in the pipeline to fail. |
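A SQL sketch of all three behaviors on a hypothetical pipeline table (names and conditions are illustrative):

```sql
CREATE OR REFRESH STREAMING TABLE clean_orders (
  CONSTRAINT valid_id     EXPECT (order_id IS NOT NULL),                           -- warn only
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,               -- drop bad rows
  CONSTRAINT valid_date   EXPECT (order_date IS NOT NULL) ON VIOLATION FAIL UPDATE -- fail the flow
)
AS SELECT * FROM STREAM(raw_orders);
```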

Streaming

  • Either a traditional approach, where you reprocess the entire dataset each time you receive new data,

  • or custom logic that captures only the files or records added since the last time an update was run.

Structured Streaming

  • It allows users to interact with an ever-growing data source as if it were just a static table of records.

    New data in the input stream is simply treated as new rows appended to a table, and such a table representing an infinite data source is called an "unbounded" table.

    As noted, an input data stream could be a directory of files, a messaging system like Kafka, or simply a Delta table.

  • spark.readStream queries a Delta table as a stream source, which allows us to process all of the data already present in the table as well as any new data that arrives later.

  • This creates a streaming DataFrame, on which we can apply any transformation as if it were a static DataFrame.

  • To persist the results of a streaming query, we write them out to durable storage using the dataframe.writeStream method, which lets us configure the output.
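A minimal end-to-end sketch (table names and paths are illustrative; requires a Spark session):

```python
# Unbounded source: new rows in the table become new micro-batches.
stream_df = spark.readStream.table("orders_bronze")

# Transformations look identical to static DataFrame code.
clean_df = stream_df.filter("amount > 0")

# Persist results; the checkpoint tracks progress for exactly-once writes.
(clean_df.writeStream
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode("append")
    .table("orders_silver"))
```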

Auto Loader

  • Auto Loader can scale to support real-time ingestion of millions of files per hour. Since it is based on Spark Structured Streaming, it uses checkpointing to track the ingestion process and to store metadata about the discovered files.

  • It ensures that data files are processed exactly once, and in case of failure it can resume from where it left off.

  • It uses a specific stream reader format called cloudFiles; the format of the source files is specified with the cloudFiles.format option.

  • It detects new files as they arrive and queues them for ingestion. Once the files are read, the data is written into a target table using the stream writer, where you provide the location to store the checkpointing information.

  • By default, all columns are inferred as STRING: Auto Loader cannot safely determine a specific data type (like integer, boolean, or timestamp), so it chooses the "safest" format, plain text. A schema hint can be provided via the cloudFiles.schemaHints option.
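A sketch of Auto Loader with a schema hint (paths, columns, and table names are illustrative):

```python
# cloudFiles is the Auto Loader source format; schemaHints overrides the
# default STRING inference for selected columns.
df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/path/to/schema")
        .option("cloudFiles.schemaHints", "amount DOUBLE, ts TIMESTAMP")
        .load("/path/to/landing/"))

(df.writeStream
    .option("checkpointLocation", "/path/to/checkpoint")
    .table("bronze_orders"))
```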

Schema Evolution Mode

| Mode | Behavior |
|---|---|
| addNewColumns | (Default) Updates the schema and restarts the stream to include the new column. |
| failOnNewColumns | Stops the stream and throws an error so you can manually review the change. |
| rescue | Keeps the table schema exactly as is, but puts new/mismatched data into a _rescued_data column. |
| none | Ignores new columns and doesn't capture them at all (not recommended). |

Auto Loader vs Read Stream

| Feature | Auto Loader | Read Stream (direct) |
|---|---|---|
| Data source | Files (S3/ADLS/GCS) | Messaging (Kafka/Kinesis) or Delta tables |
| Tracking | Tracks file names via checkpoint | Tracks offsets via checkpoint |
| Schema | Automatic inference & evolution | Usually manual (must parse JSON/Avro) |
| Best for | Initial ingestion of raw data | Real-time event processing |

Multi-Hop Architecture

  • Multi-hop architecture usually consists of three layers: bronze, silver, and gold.

  • The bronze table contains raw data ingested from various sources, like JSON files, operational databases, or a Kafka stream.

  • The silver table provides a more refined view of the data. For example, data can be cleaned and filtered at this level, and we can join fields from various bronze tables to enrich the silver records.

  • The gold table provides business-level aggregations, often used for reporting and dashboarding, or even for machine learning.

Example

Bronze

  • Load the files into the order-raw table and create a view

  • Create a temporary view and insert it into the bronze table

Silver

  • Create a temporary view joining other tables

  • Insert into the silver table

Gold

  • Aggregate the table to form the view

  • Output as a result table
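The steps above can be sketched in SQL (all table and column names are illustrative):

```sql
-- Bronze: raw ingestion (e.g., Auto Loader / COPY INTO fills orders_raw).
CREATE OR REPLACE TEMP VIEW orders_tmp AS
SELECT *, current_timestamp() AS ingest_ts FROM orders_raw;
INSERT INTO orders_bronze SELECT * FROM orders_tmp;

-- Silver: enrich by joining a lookup table.
CREATE OR REPLACE TEMP VIEW orders_enriched AS
SELECT o.*, c.name
FROM orders_bronze o
JOIN customers c ON o.customer_id = c.customer_id;
INSERT INTO orders_silver SELECT * FROM orders_enriched;

-- Gold: business-level aggregation for reporting.
CREATE OR REPLACE TABLE orders_gold AS
SELECT name, SUM(amount) AS total
FROM orders_silver
GROUP BY name;
```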
