# Complete Pipeline
This pipeline combines the time-windowed aggregation techniques from this tutorial (a sketch of how the window group keys differ follows the list):

- **Tumbling windows** - Fixed, non-overlapping time buckets (e.g., 5-minute windows)
- **Sliding windows** - Overlapping windows for smoother trends
- **Session windows** - Activity-based grouping with timeouts
- **Multi-level aggregation** - Combine window types for different use cases
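
The configuration below fans each event into one group key per window type, all derived from the event timestamp truncated to the minute. As a rough sketch of that key derivation (the sensor ID `temp-001`, location `plant-a`, and the timestamp are illustrative values, not required names):

```bash
# Illustrative only: derive the minute bucket and the four group keys the
# pipeline builds for a single reading (sensor/location values are examples).
ts="2025-06-01T10:32:41Z"
minute_bucket="${ts:0:17}00Z"                    # -> 2025-06-01T10:32:00Z

echo "temp-001|tumbling_1m|${minute_bucket}"     # 1-minute tumbling window
echo "temp-001|sliding_5m|${minute_bucket}"      # 5-minute sliding window
echo "temp-001|sensor|${minute_bucket}"          # multi-level, per-sensor
echo "plant-a|location|${minute_bucket}"         # multi-level, per-location
```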
## Full Configuration

**aggregate-time-windows.yaml**

```yaml
name: aggregate-time-windows-complete
type: pipeline
description: Production-ready time-windowed aggregation with tumbling, sliding, and multi-level aggregation patterns.
namespace: production

labels:
  category: data-transformation
  pattern: time-windows

config:
  input:
    kafka:
      addresses:
        - "${KAFKA_BROKER_1:kafka-1:9092}"
        - "${KAFKA_BROKER_2:kafka-2:9092}"
      topics: ["sensor-events"]
      consumer_group: "time-window-aggregation-${INSTANCE_ID:default}"

  pipeline:
    processors:
      # Input validation
      - try:
          processors:
            - json: {}
            - mapping: |
                if !this.exists("sensor_id") {
                  root = throw("Missing required field: sensor_id")
                }
                if !this.exists("timestamp") {
                  root = throw("Missing required field: timestamp")
                }
                if !this.exists("temperature") {
                  root = throw("Missing required field: temperature")
                }
                root = this
                root.processing_started_at = now().ts_format("2006-01-02T15:04:05.000Z")
                root.pipeline_instance = "${INSTANCE_ID:default}"
                root.location = this.get("location").or("unknown_location")
                root.facility = this.get("facility").or("unknown_facility")
        catch:
          - mapping: |
              root = {
                "error_type": "input_validation_error",
                "error_message": error(),
                "original_payload": this,
                "timestamp": now().ts_format("2006-01-02T15:04:05.000Z")
              }
          # Drop messages that failed validation so they don't reach the outputs.
          - mapping: root = deleted()
      # Multi-window aggregation
      - branch:
          request_map: |
            let parsed_timestamp = this.timestamp.parse_timestamp("2006-01-02T15:04:05.000Z")
            let minute_bucket = $parsed_timestamp.ts_format("2006-01-02T15:04:00Z")
            root = [
              # Tumbling windows (1-minute)
              this.merge({
                "aggregation_type": "tumbling",
                "window_duration_minutes": 1,
                "group_key": this.sensor_id + "|tumbling_1m|" + $minute_bucket,
                "window_start": $minute_bucket
              }),
              # Sliding windows (5-minute)
              this.merge({
                "aggregation_type": "sliding",
                "window_duration_minutes": 5,
                "group_key": this.sensor_id + "|sliding_5m|" + $minute_bucket,
                "window_start": $minute_bucket
              }),
              # Multi-level sensor aggregation
              this.merge({
                "aggregation_type": "multi_level",
                "aggregation_level": "sensor",
                "group_key": this.sensor_id + "|sensor|" + $minute_bucket
              }),
              # Multi-level location aggregation
              this.merge({
                "aggregation_type": "multi_level",
                "aggregation_level": "location",
                "group_key": this.location + "|location|" + $minute_bucket
              })
            ]
          processors:
            - group_by:
                - key: ${! this.group_key }
                  value: ${! this }
                  size: 1000
            - mapping: |
                let events = this.sort_by(event -> event.timestamp)
                let first_event = $events.index(0)
                let aggregation_type = $first_event.aggregation_type

                root.aggregation_type = $aggregation_type
                root.group_key = $first_event.group_key
                root.aggregation_timestamp = now().ts_format("2006-01-02T15:04:05.000Z")
                root.event_count = $events.length()

                let temperatures = $events.map_each(e -> e.temperature)
                root.temperature_avg = $temperatures.mean().round(2)
                root.temperature_min = $temperatures.min()
                root.temperature_max = $temperatures.max()

                if $aggregation_type == "tumbling" {
                  root.sensor_id = $first_event.sensor_id
                  root.location = $first_event.location
                  root.window_start = $first_event.window_start
                  root.window_duration_minutes = $first_event.window_duration_minutes
                } else if $aggregation_type == "sliding" {
                  root.sensor_id = $first_event.sensor_id
                  root.location = $first_event.location
                  root.window_start = $first_event.window_start
                  root.window_duration_minutes = $first_event.window_duration_minutes
                  if $temperatures.length() >= 3 {
                    let first_temp = $temperatures.index(0)
                    let last_temp = $temperatures.index(-1)
                    root.temperature_change = ($last_temp - $first_temp).round(2)
                    root.temperature_trend = match {
                      root.temperature_change > 1 => "increasing",
                      root.temperature_change < -1 => "decreasing",
                      _ => "stable"
                    }
                  }
                } else if $aggregation_type == "multi_level" {
                  root.aggregation_level = $first_event.aggregation_level
                  if $first_event.aggregation_level == "sensor" {
                    root.sensor_id = $first_event.sensor_id
                    root.location = $first_event.location
                  } else if $first_event.aggregation_level == "location" {
                    root.location = $first_event.location
                    root.sensor_count = $events.map_each(e -> e.sensor_id).unique().length()
                  }
                }

                root.data_quality_score = match {
                  root.event_count >= 50 => 0.95,
                  root.event_count >= 30 => 0.85,
                  root.event_count >= 10 => 0.70,
                  _ => 0.50
                }
  output:
    broker:
      pattern: fan_out
      outputs:
        - http_client:
            url: "${ANALYTICS_ENDPOINT:http://localhost:8080}/aggregations"
            verb: POST
            headers:
              Content-Type: application/json
            batching:
              count: 500
              period: "10s"
            timeout: "15s"
            max_retries: 2
        - file:
            path: "/var/expanso/buffer/aggregations-${!timestamp_unix()}.jsonl"
            codec: lines

  logger:
    level: "${LOG_LEVEL:INFO}"
    format: json

  metrics:
    prometheus:
      path: "/metrics"
```
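The config resolves several environment variables through `${VAR:default}` fallbacks. To override the defaults before deploying, something like the following works; the values shown are placeholders for your own environment, not required settings:

```bash
# Placeholder values; substitute your own brokers and analytics endpoint.
export KAFKA_BROKER_1="kafka-1:9092"
export KAFKA_BROKER_2="kafka-2:9092"
export INSTANCE_ID="edge-01"
export ANALYTICS_ENDPOINT="http://analytics.internal:8080"
export LOG_LEVEL="INFO"
```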
## Quick Test

```bash
# Send sensor readings over time
for i in {1..10}; do
  curl -X POST http://localhost:8080/sensors \
    -H "Content-Type: application/json" \
    -d "{
      \"sensor_id\": \"temp-001\",
      \"temperature\": $((70 + RANDOM % 10)),
      \"timestamp\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"
    }"
  sleep 1
done

# Wait for window to close, then check aggregated output
```
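
Because the `fan_out` broker writes every aggregation to the local buffer file as well as to the analytics endpoint, the results can also be inspected on disk. A quick check, assuming `jq` is installed (the path comes from the `file` output above):

```bash
# Show the last few aggregation records from the newest buffer file.
ls -t /var/expanso/buffer/aggregations-*.jsonl | head -n 1 | xargs tail -n 5 | jq .
```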
## Deploy

```bash
# Deploy to Expanso orchestrator
expanso-cli job deploy aggregate-time-windows.yaml

# Or run locally with expanso-edge
expanso-edge run --config aggregate-time-windows.yaml
```
## Download
Download aggregate-time-windows.yaml
## What's Next?

- **Troubleshooting** - Common issues and solutions
- **Deduplicate Events** - Remove duplicate messages