Step 6: Validate Required Fields
Reject records with missing required fields before they reach BigQuery. This replaces DataStage Data Rules stages with conditional validation.
The Goal
- Check that transaction_id, customer_id, and amount_usd are present
- Reject invalid records with a clear error message
- Prevent bad data from polluting your BigQuery tables
Why This Matters
Data Quality: Bad records in BigQuery break dashboards and reports.
Cost Control: Rejecting early saves BigQuery storage and query costs.
Debugging: Clear error messages help identify source data issues.
DataStage Equivalent
In DataStage, validation typically requires:
- Data Rules Stage or Filter Stage
- Reject links to capture bad records
- Separate error handling jobs
Expanso simplifies this with a conditional throw() inside a mapping, which stops processing of the invalid record.
Implementation
Add the validation processor as the final step:
pipeline:
  processors:
    # Steps 1-5 from previous...
    # Step 6: Validate required fields before BigQuery load
    - mapping: |
        root = if this.transaction_id == null ||
                  this.customer_id == null ||
                  this.amount_usd == null {
          throw("Missing required field for BigQuery load")
        } else {
          this
        }
Understanding the Code
| Expression | What It Does |
|---|---|
| this.field == null | Check if field is missing or null |
| \|\| | Logical OR (any condition triggers) |
| throw("message") | Stop processing with error |
| else { this } | Pass record through unchanged |
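The == null check does not catch values that are present but empty. A minimal sketch of a stricter variant follows, assuming the ID fields arrive as strings and that an empty string should be rejected like a missing value. That policy and the error text are assumptions for illustration, not something the pipeline above requires.

```yaml
- mapping: |
    # Assumption: "" is treated the same as a missing ID.
    # .or("") substitutes "" when the field is null or absent,
    # so one comparison covers null, missing, and empty.
    root = if this.transaction_id.or("") == "" ||
              this.customer_id.or("") == "" {
      throw("Missing or empty required ID field")
    } else {
      this
    }
```

amount_usd is left out here because it is numeric; the Type Validation section covers it.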
Expected Behavior
Valid Record: Passes through unchanged
Invalid Record (missing customer_id):
ERROR: Missing required field for BigQuery load
Record: {"transaction_id": "TXN-001", "amount_usd": 100.00, ...}
Production Considerations
Detailed Error Messages
Identify which field is missing:
- mapping: |
    let missing = []
    let missing = if this.transaction_id == null { $missing.append("transaction_id") } else { $missing }
    let missing = if this.customer_id == null { $missing.append("customer_id") } else { $missing }
    let missing = if this.amount_usd == null { $missing.append("amount_usd") } else { $missing }
    root = if $missing.length() > 0 {
      throw("Missing required fields: " + $missing.join(", "))
    } else {
      this
    }
Dead Letter Queue
Route bad records to a DLQ instead of failing:
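pipeline:
  processors:
    # ... transformation steps ...
    # Tag invalid records instead of throwing; valid records pass through unchanged
    - switch:
        - check: this.transaction_id == null || this.customer_id == null || this.amount_usd == null
          processors:
            - mapping: |
                root = this
                root._error = "Missing required field"
                root._failed_at = now()

# Outputs are configured at the top level, not inside a processor list
output:
  switch:
    cases:
      # Tagged records go to the DLQ topic
      - check: this._error != null
        output:
          gcp_pubsub:
            project: "${GCP_PROJECT}"
            topic: etl-dlq
      # Valid records continue to BigQuery (assumed here to be a gcp_bigquery output)
      - output:
          gcp_bigquery:
            # ... BigQuery settings from the load step ...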
Type Validation
Check data types, not just presence:
root = if this.amount_usd.type() != "number" {
  throw("amount_usd must be numeric, got: " + this.amount_usd.type())
} else if this.amount_usd < 0 {
  throw("amount_usd cannot be negative: " + this.amount_usd.string())
} else {
  this
}
Range Validation
Flag suspicious values:
# Start from the input record so its other fields are kept
root = this
root._warnings = []
# Flag unusually large transactions
root._warnings = if this.amount_usd > 100000 {
  root._warnings.append("Large transaction: review required")
} else {
  root._warnings
}
# Flag future dates
root._warnings = if this.transaction_date > now().ts_format("2006-01-02") {
  root._warnings.append("Future transaction date")
} else {
  root._warnings
}
Validation Metrics
Count validation failures for monitoring:
- metric:
    type: counter
    name: validation_failures
    labels:
      reason: ${! meta("error_reason").or("unknown") }
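The reason label reads an error_reason metadata key, which nothing earlier in this step sets. One possible way to populate it is sketched below, assuming invalid records are tagged rather than thrown. The reason strings are made up for illustration, and the counter is wrapped in a switch so that valid records are not counted as failures.

```yaml
pipeline:
  processors:
    # Assumption: no earlier step has already set error_reason.
    - mapping: |
        root = this
        # With no final else, the assignment is skipped for valid records,
        # so error_reason stays unset on them.
        meta error_reason = if this.transaction_id == null {
          "missing_transaction_id"
        } else if this.customer_id == null {
          "missing_customer_id"
        } else if this.amount_usd == null {
          "missing_amount_usd"
        }
    # Increment the counter only for records that were tagged above
    - switch:
        - check: meta("error_reason") != null
          processors:
            - metric:
                type: counter
                name: validation_failures
                labels:
                  reason: ${! meta("error_reason") }
```

Keeping the set of reason values small and fixed helps keep the label cardinality low in the metrics backend.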
Complete Pipeline
You've built all 6 transformation steps! See the complete, production-ready configuration: