Step 1: Extract Multiple Tables

Extract data from multiple database tables in a single pipeline using the sequence input.

The Goal

  • Extract orders (incremental - last 24 hours)
  • Extract inventory (full table)
  • Extract order_items (incremental - last 24 hours)
  • Tag each record with its source table

Why Sequence Input?

Instead of running 3 separate pipelines:

Before: 3 pipelines, 3 schedules, 3 configs to maintain

After: 1 pipeline processes all tables sequentially

Implementation

step-1-extract.yaml

```yaml
input:
  sequence:
    inputs:
      # Orders - incremental (last 24h)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: orders
          columns: ["*"]
          where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "orders"
              root._backup_type = "incremental"

      # Inventory - full backup (smaller table)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: inventory
          columns: ["*"]
        processors:
          - mapping: |
              root = this
              root._table = "inventory"
              root._backup_type = "full"

      # Order items - incremental
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: order_items
          columns: ["*"]
          where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "order_items"
              root._backup_type = "incremental"

pipeline:
  processors:
    - mapping: root = this # Pass through for now

output:
  stdout:
    codec: json_pretty
```

Understanding the Code

| Component | Purpose |
| --- | --- |
| `sequence.inputs` | Process each input in order |
| `sql_select.where` | Incremental filter (last 24h) |
| `processors` (per-input) | Tag records with source table |
| `root._table` | Enables routing in the output step |
| `root._backup_type` | Documents the backup strategy |
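The `_table` field is what makes per-table routing possible later. As an illustration only (the output step comes later in this tutorial; the `switch` cases and file paths below are assumptions, not part of this step), routing could look like:

```yaml
# Illustrative sketch: route records by the _table field tagged during extraction.
output:
  switch:
    cases:
      - check: this._table == "orders"
        output:
          file:
            path: backups/orders.jsonl
            codec: lines
      - check: this._table == "inventory"
        output:
          file:
            path: backups/inventory.jsonl
            codec: lines
```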

Incremental vs Full Backup

| Strategy | Use When | Example |
| --- | --- | --- |
| Incremental | Large tables, frequent changes | orders, transactions |
| Full | Small tables, need complete state | inventory, config |

Incremental Filter Patterns

```yaml
# Last 24 hours by updated_at
where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"

# Last 24 hours by created_at
where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"

# Since last backup (requires state tracking)
where: "updated_at > '${LAST_BACKUP_TIMESTAMP}'"
```
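The last pattern needs something to supply `${LAST_BACKUP_TIMESTAMP}`. One minimal approach (a hypothetical wrapper script; the state file name and runner command are illustrative assumptions) is to persist the watermark in a file between runs:

```shell
# Hypothetical wrapper: persist the last successful backup time so the
# "since last backup" filter has a value for ${LAST_BACKUP_TIMESTAMP}.
STATE_FILE=./last_backup_ts

# Fall back to the epoch on the very first run
LAST_BACKUP_TIMESTAMP=$(cat "$STATE_FILE" 2>/dev/null || echo '1970-01-01T00:00:00Z')
export LAST_BACKUP_TIMESTAMP

# ... run the extraction pipeline here with your Benthos-compatible CLI ...

# On success, record the new watermark for the next run
date -u '+%Y-%m-%dT%H:%M:%SZ' > "$STATE_FILE"
```

For stronger guarantees you would store this watermark in the database or a cache rather than a local file, but the idea is the same.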

Expected Output

```json
{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-15T14:22:00Z",
  "_table": "orders",
  "_backup_type": "incremental"
}
```

Production Considerations

Database Connection Pooling

For large extracts, configure connection limits:

```yaml
sql_select:
  driver: postgres
  dsn: "...?pool_max_conns=5"
```

Handling Large Tables

For very large tables, add pagination:

```yaml
sql_select:
  table: orders
  where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
  # Process in batches of 10000
  batch_size: 10000
```

Multiple Databases

Extract from different databases in one pipeline:

```yaml
sequence:
  inputs:
    - sql_select:
        dsn: "postgres://...@db1/ecommerce"
        table: orders
    - sql_select:
        dsn: "postgres://...@db2/inventory"
        table: stock_levels
```
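When combining databases like this, it helps to extend the tagging pattern so each record also records where it came from. A sketch (the `_source_db` field name is an illustrative choice, not a built-in):

```yaml
- sql_select:
    driver: postgres
    dsn: "postgres://...@db1/ecommerce"
    table: orders
  processors:
    - mapping: |
        root = this
        root._table = "orders"
        root._source_db = "db1"
```

Downstream steps can then route or partition on `_source_db` the same way they do on `_table`.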

Next Step