Step 4: Prevent Priority Starvation
A potential problem with priority queues is "starvation," where a constant stream of high-priority messages prevents low-priority messages from ever being processed.
In this final step, you will implement a simple but effective solution: age-based priority escalation. The older a message gets, the higher its priority becomes, ensuring that every message is eventually processed.
The Goal
You will modify the scoring processor to calculate the age of a message and add an age_boost to its priority_score.
Implementation
-
Start with the Previous Pipeline: Copy the
multi-criteria-router.yamlfile you created in Step 3 to a new file namedanti-starvation-router.yaml.cp multi-criteria-router.yaml anti-starvation-router.yaml -
Enhance the Scoring Processor: Open
anti-starvation-router.yamland replace the scoringmappingprocessor with this final version. The new logic calculates the message's age and adds a bonus to the score.Replace the scoring processor in anti-starvation-router.yaml- mapping: |
root = this
root.severity = this.severity.string().uppercase()
root.customer_tier = this.customer_tier.or("free").string().lowercase()
root.event_type = this.event_type.or("unknown").string().lowercase()
# --- START: New additions for age-based boost ---
# 1. Calculate the message age in seconds
let message_timestamp = this.timestamp.parse_timestamp()
root.age_seconds = (now().unix() - message_timestamp.unix()).round()
# 2. Calculate a boost based on age
let age_boost = match {
root.age_seconds > 86400 => 100, # > 24 hours: massive boost
root.age_seconds > 3600 => 40, # > 1 hour: strong boost
root.age_seconds > 300 => 10, # > 5 minutes: small boost
_ => 0
}
# --- END: New additions ---
# Calculate severity score
let severity_score = match root.severity {
"CRITICAL" | "FATAL" => 80,
"ERROR" => 60,
"WARN" | "WARNING" => 40,
_ => 20
}
# Determine tier multiplier
let tier_multiplier = match root.customer_tier {
"enterprise" => 2.0,
"premium" => 1.5,
_ => 1.0
}
# Add a bonus score based on event type
let event_type_score = match {
root.event_type.has_prefix("payment.") => 30,
root.event_type.has_prefix("auth.") => 20,
_ => 0
}
# Calculate the final score, now including the age_boost
root.priority_score = (severity_score * tier_multiplier) + event_type_score + age_boost
# Map the final score to a priority level
root.priority = match {
root.priority_score >= 100 => "critical",
root.priority_score >= 80 => "high",
root.priority_score >= 40 => "normal",
_ => "low"
}
3. Deploy and Test
Deploy the new pipeline and send two identical low-priority events: one recent, and one with a timestamp from two hours ago.
# Test a 'INFO' from a 'free' user (score: 20*1.0 + 0 + 0 = 20 => low)
curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
-d
"severity": "INFO",
"customer_tier": "free",
"event_type": "user.login",
"timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
# Test the SAME event, but with an old timestamp
# (score: 20*1.0 + 0 + 40 = 60 => normal)
curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
-d
"severity": "INFO",
"customer_tier": "free",
"event_type": "user.login",
"timestamp": "'$(date -u -d "2 hours ago" +%Y-%m-%dT%H:%M:%SZ)'"
4. Verify
Check your Kafka topics. The first, recent event is correctly routed to logs-low. The second, older event is escalated to the logs-normal topic because its age gave it a +40 point boost, pushing its score over the "normal" threshold.
You have now built a fair and robust priority queue system that guarantees every message will eventually be processed.