
Step 6: Validate Required Fields

Reject records with missing required fields before they reach BigQuery. This replaces DataStage Data Rules stages with conditional validation.

The Goal

  • Check that transaction_id, customer_id, and amount_usd are present
  • Reject invalid records with a clear error message
  • Prevent bad data from polluting your BigQuery tables

Why This Matters

Data Quality: Bad records in BigQuery break dashboards and reports.

Cost Control: Rejecting early saves BigQuery storage and query costs.

Debugging: Clear error messages help identify source data issues.

DataStage Equivalent

In DataStage, validation typically requires:

  1. Data Rules Stage or Filter Stage
  2. Reject links to capture bad records
  3. Separate error handling jobs

Expanso simplifies this with a conditional throw() that fails the record and stops it from being processed further.

Implementation

Add the validation processor as the final step:

step-6-validate.yaml
pipeline:
  processors:
    # Steps 1-5 from previous...

    # Step 6: Validate required fields before BigQuery load
    - mapping: |
        root = if this.transaction_id == null ||
                  this.customer_id == null ||
                  this.amount_usd == null {
          throw("Missing required field for BigQuery load")
        } else {
          this
        }

Understanding the Code

Expression            What It Does
this.field == null    Check if field is missing or null
||                    Logical OR (any condition triggers)
throw("message")      Stop processing with error
else { this }         Pass record through unchanged
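
The == null check catches absent and null fields, but an empty string still passes. If blank IDs should also be rejected, the check could be tightened along these lines. This is a minimal sketch: treating blank IDs as invalid is an assumption about your source data, and the error message text is illustrative.

```yaml
- mapping: |
    # Assumption: blank or whitespace-only IDs count as missing.
    # .or("") substitutes an empty string for null or absent fields,
    # .string() normalises numeric IDs, .trim() strips whitespace.
    let txn = this.transaction_id.or("").string().trim()
    let cust = this.customer_id.or("").string().trim()

    root = if $txn == "" || $cust == "" || this.amount_usd == null {
      throw("Missing or empty required field for BigQuery load")
    } else {
      this
    }
```

amount_usd keeps the plain null check here, since 0 may be a legitimate amount.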

Expected Behavior

Valid Record: Passes through unchanged

Invalid Record (missing customer_id):

ERROR: Missing required field for BigQuery load
Record: {"transaction_id": "TXN-001", "amount_usd": 100.00, ...}

Production Considerations

Detailed Error Messages

Identify which field is missing:

- mapping: |
    let missing = []
    let missing = if this.transaction_id == null { $missing.append("transaction_id") } else { $missing }
    let missing = if this.customer_id == null { $missing.append("customer_id") } else { $missing }
    let missing = if this.amount_usd == null { $missing.append("amount_usd") } else { $missing }

    root = if $missing.length() > 0 {
      throw("Missing required fields: " + $missing.join(", "))
    } else {
      this
    }

Dead Letter Queue

Route bad records to a DLQ instead of failing:

pipeline:
  processors:
    # ... transformation steps ...

    - switch:
        # Same three required fields as the validation mapping
        - check: this.transaction_id == null || this.customer_id == null || this.amount_usd == null
          processors:
            - mapping: |
                root = this
                root._error = "Missing required field"
                root._failed_at = now()
            - output:
                gcp_pubsub:
                  project: "${GCP_PROJECT}"
                  topic: etl-dlq
        - processors:
            - mapping: root = this # Valid records continue
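
If you would rather keep the throw() validation and still send the record to the DLQ with its real error message, one option is to annotate failed records in a catch block. The sketch below assumes Expanso accepts a Benthos-style catch processor and the error() function, which this guide does not confirm. The _error and _failed_at field names simply mirror the example above.

```yaml
pipeline:
  processors:
    # Validation from this step: throw() marks the record as failed
    # and leaves its content unchanged.
    - mapping: |
        root = if this.transaction_id == null || this.customer_id == null || this.amount_usd == null {
          throw("Missing required field for BigQuery load")
        } else {
          this
        }

    # Assumed Benthos-style catch: runs only for records that failed above.
    - catch:
        - mapping: |
            root = this
            root._error = error()     # message passed to throw()
            root._failed_at = now()
```

In Benthos-style pipelines the failure flag is cleared once a record leaves the catch block, so downstream routing would key on the presence of _error rather than on the failure itself.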

Type Validation

Check data types, not just presence:

root = if this.amount_usd.type() != "number" {
  throw("amount_usd must be numeric, got: " + this.amount_usd.type())
} else if this.amount_usd < 0 {
  throw("amount_usd cannot be negative: " + this.amount_usd.string())
} else {
  this
}

Range Validation

Flag suspicious values:

# Start from the incoming record so its existing fields are kept
root = this
root._warnings = []

# Flag unusually large transactions
root._warnings = if this.amount_usd > 100000 {
  root._warnings.append("Large transaction: review required")
} else {
  root._warnings
}

# Flag future dates
root._warnings = if this.transaction_date > now().ts_format("2006-01-02") {
  root._warnings.append("Future transaction date")
} else {
  root._warnings
}

Validation Metrics

Count validation failures for monitoring:

- metric:
    type: counter
    name: validation_failures
    labels:
      reason: ${! meta("error_reason").or("unknown") }
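
The counter reads an error_reason metadata key, but nothing earlier in this step sets it, and a bare metric processor would count valid records too. One way to wire it up is sketched below. The reason codes and the "none" sentinel are hypothetical names chosen for illustration, and wrapping the metric in a switch is an assumption about how you want to scope the count.

```yaml
pipeline:
  processors:
    # Tag every record with a reason code (hypothetical values).
    - mapping: |
        root = this
        meta error_reason = if this.transaction_id == null {
          "missing_transaction_id"
        } else if this.customer_id == null {
          "missing_customer_id"
        } else if this.amount_usd == null {
          "missing_amount_usd"
        } else {
          "none"
        }

    # Increment the counter only for records that were tagged as invalid.
    - switch:
        - check: meta("error_reason") != "none"
          processors:
            - metric:
                type: counter
                name: validation_failures
                labels:
                  reason: ${! meta("error_reason") }
```

This tagging mapping needs to run before any throw() validation. A record that has already failed is skipped by later processors, so it would never be tagged or counted.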

Complete Pipeline

You've built all 6 transformation steps! See the complete, production-ready configuration: