Step 4: Prevent Priority Starvation

A potential problem with priority queues is "starvation," where a constant stream of high-priority messages prevents low-priority messages from ever being processed.

In this final step, you will implement a simple but effective solution: age-based priority escalation. The older a message gets, the higher its priority becomes, ensuring that every message is eventually processed.

The Goal

You will modify the scoring processor to calculate the age of a message and add an age_boost to its priority_score.
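The age calculation can be sketched outside the pipeline to make the tiers concrete. The following Python model is illustrative only and is not part of the pipeline: the function names `age_seconds` and `age_boost` are hypothetical, the thresholds are copied from the mapping in the Implementation section, and the timestamp is assumed to be UTC in the form `2024-01-01T10:00:00Z`.

```python
from datetime import datetime, timezone

# (minimum age in seconds, boost), copied from the mapping in this step.
AGE_BOOST_TIERS = [(86400, 100), (3600, 40), (300, 10)]


def age_seconds(timestamp: str, now: datetime) -> int:
    """Return the whole-second age of a UTC timestamp like 2024-01-01T10:00:00Z."""
    sent = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int((now - sent).total_seconds())


def age_boost(age: int) -> int:
    """Return the boost of the first tier whose threshold the age strictly exceeds."""
    for threshold, boost in AGE_BOOST_TIERS:
        if age > threshold:
            return boost
    return 0


# Example: a message stamped two hours before a fixed "now".
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
age = age_seconds("2024-01-01T10:00:00Z", now)
print(age, age_boost(age))  # 7200 40
```

The comparisons are strict, so a message that is exactly 300 seconds old still receives no boost.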

Implementation

  1. Start with the Previous Pipeline: Copy the multi-criteria-router.yaml file you created in Step 3 to a new file named anti-starvation-router.yaml.

    cp multi-criteria-router.yaml anti-starvation-router.yaml
  2. Enhance the Scoring Processor: Open anti-starvation-router.yaml and replace the scoring mapping processor with this final version. The new logic calculates the message's age and adds a bonus to the score.

    Replace the scoring processor in anti-starvation-router.yaml
    - mapping: |
        root = this
        root.severity = this.severity.string().uppercase()
        root.customer_tier = this.customer_tier.or("free").string().lowercase()
        root.event_type = this.event_type.or("unknown").string().lowercase()

        # --- START: New additions for age-based boost ---

        # 1. Calculate the message age in seconds
        #    (timestamp is expected in RFC 3339 form, e.g. 2024-01-01T10:00:00Z)
        let message_timestamp = this.timestamp.ts_parse("2006-01-02T15:04:05Z07:00")
        root.age_seconds = (now().ts_unix() - $message_timestamp.ts_unix()).round()

        # 2. Calculate a boost based on age
        let age_boost = match {
          root.age_seconds > 86400 => 100, # > 24 hours: massive boost
          root.age_seconds > 3600 => 40, # > 1 hour: strong boost
          root.age_seconds > 300 => 10, # > 5 minutes: small boost
          _ => 0
        }
        # --- END: New additions ---

        # Calculate severity score
        # (inside the match, `this` refers to the matched value, root.severity)
        let severity_score = match root.severity {
          this == "CRITICAL" || this == "FATAL" => 80,
          "ERROR" => 60,
          this == "WARN" || this == "WARNING" => 40,
          _ => 20
        }

        # Determine tier multiplier
        let tier_multiplier = match root.customer_tier {
          "enterprise" => 2.0,
          "premium" => 1.5,
          _ => 1.0
        }

        # Add a bonus score based on event type
        let event_type_score = match {
          root.event_type.has_prefix("payment.") => 30,
          root.event_type.has_prefix("auth.") => 20,
          _ => 0
        }

        # Calculate the final score, now including the age_boost
        # (variables declared with `let` are referenced with a `$` prefix)
        root.priority_score = ($severity_score * $tier_multiplier) + $event_type_score + $age_boost

        # Map the final score to a priority level
        root.priority = match {
          root.priority_score >= 100 => "critical",
          root.priority_score >= 80 => "high",
          root.priority_score >= 40 => "normal",
          _ => "low"
        }

  3. Deploy and Test: Deploy the new pipeline and send two identical low-priority events, one recent and one with a timestamp from two hours ago.

    # Test an 'INFO' event from a 'free' user (score: 20*1.0 + 0 + 0 = 20 => low)
    curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
      -d '{
        "severity": "INFO",
        "customer_tier": "free",
        "event_type": "user.login",
        "timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
      }'

    # Test the SAME event, but with an old timestamp
    # (score: 20*1.0 + 0 + 40 = 60 => normal)
    # Note: -d "2 hours ago" is GNU date syntax; on macOS/BSD use: date -u -v-2H +%Y-%m-%dT%H:%M:%SZ
    curl -X POST http://localhost:8080/logs -H "Content-Type: application/json" \
      -d '{
        "severity": "INFO",
        "customer_tier": "free",
        "event_type": "user.login",
        "timestamp": "'$(date -u -d "2 hours ago" +%Y-%m-%dT%H:%M:%SZ)'"
      }'

  4. Verify: Check your Kafka topics. The recent event should be routed to logs-low with a score of 20. The older event should be escalated to the logs-normal topic: its age adds a +40 point boost, raising its score to 60, which is above the "normal" threshold of 40.
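To predict where other test events should land before sending them, the scoring arithmetic can be reproduced offline. The sketch below is a hypothetical Python model of the mapping, not something the pipeline runs: the function name `priority` is an assumption, and `age_boost` is passed in directly (0 for a fresh event, 40 for the two-hour-old event used in this step).

```python
def priority(severity: str, customer_tier: str, event_type: str, age_boost: int):
    """Model the step's scoring rules; returns (priority_score, priority)."""
    severity_score = {
        "CRITICAL": 80, "FATAL": 80,
        "ERROR": 60,
        "WARN": 40, "WARNING": 40,
    }.get(severity.upper(), 20)

    tier_multiplier = {"enterprise": 2.0, "premium": 1.5}.get(customer_tier.lower(), 1.0)

    event_type = event_type.lower()
    if event_type.startswith("payment."):
        event_type_score = 30
    elif event_type.startswith("auth."):
        event_type_score = 20
    else:
        event_type_score = 0

    score = severity_score * tier_multiplier + event_type_score + age_boost

    if score >= 100:
        level = "critical"
    elif score >= 80:
        level = "high"
    elif score >= 40:
        level = "normal"
    else:
        level = "low"
    return score, level


# The two test events from this step.
print(priority("INFO", "free", "user.login", 0))   # (20.0, 'low')
print(priority("INFO", "free", "user.login", 40))  # (60.0, 'normal')
```

If the topic a message arrives on disagrees with this model, compare the `age_seconds` and `priority_score` fields on the emitted message against the values computed here.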

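You have now built a priority queue system with age-based escalation. Even the lowest-scoring message (base score 20) is scored as critical once its timestamp is more than 24 hours old (20 + 100 = 120), so a constant stream of high-priority messages can no longer keep older messages at the bottom of the queue.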