Advanced Aggregation Patterns
Once you have mastered the three basic window types, you can combine them and add more sophisticated logic for production-grade analytics.
Pattern 1: Multi-Level Aggregation (Rollups)
You can chain aggregation pipelines together to create "rollups". For example, you can take the 1-minute tumbling window aggregates and feed them into another pipeline that aggregates them into 1-hour summaries.
Pipeline 1: 1-Minute Tumbling Windows (from Step 1)
- Input: Raw events
- Output: 1-minute summaries per sensor
Pipeline 2: 1-Hour Rollups
- Input: The 1-minute summaries from Pipeline 1
- Output: 1-hour summaries per sensor
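The chaining idea can be modeled in memory to show how the second stage only ever sees the first stage's output. This is a minimal sketch that tracks event counts only. It assumes events carry a `sensor_id` and a Python `datetime` under `ts`, and a plain list stands in for the `1-minute-aggregates` topic. The function names `minute_rollup` and `hour_rollup` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def minute_rollup(events):
    """Pipeline 1 (sketch): raw events -> event count per (sensor, minute)."""
    counts = defaultdict(int)
    for e in events:
        # Truncate the event time to the start of its minute
        minute = e["ts"].replace(second=0, microsecond=0)
        counts[(e["sensor_id"], minute)] += 1
    return [
        {"sensor_id": s, "window_start": w, "event_count": c}
        for (s, w), c in sorted(counts.items())
    ]


def hour_rollup(minute_summaries):
    """Pipeline 2 (sketch): 1-minute summaries -> event count per (sensor, hour)."""
    counts = defaultdict(int)
    for m in minute_summaries:
        # A 1-minute window start already has second == 0; drop the minute too
        hour = m["window_start"].replace(minute=0)
        counts[(m["sensor_id"], hour)] += m["event_count"]
    return [
        {"sensor_id": s, "hourly_window": w, "total_events_in_hour": c}
        for (s, w), c in sorted(counts.items())
    ]
```

Because `hour_rollup` consumes summaries rather than raw events, it processes at most 60 records per sensor per hour no matter how many raw events arrived.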
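```yaml
# This pipeline consumes the output of the 1-minute pipeline
input:
  kafka:
    addresses: [ ${KAFKA_BROKERS} ]
    topics: [ "1-minute-aggregates" ]

pipeline:
  processors:
    # 1. WINDOW: Truncate each 1-minute window start to its hour
    - mapping: |
        # ts_format accepts an RFC 3339 string directly; format in UTC so the literal "Z" is accurate
        let hour_window = this.window_start.ts_format("2006-01-02T15:00:00Z", "UTC")
        root = this
        # Keep the hour as its own field: it contains ":" so it cannot be split back out of group_key
        root.hourly_window = $hour_window
        root.group_key = this.sensor_id + ":" + $hour_window

    # 2. CACHE & GROUP (using a 1-hour TTL)
    # ...

    # 3. AGGREGATE the aggregates (this is the array of 1-minute summaries for one group)
    - mapping: |
        # Sum the counts from the 1-minute buckets
        let total = this.map_each(agg -> agg.event_count).sum()
        # Weight each 1-minute average by its event count; an unweighted mean of
        # averages is only correct when every minute has the same number of events
        let weighted_sum = this.map_each(agg -> agg.avg_temperature * agg.event_count).sum()
        root = {
          "sensor_id": this.index(0).sensor_id,
          "hourly_window": this.index(0).hourly_window,
          "total_events_in_hour": $total,
          "hourly_avg_temp": $weighted_sum / $total
        }
```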
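Averaging the per-minute averages gives the true hourly mean only when every minute contains the same number of events. A count-weighted mean avoids that bias. The sketch below compares the two. The field names match the 1-minute summaries used above, while the function names are hypothetical.

```python
def naive_hourly_avg(minute_summaries):
    """Unweighted mean of the per-minute averages ("average the averages")."""
    avgs = [m["avg_temperature"] for m in minute_summaries]
    return sum(avgs) / len(avgs)


def weighted_hourly_avg(minute_summaries):
    """Count-weighted mean: equals the mean over all raw events in the hour."""
    total = sum(m["event_count"] for m in minute_summaries)
    if total == 0:
        raise ValueError("no events in this hour")
    weighted_sum = sum(m["avg_temperature"] * m["event_count"] for m in minute_summaries)
    return weighted_sum / total
```

For example, take one minute with a single 30.0 reading and another with nine 20.0 readings. The naive mean is 25.0, while the weighted mean, which matches the mean of all ten raw readings, is 21.0.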
Pattern 2: Advanced Trend Analysis
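The basic sliding window provides a moving average. For more advanced trend analysis, you can compute statistics such as the slope (rate of change) and correlation over the data points in each window. The mapping below estimates the slope from the first and last readings in the window and classifies the trend.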
```yaml
- mapping: |
    # Assumes `this` is an array of events from one sliding window, in time order
    let temperatures = this.map_each(event -> event.temperature)
    let n = $temperatures.length()
    if $n >= 2 {
      let time_span_minutes = 5 # Assumes a 5-minute window
      let change = $temperatures.index(-1) - $temperatures.index(0)
      let slope = $change / $time_span_minutes
      # Round to 2 and 3 decimal places respectively
      root.temperature_change = ($change * 100).round() / 100
      root.temperature_slope_per_min = ($slope * 1000).round() / 1000
      root.temperature_trend = match {
        $slope > 0.1 => "increasing",
        $slope < -0.1 => "decreasing",
        _ => "stable"
      }
    }
```
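An endpoint slope depends on only two readings, so a single noisy sample at either edge of the window can flip the trend. A least-squares fit over every point is more robust, and the Pearson correlation indicates how linear the trend is (close to ±1 means steady, close to 0 means noisy). The sketch below swaps in that technique. It assumes you have each reading's offset in minutes from the window start. The names `linear_trend` and `classify` are hypothetical, and the ±0.1 threshold mirrors the mapping above.

```python
import math


def linear_trend(times_min, temps):
    """Least-squares slope (degrees per minute) and Pearson r of temperature vs time."""
    n = len(temps)
    if n < 2 or len(times_min) != n:
        raise ValueError("need at least two (time, temperature) pairs")
    mean_t = sum(times_min) / n
    mean_y = sum(temps) / n
    sxy = sum((t - mean_t) * (y - mean_y) for t, y in zip(times_min, temps))
    sxx = sum((t - mean_t) ** 2 for t in times_min)
    syy = sum((y - mean_y) ** 2 for y in temps)
    if sxx == 0:
        raise ValueError("all timestamps are identical")
    slope = sxy / sxx
    # A perfectly flat series has no variance, so report zero correlation
    r = sxy / math.sqrt(sxx * syy) if syy > 0 else 0.0
    return slope, r


def classify(slope, threshold=0.1):
    """Map a slope to the same three labels used in the mapping."""
    if slope > threshold:
        return "increasing"
    if slope < -threshold:
        return "decreasing"
    return "stable"
```

Using real time offsets also removes the fixed 5-minute assumption, so the slope stays correct when a window holds fewer readings than expected.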
Pattern 3: Grace Periods for Late Events
Network latency can cause events to arrive late. If an event for the 10:00:00 window arrives at 10:01:05, a strict 1-minute TTL would have already closed the window. To handle this, set the cache's `default_ttl` to the window duration plus a grace period:
- Window duration: 60s
- Grace period: 10s
- Cache TTL: 70s
```yaml
config:
  cache_resources:
    - label: window_cache
      memory:
        default_ttl: 70s # 60s window + 10s grace period
```
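This keeps the window "open" for an extra 10 seconds to accept late-arriving data before it is aggregated. Choosing the grace period is a trade-off between accuracy and latency: a longer grace period captures more late events, but every aggregate is emitted that much later and the cache holds more open windows at once.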
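The acceptance rule can be checked against the 10:01:05 example. This is a simplified sketch that assumes the cache entry effectively lives from the window start until `window_start + TTL` and that later writes do not extend it. Real cache behavior (for example, whether a write refreshes the TTL) depends on the cache implementation. `WINDOW`, `GRACE`, `TTL`, and `accepts` are hypothetical names.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=60)
GRACE = timedelta(seconds=10)
TTL = WINDOW + GRACE  # mirrors default_ttl: 70s


def accepts(window_start: datetime, arrival: datetime, ttl: timedelta = TTL) -> bool:
    """True if an event for `window_start` arriving at `arrival` beats the cache expiry.

    Assumes the entry expires exactly `ttl` after the window start (no refresh on write).
    """
    return window_start <= arrival < window_start + ttl
```

With the 70s TTL, an event for the 10:00:00 window that arrives at 10:01:05 is still accepted. With a strict 60s TTL (`ttl=WINDOW`), the same event is dropped, and an arrival at 10:01:15 is dropped in both cases.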