Step 4: Prevent Priority Starvation

A constant stream of important messages could starve archive messages forever. We fix this with age-based priority escalation.

The Problem

If important messages keep arriving, lower-priority messages may never be sent. This violates the guarantee that all data eventually reaches its destination.

The Solution

Boost a message's priority score based on how long it's been waiting. Eventually, even archive messages become "important enough" to send.

Implementation

  1. Copy the previous pipeline:

    cp smart-buffer-ordered.yaml smart-buffer-complete.yaml

  2. Add age-based escalation by replacing the mapping processor in smart-buffer-complete.yaml:

    - mapping: |
        root = this

        let category = this.category.or("").string().lowercase()
        let severity = this.severity.or("info").string().lowercase()
        let event_type = this.event_type.or("").string().lowercase()

        let is_important = match {
          category == "important" => true,
          severity == "critical" => true,
          event_type.has_prefix("payment.failed") => true,
          _ => false
        }

        let is_archive = match {
          category == "archive" => true,
          severity == "debug" => true,
          event_type.has_prefix("analytics.") => true,
          _ => false
        }

        root.priority_tier = match {
          is_important => 1,
          is_archive => 3,
          _ => 2
        }

        root.priority_label = match root.priority_tier {
          1 => "important",
          2 => "regular",
          3 => "archive"
        }

        # Base priority score
        let base_score = match root.priority_tier {
          1 => 1000,
          2 => 500,
          3 => 100
        }

        # Calculate message age (a missing timestamp is treated as "now", so age is 0)
        let message_ts = this.timestamp.or(now().format_timestamp()).parse_timestamp()
        let age_seconds = (now().unix() - message_ts.unix()).round()
        root.age_seconds = age_seconds

        # Age-based priority boost
        let age_boost = match {
          age_seconds > 7200 => 900, # > 2 hours: archive reaches 1000 (important)
          age_seconds > 1800 => 400, # > 30 min: archive reaches 500 (regular)
          age_seconds > 300 => 100,  # > 5 min: small boost
          _ => 0
        }

        root.age_boost = age_boost
        root.priority_score = base_score + age_boost
        root.age_escalated = age_boost > 0
        root.buffered_at = now()

How Age Boost Works

Message Age    | Boost | Archive Score | Effect
< 5 min        | 0     | 100           | Normal archive priority
5-30 min       | +100  | 200           | Still below regular
30 min - 2 hr  | +400  | 500           | Same as regular
> 2 hours      | +900  | 1000          | Same as important

After 2 hours, an archive message scores 1000, the same as a newly arrived important message, so fresh important traffic can no longer outrank it.
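The arithmetic in the table can be checked with a small standalone model of the scoring rule. This is a Python sketch for verifying the numbers, not part of the pipeline; the names `BASE_SCORE`, `age_boost`, and `priority_score` are illustrative, while the thresholds and scores are copied from the mapping.

```python
# Standalone model of the scoring rule in the mapping (illustrative names).
BASE_SCORE = {1: 1000, 2: 500, 3: 100}  # tier 1 important, 2 regular, 3 archive


def age_boost(age_seconds: float) -> int:
    """Return the boost for a message that has waited age_seconds."""
    if age_seconds > 7200:  # more than 2 hours
        return 900
    if age_seconds > 1800:  # more than 30 minutes
        return 400
    if age_seconds > 300:   # more than 5 minutes
        return 100
    return 0


def priority_score(tier: int, age_seconds: float) -> int:
    """Base score for the tier plus the age boost."""
    return BASE_SCORE[tier] + age_boost(age_seconds)


# Print the archive score at a few sample ages.
for label, age in [("1 min", 60), ("10 min", 600), ("1 hr", 3600), ("3 hr", 10800)]:
    print(f"archive after {label}: {priority_score(3, age)}")
```

Note that the boost applies to every tier, not only archive: under this rule a regular message older than 2 hours scores 1400 and an important one 1900. Escalation therefore lifts an old archive message level with fresh important traffic, not above important messages that have aged as well.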

Test

# Send an archive message with an old timestamp.
# `date -v-3H` is BSD/macOS syntax; with GNU date (Linux) use:
#   date -u -d '3 hours ago' +%Y-%m-%dT%H:%M:%SZ
curl -X POST http://localhost:8080/events \
  -H "Content-Type: application/json" \
  -d '{
    "severity": "debug",
    "message": "Old archive message",
    "timestamp": "'$(date -u -v-3H +%Y-%m-%dT%H:%M:%SZ)'"
  }'

This 3-hour-old archive message gets a +900 boost, bringing its score to 1000, equal to important messages.
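Where the shell's `date` flags differ between systems, the same test event can be built in Python instead. This is a sketch under the same assumptions as the curl command (an HTTP input listening at http://localhost:8080/events); `build_payload` and `send` are illustrative helper names, not part of the pipeline.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone


def build_payload(hours_old, now=None):
    """Build the test event with a UTC timestamp hours_old hours in the past."""
    now = now or datetime.now(timezone.utc)
    ts = (now - timedelta(hours=hours_old)).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {"severity": "debug", "message": "Old archive message", "timestamp": ts}


def send(payload, url="http://localhost:8080/events"):
    """POST the payload as JSON; requires the pipeline to be running."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

With the pipeline running, `send(build_payload(3))` should submit the same kind of 3-hour-old archive event as the curl command.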

Complete Pipeline

You've built a smart buffering system that:

  1. Classifies messages into priority tiers
  2. Assigns numeric scores for precise ordering
  3. Sorts buffers by priority before delivery
  4. Prevents starvation with age-based escalation
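To see how these pieces interact, the sketch below orders a small batch the way a sort over `priority_score` might. It assumes the buffer is sorted highest score first and that ties go to the older message; neither detail is spelled out in this step, so treat both as assumptions. The sample messages are made up for illustration, with scores that follow the base and boost values from the mapping.

```python
# Sample messages as they might look after the mapping (illustrative data).
batch = [
    {"message": "fresh archive", "priority_score": 100, "age_seconds": 10},
    {"message": "fresh important", "priority_score": 1000, "age_seconds": 5},
    {"message": "3h-old archive", "priority_score": 1000, "age_seconds": 10800},
    {"message": "fresh regular", "priority_score": 500, "age_seconds": 20},
    {"message": "40-min archive", "priority_score": 500, "age_seconds": 2400},
]

# Assumed ordering: highest score first, older message first on a tie.
ordered = sorted(batch, key=lambda m: (-m["priority_score"], -m["age_seconds"]))

for m in ordered:
    print(m["priority_score"], m["message"])
```

Under these assumptions the 3-hour-old archive message is no longer stuck behind fresh important traffic, and the 40-minute-old archive message is ordered alongside regular messages. A different tie-breaking rule would change the order among messages with equal scores.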