
Complete Pipeline

This pipeline combines all 6 GDPR compliance steps for cross-border data transfer:

  1. Tag data origin - Source region and extraction metadata
  2. Create GDPR record - Legal basis and PII field documentation
  3. Delete high-risk fields - Names, addresses, DOB → age bucket
  4. Hash identifiers - Customer ID, email, IBAN, IP
  5. Generalize values - Amount buckets, hour-level timestamps
  6. Validate anonymization - Compliance gate before transfer

Result: GDPR-compliant anonymized data for global analytics with full audit trail.

Data Flow

EU Database ──→ [Anonymization Pipeline] ──→ Global BigQuery (anonymized)
                          │
                          ├──→ EU Archive (full data, stays in EU)
                          │
                          └──→ Audit Log (compliance records)

Full Configuration

cross-border-gdpr.yaml
# Cross-Border GDPR Compliance Pipeline
# Anonymize data before it leaves the EU for global analytics
#
# Use case: EU financial transaction data needs to be aggregated globally,
# but GDPR Article 44+ restricts transfers of personal data outside EU/EEA.
# Solution: Fully anonymize at the edge before cross-border transfer.
#
# Different from remove-pii example:
# - Focuses on data RESIDENCY and cross-border transfers (not general PII removal)
# - SQL database input (not HTTP streams)
# - Financial transactions (not user activity)
# - Dual output: anonymized to global, full data stays in-region
# - GDPR Article 44 compliance audit trail
#
# Key features:
# - Tiered anonymization (delete/hash/generalize)
# - Dual destination: anonymized → global, raw → regional archive
# - Compliance attestation metadata
# - k-anonymity validation before transfer

name: eu-cross-border-compliance
description: GDPR-compliant data anonymization for cross-border analytics

input:
  # Read from EU regional database
  sql_select:
    driver: postgres
    dsn: "postgres://${DB_USER}:${DB_PASSWORD}@${EU_DB_HOST}:5432/transactions_eu"
    table: customer_transactions
    columns:
      - transaction_id
      - customer_id
      - customer_name
      - customer_email
      - customer_dob
      - customer_address
      - iban
      - transaction_amount
      - transaction_currency
      - merchant_name
      - merchant_country
      - transaction_timestamp
      - ip_address
    where: "transaction_timestamp >= NOW() - INTERVAL '1 hour'"

pipeline:
  processors:
    # Step 1: Tag with source region (critical for compliance routing)
    - mapping: |
        root = this
        root._data_origin = {
          "region": "EU",
          "country": env("SOURCE_COUNTRY").or("DE"),
          "database": "transactions_eu",
          "extracted_at": now(),
          "pipeline": "eu-cross-border-compliance"
        }

    # Step 2: Create GDPR compliance record BEFORE any transformation
    - mapping: |
        root = this
        root._gdpr_compliance = {
          "legal_basis": "legitimate_interest_analytics",
          "original_pii_fields": [
            "customer_id",
            "customer_name",
            "customer_email",
            "customer_dob",
            "customer_address",
            "iban",
            "ip_address"
          ],
          "anonymization_applied": true,
          "transfer_type": "cross_border_eu_to_global",
          "gdpr_article": "Article 44 - General principle for transfers"
        }

    # Step 3: DELETE - Remove fields with no analytics value
    # These are deleted entirely, not recoverable
    - mapping: |
        root = this

        # Full name - no analytics value, high risk
        root = root.without("customer_name")

        # Full address - no analytics value
        root = root.without("customer_address")

        # Date of birth - delete, keep only an age bucket
        root.customer_age_bucket = match {
          this.customer_dob == null => "unknown",
          now().ts_unix() - this.customer_dob.ts_parse("2006-01-02").ts_unix() < 25 * 365 * 24 * 3600 => "18-24",
          now().ts_unix() - this.customer_dob.ts_parse("2006-01-02").ts_unix() < 35 * 365 * 24 * 3600 => "25-34",
          now().ts_unix() - this.customer_dob.ts_parse("2006-01-02").ts_unix() < 45 * 365 * 24 * 3600 => "35-44",
          now().ts_unix() - this.customer_dob.ts_parse("2006-01-02").ts_unix() < 55 * 365 * 24 * 3600 => "45-54",
          now().ts_unix() - this.customer_dob.ts_parse("2006-01-02").ts_unix() < 65 * 365 * 24 * 3600 => "55-64",
          _ => "65+"
        }
        root = root.without("customer_dob")

    # Step 4: HASH - Pseudonymize identifiers for aggregate analytics
    # Salted one-way hashes: without the salt, the originals cannot be recovered
    - mapping: |
        root = this

        # Customer ID → anonymized cohort ID
        # The salt ensures the hash can't be reversed with rainbow tables
        let salt = env("ANONYMIZATION_SALT").or("gdpr-compliance-2024")
        root.anonymized_customer_id = (this.customer_id.string() + $salt).hash("sha256").encode("hex").slice(0, 12)
        root = root.without("customer_id")

        # Email → domain only (for B2B vs B2C analysis)
        root.email_domain = if this.customer_email.contains("@") {
          this.customer_email.split("@").index(1).lowercase()
        } else {
          "unknown"
        }
        root = root.without("customer_email")

        # IBAN → country code only (for geographic analysis)
        root.bank_country = this.iban.slice(0, 2)
        root = root.without("iban")

        # IP address → /16 subnet (country-level geolocation remains possible)
        root.ip_subnet = if this.ip_address.contains(".") {
          this.ip_address.split(".").slice(0, 2).join(".") + ".0.0/16"
        } else {
          "unknown"
        }
        root = root.without("ip_address")

    # Step 5: GENERALIZE - Reduce precision on remaining fields
    - mapping: |
        root = this

        # Transaction amount → bucket (preserves distribution analysis)
        root.amount_bucket = match {
          this.transaction_amount < 10 => "0-10",
          this.transaction_amount < 50 => "10-50",
          this.transaction_amount < 100 => "50-100",
          this.transaction_amount < 500 => "100-500",
          this.transaction_amount < 1000 => "500-1000",
          this.transaction_amount < 5000 => "1000-5000",
          _ => "5000+"
        }

        # Keep the exact amount for aggregate SUM calculations
        # (an amount alone, without an identifier, is not personal data)
        root.transaction_amount = this.transaction_amount

        # Timestamp → hour bucket (sufficient for pattern analysis)
        root.transaction_hour = this.transaction_timestamp.ts_parse("2006-01-02T15:04:05Z").ts_format("2006-01-02T15:00:00Z")

    # Step 6: Validate anonymization completeness
    - mapping: |
        # Check that no PII fields remain
        let doc = this
        let pii_fields = ["customer_id", "customer_name", "customer_email",
                          "customer_dob", "customer_address", "iban", "ip_address"]

        let remaining_pii = $pii_fields.filter(f -> $doc.get(f) != null)

        root = if $remaining_pii.length() > 0 {
          throw("GDPR VIOLATION: PII fields still present: " + $remaining_pii.join(", "))
        } else {
          this
        }

        # Add compliance attestation
        root._gdpr_compliance.anonymization_verified = true
        root._gdpr_compliance.verification_timestamp = now()
        root._gdpr_compliance.fields_removed = ["customer_name", "customer_address", "customer_dob"]
        root._gdpr_compliance.fields_hashed = ["customer_id", "customer_email", "iban", "ip_address"]

output:
  broker:
    pattern: fan_out
    outputs:
      # Output 1: Anonymized data → Global analytics (BigQuery US)
      # This data is no longer "personal data" under GDPR
      - label: global_analytics
        gcp_bigquery:
          project: "${GCP_GLOBAL_PROJECT}"
          dataset: global_analytics
          table: anonymized_transactions
          format: NEWLINE_DELIMITED_JSON
          write_disposition: WRITE_APPEND
          batching:
            count: 500
            period: 30s

      # Output 2: Raw data → EU regional archive (stays in EU)
      # Full data preserved for regulatory requirements
      - label: eu_archive
        processors:
          # Attach archive retention metadata
          - mapping: |
              root = this
              root._archive_metadata = {
                "archived_at": now(),
                "retention_policy": "7_years",
                "data_classification": "personal_data_eu",
                "gdpr_lawful_basis": "legal_obligation"
              }
        gcp_cloud_storage:
          bucket: "${EU_ARCHIVE_BUCKET}" # Must be an EU-region bucket
          path: "transactions/${!now().ts_format(\"2006/01/02\")}/eu-${!timestamp_unix()}.json"
          content_type: application/json
          storage_class: STANDARD
          batching:
            count: 1000
            period: 60s

      # Output 3: Compliance audit log
      - label: compliance_audit
        processors:
          - mapping: |
              root = {
                "event_type": "cross_border_transfer",
                "timestamp": now(),
                "transaction_id": this.transaction_id,
                "source_region": this._data_origin.region,
                "destination": "global_analytics",
                "anonymization_verified": this._gdpr_compliance.anonymization_verified,
                "legal_basis": this._gdpr_compliance.legal_basis,
                "gdpr_article": this._gdpr_compliance.gdpr_article
              }
        file:
          path: "/var/log/expanso/gdpr-audit-${!now().ts_format(\"2006-01-02\")}.jsonl"
          codec: lines
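
The bucketing logic in steps 3 and 5 translates directly to other languages. The Python sketch below mirrors the Bloblang `match` arms; note it computes exact calendar age rather than the mapping's seconds-based approximation, so dates near a bucket boundary can differ slightly:

```python
from datetime import date, datetime

def age_bucket(dob_str, today):
    """Mirror the DOB match arms: map a YYYY-MM-DD birth date to an age range."""
    if not dob_str:
        return "unknown"
    dob = date.fromisoformat(dob_str)
    age = today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
    for limit, bucket in [(25, "18-24"), (35, "25-34"), (45, "35-44"),
                          (55, "45-54"), (65, "55-64")]:
        if age < limit:
            return bucket
    return "65+"

def amount_bucket(amount):
    """Mirror the transaction_amount match arms."""
    for limit, bucket in [(10, "0-10"), (50, "10-50"), (100, "50-100"),
                          (500, "100-500"), (1000, "500-1000"), (5000, "1000-5000")]:
        if amount < limit:
            return bucket
    return "5000+"

def hour_bucket(ts):
    """Truncate an ISO-8601 UTC timestamp to hour precision."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    return dt.strftime("%Y-%m-%dT%H:00:00Z")
```

The hour truncation works the same way as the mapping's `ts_format("2006-01-02T15:00:00Z")`: the literal `00` minutes and seconds in the layout string are emitted verbatim.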

Quick Test

# Set environment variables
export EU_DB_HOST=postgres.eu-west-1.internal
export DB_USER=analytics_reader
export DB_PASSWORD=<secret>
export SOURCE_COUNTRY=DE
export ANONYMIZATION_SALT=$(openssl rand -hex 32)
export GCP_GLOBAL_PROJECT=global-analytics
export EU_ARCHIVE_BUCKET=eu-west-1-archive

# Test with sample data
echo '{
"transaction_id": "TXN-EU-2024-00001",
"customer_id": "CUST-DE-12345",
"customer_name": "Hans Schmidt",
"customer_email": "[email protected]",
"customer_dob": "1985-03-15",
"customer_address": "Hauptstraße 42, 10115 Berlin, Germany",
"iban": "DE89370400440532013000",
"transaction_amount": 249.99,
"transaction_currency": "EUR",
"merchant_name": "Tech Store GmbH",
"merchant_country": "DE",
"transaction_timestamp": "2024-01-15T14:32:17Z",
"ip_address": "91.64.42.17"
}' | expanso-edge run --config cross-border-gdpr.yaml

Expected Output (anonymized):

{
  "transaction_id": "TXN-EU-2024-00001",
  "anonymized_customer_id": "a3f9c2e81b4d",
  "email_domain": "example.de",
  "bank_country": "DE",
  "ip_subnet": "91.64.0.0/16",
  "customer_age_bucket": "35-44",
  "transaction_amount": 249.99,
  "amount_bucket": "100-500",
  "transaction_currency": "EUR",
  "merchant_name": "Tech Store GmbH",
  "merchant_country": "DE",
  "transaction_hour": "2024-01-15T14:00:00Z",
  "_data_origin": {
    "region": "EU",
    "country": "DE",
    "database": "transactions_eu",
    "extracted_at": "2024-01-15T15:00:00Z",
    "pipeline": "eu-cross-border-compliance"
  },
  "_gdpr_compliance": {
    "legal_basis": "legitimate_interest_analytics",
    "anonymization_applied": true,
    "anonymization_verified": true,
    "verification_timestamp": "2024-01-15T15:00:00Z",
    "fields_removed": ["customer_name", "customer_address", "customer_dob"],
    "fields_hashed": ["customer_id", "customer_email", "iban", "ip_address"]
  }
}
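
Output like the above can be spot-checked offline against the same gate as step 6. A minimal Python equivalent (same PII field list as the pipeline):

```python
PII_FIELDS = ["customer_id", "customer_name", "customer_email",
              "customer_dob", "customer_address", "iban", "ip_address"]

def assert_anonymized(record):
    """Raise if any original PII field survived, mirroring the pipeline's gate."""
    remaining = [f for f in PII_FIELDS if f in record]
    if remaining:
        raise ValueError("GDPR VIOLATION: PII fields still present: " + ", ".join(remaining))
    return record
```

Running it against the anonymized output passes; running it against the raw sample input raises immediately.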

Deploy to Production

EU-Only Deployment

Deploy only to EU-located edge nodes:

# Deploy with region selector
expanso-cli job deploy cross-border-gdpr.yaml \
  --selector region=eu \
  --selector compliance=gdpr

# Verify deployment location
expanso-cli job describe eu-cross-border-compliance

Schedule Hourly

Run every hour for near-real-time analytics:

cross-border-gdpr-scheduled.yaml
name: eu-cross-border-compliance
schedule: "0 * * * *" # Every hour

input:
  sql_select:
    where: "transaction_timestamp >= NOW() - INTERVAL '1 hour'"
    # ... rest of config

BigQuery Table Setup

Create the anonymized transactions table:

CREATE TABLE IF NOT EXISTS `global-analytics.global_analytics.anonymized_transactions`
(
  transaction_id STRING,
  anonymized_customer_id STRING,
  email_domain STRING,
  bank_country STRING,
  ip_subnet STRING,
  customer_age_bucket STRING,
  transaction_amount FLOAT64,
  amount_bucket STRING,
  transaction_currency STRING,
  merchant_name STRING,
  merchant_country STRING,
  transaction_hour TIMESTAMP,
  _data_origin STRUCT<
    region STRING,
    country STRING,
    database STRING,
    extracted_at TIMESTAMP,
    pipeline STRING
  >,
  _gdpr_compliance STRUCT<
    legal_basis STRING,
    anonymization_applied BOOL,
    anonymization_verified BOOL,
    verification_timestamp TIMESTAMP,
    fields_removed ARRAY<STRING>,
    fields_hashed ARRAY<STRING>
  >
)
PARTITION BY DATE(transaction_hour)
CLUSTER BY merchant_country, customer_age_bucket;

Compliance Verification

Verify No PII in Global Dataset

-- This query should return 0 rows
SELECT *
FROM `global_analytics.anonymized_transactions`
WHERE anonymized_customer_id IS NULL
   OR _gdpr_compliance.anonymization_verified != TRUE
LIMIT 10;
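
The key-features list also promises k-anonymity validation, but the pipeline's step 6 only gates on field removal; k-anonymity is a batch-level property and has to be checked over a set of records. A hedged sketch of such a check (the quasi-identifier columns here are chosen for illustration, not mandated by the pipeline):

```python
from collections import Counter

def min_group_size(records, quasi_identifiers):
    """k-anonymity metric: size of the smallest equivalence class
    formed by grouping records on the given quasi-identifier columns."""
    groups = Counter(tuple(r.get(q) for q in quasi_identifiers) for r in records)
    return min(groups.values()) if groups else 0

rows = [
    {"customer_age_bucket": "35-44", "merchant_country": "DE"},
    {"customer_age_bucket": "35-44", "merchant_country": "DE"},
    {"customer_age_bucket": "25-34", "merchant_country": "FR"},
]
# The ("25-34", "FR") class has a single member, so this batch is only 1-anonymous.
```

A batch is k-anonymous when `min_group_size` returns at least k; a pre-transfer gate would reject batches below the chosen threshold.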

Audit Trail Query

-- Daily compliance report
SELECT
  DATE(transaction_hour) AS date,
  _data_origin.country AS source_country,
  COUNT(*) AS records_transferred,
  COUNTIF(_gdpr_compliance.anonymization_verified) AS verified_count
FROM `global_analytics.anonymized_transactions`
WHERE transaction_hour >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

Download

Download cross-border-gdpr.yaml

What's Next?