Step 5: Advanced Splunk Patterns
Now that you have a solid Splunk integration, let's explore advanced patterns that extend your capabilities beyond traditional Splunk deployments. These patterns show how Expanso can enhance your entire data architecture, not just reduce costs.
Pattern 1: Multi-Destination Routing
Send data to multiple systems simultaneously — Splunk for real-time analysis, S3 for long-term storage, and metrics systems for monitoring.
The Multi-Destination Pipeline
cat > ~/advanced-splunk-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
name: "splunk-multi-destination"
description: "Advanced pipeline with multi-destination routing and compliance features"
input:
file_watcher:
paths:
- "/var/log/expanso-demo/app.log"
- "/var/log/expanso-demo/security.log"
- "/var/log/expanso-demo/system.log"
poll_interval: "1s"
include_file_name: true
processors:
# 1. Core parsing (same as before)
- mapping: |
root.source_file = file.name
root.collection_timestamp = timestamp()
root.host = hostname()
root.sourcetype = match file.name {
this.contains("app.log") => "json_logs"
this.contains("security.log") => "cef"
this.contains("system.log") => "syslog"
_ => "unknown"
}
# 2. Enhanced parsing with compliance fields
- conditional:
condition: 'this.sourcetype == "json_logs"'
mapping: |
root = if this.type() == "object" {
this
} else {
this.parse_json().catch({"raw_message": this, "parse_error": true})
}
# Standard enrichment
root.log_severity = match this.level {
"ERROR" => 3
"WARN" => 2
"INFO" => 1
_ => 0
}
# Compliance classification
root.data_classification = match {
this.user.contains("@") || this.message.contains("login") => "pii"
this.message.contains("payment") || this.message.contains("transaction") => "financial"
this.message.contains("medical") || this.message.contains("health") => "phi"
_ => "general"
}
# Data residency requirements
root.data_residency = match this.source_ip {
this.contains("10.0.") || this.contains("192.168.") => "us"
this.contains("172.16.") => "eu"
_ => "global"
}
# 3. PII Detection and Masking
- mapping: |
# Detect and mask PII before it goes to any destination
root.original_message = this.message
# Mask email addresses
root.message = this.message.replace_all_regex("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b", "***@***.***")
# Mask credit card numbers
root.message = root.message.replace_all_regex("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b", "****-****-****-****")
# Mask SSNs
root.message = root.message.replace_all_regex("\\b\\d{3}-\\d{2}-\\d{4}\\b", "***-**-****")
# Track if masking occurred
root.pii_masked = if this.message != this.original_message { true } else { false }
# 4. Metrics Extraction
- conditional:
condition: 'this.response_time_ms.type() == "number"'
mapping: |
# Extract metrics for time-series systems
root.metrics = {
"response_time": this.response_time_ms,
"timestamp": timestamp().format_timestamp_unix(),
"endpoint": this.endpoint.string().catch("unknown"),
"status_code": this.status_code.number().catch(0),
"method": this.method.string().catch("unknown"),
"host": this.host
}
# 5. Filtering (preserve from Step 3)
- conditional:
condition: 'this.level == "DEBUG"'
mapping: 'root = deleted()'
# 6. Create destination-specific formats
- mapping: |
# Splunk HEC format
root.splunk_event = {
"time": timestamp().format_timestamp_unix(),
"host": this.host,
"source": "expanso-edge:" + this.host + ":" + this.source_file,
"sourcetype": "expanso:" + this.sourcetype,
"index": match {
this.data_classification == "pii" || this.data_classification == "financial" => "compliance"
this.sourcetype == "cef" => "security"
this.log_severity >= 3 => "main"
_ => "main"
},
"event": this.without("splunk_event", "s3_event", "metrics")
}
# S3 archival format (structured for analytics)
root.s3_event = {
"timestamp": timestamp().format_timestamp_unix(),
"date_partition": timestamp().format("2006/01/02"),
"hour_partition": timestamp().format("15"),
"data_classification": this.data_classification,
"data_residency": this.data_residency,
"event_data": this.without("splunk_event", "s3_event", "metrics"),
"retention_policy": match this.data_classification {
"pii" => "7_years"
"financial" => "10_years"
"phi" => "6_years"
_ => "3_years"
}
}
# Multi-destination output with conditional routing
output:
# Primary: Splunk for real-time analysis
- condition: 'this.splunk_event.index != ""'
http_client:
url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
headers:
Authorization: "Splunk ${HEC_TOKEN}"
Content-Type: "application/json"
retry_policy:
max_retries: 3
batch:
count: 100
period: "10s"
postmap: 'root = this.splunk_event'
# Secondary: S3 for long-term storage and compliance
- condition: 'this.data_classification != ""'
s3:
bucket: "company-logs-${this.data_residency}"
path: "year=${s3_event.date_partition}/hour=${s3_event.hour_partition}/expanso-${timestamp().format_timestamp_unix('nano')}.json"
region: "${AWS_REGION}"
credentials:
profile: "default"
storage_class: "INTELLIGENT_TIERING"
server_side_encryption: "AES256"
postmap: 'root = this.s3_event'
# Tertiary: Metrics to Prometheus/InfluxDB
- condition: 'this.metrics.response_time.type() == "number"'
prometheus:
push_url: "http://prometheus-pushgateway:9091/metrics/job/expanso-edge/instance/${hostname()}"
push_interval: "30s"
postmap: |
root = [
{
"name": "http_request_duration_seconds",
"type": "histogram",
"help": "HTTP request duration in seconds",
"value": this.metrics.response_time / 1000,
"labels": {
"endpoint": this.metrics.endpoint,
"method": this.metrics.method,
"status_code": this.metrics.status_code.string(),
"host": this.metrics.host
}
}
]
# Quaternary: Critical alerts to webhook/PagerDuty
- condition: 'this.log_severity >= 3 || this.data_classification == "financial"'
http_client:
url: "${ALERTING_WEBHOOK_URL}"
method: "POST"
headers:
Content-Type: "application/json"
Authorization: "Bearer ${ALERT_TOKEN}"
postmap: |
root = {
"severity": match this.log_severity {
3 => "critical"
2 => "warning"
_ => "info"
},
"summary": this.message.string().catch("Unknown error"),
"source": this.host + ":" + this.source_file,
"timestamp": timestamp(),
"classification": this.data_classification,
"masked": this.pii_masked
}
EOF
Pattern 2: Adaptive Processing Based on Conditions
Automatically adjust processing based on volume, time of day, or system load. Note: this processor belongs in the pipeline's `processors` section, before the `output` block — the append command below adds it to the end of the file, so move the snippet into place (with matching indentation) before redeploying:
cat >> ~/advanced-splunk-pipeline.yaml << 'EOF'
# Adaptive filtering processor (add before output)
- mapping: |
# Get current hour and system load
let current_hour = timestamp().format("15").number()
let is_business_hours = current_hour >= 9 && current_hour <= 17
let is_weekend = timestamp().format("Monday").contains("Saturday") || timestamp().format("Monday").contains("Sunday")
# Adaptive filtering based on conditions
root.filter_aggressiveness = match {
is_weekend => "high" # Filter more on weekends
!is_business_hours => "medium" # Filter more outside business hours
this.data_classification == "pii" => "low" # Always preserve PII
_ => "normal"
}
# Apply adaptive filtering
if this.filter_aggressiveness == "high" && this.level == "INFO" {
let hash = this.message.hash("xxhash64") % 10
if hash != 0 {
root = deleted() # Keep only 10% of INFO logs
}
}
if this.filter_aggressiveness == "medium" && this.level == "INFO" && this.response_time_ms < 100 {
let hash = this.message.hash("xxhash64") % 5
if hash != 0 {
root = deleted() # Keep only 20% of fast INFO logs
}
}
EOF
Pattern 3: Real-Time Anomaly Detection
Detect anomalies at the edge and alert before data reaches Splunk:
processors:
# Anomaly detection processor
- mapping: |
# Track baseline metrics in memory (sliding window)
let response_time = this.response_time_ms.number().catch(0)
let endpoint = this.endpoint.string().catch("unknown")
# Calculate rolling averages (this would use external cache in production)
let avg_response_key = "avg_response_" + endpoint
let current_avg = meta(avg_response_key).number().catch(response_time)
let new_avg = (current_avg * 0.9) + (response_time * 0.1) # Exponential moving average
meta(avg_response_key) = new_avg
# Detect anomalies (>3x normal response time)
root.is_anomaly = response_time > (new_avg * 3) && response_time > 1000
# Anomaly metadata
if this.is_anomaly {
root.anomaly_details = {
"current_response": response_time,
"baseline_response": new_avg,
"deviation_factor": response_time / new_avg,
"detection_time": timestamp()
}
}
Pattern 4: Geographic Data Routing
Route data based on geographic requirements for compliance:
processors:
- mapping: |
# Determine geographic region from IP
root.geo_region = match this.source_ip {
this.contains("10.1.") => "us_west"
this.contains("10.2.") => "us_east"
this.contains("10.3.") => "eu_west"
this.contains("10.4.") => "asia_pacific"
_ => "unknown"
}
# Apply data sovereignty rules
root.allowed_destinations = match this.data_classification + "_" + this.geo_region {
"pii_eu_west" => ["eu_splunk", "eu_s3"]
"financial_us_west" => ["us_splunk", "us_s3_encrypted"]
"phi_*" => ["local_only"]
_ => ["global_splunk", "global_s3"]
}
output:
# Route based on geographic compliance
switch:
- condition: 'this.allowed_destinations.contains("eu_splunk")'
output:
http_client:
url: "https://eu-splunk.company.com:8088/services/collector/event"
- condition: 'this.allowed_destinations.contains("us_splunk")'
output:
http_client:
url: "https://us-splunk.company.com:8088/services/collector/event"
- condition: 'this.allowed_destinations.contains("local_only")'
output:
file:
path: "/secure/local/phi-data-${timestamp().format('2006-01-02')}.log"
Pattern 5: Intelligent Sampling
Implement sophisticated sampling that preserves statistical significance:
processors:
- mapping: |
# Stratified sampling - preserve distribution across different dimensions
let sample_key = this.level + "_" + this.endpoint.string().catch("unknown")
let sample_count = meta("sample_" + sample_key).number().catch(0) + 1
meta("sample_" + sample_key) = sample_count
# Sample rates based on event type and frequency
root.sample_rate = match {
this.level == "ERROR" => 1.0 # Keep all errors
this.level == "WARN" => 0.8 # Keep 80% of warnings
this.level == "INFO" && this.endpoint.contains("/health") => 0.1 # Keep 10% of health checks
this.level == "INFO" => 0.5 # Keep 50% of other INFO
this.level == "DEBUG" => 0.0 # Drop all DEBUG
_ => 0.3
}
# Ensure we keep at least one sample per hour for each category
let hourly_sample_key = sample_key + "_" + timestamp().format("2006-01-02-15")
let hourly_sample_count = meta("hourly_" + hourly_sample_key).number().catch(0)
if hourly_sample_count == 0 {
# Force keep first sample of each type per hour
root.force_keep = true
meta("hourly_" + hourly_sample_key) = 1
} else {
root.force_keep = false
}
# Apply sampling decision
let random_value = this.message.hash("xxhash64") % 100 / 100.0
if !this.force_keep && random_value > this.sample_rate {
root = deleted()
}
Pattern 6: Performance Monitoring and Auto-Scaling
Monitor pipeline performance and adapt automatically:
processors:
- mapping: |
# Track pipeline performance metrics
meta("events_processed") = meta("events_processed").number().catch(0) + 1
meta("bytes_processed") = meta("bytes_processed").number().catch(0) + this.string().length()
let current_time = timestamp().format_timestamp_unix()
let last_reset = meta("last_perf_reset").number().catch(current_time)
# Reset counters every minute for rate calculation
if current_time - last_reset > 60 {
let events_per_minute = meta("events_processed")
let bytes_per_minute = meta("bytes_processed")
# Auto-adjust batch sizes based on throughput
root.optimal_batch_size = match {
events_per_minute > 10000 => 200 # High volume
events_per_minute > 1000 => 100 # Medium volume
_ => 50 # Low volume
}
# Auto-adjust filtering aggressiveness based on load
root.auto_filter_level = match {
events_per_minute > 5000 => "aggressive"
events_per_minute > 1000 => "normal"
_ => "permissive"
}
# Reset counters
meta("events_processed") = 0
meta("bytes_processed") = 0
meta("last_perf_reset") = current_time
}
Deploy and Test Advanced Patterns
1. Deploy the Advanced Pipeline
# Set additional environment variables for advanced features
export AWS_REGION="us-west-2"
export ALERTING_WEBHOOK_URL="https://hooks.slack.com/services/your/webhook/url"
export ALERT_TOKEN="your-alerting-token"
# Deploy advanced pipeline
expanso pipeline deploy ~/advanced-splunk-pipeline.yaml
# Monitor all destinations
expanso pipeline logs splunk-multi-destination -f
2. Test Multi-Destination Routing
# Generate PII event (should go to compliance index + S3)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User login successful","user":"[email protected]","source_ip":"10.0.100.50","endpoint":"/auth/login","response_time_ms":150}' >> $TEST_DATA_DIR/app.log
# Generate financial transaction (should trigger alert)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed for transaction tx_999999","user":"[email protected]","amount":50000,"endpoint":"/api/payments","response_time_ms":3000}' >> $TEST_DATA_DIR/app.log
# Generate anomalous response time
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"Database query completed","endpoint":"/api/users","response_time_ms":15000,"query":"SELECT * FROM users"}' >> $TEST_DATA_DIR/app.log
3. Verify Data in Multiple Destinations
Splunk:
# Check compliance data
index=compliance pii_masked=true earliest=-15m
# Check anomaly detection
index=main is_anomaly=true earliest=-15m
| stats avg(anomaly_details.deviation_factor) by endpoint
S3 (via AWS CLI):
# Check S3 storage (the pipeline writes to year=YYYY/MM/DD/hour=HH/ prefixes)
aws s3 ls s3://company-logs-us/year=$(date +%Y/%m/%d)/
# Verify retention policies are applied
aws s3api head-object --bucket company-logs-us --key "path/to/your/file.json"
Metrics (Prometheus):
# Check metrics endpoint
curl http://prometheus-pushgateway:9091/metrics | grep http_request_duration
Advanced Pattern Benefits
| Pattern | Traditional Splunk Limitation | Expanso Advantage | Business Impact |
|---|---|---|---|
| Multi-Destination | Limited native routing (Splunk-to-Splunk and syslog output groups only) | Simultaneous routing to multiple heterogeneous systems | Reduced vendor lock-in, compliance automation |
| Adaptive Processing | Static rules only | Dynamic adjustment based on conditions | Optimal resource utilization |
| Real-Time Anomalies | Search-time detection | Edge detection with immediate alerts | Faster incident response |
| Geographic Routing | Manual data management | Automatic compliance with sovereignty laws | Reduced legal risk |
| Intelligent Sampling | Simple random sampling | Stratified sampling preserving distributions | Better analytics accuracy |
| Auto-Scaling | Manual tuning | Automatic performance optimization | Reduced operational overhead |
Monitoring Advanced Features
1. Pipeline Health Dashboard
# View comprehensive metrics
expanso pipeline metrics splunk-multi-destination --format prometheus
# Check destination health
expanso pipeline logs splunk-multi-destination --filter "destination_status"
2. Compliance Reporting
# Generate compliance report
expanso pipeline logs splunk-multi-destination --filter "data_classification" --format json \
| jq -r '.[] | select(.pii_masked == true) | [.timestamp, .data_classification, .data_residency] | @csv'
3. Cost Analysis
# Splunk search for cost analysis
index=main sourcetype="expanso:*" earliest=-24h
| stats
count as events_sent,
sum(eval(len(_raw))) as bytes_sent,
dc(host) as edge_nodes
| eval
gb_sent = round(bytes_sent / 1024 / 1024 / 1024, 2),
daily_cost = gb_sent * 200,
estimated_original_volume = gb_sent / 0.3,
savings = (estimated_original_volume - gb_sent) * 200
| table events_sent, gb_sent, daily_cost, savings
What's Next?
Excellent! You've now implemented advanced patterns that go far beyond traditional Splunk capabilities. Next, we'll put together a complete production-ready pipeline with all best practices.
→ Next Step: Complete Production Pipeline
Key Takeaway: These advanced patterns show how Expanso doesn't just replace Heavy Forwarders - it enables entirely new capabilities that make your data architecture more intelligent, compliant, and cost-effective while maintaining full compatibility with your existing Splunk investment.