Step 1: Tag Data Origin
Every record needs origin metadata for compliance audits and data lineage. This creates a chain of custody showing where data came from and when.
The Goal
Add a _data_origin object containing:
- Source region (EU)
- Source country (DE, FR, etc.)
- Source database identifier
- Extraction timestamp
- Pipeline identifier
Why This Matters
GDPR Article 30: Controllers must maintain records of processing activities. Per-record origin tags back those records with a concrete, verifiable source for each data set.
Audit Trail: When regulators ask "where did this data come from?", you have a documented answer.
Incident Response: If a breach occurs, origin tags help scope the impact.
Implementation
step-1-origin.yaml
pipeline:
  processors:
    - mapping: |
        root = this
        root._data_origin = {
          "region": "EU",
          "country": env("SOURCE_COUNTRY").or("DE"),
          "database": "transactions_eu",
          "extracted_at": now(),
          "pipeline": "eu-cross-border-compliance"
        }
Understanding the Code
| Expression | What It Does |
|---|---|
| root._data_origin = {...} | Adds a nested metadata object |
| env("SOURCE_COUNTRY") | Reads the value from an environment variable |
| .or("DE") | Defaults to "DE" if the variable is not set |
| now() | Timestamp of processing as an RFC 3339 string (UTC when the host clock runs in UTC) |
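If edge hosts are not guaranteed to run with a UTC clock, the timestamp can be normalized explicitly. The fragment below is a minimal sketch that assumes a Benthos-compatible Bloblang runtime, where `now()` is documented as returning an RFC 3339 string in the host's local time zone and `ts_format` accepts an optional time zone argument. Check both against the engine version you run.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Assumption: ts_format(format, tz) is available in this runtime.
        # The format string is Go's reference layout for RFC 3339.
        root._data_origin.extracted_at = now().ts_format("2006-01-02T15:04:05Z07:00", "UTC")
```

In the Step 1 mapping, this expression would take the place of the bare `now()` call.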
Expected Output
Input:
{
  "transaction_id": "TXN-EU-2024-00001",
  "customer_id": "CUST-DE-12345",
  ...
}
Output:
{
  "transaction_id": "TXN-EU-2024-00001",
  "customer_id": "CUST-DE-12345",
  ...,
  "_data_origin": {
    "region": "EU",
    "country": "DE",
    "database": "transactions_eu",
    "extracted_at": "2024-01-15T02:00:00Z",
    "pipeline": "eu-cross-border-compliance"
  }
}
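The expected output can also be checked automatically. The sketch below assumes the pipeline runs on a Benthos-compatible engine that supports config unit tests (a `tests` section with `target_processors`, `environment`, `input_batch` and `output_batches`); the test name is illustrative. Because `extracted_at` changes on every run, the condition only checks its type.

```yaml
# Appended to step-1-origin.yaml (assumes Benthos-style config unit tests)
tests:
  - name: tags records with the configured source country
    target_processors: '/pipeline/processors'
    environment:
      SOURCE_COUNTRY: FR
    input_batch:
      - content: '{"transaction_id":"TXN-EU-2024-00001","customer_id":"CUST-DE-12345"}'
    output_batches:
      - - bloblang: 'this.transaction_id == "TXN-EU-2024-00001" && this._data_origin.region == "EU" && this._data_origin.country == "FR" && this._data_origin.extracted_at.type() == "string"'
```

On such an engine the test would typically be run through its `test` subcommand, for example `benthos test step-1-origin.yaml`; the exact command name depends on the distribution.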
Production Considerations
Dynamic Country Detection
For multi-country EU deployments, derive the country from the first two characters of the IBAN:
# Records without a usable iban resolve to an empty prefix and take the default arm
root._data_origin.country = match this.iban.slice(0, 2).or("") {
  "DE" => "DE",
  "FR" => "FR",
  "NL" => "NL",
  "ES" => "ES",
  _ => env("SOURCE_COUNTRY").or("EU")
}
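As the list of countries grows, one match arm per country becomes repetitive. A possible alternative, sketched here under the assumption that the runtime provides the Bloblang `contains` and `uppercase` methods, keeps the supported countries in a single allowlist. The variable names `allowed` and `prefix` are illustrative.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Allowlist of supported source countries (same four as the match above)
        let allowed = ["DE", "FR", "NL", "ES"]
        # Missing or malformed IBANs resolve to an empty prefix
        let prefix = this.iban.slice(0, 2).uppercase().or("")
        root._data_origin.country = if $allowed.contains($prefix) {
          $prefix
        } else {
          env("SOURCE_COUNTRY").or("EU")
        }
```

Adding a country then means editing one array rather than adding another arm.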
Node Identification
Track which edge node processed the data:
root._data_origin.node_id = env("NODE_ID").or(env("HOSTNAME")).or("unknown")
root._data_origin.node_region = env("NODE_REGION").or("eu-west-1")
Pipeline Versioning
Track config version for debugging:
root._data_origin.pipeline_version = "2.1.0"
root._data_origin.config_hash = "a1b2c3d4"
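Hard-coded version strings tend to drift from what is actually deployed. One option, shown as a sketch only, is to inject both values at deploy time using the same `env(...).or(...)` pattern as above. `PIPELINE_VERSION` and `CONFIG_HASH` are hypothetical variable names, not something the pipeline defines on its own.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # PIPELINE_VERSION and CONFIG_HASH are hypothetical variables set by the deploy tooling
        root._data_origin.pipeline_version = env("PIPELINE_VERSION").or("unknown")
        root._data_origin.config_hash = env("CONFIG_HASH").or("unknown")
```

The deploy step could, for example, export `CONFIG_HASH` from a checksum of `step-1-origin.yaml` so that the tag always reflects the config that produced the record.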