Skip to main content

Complete Pipeline

This pipeline combines all the content routing techniques from this tutorial:

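  • Severity routing - CRITICAL/FATAL → PagerDuty, WARNING/WARN → Slack, everything else (e.g. INFO) → Elasticsearch
  • Input validation - Ensures the required severity field exists and normalizes it to uppercase
  • Metadata enrichment - Adds a routing timestamp (routed_at) and the node ID (node_id)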

Full Configuration

content-routing.yaml
---
# Expanso job: accepts JSON log events over HTTP and routes each one to
# PagerDuty, Slack, or Elasticsearch according to its `severity` field.
#
# Environment variables referenced below:
#   PAGERDUTY_TOKEN, PAGERDUTY_ROUTING_KEY  - PagerDuty case
#   SLACK_WEBHOOK_URL                       - Slack case
#   ELASTICSEARCH_URL                       - optional, defaults to
#                                             http://elasticsearch:9200
#   NODE_ID                                 - optional, defaults to "unknown"
name: severity-based-routing
description: Route log messages based on severity level
type: pipeline
namespace: production
labels:
  environment: production
  pattern: content-routing

config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /logs
      timeout: 5s

  pipeline:
    processors:
      # Validate the incoming JSON document and add routing metadata.
      - mapping: |
          root = this

          # Ensure severity field exists. `exists` takes the field path as
          # its argument, and `throw` must be the result of an expression,
          # so the check is written as an assignment that is skipped when
          # the condition is false.
          root = if !this.exists("severity") {
            throw("Missing required field: severity")
          }

          # Normalize severity to uppercase so the switch checks below can
          # compare against fixed uppercase literals.
          root.severity = this.severity.string().uppercase()

          # Add routing metadata
          root.routed_at = now()
          root.node_id = env("NODE_ID").or("unknown")

  output:
    switch:
      cases:
        # Case 1: Critical errors to PagerDuty
        - check: 'this.severity == "CRITICAL" || this.severity == "FATAL"'
          output:
            http_client:
              url: https://events.pagerduty.com/v2/enqueue
              verb: POST
              headers:
                Content-Type: application/json
                Authorization: "Token ${PAGERDUTY_TOKEN}"

              # Fast delivery for critical alerts
              # NOTE(review): verify these retry field names against the
              # http_client output schema of the deployed runtime.
              max_retries: 5
              retry_as_batch: false
              backoff:
                initial_interval: 500ms
                max_interval: 5s

            # Transform to PagerDuty event format
            processors:
              - mapping: |
                  root.routing_key = env("PAGERDUTY_ROUTING_KEY")
                  root.event_action = "trigger"
                  root.payload = {
                    "summary": this.message.or("Critical error occurred"),
                    "severity": "critical",
                    "source": this.source.or("unknown"),
                    "timestamp": this.timestamp,
                    "custom_details": this
                  }

        # Case 2: Warnings to Slack
        - check: 'this.severity == "WARNING" || this.severity == "WARN"'
          output:
            http_client:
              url: "${SLACK_WEBHOOK_URL}"
              verb: POST
              headers:
                Content-Type: application/json

              # Batch warnings to avoid Slack rate limits
              batching:
                count: 10
                period: 30s

              # NOTE(review): verify this retry field name against the
              # http_client output schema of the deployed runtime.
              max_retries: 3

            # Transform to Slack message format
            processors:
              - mapping: |
                  root.text = ":warning: " + this.message.or("Warning")
                  root.blocks = [
                    {
                      "type": "section",
                      "text": {
                        "type": "mrkdwn",
                        "text": "*Severity:* " + this.severity + "\n*Source:* " + this.source.or("unknown") + "\n*Message:* " + this.message.or("N/A")
                      }
                    }
                  ]

        # Case 3: Everything else to Elasticsearch (no `check`, so this case
        # matches any message not claimed by the cases above).
        - output:
            elasticsearch:
              urls:
                - "${ELASTICSEARCH_URL:http://elasticsearch:9200}"

              # Use date-based indices (one index per calendar day).
              index: 'logs-${! now().ts_format("2006-01-02") }'
              # Fall back to a generated UUID when the event carries no
              # event_id; otherwise every such document would share the
              # literal id "null" and overwrite the previous one.
              id: '${! json("event_id").or(uuid_v4()) }'

              batching:
                count: 500
                period: 10s

              gzip_compression: true

              max_retries: 3
              backoff:
                initial_interval: 1s
                max_interval: 30s

Quick Test

# CRITICAL severity: expected to be routed to PagerDuty
curl --request POST \
  --header "Content-Type: application/json" \
  --data '{"severity": "CRITICAL", "message": "Database connection lost"}' \
  http://localhost:8080/logs

# WARN severity: expected to be routed to Slack
curl --request POST \
  --header "Content-Type: application/json" \
  --data '{"severity": "WARN", "message": "High memory usage"}' \
  http://localhost:8080/logs

# INFO severity: expected to fall through to Elasticsearch
curl --request POST \
  --header "Content-Type: application/json" \
  --data '{"severity": "INFO", "message": "User logged in"}' \
  http://localhost:8080/logs

Deploy

# Option 1: deploy the job file to the Expanso orchestrator
expanso-cli job deploy content-routing.yaml

# Option 2: run the same file locally with expanso-edge instead
expanso-edge run --config content-routing.yaml

Download

Download content-routing.yaml

What's Next?