Step 1: Classify Priority Tiers
The first step in smart buffering is classifying messages into priority tiers based on their attributes.
The Goal
Categorize every message into one of three tiers:
- Important (Tier 1): Critical alerts, payment failures, security events
- Regular (Tier 2): Standard operations, user actions, API calls
- Archive (Tier 3): Debug logs, analytics, audit trails
Implementation
-
Create a new pipeline file:
touch smart-buffer-classify.yaml -
Add the classification processor:
smart-buffer-classify.yamlname: smart-buffer-classify
description: Classify messages into priority tiers
config:
input:
http_server:
address: 0.0.0.0:8080
path: /events
timeout: 5s
pipeline:
processors:
- json_documents:
parts: []
- mapping: |
root = this
let category = this.category.or("").string().lowercase()
let severity = this.severity.or("info").string().lowercase()
let event_type = this.event_type.or("").string().lowercase()
# Important: critical alerts, payment failures, security events
let is_important = match {
category == "important" => true,
severity == "critical" => true,
event_type.has_prefix("payment.failed") => true,
event_type.has_prefix("security.") => true,
_ => false
}
# Archive: debug logs, analytics, audit trails
let is_archive = match {
category == "archive" => true,
severity == "debug" => true,
event_type.has_prefix("analytics.") => true,
_ => false
}
# Assign priority tier (1=highest, 3=lowest)
root.priority_tier = match {
is_important => 1,
is_archive => 3,
_ => 2
}
root.priority_label = match root.priority_tier {
1 => "important",
2 => "regular",
3 => "archive"
}
output:
stdout:
codec: lines
Test
# Run the pipeline
expanso-edge run --config smart-buffer-classify.yaml
# In another terminal, send test messages:
# Important (tier 1)
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"severity": "critical", "message": "Database connection lost"}'
# Regular (tier 2)
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"severity": "info", "message": "User logged in"}'
# Archive (tier 3)
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"severity": "debug", "message": "Cache hit"}'
Verify
Check the output shows each message with its assigned priority_tier and priority_label.