## Step 1: Severity-Based Priority Routing
In this step, you will implement the most common priority queue pattern: routing log messages to different Kafka topics based on their severity. This ensures that critical alerts are processed immediately while routine logs are handled efficiently.
### The Goal
You will modify a basic logging pipeline to route messages to the `logs-critical`, `logs-high`, `logs-normal`, or `logs-low` topics based on the value of the `severity` field in the message.
### The "Classify -> Switch" Pattern
- **Classify:** First, you'll add a `mapping` processor to create a new field called `priority` based on the existing `severity` field. This standardizes different severity inputs (e.g., "WARN" and "WARNING") into a consistent priority level.
- **Switch:** Then, you'll replace the single `kafka` output with a `switch` output that directs the message to the correct Kafka topic based on the new `priority` field.
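Before touching the YAML, the two stages can be sketched in plain Python. This is an illustrative model only — the function names (`classify`, `route`) and the dictionary are not part of the pipeline config:

```python
# Sketch of the "Classify -> Switch" pattern. Illustrative only;
# the real pipeline does this with a mapping processor and a switch output.

SEVERITY_TO_PRIORITY = {
    "CRITICAL": "critical",
    "FATAL": "critical",
    "ERROR": "high",
    "WARN": "normal",
    "WARNING": "normal",
}

def classify(message: dict) -> dict:
    """Stage 1: normalize severity and attach a priority field."""
    severity = str(message.get("severity", "")).upper()
    message["severity"] = severity
    # Anything unrecognized (INFO, DEBUG, ...) defaults to low priority
    message["priority"] = SEVERITY_TO_PRIORITY.get(severity, "low")
    return message

def route(message: dict) -> str:
    """Stage 2: pick a Kafka topic from the priority field."""
    return f"logs-{message['priority']}"

event = classify({"severity": "warn", "message": "disk 80% full"})
print(route(event))  # -> logs-normal
```

Note that classification is deliberately separated from routing: the `switch` output never has to know about severity spellings, only about the four priority values.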
### Implementation
1. **Start with the Foundation:** Copy `priority-queues-foundation.yaml` to a new file named `severity-router.yaml`:

   ```bash
   cp examples/data-routing/priority-queues-foundation.yaml severity-router.yaml
   ```
2. **Add the Classification Processor:** Open `severity-router.yaml` and add the "Classify" `mapping` processor to the `pipeline` section:

   ```yaml
   # Add this to the 'processors' array in severity-router.yaml,
   # after the existing 'mapping' processor
   - mapping: |
       root = this
       # Normalize severity to uppercase for consistent matching
       root.severity = this.severity.string().uppercase()
       # Map severity to a standard priority level
       root.priority = match root.severity {
         this == "CRITICAL" || this == "FATAL" => "critical",
         this == "ERROR" => "high",
         this == "WARN" || this == "WARNING" => "normal",
         _ => "low" # Default everything else to low priority
       }
   ```
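If this pipeline runs on Benthos (or Redpanda Connect), its built-in unit-test runner can exercise the mapping without a running Kafka cluster. The following is a sketch under stated assumptions — the file name `severity-router_test.yaml` and the `/pipeline/processors` target path are hypothetical and must match your actual layout:

```yaml
# severity-router_test.yaml (hypothetical file name)
tests:
  - name: warning maps to normal priority
    target_processors: '/pipeline/processors'
    input_batch:
      - content: '{"severity": "warning", "message": "disk filling up"}'
    output_batches:
      - - json_contains:
            severity: "WARNING"
            priority: "normal"
```

If the runner is available, a test file like this is typically executed with `benthos test ./severity-router_test.yaml`.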
3. **Replace the Output with a Switch:** Now, replace the entire `output` section with the `switch` block below:

   ```yaml
   # Replace the 'output' section in severity-router.yaml
   output:
     switch:
       cases:
         - check: this.priority == "critical"
           output:
             kafka:
               addresses: ["${KAFKA_BROKERS}"]
               topic: logs-critical
               # For critical logs, we send immediately (no batching)
               batching:
                 count: 1
         - check: this.priority == "high"
           output:
             kafka:
               addresses: ["${KAFKA_BROKERS}"]
               topic: logs-high
               batching:
                 count: 10 # Small batch for high-priority
                 period: 1s
         - check: this.priority == "normal"
           output:
             kafka:
               addresses: ["${KAFKA_BROKERS}"]
               topic: logs-normal
               batching:
                 count: 100 # Standard batch size
                 period: 5s
         # Default case for "low" priority and any others
         - output:
             kafka:
               addresses: ["${KAFKA_BROKERS}"]
               topic: logs-low
               batching:
                 count: 1000 # Large batch for low-priority
                 period: 60s
   ```
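The `count`/`period` pairs above trade latency for throughput: a batch is flushed as soon as it reaches `count` messages *or* `period` elapses, whichever comes first. A minimal Python sketch of that behavior (illustrative only — the real batching is handled inside the Kafka output):

```python
import time

class Batcher:
    """Toy model of count/period batching, as configured in the switch cases.
    Illustrative only; not how the pipeline implements it internally."""

    def __init__(self, count: int, period_s: float):
        self.count = count
        self.period_s = period_s
        self.buffer = []
        self.deadline = time.monotonic() + period_s

    def add(self, msg):
        """Buffer a message; return a flushed batch when a limit is hit, else None."""
        self.buffer.append(msg)
        if len(self.buffer) >= self.count or time.monotonic() >= self.deadline:
            batch, self.buffer = self.buffer, []
            self.deadline = time.monotonic() + self.period_s
            return batch
        return None

# count=1 flushes on every message -- the "critical" case above
critical = Batcher(count=1, period_s=60)
print(critical.add({"severity": "CRITICAL"}))  # one-message batch, sent immediately

# count=100 keeps buffering -- the "normal" case
normal = Batcher(count=100, period_s=5)
print(normal.add({"severity": "INFO"}))  # None -- still buffering
```

This is why `count: 1` on the critical case means near-zero delivery latency, while `count: 1000` / `period: 60s` on the low case means a low-priority message may wait up to a minute before it reaches Kafka.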
4. **Deploy and Test:**

   ```bash
   # Send a critical event
   curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
     -d '{"severity": "CRITICAL", "message": "Database connection lost"}'

   # Send an info event
   curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
     -d '{"severity": "INFO", "message": "User logged in"}'
   ```
5. **Verify:** Use a console consumer to check the different Kafka topics. You will see that the critical message arrived immediately in `logs-critical`, while the info message was batched and sent to `logs-low`.

You have now implemented a basic but powerful priority queue system based on log severity.