Step 1: Add Lineage Metadata

Every record needs an audit trail showing where it came from, when it was processed, and which node handled it. This replaces manual DataStage annotations with automatic lineage injection.

The Goal

Add a _lineage object to every record containing:

  • Source system identifier
  • Source table name
  • Pipeline name
  • Extraction timestamp
  • Processing node ID
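As a rough sketch of the target shape, the five fields can be written as a Python TypedDict. The key names match the mapping used later in this step. The class name Lineage is hypothetical and only illustrates the structure; it is not part of Expanso.

```python
from typing import TypedDict


class Lineage(TypedDict):
    """Illustrative shape of the _lineage object (hypothetical, not Expanso API)."""
    source_system: str  # source system identifier, e.g. "DB2_PROD"
    source_table: str   # source table name, e.g. "TRANSACTIONS"
    pipeline: str       # pipeline name
    extracted_at: str   # extraction timestamp as an RFC 3339 string
    node_id: str        # ID of the node that processed the record


# One example value, taken from the expected output in this step.
example: Lineage = {
    "source_system": "DB2_PROD",
    "source_table": "TRANSACTIONS",
    "pipeline": "db2-to-bigquery-transactions",
    "extracted_at": "2024-01-16T02:00:00Z",
    "node_id": "edge-node-test",
}
```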

Why This Matters

Regulatory Compliance: Financial regulators require complete data lineage for audit purposes.

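Debugging: When data looks wrong in BigQuery, use the lineage fields together with the record's own key (such as TRANSACTION_ID) to trace it back to its source table, pipeline, and processing node.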

Multi-Node Deployments: Know which edge node processed each record.

DataStage Equivalent

In DataStage, you'd typically:

  1. Create custom annotations in job properties
  2. Use a Transformer to add metadata columns
  3. Maintain a separate lineage table

Expanso simplifies this to a single mapping processor.

Implementation

  1. Copy the foundation pipeline:

    cp db2-foundation.yaml step-1-lineage.yaml
  2. Add the lineage processor:

    # step-1-lineage.yaml
    pipeline:
      processors:
        - mapping: |
            root = this
            root._lineage = {
              "source_system": "DB2_PROD",
              "source_table": "TRANSACTIONS",
              "pipeline": "db2-to-bigquery-transactions",
              "extracted_at": now(),
              "node_id": env("NODE_ID").or("unknown")
            }
  3. Test the transformation:

    NODE_ID=edge-node-test expanso-edge run --config step-1-lineage.yaml
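To reason about what the mapping does to a single record without running the pipeline, here is a rough Python model of it. This is not Expanso code. add_lineage is a hypothetical helper, and the environment and clock are passed in explicitly so the result is repeatable. The model formats the timestamp in UTC to match the expected output below. The exact format produced by now() is up to Expanso.

```python
import copy
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def add_lineage(
    record: dict,
    environ: Mapping[str, str],
    now: Optional[datetime] = None,
) -> dict:
    """Python model of the step-1 mapping; not Expanso's mapping engine."""
    # root = this: start from a copy so the input record is left untouched
    out: dict[str, Any] = copy.deepcopy(record)
    # now(): use the supplied clock if given, else the current time; normalize to UTC
    ts = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    out["_lineage"] = {
        "source_system": "DB2_PROD",
        "source_table": "TRANSACTIONS",
        "pipeline": "db2-to-bigquery-transactions",
        "extracted_at": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # env("NODE_ID").or("unknown"): fall back only when NODE_ID is unset
        "node_id": environ.get("NODE_ID", "unknown"),
    }
    return out
```

In real use you would pass os.environ as the environment. Using environ.get("NODE_ID", "unknown") rather than a Python "or" expression keeps an explicitly empty NODE_ID as-is, so only a missing variable triggers the fallback.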

Understanding the Code

Expression                What It Does
root = this               Start with the original record
root._lineage = {...}     Add a new nested object
now()                     Current timestamp at the moment the record is processed
env("NODE_ID")            Read an environment variable
.or("unknown")            Fallback value if the environment variable is not set

Expected Output

Input:

{
  "TRANSACTION_ID": "TXN-2024-00123456",
  "ACCOUNT_NUMBER": "4532-1234-5678-9012",
  ...
}

Output:

{
  "TRANSACTION_ID": "TXN-2024-00123456",
  "ACCOUNT_NUMBER": "4532-1234-5678-9012",
  ...
  "_lineage": {
    "source_system": "DB2_PROD",
    "source_table": "TRANSACTIONS",
    "pipeline": "db2-to-bigquery-transactions",
    "extracted_at": "2024-01-16T02:00:00Z",
    "node_id": "edge-node-test"
  }
}
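When checking the test run, a short script can confirm that every lineage field is present and that the original fields passed through untouched. This is a minimal Python sketch that assumes you have captured matching input and output records as dicts. lineage_problems and REQUIRED_LINEAGE_KEYS are hypothetical names.

```python
from typing import Any

# Keys the step-1 mapping is expected to write under _lineage.
REQUIRED_LINEAGE_KEYS = frozenset(
    {"source_system", "source_table", "pipeline", "extracted_at", "node_id"}
)


def lineage_problems(before: dict, after: dict) -> list:
    """List problems found in one output record; an empty list means it passed."""
    lineage: Any = after.get("_lineage")
    if not isinstance(lineage, dict):
        return ["missing _lineage object"]
    # Report missing lineage fields in a stable (sorted) order.
    problems = [f"missing _lineage.{key}"
                for key in sorted(REQUIRED_LINEAGE_KEYS.difference(lineage))]
    # root = this should carry every original field through unchanged.
    for key, value in before.items():
        if key not in after or after[key] != value:
            problems.append(f"field {key} changed or dropped")
    return problems
```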

Production Considerations

Dynamic Source Tracking

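For pipelines reading from multiple tables, take the source system and table from message metadata instead of hardcoding them. This only works if the input attaches these metadata keys; otherwise the fallback values apply: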

- mapping: |
    root = this
    root._lineage = {
      "source_system": meta("source_system").or("DB2_PROD"),
      "source_table": meta("table_name").or("UNKNOWN"),
      "pipeline": "db2-to-bigquery-transactions",
      "extracted_at": now(),
      "node_id": env("NODE_ID").or("unknown")
    }
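The fallback logic for the two metadata-driven fields is easy to model in Python. This sketch assumes the input attaches source_system and table_name as message metadata, as the mapping above expects. source_fields is a hypothetical helper, and a plain dict stands in for the message metadata.

```python
from typing import Mapping


def source_fields(metadata: Mapping[str, str]) -> dict:
    """Model of the metadata-driven lineage fields (not Expanso code)."""
    return {
        # meta("source_system").or("DB2_PROD")
        "source_system": metadata.get("source_system", "DB2_PROD"),
        # meta("table_name").or("UNKNOWN")
        "source_table": metadata.get("table_name", "UNKNOWN"),
    }


# Example: messages from two tables, plus one whose input set no metadata.
messages = [{"table_name": "TRANSACTIONS"}, {"table_name": "ACCOUNTS"}, {}]
for meta in messages:
    print(source_fields(meta))
```

Falling back to a visible placeholder such as "UNKNOWN" keeps records flowing instead of failing them. A count of UNKNOWN values in BigQuery then points at inputs that are not setting the metadata.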

Lineage Versioning

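Track the pipeline version and a config fingerprint to help debug migrations. Add these lines to the mapping after root._lineage is assigned (the values shown are examples):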

root._lineage.pipeline_version = "2.1.0"
root._lineage.config_hash = "a1b2c3d4"
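The config_hash value above is a placeholder. One option, assumed here rather than provided by Expanso, is to fingerprint the pipeline file at deploy time and template the result into the mapping. The minimal Python sketch below truncates a SHA-256 digest to 8 hex characters to match the placeholder's length. config_hash and config_hash_for_file are hypothetical helpers.

```python
import hashlib
from pathlib import Path
from typing import Union


def config_hash(config_bytes: bytes, length: int = 8) -> str:
    """Short fingerprint: the first `length` hex characters of the SHA-256 digest."""
    return hashlib.sha256(config_bytes).hexdigest()[:length]


def config_hash_for_file(path: Union[str, Path], length: int = 8) -> str:
    """Fingerprint a pipeline config file by its exact bytes."""
    return config_hash(Path(path).read_bytes(), length)
```

Hash the config before the fingerprint is written into it, for example by hashing a template that lacks the config_hash line. Otherwise writing the hash changes the file and the fingerprint no longer matches. Eight hex characters are enough to tell versions apart at a glance, but they are not a collision-proof identifier.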

Next Step