Skip to main content

Complete Pipeline

This pipeline combines all smart buffering techniques from this tutorial:

  • Priority classification - Messages categorized as important, regular, or archive
  • Numeric scoring - Precise ordering with base scores (1000/500/100) plus age-based bonuses
  • Buffer sorting - Batches sorted by priority_score before delivery
  • Starvation prevention - An age-based score boost ensures every message is eventually sent

Full Configuration

smart-buffering.yaml
name: smart-buffering-pipeline
description: Priority-aware buffering - important messages ship first, even during backlog
type: pipeline
namespace: production

config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /events
      timeout: 5s
      # `rate_limit` takes the label of a rate limit resource (defined below),
      # not an inline "N/s" expression.
      rate_limit: ingest_limit

  rate_limit_resources:
    - label: ingest_limit
      local:
        count: 10000
        interval: 1s

  pipeline:
    processors:
      # Step 1: Parse and validate
      - json_documents:
          parts: []

      # Step 2: Classify priority level and calculate score.
      # NOTE: Bloblang variables declared with `let` must be read back as
      # `$name`; a bare `name` is shorthand for `this.name` (an input field).
      - mapping: |
          root = this

          # Determine priority tier from message attributes (case-insensitive)
          let category = this.category.or("").string().lowercase()
          let severity = this.severity.or("info").string().lowercase()
          let event_type = this.event_type.or("").string().lowercase()

          # Classify as important (priority 1)
          let is_important = match {
            $category == "important" => true,
            $category == "critical" => true,
            $severity == "critical" => true,
            $severity == "fatal" => true,
            $event_type.has_prefix("payment.failed") => true,
            $event_type.has_prefix("security.breach") => true,
            $event_type.has_prefix("system.down") => true,
            _ => false
          }

          # Classify as archive/generic (priority 3)
          let is_archive = match {
            $category == "archive" => true,
            $category == "generic" => true,
            $category == "bulk" => true,
            $severity == "debug" => true,
            $severity == "trace" => true,
            $event_type.has_prefix("analytics.") => true,
            $event_type.has_prefix("audit.") => true,
            _ => false
          }

          # Assign priority tier (1=highest, 3=lowest); important wins ties
          let tier = match {
            $is_important => 1,
            $is_archive => 3,
            _ => 2
          }
          root.priority_tier = $tier

          root.priority_label = match $tier {
            1 => "important",
            2 => "regular",
            _ => "archive"
          }

          # Base priority score
          let base_score = match $tier {
            1 => 1000,
            2 => 500,
            _ => 100
          }

          # Age-based starvation prevention.
          # `timestamp` may be an RFC 3339 string or unix seconds; a missing or
          # unparseable value is treated as "now" (age 0) instead of failing.
          let now_unix = now().ts_unix()
          let message_unix = this.timestamp.ts_unix().catch($now_unix)
          let age_seconds = $now_unix - $message_unix
          root.age_seconds = $age_seconds

          # Age boost prevents starvation of low-priority messages
          let age_boost = match {
            $age_seconds > 7200 => 900,
            $age_seconds > 1800 => 400,
            $age_seconds > 300 => 100,
            _ => 0
          }

          let score = $base_score + $age_boost
          root.age_boost = $age_boost
          root.priority_score = $score
          root.age_escalated = $age_boost > 0

          # Delivery tier comes from the boosted score, so aged messages really
          # move to a faster output: archive > 30 min reaches 500 (regular),
          # archive > 2 h reaches 1000 (important), regular > 2 h reaches 1400.
          root.delivery_tier = match {
            $score >= 1000 => 1,
            $score >= 500 => 2,
            _ => 3
          }

          # Add processing metadata
          root.classified_at = now()
          root.edge_node_id = env("NODE_ID").or("unknown")

  # Priority-based output routing with tier-specific batching.
  # Each tier has its own output, so important messages never wait behind
  # pending regular/archive batches. Cases match on the age-boosted
  # delivery_tier, not the raw classification.
  output:
    switch:
      retry_until_success: true
      cases:
        # IMPORTANT (delivery tier 1): no batching policy - ship each message
        # as soon as it arrives
        - check: this.delivery_tier == 1
          output:
            label: important_output
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: important
              timeout: 10s
              # Maximum reliability for important messages
              max_retries: 10
              backoff:
                initial_interval: 100ms
                max_interval: 5s

        # REGULAR (delivery tier 2): moderate batching, highest score first
        - check: this.delivery_tier == 2
          output:
            label: regular_output
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: regular
              batching:
                count: 50
                period: 5s
                # Sort each batch by descending priority_score before delivery
                processors:
                  - archive:
                      format: json_array
                  - mapping: root = this.sort_by(msg -> 0 - msg.priority_score)
                  - unarchive:
                      format: json_array
              timeout: 30s
              max_retries: 5
              backoff:
                initial_interval: 500ms
                max_interval: 10s

        # ARCHIVE (delivery tier 3): heavy batching - optimize for throughput
        - check: this.delivery_tier == 3
          output:
            label: archive_output
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
                X-Priority: archive
              batching:
                count: 200
                period: 30s
                # Sort each batch by descending priority_score before delivery
                processors:
                  - archive:
                      format: json_array
                  - mapping: root = this.sort_by(msg -> 0 - msg.priority_score)
                  - unarchive:
                      format: json_array
              timeout: 60s
              max_retries: 3
              backoff:
                initial_interval: 1s
                max_interval: 30s

        # Fallback for messages without a delivery_tier (e.g. if the
        # classification mapping failed for that message)
        - output:
            label: fallback_output
            http_client:
              url: ${DESTINATION_URL}
              verb: POST
              headers:
                Content-Type: application/json
              batching:
                count: 50
                period: 5s

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      prefix: smart_buffer

Quick Test

# Helper: POST one JSON event body ($1) to the pipeline's HTTP input.
send_event() {
  curl -X POST http://localhost:8080/events \
    -H "Content-Type: application/json" \
    -d "$1"
}

# Important event (score 1000) - delivered first
send_event '{"category": "important", "message": "Critical alert"}'

# Regular event (score 500) - delivered second
send_event '{"severity": "info", "message": "User action"}'

# Archive event (score 100) - delivered last
send_event '{"severity": "debug", "message": "Debug log"}'

Deploy

# Either option below loads smart-buffering.yaml, which reads DESTINATION_URL
# (the http_client target URL) and NODE_ID (falls back to "unknown") from the
# environment - export them before starting the pipeline.

# Deploy to Expanso orchestrator
expanso-cli job deploy smart-buffering.yaml

# Or run locally with expanso-edge
expanso-edge run --config smart-buffering.yaml

Download

Download smart-buffering.yaml

What's Next?