Skip to main content

Step 2: Normalize Currency

Convert all transaction amounts to USD while preserving original values. This replaces DataStage Lookup Stages with inline conversion logic.

The Goal

  • Convert AMOUNT from any currency to USD
  • Preserve original_amount and original_currency for reconciliation
  • Create amount_usd as the normalized field

DataStage Equivalent

In DataStage, currency conversion typically requires:

  1. Lookup Stage reading from a rates table
  2. Transformer Stage applying the conversion
  3. Database connection to fetch current rates
  4. Caching logic to avoid repeated lookups

Expanso simplifies this with inline rate maps or external API calls.

Implementation

Add the currency processor after lineage:

step-2-currency.yaml
pipeline:
processors:
# Step 1: Lineage (from previous step)
- mapping: |
root = this
root._lineage = {
"source_system": "DB2_PROD",
"source_table": "TRANSACTIONS",
"pipeline": "db2-to-bigquery-transactions",
"extracted_at": now(),
"node_id": env("NODE_ID").or("unknown")
}

# Step 2: Currency normalization
- branch:
processors:
- mapping: |
# Currency conversion rates (USD base)
let rates = {
"USD": 1.0,
"EUR": 1.08,
"GBP": 1.27,
"JPY": 0.0067,
"CHF": 1.13,
"CAD": 0.74
}

root = this
root.original_amount = this.AMOUNT
root.original_currency = this.CURRENCY

# Convert to USD
root.amount_usd = if this.CURRENCY == "USD" {
this.AMOUNT
} else {
this.AMOUNT * $rates.get(this.CURRENCY).or(1.0)
}

Understanding the Code

ExpressionWhat It Does
let rates = {...}Define a local variable with rate map
$rates.get(this.CURRENCY)Look up rate by currency code
.or(1.0)Default to 1.0 if currency not found
this.AMOUNT * $rates...Apply the conversion

Why Use branch?

The branch processor creates an isolated scope:

  • Variables defined with let don't leak to other processors
  • Cleaner separation of concerns
  • Easier debugging and testing

Expected Output

Input:

{
"AMOUNT": 125.50,
"CURRENCY": "EUR",
...
}

Output:

{
"AMOUNT": 125.50,
"CURRENCY": "EUR",
"original_amount": 125.50,
"original_currency": "EUR",
"amount_usd": 135.54,
...
}

Production Considerations

Dynamic Rate Fetching

For real-time rates, call an external API:

- branch:
processors:
- http:
url: "https://api.exchangerate.host/latest?base=${! this.CURRENCY }"
verb: GET
- mapping: |
root = this
root.amount_usd = this.AMOUNT * this.rates.USD

Rate Caching

Cache rates to reduce API calls:

- cache:
resource: exchange_rates
operator: get
key: ${! this.CURRENCY }
- mapping: |
root.exchange_rate = if this.exists() { this } else { 1.0 }

Handling Unknown Currencies

Log and flag records with unknown currencies:

root.amount_usd = if $rates.exists(this.CURRENCY) {
this.AMOUNT * $rates.get(this.CURRENCY)
} else {
root._warnings = (root._warnings.or([])).append("Unknown currency: " + this.CURRENCY)
this.AMOUNT # Pass through unchanged
}

Next Step