
Step 1: Tumbling Windows

This step teaches the fundamental pattern of time-series aggregation: tumbling windows. A tumbling window is a fixed-size, non-overlapping time bucket. For example, you can group all events that occurred in the same minute into a single summary event.

The Goal

You will transform a continuous stream of individual sensor readings into a series of 10-second summaries, calculating the total event count and average temperature for each window.

The "Window -> Cache -> Group -> Aggregate" Pattern

  1. Window: For each message, calculate which time window it belongs to. This is done by rounding the message's timestamp down to the start of its interval (e.g., 10:23:47 becomes 10:23:40).
  2. Cache: Store each message in a temporary cache, using a key that combines the sensor ID and the time window. The cache is set to expire items after the window duration.
  3. Group: When the cache expires an item (or a batch of items), a group_by processor collects all messages that shared the same key.
  4. Aggregate: A final mapping processor takes the group of messages and calculates summary statistics, as sketched below.
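
Before the implementation, it can help to see the same four steps in plain code. The following is a minimal, tool-agnostic Python sketch of the pattern; the names used here (window_start, add_event, aggregate, buckets) are illustrative only and do not correspond to anything in the pipeline configuration below.

    from collections import defaultdict
    from datetime import datetime, timezone

    WINDOW_SECONDS = 10

    def window_start(ts: datetime, size: int = WINDOW_SECONDS) -> datetime:
        """WINDOW: round a timestamp down to the start of its tumbling window."""
        epoch = int(ts.timestamp())
        return datetime.fromtimestamp(epoch - epoch % size, tz=timezone.utc)

    # CACHE / GROUP: bucket events by (sensor_id, window start) until the window closes.
    buckets: dict[tuple[str, datetime], list[dict]] = defaultdict(list)

    def add_event(event: dict) -> None:
        ts = datetime.fromisoformat(event["timestamp"])  # Python 3.11+ accepts the trailing 'Z'
        buckets[(event["sensor_id"], window_start(ts))].append(event)

    def aggregate(sensor_id: str, start: datetime, events: list[dict]) -> dict:
        """AGGREGATE: summarise all events that fell into one window."""
        temps = [e["temperature"] for e in events]
        return {
            "sensor_id": sensor_id,
            "window_start": start.isoformat(),
            "event_count": len(events),
            "avg_temperature": round(sum(temps) / len(temps), 2),
        }

The key step is the rounding in window_start: every timestamp inside the same 10-second interval maps to the same bucket key, so the aggregation step later sees all of a window's events together.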

Implementation

  1. Create the Aggregation Pipeline: Copy the following configuration into a file named tumbling-windows.yaml.

    tumbling-windows.yaml
    name: tumbling-windows-aggregator
    description: A pipeline that aggregates sensor data into 10-second tumbling windows.

    config:
      # 1. Define a cache resource to temporarily store events for each window.
      # The TTL (time-to-live) determines when the window "closes".
      cache_resources:
        - label: window_cache
          memory:
            default_ttl: 12s # 10s window + 2s grace period for late events

      input:
        generate: # Generate a stream of mock sensor data
          interval: 100ms
          mapping: |
            root = {
              "sensor_id": "sensor-" + (random_int() % 2).string(),
              "temperature": (random_int() % 10) + 20.0,
              "timestamp": now().ts_format("2006-01-02T15:04:05.999Z")
            }

      pipeline:
        processors:
          # 2. WINDOW: Calculate the 10-second window and create a group key
          - mapping: |
              let parsed_time = this.timestamp.parse_timestamp()
              let window_start_unix = (parsed_time.unix() / 10).floor() * 10
              let window_start = window_start_unix.ts_format_iso8601()
              root = this
              root.group_key = this.sensor_id + ":" + window_start

          # 3. CACHE: Store the event until the window closes
          - cache:
              resource: window_cache
              operator: add
              key: ${! this.group_key }
              value: ${! this }

          # 4. GROUP: When the cache TTL expires, group all events with the same key
          - group_by:
              - key: ${! this.group_key }
                value: ${! this }

          # 5. AGGREGATE: Calculate statistics for the group
          - mapping: |
              let first_event = this[0]
              let temperatures = this.map_each(event -> event.temperature)

              root = {
                "sensor_id": first_event.sensor_id,
                "window_start": first_event.group_key.split(":")[1],
                "event_count": this.length(),
                "avg_temperature": temperatures.mean().round(2)
              }

      output:
        stdout:
          codec: lines
  2. Deploy and Observe: Deploy the pipeline and watch its logs. Individual events are generated continuously, but the output shows only summary messages every 10 seconds, one per sensor_id.
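
If you would rather sanity-check the stream than eyeball it, a small script can verify the two properties that matter: window starts land on 10-second boundaries, and averages stay inside the generator's 20.0-29.0 temperature range. This is a hypothetical helper (call it check_windows.py), not part of the pipeline; it assumes the aggregator's stdout is piped into it, one JSON summary per line.

    import json
    import sys
    from datetime import datetime

    # Read one JSON summary per line from stdin and check basic invariants.
    for line in sys.stdin:
        if not line.strip():
            continue
        summary = json.loads(line)
        start = datetime.fromisoformat(summary["window_start"].replace("Z", "+00:00"))
        # Window starts must land on 10-second boundaries (:00, :10, :20, ...).
        assert start.second % 10 == 0, summary
        # Averages must stay inside the generator's 20.0-29.0 range.
        assert 20.0 <= summary["avg_temperature"] <= 29.0, summary
        print(f"{summary['sensor_id']} {summary['window_start']}: "
              f"{summary['event_count']} events, avg {summary['avg_temperature']}")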

Verification

The output will be a stream of aggregated JSON objects. Every 10 seconds, you will see one summary for sensor-0 and one for sensor-1, each representing the data from the previous window.
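
The event_count values follow directly from the generator settings: one event every 100 ms for 10 seconds gives about 100 events per window, split roughly evenly between the two sensors. A quick back-of-the-envelope check:

    interval_ms = 100                                    # generate.interval: 100ms
    window_s = 10                                        # tumbling window size
    sensors = 2                                          # sensor-0 and sensor-1
    events_per_window = window_s * 1000 // interval_ms   # 100 events per window
    per_sensor = events_per_window // sensors            # ~50, matching event_count below
    print(per_sensor)                                    # 50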

Example Output:

{"sensor_id":"sensor-0","window_start":"2025-11-22T20:49:10Z","event_count":50,"avg_temperature":24.4}
{"sensor_id":"sensor-1","window_start":"2025-11-22T20:49:10Z","event_count":50,"avg_temperature":24.88}
// ... 10 seconds later ...
{"sensor_id":"sensor-0","window_start":"2025-11-22T20:49:20Z","event_count":50,"avg_temperature":24.6}
{"sensor_id":"sensor-1","window_start":"2025-11-22T20:49:20Z","event_count":50,"avg_temperature":24.52}

You have successfully built a real-time aggregation pipeline using a tumbling window. This fundamental pattern is the basis for much of time-series analysis.