Step 1: Tumbling Windows
This step teaches the fundamental pattern of time-series aggregation: tumbling windows. A tumbling window is a fixed-size, non-overlapping time bucket. For example, you can group all events that occurred in the same minute into a single summary event.
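The bucketing arithmetic behind a tumbling window is just integer floor division. As a minimal illustration (a standalone Python sketch, not part of the pipeline itself):

```python
def window_start(epoch_seconds: int, window: int = 10) -> int:
    # Floor the timestamp to the nearest multiple of the window size.
    return (epoch_seconds // window) * window

# A reading at 47 seconds past the minute lands in the :40 bucket,
# so every reading between :40 and :49 shares the same window.
assert window_start(47) == 40
assert window_start(49) == 40
assert window_start(50) == 50
```

Because the buckets are fixed and non-overlapping, every event belongs to exactly one window.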
The Goal
You will transform a continuous stream of individual sensor readings into a series of 10-second summaries, calculating the total event count and average temperature for each window.
The "Window -> Cache -> Group -> Aggregate" Pattern
- Window: For each message, calculate which time window it belongs to. This is done by rounding the message's timestamp down to the nearest interval (e.g., `10:23:47` becomes `10:23:40`).
- Cache: Store each message in a temporary cache, using a key that combines the sensor ID and the time window. The cache is set to expire items after the window duration.
- Group: When the cache expires an item (or a batch of items), a `group_by` processor collects all messages that shared the same key.
- Aggregate: A final `mapping` processor takes the group of messages and calculates summary statistics.
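The four stages can be sketched end-to-end in plain Python (a toy in-memory version for intuition only; the function and field names here are illustrative, not the pipeline's actual runtime):

```python
from collections import defaultdict

WINDOW = 10  # window size in seconds

def process(events):
    """events: list of dicts with sensor_id, temperature, epoch_ts."""
    cache = defaultdict(list)  # CACHE: buffer events per group key
    for e in events:
        # WINDOW: floor the timestamp to its 10-second bucket
        start = (e["epoch_ts"] // WINDOW) * WINDOW
        key = f'{e["sensor_id"]}:{start}'
        cache[key].append(e)
    summaries = []
    # GROUP + AGGREGATE: emit one summary per (sensor, window) key
    for key, group in cache.items():
        temps = [e["temperature"] for e in group]
        summaries.append({
            "sensor_id": group[0]["sensor_id"],
            "window_start": int(key.split(":")[1]),
            "event_count": len(group),
            "avg_temperature": round(sum(temps) / len(temps), 2),
        })
    return summaries
```

For example, three readings from `sensor-0` at epoch seconds 100, 105, and 112 produce two summaries: one for the window starting at 100 (count 2) and one for the window starting at 110 (count 1). The real pipeline replaces the in-memory dict with a TTL cache so windows close on a clock rather than at end-of-input.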
Implementation
- Create the Aggregation Pipeline: Copy the following configuration into a file named `tumbling-windows.yaml`:

  ```yaml
  name: tumbling-windows-aggregator
  description: A pipeline that aggregates sensor data into 10-second tumbling windows.

  # 1. Define a cache resource to temporarily store events for each window.
  #    The TTL (time-to-live) determines when the window "closes".
  config:
    cache_resources:
      - label: window_cache
        memory:
          default_ttl: 12s # 10s window + 2s grace period for late events

  input:
    generate: # Generate a stream of mock sensor data
      interval: 100ms
      mapping: |
        root = {
          "sensor_id": "sensor-" + (random_int() % 2).string(),
          "temperature": (random_int() % 10) + 20.0,
          "timestamp": now().ts_format("2006-01-02T15:04:05.999Z")
        }

  pipeline:
    processors:
      # 2. WINDOW: Calculate the 10-second window and create a group key
      - mapping: |
          let parsed_time = this.timestamp.parse_timestamp()
          let window_start_unix = ($parsed_time.unix() / 10).floor() * 10
          let window_start = $window_start_unix.ts_format_iso8601()
          root = this
          root.group_key = this.sensor_id + ":" + $window_start
      # 3. CACHE: Store the event until the window closes
      - cache:
          resource: window_cache
          operator: add
          key: ${! this.group_key }
          value: ${! this }
      # 4. GROUP: When the cache TTL expires, group all events with the same key
      - group_by:
          - key: ${! this.group_key }
            value: ${! this }
      # 5. AGGREGATE: Calculate statistics for the group
      - mapping: |
          let first_event = this.index(0)
          let temperatures = this.map_each(event -> event.temperature)
          root = {
            "sensor_id": $first_event.sensor_id,
            "window_start": $first_event.group_key.split(":").index(1),
            "event_count": this.length(),
            "avg_temperature": $temperatures.mean().round(2)
          }

  output:
    stdout:
      codec: lines
  ```
- Deploy and Observe: After deploying, watch the logs. You will see individual events being generated, but the output will only show summary messages every 10 seconds, one for each `sensor_id`.
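To see exactly what the WINDOW step produces for a given reading, the group-key construction can be mirrored in plain Python (a standard-library sketch; the `group_key` helper name is mine, and the ISO-formatting details are an assumption about the mapping's output):

```python
from datetime import datetime, timezone

def group_key(sensor_id: str, iso_ts: str, window_s: int = 10) -> str:
    # Parse the ISO-8601 timestamp (fromisoformat needs an explicit offset).
    ts = datetime.fromisoformat(iso_ts.replace("Z", "+00:00"))
    # Floor the epoch seconds to the containing window boundary.
    start = int(ts.timestamp()) // window_s * window_s
    window_start = datetime.fromtimestamp(start, tz=timezone.utc) \
        .strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{sensor_id}:{window_start}"

# A reading at 20:49:17.123 maps into the 20:49:10 window:
assert group_key("sensor-0", "2025-11-22T20:49:17.123Z") \
    == "sensor-0:2025-11-22T20:49:10Z"
```

Every reading from the same sensor within the same 10-second bucket yields an identical key, which is what lets the cache and `group_by` stages collect them together.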
Verification
The output will be a stream of aggregated JSON objects. Every 10 seconds you will see one summary for `sensor-0` and one for `sensor-1`, each representing the data from the window that just closed.
Example Output:

```
{"sensor_id":"sensor-0","window_start":"2025-11-22T20:49:10Z","event_count":50,"avg_temperature":24.4}
{"sensor_id":"sensor-1","window_start":"2025-11-22T20:49:10Z","event_count":50,"avg_temperature":24.88}
// ... 10 seconds later ...
{"sensor_id":"sensor-0","window_start":"2025-11-22T20:49:20Z","event_count":50,"avg_temperature":24.6}
{"sensor_id":"sensor-1","window_start":"2025-11-22T20:49:20Z","event_count":50,"avg_temperature":24.52}
```
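The `event_count` of 50 follows directly from the generator settings: a 100 ms interval produces 100 events per 10-second window, split between the two simulated sensors (roughly evenly, since the sensor ID is chosen at random). A quick back-of-the-envelope check:

```python
window_s = 10
interval_s = 0.1   # generate interval: 100ms
sensors = 2        # sensor-0 and sensor-1

events_per_window = window_s / interval_s          # 100 events per window
expected_per_sensor = events_per_window / sensors  # ~50, matching the output
assert expected_per_sensor == 50.0
```

Individual windows may deviate slightly from 50 because the split is random, not alternating.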
You have successfully built a real-time aggregation pipeline using a tumbling window. This fundamental pattern is the basis for most time-series analysis.