Step 2: Sliding Windows

While tumbling windows are great for discrete summaries, sliding windows are used to calculate smooth moving averages. The key difference is that with a sliding window, a single event can belong to multiple overlapping windows.

The Goal

You will modify your pipeline to calculate a 30-second moving average, updated every 10 seconds.

  • The Window Size is 30 seconds.
  • The Slide Interval is 10 seconds.

This means an event that occurs at 10:00:15 will be included in three different windows (illustrated in the sketch after this list):

  1. The 09:59:50 - 10:00:20 window.
  2. The 10:00:00 - 10:00:30 window.
  3. The 10:00:10 - 10:00:40 window.
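
To sanity-check this arithmetic outside the pipeline, here is a small standalone Python sketch (illustrative only, not part of the Bloblang mapping) that computes the start of every 30-second window containing a given event:

    from datetime import datetime, timedelta, timezone

    WINDOW = 30  # window size in seconds
    SLIDE = 10   # slide interval in seconds

    def window_starts(event_time):
        """Return the start of each sliding window that contains event_time."""
        epoch = int(event_time.timestamp())
        latest_start = (epoch // SLIDE) * SLIDE  # most recent slide boundary
        return sorted(
            datetime.fromtimestamp(latest_start - offset, tz=timezone.utc)
            for offset in range(0, WINDOW, SLIDE)  # 0s, 10s, and 20s back
        )

    event = datetime(2025, 11, 22, 10, 0, 15, tzinfo=timezone.utc)
    for start in window_starts(event):
        print(start.time(), "->", (start + timedelta(seconds=WINDOW)).time())
    # 09:59:50 -> 10:00:20
    # 10:00:00 -> 10:00:30
    # 10:00:10 -> 10:00:40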

The "Generate Keys -> Unarchive -> Aggregate" Pattern

The implementation is very similar to tumbling windows, with two key differences:

  1. Generate Keys: Instead of one group_key, you will generate an array of keys for each message, one for each overlapping window it belongs to.
  2. Unarchive: You will use the unarchive processor to create a copy of the message for each key in the array.

From there, the cache, group_by, and aggregate steps work exactly as they did for tumbling windows.
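
Before touching the YAML, it can help to see the whole pattern in miniature. The standalone Python sketch below (the sensor readings and timestamps are made up for illustration) mirrors the five stages in memory: generate keys, fan out one copy per key, group by key, and aggregate each group:

    from collections import defaultdict

    WINDOW, SLIDE = 30, 10  # seconds

    # (sensor_id, unix timestamp, temperature) -- made-up sample events
    events = [
        ("sensor-0", 1000, 24.0),
        ("sensor-0", 1008, 25.0),
        ("sensor-0", 1015, 26.0),
    ]

    # 1. Generate keys and 2. fan out: one copy of each event per window it belongs to
    copies = []
    for sensor_id, ts, temp in events:
        latest_start = (ts // SLIDE) * SLIDE
        for offset in range(0, WINDOW, SLIDE):
            copies.append((f"{sensor_id}:{latest_start - offset}", temp))

    # 3./4. Group the copies by their window key
    groups = defaultdict(list)
    for key, temp in copies:
        groups[key].append(temp)

    # 5. Aggregate each group into an event count and a moving average
    for key, temps in sorted(groups.items()):
        print(key, len(temps), round(sum(temps) / len(temps), 2))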

Implementation

  1. Start with the Tumbling Window Pipeline: Copy tumbling-windows.yaml to a new file named sliding-windows.yaml.

    cp tumbling-windows.yaml sliding-windows.yaml
  2. Modify the Processors: Open sliding-windows.yaml and replace the entire processors array with the new logic below.

    # Replace the 'processors' array in sliding-windows.yaml
    processors:
      # 1. GENERATE an array of group keys for each message
      - mapping: |
          let parsed_time = this.timestamp.parse_timestamp()
          root = this

          # This event belongs to 3 windows (30s window / 10s slide = 3)
          root.group_keys = [
            this.sensor_id + ":" + (($parsed_time.unix() / 10).floor() * 10).ts_format_iso8601(),
            this.sensor_id + ":" + ((($parsed_time.unix() - 10) / 10).floor() * 10).ts_format_iso8601(),
            this.sensor_id + ":" + ((($parsed_time.unix() - 20) / 10).floor() * 10).ts_format_iso8601()
          ]

      # 2. UNARCHIVE to create a copy for each key
      - unarchive:
          format: json_array
          field: group_keys
      - mapping: |
          root = this
          root.group_key = content()

      # 3. CACHE (same as before)
      - cache:
          resource: window_cache
          operator: add
          key: ${! this.group_key }
          value: ${! this }

      # 4. GROUP (same as before)
      - group_by:
          - key: ${! this.group_key }
            value: ${! this }

      # 5. AGGREGATE (same as before)
      - mapping: |
          let first_event = this.index(0)
          let temperatures = this.map_each(event -> event.temperature)
          root = {
            "sensor_id": $first_event.sensor_id,
            "window_start": $first_event.group_key.split(":").slice(1).join(":"),
            "event_count": this.length(),
            "moving_avg_temp": $temperatures.mean().round(2)
          }

    Note: You may also want to increase the default_ttl on the window_cache resource to 32s to account for the longer window size.

  3. Deploy and Observe: Watch the logs. You will now see output messages appearing every 10 seconds (the slide interval), and the event_count for each will be larger, as it includes events from a 30-second window.

Verification

The output will be a stream of moving average calculations. Because the windows overlap, the change from one output to the next will be much smoother than with tumbling windows.

Example Output:

{"sensor_id":"sensor-0","window_start":"2025-11-22T21:15:00Z","event_count":150,"moving_avg_temp":24.5}
// ...10 seconds later, with 20 seconds of overlapping data...
{"sensor_id":"sensor-0","window_start":"2025-11-22T21:15:10Z","event_count":150,"moving_avg_temp":24.62}

You have now implemented a sliding window for smooth, real-time trend analysis.