Step 3: Priority-Based Output Routing
Sorting within a batch isn't enough. If 20 regular messages are already batching when a new important message arrives, the important message still waits for the batch window. The solution: separate output paths with different batching policies.
The Problem
With a single output path (sketched below), every message shares one batching policy:
- 20 regular messages arrive and start batching (5s window)
- 1 important message arrives
- Important message waits for the batch to complete
- After reconnection, all queued messages compete equally
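A minimal sketch of that single-output shape, for contrast (the filename is hypothetical, and the count/period values are illustrative, borrowed from the regular tier used later in this step):
single-output.yaml
name: single-output
description: One shared batching policy for every message
config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /events
  output:
    http_client:
      url: ${DESTINATION_URL}
      verb: POST
      headers:
        Content-Type: application/json
      # Every message, critical or not, waits here for the
      # batch to fill or for the 5s window to expire.
      batching:
        count: 50
        period: 5s
Every tier funnels through the same batching block, which is exactly where the important message gets stuck.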
The Solution
Route each priority tier to its own output with tier-specific batching:
- Important: `count: 1` - immediate send, no batching
- Regular: `count: 50, period: 5s` - moderate batching
- Archive: `count: 200, period: 30s` - heavy batching
Important messages bypass the buffer entirely and ship immediately.
Implementation
smart-buffer-priority-output.yaml
name: smart-buffer-priority-output
description: Priority-based output routing with tier-specific batching
config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /events
      timeout: 5s
  pipeline:
    processors:
      - json_documents:
          parts: []
      - mapping: |
          root = this
          let category = this.category.or("").string().lowercase()
          let severity = this.severity.or("info").string().lowercase()
          let event_type = this.event_type.or("").string().lowercase()
          let is_important = match {
            $category == "important" => true,
            $severity == "critical" => true,
            $event_type.has_prefix("payment.failed") => true,
            _ => false
          }
          let is_archive = match {
            $category == "archive" => true,
            $severity == "debug" => true,
            $event_type.has_prefix("analytics.") => true,
            _ => false
          }
          let tier = match {
            $is_important => 1,
            $is_archive => 3,
            _ => 2
          }
          root.priority_tier = $tier
          root.priority_label = match $tier {
            1 => "important",
            2 => "regular",
            3 => "archive"
          }
          root.classified_at = now()
  output:
    switch:
      retry_until_success: true
      cases:
        # IMPORTANT: No batching - ship immediately
        - check: this.priority_tier == 1
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: important
              # No batching - each message sent immediately
              batching:
                count: 1
                period: 0s
              timeout: 10s
              max_retries: 10
              backoff:
                initial_interval: 100ms
                max_interval: 5s
        # REGULAR: Moderate batching
        - check: this.priority_tier == 2
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: regular
              batching:
                count: 50
                period: 5s
              timeout: 30s
              max_retries: 5
        # ARCHIVE: Heavy batching - optimize for throughput
        - check: this.priority_tier == 3
          output:
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: archive
              batching:
                count: 200
                period: 30s
              timeout: 60s
              max_retries: 3
How Preemption Works
Timeline:
t=0s: 20 regular messages arrive → start 5s batch window
t=2s: 1 important message arrives → IMMEDIATELY SENT (bypasses batch)
t=5s: regular batch completes → 20 regular messages sent
The important message doesn't wait for the regular batch; it has its own output path with `count: 1`.
Reconnection Scenario
After coming back online with a backlog:
| Queue | Messages | Batching | Ships |
|---|---|---|---|
| Important | 5 | `count: 1` | Immediately, one by one |
| Regular | 100 | `count: 50` | After important, in 2 batches |
| Archive | 500 | `count: 200` | Last, in 3 batches |
Important messages ship first because their output has no batching delay.
Test
# Terminal 1: Start the pipeline (set DESTINATION_URL first - see the mock sink below)
expanso-edge run --config smart-buffer-priority-output.yaml
# Terminal 2: Send messages in "wrong" order
# First, flood with regular messages
for i in {1..20}; do
curl -X POST http://localhost:8080/events \
-d '{"severity": "info", "message": "Regular '$i'"}'
done
# Then send important (should still ship first!)
curl -X POST http://localhost:8080/events \
-d '{"severity": "critical", "message": "IMPORTANT - should ship first"}'
Check your destination: the important message arrives before the regular batch completes.
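If you don't have a destination handy, here is a minimal sketch of a local mock sink. It reuses the same http_server input shown above and assumes Benthos-style stdout output and meta()/content() functions are available in this config dialect; the filename, port, and /ingest path are arbitrary choices for this example:
mock-destination.yaml
name: mock-destination
description: Local sink that logs each delivery with its X-Priority header
config:
  input:
    http_server:
      address: 0.0.0.0:9090
      path: /ingest
  pipeline:
    processors:
      - mapping: |
          # Record arrival time and the priority header so
          # delivery order is easy to read in the logs.
          root.received_at = now()
          root.received_priority = meta("X-Priority").or("none")
          root.body = content().string()
  output:
    stdout: {}
Run it in its own terminal (expanso-edge run --config mock-destination.yaml) and start the main pipeline with DESTINATION_URL=http://localhost:9090/ingest; the important delivery should appear in the mock's log seconds before the regular batch.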