Step 3: Session Windows
While tumbling and sliding windows are based on fixed time intervals, session windows are dynamic. A session window groups events by activity and is "closed" only after a specified period of inactivity (the timeout) has passed.
This makes session windows well suited to analyzing activity that arrives in bursts, such as a user's journey through a website or a period of activity from a sensor.
The Goal
You will group a stream of events into "sessions" using a 5-second inactivity timeout. If events for the same sensor arrive within 5 seconds of each other, they belong to the same session. If more than 5 seconds pass without an event from that sensor, its next event starts a new session.
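The grouping rule can be sketched in a few lines of Python. This sketch assumes all events are already available as an in-memory list with parsed timestamps. `Event` and `sessionize` are hypothetical names used only for illustration and are not part of Expanso. A gap of exactly 5 seconds keeps the event in the same session, which matches "within 5 seconds".

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

TIMEOUT = timedelta(seconds=5)


@dataclass
class Event:
    sensor_id: str
    timestamp: datetime


def sessionize(events, timeout=TIMEOUT):
    """Group events into per-sensor sessions (hypothetical helper).

    A new session starts when the gap since the same sensor's previous
    event is strictly greater than `timeout`.
    """
    open_sessions = {}  # sensor_id -> events in that sensor's current session
    closed = []
    for event in sorted(events, key=lambda e: e.timestamp):
        current = open_sessions.get(event.sensor_id)
        if current and event.timestamp - current[-1].timestamp > timeout:
            closed.append(current)  # inactivity gap exceeded: close the session
            current = None
        if current is None:
            current = []
            open_sessions[event.sensor_id] = current
        current.append(event)
    closed.extend(open_sessions.values())  # sessions still open at end of input
    return closed


base = datetime(2025, 11, 22, 22, 10, 0)
events = [Event("sensor-0", base + timedelta(seconds=s)) for s in (0, 1, 2, 9)]
events.append(Event("sensor-1", base + timedelta(seconds=1)))
sessions = sessionize(events)
print([(s[0].sensor_id, len(s)) for s in sessions])
# [('sensor-0', 3), ('sensor-0', 1), ('sensor-1', 1)]
```

The 7-second gap between sensor-0's events at 2s and 9s splits its events into two sessions. Sensor-1's single event forms its own session, because sessions are tracked per sensor.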
The group_by Session Window
Expanso's group_by processor has built-in support for session windows, so you don't need to implement the timeout logic yourself.
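For intuition about the work being automated, here is a minimal streaming sketch of a session-window buffer. This is not Expanso's implementation. The `SessionWindow` class, its `add`/`flush` methods, and the use of an explicit `now` argument as the clock are all assumptions made for illustration. Events are buffered per key, and a key's session is released only once that key has been quiet for longer than the timeout.

```python
from datetime import datetime, timedelta


class SessionWindow:
    """Hypothetical per-key session buffer; not Expanso's group_by internals."""

    def __init__(self, timeout: timedelta):
        self.timeout = timeout
        self.buffers: dict[str, list[dict]] = {}  # key -> events in the open session
        self.last_seen: dict[str, datetime] = {}  # key -> time of its latest event

    def add(self, key: str, event: dict, now: datetime) -> list[list[dict]]:
        """Release expired sessions first, then buffer the new event."""
        released = self.flush(now)
        self.buffers.setdefault(key, []).append(event)
        self.last_seen[key] = now
        return released

    def flush(self, now: datetime) -> list[list[dict]]:
        """Release every session whose last event is older than the timeout."""
        expired = [k for k, t in self.last_seen.items() if now - t > self.timeout]
        released = []
        for key in expired:
            released.append(self.buffers.pop(key))
            del self.last_seen[key]
        return released


window = SessionWindow(timedelta(seconds=5))
t0 = datetime(2025, 11, 22, 22, 10, 0)
window.add("sensor-1", {"n": 1}, t0)
window.add("sensor-1", {"n": 2}, t0 + timedelta(seconds=3))
print(window.flush(t0 + timedelta(seconds=8)))  # [] (quiet for exactly 5s, still open)
print(window.flush(t0 + timedelta(seconds=9)))  # [[{'n': 1}, {'n': 2}]]
```

Because `add` flushes before buffering, an event that arrives after its key's session has expired starts a fresh session instead of extending the old one. This explains why results appear only after a gap, not when the last event of a session arrives.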
Implementation
- Create the Session Pipeline: Copy the following configuration into a file named session-windows.yaml.

name: session-windows-aggregator
description: A pipeline that groups sensor data into sessions.
config:
  input:
    generate: # Generate a stream of mock sensor data with random gaps
      interval: 'if random_int() % 10 > 7 { "6s" } else { "1s" }'
      mapping: |
        root = {
          "sensor_id": "sensor-" + (random_int() % 2).string(),
          "activity": "detected",
          "timestamp": now().ts_format_iso8601()
        }
  pipeline:
    processors:
      # The group_by processor handles the session logic automatically.
      # It groups all events by 'sensor_id' and waits for a 5-second
      # gap in events for that sensor_id before releasing the group.
      - group_by:
          - session:
              timeout: 5s
            processors:
              # This mapping runs once for each completed session.
              # Variables declared with `let` are referenced with `$`.
              - mapping: |
                  let first_event = this.index(0)
                  let last_event = this.index(-1)
                  root = {
                    "sensor_id": $first_event.sensor_id,
                    "session_start": $first_event.timestamp,
                    "session_end": $last_event.timestamp,
                    "events_in_session": this.length()
                  }
  output:
    stdout:
      codec: lines

- Deploy and Observe: Watch the logs. Output messages appear at irregular intervals because the pipeline emits a session summary only after it detects a 5-second gap of inactivity for a given sensor.
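The per-session mapping reduces a session to its first event, its last event, and a count. The following Python equivalent makes that reduction explicit. It assumes the events in a session arrive in time order, as the mapping does when it reads the first and last elements. `summarize_session` and the three sample events are hypothetical and serve only as illustration.

```python
import json


def summarize_session(events: list[dict]) -> dict:
    """Hypothetical Python equivalent of the per-session summary mapping."""
    first_event, last_event = events[0], events[-1]  # assumes time-ordered events
    return {
        "sensor_id": first_event["sensor_id"],
        "session_start": first_event["timestamp"],
        "session_end": last_event["timestamp"],
        "events_in_session": len(events),
    }


# Hypothetical sample session for one sensor.
session = [
    {"sensor_id": "sensor-1", "activity": "detected", "timestamp": "2025-11-22T22:10:05Z"},
    {"sensor_id": "sensor-1", "activity": "detected", "timestamp": "2025-11-22T22:10:06Z"},
    {"sensor_id": "sensor-1", "activity": "detected", "timestamp": "2025-11-22T22:10:08Z"},
]
print(json.dumps(summarize_session(session), separators=(",", ":")))
# {"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:05Z","session_end":"2025-11-22T22:10:08Z","events_in_session":3}
```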
Verification
The output will be a stream of session summary objects. You will notice that the events_in_session count varies, and the duration between session_start and session_end is also dynamic, reflecting the natural clusters of activity in the input stream.
Example Output:
{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:05Z","session_end":"2025-11-22T22:10:08Z","events_in_session":4}
{"sensor_id":"sensor-0","session_start":"2025-11-22T22:10:04Z","session_end":"2025-11-22T22:10:10Z","events_in_session":6}
// ... a gap of more than 5 seconds occurs ...
{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:15Z","session_end":"2025-11-22T22:10:16Z","events_in_session":2}
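To check output like this mechanically, a small script can compute each session's duration and confirm that consecutive sessions for the same sensor are separated by more than the 5-second timeout. This is a hypothetical helper, not an Expanso tool. It assumes each sensor's sessions appear in the output in time order and that timestamps are ISO 8601 strings ending in `Z`.

```python
import json
from datetime import datetime

TIMEOUT_SECONDS = 5


def parse_ts(value: str) -> datetime:
    # fromisoformat() accepts a trailing "Z" only on Python 3.11+, so normalize it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def check_sessions(lines):
    """Return (sensor_id, duration_seconds) per session; raise if a gap is too short."""
    last_end = {}  # sensor_id -> end time of that sensor's previous session
    durations = []
    for line in lines:
        s = json.loads(line)
        start, end = parse_ts(s["session_start"]), parse_ts(s["session_end"])
        durations.append((s["sensor_id"], (end - start).total_seconds()))
        previous = last_end.get(s["sensor_id"])
        if previous is not None:
            gap = (start - previous).total_seconds()
            if gap <= TIMEOUT_SECONDS:
                raise ValueError(f"{s['sensor_id']}: gap of {gap}s is within the timeout")
        last_end[s["sensor_id"]] = end
    return durations


output = [
    '{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:05Z","session_end":"2025-11-22T22:10:08Z","events_in_session":4}',
    '{"sensor_id":"sensor-0","session_start":"2025-11-22T22:10:04Z","session_end":"2025-11-22T22:10:10Z","events_in_session":6}',
    '{"sensor_id":"sensor-1","session_start":"2025-11-22T22:10:15Z","session_end":"2025-11-22T22:10:16Z","events_in_session":2}',
]
print(check_sessions(output))
# [('sensor-1', 3.0), ('sensor-0', 6.0), ('sensor-1', 1.0)]
```

For the sample output, the two sensor-1 sessions are 7 seconds apart (22:10:08 to 22:10:15), which exceeds the timeout, so the check passes.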
You have now implemented session windows to analyze data based on natural activity patterns rather than fixed time intervals.