Skip to main content

Step 3: Session Windows

While tumbling and sliding windows are based on fixed time intervals, session windows are dynamic. A session window groups events together based on activity, and is "closed" only after a specific period of inactivity.

This is perfect for understanding user behavior, such as a user's journey on a website or a period of activity for a sensor.

The Goal

You will group a stream of events into "sessions" based on a 5-second inactivity timeout. If events for the same sensor arrive within 5 seconds of each other, they are part of the same session. If more than 5 seconds pass, the next event starts a new session.

The group_by Session Window

Expanso's group_by processor has built-in support for session windows, which simplifies this pattern significantly. You don't need to manage the timeout logic manually.

Implementation

  1. Create the Session Pipeline: Copy the following configuration into a file named session-windows.yaml.

    session-windows.yaml
    name: session-windows-aggregator
    description: A pipeline that groups sensor data into sessions.

    config:
    input:
    generate: # Generate a stream of mock sensor data with random gaps
    interval: 'if random_int() % 10 > 7 { "6s" } else { "1s" }'
    mapping: |
    root = {
    "sensor_id": "sensor-" + (random_int() % 2).string(),
    "activity": "detected",
    "timestamp": now().ts_format_iso8601()
    }

    pipeline:
    processors:
    # The group_by processor handles the session logic automatically.
    # It will group all events by 'sensor_id' and wait for a 5-second
    # gap in events for that sensor_id before releasing the group.
    - group_by:
    - session:
    timeout: 5s
    processors:
    # This mapping runs once for each completed session
    - mapping: |
    let first_event = this[0]
    let last_event = this[-1]
    root = {
    "sensor_id": first_event.sensor_id,
    "session_start": first_event.timestamp,
    "session_end": last_event.timestamp,
    "events_in_session": this.length()
    }

    output:
    stdout:
    codec: lines
  2. Deploy and Observe: Watch the logs. You will see output messages appear at irregular intervals. This is because the pipeline only produces a result when a 5-second gap of inactivity is detected for a given sensor.

Verification

The output will be a stream of session summary objects. You will notice that the events_in_session count varies, and the duration between session_start and session_end is also dynamic, reflecting the natural clusters of activity in the input stream.

Example Output:

{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:05Z","session_end":"2025-11-22T22:10:08Z","events_in_session":4}
{"sensor_id":"sensor-0","session_start":"2025-11-22T22:10:04Z","session_end":"2025-11-22T22:10:10Z","events_in_session":6}
// ... a gap of more than 5 seconds occurs ...
{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:15Z","session_end":"2025-11-22T22:10:16Z","events_in_session":2}

You have now implemented session windows to analyze data based on natural activity patterns rather than fixed time intervals.