
Complete Pipeline

This pipeline combines all production log processing techniques:

  • HTTP input - Rate-limited HTTP ingest endpoint
  • Validation - JSON parsing with safe defaults for missing fields
  • Enrichment - Add metadata, lineage, and audit trail
  • Filtering - Severity-based routing and cost optimization
  • PII redaction - Remove sensitive data for compliance
  • Fan-out routing - Send to Elasticsearch, S3, and backup

Full Configuration

production-pipeline.yaml
name: production-pipeline-complete
type: pipeline
description: Complete Production Log Pipeline - HTTP input, validation, enrichment, filtering, PII redaction, and multi-destination output.
namespace: production
labels:
  category: log-processing
  pattern: production-pipeline

config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /logs/ingest
      allowed_verbs: ["POST"]
      rate_limit: ingest_limit

  # Benthos-style http_server inputs reference a named rate limit
  # resource, so the 1000 req/s budget is declared here.
  rate_limit_resources:
    - label: ingest_limit
      local:
        count: 1000
        interval: 1s

  pipeline:
    processors:
      # Step 1: Parse the raw payload as JSON (invalid JSON fails the message)
      - mapping: |
          root = content().parse_json()

      # Apply safe defaults for any missing fields
      - mapping: |
          root = this
          root.timestamp = this.timestamp.or(now())
          root.level = this.level.uppercase().or("INFO")
          root.service = this.service.or("unknown")
          root.message = this.message.or("")

      # Step 2: Enrich with metadata
      - mapping: |
          root = this
          root.metadata = {
            "node_id": env("NODE_ID").or("unknown"),
            "region": env("REGION").or("unknown"),
            "received_at": now(),
            "request_id": uuid_v4()
          }

      # Step 3: Calculate severity score for routing
      - mapping: |
          # Compute into variables first: Bloblang cannot read values
          # back out of root within the same mapping.
          let score = match this.level {
            this == "FATAL" || this == "CRITICAL" => 5,
            "ERROR" => 4,
            "WARN" => 3,
            "INFO" => 2,
            _ => 1
          }

          let priority = if $score >= 4 {
            "high"
          } else if $score >= 3 {
            "medium"
          } else {
            "low"
          }

          root = this
          root.severity_score = $score
          root.priority = $priority

          # Expose priority as metadata for output routing
          meta priority = $priority

      # Step 4: Filter noise (health checks, debug in prod)
      - mapping: |
          let msg = this.message.lowercase()
          root = if $msg.contains("health check") || $msg.contains("heartbeat") {
            deleted()
          } else if this.level == "DEBUG" && env("DEBUG_ENABLED").or("false") != "true" {
            deleted()
          } else {
            this
          }

      # Step 5: Redact PII
      - mapping: |
          root = this
          # Redact common PII patterns
          root.message = this.message.
            re_replace_all("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", "[EMAIL]").
            re_replace_all("\\d{3}-\\d{2}-\\d{4}", "[SSN]").
            re_replace_all("4\\d{3}[\\s-]?\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}", "[CARD]")

          # Hash source IPs if present, otherwise drop the field
          root.source_ip = if this.source_ip != null {
            this.source_ip.hash("sha256").encode("hex").slice(0, 12)
          } else { deleted() }

  output:
    broker:
      pattern: fan_out
      outputs:
        # Elasticsearch for search (lowest-severity noise is dropped)
        - switch:
            cases:
              - check: 'this.severity_score >= 2'
                output:
                  http_client:
                    url: '${ELASTICSEARCH:http://es:9200}/logs-${!now().ts_format("2006.01.02")}/_doc'
                    verb: POST
                    headers:
                      Content-Type: application/json
                    batching:
                      count: 100
                      period: 5s

        # S3 for archival (gzip-compressed at batch time)
        - aws_s3:
            bucket: "${S3_BUCKET:logs-archive}"
            path: 'logs/${!now().ts_format("2006/01/02")}/${!uuid_v4()}.jsonl'
            batching:
              count: 500
              period: 60s
              processors:
                - compress:
                    algorithm: gzip

        # Alerts for critical logs
        - switch:
            cases:
              - check: 'meta("priority") == "high"'
                output:
                  http_client:
                    url: "${ALERT_WEBHOOK:http://alerts:8080/webhook}"
                    verb: POST
                    batching:
                      count: 1
                      period: 1s

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      path: /metrics
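
The mappings read NODE_ID, REGION, and DEBUG_ENABLED via env(), and the outputs fall back to the ${VAR:default} values shown above for ELASTICSEARCH, S3_BUCKET, and ALERT_WEBHOOK. For a local run you might export them first (the values below are illustrative, not defaults):

# Illustrative values - adjust for your environment
export NODE_ID=edge-01
export REGION=us-east-1
export DEBUG_ENABLED=false
export ELASTICSEARCH=http://localhost:9200
export S3_BUCKET=logs-archive
export ALERT_WEBHOOK=http://localhost:9000/webhook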

Quick Test

# Send a production log event
curl -X POST http://localhost:8080/logs/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "level": "ERROR",
    "message": "Payment failed for user [email protected]",
    "service": "payment",
    "trace_id": "abc123",
    "source_ip": "192.168.1.100"
  }'

# The log is:
# - Parsed and given defaults for any missing fields
# - Enriched with node, region, and request metadata
# - Scored and prioritized by severity
# - PII redacted (email masked, source IP hashed)
# - Fanned out to Elasticsearch, S3, and alerts based on severity
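
Given the steps above, the document indexed into Elasticsearch should look roughly like this sketch (timestamps, request_id, and the IP hash vary per run; node_id and region are illustrative):

{
  "level": "ERROR",
  "message": "Payment failed for user [EMAIL]",
  "service": "payment",
  "trace_id": "abc123",
  "source_ip": "9b1a6f1c2e3d",
  "timestamp": "2025-01-15T10:30:00.123456Z",
  "severity_score": 4,
  "priority": "high",
  "metadata": {
    "node_id": "edge-01",
    "region": "us-east-1",
    "received_at": "2025-01-15T10:30:00.125789Z",
    "request_id": "b2f7c1a0-4d2e-4f7a-9c1b-8e5d3a2f6c4d"
  }
}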

Deploy

# Deploy to Expanso orchestrator
expanso-cli job deploy production-pipeline.yaml

# Or run locally with expanso-edge
expanso-edge run --config production-pipeline.yaml
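
If your runtime supports Benthos-style config unit tests (an assumption - check your expanso-edge version), the processor chain can be exercised before deploying, with no outputs running. The sketch below follows the Benthos test file convention; the file name and pointer path are hypothetical:

# production-pipeline_test.yaml - a sketch, assuming Benthos-compatible tests
tests:
  - name: redacts email addresses
    target_processors: 'production-pipeline.yaml#/config/pipeline/processors'
    input_batch:
      - content: '{"level":"ERROR","message":"user [email protected] failed","service":"payment"}'
    output_batches:
      - - content_matches: '.*\[EMAIL\].*'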

Download

Download production-pipeline.yaml
