Step 1: Parse Multiple Timestamp Formats
A common data transformation challenge is handling the variety of timestamp formats that different systems produce. This step teaches the fundamental pattern for normalizing them: detect, then parse.
The Goal
You will build a pipeline that parses event timestamps in different formats, such as ISO 8601 strings and Unix epoch numbers, into a standardized normalized_timestamp field.
Input 1 (ISO8601):
{"event_id": "A", "timestamp": "2025-10-20T18:23:45Z"}
Input 2 (Unix Seconds):
{"event_id": "B", "timestamp": 1760984625}
Desired Output (for both):
A standardized object whose normalized_timestamp field represents the same instant in time for both events (2025-10-20T18:23:45Z, which is 1760984625 seconds after the Unix epoch).
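You can confirm that an ISO 8601 string and a Unix epoch value denote the same instant by converting both to timezone-aware datetimes. The Python snippet below is only an illustration and is not part of the pipeline. It uses 2025-10-20T18:23:45Z, which corresponds to 1760984625 Unix seconds.

```python
from datetime import datetime, timezone

# ISO 8601 string with a "Z" (UTC) suffix. The suffix is rewritten as "+00:00"
# because datetime.fromisoformat() only accepts "Z" from Python 3.11 onward.
iso_instant = datetime.fromisoformat("2025-10-20T18:23:45Z".replace("Z", "+00:00"))

# Unix epoch seconds, interpreted in UTC.
unix_instant = datetime.fromtimestamp(1760984625, tz=timezone.utc)

print(iso_instant.isoformat())       # 2025-10-20T18:23:45+00:00
print(int(iso_instant.timestamp()))  # 1760984625
print(iso_instant == unix_instant)   # True
```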
The "Detect -> Parse" Pattern
This is implemented in a single mapping processor:
- Detect: Use a match expression on the type() of the timestamp field to determine whether it is a string or a number.
- Parse: Based on the type, call the appropriate parsing function (.parse_timestamp() for strings, .ts_unix() for numbers).
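The detect-then-parse idea can also be sketched in Python outside the pipeline. This sketch illustrates the pattern only and does not show how the mapping processor is implemented. In it, normalize_timestamp is a hypothetical helper, isinstance() checks play the role of type(), and Python's datetime functions stand in for .parse_timestamp() and .ts_unix().

```python
from datetime import datetime, timezone
from typing import Any, Union


def normalize_timestamp(value: Any) -> Union[datetime, str]:
    """Hypothetical helper: detect the type of `value`, then parse it."""
    # Detect: a string is assumed to be ISO 8601.
    if isinstance(value, str):
        text = value
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 onward.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        return datetime.fromisoformat(text)
    # Detect: a number (bool excluded, since it subclasses int) is assumed
    # to be Unix epoch seconds; parse it as a UTC datetime.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    # Fallback, mirroring the `_ => "unknown format"` case.
    return "unknown format"


print(normalize_timestamp("2025-10-20T18:23:45Z"))  # 2025-10-20 18:23:45+00:00
print(normalize_timestamp(1760984625))              # 2025-10-20 18:23:45+00:00
print(normalize_timestamp(None))                    # unknown format
```

Returning a sentinel string mirrors the mapping's fallback case. Stricter code might raise an error instead, so that malformed events are not silently passed through.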
Implementation
- Create the Normalization Pipeline: Copy the following configuration into a file named normalize-timestamps.yaml.
name: timestamp-normalizer
description: A pipeline that parses multiple timestamp formats.
config:
  input:
    http_server:
      address: 0.0.0.0:8080
      path: /ingest
  pipeline:
    processors:
      - mapping: |
          root = this
          root.original_timestamp = this.timestamp
          # Detect the type and parse accordingly
          root.normalized_timestamp = match this.timestamp.type() {
            "string" => this.timestamp.parse_timestamp(),
            "number" => this.timestamp.ts_unix(),
            _ => "unknown format"
          }
  output:
    stdout:
      codec: lines
- Deploy and Test: Deploy the pipeline, then send one event in each format to the /ingest endpoint:
# --- Test 1: Send ISO8601 String ---
curl -X POST http://localhost:8080/ingest \
-H "Content-Type: application/json" \
-d '{"event_id": "A", "timestamp": "2025-10-20T18:23:45Z"}'
# --- Test 2: Send Unix Timestamp Number ---
curl -X POST http://localhost:8080/ingest \
-H "Content-Type: application/json" \
-d '{"event_id": "B", "timestamp": 1760984625}'
- Verify: Check the pipeline's standard output (or its logs, depending on where it runs). You should see two output messages, each with original_timestamp and normalized_timestamp fields. Although the original formats were different, both normalized values represent the same point in time (2025-10-20T18:23:45Z).
You have now built a basic timestamp normalization pipeline. The same "detect -> parse" pattern extends to other formats: add a match case (or a more specific condition) for each additional format you need to recognize.
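The Python sketch below shows one possible extension. It adds handling for numeric strings and for epoch values in milliseconds. This is an illustration under stated assumptions, not part of the tutorial's pipeline. The helper names are hypothetical, and the rule that separates seconds from milliseconds is a magnitude heuristic. Naive ISO strings (without an offset) are assumed to be UTC.

```python
from datetime import datetime, timezone
from typing import Any, Union

# Assumption (heuristic): epoch values at or above this are milliseconds.
# 1e11 seconds is roughly the year 5138, while 1e11 milliseconds is early 1973,
# so realistic values of either unit fall on the expected side.
MS_THRESHOLD = 100_000_000_000


def from_epoch(value: float) -> datetime:
    """Hypothetical helper: parse epoch seconds or milliseconds as UTC."""
    seconds = value / 1000 if abs(value) >= MS_THRESHOLD else value
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def normalize_extended(value: Any) -> Union[datetime, str]:
    """Hypothetical helper: detect the format, then parse it."""
    if isinstance(value, bool):
        return "unknown format"
    if isinstance(value, (int, float)):
        return from_epoch(value)
    if isinstance(value, str):
        text = value.strip()
        # Detect a purely numeric string such as "1760984625" or "-5.5".
        if text.lstrip("-").replace(".", "", 1).isdecimal():
            return from_epoch(float(text))
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return "unknown format"
        # Assumption: an ISO string without an offset is UTC.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    return "unknown format"


for raw in (1760984625, 1760984625000, "1760984625", "2025-10-20T18:23:45Z"):
    print(normalize_extended(raw))  # each prints 2025-10-20 18:23:45+00:00
```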