· Platform Team ·

Building a Composable ETL Framework for Spark

How we replaced bespoke PySpark scripts with a config-driven, hook-based framework inspired by Rust's composition model.

spark data-engineering architecture python iceberg
Most data teams hit the same problem eventually: every pipeline is written differently. The boilerplate — Spark session setup, Iceberg catalog registration, S3A client tuning, shuffle configuration — gets repeated in every project. New pipelines start as copies of old ones. Bugs in shared patterns have to be fixed everywhere individually.

We built a framework that standardizes the infrastructure without constraining the business logic.

Figure: ComposableETL pipeline diagram. The Extract stage flows through three removable POST_EXTRACT hook cards (DeduplicationMixin priority 5, CleaningMixin priority 10, StateCodeMixin priority 20) into the Compact stage. ETLContext carries named DataFrames between stages, and the YAML config feeds infrastructure settings.

Composition Over Inheritance

The instinct with ETL frameworks is to build a deep class hierarchy: BaseETL → SparkETL → IcebergETL → DailyIcebergETL. Each layer adds behavior, each layer constrains the next. By the time a data engineer subclasses the leaf, they are fighting the framework more than using it.

We took inspiration from Rust’s trait system instead. In Rust, you compose behavior by implementing traits on a type, and there is no class hierarchy to inherit from. The type declares what capabilities it has, and the compiler ensures everything fits together. Python does not have traits, but it has mixins and the strategy pattern, which achieve much the same goal, with the checks happening at runtime rather than at compile time.

Our ComposableETL base class accepts pluggable strategies for each phase of the pipeline: extractors read data, transformers process it, compactors write it. Cross-cutting concerns — CLI argument parsing, Write-Audit-Publish, empty data handling — are mixins that compose independently.
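
To make the strategy side of this concrete, here is a minimal sketch using typing.Protocol, which is the closest thing Python has to a trait: any class with the right method satisfies the interface without inheriting from it. The names Extractor, Transformer, Compactor, run_pipeline, and the three in-memory classes are assumptions for illustration, not the framework's actual interfaces.

```python
from typing import Any, List, Protocol


class Extractor(Protocol):
    def extract(self) -> Any: ...


class Transformer(Protocol):
    def transform(self, data: Any) -> Any: ...


class Compactor(Protocol):
    def write(self, data: Any) -> None: ...


class ListExtractor:
    """Hypothetical extractor that 'reads' from an in-memory list."""

    def __init__(self, rows: List[str]) -> None:
        self.rows = rows

    def extract(self) -> List[str]:
        return list(self.rows)


class UppercaseTransformer:
    """Hypothetical transformer standing in for real business logic."""

    def transform(self, data: List[str]) -> List[str]:
        return [row.upper() for row in data]


class ListCompactor:
    """Hypothetical compactor that 'writes' to an in-memory list."""

    def __init__(self) -> None:
        self.written: List[str] = []

    def write(self, data: List[str]) -> None:
        self.written.extend(data)


def run_pipeline(extractor: Extractor, transformer: Transformer, compactor: Compactor) -> None:
    # Each phase is a pluggable strategy; none of them inherits from a base class.
    compactor.write(transformer.transform(extractor.extract()))


sink = ListCompactor()
run_pipeline(ListExtractor(["a", "b"]), UppercaseTransformer(), sink)
```

Swapping ListExtractor for a different source changes one argument and leaves the other two phases untouched, which is the property the strategy pattern is meant to buy.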

Here is what a production ETL looks like:

    from lakehouse_spark_core import ComposableETL, ProcessDateCLIMixin
    from lakehouse_spark_core.mixins import WAPMixin
    from helpers.deduplication_hook import DeduplicationMixin
    from helpers.cleaning_hook import CleaningMixin
    from helpers.enrichment_hook import EnrichmentMixin


    class DataCleaningETL(
        ProcessDateCLIMixin,
        WAPMixin,
        DeduplicationMixin,
        CleaningMixin,
        EnrichmentMixin,
        ComposableETL,
    ):
        """Transforms raw data into cleaned, standardized format."""

That is the entire class. No method overrides, no abstract method implementations. All behavior is composed from mixins:

  • ProcessDateCLIMixin — adds --process-date and --config CLI arguments
  • WAPMixin — enables Write-Audit-Publish with branch-based atomic writes
  • DeduplicationMixin — removes duplicate records by a composite key
  • CleaningMixin — drops columns, trims whitespace, standardizes text, filters invalid data
  • EnrichmentMixin — adds derived columns and mappings

Each mixin is reusable. CleaningMixin can be mixed into any ETL that needs data cleaning. WAPMixin can be added to any pipeline that writes to Iceberg. They compose without knowing about each other.
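
As an illustration of how small such a mixin can be, the sketch below shows one way a CLI mixin could expose the two arguments listed above with argparse. ProcessDateCLISketch and its parse_args method are hypothetical names; the real ProcessDateCLIMixin may be structured quite differently.

```python
import argparse
from datetime import date


class ProcessDateCLISketch:
    """Hypothetical stand-in for ProcessDateCLIMixin."""

    @classmethod
    def parse_args(cls, argv=None) -> argparse.Namespace:
        parser = argparse.ArgumentParser(description=cls.__doc__)
        # date.fromisoformat rejects anything that is not YYYY-MM-DD.
        parser.add_argument(
            "--process-date",
            type=date.fromisoformat,
            required=True,
            help="Partition date to process, in YYYY-MM-DD format",
        )
        parser.add_argument(
            "--config",
            required=True,
            help="Path to the YAML config file",
        )
        return parser.parse_args(argv)


args = ProcessDateCLISketch.parse_args(
    ["--process-date", "2024-01-15", "--config", "config/dev-daily.yaml"]
)
```

Because the parsing lives in a classmethod on the mixin, any ETL class that lists it gains the same command-line contract without writing argument handling of its own.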

The Hook System

Mixins define what behavior a pipeline has. Hooks define when that behavior runs. The hook system is the mechanism that makes composition work at runtime.

Every hook registers at a specific pipeline stage with a priority:

    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number

    # register_hook, ProcessingHook, and ETLContext come from the framework.


    class DeduplicationMixin:
        @register_hook(ProcessingHook.POST_EXTRACT, priority=10)
        def deduplicate_data(self, context: ETLContext):
            df = context.get_dataframe("raw_data")
            # Window-based deduplication by composite key
            window = Window.partitionBy(
                "year", "month", "day", "id", "timestamp"
            ).orderBy("timestamp")
            # Keep one row per key. Rows in a partition share the same timestamp,
            # so which duplicate survives is arbitrary.
            df_deduped = (
                df.withColumn("rn", row_number().over(window))
                .filter(col("rn") == 1)
                .drop("rn")
            )
            context.add_dataframe("deduplicated_data", df_deduped)


    class CleaningMixin:
        @register_hook(ProcessingHook.POST_EXTRACT, priority=20)
        def clean_data(self, context: ETLContext):
            df = context.get_dataframe("deduplicated_data")
            # Drop columns, trim whitespace, regex replace, filter invalid records
            # ...
            context.add_dataframe("cleaned_data", df)


    class EnrichmentMixin:
        @register_hook(ProcessingHook.POST_EXTRACT, priority=30)
        def enrich_data(self, context: ETLContext):
            df = context.get_dataframe("cleaned_data")
            # Add derived columns and mappings
            # ...
            context.add_dataframe("cleaned_data", df)

Priority ordering is the key. DeduplicationMixin runs at priority 10 (dedup first), CleaningMixin at 20 (clean the deduped data), EnrichmentMixin at 30 (enrich after cleaning). The framework reserves priorities 1-9 for internal hooks; user hooks start at 10.
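
To show how priority-ordered hooks can work mechanically, here is a minimal sketch of a decorator that tags methods and a runner that discovers and sorts them. This is an assumption about one possible implementation: the attribute names _hook_stage and _hook_priority, the HookRunner class, and the dict used as a context are all hypothetical, and the real register_hook may keep a registry instead.

```python
from enum import Enum, auto


class ProcessingHook(Enum):
    POST_EXTRACT = auto()
    POST_TRANSFORM = auto()


def register_hook(stage: ProcessingHook, priority: int = 10):
    """Tag a method with the stage and priority it should run at."""

    def decorator(func):
        func._hook_stage = stage
        func._hook_priority = priority
        return func

    return decorator


class HookRunner:
    """Hypothetical base class that finds tagged methods on the composed class."""

    def run_hooks(self, stage: ProcessingHook, context: dict) -> None:
        hooks = []
        for name in dir(self):
            method = getattr(self, name)
            # Bound methods expose attributes set on the underlying function.
            if callable(method) and getattr(method, "_hook_stage", None) is stage:
                hooks.append(method)
        # Lower priority numbers run first, regardless of mixin order.
        for hook in sorted(hooks, key=lambda h: h._hook_priority):
            hook(context)


class CleanMixin:
    @register_hook(ProcessingHook.POST_EXTRACT, priority=20)
    def clean(self, context: dict) -> None:
        context["order"].append("clean")


class DedupMixin:
    @register_hook(ProcessingHook.POST_EXTRACT, priority=10)
    def dedup(self, context: dict) -> None:
        context["order"].append("dedup")


# CleanMixin is listed first on purpose: priority, not class order, decides.
class Pipeline(CleanMixin, DedupMixin, HookRunner):
    pass


ctx = {"order": []}
Pipeline().run_hooks(ProcessingHook.POST_EXTRACT, ctx)
```

The point of the design is that execution order is a property of the hook, so reordering the mixin list in a class declaration does not silently change the data flow.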

The ETLContext object flows through the pipeline, carrying named DataFrames, metrics, runtime config, and execution metadata between hooks — if you are familiar with Go, think of it as similar to context.Context. Each hook reads from and writes to the context, creating a data flow chain without hooks needing to know about each other. DeduplicationMixin produces deduplicated_data; CleaningMixin consumes it and produces cleaned_data; EnrichmentMixin enriches cleaned_data in place.
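
A minimal sketch of such a context object follows, assuming it is little more than a set of named dictionaries. ETLContextSketch is a hypothetical name, the field layout is a guess, and plain lists stand in for DataFrames so the example runs without Spark; only add_dataframe and get_dataframe are taken from the hook code above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ETLContextSketch:
    """Hypothetical, simplified stand-in for ETLContext."""

    config: Dict[str, Any] = field(default_factory=dict)
    metrics: Dict[str, Any] = field(default_factory=dict)
    _dataframes: Dict[str, Any] = field(default_factory=dict)

    def add_dataframe(self, name: str, df: Any) -> None:
        # Re-adding an existing name replaces it, which allows in-place enrichment.
        self._dataframes[name] = df

    def get_dataframe(self, name: str) -> Any:
        try:
            return self._dataframes[name]
        except KeyError:
            available = ", ".join(sorted(self._dataframes)) or "none"
            raise KeyError(
                f"No DataFrame named {name!r}; available: {available}"
            ) from None


ctx = ETLContextSketch(config={"dedup_key": ["id", "timestamp"]})
ctx.add_dataframe("raw_data", [1, 1, 2])
# A downstream hook reads one name and publishes another.
ctx.add_dataframe("deduplicated_data", sorted(set(ctx.get_dataframe("raw_data"))))
```

Listing the available names in the error message is a small choice that pays off when a hook runs before the hook that was supposed to produce its input.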

Available hook points span the full pipeline lifecycle: PRE_EXTRACT, POST_EXTRACT, PRE_TRANSFORM, POST_TRANSFORM, PRE_COMPACT, POST_COMPACT, PIPELINE_START, PIPELINE_END, PIPELINE_ERROR, and WAP-specific hooks for audit workflows.

Config-Driven Pipelines

The Python class defines what the pipeline does (through mixin composition). The YAML config defines where it reads, where it writes, and how Spark is tuned:

    environment: dev
    app_name: data-cleaning-etl-daily
    catalog_config:
      catalog_name: my_lakehouse
      catalog_uri: https://catalog.example.com/catalog
      warehouse: my-warehouse
    data_sources:
      raw_data:
        extractor: iceberg_table
        catalog: my_lakehouse
        namespace: silver
        table: data_raw
        prune_date_partitions: true
        partition_columns: [year, month, day]
    targets:
      cleaned_output:
        compactor: iceberg_table
        catalog: my_lakehouse
        namespace: silver
        table: data_cleaned
        schema_class: schemas.DataCleanedTableSchema
        mode: overwrite_partitions
        create_if_not_exists: true
        input: cleaned_data
    wap:
      enabled: true
      branch_prefix: "wap"
      on_failure: "rollback"
    # Anything you put here is available in ETLContext at runtime
    additional_settings:
      dedup_key: ["id", "timestamp"]
      null_fill_defaults: {"score": 0, "label": "unknown"}
      filter_threshold: 0.95

A data engineer adding a new source does not touch Python — they add a data_sources entry. Changing shuffle partitions or S3A settings is a config change, not a code change. The YAML is infrastructure; it changes when you tune Spark or switch environments. The hooks are business logic; they change when the data model evolves. They evolve at different speeds.

The framework handles all the boilerplate: Spark session creation, Iceberg catalog registration, S3A client configuration, shuffle plugin setup. None of that leaks into the ETL code.
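
As a rough idea of what catalog registration involves, the sketch below turns the catalog_config block into the Spark properties Iceberg expects. The function name iceberg_spark_conf is hypothetical, and treating the catalog as a REST catalog is an assumption based only on the https catalog_uri in the example config; the property keys themselves are standard Iceberg Spark settings.

```python
from typing import Any, Dict


def iceberg_spark_conf(config: Dict[str, Any]) -> Dict[str, str]:
    """Translate the catalog_config block into Spark configuration entries."""
    catalog = config["catalog_config"]
    prefix = f"spark.sql.catalog.{catalog['catalog_name']}"
    return {
        "spark.app.name": config["app_name"],
        "spark.sql.extensions": (
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        ),
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",  # assumption: the catalog speaks the REST protocol
        f"{prefix}.uri": catalog["catalog_uri"],
        f"{prefix}.warehouse": catalog["warehouse"],
    }


# The dict below mirrors what a YAML loader would return for the config above.
config = {
    "app_name": "data-cleaning-etl-daily",
    "catalog_config": {
        "catalog_name": "my_lakehouse",
        "catalog_uri": "https://catalog.example.com/catalog",
        "warehouse": "my-warehouse",
    },
}
conf = iceberg_spark_conf(config)
```

Each entry could then be applied with SparkSession.builder.config(key, value) before the session is created, which keeps the mapping in one place instead of in every pipeline.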

Project Organization

Convention matters more than clever structure. When every ETL module looks the same, onboarding is faster, code reviews are easier, and no one has to guess where the config or tests live.

A repo is a logical group of related ETLs — for example, all pipelines in a data domain (raw ingestion, cleaning, deduplication, aggregation) live in one repo. Each ETL module follows the same skeleton:

    my-etl/
    ├── raw/
    │   ├── config/
    │   │   ├── dev-daily.yaml
    │   │   └── dev-bulk.yaml
    │   ├── helpers/
    │   │   └── type_conversion_hook.py
    │   ├── schemas/
    │   │   └── data_raw.py
    │   ├── tests/
    │   │   ├── test_raw_etl.py
    │   │   └── test_raw_schema.py
    │   ├── raw_daily.py
    │   └── raw_date_range.py
    ├── cleaned/
    │   ├── config/
    │   ├── helpers/
    │   ├── schemas/
    │   ├── tests/
    │   ├── cleaned_daily.py
    │   └── cleaned_date_range.py
    ├── enriched/
    │   ├── ...
    │   └── housekeeping/
    └── aggregated/
        └── ...

Every module has the same layout: config/ for environment-specific YAML, helpers/ for hook mixins, schemas/ for Iceberg table definitions, tests/ for unit and integration tests, and the ETL entry points at the module root. Daily and bulk (date range) variants share the same hooks and schemas but use different configs tuned for their workload profile.

Factory-Based Components

Extractors and compactors are created through a factory registry. When the framework reads extractor: iceberg_table or compactor: iceberg_table from config, it resolves the entry to a typed config object (IcebergTableExtractorConfig in the registry below), looks up the class registered for that config type, and instantiates it:

    class ExtractorFactory:
        _registry = {
            S3ParquetExtractorConfig: S3ParquetExtractor,
            IcebergTableExtractorConfig: IcebergTableExtractor,
        }

        @classmethod
        def register(cls, config_type, extractor_class):
            cls._registry[config_type] = extractor_class

Adding a new extractor for a different ecosystem — say Kafka, Delta Lake, or a REST API — means writing the extractor class and registering it with the factory. Existing pipelines are untouched. The same pattern applies to compactors: IcebergTableCompactor, S3ParquetCompactor, and ParallelIcebergTableCompactor (for concurrent writes to multiple tables) are all interchangeable through the factory.
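
The registration step might look like the sketch below. KafkaExtractorConfig, KafkaExtractor, ExtractorFactorySketch, and the create method are all hypothetical; only the register signature follows the factory shown above, and the extractor does no real Kafka work.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class KafkaExtractorConfig:
    """Hypothetical typed config for a Kafka source."""

    topic: str


class KafkaExtractor:
    """Hypothetical extractor; a real one would read from the topic."""

    def __init__(self, config: KafkaExtractorConfig) -> None:
        self.config = config


class ExtractorFactorySketch:
    """Simplified stand-in for ExtractorFactory with an assumed create method."""

    _registry: Dict[type, type] = {}

    @classmethod
    def register(cls, config_type: type, extractor_class: type) -> None:
        cls._registry[config_type] = extractor_class

    @classmethod
    def create(cls, config: Any) -> Any:
        # Dispatch on the config's type, so callers never name the extractor class.
        try:
            extractor_class = cls._registry[type(config)]
        except KeyError:
            raise ValueError(
                f"No extractor registered for {type(config).__name__}"
            ) from None
        return extractor_class(config)


ExtractorFactorySketch.register(KafkaExtractorConfig, KafkaExtractor)
extractor = ExtractorFactorySketch.create(KafkaExtractorConfig(topic="events"))
```

Keying the registry on the config type means the new source is reachable from config as soon as it is registered, with no change to the pipeline classes that already exist.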

Iceberg Table Lifecycle

Iceberg comes with a lot of operational overhead that every pipeline would otherwise handle ad-hoc. The framework absorbs it:

Schema-driven table creation. The schema_class in the target config points to a Python class that defines the table’s schema, partition spec, properties, and DDL statements. No external schema registry needed — the schema lives in the ETL codebase, versioned alongside the pipeline code, and is interpreted at import time. The framework generates the DDL, creates the table if it does not exist, seeds an initial empty snapshot (required for proper WAP branch ancestry), and applies write ordering — all automatically.

    from pyspark.sql.types import StructType

    # TableSchema is the framework's schema base class.


    class DataCleanedTableSchema(TableSchema):
        @property
        def schema(self) -> StructType:
            return StructType([...])  # field list elided

        @property
        def table_properties(self):
            return {
                "write.metadata.delete-after-commit.enabled": "true",
                "write.metadata.previous-versions-max": "3",
            }

        @property
        def additional_ddl_statements(self):
            return [
                "ALTER TABLE {table} WRITE LOCALLY ORDERED BY timestamp, id"
            ]
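
For a sense of what DDL generation from such a class involves, here is a minimal sketch that renders a CREATE TABLE statement and fills the {table} placeholder in the extra statements. build_create_table_ddl and render_additional_ddl are hypothetical helpers, and columns are passed as plain (name, type) pairs rather than a StructType so the example runs without Spark.

```python
from typing import Dict, List, Tuple


def build_create_table_ddl(
    table: str,
    columns: List[Tuple[str, str]],
    partition_by: List[str],
    properties: Dict[str, str],
) -> str:
    """Render an Iceberg CREATE TABLE statement from schema metadata."""
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns)
    ddl = f"CREATE TABLE IF NOT EXISTS {table} ({cols}) USING iceberg"
    if partition_by:
        ddl += f" PARTITIONED BY ({', '.join(partition_by)})"
    if properties:
        # Sort for a stable statement, which keeps diffs and tests deterministic.
        props = ", ".join(f"'{k}' = '{v}'" for k, v in sorted(properties.items()))
        ddl += f" TBLPROPERTIES ({props})"
    return ddl


def render_additional_ddl(table: str, statements: List[str]) -> List[str]:
    """Substitute the fully qualified table name into each extra statement."""
    return [statement.format(table=table) for statement in statements]


table = "my_lakehouse.silver.data_cleaned"
ddl = build_create_table_ddl(
    table,
    columns=[("id", "string"), ("timestamp", "timestamp"),
             ("year", "int"), ("month", "int"), ("day", "int")],
    partition_by=["year", "month", "day"],
    properties={"write.metadata.previous-versions-max": "3"},
)
extra = render_additional_ddl(
    table, ["ALTER TABLE {table} WRITE LOCALLY ORDERED BY timestamp, id"]
)
```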

Write-Audit-Publish. The compactor handles WAP automatically when enabled in config — creates a branch, writes data, runs audits (row count, schema validation), and publishes to main only if audits pass. Rollback is automatic on failure.
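
The control flow can be sketched as follows, assuming Iceberg's branch DDL and the fast_forward procedure are the underlying mechanism. The function write_audit_publish and its parameters are hypothetical: run_sql stands in for spark.sql, write and the audits are callables supplied by the caller, and the real compactor may handle branches and failure modes differently.

```python
from typing import Callable, List


def write_audit_publish(
    run_sql: Callable[[str], object],
    write: Callable[[str], None],
    audits: List[Callable[[str], bool]],
    catalog: str,
    table: str,
    branch: str,
) -> bool:
    """Write to a branch, audit it, and publish to main only if every audit passes.

    `table` is the namespace-qualified name, for example 'silver.data_cleaned'.
    """
    full_name = f"{catalog}.{table}"
    run_sql(f"ALTER TABLE {full_name} CREATE BRANCH `{branch}`")
    try:
        write(branch)
        passed = all(audit(branch) for audit in audits)
        if passed:
            # Publishing moves main forward to the audited branch head.
            run_sql(
                f"CALL {catalog}.system.fast_forward('{table}', 'main', '{branch}')"
            )
        return passed
    finally:
        # Dropping the branch without publishing is the rollback path.
        run_sql(f"ALTER TABLE {full_name} DROP BRANCH `{branch}`")


statements: List[str] = []
published = write_audit_publish(
    run_sql=statements.append,
    write=lambda branch: None,           # stand-in for the real DataFrame write
    audits=[lambda branch: False],       # stand-in for a failing row count audit
    catalog="my_lakehouse",
    table="silver.data_cleaned",
    branch="wap_20240115",
)
```

Putting the branch cleanup in a finally block means a failed write, a failed audit, and a successful publish all leave the table without a stray branch.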

Housekeeping. Snapshot expiration, orphan file cleanup, data compaction, and manifest compaction are all config-driven. A housekeeping job is the same pattern as an ETL — a class with mixins and a YAML config:

    class MyHousekeeping(HousekeepingCLIMixin, StandardHousekeeping):
        pass  # All behavior from config
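
One way to picture config-driven housekeeping is as a translation from settings to Iceberg's maintenance procedures. In the sketch below, housekeeping_statements and the setting names (retention_days, retain_last, and the four boolean flags) are assumptions; the CALL statements use Iceberg's standard Spark procedures.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def housekeeping_statements(
    catalog: str, table: str, settings: Dict[str, Any], now: datetime
) -> List[str]:
    """Build the maintenance CALL statements enabled in the settings."""
    cutoff = (now - timedelta(days=settings["retention_days"])).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    statements = []
    if settings.get("expire_snapshots"):
        statements.append(
            f"CALL {catalog}.system.expire_snapshots(table => '{table}', "
            f"older_than => TIMESTAMP '{cutoff}', "
            f"retain_last => {settings['retain_last']})"
        )
    if settings.get("remove_orphan_files"):
        statements.append(
            f"CALL {catalog}.system.remove_orphan_files(table => '{table}', "
            f"older_than => TIMESTAMP '{cutoff}')"
        )
    if settings.get("compact_data_files"):
        statements.append(
            f"CALL {catalog}.system.rewrite_data_files(table => '{table}')"
        )
    if settings.get("compact_manifests"):
        statements.append(
            f"CALL {catalog}.system.rewrite_manifests(table => '{table}')"
        )
    return statements


# Hypothetical settings, as they might appear under a housekeeping config block.
settings = {
    "retention_days": 7,
    "retain_last": 5,
    "expire_snapshots": True,
    "compact_manifests": True,
}
calls = housekeeping_statements(
    "my_lakehouse", "silver.data_cleaned", settings, now=datetime(2024, 1, 15)
)
```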

Schema evolution. The compactor supports enable_schema_evolution: true in config, which automatically adds new columns from the DataFrame to the Iceberg table without manual ALTER TABLE statements.
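
Additive schema evolution boils down to a column diff. The sketch below assumes the compactor compares column names and issues an ALTER TABLE for whatever the table lacks; add_columns_ddl is a hypothetical helper, schemas are plain name-to-type mappings so the example runs without Spark, and the framework may instead rely on Iceberg's own schema merging on write.

```python
from typing import Dict, Optional


def add_columns_ddl(
    table: str, table_columns: Dict[str, str], df_columns: Dict[str, str]
) -> Optional[str]:
    """Return an ALTER TABLE statement for columns present only in the DataFrame."""
    missing = [
        (name, dtype)
        for name, dtype in df_columns.items()
        if name not in table_columns
    ]
    if not missing:
        return None  # schemas already agree, nothing to evolve
    cols = ", ".join(f"{name} {dtype}" for name, dtype in missing)
    return f"ALTER TABLE {table} ADD COLUMNS ({cols})"


statement = add_columns_ddl(
    "my_lakehouse.silver.data_cleaned",
    table_columns={"id": "string", "timestamp": "timestamp"},
    df_columns={"id": "string", "timestamp": "timestamp", "score": "double"},
)
```

This only covers added columns. Type changes, renames, and dropped columns are deliberately out of scope for the sketch, and usually deserve a human decision anyway.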

Testing Without a Spark Cluster

Spark tests are slow. Starting a JVM, initializing a SparkSession, running even a trivial transformation — that is 30-60 seconds before a single assertion. In CI, this adds up fast.

We use DuckDB’s experimental PySpark API as a drop-in replacement for unit tests. DuckDB implements enough of the DataFrame API to run transformations, joins, and aggregations without a JVM:

from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
spark = DuckDBSparkSession.builder.appName("test").getOrCreate()
# Same DataFrame API, no JVM
df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])
result = df.filter(df.value > 1)

The framework provides an ETLTestHarness that runs full pipeline workflows in test mode and returns structured results:

    from dataclasses import dataclass
    from typing import Any, Dict, List


    @dataclass
    class ETLTestResult:
        success: bool
        execution_time: float
        input_records: int
        output_records: int
        validation_errors: List[str]
        performance_metrics: Dict[str, Any]

Unit tests using DuckDB run in seconds. Integration tests using a real SparkSession run against local Iceberg catalogs. The separation keeps CI fast while still validating the full stack before deployment.
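
To illustrate how a harness can produce such a result, here is a minimal sketch that times a pipeline callable and collects validation messages. run_etl_test and the validator convention (return an error string or None) are assumptions, not the ETLTestHarness API, and plain lists stand in for DataFrames.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ETLTestResult:
    success: bool
    execution_time: float
    input_records: int
    output_records: int
    validation_errors: List[str] = field(default_factory=list)
    performance_metrics: Dict[str, Any] = field(default_factory=dict)


def run_etl_test(
    pipeline: Callable[[List[Any]], List[Any]],
    rows: List[Any],
    validators: List[Callable[[List[Any]], Optional[str]]],
) -> ETLTestResult:
    """Run the pipeline once and summarize the outcome."""
    start = time.perf_counter()
    output = pipeline(rows)
    elapsed = time.perf_counter() - start

    errors = []
    for validate in validators:
        message = validate(output)
        if message is not None:
            errors.append(message)

    return ETLTestResult(
        success=not errors,
        execution_time=elapsed,
        input_records=len(rows),
        output_records=len(output),
        validation_errors=errors,
        performance_metrics={"records_dropped": len(rows) - len(output)},
    )


def no_duplicates(output: List[Any]) -> Optional[str]:
    return None if len(output) == len(set(output)) else "duplicate records found"


result = run_etl_test(lambda rows: sorted(set(rows)), [3, 1, 3], [no_duplicates])
```

Returning a structured result instead of raising on the first failure lets a test assert on record counts and on the full list of validation errors in one pass.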

What We Learned

Composition over inheritance. Mixins and hooks compose better than deep class hierarchies. A data engineer assembles behavior by listing mixins in the class declaration — no need to understand or override framework internals.

Declarative infrastructure, imperative business logic. Spark tuning, catalog settings, shuffle config — declarative in YAML. Deduplication logic, data cleaning rules, enrichment steps — imperative in hooks. Best of both worlds, and they evolve at different speeds.

Test without the full stack. DuckDB for unit tests, real Spark for integration. The DuckDB path runs in seconds and catches most logic errors. The Spark path runs in CI and catches serialization and Catalyst issues.

The framework pays for itself after the third pipeline. The first two pipelines feel like overhead — learning the config format, writing the first hooks. By the third, you are reusing existing mixins and writing only the config. By the tenth, new pipelines take an afternoon instead of a week.

Manoj Babu Katragadda

Principal Platform Engineer

Meghanath Macha

Head of AI