Step 2: Normalize Currency
Convert all transaction amounts to USD while preserving original values. This replaces DataStage Lookup Stages with inline conversion logic.
The Goal
- Convert `AMOUNT` from any currency to USD
- Preserve `original_amount` and `original_currency` for reconciliation
- Create `amount_usd` as the normalized field
DataStage Equivalent
In DataStage, currency conversion typically requires:
- Lookup Stage reading from a rates table
- Transformer Stage applying the conversion
- Database connection to fetch current rates
- Caching logic to avoid repeated lookups
Expanso simplifies this with inline rate maps or external API calls.
Implementation
Add the currency processor after lineage:
step-2-currency.yaml
pipeline:
  processors:
    # Step 1: Lineage (from previous step)
    - mapping: |
        root = this
        root._lineage = {
          "source_system": "DB2_PROD",
          "source_table": "TRANSACTIONS",
          "pipeline": "db2-to-bigquery-transactions",
          "extracted_at": now(),
          "node_id": env("NODE_ID").or("unknown")
        }
    # Step 2: Currency normalization
    - branch:
        processors:
          - mapping: |
              # Conversion rates: USD per one unit of each currency
              let rates = {
                "USD": 1.0,
                "EUR": 1.08,
                "GBP": 1.27,
                "JPY": 0.0067,
                "CHF": 1.13,
                "CAD": 0.74
              }
              root = this
              root.original_amount = this.AMOUNT
              root.original_currency = this.CURRENCY
              # Convert to USD; unknown currencies fall back to a rate of 1.0
              root.amount_usd = if this.CURRENCY == "USD" {
                this.AMOUNT
              } else {
                this.AMOUNT * $rates.get(this.CURRENCY).or(1.0)
              }
        # Copy the new fields back onto the original record.
        # Without a result_map, the branch output is discarded.
        result_map: |
          root.original_amount = this.original_amount
          root.original_currency = this.original_currency
          root.amount_usd = this.amount_usd
Understanding the Code
| Expression | What It Does |
|---|---|
| `let rates = {...}` | Define a local variable with rate map |
| `$rates.get(this.CURRENCY)` | Look up rate by currency code |
| `.or(1.0)` | Default to 1.0 if currency not found |
| `this.AMOUNT * $rates...` | Apply the conversion |
Why Use branch?
The branch processor creates an isolated scope:
- Variables defined with `let` don't leak to other processors
- Cleaner separation of concerns
- Easier debugging and testing
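As a sketch of how that isolated scope connects to the rest of the record, the branch can be told what to hand to its inner processors and what to copy back. This assumes Expanso's `branch` processor uses the Bento/Benthos field names `request_map` and `result_map`, so check the reference for your version. The hard-coded rate is a placeholder for illustration only:

```yaml
- branch:
    # Hand only the fields needed for conversion to the inner processors
    request_map: |
      root.AMOUNT = this.AMOUNT
      root.CURRENCY = this.CURRENCY
    processors:
      - mapping: |
          # Placeholder rate for illustration; a real mapping would look it up
          root.amount_usd = this.AMOUNT * 1.08
    # Copy only the converted amount back onto the original record
    result_map: |
      root.amount_usd = this.amount_usd
```

Keeping both maps narrow means the inner processors cannot accidentally overwrite unrelated fields on the original record.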
Expected Output
Input:
{
  "AMOUNT": 125.50,
  "CURRENCY": "EUR",
  ...
}
Output:
{
  "AMOUNT": 125.50,
  "CURRENCY": "EUR",
  "original_amount": 125.50,
  "original_currency": "EUR",
  "amount_usd": 135.54,
  ...
}
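Because the rates are floating-point values, the computed product may carry more than two decimal places, while the example above shows cent precision. A minimal sketch of rounding to cents follows. It assumes Bloblang's `round()` method (which rounds to the nearest integer) is available in your version and that two-decimal precision is acceptable for reconciliation:

```yaml
- mapping: |
    root = this
    # Scale to cents, round to the nearest integer, then scale back
    root.amount_usd = (this.amount_usd * 100).round() / 100
```

`original_amount` is left untouched, so the unrounded source value is still available for audit.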
Production Considerations
Dynamic Rate Fetching
For real-time rates, call an external API:
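- branch:
    processors:
      - http:
          url: "https://api.exchangerate.host/latest?base=${! this.CURRENCY }"
          verb: GET
    # The HTTP response replaces the message inside the branch,
    # so copy only the USD rate back onto the original record
    result_map: root.exchange_rate = this.rates.USD
- mapping: |
    root = this
    root.amount_usd = this.AMOUNT * this.exchange_rate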
Rate Caching
Cache rates to reduce API calls:
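- branch:
    processors:
      - cache:
          resource: exchange_rates
          operator: get
          key: ${! this.CURRENCY }
    # Copy the cached rate back onto the original record
    result_map: root.exchange_rate = this
- mapping: |
    root = this
    # Fall back to 1.0 when the cache lookup failed or returned nothing
    root.exchange_rate = this.exchange_rate.or(1.0)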
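The `exchange_rates` resource referenced above has to be defined elsewhere in the configuration. A minimal sketch, assuming a Bento/Benthos-style `cache_resources` section with an in-memory cache. The one-hour TTL and the seeded values are illustrative, not recommendations:

```yaml
cache_resources:
  - label: exchange_rates
    memory:
      # Default TTL for cached entries
      default_ttl: 1h
      # Optional seed values (stored as strings), taken from the static rate map
      init_values:
        EUR: "1.08"
        GBP: "1.27"
```

An in-memory cache is local to each node, so every node keeps its own copy of the rates. A shared cache backend would be needed if all nodes must see identical values.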
Handling Unknown Currencies
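Flag records with unknown currencies so they can be logged or reviewed downstream:
# Unknown currencies pass through with the amount unchanged
root.amount_usd = if $rates.exists(this.CURRENCY) {
  this.AMOUNT * $rates.get(this.CURRENCY)
} else {
  this.AMOUNT
}
# Assignments cannot sit inside an if expression, so flag in a separate statement
if !$rates.exists(this.CURRENCY) {
  root._warnings = root._warnings.or([]).append("Unknown currency: " + this.CURRENCY)
}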
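The mapping above only attaches a warning to the record. To also write a log line, a `log` processor can follow it. This is a sketch, assuming Bento/Benthos-style `switch` and `log` processors and assuming `_warnings` is an array of strings as built above:

```yaml
- switch:
    # Only records that carry at least one warning are logged
    - check: this._warnings.or([]).length() > 0
      processors:
        - log:
            level: WARN
            message: 'Currency warning: ${! this._warnings.join("; ") }'
```

Records without warnings match no case and pass through the `switch` unchanged.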