Step 1: Hash-Based Deduplication for Exact Duplicates
This step teaches the most common deduplication technique: hash-based deduplication. This pattern is perfect for catching exact duplicates, such as those caused by network retries where the same message is sent multiple times.
The Goal
You will build a pipeline that generates a unique "fingerprint" (a hash) for each message, remembers the fingerprints it has seen recently, and drops any message whose fingerprint it has already seen.
The "Hash -> Cache -> Check -> Drop" Pattern
- Hash: For each message, create a SHA-256 hash of its canonical JSON serialization. This produces a consistent, unique fingerprint (see the sketch after this list).
- Cache: A `cache` processor attempts to add the fingerprint to a shared cache. The add is atomic: it succeeds only if the fingerprint is not already present, so checking and recording happen in a single step.
- Check: A `mapping` processor checks whether the cache add failed.
- Drop: If the add failed, the message is a duplicate and is deleted. If it succeeded, the message is new, its fingerprint is now cached for future checks, and it continues through the pipeline.
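Why hash a serialized form instead of the raw bytes? JSON does not guarantee key order, so two logically identical payloads can arrive with their fields in different orders. A minimal Bloblang-style sketch (assuming a Benthos-compatible engine, where `format_json()` emits object keys in sorted order) shows why serializing first makes the fingerprint stable:

```
# Two logically identical payloads, keys in different order...
root.hash_a = {"b": 2, "a": 1}.format_json().hash("sha256").encode("hex")
root.hash_b = {"a": 1, "b": 2}.format_json().hash("sha256").encode("hex")

# ...yield the same fingerprint, because format_json() sorts keys.
root.match = root.hash_a == root.hash_b # true
```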
Implementation
- Create the Deduplication Pipeline: Copy the following configuration into a file named `deduplicator.yaml`:

  ```yaml
  name: hash-deduplicator
  description: A pipeline that removes exact duplicate messages.
  config:
    # 1. Define a cache resource to remember the hashes we've seen.
    cache_resources:
      - label: dedup_cache
        memory:
          default_ttl: 60s # Remember hashes for 60 seconds

    input:
      http_server:
        address: 0.0.0.0:8080
        path: /ingest

    pipeline:
      processors:
        # 2. HASH: Create a hash of the message content. format_json()
        # serializes with sorted keys, so equivalent payloads always
        # produce the same fingerprint.
        - mapping: |
            root = this
            root.dedup_hash = this.format_json().hash("sha256").encode("hex")

        # 3. CACHE: Atomically record the hash. The `add` operator fails
        # when the key already exists, flagging the message with an error.
        # That failure is expected for duplicates.
        - cache:
            resource: dedup_cache
            operator: add
            key: ${! this.dedup_hash }
            value: "seen"

        # 4. CHECK & DROP: Decide whether to keep or drop the message.
        - mapping: |
            root = this
            if errored() {
              # The cache already held this hash, so drop the duplicate.
              root = deleted()
            } else {
              root.status = "processed_new"
            }

    output:
      stdout:
        codec: lines
  ```
- Deploy and Test: Start the pipeline (the exact run command depends on your engine; a Benthos-style runtime is typically started with `benthos -c deduplicator.yaml`), then send the same message twice:

  ```bash
  # --- Send the SAME message twice ---
  curl -X POST http://localhost:8080/ingest \
    -H "Content-Type: application/json" \
    -d '{"event_id": "abc-123", "message": "hello"}'

  curl -X POST http://localhost:8080/ingest \
    -H "Content-Type: application/json" \
    -d '{"event_id": "abc-123", "message": "hello"}'
  ```
- Verify: Check the stdout output. You will see the message only once, as in the sample below. The first request was processed and its hash was added to the cache. The second request produced the same hash, the cache add failed because the key already existed, and the message was dropped.
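For reference, the single surviving message printed to stdout will look roughly like the following. The digest value is illustrative, and field order may vary by engine:

```json
{"dedup_hash":"<64-character hex digest>","event_id":"abc-123","message":"hello","status":"processed_new"}
```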
You have now implemented a robust deduplication pipeline for handling exact duplicates.
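One closing aside: the configuration style above matches Benthos / Redpanda Connect, and those engines ship a built-in `dedupe` processor that collapses the cache, check, and drop steps into one. A minimal sketch, reusing the same `dedup_cache` resource:

```yaml
pipeline:
  processors:
    - dedupe:
        cache: dedup_cache
        key: ${! content().hash("sha256").encode("hex") }
```

Note that `content()` hashes the raw message bytes, so payloads that differ only in key order would not be collapsed. To keep the sorted-JSON behavior, retain the hashing mapping from step 2 and use `${! this.dedup_hash }` as the key.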