Complete Pipeline
This pipeline combines all priority queue techniques from this tutorial:
- Severity-based routing - CRITICAL → immediate, ERROR → high, WARN → normal, INFO/DEBUG → low
- Priority scoring - Numeric scores map to queue assignments
- Queue-specific batching - Critical: no batching (count: 1), Low: large batches (count: 1000)
- Delivery guarantees - Critical gets `ack_replicas: true` and more retries
Full Configuration
priority-queues.yaml
---
# Severity-based priority-queue pipeline: receives JSON log events over HTTP,
# classifies each by severity, and routes it to one of four Kafka topics with
# per-priority batching, retry, and delivery-guarantee settings.
name: severity-based-priority-queues
description: Route log messages to priority queues based on severity
type: pipeline
namespace: production

config:
  input:
    http_server:
      # Quoted: contains a colon and would otherwise risk scalar ambiguity.
      address: "0.0.0.0:8080"
      path: /logs
      timeout: 5s
      rate_limit: "10000/s"

  pipeline:
    processors:
      # Step 1: Parse and validate incoming JSON documents.
      - json_documents:
          parts: []

      # Step 2: Classify priority based on severity (Bloblang mapping).
      # Rejects events missing the required `severity` field, normalizes it,
      # derives `priority` and a numeric `priority_score`, and stamps
      # processing metadata.
      - mapping: |
          root = this
          if !this.severity.exists() {
            throw("Missing required field: severity")
          }
          # Normalize severity to uppercase
          root.severity = this.severity.string().uppercase()
          # Map severity to priority
          root.priority = match root.severity {
            "CRITICAL" => "critical"
            "FATAL" => "critical"
            "ERROR" => "high"
            "WARNING" => "normal"
            "WARN" => "normal"
            "INFO" => "low"
            "DEBUG" => "low"
            "TRACE" => "low"
            _ => "normal"
          }
          # Add priority score for metrics
          root.priority_score = match root.priority {
            "critical" => 100
            "high" => 75
            "normal" => 50
            "low" => 25
            _ => 0
          }
          root.processed_at = now()
          root.edge_node_id = env("NODE_ID").or("unknown")

  output:
    switch:
      retry_until_success: false
      cases:
        # Case 1: Critical priority - immediate delivery
        - check: this.priority == "critical"
          output:
            label: critical_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-critical
              # No batching - send immediately
              batching:
                count: 1
                period: 0s
              # Aggressive delivery guarantees
              max_in_flight: 1
              ack_replicas: true
              idempotent_write: true
              # Maximum retries for critical messages
              max_retries: 10
              backoff:
                initial_interval: 100ms
                max_interval: 2s

        # Case 2: High priority - small batches, fast delivery
        - check: this.priority == "high"
          output:
            label: high_priority_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-high
              # Small batches for fast delivery
              batching:
                count: 10
                period: 1s
              # Strong delivery guarantees
              max_in_flight: 5
              ack_replicas: true
              # Moderate retries
              max_retries: 5
              backoff:
                initial_interval: 500ms
                max_interval: 5s
              compression: snappy

        # Case 3: Normal priority - standard batching
        - check: this.priority == "normal"
          output:
            label: normal_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-normal
              # Standard batching for balanced throughput/latency
              batching:
                count: 100
                period: 5s
              # Balanced configuration
              max_in_flight: 10
              # Standard retries
              max_retries: 3
              backoff:
                initial_interval: 1s
                max_interval: 10s
              compression: snappy

        # Case 4: Low priority - large batches, deferred delivery
        - check: this.priority == "low"
          output:
            label: low_priority_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-low
              # Large batches to minimize overhead
              batching:
                count: 1000
                period: 1m
              # High concurrency for throughput
              max_in_flight: 20
              # Minimal retries
              max_retries: 1
              backoff:
                initial_interval: 5s
                max_interval: 30s
              # Aggressive compression for low-priority bulk data
              compression: gzip

        # Default: treat unknown priorities as normal (no check = match all)
        - output:
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-normal
              batching:
                count: 100
                period: 5s
Quick Test
# Send a critical event (routes to logs-critical; immediate delivery, no batching)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"severity": "CRITICAL", "message": "Database connection lost"}'
# Send an info event (routes to logs-low; batched, deferred delivery)
curl -X POST http://localhost:8080/logs \
  -H "Content-Type: application/json" \
  -d '{"severity": "INFO", "message": "User logged in"}'
Deploy
# Deploy the pipeline to the Expanso orchestrator
expanso-cli job deploy priority-queues.yaml
# Or run it locally on a single node with expanso-edge
expanso-edge run --config priority-queues.yaml
Download
Download priority-queues.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Circuit Breakers - Add fault tolerance