Step 4: Prevent Priority Starvation
A constant stream of important messages could starve archive messages forever. We fix this with age-based priority escalation.
The Problem
If important messages keep arriving, lower-priority messages may never be sent. This violates the guarantee that all data eventually reaches its destination.
The Solution
Boost a message's priority score based on how long it's been waiting. Eventually, even archive messages become "important enough" to send.
Implementation
- Copy the previous pipeline:
cp smart-buffer-ordered.yaml smart-buffer-complete.yaml
- Add age-based escalation:
Replace the mapping processor in smart-buffer-complete.yaml:
- mapping: |
    root = this
    # Variables declared with `let` are referenced with a `$` prefix
    let category = this.category.or("").string().lowercase()
    let severity = this.severity.or("info").string().lowercase()
    let event_type = this.event_type.or("").string().lowercase()
    let is_important = match {
      $category == "important" => true,
      $severity == "critical" => true,
      $event_type.has_prefix("payment.failed") => true,
      _ => false
    }
    let is_archive = match {
      $category == "archive" => true,
      $severity == "debug" => true,
      $event_type.has_prefix("analytics.") => true,
      _ => false
    }
    root.priority_tier = match {
      $is_important => 1,
      $is_archive => 3,
      _ => 2
    }
    root.priority_label = match root.priority_tier {
      1 => "important",
      2 => "regular",
      3 => "archive"
    }
    # Base priority score
    let base_score = match root.priority_tier {
      1 => 1000,
      2 => 500,
      3 => 100
    }
    # Calculate message age
    let message_ts = this.timestamp.or(now().format_timestamp()).parse_timestamp()
    let age_seconds = (now().unix() - $message_ts.unix()).round()
    root.age_seconds = $age_seconds
    # Age-based priority boost
    let age_boost = match {
      $age_seconds > 7200 => 900, # > 2 hours: near-important
      $age_seconds > 1800 => 400, # > 30 min: regular priority
      $age_seconds > 300 => 100, # > 5 min: small boost
      _ => 0
    }
    root.age_boost = $age_boost
    root.priority_score = $base_score + $age_boost
    root.age_escalated = $age_boost > 0
    root.buffered_at = now()
How Age Boost Works
| Message Age | Boost | Archive Score | Effect |
|---|---|---|---|
| < 5 min | 0 | 100 | Normal archive priority |
| 5-30 min | +100 | 200 | Still below regular |
| 30 min - 2 hr | +400 | 500 | Same as regular |
| > 2 hours | +900 | 1000 | Same as important |
After 2 hours, an archive message scores 1000, the same as a fresh important message, so it competes for delivery on equal footing instead of waiting behind every new arrival.
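The numbers in the table can be checked by hand with a few lines of shell arithmetic. This is only a sketch for verifying scores outside the pipeline: `age_boost` and `priority_score` are hypothetical helper names, and the thresholds and base scores are copied from the mapping.

```shell
#!/usr/bin/env bash
# Hypothetical helpers mirroring the thresholds in the mapping processor.

# age_boost AGE_SECONDS -> prints the boost for a message of that age
age_boost() {
  local age=$1
  if   [ "$age" -gt 7200 ]; then echo 900   # > 2 hours
  elif [ "$age" -gt 1800 ]; then echo 400   # > 30 min
  elif [ "$age" -gt 300  ]; then echo 100   # > 5 min
  else                           echo 0
  fi
}

# priority_score TIER AGE_SECONDS -> prints base score plus age boost
priority_score() {
  local tier=$1 age=$2 base
  case "$tier" in
    1) base=1000 ;;  # important
    2) base=500  ;;  # regular
    3) base=100  ;;  # archive
    *) echo "unknown tier: $tier" >&2; return 1 ;;
  esac
  echo $(( base + $(age_boost "$age") ))
}

# Print the archive column of the table for one sample age per row.
for age in 60 600 3600 10800; do
  echo "archive, ${age}s old: $(priority_score 3 "$age")"
done
```

The boost is applied to every tier, not just archive. For example, `priority_score 2 7201` prints 1400, so a regular message older than 2 hours outranks a fresh important message at 1000.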
Test
# Send an archive message with an old timestamp
# (-v-3H is BSD/macOS date syntax; with GNU date, use -d '3 hours ago' instead)
curl -X POST http://localhost:8080/events \
  -H "Content-Type: application/json" \
  -d '{
    "severity": "debug",
    "message": "Old archive message",
    "timestamp": "'$(date -u -v-3H +%Y-%m-%dT%H:%M:%SZ)'"
  }'
This 3-hour-old archive message gets a +900 boost, bringing its score to 1000, equal to important messages.
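To exercise every age bracket rather than only the oldest one, a sketch like the following builds one archive payload per row of the boost table. The helper names `iso_minutes_ago` and `archive_payload` are hypothetical, and the script assumes either GNU coreutils `date` or BSD/macOS `date` is installed.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for generating back-dated archive test messages.

# iso_minutes_ago N -> UTC ISO-8601 timestamp N minutes in the past
iso_minutes_ago() {
  local minutes=$1
  if date --version >/dev/null 2>&1; then
    # GNU coreutils date (most Linux systems)
    date -u -d "${minutes} minutes ago" +%Y-%m-%dT%H:%M:%SZ
  else
    # BSD date (macOS)
    date -u -v-"${minutes}"M +%Y-%m-%dT%H:%M:%SZ
  fi
}

# archive_payload N -> JSON body for a debug (archive) message N minutes old
archive_payload() {
  printf '{"severity":"debug","message":"archive, %s min old","timestamp":"%s"}\n' \
    "$1" "$(iso_minutes_ago "$1")"
}

# One payload per age bracket; per the boost table, the expected
# age_boost values are 0, 100, 400, and 900.
for minutes in 1 10 60 180; do
  archive_payload "$minutes"
done
```

Each payload can be sent to the same endpoint used in the test, for example with `archive_payload 180 | curl -X POST http://localhost:8080/events -H "Content-Type: application/json" -d @-`, where `-d @-` tells curl to read the request body from standard input.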
Complete Pipeline
You've built a smart buffering system that:
- Classifies messages into priority tiers
- Assigns numeric scores for precise ordering
- Sorts buffers by priority before delivery
- Prevents starvation with age-based escalation