Complete Pipeline
This pipeline combines all deduplication techniques from this tutorial:
- Hash-based deduplication - Use a content hash to detect exact duplicates (sketched below)
- Fingerprint deduplication - Detect near-duplicates by matching key fields (sketched below)
- ID-based deduplication - Use event_id for idempotent processing
- Time-windowed dedup - Only check for duplicates within a time window
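The full pipeline below keys on event_id (or an event_type + source composite). If you prefer the hash-based or fingerprint strategies, a minimal sketch of an alternative Step 2 mapping is shown here; the user_id and event_type fingerprint fields are illustrative choices, not fields the pipeline requires:
# Sketch only: alternative dedup keys, usable in place of the Step 2 mapping below
- mapping: |
    root = this
    # Hash-based: exact-duplicate key derived from the full message content
    root.dedup_key = content().hash("sha256").encode("hex")
    # Fingerprint: near-duplicate key derived from selected fields only
    # (user_id and event_type are illustrative field names)
    # root.dedup_key = [this.user_id.or(""), this.event_type.or("")].join(":").hash("sha256").encode("hex")
    meta dedup_key = root.dedup_key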
Full Configuration
deduplicate-events.yaml
name: deduplicate-events-complete
type: pipeline
description: Complete Event Deduplication Pipeline - Detects and filters duplicate events using cache-based deduplication.
namespace: production
labels:
  category: data-transformation
  pattern: deduplication
cache_resources:
  - label: dedup_cache
    memory:
      default_ttl: 1h
      cap: 100000
config:
  input:
    http_server:
      address: "0.0.0.0:8080"
      path: /events
      allowed_verbs: ["POST"]
  pipeline:
    processors:
      # Step 1: Parse and validate
      - json_documents:
          parts: []
      - mapping: |
          root = this
          root.received_at = now()
          # Validate required fields
          root = if !this.exists("event_id") {
            throw("Missing required field: event_id")
          } else {
            this
          }
      # Step 2: Generate deduplication key
      - mapping: |
          root = this
          # Create the deduplication key
          # Options: event_id only, or event_type + source + timestamp window
          let window = (this.timestamp.or(now()).ts_unix() / 60).floor()
          root.dedup_key = if this.dedup_strategy.or("event_id") == "composite" {
            # Composite: event_type + source + 1-minute window
            this.event_type.or("unknown") + ":" + this.source.or("unknown") + ":" + $window.string()
          } else {
            # Simple: just event_id
            this.event_id
          }
          meta dedup_key = root.dedup_key
      # Step 3: Check cache for existing entry
      - cache:
          resource: dedup_cache
          operator: get
          key: '${!meta("dedup_key")}'
      - mapping: |
          root = this
          root.is_duplicate = meta("cache_result") != null && meta("cache_result") != ""
          meta is_duplicate = root.is_duplicate
      # Step 4: Store new events in cache
      - switch:
          - check: '!meta("is_duplicate")'
            processors:
              - cache:
                  resource: dedup_cache
                  operator: set
                  key: '${!meta("dedup_key")}'
                  value: '${!json("received_at")}'
      # Step 5: Add dedup metadata
      - mapping: |
          root = this
          root.dedup_result = {
            "is_duplicate": this.is_duplicate,
            "dedup_key": meta("dedup_key"),
            "checked_at": now()
          }
  output:
    switch:
      cases:
        # New events → downstream processing
        - check: '!meta("is_duplicate")'
          output:
            kafka:
              addresses: ["${KAFKA_BROKERS:localhost:9092}"]
              topic: unique-events
              batching:
                count: 100
                period: 5s
        # Duplicates → metrics/logging only
        - output:
            file:
              path: '/var/expanso/duplicates/${! now().ts_format("2006-01-02") }/dupes.jsonl'
              codec: lines
              batching:
                count: 500
                period: 60s
  logger:
    level: INFO
    format: json
  metrics:
    prometheus:
      path: /metrics
Quick Test
# Send an event
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event_id": "evt_001", "data": "hello"}'
# Send the same event again (will be deduplicated)
curl -X POST http://localhost:8080/events \
-H "Content-Type: application/json" \
-d '{"event_id": "evt_001", "data": "hello"}'
# Only 1 event appears in the unique-events output; the duplicate is routed to the file output
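To confirm where the duplicate went, inspect the file output once the 60-second batch period has elapsed (the record shown in the comment is illustrative; timestamps will differ):
# Inspect the duplicates file for today's date
cat /var/expanso/duplicates/$(date +%F)/dupes.jsonl
# {"event_id":"evt_001","data":"hello","received_at":"...","dedup_key":"evt_001","is_duplicate":true,"dedup_result":{"is_duplicate":true,"dedup_key":"evt_001","checked_at":"..."}}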
Deploy
# Deploy to Expanso orchestrator
expanso-cli job deploy deduplicate-events.yaml
# Or run locally with expanso-edge
expanso-edge run --config deduplicate-events.yaml
Download
Download deduplicate-events.yaml
What's Next?
- Troubleshooting - Common issues and solutions
- Normalize Timestamps - Standardize time formats