Step 2: Add Backup Metadata

Add context information to each record for recovery scenarios and audit trails.

The Goal

Add a _backup_metadata object containing:

  • Backup date (for organizing files)
  • Backup timestamp (exact time)
  • Source host and database
  • Pipeline version
  • Processing node ID

Why Metadata Matters

Recovery Context: Know exactly when and where data was backed up.

Audit Trail: Prove backup compliance with timestamps.

Debugging: Identify which node processed which records.

Implementation

step-2-metadata.yaml
pipeline:
  processors:
    # Step 2: Add backup metadata
    - mapping: |
        root = this
        root._backup_metadata = {
          "backup_date": now().ts_format("2006-01-02"),
          "backup_timestamp": now(),
          "source_host": env("DB_HOST").or("unknown"),
          "source_database": env("DB_NAME").or("unknown"),
          "pipeline_version": "1.0.0",
          "node_id": env("NODE_ID").or("unknown")
        }

Understanding the Code

Field              Purpose                   Example
backup_date        File path organization    "2024-01-15"
backup_timestamp   Exact backup time         "2024-01-15T02:00:00Z"
source_host        Origin database server    "postgres.internal.corp"
source_database    Database name             "ecommerce"
pipeline_version   Config version tracking   "1.0.0"
node_id            Processing node           "backup-node-1"
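
One detail worth noting about the two time fields: each now() call reads the clock separately, and ts_format uses the local time zone unless told otherwise, so backup_date and backup_timestamp could disagree around midnight. A minimal sketch of one way to keep them aligned, assuming UTC is the desired zone (the variable name ts is illustrative, and the other metadata fields stay as in the mapping above):

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Read the clock once so both fields describe the same instant.
        let ts = now()
        # The second argument to ts_format is the time zone to render in.
        root._backup_metadata.backup_date = $ts.ts_format("2006-01-02", "UTC")
        root._backup_metadata.backup_timestamp = $ts.ts_format("2006-01-02T15:04:05Z07:00", "UTC")
```

Rendering both values from the same captured timestamp means the date used for file organization always matches the date portion of the exact timestamp.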

Expected Output

{
  "order_id": 12345,
  "customer_id": "CUST-001",
  "total": 99.99,
  "_table": "orders",
  "_backup_type": "incremental",
  "_backup_metadata": {
    "backup_date": "2024-01-15",
    "backup_timestamp": "2024-01-15T02:00:00Z",
    "source_host": "postgres.internal.corp",
    "source_database": "ecommerce",
    "pipeline_version": "1.0.0",
    "node_id": "backup-node-1"
  }
}
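
To illustrate how backup_date can organize files, here is a hedged sketch of an output section. It assumes the runtime provides a Benthos-style file output with path interpolation; the backups/ directory and the .jsonl naming are hypothetical and not part of this tutorial's configuration.

```yaml
output:
  file:
    # Hypothetical layout: backups/<backup_date>/<table>.jsonl
    path: 'backups/${! json("_backup_metadata.backup_date") }/${! json("_table") }.jsonl'
    # Append each record as one JSON line.
    codec: lines
```

With the record shown above, this layout would place the order under backups/2024-01-15/orders.jsonl.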

Production Considerations

Git Commit as Version

Track the exact config version by reading it from the environment instead of hard-coding it:

root._backup_metadata.pipeline_version = env("PIPELINE_VERSION").or("dev")
root._backup_metadata.git_commit = env("GIT_COMMIT").or("unknown")

Backup Job ID

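Tag every record from the same run with a single ID. Note that uuid_v4() is evaluated for each message, so calling it inside the mapping yields a different ID per record rather than one per run. Generate the ID once in whatever launches the run and pass it in through the environment (BACKUP_JOB_ID is an illustrative variable name). The now() fallback for run_started_at is likewise evaluated per record, so set RUN_START_TIME when a true run start time is needed:

root._backup_metadata.job_id = env("BACKUP_JOB_ID").or("unknown")
root._backup_metadata.run_started_at = env("RUN_START_TIME").or(now())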

Table-Specific Metadata

Record which table each record came from and the extraction filter used for it:

root._backup_metadata.source_table = this._table
root._backup_metadata.extraction_query = match this._table {
"orders" => "updated_at >= CURRENT_DATE - INTERVAL '1 day'",
"inventory" => "full_table",
_ => "unknown"
}
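
As a production safeguard, a second mapping placed after the metadata step could reject records that lack the core fields. This is a minimal sketch rather than part of the tutorial's pipeline: the field list is copied from the _backup_metadata object built in this step, and failing the record with throw() is an assumed policy that may need adapting to your error handling.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Core fields expected inside _backup_metadata.
        let required = ["backup_date", "backup_timestamp", "source_host", "source_database", "pipeline_version", "node_id"]
        # Treat a missing metadata object as an empty one.
        let present = this._backup_metadata.or({}).keys()
        let missing = $required.filter(k -> !$present.contains(k))
        # Fail the record if any required field is absent.
        root = if $missing.length() > 0 {
          throw("missing backup metadata fields: " + $missing.join(", "))
        }
```

Records that pass keep their content unchanged; records that fail carry the error message, which names the missing fields.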

Next Step