Complete Pipeline
This pipeline combines all 6 transformation steps for migrating DB2 transactions to BigQuery:
1. Add lineage metadata - Full audit trail for compliance
2. Normalize currency - Convert all amounts to USD
3. Mask account numbers - PCI-DSS-compliant masking with a hash for joins
4. Categorize transactions - Map MCC codes to human-readable categories
5. Standardize schema - Lowercase field names + partition field
6. Validate required fields - Data quality gates before load
Result: Production-ready, compliant data in BigQuery with complete lineage.
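The six steps can be sketched as a plain Python reference model (illustrative only: function names, rates, and prefixes mirror the pipeline config below, but this is not how the pipeline executes):

```python
import hashlib
from datetime import datetime, timezone

# Static rates and MCC prefixes mirroring the pipeline config (illustrative values).
RATES = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067, "CHF": 1.13, "CAD": 0.74}
CATEGORIES = [
    ("54", "GROCERY"), ("55", "AUTOMOTIVE"), ("58", "RESTAURANT"), ("59", "RETAIL"),
    ("47", "TRANSPORTATION"), ("40", "TRAVEL"), ("41", "TRAVEL"),
    ("60", "FINANCIAL"), ("61", "FINANCIAL"), ("80", "PROFESSIONAL_SERVICES"),
]

def transform(rec: dict) -> dict:
    """Apply steps 1-6 to one DB2 row (uppercase column names)."""
    # Step 5 (schema): lowercase field names; drop the raw account number.
    out = {k.lower(): v for k, v in rec.items() if k != "ACCOUNT_NUMBER"}
    # Step 1: lineage metadata for the audit trail.
    out["_lineage"] = {"source_system": "DB2_PROD", "source_table": "TRANSACTIONS",
                       "extracted_at": datetime.now(timezone.utc).isoformat()}
    # Step 2: normalize amount to USD, rounded to cents.
    out["original_amount"], out["original_currency"] = rec["AMOUNT"], rec["CURRENCY"]
    out["amount_usd"] = round(rec["AMOUNT"] * RATES.get(rec["CURRENCY"], 1.0), 2)
    # Step 3: mask the account number; keep a stable hash prefix for joins.
    out["account_number_masked"] = "****-****-" + rec["ACCOUNT_NUMBER"][-4:]
    out["account_number_hash"] = hashlib.sha256(rec["ACCOUNT_NUMBER"].encode()).hexdigest()[:16]
    # Step 4: categorize by MCC prefix, defaulting to OTHER.
    mcc = str(rec["MERCHANT_CATEGORY_CODE"])
    out["transaction_category"] = next((c for p, c in CATEGORIES if mcc.startswith(p)), "OTHER")
    # Step 5 (partitioning): DB2 DATE arrives as YYYY-MM-DD.
    out["_partition_date"] = str(rec["TRANSACTION_DATE"])[:10]
    # Step 6: fail fast if a required field is missing.
    for field in ("transaction_id", "customer_id", "amount_usd"):
        if out.get(field) is None:
            raise ValueError(f"missing required field {field}")
    return out
```

Running the sample record from the Quick Test section through this model reproduces the documented amount, mask, and category values.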
Full Configuration
db2-to-bigquery.yaml
# DB2 to BigQuery Migration Pipeline
# Replaces DataStage ETL with edge-native processing
#
# Use case: Nightly batch migration of financial transactions from
# on-premise DB2 to Google BigQuery with DataStage-style transformations
#
# Key features:
# - SQL query against DB2 via ODBC
# - Currency normalization and enrichment
# - Account number masking for compliance
# - Transaction categorization
# - Lineage metadata for audit trail
# - Lands in BigQuery partitioned by date
name: db2-to-bigquery-transactions
description: Migrate financial transactions from DB2 to BigQuery with transformations
input:
# Query DB2 for yesterday's transactions
# Runs on a schedule (see deployment config)
sql_select:
driver: odbc
dsn: "Driver={IBM DB2 ODBC Driver};Database=${DB2_DATABASE};Hostname=${DB2_HOST};Port=${DB2_PORT};Protocol=TCPIP;Uid=${DB2_USER};Pwd=${DB2_PASSWORD};"
table: TRANSACTIONS
columns:
- TRANSACTION_ID
- ACCOUNT_NUMBER
- CUSTOMER_ID
- TRANSACTION_DATE
- TRANSACTION_TYPE
- AMOUNT
- CURRENCY
- MERCHANT_NAME
- MERCHANT_CATEGORY_CODE
- SOURCE_SYSTEM
- CREATED_AT
where: "TRANSACTION_DATE >= CURRENT DATE - 1 DAY AND TRANSACTION_DATE < CURRENT DATE"
args_mapping: ""
pipeline:
processors:
# Step 1: Add lineage metadata (critical for audit)
- mapping: |
root = this
root._lineage = {
"source_system": "DB2_PROD",
"source_table": "TRANSACTIONS",
"pipeline": "db2-to-bigquery-transactions",
"extracted_at": now(),
"node_id": env("NODE_ID").or("unknown")
}
# Step 2: Normalize currency to USD
# DataStage replacement: Currency lookup and conversion
- mapping: |
# Currency conversion rates (in production, fetch from API)
let rates = {
"USD": 1.0,
"EUR": 1.08,
"GBP": 1.27,
"JPY": 0.0067,
"CHF": 1.13,
"CAD": 0.74
}
root = this
root.original_amount = this.AMOUNT
root.original_currency = this.CURRENCY
# Convert to USD (rounded to cents to avoid float artifacts)
root.amount_usd = if this.CURRENCY == "USD" {
this.AMOUNT
} else {
(this.AMOUNT * $rates.get(this.CURRENCY).or(1.0) * 100).round() / 100
}
# Step 3: Mask account numbers for compliance
# Keep last 4 digits for reconciliation, hash full number for joins
- mapping: |
root = this
root.account_number_masked = "****-****-" + this.ACCOUNT_NUMBER.slice(-4)
root.account_number_hash = this.ACCOUNT_NUMBER.hash("sha256").encode("hex").slice(0, 16)
root = root.without("ACCOUNT_NUMBER")
# Step 4: Categorize transactions
# DataStage replacement: Lookup table / case statement
- mapping: |
root = this
# Map MCC codes to categories
let mcc = this.MERCHANT_CATEGORY_CODE.string()
root.transaction_category = match $mcc {
this.has_prefix("54") => "GROCERY",
this.has_prefix("55") => "AUTOMOTIVE",
this.has_prefix("58") => "RESTAURANT",
this.has_prefix("59") => "RETAIL",
this.has_prefix("47") => "TRANSPORTATION",
this.has_prefix("40") || this.has_prefix("41") => "TRAVEL",
this.has_prefix("60") || this.has_prefix("61") => "FINANCIAL",
this.has_prefix("80") => "PROFESSIONAL_SERVICES",
_ => "OTHER"
}
# Step 5: Standardize field names for BigQuery schema
- mapping: |
root.transaction_id = this.TRANSACTION_ID
root.customer_id = this.CUSTOMER_ID
root.transaction_date = this.TRANSACTION_DATE
root.transaction_type = this.TRANSACTION_TYPE
root.merchant_name = this.MERCHANT_NAME
root.merchant_category_code = this.MERCHANT_CATEGORY_CODE
root.source_system = this.SOURCE_SYSTEM
root.created_at = this.CREATED_AT
# Carry forward transformed fields
root.amount_usd = this.amount_usd
root.original_amount = this.original_amount
root.original_currency = this.original_currency
root.account_number_masked = this.account_number_masked
root.account_number_hash = this.account_number_hash
root.transaction_category = this.transaction_category
root._lineage = this._lineage
# Add BigQuery partition field
root._partition_date = this.TRANSACTION_DATE.ts_format("2006-01-02")
# Step 6: Validate required fields before loading
- mapping: |
root = if this.transaction_id == null ||
this.customer_id == null ||
this.amount_usd == null {
throw("Missing required field for BigQuery load")
} else {
this
}
output:
gcp_bigquery:
project: "${GCP_PROJECT}"
dataset: financial_data
table: transactions
format: NEWLINE_DELIMITED_JSON
write_disposition: WRITE_APPEND
# Partition by transaction date for query efficiency
time_partitioning:
field: _partition_date
type: DAY
batching:
count: 1000
period: 30s
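The WHERE clause in the input selects a half-open one-day window, which guarantees consecutive nightly runs neither overlap nor leave gaps. A small Python sketch of the same window (the helper name is illustrative):

```python
from datetime import date, timedelta

def extraction_window(run_date: date) -> tuple[date, date]:
    """Half-open window [run_date - 1 day, run_date), matching
    TRANSACTION_DATE >= CURRENT DATE - 1 DAY AND TRANSACTION_DATE < CURRENT DATE."""
    return run_date - timedelta(days=1), run_date

# A 2 AM run on 2024-01-16 picks up every transaction dated 2024-01-15;
# the next night's window starts exactly where this one ends.
start, end = extraction_window(date(2024, 1, 16))
```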
Quick Test
# Set environment variables
export DB2_HOST=db2.internal.corp
export DB2_PORT=50000
export DB2_DATABASE=FINPROD
export DB2_USER=etl_reader
export DB2_PASSWORD=<secret>
export GCP_PROJECT=my-analytics-project
export NODE_ID=edge-node-datacenter-1
# Test with sample data (no DB2 required)
echo '{
"TRANSACTION_ID": "TXN-2024-00123456",
"ACCOUNT_NUMBER": "4532-1234-5678-9012",
"CUSTOMER_ID": "CUST-789012",
"TRANSACTION_DATE": "2024-01-15",
"TRANSACTION_TYPE": "PURCHASE",
"AMOUNT": 125.50,
"CURRENCY": "EUR",
"MERCHANT_NAME": "ACME Electronics GmbH",
"MERCHANT_CATEGORY_CODE": "5411",
"SOURCE_SYSTEM": "CORE_BANKING_EU",
"CREATED_AT": "2024-01-15T14:32:17Z"
}' | expanso-edge run --config db2-to-bigquery.yaml
Expected Output:
{
"transaction_id": "TXN-2024-00123456",
"customer_id": "CUST-789012",
"transaction_date": "2024-01-15",
"transaction_type": "PURCHASE",
"merchant_name": "ACME Electronics GmbH",
"merchant_category_code": "5411",
"source_system": "CORE_BANKING_EU",
"created_at": "2024-01-15T14:32:17Z",
"amount_usd": 135.54,
"original_amount": 125.50,
"original_currency": "EUR",
"account_number_masked": "****-****-9012",
"account_number_hash": "a1b2c3d4e5f67890",
"transaction_category": "GROCERY",
"_partition_date": "2024-01-15",
"_lineage": {
"source_system": "DB2_PROD",
"source_table": "TRANSACTIONS",
"pipeline": "db2-to-bigquery-transactions",
"extracted_at": "2024-01-16T02:00:00Z",
"node_id": "edge-node-datacenter-1"
}
}
Deploy to Production
Option 1: Run Locally
# Run on edge node with DB2 access
expanso-edge run --config db2-to-bigquery.yaml
Option 2: Deploy to Orchestrator
# Deploy to Expanso fleet
expanso-cli job deploy db2-to-bigquery.yaml --selector region=datacenter
# Verify deployment
expanso-cli job describe db2-to-bigquery-transactions
Option 3: Schedule Nightly
Create a cron schedule for nightly migration:
db2-to-bigquery-scheduled.yaml
name: db2-to-bigquery-transactions
schedule: "0 2 * * *" # 2 AM daily
input:
sql_select:
# ... same config ...
BigQuery Table Setup
Create the target table with partitioning:
CREATE TABLE IF NOT EXISTS `${GCP_PROJECT}.financial_data.transactions`
(
transaction_id STRING,
customer_id STRING,
transaction_date DATE,
transaction_type STRING,
merchant_name STRING,
merchant_category_code STRING,
source_system STRING,
created_at TIMESTAMP,
amount_usd FLOAT64,
original_amount FLOAT64,
original_currency STRING,
account_number_masked STRING,
account_number_hash STRING,
transaction_category STRING,
_partition_date DATE,
_lineage STRUCT<
source_system STRING,
source_table STRING,
pipeline STRING,
extracted_at TIMESTAMP,
node_id STRING
>
)
PARTITION BY _partition_date
CLUSTER BY transaction_category, customer_id;
Monitoring
Check Pipeline Health
# View recent logs
expanso-cli job logs --tail 100 | grep db2-to-bigquery
# Check metrics
curl http://localhost:4195/metrics | grep db2
BigQuery Verification
-- Check recent loads
SELECT
_lineage.extracted_at,
_lineage.node_id,
COUNT(*) as record_count
FROM `financial_data.transactions`
WHERE _partition_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY 1, 2
ORDER BY 1 DESC;
-- Verify no PII leakage
SELECT * FROM `financial_data.transactions`
WHERE account_number_masked NOT LIKE '****%'
LIMIT 10; -- Should return 0 rows
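The same invariant can also be enforced as a pre-load unit check. A minimal sketch (the regex and function name are illustrative, not part of the pipeline):

```python
import re

# A fully masked value looks like ****-****-1234; anything else is a potential leak.
MASK_RE = re.compile(r"^\*{4}-\*{4}-\d{4}$")

def pii_safe(record: dict) -> bool:
    """True only when the raw account number is gone and the mask is well-formed."""
    return ("ACCOUNT_NUMBER" not in record
            and MASK_RE.fullmatch(record.get("account_number_masked", "")) is not None)
```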
Download
Download db2-to-bigquery.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Cross-Border GDPR - Add regional data routing
- Nightly Backup - Backup patterns for enterprise data