
Complete Pipeline

This pipeline combines all 6 transformation steps for migrating DB2 transactions to BigQuery:

  1. Add lineage metadata - Full audit trail for compliance
  2. Normalize currency - Convert all amounts to USD
  3. Mask account numbers - PCI-DSS compliant masking with hash for joins
  4. Categorize transactions - MCC code to human-readable categories
  5. Standardize schema - Lowercase field names + partition field
  6. Validate required fields - Data quality gates before load

Result: Production-ready, compliant data in BigQuery with complete lineage.
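
The currency step (2) is easy to sanity-check outside the pipeline. Below is a standalone Python sketch of the same logic; the rate table is the static one from the config, so treat it as illustrative (production would pull live rates):

```python
# Standalone sketch of Step 2 (currency normalization).
# The static rate table mirrors the pipeline config; in production,
# rates would come from an FX API, not a hard-coded map.
RATES = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067, "CHF": 1.13, "CAD": 0.74}

def normalize_currency(txn: dict) -> dict:
    """Keep the original amount/currency and add a USD-normalized amount."""
    out = dict(txn)
    out["original_amount"] = txn["AMOUNT"]
    out["original_currency"] = txn["CURRENCY"]
    rate = RATES.get(txn["CURRENCY"], 1.0)  # unknown currencies pass through at 1.0
    out["amount_usd"] = round(txn["AMOUNT"] * rate, 2)
    return out

print(normalize_currency({"AMOUNT": 125.50, "CURRENCY": "EUR"})["amount_usd"])  # 135.54
```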

Full Configuration

db2-to-bigquery.yaml
# DB2 to BigQuery Migration Pipeline
# Replaces DataStage ETL with edge-native processing
#
# Use case: Nightly batch migration of financial transactions from
# on-premise DB2 to Google BigQuery with DataStage-style transformations
#
# Key features:
# - SQL query against DB2 via ODBC
# - Currency normalization and enrichment
# - Account number masking for compliance
# - Transaction categorization
# - Lineage metadata for audit trail
# - Lands in BigQuery partitioned by date

name: db2-to-bigquery-transactions
description: Migrate financial transactions from DB2 to BigQuery with transformations

input:
  # Query DB2 for yesterday's transactions
  # Runs on a schedule (see deployment config)
  sql_select:
    driver: odbc
    dsn: "Driver={IBM DB2 ODBC Driver};Database=${DB2_DATABASE};Hostname=${DB2_HOST};Port=${DB2_PORT};Protocol=TCPIP;Uid=${DB2_USER};Pwd=${DB2_PASSWORD};"
    table: TRANSACTIONS
    columns:
      - TRANSACTION_ID
      - ACCOUNT_NUMBER
      - CUSTOMER_ID
      - TRANSACTION_DATE
      - TRANSACTION_TYPE
      - AMOUNT
      - CURRENCY
      - MERCHANT_NAME
      - MERCHANT_CATEGORY_CODE
      - SOURCE_SYSTEM
      - CREATED_AT
    where: "TRANSACTION_DATE >= CURRENT DATE - 1 DAY AND TRANSACTION_DATE < CURRENT DATE"
    args_mapping: ""

pipeline:
  processors:
    # Step 1: Add lineage metadata (critical for audit)
    - mapping: |
        root = this
        root._lineage = {
          "source_system": "DB2_PROD",
          "source_table": "TRANSACTIONS",
          "pipeline": "db2-to-bigquery-transactions",
          "extracted_at": now(),
          "node_id": env("NODE_ID").or("unknown")
        }

    # Step 2: Normalize currency to USD
    # DataStage replacement: Currency lookup and conversion
    # Note: a plain mapping, not a branch -- a branch without a
    # result_map would discard the converted fields.
    - mapping: |
        # Currency conversion rates (in production, fetch from API)
        let rates = {
          "USD": 1.0,
          "EUR": 1.08,
          "GBP": 1.27,
          "JPY": 0.0067,
          "CHF": 1.13,
          "CAD": 0.74
        }

        root = this
        root.original_amount = this.AMOUNT
        root.original_currency = this.CURRENCY

        # Convert to USD
        root.amount_usd = if this.CURRENCY == "USD" {
          this.AMOUNT
        } else {
          this.AMOUNT * $rates.get(this.CURRENCY).or(1.0)
        }

    # Step 3: Mask account numbers for compliance
    # Keep last 4 digits for reconciliation, hash full number for joins
    - mapping: |
        root = this
        root.account_number_masked = "****-****-" + this.ACCOUNT_NUMBER.slice(-4)
        root.account_number_hash = this.ACCOUNT_NUMBER.hash("sha256").encode("hex").slice(0, 16)
        root = root.without("ACCOUNT_NUMBER")

    # Step 4: Categorize transactions
    # DataStage replacement: Lookup table / case statement
    - mapping: |
        root = this

        # Map MCC codes to categories
        let mcc = this.MERCHANT_CATEGORY_CODE.string()

        root.transaction_category = match $mcc {
          this.has_prefix("54") => "GROCERY",
          this.has_prefix("55") => "AUTOMOTIVE",
          this.has_prefix("58") => "RESTAURANT",
          this.has_prefix("59") => "RETAIL",
          this.has_prefix("47") => "TRANSPORTATION",
          this.has_prefix("40") || this.has_prefix("41") => "TRAVEL",
          this.has_prefix("60") || this.has_prefix("61") => "FINANCIAL",
          this.has_prefix("80") => "PROFESSIONAL_SERVICES",
          _ => "OTHER"
        }

    # Step 5: Standardize field names for BigQuery schema
    - mapping: |
        root.transaction_id = this.TRANSACTION_ID
        root.customer_id = this.CUSTOMER_ID
        root.transaction_date = this.TRANSACTION_DATE
        root.transaction_type = this.TRANSACTION_TYPE
        root.merchant_name = this.MERCHANT_NAME
        root.merchant_category_code = this.MERCHANT_CATEGORY_CODE
        root.source_system = this.SOURCE_SYSTEM
        root.created_at = this.CREATED_AT

        # Carry forward transformed fields
        root.amount_usd = this.amount_usd
        root.original_amount = this.original_amount
        root.original_currency = this.original_currency
        root.account_number_masked = this.account_number_masked
        root.account_number_hash = this.account_number_hash
        root.transaction_category = this.transaction_category
        root._lineage = this._lineage

        # Add BigQuery partition field
        root._partition_date = this.TRANSACTION_DATE.ts_format("2006-01-02")

    # Step 6: Validate required fields before loading
    - mapping: |
        root = if this.transaction_id == null ||
               this.customer_id == null ||
               this.amount_usd == null {
          throw("Missing required field for BigQuery load")
        } else {
          this
        }

output:
  gcp_bigquery:
    project: "${GCP_PROJECT}"
    dataset: financial_data
    table: transactions
    format: NEWLINE_DELIMITED_JSON
    write_disposition: WRITE_APPEND
    # Partition by transaction date for query efficiency
    time_partitioning:
      field: _partition_date
      type: DAY
    batching:
      count: 1000
      period: 30s
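
The masking step (3) can likewise be verified off-pipeline. Below is a Python sketch of the same keep-last-four-plus-hash approach, producing the 16-character hex SHA-256 prefix used for joins:

```python
import hashlib

# Standalone sketch of Step 3 (account masking). Keeps the last four
# digits for reconciliation and a 16-character hex SHA-256 prefix for
# joins, then drops the raw account number. Mutates its input dict,
# which is fine for a sketch.
def mask_account(txn: dict) -> dict:
    acct = txn.pop("ACCOUNT_NUMBER")
    txn["account_number_masked"] = "****-****-" + acct[-4:]
    txn["account_number_hash"] = hashlib.sha256(acct.encode()).hexdigest()[:16]
    return txn

row = mask_account({"ACCOUNT_NUMBER": "4532-1234-5678-9012"})
print(row["account_number_masked"])  # ****-****-9012
```

Because the hash is deterministic, the same account always yields the same 16-character key, so downstream joins still work without ever storing the raw number.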

Quick Test

# Set environment variables
export DB2_HOST=db2.internal.corp
export DB2_PORT=50000
export DB2_DATABASE=FINPROD
export DB2_USER=etl_reader
export DB2_PASSWORD=<secret>
export GCP_PROJECT=my-analytics-project
export NODE_ID=edge-node-datacenter-1

# Test with sample data (no DB2 required)
echo '{
  "TRANSACTION_ID": "TXN-2024-00123456",
  "ACCOUNT_NUMBER": "4532-1234-5678-9012",
  "CUSTOMER_ID": "CUST-789012",
  "TRANSACTION_DATE": "2024-01-15",
  "TRANSACTION_TYPE": "PURCHASE",
  "AMOUNT": 125.50,
  "CURRENCY": "EUR",
  "MERCHANT_NAME": "ACME Electronics GmbH",
  "MERCHANT_CATEGORY_CODE": "5411",
  "SOURCE_SYSTEM": "CORE_BANKING_EU",
  "CREATED_AT": "2024-01-15T14:32:17Z"
}' | expanso-edge run --config db2-to-bigquery.yaml

Expected Output:

{
  "transaction_id": "TXN-2024-00123456",
  "customer_id": "CUST-789012",
  "transaction_date": "2024-01-15",
  "transaction_type": "PURCHASE",
  "merchant_name": "ACME Electronics GmbH",
  "merchant_category_code": "5411",
  "source_system": "CORE_BANKING_EU",
  "created_at": "2024-01-15T14:32:17Z",
  "amount_usd": 135.54,
  "original_amount": 125.50,
  "original_currency": "EUR",
  "account_number_masked": "****-****-9012",
  "account_number_hash": "a1b2c3d4e5f67890",
  "transaction_category": "GROCERY",
  "_partition_date": "2024-01-15",
  "_lineage": {
    "source_system": "DB2_PROD",
    "source_table": "TRANSACTIONS",
    "pipeline": "db2-to-bigquery-transactions",
    "extracted_at": "2024-01-16T02:00:00Z",
    "node_id": "edge-node-datacenter-1"
  }
}
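
The category in this output ("5411" maps to GROCERY) follows from the prefix match in Step 4. A Python sketch of that lookup, using the same illustrative prefix table from the pipeline (not a complete MCC taxonomy):

```python
# Standalone sketch of Step 4 (MCC categorization). The prefix table
# mirrors the match expression in the pipeline config; first matching
# prefix wins, anything unmatched falls through to OTHER.
PREFIX_CATEGORIES = [
    ("54", "GROCERY"), ("55", "AUTOMOTIVE"), ("58", "RESTAURANT"),
    ("59", "RETAIL"), ("47", "TRANSPORTATION"), ("40", "TRAVEL"),
    ("41", "TRAVEL"), ("60", "FINANCIAL"), ("61", "FINANCIAL"),
    ("80", "PROFESSIONAL_SERVICES"),
]

def categorize(mcc: str) -> str:
    for prefix, category in PREFIX_CATEGORIES:
        if mcc.startswith(prefix):
            return category
    return "OTHER"

print(categorize("5411"))  # GROCERY
```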

Deploy to Production

Option 1: Run Locally

# Run on edge node with DB2 access
expanso-edge run --config db2-to-bigquery.yaml

Option 2: Deploy to Orchestrator

# Deploy to Expanso fleet
expanso-cli job deploy db2-to-bigquery.yaml --selector region=datacenter

# Verify deployment
expanso-cli job describe db2-to-bigquery-transactions

Option 3: Schedule Nightly

Create a cron schedule for nightly migration:

db2-to-bigquery-scheduled.yaml
name: db2-to-bigquery-transactions
schedule: "0 2 * * *"  # 2 AM daily

input:
  sql_select:
    # ... same config ...

BigQuery Table Setup

Create the target table with partitioning:

CREATE TABLE IF NOT EXISTS `${GCP_PROJECT}.financial_data.transactions`
(
  transaction_id STRING,
  customer_id STRING,
  transaction_date DATE,
  transaction_type STRING,
  merchant_name STRING,
  merchant_category_code STRING,
  source_system STRING,
  created_at TIMESTAMP,
  amount_usd FLOAT64,
  original_amount FLOAT64,
  original_currency STRING,
  account_number_masked STRING,
  account_number_hash STRING,
  transaction_category STRING,
  _partition_date DATE,
  _lineage STRUCT<
    source_system STRING,
    source_table STRING,
    pipeline STRING,
    extracted_at TIMESTAMP,
    node_id STRING
  >
)
PARTITION BY _partition_date
CLUSTER BY transaction_category, customer_id;
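
Rows land in this table as NEWLINE_DELIMITED_JSON, with the nested `_lineage` object mapping onto the STRUCT column. A Python sketch of one such record (field values copied from the expected output earlier on this page; lineage timestamp generated at runtime):

```python
import json
from datetime import datetime, timezone

# One NEWLINE_DELIMITED_JSON record matching the table schema above.
# The nested _lineage object loads into the STRUCT column directly.
row = {
    "transaction_id": "TXN-2024-00123456",
    "customer_id": "CUST-789012",
    "transaction_date": "2024-01-15",
    "amount_usd": 135.54,
    "transaction_category": "GROCERY",
    "_partition_date": "2024-01-15",
    "_lineage": {
        "source_system": "DB2_PROD",
        "source_table": "TRANSACTIONS",
        "pipeline": "db2-to-bigquery-transactions",
        "extracted_at": datetime.now(timezone.utc).isoformat(),
        "node_id": "edge-node-datacenter-1",
    },
}
print(json.dumps(row))  # one line per record in the NDJSON batch
```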

Monitoring

Check Pipeline Health

# View recent logs
expanso-cli job logs --tail 100 | grep db2-to-bigquery

# Check metrics
curl http://localhost:4195/metrics | grep db2
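
If you prefer to inspect metrics programmatically rather than with grep, a minimal Python sketch that filters the Prometheus text exposition returned by the metrics endpoint (the metric and label names below are illustrative; check your node's /metrics output for the real ones):

```python
# Filter Prometheus text-format metrics for series mentioning a substring,
# skipping HELP/TYPE comment lines. Metric names here are illustrative.
def filter_metrics(text: str, needle: str) -> dict:
    counters = {}
    for line in text.splitlines():
        if line.startswith("#") or needle not in line:
            continue  # skip comments and unrelated series
        name, _, value = line.rpartition(" ")
        counters[name] = float(value)
    return counters

sample = '# HELP output_sent Messages sent.\noutput_sent{pipeline="db2_to_bigquery"} 1000'
print(filter_metrics(sample, "db2"))
```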

BigQuery Verification

-- Check recent loads
SELECT
  _lineage.extracted_at,
  _lineage.node_id,
  COUNT(*) AS record_count
FROM `financial_data.transactions`
WHERE _partition_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY 1, 2
ORDER BY 1 DESC;

-- Verify no PII leakage
SELECT * FROM `financial_data.transactions`
WHERE account_number_masked NOT LIKE '****%'
LIMIT 10; -- Should return 0 rows
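
The same check can also run as a pre-load guard on the edge node, failing fast before any unmasked value reaches BigQuery. A Python sketch, assuming the ****-****-NNNN mask format produced by the masking step:

```python
import re

# Pre-load PII guard mirroring the BigQuery check above: every masked
# account must match the ****-****-NNNN pattern before rows are sent.
MASK_RE = re.compile(r"\*{4}-\*{4}-\d{4}")

def pii_safe(row: dict) -> bool:
    return bool(MASK_RE.fullmatch(row.get("account_number_masked", "")))

print(pii_safe({"account_number_masked": "****-****-9012"}))       # True
print(pii_safe({"account_number_masked": "4532-1234-5678-9012"}))  # False
```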

Download

Download db2-to-bigquery.yaml
