Step 1: Hash-Based Deduplication for Exact Duplicates

This step teaches the most common deduplication technique: hash-based deduplication. The pattern is well suited to catching exact duplicates, such as those produced by network retries that send the same message more than once.

The Goal

You will build a pipeline that generates a unique "fingerprint" (a hash) for each message, remembers the fingerprints it has seen recently, and drops any message whose fingerprint it has already seen.

The "Hash -> Cache -> Check -> Drop" Pattern

  1. Hash: For each message, create a SHA-256 hash of its content. This gives every distinct payload a consistent fingerprint (collisions are negligible in practice).
  2. Cache: Use a cache processor to check if this fingerprint has been seen before.
  3. Check: A mapping processor checks the result from the cache.
  4. Drop: If the cache found a match (meaning it's a duplicate), the message is deleted. If it's a new message, it's added to the cache for future checks and allowed to continue.
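
To see the pattern in isolation before wiring it into a pipeline, here is a minimal Python sketch. Everything in it is illustrative: the names `seen_fingerprints`, `dedupe`, and `TTL_SECONDS` are invented for this example, and a plain in-memory dict stands in for the cache resource you will configure below.

    import hashlib
    import json
    import time

    # Illustrative in-memory cache: fingerprint -> time it was first seen.
    seen_fingerprints = {}
    TTL_SECONDS = 60  # mirrors the cache TTL used in the pipeline below

    def dedupe(message):
        """Return the message if it is new; return None to drop a duplicate."""
        now = time.time()

        # HASH: serialize with sorted keys so identical content always
        # yields the same fingerprint, regardless of key order.
        payload = json.dumps(message, sort_keys=True).encode("utf-8")
        fingerprint = hashlib.sha256(payload).hexdigest()

        # CACHE + CHECK: a fingerprint seen within the TTL marks a duplicate.
        first_seen = seen_fingerprints.get(fingerprint)
        if first_seen is not None and now - first_seen < TTL_SECONDS:
            # DROP: discard the duplicate.
            return None

        # New (or expired) fingerprint: remember it and keep the message.
        seen_fingerprints[fingerprint] = now
        return message

    print(dedupe({"event_id": "abc-123", "message": "hello"}))  # message passes
    print(dedupe({"event_id": "abc-123", "message": "hello"}))  # None (dropped)

Note that the deduplication window equals the cache TTL: duplicates arriving more than 60 seconds apart are not caught, which is usually acceptable when the goal is absorbing retry storms.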

Implementation

  1. Create the Deduplication Pipeline: Copy the following configuration into a file named deduplicator.yaml.

    deduplicator.yaml
    name: hash-deduplicator
    description: A pipeline that removes exact duplicate messages.

    config:
      # 1. Define a cache resource to remember the hashes we've seen.
      cache_resources:
        - label: dedup_cache
          memory:
            default_ttl: 60s # Remember hashes for 60 seconds

      input:
        http_server:
          address: 0.0.0.0:8080
          path: /ingest

      pipeline:
        processors:
          # 2. HASH: Create a hash of the message content.
          - mapping: |
              root = this
              # json_format() ensures keys are sorted, creating a consistent hash
              root.dedup_hash = this.json_format().hash("sha256")

          # 3. CACHE: Check if we've seen this hash before.
          # The result will be available in `meta("cache")`.
          - cache:
              resource: dedup_cache
              operator: get
              key: ${! this.dedup_hash }

          # 4. CHECK & DROP: Decide whether to keep or drop the message.
          - mapping: |
              let is_duplicate = meta("cache") != null
              root = this

              if $is_duplicate {
                # If it's a duplicate, delete the message so it never reaches the output
                root = deleted()
              } else {
                # If it's new, add its hash to the cache for next time
                _ = cache_set("dedup_cache", this.dedup_hash, "seen")
                root.status = "processed_new"
              }

      output:
        stdout:
          codec: lines
  2. Deploy and Test:

    # --- Send the SAME message twice ---
    curl -X POST http://localhost:8080/ingest \
      -H "Content-Type: application/json" \
      -d '{"event_id": "abc-123", "message": "hello"}'

    curl -X POST http://localhost:8080/ingest \
      -H "Content-Type: application/json" \
      -d '{"event_id": "abc-123", "message": "hello"}'
  3. Verify: Check the pipeline's stdout output. You should see the message only once: the first request was processed and its hash cached; the second request produced the same hash, the cache returned a match, and the duplicate was dropped.
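
One caveat worth seeing concretely: because the fingerprint is a cryptographic hash of the serialized content, only byte-for-byte identical payloads are treated as duplicates. The short Python sketch below uses made-up payloads to show both sides of this: key order does not change the fingerprint (keys are sorted before hashing), but a one-character change in a value produces a completely different fingerprint, so near-duplicates pass straight through.

    import hashlib
    import json

    def fingerprint(msg):
        # Sorted keys make the serialization, and therefore the hash, consistent.
        data = json.dumps(msg, sort_keys=True).encode("utf-8")
        return hashlib.sha256(data).hexdigest()

    a = {"event_id": "abc-123", "message": "hello"}
    b = {"message": "hello", "event_id": "abc-123"}   # same content, different key order
    c = {"event_id": "abc-123", "message": "hello!"}  # one character different

    print(fingerprint(a) == fingerprint(b))  # True  -> caught as an exact duplicate
    print(fingerprint(a) == fingerprint(c))  # False -> treated as a new message

This is the behavior you want for retry-induced duplicates: messages with genuinely different content are never dropped by mistake.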

You have now implemented a robust deduplication pipeline for handling exact duplicates.