Skip to main content

Complete Pipeline

This pipeline combines all timestamp normalization techniques from this tutorial:

  • Format detection - Auto-detect timestamp format (ISO8601, Unix, custom)
  • Timezone conversion - Convert all timestamps to UTC
  • Derived fields - Add hour, day_of_week, is_business_hours
  • Validation - Flag invalid timestamps and attach a per-event quality score

Full Configuration

# normalize-timestamps.yaml
#
# Complete timestamp normalization pipeline: detects the inbound timestamp
# format, converts it to UTC, enriches events with derived calendar fields,
# and attaches a quality score before publishing to Kafka.
name: normalize-timestamps-complete
type: pipeline
description: Complete Timestamp Normalization Pipeline with format detection, UTC conversion, and metadata enrichment.
namespace: production
labels:
  category: data-transformation
  pattern: timestamp-normalization

config:
  input:
    kafka:
      addresses: ["${KAFKA_BROKERS:localhost:9092}"]
      topics: ["${INPUT_TOPIC:raw-events}"]
      consumer_group: "timestamp-normalizer"
      start_from_oldest: false

  pipeline:
    processors:
      # Step 1: Preserve the original timestamp and attach processing metadata.
      - mapping: |
          root = this
          root.timestamp_original = this.timestamp
          root.processing_metadata = {
            "pipeline_version": "v1.3.0",
            "processing_start": now().format_timestamp_iso8601(),
            "stage": "format_parsing"
          }

      # Step 2: Classify the timestamp format from its textual shape.
      # Bloblang variables must be read back with a `$` prefix ($ts).
      - mapping: |
          let ts = this.timestamp.string()

          root = this
          root.format_detected = if $ts.re_match("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}") {
            if $ts.contains("Z") || $ts.re_match("[-+]\\d{2}:?\\d{2}$") {
              "iso8601_offset"
            } else {
              "iso8601_naive"
            }
          } else if $ts.re_match("^\\d{10}$") {
            "unix_seconds"
          } else if $ts.re_match("^\\d{13}$") {
            "unix_milliseconds"
          } else if $ts.re_match("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$") {
            "custom_datetime"
          } else {
            "unknown"
          }

      # Step 3: Parse according to the detected format. The mapping throws on
      # unsupported formats; the `catch` that follows converts that error into
      # parse_success=false instead of dropping the event. The parser sits
      # BEFORE the catch: a catch wrapping the parser itself would only run
      # after an upstream failure.
      - mapping: |
          let format = this.format_detected
          let ts = this.timestamp_original

          root = this

          root.timestamp_parsed = if $format == "iso8601_offset" {
            $ts.parse_timestamp_iso8601()
          } else if $format == "iso8601_naive" {
            # Naive timestamps are assumed to be UTC — TODO confirm upstream.
            ($ts.string() + "Z").parse_timestamp_iso8601()
          } else if $format == "unix_seconds" {
            # NOTE(review): confirm timestamp_unix() parses epoch numbers in
            # this Bloblang dialect.
            $ts.timestamp_unix()
          } else if $format == "unix_milliseconds" {
            ($ts.number() / 1000).timestamp_unix()
          } else if $format == "custom_datetime" {
            ($ts.string() + " UTC").parse_timestamp("2006-01-02 15:04:05 MST")
          } else {
            throw("Unsupported timestamp format: " + $format)
          }

          root.processing_metadata.parse_success = true

      - catch:
          - mapping: |
              root = this
              root.processing_metadata.parse_success = false
              root.processing_metadata.parse_error = error()
              root.timestamp_parsed = null

      # Step 4: Replace the timestamp with a canonical UTC ISO8601 string.
      # Events that failed to parse pass through unchanged (Bloblang `if`
      # statements — not if-expressions — are required to guard assignments).
      - mapping: |
          root = this
          root.processing_metadata.stage = "timezone_conversion"

          if this.processing_metadata.parse_success {
            root.timestamp = this.timestamp_parsed.format_timestamp_iso8601("UTC")
            root.processing_metadata.conversion_success = true
          }

      # Step 5: Derive calendar metadata from the normalized UTC timestamp.
      - mapping: |
          root = this
          root.processing_metadata.stage = "metadata_enrichment"

          if this.processing_metadata.parse_success {
            let ts = this.timestamp.parse_timestamp_iso8601()
            let month = $ts.format_timestamp("01", "UTC").number()
            let hour = $ts.format_timestamp("15", "UTC").number()
            # Go reference-time layouts have no numeric weekday verb ("1"
            # formats the MONTH), so weekend status is derived from the
            # weekday name instead.
            let day_name = $ts.format_timestamp("Monday", "UTC")
            let is_weekend = ["Saturday", "Sunday"].contains($day_name)

            root.time_metadata = {
              "year": $ts.format_timestamp("2006", "UTC").number(),
              "month": $month,
              "day": $ts.format_timestamp("02", "UTC").number(),
              "hour": $hour,
              "day_of_week": $day_name,
              "is_weekend": $is_weekend,
              "is_business_hours": !$is_weekend && $hour >= 9 && $hour < 17,
              "quarter": if $month <= 3 { "Q1" }
                else if $month <= 6 { "Q2" }
                else if $month <= 9 { "Q3" }
                else { "Q4" },
              "hour_bucket": $ts.format_timestamp("2006-01-02T15", "UTC") + ":00:00Z",
              "day_bucket": $ts.format_timestamp("2006-01-02", "UTC")
            }

            root.processing_metadata.enrichment_success = true
          }

      # Step 6: Score output quality. The checks are computed into a variable
      # first: `this` still refers to the mapping INPUT, so reading
      # `this.validation` after assigning `root.validation` would find nothing.
      - mapping: |
          let checks = {
            "parse_success": this.processing_metadata.parse_success,
            "timestamp_exists": this.timestamp.type() == "string",
            "utc_format": this.timestamp.type() == "string" && this.timestamp.string().contains("Z")
          }
          let vals = $checks.values()

          root = this
          root.validation = $checks
          # Multiply by 1.0 so the score is a 0..1 fraction even under
          # integer-division semantics.
          root.quality_score = ($vals.map_each(v -> if v { 1 } else { 0 }).sum() * 1.0) / $vals.length()

      # Step 7: Strip internal bookkeeping before publishing.
      - mapping: |
          root = this.without("processing_metadata").without("validation")

  output:
    kafka:
      addresses: ["${KAFKA_BROKERS:localhost:9092}"]
      topic: "${OUTPUT_TOPIC:normalized-timestamps}"
      key: '${! json("event_id") }'
      compression: "snappy"

  logger:
    level: "${LOG_LEVEL:INFO}"
    format: json

  metrics:
    prometheus:
      path: "/metrics"

Quick Test

# Send events with different timestamp formats.
# NOTE(review): the config above ingests from Kafka; these curl examples
# presume an HTTP input — confirm against the tutorial's setup.

# test1: ISO8601 with a -08:00 offset -> detected as iso8601_offset
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event": "test1", "timestamp": "2024-01-15T10:30:00-08:00"}'

# test2: 10-digit number -> detected as unix_seconds
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event": "test2", "timestamp": 1705340400}'

# test3: "MM/DD/YYYY hh:mm AM TZ" matches none of the detection patterns,
# so it is classified "unknown" and flagged as a parse failure rather
# than normalized.
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event": "test3", "timestamp": "01/15/2024 10:30 AM PST"}'

# test1 and test2 emerge with normalized UTC timestamps + derived fields;
# test3 emerges unparsed with a reduced quality score.

Deploy

# Deploy the pipeline to the Expanso orchestrator
expanso-cli job deploy normalize-timestamps.yaml

# Or run it locally on a single node with expanso-edge
expanso-edge run --config normalize-timestamps.yaml

Download

Download normalize-timestamps.yaml

What's Next?