Skip to main content

Complete Pipeline

This pipeline combines all priority queue techniques from this tutorial:

  • Severity-based routing - CRITICAL/FATAL → critical, ERROR → high, WARN/WARNING → normal, INFO/DEBUG/TRACE → low; unrecognized severities fall back to normal
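  • Priority scoring - Each priority is tagged with a numeric score (critical 100, high 75, normal 50, low 25) for metrics; routing itself uses the priority label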
  • Queue-specific batching - Critical: no batching (count: 1), Low: large batches (count: 1000)
  • Delivery guarantees - Critical and high use ack_replicas: true; critical also enables idempotent_write: true and gets the most retries (max_retries: 10)

Full Configuration

priority-queues.yaml
name: severity-based-priority-queues
description: Route log messages to priority queues based on severity
type: pipeline
namespace: production

config:
  # HTTP ingest endpoint: clients POST JSON log events to /logs on port 8080.
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /logs
      timeout: 5s
      # NOTE(review): in Benthos-derived engines `rate_limit` names a
      # rate_limit resource rather than an inline rate - confirm that
      # "10000/s" is accepted by your Expanso version.
      rate_limit: "10000/s"

  pipeline:
    processors:
      # Step 1: Parse and validate
      # NOTE(review): confirm `json_documents` is a processor supported by
      # your Expanso version.
      - json_documents:
          parts: []

      # Step 2: Classify priority based on severity
      - mapping: |
          root = this

          # Reject events that carry no severity field. `exists` takes the
          # path as its argument, and `throw` has to be the result of an
          # assignment; when the condition is false the assignment is skipped.
          root = if !this.exists("severity") {
            throw("Missing required field: severity")
          }

          # Normalize severity to uppercase
          root.severity = this.severity.string().uppercase()

          # Map severity to priority (unrecognized severities become "normal")
          root.priority = match root.severity {
            "CRITICAL" => "critical"
            "FATAL" => "critical"
            "ERROR" => "high"
            "WARNING" => "normal"
            "WARN" => "normal"
            "INFO" => "low"
            "DEBUG" => "low"
            "TRACE" => "low"
            _ => "normal"
          }

          # Add priority score for metrics
          root.priority_score = match root.priority {
            "critical" => 100
            "high" => 75
            "normal" => 50
            "low" => 25
            _ => 0
          }

          root.processed_at = now()
          root.edge_node_id = env("NODE_ID").or("unknown")

  # Route each event to the Kafka topic that matches its priority label.
  output:
    switch:
      retry_until_success: false

      cases:
        # Case 1: Critical priority - immediate delivery
        - check: this.priority == "critical"
          output:
            label: critical_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-critical

              # No batching - send immediately
              batching:
                count: 1
                period: 0s

              # Aggressive delivery guarantees
              max_in_flight: 1
              ack_replicas: true
              idempotent_write: true

              # Maximum retries for critical messages
              max_retries: 10
              backoff:
                initial_interval: 100ms
                max_interval: 2s

        # Case 2: High priority - small batches, fast delivery
        - check: this.priority == "high"
          output:
            label: high_priority_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-high

              # Small batches for fast delivery
              batching:
                count: 10
                period: 1s

              # Strong delivery guarantees
              max_in_flight: 5
              ack_replicas: true

              # Moderate retries
              max_retries: 5
              backoff:
                initial_interval: 500ms
                max_interval: 5s

              compression: snappy

        # Case 3: Normal priority - standard batching
        - check: this.priority == "normal"
          output:
            label: normal_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-normal

              # Standard batching for balanced throughput/latency
              batching:
                count: 100
                period: 5s

              # Balanced configuration
              max_in_flight: 10

              # Standard retries
              max_retries: 3
              backoff:
                initial_interval: 1s
                max_interval: 10s

              compression: snappy

        # Case 4: Low priority - large batches, deferred delivery
        - check: this.priority == "low"
          output:
            label: low_priority_queue
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-low

              # Large batches to minimize overhead
              batching:
                count: 1000
                period: 1m

              # High concurrency for throughput
              max_in_flight: 20

              # Minimal retries
              max_retries: 1
              backoff:
                initial_interval: 5s
                max_interval: 30s

              # Aggressive compression for low-priority bulk data
              compression: gzip

        # Default: Treat unknown priorities as normal. The mapping above
        # already falls back to "normal", so this case is only a safety net
        # for events whose priority is none of the four known labels.
        - output:
            kafka:
              addresses: ["${KAFKA_BROKERS}"]
              topic: logs-normal
              batching:
                count: 100
                period: 5s

Quick Test

# Critical event: classified as "critical" and sent unbatched (count: 1) to logs-critical
curl --request POST http://localhost:8080/logs \
  --header "Content-Type: application/json" \
  --data '{"severity": "CRITICAL", "message": "Database connection lost"}'

# Info event: classified as "low" and delivered in batches to logs-low
curl --request POST http://localhost:8080/logs \
  --header "Content-Type: application/json" \
  --data '{"severity": "INFO", "message": "User logged in"}'

Deploy

# Deploy the pipeline definition to the Expanso orchestrator.
# The config reads ${KAFKA_BROKERS} for every Kafka output, so that variable
# must be set wherever the pipeline runs; NODE_ID is optional and falls back
# to "unknown".
expanso-cli job deploy priority-queues.yaml

# Or run the same file locally with expanso-edge
expanso-edge run --config priority-queues.yaml

Download

Download priority-queues.yaml

What's Next?