Step 1: Add Lineage Metadata
Every record needs an audit trail showing where it came from, when it was processed, and which node handled it. This replaces manual DataStage annotations with automatic lineage injection.
The Goal
Add a `_lineage` object to every record containing:
- Source system identifier
- Source table name
- Pipeline name
- Extraction timestamp
- Processing node ID
Why This Matters
Regulatory Compliance: Financial regulators require complete data lineage for audit purposes.
Debugging: When data looks wrong in BigQuery, trace it back to the exact source record.
Multi-Node Deployments: Know which edge node processed each record.
DataStage Equivalent
In DataStage, you'd typically:
- Create custom annotations in job properties
- Use a Transformer to add metadata columns
- Maintain a separate lineage table
Expanso simplifies this to a single mapping processor.
Implementation
- Copy the foundation pipeline:

  ```bash
  cp db2-foundation.yaml step-1-lineage.yaml
  ```

- Add the lineage processor to `step-1-lineage.yaml`:

  ```yaml
  pipeline:
    processors:
      - mapping: |
          root = this
          root._lineage = {
            "source_system": "DB2_PROD",
            "source_table": "TRANSACTIONS",
            "pipeline": "db2-to-bigquery-transactions",
            "extracted_at": now(),
            "node_id": env("NODE_ID").or("unknown")
          }
  ```

- Test the transformation:

  ```bash
  NODE_ID=edge-node-test expanso-edge run --config step-1-lineage.yaml
  ```
Understanding the Code
| Expression | What It Does |
|---|---|
| `root = this` | Start with the original record |
| `root._lineage = {...}` | Add a new nested object |
| `now()` | Current UTC timestamp |
| `env("NODE_ID")` | Read an environment variable |
| `.or("unknown")` | Fallback when the variable is not set |
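The mapping's logic can be sketched in plain Python for local reasoning. This is not how Expanso executes Bloblang; it is just an equivalent of the transformation, and the `add_lineage` function name is illustrative:

```python
import os
from datetime import datetime, timezone

def add_lineage(record: dict) -> dict:
    """Return a copy of the record with a _lineage object attached,
    mirroring: root = this; root._lineage = {...}."""
    out = dict(record)  # root = this (start from the original record)
    out["_lineage"] = {
        "source_system": "DB2_PROD",
        "source_table": "TRANSACTIONS",
        "pipeline": "db2-to-bigquery-transactions",
        # now() -> current UTC timestamp
        "extracted_at": datetime.now(timezone.utc).isoformat(),
        # env("NODE_ID").or("unknown") -> fallback when the variable is unset
        "node_id": os.environ.get("NODE_ID", "unknown"),
    }
    return out

enriched = add_lineage({"TRANSACTION_ID": "TXN-2024-00123456"})
print(enriched["_lineage"]["source_table"])  # TRANSACTIONS
```

Note that the original record fields pass through untouched; only the `_lineage` object is added.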
Expected Output
Input:

```json
{
  "TRANSACTION_ID": "TXN-2024-00123456",
  "ACCOUNT_NUMBER": "4532-1234-5678-9012",
  ...
}
```

Output:

```json
{
  "TRANSACTION_ID": "TXN-2024-00123456",
  "ACCOUNT_NUMBER": "4532-1234-5678-9012",
  ...
  "_lineage": {
    "source_system": "DB2_PROD",
    "source_table": "TRANSACTIONS",
    "pipeline": "db2-to-bigquery-transactions",
    "extracted_at": "2024-01-16T02:00:00Z",
    "node_id": "edge-node-test"
  }
}
```
Production Considerations
Dynamic Source Tracking
For pipelines reading from multiple tables:
```yaml
- mapping: |
    root = this
    root._lineage = {
      "source_system": meta("source_system").or("DB2_PROD"),
      "source_table": meta("table_name").or("UNKNOWN"),
      "pipeline": "db2-to-bigquery-transactions",
      "extracted_at": now(),
      "node_id": env("NODE_ID").or("unknown")
    }
```
Lineage Versioning
Track pipeline version for debugging migrations:
```
root._lineage.pipeline_version = "2.1.0"
root._lineage.config_hash = "a1b2c3d4"
```
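One way to populate `config_hash` is a truncated SHA-256 of the pipeline file, computed at deploy time and injected into the config. This scheme is an assumption for illustration, not something Expanso computes for you:

```python
import hashlib

def config_hash(config_text: str, length: int = 8) -> str:
    """Truncated SHA-256 of the pipeline config text.
    An assumed hashing scheme, not an Expanso feature."""
    return hashlib.sha256(config_text.encode("utf-8")).hexdigest()[:length]

# Hash the deployed pipeline file so every record can be traced
# back to the exact configuration that produced it.
print(config_hash("pipeline:\n  processors: []\n"))
```

Regenerating the hash whenever the pipeline changes lets you correlate odd records in BigQuery with the exact config revision that emitted them.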