Complete Pipeline

This pipeline combines all deduplication techniques from this tutorial (a short sketch of the key-building strategies follows the list):

  • Hash-based deduplication - Use a content hash to detect exact duplicates
  • Fingerprint deduplication - Detect near-duplicates with key-field matching
  • ID-based deduplication - Use event_id for idempotent processing
  • Time-windowed dedup - Only check for duplicates within a time window
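
The first two strategies differ only in how the deduplication key is built. The Bloblang lines below are an illustrative sketch, not part of the pipeline configuration that follows; the field names content_hash and fingerprint are placeholders:

# Exact duplicates: key on a hash of the full payload
root.content_hash = content().hash("sha256").encode("hex")

# Near-duplicates: key on a fingerprint of selected fields
root.fingerprint = this.event_type.or("unknown") + ":" + this.source.or("unknown")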

Full Configuration

deduplicate-events.yaml
name: deduplicate-events-complete
type: pipeline
description: Complete Event Deduplication Pipeline - Detects and filters duplicate events using cache-based deduplication.
namespace: production
labels:
  category: data-transformation
  pattern: deduplication

cache_resources:
  - label: dedup_cache
    memory:
      default_ttl: 1h
      cap: 100000

config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /events
      allowed_verbs: ["POST"]

  pipeline:
    processors:
      # Step 1: Parse and validate
      - json_documents:
          parts: []

      - mapping: |
          # Validate required fields before enriching
          root = if !this.exists("event_id") {
            throw("Missing required field: event_id")
          } else {
            this
          }
          root.received_at = now()

      # Step 2: Generate deduplication key
      - mapping: |
          root = this

          # Create composite key for deduplication
          # Options: event_id only, or event_type + source + timestamp window
          let window = (this.timestamp.or(now()).ts_unix() / 60).floor().int64()
          let key = if this.dedup_strategy.or("event_id") == "composite" {
            # Composite: event_type + source + 1-minute window
            this.event_type.or("unknown") + ":" + this.source.or("unknown") + ":" + $window.string()
          } else {
            # Simple: just event_id
            this.event_id
          }

          root.dedup_key = $key
          meta dedup_key = $key

      # Step 3: Check cache for existing entry. The lookup runs inside a
      # branch so the cached value does not overwrite the event payload:
      # a cache hit marks the event as a duplicate, a cache miss raises an
      # error that is converted to is_duplicate = false and then cleared.
      - branch:
          processors:
            - cache:
                resource: dedup_cache
                operator: get
                key: '${!meta("dedup_key")}'
          result_map: meta is_duplicate = true

      - mapping: |
          let dup = meta("is_duplicate").or(false)
          root = this
          root.is_duplicate = $dup
          meta is_duplicate = $dup

      # Clear the error flag left behind by a cache miss (a new event)
      - catch: []

      # Step 4: Store new events in cache
      - switch:
          - check: '!meta("is_duplicate")'
            processors:
              - cache:
                  resource: dedup_cache
                  operator: set
                  key: '${!meta("dedup_key")}'
                  value: '${!json("received_at")}'

      # Step 5: Add dedup metadata
      - mapping: |
          root = this
          root.dedup_result = {
            "is_duplicate": this.is_duplicate,
            "dedup_key": meta("dedup_key"),
            "checked_at": now()
          }

  output:
    switch:
      cases:
        # New events → downstream processing
        - check: '!meta("is_duplicate")'
          output:
            kafka:
              addresses: ["${KAFKA_BROKERS:localhost:9092}"]
              topic: unique-events
              batching:
                count: 100
                period: 5s

        # Duplicates → metrics/logging only
        - output:
            file:
              path: '/var/expanso/duplicates/${!now().ts_format("2006-01-02")}/dupes.jsonl'
              codec: lines
              batching:
                count: 500
                period: 60s

  logger:
    level: INFO
    format: json

  metrics:
    prometheus:
      path: /metrics
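
With this configuration, every event leaving the pipeline carries the dedup metadata added in Step 5. For the first occurrence of the evt_001 event used in the Quick Test below, the enriched event would look roughly like this (timestamps are illustrative):

{
  "event_id": "evt_001",
  "data": "hello",
  "received_at": "2024-01-15T10:30:00Z",
  "dedup_key": "evt_001",
  "is_duplicate": false,
  "dedup_result": {
    "is_duplicate": false,
    "dedup_key": "evt_001",
    "checked_at": "2024-01-15T10:30:00Z"
  }
}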

Quick Test

# Send an event
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event_id": "evt_001", "data": "hello"}'

# Send the same event again (will be deduplicated)
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event_id": "evt_001", "data": "hello"}'

# Only 1 event appears in output
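
To exercise the composite key strategy from Step 2, include dedup_strategy, event_type, source, and timestamp in the payload; two such events with the same type and source inside the same one-minute window share a dedup key. The values below are only an example:

# Send an event using the composite dedup strategy
curl -X POST http://localhost:8080/events \
  -H "Content-Type: application/json" \
  -d '{"event_id": "evt_002", "dedup_strategy": "composite", "event_type": "login", "source": "web", "timestamp": "2024-01-15T10:30:00Z"}'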

Deploy

# Deploy to Expanso orchestrator
expanso-cli job deploy deduplicate-events.yaml

# Or run locally with expanso-edge
expanso-edge run --config deduplicate-events.yaml

Download

Download deduplicate-events.yaml

What's Next?