
Step 5: Advanced Splunk Patterns

Now that you have a solid Splunk integration, let's explore advanced patterns that extend your capabilities beyond traditional Splunk deployments. These patterns show how Expanso can enhance your entire data architecture, not just reduce costs.

Pattern 1: Multi-Destination Routing

Send data to multiple systems simultaneously: Splunk for real-time analysis, S3 for long-term storage, and metrics systems for monitoring.

The Multi-Destination Pipeline

cat > ~/advanced-splunk-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-multi-destination"
  description: "Advanced pipeline with multi-destination routing and compliance features"

input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true

processors:
  # 1. Core parsing (same as before)
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()

      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }

  # 2. Enhanced parsing with compliance fields
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }

        # Standard enrichment
        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }

        # Compliance classification
        root.data_classification = match {
          this.user.contains("@") || this.message.contains("login") => "pii"
          this.message.contains("payment") || this.message.contains("transaction") => "financial"
          this.message.contains("medical") || this.message.contains("health") => "phi"
          _ => "general"
        }

        # Data residency requirements
        root.data_residency = match this.source_ip {
          this.contains("10.0.") || this.contains("192.168.") => "us"
          this.contains("172.16.") => "eu"
          _ => "global"
        }

  # 3. PII detection and masking
  - mapping: |
      # Detect and mask PII before it goes to any destination
      root.original_message = this.message

      # Mask email addresses
      root.message = this.message.replace_all_regex("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "***@***.***")

      # Mask credit card numbers
      root.message = root.message.replace_all_regex("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b", "****-****-****-****")

      # Mask SSNs
      root.message = root.message.replace_all_regex("\\b\\d{3}-\\d{2}-\\d{4}\\b", "***-**-****")

      # Track whether masking occurred (compare the masked output against the preserved original)
      root.pii_masked = root.message != root.original_message

  # 4. Metrics extraction
  - conditional:
      condition: 'this.response_time_ms.type() == "number"'
      mapping: |
        # Extract metrics for time-series systems
        root.metrics = {
          "response_time": this.response_time_ms,
          "timestamp": timestamp().format_timestamp_unix(),
          "endpoint": this.endpoint.string().catch("unknown"),
          "status_code": this.status_code.number().catch(0),
          "method": this.method.string().catch("unknown"),
          "host": this.host
        }

  # 5. Filtering (preserved from Step 3)
  - conditional:
      condition: 'this.level == "DEBUG"'
      mapping: 'root = deleted()'

  # 6. Create destination-specific formats
  - mapping: |
      # Splunk HEC format
      root.splunk_event = {
        "time": timestamp().format_timestamp_unix(),
        "host": this.host,
        "source": "expanso-edge:" + this.host + ":" + this.source_file,
        "sourcetype": "expanso:" + this.sourcetype,
        "index": match {
          this.data_classification == "pii" || this.data_classification == "financial" => "compliance"
          this.sourcetype == "cef" => "security"
          _ => "main"
        },
        "event": this.without("splunk_event", "s3_event", "metrics")
      }

      # S3 archival format (structured for analytics)
      root.s3_event = {
        "timestamp": timestamp().format_timestamp_unix(),
        "date_partition": timestamp().format("2006/01/02"),
        "hour_partition": timestamp().format("15"),
        "data_classification": this.data_classification,
        "data_residency": this.data_residency,
        "event_data": this.without("splunk_event", "s3_event", "metrics"),
        "retention_policy": match this.data_classification {
          "pii" => "7_years"
          "financial" => "10_years"
          "phi" => "6_years"
          _ => "3_years"
        }
      }

# Multi-destination output with conditional routing
output:
  # Primary: Splunk for real-time analysis
  - condition: 'this.splunk_event.index != ""'
    http_client:
      url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
      headers:
        Authorization: "Splunk ${HEC_TOKEN}"
        Content-Type: "application/json"
      retry_policy:
        max_retries: 3
      batch:
        count: 100
        period: "10s"
    postmap: 'root = this.splunk_event'

  # Secondary: S3 for long-term storage and compliance
  - condition: 'this.data_classification != ""'
    s3:
      bucket: "company-logs-${this.data_residency}"
      path: "year=${s3_event.date_partition}/hour=${s3_event.hour_partition}/expanso-${timestamp().format_timestamp_unix('nano')}.json"
      region: "${AWS_REGION}"
      credentials:
        profile: "default"
      storage_class: "INTELLIGENT_TIERING"
      server_side_encryption: "AES256"
    postmap: 'root = this.s3_event'

  # Tertiary: metrics to Prometheus/InfluxDB
  - condition: 'this.metrics.response_time.type() == "number"'
    prometheus:
      push_url: "http://prometheus-pushgateway:9091/metrics/job/expanso-edge/instance/${hostname()}"
      push_interval: "30s"
    postmap: |
      root = [
        {
          "name": "http_request_duration_seconds",
          "type": "histogram",
          "help": "HTTP request duration in seconds",
          "value": this.metrics.response_time / 1000,
          "labels": {
            "endpoint": this.metrics.endpoint,
            "method": this.metrics.method,
            "status_code": this.metrics.status_code.string(),
            "host": this.metrics.host
          }
        }
      ]

  # Quaternary: critical alerts to webhook/PagerDuty
  - condition: 'this.log_severity >= 3 || this.data_classification == "financial"'
    http_client:
      url: "${ALERTING_WEBHOOK_URL}"
      method: "POST"
      headers:
        Content-Type: "application/json"
        Authorization: "Bearer ${ALERT_TOKEN}"
    postmap: |
      root = {
        "severity": match this.log_severity {
          3 => "critical"
          2 => "warning"
          _ => "info"
        },
        "summary": this.message.string().catch("Unknown error"),
        "source": this.host + ":" + this.source_file,
        "timestamp": timestamp(),
        "classification": this.data_classification,
        "masked": this.pii_masked
      }
EOF

Pattern 2: Adaptive Processing Based on Conditions

Automatically adjust processing based on volume, time of day, or system load:

cat >> ~/advanced-splunk-pipeline.yaml << 'EOF'

  # Adaptive filtering processor (add to the processors list, before the output section)
  - mapping: |
      # Derive the current hour and day of week
      let current_hour = timestamp().format("15").number()
      let is_business_hours = current_hour >= 9 && current_hour <= 17
      let is_weekend = timestamp().format("Monday") == "Saturday" || timestamp().format("Monday") == "Sunday"

      # Adaptive filtering based on conditions (PII checked first so it is always preserved)
      root.filter_aggressiveness = match {
        this.data_classification == "pii" => "low" # Always preserve PII
        is_weekend => "high"                       # Filter more on weekends
        !is_business_hours => "medium"             # Filter more outside business hours
        _ => "normal"
      }

      # Apply adaptive filtering
      if this.filter_aggressiveness == "high" && this.level == "INFO" {
        let hash = this.message.hash("xxhash64") % 10
        if hash != 0 {
          root = deleted() # Keep only ~10% of INFO logs
        }
      }

      if this.filter_aggressiveness == "medium" && this.level == "INFO" && this.response_time_ms < 100 {
        let hash = this.message.hash("xxhash64") % 5
        if hash != 0 {
          root = deleted() # Keep only ~20% of fast INFO logs
        }
      }
EOF

Pattern 3: Real-Time Anomaly Detection

Detect anomalies at the edge and alert before data reaches Splunk:

processors:
  # Anomaly detection processor
  - mapping: |
      # Track baseline metrics in memory (sliding window)
      let response_time = this.response_time_ms.number().catch(0)
      let endpoint = this.endpoint.string().catch("unknown")

      # Calculate rolling averages (this would use an external cache in production)
      let avg_response_key = "avg_response_" + endpoint
      let current_avg = meta(avg_response_key).number().catch(response_time)
      let new_avg = (current_avg * 0.9) + (response_time * 0.1) # Exponential moving average

      meta(avg_response_key) = new_avg

      # Detect anomalies (>3x normal response time, and over an absolute 1000ms floor)
      root.is_anomaly = response_time > (new_avg * 3) && response_time > 1000

      # Anomaly metadata
      if this.is_anomaly {
        root.anomaly_details = {
          "current_response": response_time,
          "baseline_response": new_avg,
          "deviation_factor": response_time / new_avg,
          "detection_time": timestamp()
        }
      }
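The detection logic above boils down to a few lines of arithmetic. Here is the same exponential-moving-average check as a Python sketch, with the 0.9/0.1 weights and the ">3x baseline and >1000ms" rule carried over from the pipeline; the response-time stream is invented sample data.

```python
def update_baseline(current_avg: float, response_time: float, alpha: float = 0.1) -> float:
    """EMA update: new = old * (1 - alpha) + sample * alpha."""
    return current_avg * (1 - alpha) + response_time * alpha

def is_anomaly(response_time: float, baseline: float) -> bool:
    """Anomalous when >3x the baseline AND over an absolute 1000ms floor."""
    return response_time > baseline * 3 and response_time > 1000

baseline = None
for rt in [120, 130, 110, 125, 15000]:
    if baseline is None:
        baseline = rt  # first sample seeds the baseline, as .catch() does above
    if is_anomaly(rt, baseline):
        print(f"anomaly: {rt}ms vs baseline {baseline:.0f}ms "
              f"(deviation {rt / baseline:.1f}x)")
    baseline = update_baseline(baseline, rt)
```

The 1000ms floor keeps a quiet endpoint (say, a 30ms baseline) from flagging every 100ms response as a "3x" anomaly.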

Pattern 4: Geographic Data Routing

Route data based on geographic requirements for compliance:

processors:
  - mapping: |
      # Determine geographic region from IP
      root.geo_region = match this.source_ip {
        this.contains("10.1.") => "us_west"
        this.contains("10.2.") => "us_east"
        this.contains("10.3.") => "eu_west"
        this.contains("10.4.") => "asia_pacific"
        _ => "unknown"
      }

      # Apply data sovereignty rules
      root.allowed_destinations = match this.data_classification + "_" + this.geo_region {
        "pii_eu_west" => ["eu_splunk", "eu_s3"]
        "financial_us_west" => ["us_splunk", "us_s3_encrypted"]
        "phi_*" => ["local_only"]
        _ => ["global_splunk", "global_s3"]
      }

output:
  # Route based on geographic compliance
  switch:
    - condition: 'this.allowed_destinations.contains("eu_splunk")'
      output:
        http_client:
          url: "https://eu-splunk.company.com:8088/services/collector/event"

    - condition: 'this.allowed_destinations.contains("us_splunk")'
      output:
        http_client:
          url: "https://us-splunk.company.com:8088/services/collector/event"

    - condition: 'this.allowed_destinations.contains("local_only")'
      output:
        file:
          path: "/secure/local/phi-data-${timestamp().format('2006-01-02')}.log"
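The sovereignty lookup is just a keyed table. A Python sketch of the same rules, with the `"phi_*"` wildcard case from the config written as an explicit prefix check (the destination labels are the same hypothetical names used in the config):

```python
def allowed_destinations(classification: str, region: str) -> list[str]:
    """Map (data classification, geo region) to permitted destinations."""
    key = f"{classification}_{region}"
    if key == "pii_eu_west":
        return ["eu_splunk", "eu_s3"]
    if key == "financial_us_west":
        return ["us_splunk", "us_s3_encrypted"]
    if key.startswith("phi_"):  # "phi_*": PHI stays local regardless of region
        return ["local_only"]
    return ["global_splunk", "global_s3"]

print(allowed_destinations("phi", "us_east"))   # PHI never leaves the node
print(allowed_destinations("pii", "eu_west"))   # EU PII stays in EU systems
```

Keeping the rules in one lookup function (rather than scattered conditions at each output) makes them easy to audit, which is usually what a compliance review asks for first.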

Pattern 5: Intelligent Sampling

Implement sophisticated sampling that preserves statistical significance:

processors:
  - mapping: |
      # Stratified sampling: preserve distribution across different dimensions
      let sample_key = this.level + "_" + this.endpoint.string().catch("unknown")
      let sample_count = meta("sample_" + sample_key).number().catch(0) + 1
      meta("sample_" + sample_key) = sample_count

      # Sample rates based on event type and frequency
      root.sample_rate = match {
        this.level == "ERROR" => 1.0                                      # Keep all errors
        this.level == "WARN" => 0.8                                       # Keep 80% of warnings
        this.level == "INFO" && this.endpoint.contains("/health") => 0.1  # Keep 10% of health checks
        this.level == "INFO" => 0.5                                       # Keep 50% of other INFO
        this.level == "DEBUG" => 0.0                                      # Drop all DEBUG
        _ => 0.3
      }

      # Ensure we keep at least one sample per hour for each category
      let hourly_sample_key = sample_key + "_" + timestamp().format("2006-01-02-15")
      let hourly_sample_count = meta("hourly_" + hourly_sample_key).number().catch(0)

      if hourly_sample_count == 0 {
        # Force-keep the first sample of each type per hour
        root.force_keep = true
        meta("hourly_" + hourly_sample_key) = 1
      } else {
        root.force_keep = false
      }

      # Apply the sampling decision
      let random_value = this.message.hash("xxhash64") % 100 / 100.0
      if !this.force_keep && random_value > this.sample_rate {
        root = deleted()
      }
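The same decision logic can be sketched in Python to see how the strata and the hourly force-keep interact. The rates mirror the pipeline; `xxhash64` is replaced by stdlib SHA-256 here since the hash only needs to be stable and roughly uniform, and the strict `<` comparison (rather than the pipeline's `> rate` drop) is a deliberate choice so a 0.0 rate always drops.

```python
import hashlib
from collections import defaultdict

# Rates mirror the pipeline's match expression
SAMPLE_RATES = {"ERROR": 1.0, "WARN": 0.8, "INFO": 0.5, "DEBUG": 0.0}

hourly_seen = defaultdict(int)  # first-of-hour tracker per stratum

def keep(level: str, endpoint: str, message: str, hour_bucket: str) -> bool:
    rate = SAMPLE_RATES.get(level, 0.3)
    if level == "INFO" and "/health" in endpoint:
        rate = 0.1  # health checks sampled hardest
    stratum = f"{level}_{endpoint}_{hour_bucket}"
    if hourly_seen[stratum] == 0:  # force-keep the first event per stratum/hour
        hourly_seen[stratum] += 1
        return True
    # deterministic "random" value in [0, 1) derived from the message hash
    h = int.from_bytes(hashlib.sha256(message.encode()).digest()[:8], "big")
    return (h % 100) / 100.0 < rate

print(keep("DEBUG", "/api", "first debug", "2025-01-01-10"))   # True: force-keep
print(keep("DEBUG", "/api", "second debug", "2025-01-01-10"))  # False: rate 0.0
```

Hashing the message instead of calling a random generator makes the decision reproducible: replaying the same log file yields the same sample, which matters when you are debugging why an event did or did not reach Splunk.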

Pattern 6: Performance Monitoring and Auto-Scaling

Monitor pipeline performance and adapt automatically:

processors:
  - mapping: |
      # Track pipeline performance metrics
      meta("events_processed") = meta("events_processed").number().catch(0) + 1
      meta("bytes_processed") = meta("bytes_processed").number().catch(0) + this.string().length()

      let current_time = timestamp().format_timestamp_unix()
      let last_reset = meta("last_perf_reset").number().catch(current_time)

      # Reset counters every minute for rate calculation
      if current_time - last_reset > 60 {
        let events_per_minute = meta("events_processed")
        let bytes_per_minute = meta("bytes_processed")

        # Auto-adjust batch sizes based on throughput
        root.optimal_batch_size = match {
          events_per_minute > 10000 => 200 # High volume
          events_per_minute > 1000 => 100  # Medium volume
          _ => 50                          # Low volume
        }

        # Auto-adjust filtering aggressiveness based on load
        root.auto_filter_level = match {
          events_per_minute > 5000 => "aggressive"
          events_per_minute > 1000 => "normal"
          _ => "permissive"
        }

        # Reset counters
        meta("events_processed") = 0
        meta("bytes_processed") = 0
        meta("last_perf_reset") = current_time
      }
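The tuning thresholds above are plain step functions. A Python sketch of the same thresholds, useful for reasoning about where the breakpoints fall before committing them to the pipeline (the threshold values are the ones from the config):

```python
def optimal_batch_size(events_per_minute: int) -> int:
    """Batch-size step function, matching the pipeline's thresholds."""
    if events_per_minute > 10000:
        return 200  # high volume
    if events_per_minute > 1000:
        return 100  # medium volume
    return 50       # low volume

def auto_filter_level(events_per_minute: int) -> str:
    """Filtering aggressiveness, matching the pipeline's thresholds."""
    if events_per_minute > 5000:
        return "aggressive"
    if events_per_minute > 1000:
        return "normal"
    return "permissive"

for epm in (500, 2500, 7500, 20000):
    print(epm, optimal_batch_size(epm), auto_filter_level(epm))
```

Note the two scales deliberately disagree in the middle: at 7,500 events/minute filtering already turns aggressive while batching stays at the medium size, so load shedding kicks in before batch latency grows.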

Deploy and Test Advanced Patterns

1. Deploy the Advanced Pipeline

# Set additional environment variables for advanced features
export AWS_REGION="us-west-2"
export ALERTING_WEBHOOK_URL="https://hooks.slack.com/services/your/webhook/url"
export ALERT_TOKEN="your-alerting-token"

# Deploy advanced pipeline
expanso pipeline deploy ~/advanced-splunk-pipeline.yaml

# Monitor all destinations
expanso pipeline logs splunk-multi-destination -f

2. Test Multi-Destination Routing

# Generate PII event (should go to compliance index + S3)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User login successful","user":"[email protected]","source_ip":"10.1.100.50","endpoint":"/auth/login","response_time_ms":150}' >> $TEST_DATA_DIR/app.log

# Generate financial transaction (should trigger alert)
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed for transaction tx_999999","user":"[email protected]","amount":50000,"endpoint":"/api/payments","response_time_ms":3000}' >> $TEST_DATA_DIR/app.log

# Generate anomalous response time
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"Database query completed","endpoint":"/api/users","response_time_ms":15000,"query":"SELECT * FROM users"}' >> $TEST_DATA_DIR/app.log

3. Verify Data in Multiple Destinations

Splunk:

# Check compliance data
index=compliance pii_masked=true earliest=-15m

# Check anomaly detection
index=main is_anomaly=true earliest=-15m
| stats avg(anomaly_details.deviation_factor) by endpoint

S3 (via AWS CLI):

# Check S3 storage
aws s3 ls "s3://company-logs-us/year=$(date +%Y/%m/%d)/hour=$(date +%H)/"

# Verify retention policies are applied
aws s3api head-object --bucket company-logs-us --key "path/to/your/file.json"

Metrics (Prometheus):

# Check metrics endpoint
curl http://prometheus-pushgateway:9091/metrics | grep http_request_duration

Advanced Pattern Benefits

| Pattern | Traditional Splunk Limitation | Expanso Advantage | Business Impact |
|---|---|---|---|
| Multi-Destination | Single destination only | Simultaneous routing to multiple systems | Reduced vendor lock-in, compliance automation |
| Adaptive Processing | Static rules only | Dynamic adjustment based on conditions | Optimal resource utilization |
| Real-Time Anomalies | Search-time detection | Edge detection with immediate alerts | Faster incident response |
| Geographic Routing | Manual data management | Automatic compliance with sovereignty laws | Reduced legal risk |
| Intelligent Sampling | Simple random sampling | Stratified sampling preserving distributions | Better analytics accuracy |
| Auto-Scaling | Manual tuning | Automatic performance optimization | Reduced operational overhead |

Monitoring Advanced Features

1. Pipeline Health Dashboard

# View comprehensive metrics
expanso pipeline metrics splunk-multi-destination --format prometheus

# Check destination health
expanso pipeline logs splunk-multi-destination --filter "destination_status"

2. Compliance Reporting

# Generate compliance report
expanso pipeline logs splunk-multi-destination --filter "data_classification" --format json \
| jq -r '.[] | select(.pii_masked == true) | [.timestamp, .data_classification, .data_residency] | @csv'

3. Cost Analysis

# Splunk search for cost analysis
index=main sourcetype="expanso:*" earliest=-24h
| stats
count as events_sent,
sum(eval(len(_raw))) as bytes_sent,
dc(host) as edge_nodes
| eval
gb_sent = round(bytes_sent / 1024 / 1024 / 1024, 2),
daily_cost = gb_sent * 200,
estimated_original_volume = gb_sent / 0.3,
savings = (estimated_original_volume - gb_sent) * 200
| table events_sent, gb_sent, daily_cost, savings
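The `eval` arithmetic in that search is worth spelling out, since the $200/GB daily ingest cost and the 70% reduction assumption (sent volume = 30% of original) are illustrative numbers you should replace with your own. The same math as a Python sketch:

```python
def daily_savings(gb_sent: float, cost_per_gb: float = 200.0,
                  reduction_ratio: float = 0.3) -> dict:
    """Mirror the SPL eval: cost of what was sent, and savings vs. the
    estimated pre-filtering volume (sent = reduction_ratio * original)."""
    original_gb = gb_sent / reduction_ratio
    return {
        "daily_cost": round(gb_sent * cost_per_gb, 2),
        "estimated_original_volume_gb": round(original_gb, 2),
        "savings": round((original_gb - gb_sent) * cost_per_gb, 2),
    }

print(daily_savings(3.0))
# 3 GB sent implies ~10 GB originally: $600 spent, $1400 saved per day
```

Swap in your negotiated per-GB rate and the reduction ratio you actually measured in Step 3 before quoting numbers to anyone.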

What's Next?

Excellent! You've now implemented advanced patterns that go far beyond traditional Splunk capabilities. Next, we'll put together a complete production-ready pipeline with all best practices.

Next Step: Complete Production Pipeline


Key Takeaway: These advanced patterns show how Expanso doesn't just replace Heavy Forwarders; it enables entirely new capabilities that make your data architecture more intelligent, compliant, and cost-effective while maintaining full compatibility with your existing Splunk investment.