Complete Pipeline
This pipeline combines the production log processing techniques into one deployable configuration:
- HTTP input - Rate-limited ingestion endpoint
- Validation - Parse incoming JSON and apply per-field defaults
- Enrichment - Add metadata, lineage, and audit trail
- Filtering - Severity-based routing and cost optimization
- PII redaction - Remove sensitive data for compliance
- Fan-out routing - Send to Elasticsearch, S3, and an alert webhook
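To make the flow concrete, here is how two representative events move through the stages defined in the configuration below (a sketch of the routing outcome, not additional behavior):

```text
ERROR event           → severity_score 4, priority "high"
  → Elasticsearch     (severity_score >= 2)
  → S3 archive        (every event that survives filtering)
  → alert webhook     (priority == "high")

DEBUG event, with DEBUG_ENABLED != "true"
  → dropped by the noise filter; reaches no output
```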
Full Configuration
production-pipeline.yaml
```yaml
name: production-pipeline-complete
type: pipeline
description: Complete Production Log Pipeline - HTTP input, validation, enrichment, filtering, PII redaction, and multi-destination output.
namespace: production
labels:
  category: log-processing
  pattern: production-pipeline
config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /logs/ingest
      allowed_verbs: ["POST"]
      rate_limit: ingest_limit
  rate_limit_resources:
    - label: ingest_limit
      local:
        count: 1000
        interval: 1s
  pipeline:
    processors:
      # Step 1: Parse and validate JSON
      - unarchive:
          format: json_documents
      - mapping: |
          root = this
          root.timestamp = this.timestamp.or(now())
          root.level = this.level.uppercase().or("INFO")
          root.service = this.service.or("unknown")
          root.message = this.message.or("")
      # Step 2: Enrich with metadata
      - mapping: |
          root = this
          root.metadata = {
            "node_id": env("NODE_ID").or("unknown"),
            "region": env("REGION").or("unknown"),
            "received_at": now(),
            "request_id": uuid_v4()
          }
      # Step 3: Calculate severity score for routing
      - mapping: |
          root = this
          root.severity_score = match this.level {
            this == "FATAL" || this == "CRITICAL" => 5,
            "ERROR" => 4,
            "WARN" => 3,
            "INFO" => 2,
            _ => 1
          }
          root.priority = if root.severity_score >= 4 {
            "high"
          } else if root.severity_score >= 3 {
            "medium"
          } else {
            "low"
          }
          meta priority = root.priority
      # Step 4: Filter noise (health checks, debug in prod)
      - mapping: |
          let msg = this.message.lowercase()
          root = if $msg.contains("health check") || $msg.contains("heartbeat") {
            deleted()
          } else if this.level == "DEBUG" && env("DEBUG_ENABLED").or("false") != "true" {
            deleted()
          } else {
            this
          }
      # Step 5: Redact PII
      - mapping: |
          root = this
          # Redact common PII patterns from the message body
          root.message = this.message.re_replace_all("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", "[EMAIL]")
          root.message = root.message.re_replace_all("\\d{3}-\\d{2}-\\d{4}", "[SSN]")
          root.message = root.message.re_replace_all("4\\d{3}[\\s-]?\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}", "[CARD]")
          # Hash source IPs instead of storing them raw
          root.source_ip = if this.exists("source_ip") {
            this.source_ip.hash("sha256").encode("hex").slice(0, 12)
          } else {
            deleted()
          }
  output:
    broker:
      pattern: fan_out
      outputs:
        # Elasticsearch for search
        - switch:
            cases:
              - check: 'this.severity_score >= 2'
                output:
                  http_client:
                    url: '${ELASTICSEARCH:http://es:9200}/logs-${! now().ts_format("2006.01.02") }/_doc'
                    verb: POST
                    headers:
                      Content-Type: application/json
                    batching:
                      count: 100
                      period: 5s
        # S3 for archival
        - aws_s3:
            bucket: "${S3_BUCKET:logs-archive}"
            path: 'logs/${! now().ts_format("2006/01/02") }/${! uuid_v4() }.jsonl.gz'
            batching:
              count: 500
              period: 60s
              processors:
                - archive:
                    format: lines
                - compress:
                    algorithm: gzip
        # Alerts for critical logs
        - switch:
            cases:
              - check: 'meta("priority") == "high"'
                output:
                  http_client:
                    url: "${ALERT_WEBHOOK:http://alerts:8080/webhook}"
                    verb: POST
                    batching:
                      count: 1
                      period: 1s
  logger:
    level: INFO
    format: json
  metrics:
    prometheus:
      path: /metrics
```
Quick Test
```sh
# Send a production log event
curl -X POST http://localhost:8080/logs/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "level": "ERROR",
    "message": "Payment failed for user alice@example.com",
    "service": "payment",
    "trace_id": "abc123",
    "source_ip": "192.168.1.100"
  }'
```
The event is then:
- parsed, with defaults applied to any missing fields
- enriched with node, region, and request metadata
- scored for severity (ERROR → severity_score 4, priority "high")
- PII-redacted (email replaced with [EMAIL], source IP hashed)
- fanned out to Elasticsearch, S3, and the alert webhook
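For the event above, the document that reaches Elasticsearch and S3 looks roughly like this (generated fields shown with illustrative values):

```json
{
  "timestamp": "2025-01-15T10:30:01Z",
  "level": "ERROR",
  "service": "payment",
  "message": "Payment failed for user [EMAIL]",
  "trace_id": "abc123",
  "source_ip": "a1b2c3d4e5f6",
  "metadata": {
    "node_id": "node-1",
    "region": "us-east-1",
    "received_at": "2025-01-15T10:30:01Z",
    "request_id": "6f1c9c5e-2b7a-4f93-9b44-6a2d1c0e8f17"
  },
  "severity_score": 4,
  "priority": "high"
}
```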
Deploy
```sh
# Deploy to Expanso orchestrator
expanso-cli job deploy production-pipeline.yaml

# Or run locally with expanso-edge
expanso-edge run --config production-pipeline.yaml
```
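Once the pipeline is running, a quick way to confirm it is healthy is to scrape its Prometheus endpoint. The port below is an assumption (4195 is a common default for Benthos-style engines); use whatever admin address your deployment actually exposes:

```sh
# Scrape pipeline metrics (port is deployment-specific; adjust to your setup)
curl -s http://localhost:4195/metrics | head -n 20
```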
Download
Download production-pipeline.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Content Routing - Route data by content