Complete Pipeline
This pipeline combines all schema validation techniques from this tutorial:
- JSON Schema validation - Validate incoming data against defined schema
- Smart routing - Valid data → analytics, invalid data → dead-letter queue (DLQ)
- Error enrichment - Add validation error details to failed messages
- Quality metrics - Track validation pass/fail rates
Full Configuration
enforce-schema.yaml
name: enforce-schema-complete
type: pipeline
description: Complete Schema Validation Pipeline - Validates incoming data against JSON Schema, routes failures to DLQ.
namespace: production
labels:
  category: data-security
  pattern: schema-validation
config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /sensors
      allowed_verbs: ["POST"]
  pipeline:
    processors:
      # Step 1: Track message for DLQ context
      - mapping: |
          root = this
          meta message_id = uuid_v4()
          meta received_at = now()
          meta original_content = content()
      # Step 2: Parse JSON with error handling
      - try:
          - json_documents:
              parts: []
          - mapping: |
              meta parse_status = "success"
        catch:
          - mapping: |
              root = {
                "message_id": meta("message_id"),
                "error_type": "json_parse_error",
                "error_message": error(),
                "original_content": meta("original_content").string().slice(0, 500),
                "received_at": meta("received_at"),
                "dlq_entry_time": now()
              }
              meta parse_status = "failed"
              meta validation_status = "parse_failed"
      # Step 3: Validate against JSON Schema
      - switch:
          - check: 'meta("parse_status") == "success"'
            processors:
              - try:
                  - json_schema:
                      schema_path: "file:///etc/expanso/schemas/sensor-schema.json"
                  - mapping: |
                      meta validation_status = "passed"
                catch:
                  - mapping: |
                      root = {
                        "message_id": meta("message_id"),
                        "error_type": "schema_validation_error",
                        "error_message": error(),
                        "original_data": this,
                        "received_at": meta("received_at"),
                        "dlq_entry_time": now()
                      }
                      meta validation_status = "failed"
      # Step 4: Add validation metadata to successful messages
      - switch:
          - check: 'meta("validation_status") == "passed"'
            processors:
              - mapping: |
                  root = this
                  root.validation = {
                    "status": "passed",
                    "validated_at": now(),
                    "message_id": meta("message_id")
                  }
  output:
    switch:
      # Valid data → analytics
      - check: 'meta("validation_status") == "passed"'
        output:
          http_client:
            url: "${ANALYTICS_ENDPOINT:http://analytics:8080}/ingest"
            verb: POST
            headers:
              Content-Type: application/json
            batching:
              count: 100
              period: 5s
            max_retries: 3
      # Invalid data → DLQ
      - output:
          file:
            path: '/var/log/expanso/dlq/${! now().ts_format("2006-01-02") }/failures.jsonl'
            codec: lines
  logger:
    level: INFO
    format: json
  metrics:
    prometheus:
      path: /metrics
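The configuration loads its schema from `/etc/expanso/schemas/sensor-schema.json`, which is not reproduced in this section. For a quick local run, a stand-in schema could be created roughly as follows. The field list and types are assumptions inferred from the Quick Test payloads, not necessarily the schema used elsewhere in this tutorial, and `SCHEMA_DIR` is a hypothetical variable introduced here to avoid writing to `/etc`.

```shell
#!/bin/sh
# Hypothetical stand-in schema for local testing only.
# Assumption: all four fields from the valid Quick Test payload are required.
schema_dir="${SCHEMA_DIR:-./schemas}"
mkdir -p "$schema_dir"

# The quoted heredoc delimiter keeps "$schema" from being expanded by the shell.
cat > "$schema_dir/sensor-schema.json" <<'EOF'
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["sensor_id", "temperature", "humidity", "timestamp"],
  "properties": {
    "sensor_id": { "type": "string" },
    "temperature": { "type": "number" },
    "humidity": { "type": "number" },
    "timestamp": { "type": "string", "format": "date-time" }
  }
}
EOF
```

To use it with the configuration as written, either copy the file to `/etc/expanso/schemas/` or point `schema_path` at the generated file.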
Quick Test
# Send valid sensor data
curl -X POST http://localhost:8080/sensors \
  -H "Content-Type: application/json" \
  -d '{
    "sensor_id": "temp-001",
    "temperature": 72.5,
    "humidity": 45.0,
    "timestamp": "2024-01-15T10:30:00Z"
  }'
# Result: Routed to analytics
# Send invalid data (temperature is a string; humidity and timestamp are missing)
curl -X POST http://localhost:8080/sensors \
  -H "Content-Type: application/json" \
  -d '{
    "sensor_id": "temp-001",
    "temperature": "not-a-number"
  }'
# Result: Routed to DLQ with validation error details
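The two requests above exercise only the schema validation branch. The sketch below adds a third case, a truncated JSON body intended to hit the `json_parse_error` branch from Step 2, plus a helper for reading the most recent DLQ entry. The function names are hypothetical, and the DLQ path assumes the file output writes to a directory named after the current date in `YYYY-MM-DD` form on the machine running the pipeline.

```shell
#!/bin/sh
# Build the dated DLQ file path used by the file output in the configuration.
# Assumption: the directory name is the date formatted as YYYY-MM-DD.
dlq_path_for_date() {
  printf '/var/log/expanso/dlq/%s/failures.jsonl\n' "$1"
}

# Send a deliberately truncated JSON body to exercise the parse-error branch.
# The endpoint defaults to the address used in the Quick Test.
send_malformed() {
  curl -s -X POST "${1:-http://localhost:8080/sensors}" \
    -H "Content-Type: application/json" \
    -d '{"sensor_id": "temp-001", "temperature": '
}

# Print the most recent DLQ entry for a given date (defaults to today).
show_last_dlq_entry() {
  tail -n 1 "$(dlq_path_for_date "${1:-$(date +%Y-%m-%d)}")"
}
```

After sourcing these functions, running `send_malformed` and then `show_last_dlq_entry` should show an entry whose `error_type` is `json_parse_error`, provided the pipeline behaves as configured and the DLQ directory is readable from the current host.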
Deploy
# Deploy to Expanso orchestrator
expanso-cli job deploy enforce-schema.yaml
# Or run locally with expanso-edge
expanso-edge run --config enforce-schema.yaml
Download
Download enforce-schema.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Remove PII - Delete personal information