Chapter 2 — Data Ingestion: Sources, Formats, and Patterns

Learning Objectives

Pre-Reading Quiz — Section 1: Data Sources

1. A team currently runs nightly full-table SELECTs on a billion-row orders table to ingest fresh data. Which approach would best address both the load placed on the OLTP database and the staleness of features?

(a) Increase the query frequency to every 15 minutes
(b) Switch to CDC by tailing the database transaction log via Debezium
(c) Replicate the entire database to a read replica and query that instead
(d) Cache query results in Redis for downstream consumers

2. Why is the lakehouse (Delta, Iceberg, Hudi over Parquet) typically chosen as the home of historical ML training data rather than a raw data lake?

(a) It uses cheaper storage than S3
(b) It eliminates the need for any orchestration tool
(c) It adds transactional semantics, schema evolution, and time travel for reproducibility
(d) It automatically converts files into TFRecord for TensorFlow

3. A real-time recommendation system needs to surface "items viewed in the last five minutes." Which source/mechanism pairing best supports this feature?

(a) Nightly S3 export of clickstream archives
(b) Hourly Postgres query of a clicks table
(c) Kafka clickstream topic consumed by a stream processor
(d) Scheduled REST API poll every 60 seconds

1. Data Sources for ML

Key Points

ML pipelines rarely consume data from a single tidy source. A production fraud-detection model might pull transaction events from Kafka, account state from a Postgres database via change data capture, merchant metadata from a nightly S3 export, and risk-list updates from a third-party REST API. Each source has a native shape, a natural cadence, and a preferred ingestion mechanism. Forcing one paradigm onto all of them is a common ingestion anti-pattern.

Relational databases and CDC. The simplest path — periodic SELECTs scheduled by Airflow — scales poorly: nightly scans waste compute when few rows changed, and they load a database that is also serving production traffic. CDC solves this by reading the transaction log directly. Debezium tails the MySQL binlog or the Postgres WAL, converts inserts/updates/deletes into structured events, and publishes them to Kafka. CDC preserves source-of-truth semantics while enabling incremental updates; the cost is operational complexity (log permissions, schema evolution, periodic full re-syncs).
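The following sketch shows how a CDC consumer applies Debezium-style change events to an in-memory replica. It assumes the standard Debezium envelope fields `op` (`c` create, `u` update, `d` delete, `r` snapshot read), `before`, and `after`. The `apply_change` function, the `id` primary-key column, and the sample events are hypothetical illustrations, not part of Debezium's API.

```python
from typing import Any

Row = dict[str, Any]

def apply_change(replica: dict[Any, Row], event: dict[str, Any], key: str = "id") -> None:
    """Apply one Debezium-style change event to a replica keyed by primary key (hypothetical helper)."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Creates, snapshot reads, and updates carry the new row image in "after".
        row = event["after"]
        replica[row[key]] = dict(row)
    elif op == "d":
        # Deletes carry only the old row image in "before".
        replica.pop(event["before"][key], None)
    else:
        raise ValueError(f"unknown op: {op!r}")

# Hypothetical change stream for an orders table.
events = [
    {"op": "r", "before": None, "after": {"id": 1, "status": "placed"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "placed"}},
    {"op": "u", "before": {"id": 1, "status": "placed"}, "after": {"id": 1, "status": "shipped"}},
    {"op": "d", "before": {"id": 2, "status": "placed"}, "after": None},
]

replica: dict[Any, Row] = {}
for e in events:
    apply_change(replica, e)

print(replica)  # {1: {'id': 1, 'status': 'shipped'}}
```

Only the changed rows travel to the consumer, and the orders table is never rescanned. Debezium can also emit a tombstone (a record with a null value) after each delete so that compacted topics drop the key. A real consumer would need to skip those tombstones.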

Object storage, data lakes, and lakehouses. S3, GCS, and ADLS host files from batch ETL jobs, third-party providers, and log shippers. A data lake is permissive: any team can drop any file. A lakehouse layers a table format (Delta Lake, Iceberg, Hudi) on top of Parquet, giving the collection transactional semantics, schema evolution, and time travel. ML teams commonly use bronze (raw), silver (cleaned), and gold (feature-ready) zones, training on gold and snapshotting at a known version for reproducibility.
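Time travel is what makes "train on gold, snapshot at a known version" reproducible. The class below is not Delta Lake, Iceberg, or Hudi. It is a hypothetical in-memory sketch of the underlying idea: every commit produces a new immutable version, and readers can pin any past version.

```python
from typing import Optional

class VersionedTable:
    """Hypothetical toy table: every commit creates a new immutable snapshot."""

    def __init__(self) -> None:
        self._snapshots: list[tuple[dict, ...]] = [()]  # version 0 is the empty table

    @property
    def latest_version(self) -> int:
        return len(self._snapshots) - 1

    def commit(self, added: list[dict], removed_ids: frozenset = frozenset()) -> int:
        """Remove rows by id and add new rows as one step; return the new version."""
        kept = tuple(r for r in self._snapshots[-1] if r["id"] not in removed_ids)
        self._snapshots.append(kept + tuple(dict(r) for r in added))
        return self.latest_version

    def read(self, version: Optional[int] = None) -> list[dict]:
        """Read the latest snapshot, or time-travel to a pinned earlier version."""
        v = self.latest_version if version is None else version
        return [dict(r) for r in self._snapshots[v]]

gold = VersionedTable()
v1 = gold.commit([{"id": 1, "spend_30d": 120.0}, {"id": 2, "spend_30d": 40.0}])
v2 = gold.commit([{"id": 2, "spend_30d": 55.0}], removed_ids=frozenset({2}))

training_version = v1                 # store this alongside the trained model
print(gold.read(training_version))   # exactly the rows the model was trained on
print(gold.read())                    # current rows, including the update to id 2
```

Real table formats copy far less. A snapshot is metadata pointing at immutable data files, so an older version stays readable until those files are cleaned up.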

Event streams. Apache Kafka is the de facto self-managed standard; AWS Kinesis, Google Pub/Sub, and Azure Event Hubs are managed equivalents. Streams matter to ML for two reasons: they are the substrate of real-time features (e.g., "items viewed in the last 5 minutes"), and they serve as a durable buffer between source systems and downstream processors — a fraud pipeline can fall behind for an hour during a deploy without losing data.
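A stream processor maintains a feature like "items viewed in the last 5 minutes" as a sliding window over event time. The sketch below uses a per-user `deque`. The class name, the 300-second window, and the event shape are illustrative assumptions. A real Flink or Kafka Streams job would also handle late events and checkpoint its state.

```python
from collections import defaultdict, deque

class RecentViews:
    """Hypothetical per-user sliding window of item views over the last window_s seconds."""

    def __init__(self, window_s: float = 300.0) -> None:
        self.window_s = window_s
        self._views: dict[str, deque] = defaultdict(deque)  # user -> deque of (ts, item)

    def _evict(self, user: str, now: float) -> None:
        q = self._views[user]
        # Drop views at or before now - window_s (assumes events arrive in time order).
        while q and q[0][0] <= now - self.window_s:
            q.popleft()

    def observe(self, user: str, item: str, ts: float) -> None:
        self._views[user].append((ts, item))
        self._evict(user, ts)

    def items(self, user: str, now: float) -> list[str]:
        self._evict(user, now)
        return [item for _, item in self._views[user]]

rv = RecentViews(window_s=300)
rv.observe("u1", "shoes", ts=0)
rv.observe("u1", "socks", ts=200)
rv.observe("u1", "hat", ts=400)
print(rv.items("u1", now=450))  # ['socks', 'hat']
```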

APIs, logs, and sensors. External HTTP APIs (weather, FX, risk scores) need rate-limit awareness and retry logic. Application logs (shipped via Fluentd or Logstash) need parsing and timestamp normalization. IoT sensor feeds (often MQTT before crossing into Kafka) need gap detection and clock-skew handling.
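For sensor feeds, the simplest form of gap detection compares consecutive timestamps against the expected reporting interval. A crude clock-skew check flags device timestamps that run ahead of the ingestion clock. The sketch assumes readings are already sorted by device timestamp. The 10-second interval, the 1.5x tolerance, and the 5-second skew limit are hypothetical thresholds, not values from any standard.

```python
def find_gaps(timestamps: list[float], expected_interval: float,
              tolerance: float = 1.5) -> list[tuple[float, float]]:
    """Return (start, end) pairs where consecutive readings are further apart than allowed."""
    limit = expected_interval * tolerance
    gaps = []
    for prev, cur in zip(timestamps, timestamps[1:]):
        if cur - prev > limit:
            gaps.append((prev, cur))
    return gaps

def skewed(device_ts: float, ingest_ts: float, max_skew: float = 5.0) -> bool:
    """Heuristic: flag readings whose device clock is ahead of the ingest clock by > max_skew s."""
    return device_ts - ingest_ts > max_skew

readings = [0, 10, 20, 45, 55, 90]
print(find_gaps(readings, expected_interval=10))   # [(20, 45), (55, 90)]
print(skewed(device_ts=1_000.0, ingest_ts=990.0))  # True
```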

Figure 2.1: Heterogeneous data sources feeding an ML ingestion layer

```mermaid
flowchart LR
    A[Postgres OLTP] -->|CDC via Debezium| E[Ingestion Layer]
    B[S3 Data Lake] -->|Batch read| E
    C[Kafka Clickstream] -->|Stream consumer| E
    D[Third-party REST API] -->|Scheduled pull| E
    E --> F[(Feature Store)]
    E --> G[(Training Lake)]
```

Pre-Reading Quiz — Section 2: Batch vs Streaming

1. Which workload is the strongest fit for a pure batch ingestion path rather than streaming?

(a) Per-swipe credit card fraud scoring
(b) Computing 30-day customer spend aggregates for a weekly model refresh
(c) Recommendation features built from the user's last 5 minutes of clicks
(d) Real-time dynamic pricing reacting to current traffic

2. What is the principal risk of running a Lambda architecture in production?

(a) Streaming is too slow to keep up with batch updates
(b) It cannot serve online features at low latency
(c) Two code paths (batch and stream) can drift, producing inconsistent feature values
(d) It requires Kappa for backfills

3. A team adopts Kappa architecture instead of Lambda. What is the most likely operational cost they will encounter?

(a) They must maintain two separate feature definitions, one in SQL and one in Flink
(b) Online features become impossible to serve
(c) Backfilling a new feature requires replaying months of log, which is expensive and tricky
(d) They lose exactly-once semantics in Kafka

2. Batch vs. Streaming Ingestion

Key Points

Once you know where data lives, the next question is how often to move it. The decision is driven by one variable above all: how stale can features be before model quality suffers?

Batch patterns. An orchestrator (Airflow, Prefect, Dagster) triggers a Spark or SQL job on a cron schedule. The job reads only the data that changed since the last run (the delta), transforms it, and writes to a destination table. Batch is ideal for (a) building large historical training sets from a lake, (b) computing slow-moving aggregates (30-day spend, lifetime value), and (c) backfills when a new feature definition must be applied to historical data.
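Incremental batch reads are often implemented as a high-watermark query. Each run fetches only rows modified after the last successful run, then advances the watermark. Below is a sketch that uses the standard-library `sqlite3` module as a stand-in for the source database. The `orders` table, its `updated_at` column, and `extract_increment` are hypothetical names.

```python
import sqlite3

def extract_increment(conn: sqlite3.Connection, watermark: str) -> tuple[list[tuple], str]:
    """Fetch rows updated after `watermark` (hypothetical helper); return rows and new watermark."""
    rows = conn.execute(
        "SELECT id, amount, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    new_watermark = rows[-1][2] if rows else watermark
    return rows, new_watermark

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 10.0, "2024-01-01T00:00"), (2, 25.0, "2024-01-01T06:00")],
)

batch1, wm = extract_increment(conn, watermark="1970-01-01T00:00")  # first run
conn.execute("INSERT INTO orders VALUES (3, 5.0, '2024-01-02T00:00')")
batch2, wm = extract_increment(conn, watermark=wm)                  # next scheduled run
print(len(batch1), len(batch2), wm)  # 2 1 2024-01-02T00:00
```

In a real pipeline the orchestrator would persist the watermark between runs. The pattern has two known blind spots: hard deletes never appear, and a row committed late with a timestamp at or before the watermark is skipped. Both are among the reasons teams move to CDC.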

Streaming with Kafka and Flink. A producer publishes to Kafka or Kinesis; a stream processor (Flink, Spark Structured Streaming, Kafka Streams) consumes events, applies windowed transformations, and writes results to an online store. Online stores (Redis, DynamoDB, Cassandra, Aerospike) serve features at single-digit-ms latency for inference. Streaming is the right choice when models must react within the same session.

Figure 2.2: Batch vs streaming ingestion paths

```mermaid
flowchart TD
    S[Source Systems] --> B{Latency Budget?}
    B -->|Minutes to hours| BATCH[Batch Path]
    B -->|Seconds| STREAM[Streaming Path]
    BATCH --> AF[Airflow Schedule] --> SP[Spark Job] --> OFF[(Offline Store)]
    STREAM --> KF[Kafka Topic] --> FL[Flink Processor] --> ON[(Online Store)]
    OFF --> M[Model Training]
    ON --> I[Model Inference]
```

Animation 1: Batch (discrete blocks) vs. streaming (continuous events)

In the batch path, Airflow triggers a Spark job on a cron schedule. Each run delivers a discrete batch to the offline store used for training, with latency of minutes to hours. In the streaming path, application events flow continuously into an append-only Kafka topic. A Flink processor consumes them, commits its offsets as it goes, and updates the online store used for inference, with latency of seconds or less.

Lambda and Kappa Architectures

Lambda maintains separate batch and speed layers. The batch layer computes accurate historical features into the offline store. The speed layer computes approximate recent features from a stream into the online store. The two are merged at serving time. Each layer uses the best tool for its job, but the two code paths can drift apart in subtle ways, which is the canonical Lambda failure mode.
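Here is a tiny example of how Lambda's two code paths drift. Suppose the batch layer and the speed layer each implement "purchases in the last 24 hours," but one treats the window's lower boundary as inclusive and the other as exclusive. Both functions are hypothetical, written only to illustrate the failure mode.

```python
DAY = 24 * 3600

def batch_count(event_times: list[int], as_of: int) -> int:
    # Hypothetical batch job: window [as_of - DAY, as_of], lower boundary inclusive.
    return sum(1 for t in event_times if as_of - DAY <= t <= as_of)

def stream_count(event_times: list[int], as_of: int) -> int:
    # Hypothetical speed layer: window (as_of - DAY, as_of], lower boundary exclusive.
    return sum(1 for t in event_times if as_of - DAY < t <= as_of)

events = [0, 3_600, 50_000]
as_of = DAY
print(batch_count(events, as_of), stream_count(events, as_of))  # 3 2
```

The model trains on one definition and serves on the other. Because the mismatch only shows up for events near the boundary, aggregate dashboards rarely reveal it.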

Kappa takes a different approach: a single streaming pipeline is the source of truth; both historical and real-time processing run through the same code, with history reconstructed by replaying the log. Feature logic is implemented exactly once. The cost is that replaying a year of log to backfill a new feature is expensive and operationally tricky.
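In Kappa, one piece of feature logic serves both purposes. The sketch below defines a hypothetical lifetime-spend update once. A backfill replays the whole log through it, and live processing applies it one event at a time, so the two results cannot drift. The in-memory list stands in for a retained Kafka topic.

```python
from collections.abc import Iterable

def update_spend(state: dict[str, float], event: dict) -> None:
    """The single (hypothetical) feature definition: lifetime spend per customer."""
    state[event["customer"]] = state.get(event["customer"], 0.0) + event["amount"]

def replay(log: Iterable[dict]) -> dict[str, float]:
    """Backfill: rebuild feature state by replaying the log from the first offset."""
    state: dict[str, float] = {}
    for event in log:
        update_spend(state, event)
    return state

log = [
    {"customer": "a", "amount": 10.0},
    {"customer": "b", "amount": 4.0},
    {"customer": "a", "amount": 2.5},
]

# Live path: the same function applied incrementally as events arrive.
live: dict[str, float] = {}
for event in log:
    update_spend(live, event)

print(replay(log) == live, live)  # True {'a': 12.5, 'b': 4.0}
```

The cost of Kappa appears in `replay`, which touches every retained event. With a year of clickstream in the log, that loop is the expensive part.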

Figure 2.3: Lambda architecture with batch, speed, and serving layers

```mermaid
flowchart TD
    SRC[Source Events] --> BL[Batch Layer]
    SRC --> SL[Speed Layer]
    BL -->|Spark on lake| OFF[(Offline Feature Store)]
    SL -->|Flink on stream| ON[(Online Feature Store)]
    OFF --> SV[Serving Layer]
    ON --> SV
    SV --> APP[ML Application]
```

Animation 2: Lambda architecture, in which data forks into batch and speed layers and merges at the serving layer

Source events (Kafka, CDC, or API) fork into two layers:
- The batch layer (Spark on the lake) is historical and accurate but stale, and writes training history to the offline store.
- The speed layer (Flink on the stream) is recent but approximate, and writes fresh feature values to the online store.

The serving layer merges both for the ML application.

| Aspect | Lambda architecture | Kappa architecture |
|---|---|---|
| Layers | Separate batch + speed | Single streaming pipeline |
| Storage | Offline store + online store | Stream is source of truth; materializes to both |
| Code paths | Two (batch SQL/Spark, stream Flink) | One (single stream processor) |
| Backfill | Native via batch | Replay log (expensive) |
| Best fit | Most production ML feature stores | Real-time-heavy, event-sourced systems |
| Main risk | Code drift between layers | Replay cost and operational complexity |

In practice, feature stores (Feast, Tecton, SageMaker Feature Store) implement a Lambda-like dual store, with an offline store for training and an online store for serving. Some mitigate drift by letting users declare feature logic once in a DSL that generates both pipelines.

Pre-Reading Quiz — Section 3: File Formats

1. A query selects only 3 of 200 columns from a feature table for model training. Which format property makes this query dramatically cheaper on Parquet than on Avro?

(a) Avro's schema-on-write enforcement
(b) Column pruning: Parquet reads only the requested columns from disk
(c) Parquet's mandatory ZSTD compression
(d) Avro stores integers as text strings

2. Why is Avro typically the canonical format for Kafka topics in production, even though analytical queries prefer Parquet?

(a) Avro has higher compression than Parquet
(b) Avro supports column pruning natively
(c) Avro plus a schema registry handles long-term schema evolution as producers add fields
(d) Kafka brokers cannot store Parquet files

3. A TensorFlow team materializes a Parquet feature table into TFRecord for training. Why is TFRecord rarely the canonical storage format itself?

(a) TFRecord cannot be read on Linux
(b) Its schema is defined in parsing code, Spark integration is awkward, and cross-framework reuse is painful
(c) TFRecord does not support gzip compression
(d) TFRecord requires a schema registry to deserialize

3. File Formats for ML Data

Key Points

The file format you choose for training data is not a serialization detail — it determines I/O throughput, storage cost, schema-evolution flexibility, and how easily teams can share data. ML workloads have particular characteristics (wide rows, repeated reads, mixed access across frameworks) that interact with format choice in non-obvious ways.

Row-based: CSV, JSON, Avro. CSV and JSON are ubiquitous but inefficient: text wastes space, parsing is CPU-heavy, and there is no native schema enforcement. They are fine for ad-hoc inspection but a poor fit for production ML pipelines. Apache Avro is the serious row-based format. Its compact binary encoding carries no field names, so the writer's schema travels separately: either in the header of an Avro container file or, for Kafka messages, in a schema registry (typically Confluent Schema Registry) referenced by an ID in each message. Avro's defining strength is schema evolution. Writer and reader schemas are reconciled at read time, which supports added fields with defaults and renamed fields via aliases. This makes Avro the canonical format for Kafka topics, where event schemas evolve over years.
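Avro's reader/writer reconciliation can be modeled in a few lines. This is a simplified, hypothetical sketch of the rules described above. It matches fields by name, falls back to the reader field's aliases, fills missing fields from defaults, and ignores fields the reader does not know. It is not the API of the `avro` or `fastavro` libraries, and it ignores types entirely.

```python
from typing import Any

_MISSING = object()

def resolve(record: dict[str, Any], reader_fields: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical helper: project a record decoded with the writer schema onto the reader schema."""
    out: dict[str, Any] = {}
    for field in reader_fields:
        value = _MISSING
        # Match by the reader's field name first, then by any of its aliases.
        for name in [field["name"], *field.get("aliases", [])]:
            if name in record:
                value = record[name]
                break
        if value is _MISSING:
            if "default" not in field:
                raise ValueError(f"no value or default for field {field['name']!r}")
            value = field["default"]
        out[field["name"]] = value
    return out  # fields only the writer knows are dropped

old_event = {"user_id": 7, "ts": 1_700_000_000, "legacy_flag": True}  # written with schema v1
reader_v2 = [
    {"name": "user_id"},
    {"name": "event_ts", "aliases": ["ts"]},    # renamed field, matched via alias
    {"name": "country", "default": "unknown"},  # added field with a default
]
print(resolve(old_event, reader_v2))
# {'user_id': 7, 'event_ts': 1700000000, 'country': 'unknown'}
```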

Columnar: Parquet and ORC. Columnar formats store all values of a column contiguously, which is transformative for analytical workloads. They share three key advantages: high compression (similar values pack well together), predicate pushdown (skip whole row groups based on column statistics), and column pruning (read only the columns the query needs). Parquet is the dominant choice in modern lakehouses with first-class Spark, Trino, Snowflake, and PyArrow integration. For ML, Parquet is almost always right for silver and gold layers — feature tables, training datasets, and the offline tier of a feature store.
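Column pruning and predicate pushdown both follow from the columnar layout. This toy model is hypothetical and does not reproduce Parquet's actual file structure. A table is split into row groups, each storing one list per column plus min/max statistics. A query reads only the requested columns and skips row groups whose statistics rule out a match.

```python
from dataclasses import dataclass, field

@dataclass
class RowGroup:
    """Hypothetical row group: contiguous values per column plus min/max stats."""
    columns: dict[str, list]
    stats: dict = field(init=False)

    def __post_init__(self) -> None:
        self.stats = {name: (min(vals), max(vals)) for name, vals in self.columns.items()}

def scan(groups: list[RowGroup], select: list[str], where_col: str,
         lo: float, hi: float) -> tuple[list[tuple], int]:
    """Return rows where lo <= where_col <= hi, plus how many values were read."""
    rows, values_read = [], 0
    for g in groups:
        gmin, gmax = g.stats[where_col]
        if gmax < lo or gmin > hi:
            continue  # predicate pushdown: statistics rule out the whole row group
        filter_vals = g.columns[where_col]
        values_read += len(filter_vals)
        keep = [i for i, v in enumerate(filter_vals) if lo <= v <= hi]
        for col in select:  # column pruning: touch only the requested columns
            if col != where_col:
                values_read += len(g.columns[col])
        rows.extend(tuple(g.columns[c][i] for c in select) for i in keep)
    return rows, values_read

groups = [
    RowGroup({"user_id": [1, 2, 3], "age": [21, 25, 30], "spend": [5.0, 9.0, 1.0]}),
    RowGroup({"user_id": [4, 5, 6], "age": [60, 65, 70], "spend": [3.0, 8.0, 2.0]}),
]
rows, read = scan(groups, select=["user_id", "spend"], where_col="age", lo=20, hi=26)
print(rows, read)  # [(1, 5.0), (2, 9.0)] 9
```

Only 9 of the 18 stored values are touched. A row-based layout would have to decode every field of every row to answer the same query.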

Schema evolution on raw Parquet is workable for adds but tricky for drops or type changes. ML teams delegate evolution to a table format layered on top: Delta Lake, Iceberg, or Hudi tracks versioned schemas, supports MERGE INTO operations, and enables time travel for training reproducibility.

TFRecord and Petastorm. TFRecord is TensorFlow's native training format: a sequence of length-prefixed binary records, usually serialized tf.train.Example protobufs, optimized for sequential reads via tf.data.TFRecordDataset. When the input pipeline is what limits GPU throughput, TFRecord can sustain steadier throughput than reading Parquet through a Python adapter. The cost is significant: the schema lives in parsing code (not the file), Spark integration is awkward, and cross-framework reuse is painful. Most teams should use TFRecord only as a derived training artifact, materialized from Parquet for a specific TensorFlow job. Petastorm bridges Parquet and deep-learning frameworks by exposing a Parquet dataset as a streaming PyTorch or TensorFlow dataset, which removes much of the motivation for TFRecord.

Compression tradeoffs. Stronger compression reduces storage and network cost but spends more CPU. Gzip is slower to decompress than Snappy or LZ4, so it adds CPU to every read. Higher ZSTD levels mainly cost extra CPU at write time. Snappy and LZ4 are common defaults for ML training data because read CPU often matters more than storage. ZSTD is a strong middle ground: compression ratios close to Gzip with much faster decompression.
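Python's standard library has no Snappy or ZSTD codec. This sketch therefore uses `zlib` (the Deflate algorithm behind Gzip) at its fastest and strongest levels to show the write-side tradeoff: the stronger level spends more CPU to produce smaller output. The synthetic column is hypothetical, and actual sizes and timings will vary by machine and data.

```python
import random
import time
import zlib

# Hypothetical feature column: 200k low-cardinality values serialized as text.
rng = random.Random(0)
column = ",".join(str(rng.randint(0, 49)) for _ in range(200_000)).encode()

results: dict[int, tuple[int, float]] = {}
for level in (1, 9):  # zlib level 1 = fastest, level 9 = strongest
    start = time.perf_counter()
    packed = zlib.compress(column, level)
    elapsed = time.perf_counter() - start
    assert zlib.decompress(packed) == column  # compression is lossless
    results[level] = (len(packed), elapsed)
    print(f"level {level}: {len(column):,} -> {len(packed):,} bytes, {elapsed * 1000:.1f} ms")
```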

| Format | Layout | Compression codecs | Schema evolution | Best ML use |
|---|---|---|---|---|
| Parquet | Columnar | Snappy, ZSTD, Gzip | Moderate (via Delta/Iceberg/Hudi) | Lake, offline feature store, gold training tables |
| ORC | Columnar | Zlib, Snappy, ZSTD | Moderate (via table format) | Hive-legacy lakes |
| Avro | Row-based | Snappy, Deflate | Strongest (registry, aliases, defaults) | Kafka topics, raw bronze layer |
| TFRecord | Row-based (protobuf) | Gzip (file-level) | Weak (code-defined) | Derived training artifact for high-throughput TF jobs |

Pre-Reading Quiz — Section 4: Reliability

1. A Kafka consumer writes feature updates to an online store that does not support Kafka transactions. The pipeline must tolerate retries without corrupting the store. Which combination achieves this?

(a) Disable idempotence on the producer and rely on the broker
(b) At-least-once delivery from Kafka plus sink-side upserts keyed by a stable event ID
(c) Use only exactly-once semantics inside Kafka and ignore the sink
(d) Drop every message that produces a 5xx and continue

2. A consumer encounters a single poison message that fails validation due to a business-rule violation. The team wants the rest of the pipeline to keep moving. What is the correct response?

(a) Stop the consumer until an engineer manually fixes the message
(b) Discard the message silently to avoid alerting noise
(c) Publish it to a dead letter queue with error metadata, alert, and continue
(d) Retry the message indefinitely with exponential backoff

3. A producer needs to add a new optional field to an Avro-encoded event. Which approach preserves backward compatibility for existing consumers?

(a) Add the field as required with no default
(b) Rename an existing field instead of adding a new one
(c) Add the field with a default value and register the new schema version
(d) Drop an old required field to make room for the new one

4. Ingestion Reliability

Key Points

A pipeline that ingests correctly 99% of the time is not 1% wrong — it is broken. The 1% manifests as silent feature drift, training-serving skew, and missing labels that degrade accuracy in ways that are nearly impossible to debug after the fact. Reliable ingestion rests on four pillars: idempotency, schema management, backpressure handling, and lineage.

Idempotency and Exactly-Once Semantics

Distributed systems fail. Network calls time out, brokers restart, consumers get rebalanced. Every reliable pipeline must assume retries will happen and ensure that processing the same message twice produces the same result as processing it once. Within Kafka, setting enable.idempotence=true gives a producer a producer ID (PID) and stamps each batch with a per-partition sequence number. The broker can then recognize a retry after an in-flight failure and discard it instead of appending a duplicate. Read-process-write pipelines that stay inside Kafka can use transactional producers, which go further. The calls initTransactions(), beginTransaction(), sendOffsetsToTransaction(), and commitTransaction() make output records and consumer offset commits atomic. Consumers configured with isolation.level=read_committed see only committed transactions.
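Broker-side deduplication for an idempotent producer can be modeled simply. The broker remembers the last sequence number appended per producer ID and refuses to append anything that is not newer. This toy partition is a hypothetical simplification. The real protocol tracks batches, producer epochs, and a small window of recent sequences, and it signals sequence gaps with an error.

```python
class ToyPartition:
    """Hypothetical partition log with per-producer sequence tracking."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self._last_seq: dict[int, int] = {}  # producer id -> last appended sequence

    def append(self, pid: int, seq: int, value: str) -> bool:
        """Append if seq is the next expected number; return False for a duplicate retry."""
        last = self._last_seq.get(pid, -1)
        if seq <= last:
            return False  # duplicate from a retry: acknowledge but do not append again
        if seq != last + 1:
            raise RuntimeError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.log.append(value)
        self._last_seq[pid] = seq
        return True

p = ToyPartition()
p.append(pid=42, seq=0, value="order-1")
p.append(pid=42, seq=1, value="order-2")
p.append(pid=42, seq=1, value="order-2")  # producer retried after a lost ack
print(p.log)  # ['order-1', 'order-2']
```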

Outside Kafka, transactional guarantees do not extend — a feature store or a lake cannot participate in a Kafka transaction. The practical pattern is at-least-once delivery from Kafka combined with idempotent writes at the sink. Every event carries a stable identifier (a UUID assigned upstream, or a hash of entity ID and event time), and the sink performs upserts keyed by that identifier. Writing the same event twice overwrites the same row with the same value; the duplicate is invisible downstream.
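The stable-identifier pattern can be sketched in a few lines. The event ID here is a SHA-256 hash of entity ID and event time, one of the two options named above, and writes upsert by that ID. The dict-backed sink and the field names are hypothetical stand-ins for a feature store or lake table.

```python
import hashlib

def event_id(entity_id: str, event_time: str) -> str:
    """Deterministic ID: the same logical event always hashes to the same key."""
    return hashlib.sha256(f"{entity_id}|{event_time}".encode()).hexdigest()

def upsert(sink: dict[str, dict], event: dict) -> None:
    """Idempotent write (hypothetical sink): a redelivered event overwrites its own row."""
    sink[event_id(event["entity_id"], event["event_time"])] = event

sink: dict[str, dict] = {}
e = {"entity_id": "card-9", "event_time": "2024-05-01T12:00:00Z", "amount": 42.0}
upsert(sink, e)
upsert(sink, e)  # at-least-once delivery: the same event arrives twice
print(len(sink))  # 1
```

Hashing entity ID and event time assumes one event per entity per timestamp. If two genuinely distinct events can share both, a UUID assigned upstream is the safer choice.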

For lake ingestion, the idiom is MERGE INTO on a Delta/Iceberg/Hudi table keyed by event ID. For online feature stores, it is a SET keyed by (entity_id, feature_name), with a write-time check that ignores updates whose timestamps are older than the current value's. This prevents out-of-order events from overwriting fresh data with stale data. Compacted Kafka topics provide a third option: with records keyed by entity, the broker eventually retains only the latest value per key.
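The online-store check can be sketched as a conditional set that keeps whichever value carries the newer event timestamp. In Redis or DynamoDB the read-compare-write would have to be atomic, for example via a Lua script or a conditional write. The dict-based version below only illustrates the logic and is not safe under concurrent writers. The key layout and feature name are hypothetical.

```python
from typing import Any

# Hypothetical layout: (entity_id, feature_name) -> (event_ts, value)
OnlineStore = dict[tuple[str, str], tuple[float, Any]]

def set_if_newer(store: OnlineStore, entity_id: str, feature: str,
                 value: Any, event_ts: float) -> bool:
    """Write only if event_ts is newer than the stored value's timestamp."""
    key = (entity_id, feature)
    current = store.get(key)
    if current is not None and event_ts <= current[0]:
        return False  # stale or duplicate event: keep the fresher value
    store[key] = (event_ts, value)
    return True

store: OnlineStore = {}
set_if_newer(store, "user-1", "clicks_5m", 7, event_ts=100.0)
set_if_newer(store, "user-1", "clicks_5m", 3, event_ts=90.0)  # late, out-of-order event
print(store[("user-1", "clicks_5m")])  # (100.0, 7)
```

Using `<=` rather than `<` also makes an exact redelivery a no-op, so the same check provides both ordering protection and idempotency.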

Analogy: idempotency is like a hotel reservation confirmation number. If your booking app crashes and you click "Reserve" again, the hotel uses the confirmation number to recognize the duplicate and charges you once, not twice.

Figure 2.4: Kafka exactly-once flow across producer, broker, and consumer

```mermaid
sequenceDiagram
    participant P as Producer
    participant B as Kafka Broker
    participant C as Consumer
    P->>B: initTransactions(transactional.id)
    P->>B: beginTransaction()
    P->>B: send(record, seq#)
    B-->>P: ack (dedup via PID+seq)
    P->>B: sendOffsetsToTransaction()
    P->>B: commitTransaction()
    B->>C: deliver (read_committed)
    C->>C: process exactly once
```

Animation 3: Kafka exactly-once with producer ID and sequence numbers, broker deduplication, and consumer offset commits

The animation shows four steps:
1. The producer tags each message with its PID and sequence number.
2. The broker drops duplicates caused by in-flight retries.
3. A transactional commit writes records and offsets atomically.
4. The read_committed consumer processes each record exactly once.

Schema Evolution

Source schemas change: a microservice adds a field, a type widens, a nullable column becomes required. If ingestion treats every change as breaking, every minor source update halts the pipeline. The pattern is a schema registry. Confluent Schema Registry (and compatible implementations) stores Avro, Protobuf, and JSON Schema definitions keyed by subject (typically derived from the topic name). It enforces a compatibility mode (backward, forward, or full) on every new version. Producers register the schema before publishing. Consumers fetch the writer's schema by ID and reconcile it with their reader's schema at deserialization.

Practical rules:
- Add new fields with defaults (backward compatible).
- Avoid renames; when one is unavoidable, keep the old name as an alias.
- Never drop a field that has no default, because consumers on the old schema still require it.
- Never change a field's type incompatibly.

ML pipelines should always include stable identifiers (entity ID, event ID, event timestamp) as required fields, because these are the keys on which idempotency depends. On the lake side, table formats (Delta, Iceberg, Hudi) extend schema evolution to columnar files as metadata operations that do not rewrite historical data.
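The practical rules above translate into a simple compatibility check. "Backward" asks whether consumers on the new schema can read data written with the old one. "Forward" asks the reverse, and "full" requires both. This is a hypothetical and deliberately simplified check that looks only at field names, aliases, defaults, and exact type equality. A registry's real Avro checks follow the full Avro resolution rules, including type promotions.

```python
from typing import Any

# Simplified, hypothetical Avro-like record schema: [{"name", "type", "default"?, "aliases"?}]
Schema = list[dict[str, Any]]

def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be decoded by `reader` (simplified rules)."""
    writer_by_name = {f["name"]: f for f in writer}
    for field in reader:
        names = [field["name"], *field.get("aliases", [])]
        match = next((writer_by_name[n] for n in names if n in writer_by_name), None)
        if match is None:
            if "default" not in field:
                return False  # reader needs a value the writer never wrote
        elif match["type"] != field["type"]:
            return False      # simplified: no type promotion (e.g., int -> long)
    return True

def compatibility(old: Schema, new: Schema) -> dict[str, bool]:
    backward = can_read(reader=new, writer=old)  # new consumers, old data
    forward = can_read(reader=old, writer=new)   # old consumers, new data
    return {"backward": backward, "forward": forward, "full": backward and forward}

v1 = [
    {"name": "event_id", "type": "string"},
    {"name": "entity_id", "type": "string"},
    {"name": "amount", "type": "double"},
]
v2_add_default = v1 + [{"name": "channel", "type": "string", "default": "web"}]
v2_drop_required = [f for f in v1 if f["name"] != "amount"]

print(compatibility(v1, v2_add_default))    # {'backward': True, 'forward': True, 'full': True}
print(compatibility(v1, v2_drop_required))  # {'backward': True, 'forward': False, 'full': False}
```

Dropping `amount` passes the backward check but fails the forward check. Consumers still on v1 expect the field and have no default to fall back on, which is why the rules forbid dropping fields that lack defaults.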

Backpressure, Retries, and Dead Letter Queues

A pipeline that ingests faster than its sink can absorb does not just slow down; it falls over. Memory fills, GC pauses lengthen, consumers get evicted, and the lag chart turns into a wall. Backpressure is the mechanism by which a slow downstream signals an upstream producer to slow down. In Kafka consumers, backpressure is largely manual. You tune max.poll.records, max.partition.fetch.bytes, and fetch.max.bytes so that each poll returns only what the processor can finish before max.poll.interval.ms expires and the consumer is evicted from its group. In stream-processing frameworks (Flink, Spark Structured Streaming, Kafka Streams), backpressure is largely built in.
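Inside a single process, a bounded buffer is the simplest form of backpressure: when the queue is full, the producer's `put` blocks until the slower consumer catches up. This sketch uses Python's `queue.Queue(maxsize=...)` and a thread as an analogy for what stream processors do between operators. The buffer size, item count, and simulated sink delay are arbitrary.

```python
import queue
import threading
import time

buffer: queue.Queue = queue.Queue(maxsize=10)  # bounded: this is the backpressure
processed: list[int] = []
max_depth = 0
_SENTINEL = object()  # signals end of stream

def slow_sink() -> None:
    while True:
        item = buffer.get()
        if item is _SENTINEL:
            break
        time.sleep(0.001)  # simulate a sink slower than the producer
        processed.append(item)

consumer = threading.Thread(target=slow_sink)
consumer.start()
for i in range(200):
    buffer.put(i)  # blocks whenever 10 items are already waiting
    max_depth = max(max_depth, buffer.qsize())
buffer.put(_SENTINEL)
consumer.join()

print(len(processed), max_depth <= 10)  # 200 True
```

Memory stays bounded at ten in-flight items no matter how fast the producer could run. Capping `max.poll.records` plays the same role for a Kafka consumer.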

When the sink fails on a specific record — bad schema, business-rule violation, missing reference data — the answer is a dead letter queue. The failing message is published to a side topic with error metadata; the main pipeline keeps moving. Operators triage the DLQ separately, often replaying records back to the main topic after fixing the underlying issue. For transient errors (5xx, rate limits), exponential backoff with jitter prevents thundering-herd retries that turn a brief outage into a sustained one.
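The two responses combine naturally in one consumer loop. Transient errors are retried with exponential backoff and full jitter. Permanent errors go to a dead letter queue with metadata. If retries run out, the error is re-raised so the consumer can pause and alert. Everything here is hypothetical scaffolding: `TransientError`, `PermanentError`, the `handler`, and the list-based DLQ stand in for your sink client and a real DLQ topic. The `sleep` parameter is injectable so the sketch runs instantly.

```python
import random
import time
from typing import Callable, Iterable, Optional

class TransientError(Exception):
    """Hypothetical retryable failure, e.g. an HTTP 5xx or throttling."""

class PermanentError(Exception):
    """Hypothetical non-retryable failure, e.g. a schema or business-rule violation."""

def process(
    records: Iterable[dict],
    handler: Callable[[dict], None],
    dlq: list,
    max_retries: int = 5,
    base: float = 0.1,
    cap: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> None:
    if rng is None:
        rng = random.Random()
    for record in records:
        attempt = 0
        while True:
            try:
                handler(record)
                break
            except PermanentError as exc:
                # Park the poison message with context for later triage, then move on.
                dlq.append({"record": record, "error": repr(exc), "attempts": attempt + 1})
                break
            except TransientError:
                if attempt >= max_retries:
                    raise  # extended outage: surface it so the consumer can pause and alert
                # Full jitter: random delay in [0, min(cap, base * 2**attempt)].
                sleep(rng.uniform(0, min(cap, base * 2 ** attempt)))
                attempt += 1

written: list[int] = []
failures = {"remaining": 2}  # hypothetical: record 2 fails twice before succeeding

def handler(record: dict) -> None:
    if record.get("amount", 0) < 0:
        raise PermanentError("negative amount violates business rule")
    if record["id"] == 2 and failures["remaining"] > 0:
        failures["remaining"] -= 1
        raise TransientError("HTTP 503")
    written.append(record["id"])

dlq: list = []
delays: list[float] = []
records = [{"id": 1}, {"id": 2}, {"id": 3, "amount": -5}, {"id": 4}]
process(records, handler, dlq, sleep=delays.append, rng=random.Random(0))
print(written, [entry["record"]["id"] for entry in dlq], len(delays))  # [1, 2, 4] [3] 2
```

The random delay is what prevents a thundering herd. Without jitter, every consumer that failed at the same moment would retry at the same moment.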

| Failure type | Detection | Response |
|---|---|---|
| Transient sink error (5xx, throttle) | HTTP code, exception type | Exponential backoff with jitter, retry in place |
| Permanent data error (schema, business rule) | Validation, parsing failure | Publish to DLQ with metadata, alert, continue |
| Slow sink (backpressure) | Consumer lag, queue depth | Reduce poll size, slow consumption |
| Kafka rebalance | Consumer group event | Commit offsets, replay last batch (idempotency handles duplicates) |
| Sink unavailable (extended outage) | Repeated failures | Pause consumer, alert, manual recovery |

Lineage

Lineage is knowing, for any training row or feature value, exactly which source events produced it, through which transformations, at which versions. It matters for three reasons: debugging (trace a suspicious feature to its source), compliance (regulated industries need to reproduce any prediction), and reproducibility (retraining last quarter's model requires reconstructing last quarter's data). Lineage is captured at multiple layers: table formats record version histories; orchestrators record job inputs/outputs; OpenLineage, Marquez, and DataHub aggregate signals into a graph; MLflow and feature stores like Feast record the dataset versions and feature definitions used in each training run.
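At its core, a lineage graph records each job run's input and output dataset versions. Tracing a training table back to its sources is then a graph walk. The sketch below is a hypothetical in-memory model of that idea; OpenLineage, Marquez, and DataHub capture much richer run, job, and dataset metadata. The dataset names and version numbers are made up.

```python
from collections import defaultdict

# Hypothetical store: (dataset, version) -> input (dataset, version)s of the run that produced it
produced_from: dict[tuple[str, int], list[tuple[str, int]]] = defaultdict(list)

def record_run(inputs: list[tuple[str, int]], outputs: list[tuple[str, int]]) -> None:
    """Roughly what an orchestrator or lineage integration records for each job run."""
    for out in outputs:
        produced_from[out].extend(inputs)

def upstream(node: tuple[str, int]) -> set[tuple[str, int]]:
    """All dataset versions that transitively fed `node`."""
    seen: set[tuple[str, int]] = set()
    stack = [node]
    while stack:
        for parent in produced_from.get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen

record_run([("bronze.transactions", 12), ("bronze.merchants", 4)], [("silver.transactions", 7)])
record_run([("silver.transactions", 7)], [("gold.fraud_features", 3)])
print(sorted(upstream(("gold.fraud_features", 3))))
# [('bronze.merchants', 4), ('bronze.transactions', 12), ('silver.transactions', 7)]
```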

