Step 3: Filter Before Indexing - The Game Changer
This is where Expanso delivers what Splunk fundamentally cannot do: filter data before it reaches your indexers. In traditional Splunk, you pay to index everything, then filter during searches. With Expanso, you filter at the source and only send valuable data to Splunk.
The Cost Problem: You Pay to Index Noise
Typical Enterprise Log Breakdown
- 70-80% is noise: DEBUG messages, health checks, duplicate events, verbose logging
- 15-20% is operational: Normal INFO/WARN events worth keeping
- 5-10% is critical: Errors, security events, business-relevant data
The Math That Will Shock Your CFO
Before Expanso:
- 1 TB/day raw ingestion
- Splunk Cloud pricing: ~$200/GB indexed
- Daily cost: 1,000 GB × $200 = $200,000/day
- Annual cost: $73M/year
After Expanso Edge Filtering:
- 1 TB/day collected at edge
- 700 GB/day filtered out (noise)
- 300 GB/day sent to Splunk (valuable data)
- Daily cost: 300 GB × $200 = $60,000/day
- Annual cost: $22M/year
- Annual savings: $51M 💰
Even for smaller deployments:
- 10 GB/day → 3 GB/day = $511K/year savings
- 100 GB/day → 30 GB/day = $5.1M/year savings
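These figures generalize to any volume. A quick Python sketch of the arithmetic (assuming this article's illustrative flat $200/GB rate; actual Splunk Cloud pricing is tiered and contract-specific):

```python
def annual_savings(daily_gb: float, reduction: float, price_per_gb: float = 200.0) -> float:
    """Annual indexing cost avoided by filtering out `reduction` (0-1) of daily volume."""
    cost_before = daily_gb * price_per_gb * 365
    cost_after = daily_gb * (1 - reduction) * price_per_gb * 365
    return cost_before - cost_after

# 1 TB/day with a 70% reduction matches the $51.1M figure above
print(f"${annual_savings(1000, 0.70) / 1e6:.1f}M")  # $51.1M
```

The same function reproduces the smaller deployments: `annual_savings(10, 0.70)` is $511K and `annual_savings(100, 0.70)` is $5.11M.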
Splunk's Limitation: Filter After Indexing
In Splunk, you can only filter during search:
# This runs AFTER you've already paid to index everything
index=main sourcetype=json_logs level!=DEBUG
| where NOT match(message, "health.*check")
| dedup user, message
The problem: You've already indexed (and paid for) the DEBUG logs, health checks, and duplicates!
Expanso's Solution: Filter At The Source
With Expanso, filtering happens before data leaves your edge nodes:
Enhanced Pipeline with Smart Filtering
cat > ~/splunk-edge-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-edge-filtered"
  description: "Filter noise before it reaches Splunk - save 70% on indexing costs"

input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true

processors:
  # 1. Parse data (from Step 2)
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()
      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }

  # 2. JSON parsing and enrichment
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }
        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }
        # Flag events for filtering decisions
        root.is_debug = this.level == "DEBUG"
        root.is_health_check = this.message.contains("health check") || this.message.contains("Health check")
        root.is_duplicate = false # Will be determined by the dedup processor

  # 3. CEF parsing
  - conditional:
      condition: 'this.sourcetype == "cef"'
      mapping: |
        root.cef = this.string().parse_regex("CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")
        root.vendor = this.cef.vendor
        root.event_name = this.cef.name
        root.severity = this.cef.severity
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }

  # 4. SMART FILTERING - The Game Changer!
  # 4a. Drop DEBUG logs (SPL equivalent: | where level!="DEBUG")
  - conditional:
      condition: 'this.level == "DEBUG"'
      mapping: 'root = deleted()' # Completely remove from the pipeline

  # 4b. Sample health checks (keep 1 in 10, drop the rest)
  - conditional:
      condition: 'this.is_health_check == true'
      mapping: |
        # Use a hash of the content to ensure consistent sampling
        let hash = (this.host + this.timestamp).hash("xxhash64") % 10
        if $hash != 0 {
          root = deleted() # Drop 9 out of 10 health checks
        } else {
          root.sampled_health_check = true # Keep this one, mark it as sampled
        }

  # 4c. Deduplicate events (keep first occurrence in a 5-minute window)
  - dedup:
      cache_size: 10000
      drop_on: 'this.level + this.message + this.host'
      dedupe_after: "5m"

  # 4d. Filter out test/synthetic events
  - conditional:
      condition: 'this.user.or("").contains("test") || this.user.or("").contains("synthetic") || this.source_ip.or("").contains("127.0.0.1")'
      mapping: 'root = deleted()'

  # 4e. Rate limiting for verbose sources
  - throttle:
      key: 'this.host + this.process_name'
      limit: 100 # Max 100 events per minute per host+process
      interval: "1m"

  # 5. Add filtering metadata for analysis
  - mapping: |
      root.filtered_by_expanso = true
      root.processing_timestamp = timestamp()
      # Record the event size for data-reduction accounting
      root.original_size_bytes = this.string().length()
      # Tag event priority for further routing
      root.priority = match {
        this.sourcetype == "cef" && this.risk_score >= 8 => "critical"
        this.level == "ERROR" => "high"
        this.level == "WARN" => "medium"
        this.sampled_health_check == true => "low"
        _ => "normal"
      }

# Still using stdout for now - we'll add Splunk HEC in Step 4
output:
  stdout:
    format: "json"

# Expose metrics to track filtering efficiency
metrics:
  prometheus:
    listen_addr: ":9090"
    path: "/metrics"
EOF
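The hash-based sampling in step 4b is deterministic: the same host + timestamp always produces the same keep/drop decision, so re-running the pipeline over identical data yields identical survivors (unlike random sampling). A Python sketch of the idea — the pipeline uses xxhash64; here the stdlib's stable `zlib.crc32` stands in, and the event dict shape is hypothetical:

```python
import zlib

def keep_health_check(event: dict, keep_one_in: int = 10) -> bool:
    """Deterministically keep roughly 1 in N health checks, keyed on host + timestamp."""
    key = (event["host"] + event["timestamp"]).encode()
    return zlib.crc32(key) % keep_one_in == 0

events = [{"host": "web-1", "timestamp": f"2024-01-01T00:00:{s:02d}Z"} for s in range(60)]
kept = [e for e in events if keep_health_check(e)]
print(f"kept {len(kept)} of {len(events)}")  # roughly 1 in 10 survive
```

Because the decision is a pure function of the event's content, every edge node applies the same sampling without any coordination.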
Add a Volume Tracking Processor
Let's add another pipeline stage that tracks exactly how much data reduction we're achieving. Don't simply append it to the file with `cat >>` — that would place the stage after the `output` section, where it is invalid. Instead, open `~/splunk-edge-pipeline.yaml` and insert this mapping as the last entry in the `processors` list:
  # Volume tracking processor (last entry in the processors list)
  - mapping: |
      # Track volumes for cost analysis
      meta events_processed = meta("events_processed").number().catch(0) + 1
      meta bytes_before_filtering = meta("bytes_before_filtering").number().catch(0) + this.original_size_bytes
      meta bytes_after_filtering = meta("bytes_after_filtering").number().catch(0) + this.string().length()
      # Calculate the reduction percentage
      let reduction_pct = if meta("bytes_before_filtering").number() > 0 {
        ((meta("bytes_before_filtering").number() - meta("bytes_after_filtering").number()) / meta("bytes_before_filtering").number()) * 100
      } else { 0 }
      root.volume_stats = {
        "events_processed": meta("events_processed"),
        "bytes_before": meta("bytes_before_filtering"),
        "bytes_after": meta("bytes_after_filtering"),
        "reduction_percentage": $reduction_pct,
        "estimated_annual_savings_usd": ($reduction_pct / 100) * 200 * (meta("bytes_before_filtering").number() / 1024 / 1024 / 1024) * 365
      }
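The savings estimate in `volume_stats` packs several unit conversions into one expression (note the ×365 factor makes it an annual figure, treating the observed window as one day's volume). The same arithmetic in plain form, again assuming the article's $200/GB rate:

```python
def estimated_annual_savings(bytes_before: int, bytes_after: int,
                             price_per_gb: float = 200.0) -> float:
    """Mirror of the pipeline's volume_stats estimate: treats the observed
    window as one day's volume and annualizes the reduction."""
    if bytes_before == 0:
        return 0.0
    reduction = (bytes_before - bytes_after) / bytes_before
    gb_before = bytes_before / 1024 / 1024 / 1024
    return reduction * price_per_gb * gb_before * 365

# 1 GiB observed, 70% filtered out
print(round(estimated_annual_savings(2**30, int(2**30 * 0.3))))  # 51100
```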
Deploy and Test the Filtering
1. Deploy Enhanced Pipeline
expanso pipeline deploy ~/splunk-edge-pipeline.yaml
# Monitor the filtered output
expanso pipeline logs splunk-edge-filtered -f
2. Generate Mixed Test Data
Create a mix of valuable and noisy data to see the filtering in action:
# Add DEBUG noise (should be filtered out; the %3N millisecond format requires GNU date)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"DEBUG","message":"Cache lookup for user session","user":"john.doe","cache_key":"session_abc123"}' >> $TEST_DATA_DIR/app.log
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"DEBUG","message":"SQL query trace: SELECT * FROM sessions","execution_time_ms":5}' >> $TEST_DATA_DIR/app.log
# Add health checks (should be sampled down)
for i in {1..10}; do
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"Health check passed","service":"user-service","status":"healthy"}' >> $TEST_DATA_DIR/app.log
done
# Add valuable ERROR events (should be kept)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed","user":"[email protected]","transaction_id":"tx_12345","amount":99.99}' >> $TEST_DATA_DIR/app.log
# Add critical security event (should be kept)
echo 'CEF:0|Company|WebApp|1.0|800|Brute Force Attack|Critical|src=203.0.113.100 suser=admin act=login outcome=failure attempts=50 dpt=443' >> $TEST_DATA_DIR/security.log
# Add test user events (should be filtered out)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User login","user":"test.user","source_ip":"127.0.0.1"}' >> $TEST_DATA_DIR/app.log
3. Monitor Filtering Results
You should see:
- ✅ 0 DEBUG events in output (all filtered)
- ✅ 1 health check out of 10 (90% reduction)
- ✅ ERROR events preserved (valuable data kept)
- ✅ Security events preserved (critical data kept)
- ❌ Test user events filtered (synthetic data removed)
4. Check Volume Reduction Metrics
# View filtering metrics
curl http://localhost:9090/metrics | grep expanso
# Check pipeline statistics
expanso pipeline stats splunk-edge-filtered
Filter Types and SPL Equivalents
| Filter Type | SPL Equivalent | Expanso Edge Filtering | Cost Impact |
|---|---|---|---|
| Level Filtering | level!="DEBUG" | root = deleted() if DEBUG | 40-50% reduction |
| Health Check Sampling | NOT (message="health*") | Keep 1 in 10 via hash | 20-30% reduction |
| Deduplication | dedup user,message | Built-in dedup processor | 10-15% reduction |
| Test Data Removal | user!="test*" | Conditional deletion | 5-10% reduction |
| Rate Limiting | Manual time windows | Automatic throttling | 10-20% reduction |
Total Typical Reduction: 70-85%
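Note that the per-filter percentages are not additive (summing the columns would give 85-125%): each stage only sees what earlier stages let through, so reductions compound multiplicatively. A quick Python sketch using the midpoints of the table above:

```python
def combined_reduction(stage_reductions: list[float]) -> float:
    """Overall reduction when filters run in sequence, each acting on the survivors."""
    remaining = 1.0
    for r in stage_reductions:
        remaining *= (1.0 - r)
    return 1.0 - remaining

# Midpoints of the table rows: 45%, 25%, 12.5%, 7.5%, 15%
print(f"{combined_reduction([0.45, 0.25, 0.125, 0.075, 0.15]):.0%}")  # 72%
```

That lands squarely in the 70-85% range quoted above.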
Advanced Filtering Patterns
1. Time-Based Filtering
# Only send business-hours data for non-critical events
- conditional:
    condition: 'this.priority != "critical" && (timestamp().format("15").number() < 9 || timestamp().format("15").number() > 17)'
    mapping: 'root = deleted()'
2. Adaptive Thresholds
# Increase filtering aggressiveness during high-volume periods
- mapping: |
    let current_hour_events = meta("hour_" + timestamp().format("15")).number().catch(0) + 1
    meta("hour_" + timestamp().format("15")) = $current_hour_events
    # Filter more aggressively if >1000 events this hour
    if $current_hour_events > 1000 && this.level == "INFO" {
      root = deleted()
    }
3. Content-Based Intelligence
# Filter known-good patterns
- conditional:
    condition: 'this.message.contains("successfully") && this.level == "INFO" && this.response_time_ms < 100'
    mapping: |
      let hash = this.message.hash("xxhash64") % 20
      if $hash != 0 {
        root = deleted() # Keep only 5% of successful fast operations
      }
4. Compliance-Aware Filtering
# Never filter PII/compliance-related events
- conditional:
    condition: 'this.message.contains("login") || this.message.contains("payment") || this.message.contains("access")'
    mapping: |
      root.compliance_protected = true
      root.filter_exempt = true
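The exemption flags above only protect events if downstream filter stages actually check them before dropping anything. A minimal Python sketch of that guard logic (the `should_drop` helper is hypothetical, not part of Expanso):

```python
def should_drop(event: dict, drop_rule) -> bool:
    """Apply a drop rule, but never to events tagged as filter-exempt."""
    if event.get("filter_exempt") or event.get("compliance_protected"):
        return False
    return drop_rule(event)

debug_rule = lambda e: e.get("level") == "DEBUG"
print(should_drop({"level": "DEBUG"}, debug_rule))                         # True
print(should_drop({"level": "DEBUG", "filter_exempt": True}, debug_rule))  # False
```

In the pipeline itself, the equivalent is to order the compliance-tagging stage first and add `&& this.filter_exempt.or(false) == false` to each drop condition.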
Cost Impact Analysis
Let's calculate the real savings for different deployment sizes:
Small Deployment (10 GB/day)
- Before: 10 GB × $200 × 365 = $730K/year
- After (70% reduction): 3 GB × $200 × 365 = $219K/year
- Annual Savings: $511K
Medium Deployment (100 GB/day)
- Before: 100 GB × $200 × 365 = $7.3M/year
- After (70% reduction): 30 GB × $200 × 365 = $2.19M/year
- Annual Savings: $5.11M
Large Deployment (1 TB/day)
- Before: 1,000 GB × $200 × 365 = $73M/year
- After (70% reduction): 300 GB × $200 × 365 = $21.9M/year
- Annual Savings: $51.1M
ROI Calculation
- Expanso Edge cost: ~$5K/node/year
- Payback: a single edge node saving $500K+/year covers its annual cost 100 times over
- ROI: 10,000%+ for typical deployments
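As a sanity check on that figure, the small-deployment numbers alone support it. A sketch (the $5K/node/year cost is the estimate above, not a published price):

```python
def roi_percent(annual_savings: float, annual_cost: float) -> float:
    """Return on investment as a percentage of the cost."""
    return (annual_savings - annual_cost) / annual_cost * 100

# 10 GB/day deployment: $511K saved against one $5K edge node
print(f"{roi_percent(511_000, 5_000):.0f}%")  # 10120%
```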
Monitoring Filter Effectiveness
1. Check Reduction Metrics
# View real-time filtering stats
expanso pipeline logs splunk-edge-filtered --filter "volume_stats"
2. Track Filtered Events
# Count filtered vs. passed events
expanso pipeline metrics splunk-edge-filtered | grep -E "(events_processed|events_filtered)"
3. Validate Critical Events Preserved
# Ensure no critical events were filtered
expanso pipeline logs splunk-edge-filtered --filter "level=ERROR" --count
What's Next?
Fantastic! You're now filtering out 70%+ of noise before it reaches Splunk, which translates to massive cost savings. Next, we'll configure the output to send your filtered, valuable data directly to Splunk via HEC.
→ Next Step: Step 4: Route to Splunk HEC
Key Takeaway: Edge filtering is the game-changer that traditional Splunk deployments can't achieve. You're not just saving money — you're making Splunk searches faster by reducing the data volume while preserving all critical information. This is why major enterprises are adopting edge data processing!