Step 2: Add Backup Metadata
Attach context to each record to support recovery and audit trails.
The Goal
Add a _backup_metadata object containing:
- Backup date (for organizing files)
- Backup timestamp (exact time)
- Source host and database
- Pipeline version
- Processing node ID
Why Metadata Matters
Recovery Context: Know exactly when and where data was backed up.
Audit Trail: Use per-record timestamps as evidence that backups ran on schedule.
Debugging: Identify which node processed which records.
Implementation
step-2-metadata.yaml
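```yaml
pipeline:
  processors:
    # Step 2: Add backup metadata
    - mapping: |
        # Read the clock once so backup_date and backup_timestamp always agree
        let ts = now()
        root = this
        root._backup_metadata = {
          # Format in UTC so the date matches the Z-suffixed timestamp
          "backup_date": $ts.ts_format("2006-01-02", "UTC"),
          "backup_timestamp": $ts.ts_format("2006-01-02T15:04:05Z07:00", "UTC"),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }
```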
Understanding the Code
| Field | Purpose | Example |
|---|---|---|
| backup_date | Date key for organizing backup file paths | "2024-01-15" |
| backup_timestamp | Exact time the record was processed | "2024-01-15T02:00:00Z" |
| source_host | Origin database server | "postgres.internal.corp" |
| source_database | Source database name | "ecommerce" |
| pipeline_version | Version of the pipeline config | "1.0.0" |
| node_id | Node that processed the record | "backup-node-1" |
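The backup_date field becomes useful once the output path is derived from it. As a sketch, assuming a later step writes records with the file output (the ./backups directory and .jsonl naming here are illustrative, not from this tutorial), interpolation functions can place each record in a per-date, per-table file:

```yaml
# Illustrative output section; the directory layout is an assumption.
output:
  file:
    # Interpolated per message, e.g. ./backups/2024-01-15/orders.jsonl
    path: "./backups/${! this._backup_metadata.backup_date }/${! this._table }.jsonl"
    codec: lines  # write each record as one JSON line
```

The same path expression should carry over if the file output is swapped for an object-storage output such as aws_s3.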
Expected Output
```json
{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "_table": "orders",
  "_backup_type": "incremental",
  "_backup_metadata": {
    "backup_date": "2024-01-15",
    "backup_timestamp": "2024-01-15T02:00:00Z",
    "source_host": "postgres.internal.corp",
    "source_database": "ecommerce",
    "pipeline_version": "1.0.0",
    "node_id": "backup-node-1"
  }
}
```
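To reproduce output like the example above without a database, one option is a throwaway test harness. The sketch below is an assumption, not part of the tutorial's pipeline: it uses the generate input to emit one hand-written record shaped like Step 1's output and the stdout output to print the result. Add these sections next to the pipeline section of step-2-metadata.yaml and export DB_HOST, DB_NAME and NODE_ID first; otherwise those fields read "unknown".

```yaml
# Local smoke test only (hypothetical harness, not the production config).
input:
  generate:
    count: 1      # emit exactly one message, then shut down
    interval: ""  # no wait between messages
    mapping: |
      # Sample record shaped like the Step 1 output
      root = {
        "order_id": 12345,
        "customer_id": "CUST-001",
        "total": 99.99,
        "_table": "orders",
        "_backup_type": "incremental"
      }

output:
  stdout:
    codec: lines  # one JSON document per line
```

Expect the printed JSON to contain the same fields as the example, though key order and the timestamp will differ.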
Production Considerations
Git Commit as Version
Record exactly which config version and Git commit produced each record:
```
root._backup_metadata.pipeline_version = env("PIPELINE_VERSION").or("dev")
root._backup_metadata.git_commit = env("GIT_COMMIT").or("unknown")
```
Backup Job ID
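Tag every record from one backup run with a shared ID. A mapping runs once per record, so calling uuid_v4() here would give every record a different ID. Instead, read an ID that the job launcher generates once per run:
```
# BACKUP_JOB_ID is an example name; export it once per run before starting the pipeline
root._backup_metadata.job_id = env("BACKUP_JOB_ID").or("unknown")
# Falls back to this record's processing time when RUN_START_TIME is unset
root._backup_metadata.run_started_at = env("RUN_START_TIME").or(now())
```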
Table-Specific Metadata
Record which table and extraction filter produced each record, so a restore can be checked against the query that selected it:
```
root._backup_metadata.source_table = this._table
root._backup_metadata.extraction_query = match this._table {
  "orders" => "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
  "inventory" => "full_table"
  _ => "unknown"
}
```
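The production snippets above all extend the Step 2 mapping, and their order matters: the root._backup_metadata = {...} assignment replaces the whole object, so field-by-field additions must come after it. Below is a sketch of the combined processor. It assumes two things not in the steps above: a BACKUP_JOB_ID variable that whatever launches the job exports once per run, and Bloblang's hostname() function as the node_id fallback instead of "unknown".

```yaml
pipeline:
  processors:
    # Step 2: Add backup metadata, with the production fields folded in
    - mapping: |
        let ts = now()
        root = this

        # Base object first: this assignment overwrites any earlier
        # root._backup_metadata value.
        root._backup_metadata = {
          "backup_date": $ts.ts_format("2006-01-02", "UTC"),
          "backup_timestamp": $ts.ts_format("2006-01-02T15:04:05Z07:00", "UTC"),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": env("PIPELINE_VERSION").or("dev"),
          # Assumed fallback: the machine's hostname rather than "unknown"
          "node_id": env("NODE_ID").or(hostname())
        }

        # Production additions extend the object one field at a time.
        root._backup_metadata.git_commit = env("GIT_COMMIT").or("unknown")
        # BACKUP_JOB_ID is a hypothetical variable set once per run by the launcher
        root._backup_metadata.job_id = env("BACKUP_JOB_ID").or("unknown")
        root._backup_metadata.run_started_at = env("RUN_START_TIME").or($ts)
        root._backup_metadata.source_table = this._table
        root._backup_metadata.extraction_query = match this._table {
          "orders" => "updated_at >= CURRENT_DATE - INTERVAL '1 day'"
          "inventory" => "full_table"
          _ => "unknown"
        }
```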