Step 2: Sliding Windows
While tumbling windows are great for discrete summaries, sliding windows are used to calculate smooth moving averages. The key difference is that in a sliding window, a single event can belong to multiple overlapping windows.
The Goal
You will modify your pipeline to calculate a 30-second moving average, updated every 10 seconds.
- The Window Size is 30 seconds.
- The Slide Interval is 10 seconds.
This means an event that occurs at 10:00:15 will be included in three different windows (each window includes its start time and excludes its end time):
- The 09:59:50 - 10:00:20 window.
- The 10:00:00 - 10:00:30 window.
- The 10:00:10 - 10:00:40 window.
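The membership rule reduces to simple arithmetic: round the event time down to the slide interval to find the newest window that contains it, then step back one slide at a time until you have size / slide windows. The Python sketch below checks the 10:00:15 example. It assumes window starts are aligned to multiples of the slide interval since the Unix epoch and that each window covers [start, start + size); the function name windows_for is hypothetical.

```python
from datetime import datetime, timezone

WINDOW_SIZE = 30  # seconds
SLIDE = 10        # seconds


def windows_for(ts: datetime) -> list[tuple[datetime, datetime]]:
    """Return (start, end) for every sliding window that contains ts.

    Assumption: window starts are multiples of SLIDE seconds since the
    Unix epoch, and each window is the half-open interval [start, end).
    """
    unix = int(ts.timestamp())
    newest_start = (unix // SLIDE) * SLIDE  # latest aligned start <= ts
    windows = []
    for k in range(WINDOW_SIZE // SLIDE):   # 30 / 10 = 3 windows
        start = newest_start - k * SLIDE
        windows.append((
            datetime.fromtimestamp(start, tz=timezone.utc),
            datetime.fromtimestamp(start + WINDOW_SIZE, tz=timezone.utc),
        ))
    return sorted(windows)


event = datetime(2025, 11, 22, 10, 0, 15, tzinfo=timezone.utc)
for start, end in windows_for(event):
    print(start.strftime("%H:%M:%S"), "-", end.strftime("%H:%M:%S"))
# 09:59:50 - 10:00:20
# 10:00:00 - 10:00:30
# 10:00:10 - 10:00:40
```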
The "Generate Keys -> Unarchive -> Aggregate" Pattern
The implementation is very similar to tumbling windows, with two key differences:
- Generate Keys: Instead of a single group_key, you will generate one key for each overlapping window the message belongs to and collect the results in an array.
- Unarchive: You will use the unarchive processor to split that array, producing a separate copy of the message for each window key.
From there, the cache, grouping, and aggregate steps work the same way as they did for tumbling windows.
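To see how the overlapping copies add up, the Generate Keys -> Unarchive -> Aggregate pattern can be simulated in memory. This is a plain Python sketch, not pipeline configuration. The helper names explode and aggregate are hypothetical, and the sketch assumes each event carries sensor_id, a timestamp in Unix seconds, and temperature.

```python
from collections import defaultdict

WINDOW_SIZE = 30  # seconds
SLIDE = 10        # seconds


def explode(event: dict) -> list[dict]:
    """Generate Keys + Unarchive: one copy of the event per window it belongs to."""
    newest_start = (event["timestamp"] // SLIDE) * SLIDE
    copies = []
    for k in range(WINDOW_SIZE // SLIDE):
        start = newest_start - k * SLIDE
        copy = dict(event)  # each window gets its own message
        copy["window_start"] = start
        copy["group_key"] = f'{event["sensor_id"]}:{start}'
        copies.append(copy)
    return copies


def aggregate(events: list[dict]) -> list[dict]:
    """Group the copies by group_key, then compute count and mean temperature."""
    groups = defaultdict(list)
    for event in events:
        for copy in explode(event):
            groups[copy["group_key"]].append(copy)
    results = []
    for members in sorted(groups.values(),
                          key=lambda m: (m[0]["window_start"], m[0]["sensor_id"])):
        temps = [m["temperature"] for m in members]
        results.append({
            "sensor_id": members[0]["sensor_id"],
            "window_start": members[0]["window_start"],
            "event_count": len(members),
            "moving_avg_temp": round(sum(temps) / len(temps), 2),
        })
    return results


# One reading every 5 seconds from t=100 to t=125, temperatures 20..25.
events = [
    {"sensor_id": "sensor-0", "timestamp": t, "temperature": 20 + i}
    for i, t in enumerate(range(100, 130, 5))
]
for row in aggregate(events):
    print(row)
```

Only the window starting at t=100 is fully covered by this short sample (6 events, average 22.5); the windows on either side overlap the sample only partially, so their counts are smaller.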
Implementation
- Start with the Tumbling Window Pipeline: Copy tumbling-windows.yaml to a new file named sliding-windows.yaml.

```shell
cp tumbling-windows.yaml sliding-windows.yaml
```
- Modify the Processors: Open sliding-windows.yaml and replace the entire processors array with the new logic below.

Replace the 'processors' array in sliding-windows.yaml:

```yaml
processors:
  # 1. GENERATE one copy of the event for each window it belongs to
  - mapping: |
      # Unix seconds of the event (assumes an RFC 3339 timestamp string)
      let unix_ts = this.timestamp.ts_unix()
      # Start of the newest window containing the event, aligned to the 10s slide
      let newest_start = ($unix_ts / 10).floor() * 10
      let fmt = "2006-01-02T15:04:05Z07:00"
      let event = this
      # This event belongs to 3 windows (30s window / 10s slide = 3)
      root = [$newest_start, $newest_start - 10, $newest_start - 20].map_each(start -> $event.merge({"window_start": start.ts_format($fmt, "UTC")}))

  # 2. UNARCHIVE the array so each window gets its own message, then key it
  - unarchive:
      format: json_array
  - mapping: |
      root = this
      # window_start is kept as its own field: the ISO 8601 string contains
      # colons, so it cannot be split back out of group_key reliably
      root.group_key = this.sensor_id + ":" + this.window_start

  # 3. CACHE (same as before)
  - cache:
      resource: window_cache
      operator: add
      key: ${! this.group_key }
      value: ${! this }

  # 4. GROUP the batch into one sub-batch per window key
  - group_by_value:
      value: ${! this.group_key }

  # 5. AGGREGATE each group: collapse it into one JSON array, then summarize
  - archive:
      format: json_array
  - mapping: |
      let first_event = this.index(0)
      let temperatures = this.map_each(event -> event.temperature)
      root = {
        "sensor_id": $first_event.sensor_id,
        "window_start": $first_event.window_start,
        "event_count": this.length(),
        "moving_avg_temp": ($temperatures.sum() / $temperatures.length() * 100).round() / 100
      }
```

Note: You may also want to increase the default_ttl on the window_cache resource to 32s to account for the longer window size.
- Deploy and Observe: Watch the logs. You will now see output messages appearing every 10 seconds (the slide interval), and the event_count for each will be larger, as it includes events from a 30-second window.
Verification
The output will be a stream of moving average calculations. Because the windows overlap, the change from one output to the next will be much smoother than with tumbling windows.
Example Output:

```text
{"sensor_id":"sensor-0","window_start":"2025-11-22T21:15:00Z","event_count":150,"moving_avg_temp":24.5}
// ...10 seconds later, with 20 seconds of overlapping data...
{"sensor_id":"sensor-0","window_start":"2025-11-22T21:15:10Z","event_count":150,"moving_avg_temp":24.62}
```
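The smoothing effect can be illustrated with a steadily rising temperature. This Python sketch is a simplified model, not pipeline output: it assumes every 10-second slice contains the same number of events (so a window's average is the mean of its slice averages), compares against a 30-second tumbling window, and uses made-up slice values.

```python
# Average temperature of each consecutive 10-second slice (illustrative values).
slice_means = [20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0]

SLICES_PER_WINDOW = 3  # 30 s window / 10 s slice


def mean(values):
    return sum(values) / len(values)


# Tumbling: non-overlapping 30 s windows, one output every 30 s.
tumbling = [mean(slice_means[i:i + SLICES_PER_WINDOW])
            for i in range(0, len(slice_means), SLICES_PER_WINDOW)]

# Sliding: a 30 s window that advances one slice (10 s) at a time.
sliding = [mean(slice_means[i:i + SLICES_PER_WINDOW])
           for i in range(len(slice_means) - SLICES_PER_WINDOW + 1)]


def steps(outputs):
    """Change between consecutive outputs."""
    return [b - a for a, b in zip(outputs, outputs[1:])]


print("tumbling:", tumbling, "steps:", steps(tumbling))
print("sliding: ", sliding, "steps:", steps(sliding))
```

With this linear trend, each sliding output moves by 1.0 degree while each tumbling output jumps by 3.0 degrees, because consecutive sliding windows share two of their three slices.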
You have now implemented a sliding window for smooth, real-time trend analysis.