# Step 1: Extract Multiple Tables

Extract data from multiple database tables in a single pipeline using the `sequence` input.
## The Goal

- Extract `orders` (incremental: last 24 hours)
- Extract `inventory` (full table)
- Extract `order_items` (incremental: last 24 hours)
- Tag each record with its source table
## Why a Sequence Input?

Instead of running three separate pipelines:

- **Before:** 3 pipelines, 3 schedules, 3 configs to maintain
- **After:** 1 pipeline processes all tables sequentially
## Implementation

**step-1-extract.yaml**

```yaml
input:
  sequence:
    inputs:
      # Orders - incremental (last 24h)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: orders
          columns: ["*"]
          where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "orders"
              root._backup_type = "incremental"

      # Inventory - full backup (smaller table)
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: inventory
          columns: ["*"]
        processors:
          - mapping: |
              root = this
              root._table = "inventory"
              root._backup_type = "full"

      # Order items - incremental
      - sql_select:
          driver: postgres
          dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_NAME}?sslmode=require"
          table: order_items
          columns: ["*"]
          where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"
        processors:
          - mapping: |
              root = this
              root._table = "order_items"
              root._backup_type = "incremental"

pipeline:
  processors:
    - mapping: root = this # Pass through for now

output:
  stdout:
    codec: json_pretty
```
## Understanding the Code

| Component | Purpose |
|---|---|
| `sequence.inputs` | Process each input in order |
| `sql_select.where` | Incremental filter (last 24h) |
| `processors` (per-input) | Tag records with their source table |
| `root._table` | Enables routing in the output step |
| `root._backup_type` | Documents the backup strategy |
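To show how the `_table` tag can drive routing later, here is one possible sketch of a `switch` output that checks the tag and writes each table to its own destination. The file paths are hypothetical, and the final case without a `check` acts as a catch-all:

```yaml
output:
  switch:
    cases:
      - check: this._table == "orders"
        output:
          file:
            path: ./backups/orders.jsonl
            codec: lines
      - check: this._table == "inventory"
        output:
          file:
            path: ./backups/inventory.jsonl
            codec: lines
      # Catch-all for everything else (order_items, future tables)
      - output:
          file:
            path: ./backups/other.jsonl
            codec: lines
```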
## Incremental vs Full Backup

| Strategy | Use When | Example |
|---|---|---|
| Incremental | Large tables, frequent changes | `orders`, `transactions` |
| Full | Small tables, need complete state | `inventory`, `config` |
### Incremental Filter Patterns

```yaml
# Last 24 hours by updated_at
where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"

# Last 24 hours by created_at
where: "created_at >= CURRENT_DATE - INTERVAL '1 day'"

# Since last backup (requires state tracking)
where: "updated_at > '${LAST_BACKUP_TIMESTAMP}'"
```
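The `${LAST_BACKUP_TIMESTAMP}` variable has to come from somewhere. A minimal sketch of state tracking is a shell wrapper that keeps a high-water mark in a file; the `.last_backup` file name is an assumption, and the pipeline invocation is stubbed out:

```shell
# Hypothetical wrapper: keep a high-water mark in a file, export
# it for the pipeline, and advance it only after a successful run.
STATE_FILE=".last_backup"

# Default to the epoch on the first run (no state file yet).
LAST_BACKUP_TIMESTAMP=$(cat "$STATE_FILE" 2>/dev/null || echo "1970-01-01T00:00:00Z")
export LAST_BACKUP_TIMESTAMP

# Capture "now" BEFORE the run, so rows updated while the
# extract is in flight are picked up by the next run.
NOW=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

# Replace `true` with the real pipeline command, e.g.:
#   benthos -c step-1-extract.yaml
if true; then
  echo "$NOW" > "$STATE_FILE"
fi
```

Capturing the timestamp before the run trades a few re-read rows for the guarantee that no update falls between two backup windows.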
## Expected Output

```json
{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-15T14:22:00Z",
  "_table": "orders",
  "_backup_type": "incremental"
}
```
## Production Considerations

### Database Connection Pooling

For large extracts, configure connection limits:

```yaml
sql_select:
  driver: postgres
  dsn: "...?pool_max_conns=5"
```
### Handling Large Tables

For very large tables, add pagination:

```yaml
sql_select:
  table: orders
  where: "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
  # Process in batches of 10000
  batch_size: 10000
```
### Multiple Databases

Extract from different databases in one pipeline:

```yaml
sequence:
  inputs:
    - sql_select:
        dsn: "postgres://...@db1/ecommerce"
        table: orders
    - sql_select:
        dsn: "postgres://...@db2/inventory"
        table: stock_levels
```