## Complete Pipeline
This pipeline combines all the content-routing techniques from this tutorial (a stripped-down skeleton of the routing follows this list):
- Severity routing - CRITICAL → PagerDuty, WARN → Slack, INFO → Elasticsearch
- Input validation - Ensures required fields exist
- Metadata enrichment - Adds routing timestamps and node ID
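Before the full file, here is the shape of the routing on its own: a single `switch` output whose cases are checked in order. This is a sketch only, with processors and delivery tuning omitted, not a runnable config:

```yaml
output:
  switch:
    cases:
      - check: this.severity == "CRITICAL" || this.severity == "FATAL"
        output:
          http_client: {}      # PagerDuty Events API
      - check: this.severity == "WARNING" || this.severity == "WARN"
        output:
          http_client: {}      # Slack incoming webhook
      - output:                # no check: catches everything else
          elasticsearch: {}
```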
### Full Configuration

**content-routing.yaml**

```yaml
name: severity-based-routing
description: Route log messages based on severity level
type: pipeline
namespace: production
labels:
  environment: production
  pattern: content-routing

config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /logs
      timeout: 5s

  pipeline:
    processors:
      # Parse JSON input, validate, and enrich
      - mapping: |
          root = this

          # Ensure the required severity field exists
          if !this.exists("severity") {
            root = throw("Missing required field: severity")
          }

          # Normalize severity to uppercase
          root.severity = this.severity.string().uppercase()

          # Add routing metadata
          root.routed_at = now()
          root.node_id = env("NODE_ID").or("unknown")

  output:
    switch:
      cases:
        # Case 1: Critical errors to PagerDuty
        - check: this.severity == "CRITICAL" || this.severity == "FATAL"
          output:
            http_client:
              url: https://events.pagerduty.com/v2/enqueue
              verb: POST
              headers:
                Content-Type: application/json
                Authorization: Token ${PAGERDUTY_TOKEN}
              # Fast delivery for critical alerts
              max_retries: 5
              retry_as_batch: false
              backoff:
                initial_interval: 500ms
                max_interval: 5s
            # Transform to the PagerDuty event format
            processors:
              - mapping: |
                  root.routing_key = env("PAGERDUTY_ROUTING_KEY")
                  root.event_action = "trigger"
                  root.payload = {
                    "summary": this.message.or("Critical error occurred"),
                    "severity": "critical",
                    "source": this.source.or("unknown"),
                    "timestamp": this.timestamp,
                    "custom_details": this
                  }

        # Case 2: Warnings to Slack
        - check: this.severity == "WARNING" || this.severity == "WARN"
          output:
            http_client:
              url: ${SLACK_WEBHOOK_URL}
              verb: POST
              headers:
                Content-Type: application/json
              # Batch warnings to avoid Slack rate limits
              batching:
                count: 10
                period: 30s
              max_retries: 3
            # Transform to the Slack message format
            processors:
              - mapping: |
                  root.text = ":warning: " + this.message.or("Warning")
                  root.blocks = [
                    {
                      "type": "section",
                      "text": {
                        "type": "mrkdwn",
                        "text": "*Severity:* " + this.severity + "\n*Source:* " + this.source.or("unknown") + "\n*Message:* " + this.message.or("N/A")
                      }
                    }
                  ]

        # Case 3: Everything else to Elasticsearch
        - output:
            elasticsearch:
              urls:
                - ${ELASTICSEARCH_URL:http://elasticsearch:9200}
              # Use date-based indices
              index: logs-${!now().ts_format("2006-01-02")}
              id: ${!json("event_id")}
              gzip_compression: true
              batching:
                count: 500
                period: 10s
              max_retries: 3
              backoff:
                initial_interval: 1s
                max_interval: 30s
```
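Note that the final case has no `check`. In Benthos-style switch outputs an omitted check defaults to the empty string, which matches every message, so Elasticsearch acts as the catch-all; written explicitly it would be:

```yaml
- check: ""   # always true: INFO, DEBUG, and anything unrecognized land here
  output:
    elasticsearch: {}
```

Any additional route must therefore be inserted as a case above this one, or it will never be reached.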
### Quick Test

```bash
# Test a critical alert (goes to PagerDuty)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"severity": "CRITICAL", "message": "Database connection lost"}'

# Test a warning (goes to Slack)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"severity": "WARN", "message": "High memory usage"}'

# Test an info message (goes to Elasticsearch)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"severity": "INFO", "message": "User logged in"}'
```
### Deploy

Export the credentials the pipeline references (`PAGERDUTY_TOKEN`, `PAGERDUTY_ROUTING_KEY`, `SLACK_WEBHOOK_URL`, and optionally `ELASTICSEARCH_URL` and `NODE_ID`) in the runtime environment before starting it.

```bash
# Deploy to the Expanso orchestrator
expanso-cli job deploy content-routing.yaml

# Or run locally with expanso-edge
expanso-edge run --config content-routing.yaml
```
### Download
Download content-routing.yaml
### What's Next?
- Troubleshooting - Common issues and solutions
- Circuit Breakers - Add fault tolerance to your pipeline