
Step 3: Priority-Based Output Routing

Sorting within a batch isn't enough. If 20 regular messages are already batched and a new important message arrives, it still waits for that batch to flush. The solution: separate output paths, each with its own batching policy.

The Problem

With a single output path:

  1. 20 regular messages arrive and start batching (5s window)
  2. 1 important message arrives
  3. Important message waits for the batch to complete
  4. After reconnection, all queued messages compete equally

The Solution

Route each priority tier to its own output with tier-specific batching:

  • Important: count: 1 - immediate send, no batching
  • Regular: count: 50, period: 5s - moderate batching
  • Archive: count: 200, period: 30s - heavy batching

Important messages skip the batching delay entirely and ship as soon as they arrive.
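
The tier policies above can be read as a simple flush rule: send when the buffer reaches `count`, or when `period` has elapsed. The Python sketch below only illustrates that rule; it is not how the pipeline implements batching. `BatchPolicy`, `POLICIES`, and `should_flush` are hypothetical names, and treating a period of 0 as "no timer" is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchPolicy:
    count: int       # flush once this many messages are buffered
    period_s: float  # flush once this many seconds have elapsed (0 = no timer, assumed)


# Values taken from the tier list above.
POLICIES = {
    "important": BatchPolicy(count=1, period_s=0),
    "regular": BatchPolicy(count=50, period_s=5),
    "archive": BatchPolicy(count=200, period_s=30),
}


def should_flush(policy: BatchPolicy, buffered: int, elapsed_s: float) -> bool:
    """Return True when a batch under `policy` would be sent."""
    if buffered == 0:
        return False  # nothing to send
    if buffered >= policy.count:
        return True   # size threshold reached
    return policy.period_s > 0 and elapsed_s >= policy.period_s


if __name__ == "__main__":
    for tier, policy in POLICIES.items():
        print(tier, should_flush(policy, buffered=1, elapsed_s=0.0))
```

With `count: 1`, a single buffered message already meets the size threshold, which is why the important tier never waits on a timer.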

Implementation

smart-buffer-priority-output.yaml
name: smart-buffer-priority-output
description: Priority-based output routing with tier-specific batching

config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /events
      timeout: 5s

  pipeline:
    processors:
      - json_documents:
          parts: []

      - mapping: |
          root = this

          # Normalize the fields used for classification
          let category = this.category.or("").string().lowercase()
          let severity = this.severity.or("info").string().lowercase()
          let event_type = this.event_type.or("").string().lowercase()

          # Variables declared with `let` are read back with the `$` prefix
          let is_important = match {
            $category == "important" => true,
            $severity == "critical" => true,
            $event_type.has_prefix("payment.failed") => true,
            _ => false
          }

          let is_archive = match {
            $category == "archive" => true,
            $severity == "debug" => true,
            $event_type.has_prefix("analytics.") => true,
            _ => false
          }

          # Important is checked first, so it wins over archive
          root.priority_tier = match {
            $is_important => 1,
            $is_archive => 3,
            _ => 2
          }

          root.priority_label = match root.priority_tier {
            1 => "important",
            2 => "regular",
            3 => "archive"
          }

          root.classified_at = now()

  output:
    switch:
      retry_until_success: true
      cases:
        # IMPORTANT: No batching - ship immediately
        - check: this.priority_tier == 1
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: important
              # No batching - each message sent immediately
              batching:
                count: 1
                period: 0s
              timeout: 10s
              max_retries: 10
              backoff:
                initial_interval: 100ms
                max_interval: 5s

        # REGULAR: Moderate batching
        - check: this.priority_tier == 2
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: regular
              batching:
                count: 50
                period: 5s
              timeout: 30s
              max_retries: 5

        # ARCHIVE: Heavy batching - optimize for throughput
        - check: this.priority_tier == 3
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: archive
              batching:
                count: 200
                period: 30s
              timeout: 60s
              max_retries: 3
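
To check which tier a sample event should land in before sending it through the pipeline, the classification rules can be mirrored in a few lines of Python. This is a rough sketch of the mapping's logic, not code the pipeline runs; `classify` and `_field` are hypothetical helper names, and only null or missing fields are treated as absent.

```python
from __future__ import annotations


def _field(event: dict, key: str, default: str) -> str:
    """Read a field as a lowercase string, falling back when it is missing or null."""
    value = event.get(key)
    return str(default if value is None else value).lower()


def classify(event: dict) -> tuple[int, str]:
    """Return (priority_tier, priority_label) using the same rules as the mapping."""
    category = _field(event, "category", "")
    severity = _field(event, "severity", "info")
    event_type = _field(event, "event_type", "")

    is_important = (
        category == "important"
        or severity == "critical"
        or event_type.startswith("payment.failed")
    )
    is_archive = (
        category == "archive"
        or severity == "debug"
        or event_type.startswith("analytics.")
    )

    # Important is checked first, so it wins when both sets of rules match.
    if is_important:
        tier = 1
    elif is_archive:
        tier = 3
    else:
        tier = 2
    return tier, {1: "important", 2: "regular", 3: "archive"}[tier]


if __name__ == "__main__":
    print(classify({"severity": "critical", "message": "disk failure"}))
    print(classify({"severity": "info", "message": "Regular 1"}))
    print(classify({"event_type": "analytics.page_view"}))
```

An event that matches both sets of rules, such as a `debug` severity with a `payment.failed` event type, is classified as important because that check runs first.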

How Preemption Works

Timeline:
t=0s: 20 regular messages arrive → start 5s batch window
t=2s: 1 important message arrives → IMMEDIATELY SENT (bypasses batch)
t=5s: regular batch completes → 20 regular messages sent

The important message doesn't wait for the regular batch—it has its own output path with count: 1.
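
The timeline can be reproduced with a small simulation that compares a single shared output path against per-tier paths. It is a simplified model under stated assumptions: a batch window opens when the first message is buffered, a batch flushes when it reaches `count` or when its `period` expires, and network latency is ignored. `simulate` and the policy tuples are hypothetical names for this sketch.

```python
def simulate(arrivals, policies):
    """arrivals: list of (arrival_s, tier); policies: tier -> (count, period_s).

    Returns (tier, arrival_s, shipped_s) tuples ordered by ship time.
    """
    by_tier = {}
    for t, tier in sorted(arrivals):
        by_tier.setdefault(tier, []).append(t)

    shipped = []
    for tier, times in by_tier.items():
        count, period = policies[tier]
        batch = []
        for t in times:
            # Flush an open batch whose window expired before this arrival.
            if batch and period > 0 and t >= batch[0] + period:
                flush_at = batch[0] + period
                shipped += [(tier, a, flush_at) for a in batch]
                batch = []
            batch.append(t)
            # Flush as soon as the size threshold is reached.
            if len(batch) >= count:
                shipped += [(tier, a, t) for a in batch]
                batch = []
        if batch:
            # Remaining messages go out when the window closes.
            flush_at = batch[0] + period
            shipped += [(tier, a, flush_at) for a in batch]
    return sorted(shipped, key=lambda s: s[2])


# Per-tier paths: the important message at t=2s ships at t=2s.
tiered = simulate(
    [(0.0, "regular")] * 20 + [(2.0, "important")],
    {"important": (1, 0.0), "regular": (50, 5.0)},
)

# Single shared path: the same message is held until the 5s window closes.
shared = simulate([(0.0, "shared")] * 20 + [(2.0, "shared")], {"shared": (50, 5.0)})

if __name__ == "__main__":
    print(tiered[0])
    print(shared[-1])
```

In this model the important message waits 0s on its own path and 3s on the shared path, which is the delay the separate output removes.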

Reconnection Scenario

After coming back online with a backlog:

| Queue | Messages | Batching | Ships |
|---|---|---|---|
| Important | 5 | count: 1 | Immediately, one by one |
| Regular | 100 | count: 50 | After important, in 2 batches |
| Archive | 500 | count: 200 | Last, in 3 batches |

Important messages ship first because their output has no batching delay.
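
The batch counts in this scenario follow from dividing each backlog by its tier's `count` and rounding up. The snippet below is an illustrative check of that arithmetic; `batch_sizes` is a hypothetical helper, and it assumes each batch fills to `count` before the period timer fires.

```python
def batch_sizes(messages: int, count: int) -> list[int]:
    """Split a backlog into full batches of `count` plus one partial batch if needed."""
    full, rest = divmod(messages, count)
    return [count] * full + ([rest] if rest else [])


# (queued messages, batching count) from the reconnection scenario
backlog = {
    "important": (5, 1),
    "regular": (100, 50),
    "archive": (500, 200),
}

if __name__ == "__main__":
    for tier, (messages, count) in backlog.items():
        sizes = batch_sizes(messages, count)
        print(f"{tier}: {len(sizes)} batches {sizes}")
```

The archive backlog does not divide evenly, so its third batch carries the remaining 100 messages.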

Test

# Terminal 1: Start pipeline
expanso-edge run --config smart-buffer-priority-output.yaml

# Terminal 2: Send messages in "wrong" order
# First, flood with regular messages
for i in {1..20}; do
  curl -X POST http://localhost:8080/events \
    -d '{"severity": "info", "message": "Regular '$i'"}'
done

# Then send important (should still ship first!)
curl -X POST http://localhost:8080/events \
  -d '{"severity": "critical", "message": "IMPORTANT - should ship first"}'

Check your destination: the important message should arrive before the regular batch, which flushes only after its 5s window closes.
