Step 1: Tag Data Origin

Every record needs origin metadata for compliance audits and data lineage. This creates a chain of custody showing where data came from and when.

The Goal

Add a _data_origin object containing:

  • Source region (EU)
  • Source country (DE, FR, etc.)
  • Source database identifier
  • Extraction timestamp
  • Pipeline identifier

Why This Matters

GDPR Article 30: Controllers must maintain records of processing activities. Tagging each record with its origin gives those records a concrete, per-record source to reference.

Audit Trail: When regulators ask "where did this data come from?", you have a documented answer.

Incident Response: If a breach occurs, origin tags help scope the impact.

Implementation

step-1-origin.yaml
pipeline:
  processors:
    - mapping: |
        root = this
        root._data_origin = {
          "region": "EU",
          "country": env("SOURCE_COUNTRY").or("DE"),
          "database": "transactions_eu",
          "extracted_at": now(),
          "pipeline": "eu-cross-border-compliance"
        }

Understanding the Code

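| Expression | What It Does |
|---|---|
| root._data_origin = {...} | Adds a nested metadata object to the record |
| env("SOURCE_COUNTRY") | Reads the SOURCE_COUNTRY environment variable |
| .or("DE") | Falls back to "DE" if the variable is not set |
| now() | Current timestamp at processing time (RFC 3339 string) |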
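
The expected output below shows extracted_at with a trailing Z, meaning UTC. Assuming the mapping language here is Bloblang (the syntax suggests it, but this page does not say so), now() returns an RFC 3339 string in the node's local timezone, so a node that is not running in UTC would emit an offset such as +01:00 instead. A minimal sketch that pins the timestamp to UTC, assuming the engine provides the ts_format method:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Assumption: ts_format(format, timezone) is available in this engine.
        # The layout uses Go's reference time; "Z07:00" renders as "Z" for UTC.
        root._data_origin.extracted_at = now().ts_format("2006-01-02T15:04:05Z07:00", "UTC")
```

If every node already runs with its clock in UTC, the plain now() call should produce the same shape and this extra step is optional.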

Expected Output

Input:

{
  "transaction_id": "TXN-EU-2024-00001",
  "customer_id": "CUST-DE-12345",
  ...
}

Output:

{
  "transaction_id": "TXN-EU-2024-00001",
  "customer_id": "CUST-DE-12345",
  ...,
  "_data_origin": {
    "region": "EU",
    "country": "DE",
    "database": "transactions_eu",
    "extracted_at": "2024-01-15T02:00:00Z",
    "pipeline": "eu-cross-border-compliance"
  }
}
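
To catch records that leave this step without a usable tag, a guard can follow the tagging mapping. The sketch below is a hypothetical addition, not part of the original config. It assumes the engine supports the exists method and the throw function, and it checks only two of the five fields for brevity:

```yaml
pipeline:
  processors:
    # Hypothetical guard, placed after the tagging mapping from step-1-origin.yaml.
    - mapping: |
        # Raise a mapping error if the origin tag or a key field is absent;
        # otherwise pass the record through unchanged.
        root = if !this.exists("_data_origin.country") || !this.exists("_data_origin.extracted_at") {
          throw("record is missing required _data_origin fields")
        } else {
          this
        }
```

How a record that triggers the error is routed afterwards (dropped, logged, or sent to a dead-letter output) depends on the error handling configured elsewhere in the pipeline.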

Production Considerations

Dynamic Country Detection

For multi-country EU deployments:

root._data_origin.country = match this.iban.slice(0, 2) {
  "DE" => "DE",
  "FR" => "FR",
  "NL" => "NL",
  "ES" => "ES",
  _ => env("SOURCE_COUNTRY").or("EU")
}
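
The mapping above assumes every record carries a string iban field with an uppercase country prefix. A more defensive variant is sketched below, assuming the engine supports the uppercase and catch methods. The variable name prefix is illustrative only:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Normalize the IBAN prefix; fall back to "" when it cannot be derived
        # (for example, iban is missing or is not a string).
        let prefix = this.iban.slice(0, 2).uppercase().catch("")
        root._data_origin.country = match $prefix {
          "DE" => "DE",
          "FR" => "FR",
          "NL" => "NL",
          "ES" => "ES",
          _ => env("SOURCE_COUNTRY").or("EU")
        }
```

With this variant a record without an IBAN takes the default branch instead of failing the whole mapping.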

Node Identification

Track which edge node processed the data:

root._data_origin.node_id = env("NODE_ID").or(env("HOSTNAME")).or("unknown")
root._data_origin.node_region = env("NODE_REGION").or("eu-west-1")

Pipeline Versioning

Track config version for debugging:

root._data_origin.pipeline_version = "2.1.0"
root._data_origin.config_hash = "a1b2c3d4"
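
Hard-coded version strings tend to drift from what is actually deployed. One option, sketched here under the assumption that the deployment tooling can export environment variables, is to read both values the same way SOURCE_COUNTRY is read. PIPELINE_VERSION and CONFIG_HASH are hypothetical variable names:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # PIPELINE_VERSION and CONFIG_HASH are hypothetical names, expected to be
        # set by the deployment tooling; "unknown" marks records tagged without them.
        root._data_origin.pipeline_version = env("PIPELINE_VERSION").or("unknown")
        root._data_origin.config_hash = env("CONFIG_HASH").or("unknown")
```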

Next Step