Step 3: Filter Before Indexing - The Game Changer

This is where Expanso delivers what Splunk fundamentally cannot do: filter data before it reaches your indexers. In traditional Splunk, you pay to index everything, then filter during searches. With Expanso, you filter at the source and only send valuable data to Splunk.

The Cost Problem: You Pay to Index Noise

Typical Enterprise Log Breakdown

  • 70-80% is noise: DEBUG messages, health checks, duplicate events, verbose logging
  • 15-20% is operational: Normal INFO/WARN events worth keeping
  • 5-10% is critical: Errors, security events, business-relevant data

The Math That Will Shock Your CFO

Before Expanso:

  • 1 TB/day raw ingestion
  • Splunk Cloud pricing: ~$200/GB indexed (illustrative list price)
  • Daily cost: 1,000 GB × $200 = $200,000/day
  • Annual cost: $73M/year

After Expanso Edge Filtering:

  • 1 TB/day collected at edge
  • 700 GB/day filtered out (noise)
  • 300 GB/day sent to Splunk (valuable data)
  • Daily cost: 300 GB × $200 = $60,000/day
  • Annual cost: $22M/year
  • Annual savings: $51M 💰

Even for smaller deployments:

  • 10 GB/day → 3 GB/day = $511K/year savings
  • 100 GB/day → 30 GB/day = $5.1M/year savings
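The arithmetic above is easy to sanity-check. A quick Python sketch, using the tutorial's illustrative $200/GB rate and 70% reduction as assumptions, reproduces all three savings figures:

```python
# Back-of-the-envelope model for the figures above. The $200/GB rate and
# 70% reduction are the tutorial's illustrative assumptions, not quoted prices.
PRICE_PER_GB_USD = 200
DAYS_PER_YEAR = 365

def annual_cost(gb_per_day: float) -> float:
    """Annual indexing cost at the illustrative per-GB rate."""
    return gb_per_day * PRICE_PER_GB_USD * DAYS_PER_YEAR

def annual_savings(gb_per_day: float, reduction: float = 0.70) -> float:
    """Savings from filtering out `reduction` of the volume at the edge."""
    return annual_cost(gb_per_day) - annual_cost(gb_per_day * (1 - reduction))

for daily_gb in (10, 100, 1000):
    print(f"{daily_gb} GB/day -> ${annual_savings(daily_gb):,.0f}/year saved")
```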

Splunk's Limitation: Filter After Indexing

In Splunk, you can only filter during search:

# This runs AFTER you've already paid to index everything
index=main sourcetype=json_logs level!=DEBUG
| where NOT match(message, "health.*check")
| dedup user, message

The problem: You've already indexed (and paid for) the DEBUG logs, health checks, and duplicates!

Expanso's Solution: Filter At The Source

With Expanso, filtering happens before data leaves your edge nodes:

Enhanced Pipeline with Smart Filtering

cat > ~/splunk-edge-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-edge-filtered"
  description: "Filter noise before it reaches Splunk - save 70% on indexing costs"

input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true

processors:
  # 1. Parse data (from Step 2)
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()

      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }

  # 2. JSON parsing and enrichment
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }

        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }

        # Flag events for filtering decisions
        root.is_debug = this.level == "DEBUG"
        root.is_health_check = this.message.contains("health check") || this.message.contains("Health check")
        root.is_duplicate = false # Will be determined by the dedup processor

  # 3. CEF parsing
  - conditional:
      condition: 'this.sourcetype == "cef"'
      mapping: |
        root.cef = this.string().parse_regex("CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")

        root.vendor = this.cef.vendor
        root.event_name = this.cef.name
        root.severity = this.cef.severity
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }

  # 4. SMART FILTERING - The Game Changer!

  # 4a. Drop DEBUG logs (SPL equivalent: | where level!="DEBUG")
  - conditional:
      condition: 'this.level == "DEBUG"'
      mapping: 'root = deleted()' # Completely remove from pipeline

  # 4b. Sample health checks (keep 1 in 10, drop the rest)
  - conditional:
      condition: 'this.is_health_check == true'
      mapping: |
        # Use hash of content to ensure consistent sampling
        let hash = (this.host + this.timestamp).hash("xxhash64") % 10
        if hash != 0 {
          root = deleted() # Drop 9 out of 10 health checks
        } else {
          root.sampled_health_check = true # Keep this one, mark it as sampled
        }

  # 4c. Deduplicate events (keep first occurrence in 5-minute window)
  - dedup:
      cache_size: 10000
      drop_on: 'this.level + this.message + this.host'
      dedupe_after: "5m"

  # 4d. Filter out test/synthetic events
  - conditional:
      condition: 'this.user.contains("test") || this.user.contains("synthetic") || this.source_ip.contains("127.0.0.1")'
      mapping: 'root = deleted()'

  # 4e. Rate limiting for verbose sources
  - throttle:
      key: 'this.host + this.process_name'
      limit: 100 # Max 100 events per minute per host+process
      interval: "1m"

  # 5. Add filtering metadata for analysis
  - mapping: |
      root.filtered_by_expanso = true
      root.processing_timestamp = timestamp()

      # Record event size so we can calculate data reduction later
      root.original_size_bytes = this.string().length()

      # Tag event priority for further routing
      root.priority = match {
        this.sourcetype == "cef" && this.risk_score >= 8 => "critical"
        this.level == "ERROR" => "high"
        this.level == "WARN" => "medium"
        this.sampled_health_check == true => "low"
        _ => "normal"
      }

# Still using stdout for now - we'll add Splunk HEC in Step 4
output:
  stdout:
    format: "json"

# Expose metrics to track filtering efficiency (a top-level section, not an output)
metrics:
  prometheus:
    listen_addr: ":9090"
    path: "/metrics"
EOF
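The "keep 1 in N via hash" trick in step 4b works in any language. Here is a minimal Python sketch of the same idea, substituting stdlib MD5 for xxhash64 (an assumption purely for illustration; any stable hash gives the same property):

```python
import hashlib

# Sketch of the "keep 1 in N" sampling idea from step 4b. The pipeline uses
# xxhash64; we use stdlib MD5 here purely for illustration - the point is
# that the decision is a pure function of the event's content.
def keep_event(host: str, timestamp: str, keep_one_in: int = 10) -> bool:
    digest = hashlib.md5(f"{host}{timestamp}".encode()).hexdigest()
    return int(digest, 16) % keep_one_in == 0

# The same event always gets the same decision, so replays and retries
# never flip a drop into a keep (or vice versa).
decision = keep_event("web-01", "2024-01-01T00:00:00Z")
assert decision == keep_event("web-01", "2024-01-01T00:00:00Z")
```

Because the hash is uniform, about one event in ten survives on average; identical events (same host and timestamp) always land on the same side of the cut, which is why distinct timestamps matter for even sampling.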

Add a Volume Tracking Processor

Let's add another pipeline stage that tracks exactly how much data reduction we're achieving. Note that this stage must live inside the processors: list; appending it to the end of the file would place it after output:, where it is never executed. Edit ~/splunk-edge-pipeline.yaml and insert the following as the last entry under processors::

  # Volume tracking processor (last entry under processors:)
  - mapping: |
      # Track volumes for cost analysis
      meta events_processed = (meta("events_processed").number().catch(0)) + 1
      meta bytes_before_filtering = (meta("bytes_before_filtering").number().catch(0)) + this.original_size_bytes
      meta bytes_after_filtering = (meta("bytes_after_filtering").number().catch(0)) + this.string().length()

      # Calculate reduction percentage
      let reduction_pct = if meta("bytes_before_filtering") > 0 {
        ((meta("bytes_before_filtering") - meta("bytes_after_filtering")).number() / meta("bytes_before_filtering").number()) * 100
      } else { 0 }

      root.volume_stats = {
        "events_processed": meta("events_processed"),
        "bytes_before": meta("bytes_before_filtering"),
        "bytes_after": meta("bytes_after_filtering"),
        "reduction_percentage": reduction_pct,
        "estimated_annual_savings_usd": (reduction_pct / 100) * 200 * (meta("bytes_before_filtering") / 1024 / 1024 / 1024) * 365
      }
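The volume_stats arithmetic is worth double-checking in isolation. A Python sketch of the same formulas (the $200/GB rate is the tutorial's illustrative assumption, and note that multiplying by 365 projects an annual figure, so the field is named accordingly):

```python
# Sketch of the volume_stats arithmetic: reduction percentage plus the
# projected annual savings at the illustrative $200/GB rate.
def volume_stats(bytes_before: int, bytes_after: int, price_per_gb: float = 200) -> dict:
    reduction_pct = ((bytes_before - bytes_after) / bytes_before * 100) if bytes_before else 0
    gb_before = bytes_before / 1024 / 1024 / 1024
    return {
        "reduction_percentage": reduction_pct,
        # (fraction removed) x (price per GB) x (GB/day) x (days/year)
        "estimated_annual_savings_usd": (reduction_pct / 100) * price_per_gb * gb_before * 365,
    }

# 1 GiB in, 300 MiB out -> roughly the 70% reduction the tutorial targets
stats = volume_stats(bytes_before=1024**3, bytes_after=300 * 1024**2)
print(stats)
```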

Deploy and Test the Filtering

1. Deploy Enhanced Pipeline

expanso pipeline deploy ~/splunk-edge-pipeline.yaml

# Monitor the filtered output
expanso pipeline logs splunk-edge-filtered -f

2. Generate Mixed Test Data

Create a mix of valuable and noisy data to see the filtering in action:

# Add DEBUG noise (should be filtered out)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"DEBUG","message":"Cache lookup for user session","user":"john.doe","cache_key":"session_abc123"}' >> $TEST_DATA_DIR/app.log

echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"DEBUG","message":"SQL query trace: SELECT * FROM sessions","execution_time_ms":5}' >> $TEST_DATA_DIR/app.log

# Add health checks (should be sampled down)
for i in {1..10}; do
  echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"Health check passed","service":"user-service","status":"healthy"}' >> $TEST_DATA_DIR/app.log
done

# Add valuable ERROR events (should be kept)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed","user":"john.doe","transaction_id":"tx_12345","amount":99.99}' >> $TEST_DATA_DIR/app.log

# Add critical security event (should be kept)
echo 'CEF:0|Company|WebApp|1.0|800|Brute Force Attack|Critical|src=203.0.113.100 suser=admin act=login outcome=failure attempts=50 dpt=443' >> $TEST_DATA_DIR/security.log

# Add test user events (should be filtered out)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User login","user":"test.user","source_ip":"127.0.0.1"}' >> $TEST_DATA_DIR/app.log

3. Monitor Filtering Results

You should see:

  • 0 DEBUG events in output (all filtered)
  • Roughly 1 health check out of 10 (~90% reduction; hash-based sampling is approximate per event)
  • ERROR events preserved (valuable data kept)
  • Security events preserved (critical data kept)
  • Test user events filtered (synthetic data removed)

4. Check Volume Reduction Metrics

# View filtering metrics
curl http://localhost:9090/metrics | grep expanso

# Check pipeline statistics
expanso pipeline stats splunk-edge-filtered

Filter Types and SPL Equivalents

| Filter Type | SPL Equivalent | Expanso Edge Filtering | Cost Impact |
|---|---|---|---|
| Level Filtering | level!="DEBUG" | root = deleted() if DEBUG | 40-50% reduction |
| Health Check Sampling | NOT (message="health*") | Keep 1 in 10 via hash | 20-30% reduction |
| Deduplication | dedup user,message | Built-in dedup processor | 10-15% reduction |
| Test Data Removal | user!="test*" | Conditional deletion | 5-10% reduction |
| Rate Limiting | Manual time windows | Automatic throttling | 10-20% reduction |

Total Typical Reduction: 70-85%
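Note that the per-filter ranges overlap (their upper bounds alone sum to 125%), so independent filters are better modeled multiplicatively: each one removes a fraction of what the previous filters left. A Python sketch using the midpoint of each range lands inside the claimed 70-85% band:

```python
# If each filter independently removes a fraction of the *remaining* stream,
# the combined reduction compounds rather than adds. Midpoints of the
# ranges in the table above:
reductions = {
    "level_filtering": 0.45,
    "health_check_sampling": 0.25,
    "deduplication": 0.125,
    "test_data_removal": 0.075,
    "rate_limiting": 0.15,
}

surviving = 1.0
for fraction_removed in reductions.values():
    surviving *= 1 - fraction_removed  # each filter sees what the last one kept

print(f"combined reduction: {1 - surviving:.0%}")
```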

Advanced Filtering Patterns

1. Time-Based Filtering

# Only send business-hours data for non-critical events
- conditional:
    condition: 'this.priority != "critical" && (timestamp().format("15").number() < 9 || timestamp().format("15").number() > 17)'
    mapping: 'root = deleted()'

2. Adaptive Thresholds

# Increase filtering aggressiveness during high-volume periods
- mapping: |
    let current_hour_events = meta("hour_" + timestamp().format("15")).number().catch(0) + 1
    meta("hour_" + timestamp().format("15")) = current_hour_events

    # Filter more aggressively if >1000 events this hour
    if current_hour_events > 1000 && this.level == "INFO" {
      root = deleted()
    }
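If the pipeline DSL above is unfamiliar, the same adaptive logic is easy to express in Python. A sketch with a hypothetical 1,000-event hourly limit:

```python
from collections import defaultdict
from datetime import datetime, timezone

# Sketch of the adaptive-threshold idea: count events per hour bucket and
# start dropping INFO events once the hour's volume crosses a limit.
hour_counts: dict = defaultdict(int)

def should_drop(level: str, when: datetime, limit: int = 1000) -> bool:
    bucket = when.strftime("%Y-%m-%dT%H")  # one counter per calendar hour
    hour_counts[bucket] += 1
    return level == "INFO" and hour_counts[bucket] > limit

# With 1,500 INFO events in one hour, everything past the limit is dropped.
when = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
drops = sum(should_drop("INFO", when) for _ in range(1500))
print(drops)
```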

3. Content-Based Intelligence

# Filter known-good patterns
- conditional:
    condition: 'this.message.contains("successfully") && this.level == "INFO" && this.response_time_ms < 100'
    mapping: |
      let hash = this.message.hash("xxhash64") % 20
      if hash != 0 {
        root = deleted() # Keep only 5% of successful fast operations
      }

4. Compliance-Aware Filtering

# Never filter PII/compliance-related events
- conditional:
    condition: 'this.message.contains("login") || this.message.contains("payment") || this.message.contains("access")'
    mapping: |
      root.compliance_protected = true
      root.filter_exempt = true

Cost Impact Analysis

Let's calculate the real savings for different deployment sizes:

Small Deployment (10 GB/day)

  • Before: 10 GB × $200 × 365 = $730K/year
  • After (70% reduction): 3 GB × $200 × 365 = $219K/year
  • Annual Savings: $511K

Medium Deployment (100 GB/day)

  • Before: 100 GB × $200 × 365 = $7.3M/year
  • After (70% reduction): 30 GB × $200 × 365 = $2.19M/year
  • Annual Savings: $5.11M

Large Deployment (1 TB/day)

  • Before: 1,000 GB × $200 × 365 = $73M/year
  • After (70% reduction): 300 GB × $200 × 365 = $21.9M/year
  • Annual Savings: $51.1M

ROI Calculation

  • Expanso Edge cost: ~$5K/node/year
  • Break-even: 1 edge node saves $500K+/year
  • ROI: 10,000%+ for typical deployments
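The ROI figure follows directly from the small-deployment numbers. A quick sketch (assuming the tutorial's ~$5K/node/year figure and the $511K savings from the 10 GB/day example):

```python
# ROI sketch using the tutorial's assumptions: ~$5K/node/year edge cost
# against the small-deployment savings figure above.
node_cost = 5_000          # annual cost of one edge node (assumed)
annual_savings = 511_000   # 10 GB/day deployment at 70% reduction

roi_pct = (annual_savings - node_cost) / node_cost * 100
print(f"ROI: {roi_pct:,.0f}%")
```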

Monitoring Filter Effectiveness

1. Check Reduction Metrics

# View real-time filtering stats
expanso pipeline logs splunk-edge-filtered --filter "volume_stats"

2. Track Filtered Events

# Count filtered vs. passed events
expanso pipeline metrics splunk-edge-filtered | grep -E "(events_processed|events_filtered)"

3. Validate Critical Events Preserved

# Ensure no critical events were filtered
expanso pipeline logs splunk-edge-filtered --filter "level=ERROR" --count

What's Next?

Fantastic! You're now filtering out 70%+ of noise before it reaches Splunk, which translates to massive cost savings. Next, we'll configure the output to send your filtered, valuable data directly to Splunk via HEC.

Next Step: Step 4: Route to Splunk HEC


Key Takeaway: Edge filtering is the game-changer that traditional Splunk deployments can't achieve. You're not just saving money — you're making Splunk searches faster by reducing the data volume while preserving all critical information. This is why major enterprises are adopting edge data processing!