Step 4: Route to Splunk HEC (Like outputs.conf, But Better)
Now that we've filtered out 70% of the noise, it's time to send the valuable remaining data to Splunk. We'll use Splunk's HTTP Event Collector (HEC), which is more flexible and reliable than traditional outputs.conf forwarding.
Traditional Splunk outputs.conf vs. Expanso HEC Output
Splunk outputs.conf Configuration
```ini
# outputs.conf - Traditional forwarder configuration
[tcpout]
defaultGroup = splunk_indexers

[tcpout:splunk_indexers]
server = splunk-indexer1:9997, splunk-indexer2:9997
compressed = true
useACK = true

# Limited routing - can't dynamically choose index
[tcpout:security_group]
server = splunk-indexer1:9997
compressed = true
```
Limitations:
- ❌ Static routing - can't dynamically route based on content
- ❌ No built-in retry logic for failed connections
- ❌ Limited metadata control
- ❌ Requires open ports (9997) and network configuration
- ❌ No native TLS without additional setup
Expanso HEC Output (Better!)
```yaml
# Dynamic routing with full control
output:
  switch:
    - condition: 'this.priority == "critical" || this.sourcetype == "cef"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:8088/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
          timeout: "30s"
          retry_policy:
            max_retries: 3
            backoff: "exponential"
    - condition: 'this.target_index == "security"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:8088/services/collector/event"
          # Route to dedicated security indexer cluster
```
Advantages:
- ✅ Dynamic routing based on event content
- ✅ Built-in retry and error handling
- ✅ TLS by default
- ✅ No firewall port management
- ✅ Full control over index/sourcetype per event
- ✅ Load balancing across multiple HEC endpoints
Complete Pipeline with Splunk HEC Output
Let's update our pipeline to send filtered data directly to Splunk:
```bash
cat > ~/splunk-edge-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-edge-complete"
  description: "Complete edge processing pipeline with Splunk HEC output"
input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true
processors:
  # 1. Basic metadata and parsing
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()
      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }
  # 2. JSON parsing
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }
        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }
  # 3. CEF parsing
  - conditional:
      condition: 'this.sourcetype == "cef"'
      mapping: |
        root.cef = this.string().parse_regex("CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")
        root.vendor = this.cef.vendor
        root.event_name = this.cef.name
        root.severity = this.cef.severity
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }
  # 4. Filtering (from Step 3)
  - conditional:
      condition: 'this.level == "DEBUG"'
      mapping: 'root = deleted()'
  - conditional:
      condition: 'this.is_health_check == true'
      mapping: |
        let hash = (this.host + this.timestamp).hash("xxhash64") % 10
        root = if $hash != 0 { deleted() }
  # 5. Prepare for Splunk HEC - format according to HEC requirements
  - mapping: |
      # Determine target index based on content and risk
      root.splunk_index = match {
        this.sourcetype == "cef" => env("SECURITY_INDEX").string().catch("security")
        this.level == "ERROR" => env("MAIN_INDEX").string().catch("main")
        this.risk_score >= 7 => env("SECURITY_INDEX").string().catch("security")
        _ => env("MAIN_INDEX").string().catch("main")
      }
      # Set appropriate sourcetype for Splunk
      root.splunk_sourcetype = match this.sourcetype {
        "json_logs" => "expanso:json"
        "cef" => "expanso:cef"
        "syslog" => "expanso:syslog"
        _ => "expanso:unknown"
      }
      # Add source information
      root.source = "expanso-edge:" + this.host + ":" + this.source_file
      # Ensure timestamp is in epoch format for Splunk
      root.splunk_timestamp = timestamp().format_timestamp_unix()
  # 6. Create HEC-formatted event
  - mapping: |
      # Create Splunk HEC event format
      # See: https://docs.splunk.com/Documentation/Splunk/latest/Data/FormateventsforHTTPEventCollector
      root = {
        "time": this.splunk_timestamp,
        "host": this.host,
        "source": this.source,
        "sourcetype": this.splunk_sourcetype,
        "index": this.splunk_index,
        "event": this.without("splunk_timestamp", "splunk_index", "splunk_sourcetype", "source")
      }
# Multiple outputs with intelligent routing
output:
  # Route based on priority and content
  switch:
    # Critical security events - high priority HEC endpoint
    - condition: 'this.event.risk_score >= 8 || this.event.level == "ERROR"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "critical-events"
          timeout: "10s"
          retry_policy:
            max_retries: 5
            initial_interval: "1s"
            max_interval: "30s"
            backoff_multiplier: 2.0
          # Batch for efficiency
          batch:
            count: 50
            period: "5s"
            byte_size: 1048576 # 1MB batches
    # Normal events - standard HEC endpoint
    - condition: 'this.event.priority != "critical"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "normal-events"
          timeout: "30s"
          retry_policy:
            max_retries: 3
            initial_interval: "2s"
            max_interval: "60s"
          batch:
            count: 100
            period: "10s"
            byte_size: 2097152 # 2MB batches
    # Fallback - local file if HEC unavailable
    - output:
        file:
          path: "/tmp/splunk-fallback-${timestamp().format('2006-01-02')}.json"
          codec: "lines"
EOF
```
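The health-check sampling in processor 4 is worth a closer look: hashing `host + timestamp` and keeping only events whose hash lands in bucket 0 retains a deterministic ~10% sample, with no coordination between nodes. A Python sketch of the same idea (MD5 stands in for xxhash64 here, since xxhash is not in the standard library):

```python
import hashlib

def keep_health_check(host: str, timestamp: str, sample_buckets: int = 10) -> bool:
    """Deterministically keep ~1/sample_buckets of health-check events.

    Mirrors the pipeline's `hash(host + timestamp) % 10 == 0` logic;
    the same (host, timestamp) pair always gets the same verdict.
    """
    digest = hashlib.md5((host + timestamp).encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % sample_buckets
    return bucket == 0

# Roughly 10% of distinct (host, timestamp) pairs survive the filter.
kept = sum(
    keep_health_check("edge-01", f"2024-01-01T00:00:{i}Z")
    for i in range(10000)
)
print(f"kept {kept} of 10000")  # expect roughly 1000
```

Because the decision is a pure function of the event, replaying the same log file yields the same sample, which makes the reduction auditable.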
Environment Configuration
Make sure your environment variables are set correctly:
```bash
# Verify HEC configuration
echo "HEC_TOKEN: $HEC_TOKEN"
echo "SPLUNK_HOST: $SPLUNK_HOST"
echo "SPLUNK_PORT: $SPLUNK_PORT"

# Set index names if not already configured
export MAIN_INDEX="main"
export SECURITY_INDEX="security"
export METRICS_INDEX="metrics"
```
Deploy and Test HEC Integration
1. Deploy the Complete Pipeline
```bash
# Deploy pipeline with HEC output
expanso pipeline deploy ~/splunk-edge-pipeline.yaml

# Monitor pipeline status
expanso pipeline status splunk-edge-complete
```
2. Verify HEC Connectivity
```bash
# Test HEC endpoint manually
curl -k "https://$SPLUNK_HOST:$SPLUNK_PORT/services/collector/event" \
  -H "Authorization: Splunk $HEC_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "time": 1644505200,
    "host": "test-host",
    "source": "expanso-test",
    "sourcetype": "expanso:test",
    "index": "main",
    "event": {"message": "HEC connectivity test", "test": true}
  }'

# Expected response: {"text":"Success","code":0}
```
3. Generate Test Events and Verify in Splunk
```bash
# Generate mixed priority events
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed","user":"[email protected]","transaction_id":"tx_99999","amount":199.99}' >> $TEST_DATA_DIR/app.log

echo 'CEF:0|Company|WebApp|1.0|900|Account Takeover Attempt|Critical|src=203.0.113.200 suser=admin act=login outcome=failure attempts=100 dpt=443' >> $TEST_DATA_DIR/security.log

echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User session created","user":"normal.user","session_id":"sess_xyz789"}' >> $TEST_DATA_DIR/app.log
```
4. Search in Splunk to Verify Data
In your Splunk search interface:
```
# Search for Expanso-processed events
index=main sourcetype="expanso:*" earliest=-15m

# Verify critical events reached the security index
index=security sourcetype="expanso:cef" risk_score>=8

# Check data volume reduction
index=main sourcetype="expanso:json"
| stats count by level
| eval expected_debug_count=0
```
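To turn the volume reduction into cost terms, a back-of-envelope estimate helps. The numbers below are purely illustrative, not from this tutorial's environment:

```python
# Rough Splunk ingest-savings estimate. All inputs are hypothetical.
daily_ingest_gb = 100.0   # raw volume before edge filtering
reduction = 0.70          # DEBUG drop + health-check sampling (Step 3)
cost_per_gb_day = 0.50    # illustrative ingest-license cost per GB

indexed_gb = daily_ingest_gb * (1 - reduction)
daily_savings = daily_ingest_gb * reduction * cost_per_gb_day
print(f"indexed: {indexed_gb:.0f} GB/day, saved: ${daily_savings:.2f}/day")
# prints "indexed: 30 GB/day, saved: $35.00/day"
```

Plug in your own license terms; the point is that a 70% edge-side cut compounds into license headroom, faster searches, and less indexer storage.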
Advanced HEC Configuration Patterns
1. Load Balancing Across Multiple Indexers
```yaml
output:
  switch:
    - condition: 'this.event.host.hash("xxhash64") % 2 == 0'
      output:
        http_client:
          url: "https://splunk-indexer1.company.com:8088/services/collector/event"
    - condition: 'this.event.host.hash("xxhash64") % 2 == 1'
      output:
        http_client:
          url: "https://splunk-indexer2.company.com:8088/services/collector/event"
```
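This hash-mod routing is static sharding: every event from a given host always lands on the same indexer, which preserves per-host ordering while spreading load. A Python sketch of the same partitioning decision (MD5 stands in for xxhash64; the hostnames are the hypothetical ones from the config above):

```python
import hashlib

INDEXERS = [
    "https://splunk-indexer1.company.com:8088",
    "https://splunk-indexer2.company.com:8088",
]

def pick_indexer(host: str) -> str:
    """Stable hash-mod routing: a host always maps to the same indexer."""
    digest = hashlib.md5(host.encode()).digest()
    return INDEXERS[int.from_bytes(digest[:8], "big") % len(INDEXERS)]

# Same host, same indexer every time.
assert pick_indexer("web-01") == pick_indexer("web-01")
```

One caveat of hash-mod schemes: adding a third indexer remaps roughly two thirds of hosts, so consider consistent hashing if the indexer set changes often.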
2. Environment-Based Routing
```yaml
processors:
  - mapping: |
      # Route based on environment
      root.target_cluster = match env("ENVIRONMENT") {
        "prod" => "https://prod-splunk.company.com:8088"
        "staging" => "https://staging-splunk.company.com:8088"
        _ => "https://dev-splunk.company.com:8088"
      }
```
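The match-with-fallback shape is the important part: unknown or unset environments fall through to dev rather than failing. The same logic in Python, for clarity (cluster URLs are the hypothetical ones from the mapping above):

```python
import os

CLUSTERS = {
    "prod": "https://prod-splunk.company.com:8088",
    "staging": "https://staging-splunk.company.com:8088",
}

def target_cluster(env_name=None):
    """Resolve the Splunk cluster, defaulting to dev for unknown environments."""
    env_name = env_name or os.environ.get("ENVIRONMENT", "")
    return CLUSTERS.get(env_name, "https://dev-splunk.company.com:8088")

print(target_cluster("prod"))  # prints "https://prod-splunk.company.com:8088"
```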
3. Circuit Breaker Pattern
```yaml
output:
  http_client:
    # Circuit breaker prevents overwhelming Splunk during issues
    circuit_breaker:
      failure_threshold: 10   # Open after 10 failures
      success_threshold: 3    # Close after 3 successes
      timeout: "60s"          # Try again after 1 minute
    # Graceful degradation
    fallback_outputs:
      - file:
          path: "/var/lib/expanso/failed-events.log"
      - kafka:
          brokers: ["kafka1:9092", "kafka2:9092"]
          topic: "splunk-backup"
```
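If your Expanso release doesn't expose these exact fields (check the output reference for your version; the keys above are illustrative), the state machine behind the pattern is small enough to reason about directly. A minimal sketch:

```python
import time

class CircuitBreaker:
    """Closed -> Open after N consecutive failures; Half-Open after a
    cool-down; back to Closed after M consecutive successes."""

    def __init__(self, failure_threshold=10, success_threshold=3, timeout=60.0):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.timeout:
                self.state = "half-open"  # let one probe through to Splunk
                self.successes = 0
                return True
            return False  # shed load to the fallback outputs
        return True

    def record_success(self):
        if self.state == "half-open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.state == "half-open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = time.monotonic()

cb = CircuitBreaker(failure_threshold=3)
for _ in range(3):
    cb.record_failure()
print(cb.state)  # prints "open": events now go to the fallback outputs
```

The key property is that a struggling Splunk cluster sees only occasional probes instead of a full retry storm, while events keep draining to disk or Kafka.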
4. Compliance and Audit Logging
```yaml
output:
  # Fan out every event to both destinations
  broker:
    pattern: fan_out
    outputs:
      # Primary: Send to Splunk
      - http_client:
          url: "https://${SPLUNK_HOST}:8088/services/collector/event"
      # Secondary: Audit trail for compliance
      - file:
          path: "/var/audit/splunk-events-${timestamp().format('2006-01-02')}.log"
          condition: 'this.event.sourcetype == "expanso:cef" || this.event.level == "ERROR"'
```
Monitoring HEC Performance
1. Check Pipeline Health
```bash
# View HEC delivery metrics
expanso pipeline metrics splunk-edge-complete

# Check for HEC errors
expanso pipeline logs splunk-edge-complete --level error --filter "HEC"
```
2. Monitor Splunk HEC Token Usage
In Splunk, check HEC performance:
```
# HEC token usage and performance
index=_internal source=*splunkd.log* component=HttpEventCollector
| stats count, avg(response_time) by token_name
| sort -count

# Look for HEC errors
index=_internal source=*splunkd.log* component=HttpEventCollector level=ERROR
```
3. Validate Data Integrity
```
# Ensure all expected fields are present
index=main sourcetype="expanso:*"
| eval missing_fields = case(
    isnull(collection_timestamp), "collection_timestamp",
    isnull(host), "host",
    isnull(source_file), "source_file",
    1=1, "none"
  )
| stats count by missing_fields
```
Troubleshooting Common HEC Issues
1. Authentication Errors (401)
```bash
# Verify token is valid
curl -k "https://$SPLUNK_HOST:8088/services/collector/health" \
  -H "Authorization: Splunk $HEC_TOKEN"

# Should return: {"text":"HEC is available","code":17}
```
2. Index Access Errors (403)
```
# Check if the HEC token can write to the target indexes
| rest /services/data/inputs/http
| search title="http://your-token-name"
| eval allowed_indexes=split(indexes, ",")
```
3. Connection Timeouts
```yaml
# Increase timeouts for slow networks
http_client:
  timeout: "120s"
  keep_alive: "300s"
  dial_timeout: "30s"
```
4. Certificate Issues
```yaml
# Skip cert validation for self-signed certificates (dev only!)
http_client:
  tls:
    skip_verify: true  # DO NOT use in production!
```
Performance Optimization
1. Optimal Batch Sizes
```yaml
batch:
  count: 100          # Events per batch
  period: "10s"       # Max wait time
  byte_size: 1048576  # 1MB max batch size
```
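The three limits interact: a batch flushes when any one of them is hit first, so under light traffic `period` dominates latency and under bursts `count` or `byte_size` caps request size. A sketch of that flush decision:

```python
import time

class Batcher:
    """Flush when the count, byte-size, or age limit is hit, whichever first."""

    def __init__(self, max_count=100, max_bytes=1_048_576, max_age=10.0):
        self.max_count, self.max_bytes, self.max_age = max_count, max_bytes, max_age
        self.events, self.nbytes, self.started = [], 0, time.monotonic()

    def add(self, event: bytes):
        """Buffer an event; return the full batch if a limit was reached."""
        self.events.append(event)
        self.nbytes += len(event)
        if (len(self.events) >= self.max_count
                or self.nbytes >= self.max_bytes
                or time.monotonic() - self.started >= self.max_age):
            flushed, self.events, self.nbytes = self.events, [], 0
            self.started = time.monotonic()
            return flushed  # ship this batch to HEC
        return None         # keep accumulating

b = Batcher(max_count=3, max_age=60.0)
assert b.add(b"e1") is None and b.add(b"e2") is None
print(len(b.add(b"e3")))  # prints 3: the count limit triggered the flush
```

When tuning, keep `byte_size` under Splunk's `max_content_length` for the HEC input (1MB batches sit comfortably under the usual defaults).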
2. Compression
```yaml
http_client:
  compression: "gzip"  # Reduce network overhead
```
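JSON log batches compress extremely well because field names and envelope keys repeat in every event. A quick illustration of the shrinkage gzip buys (exact ratios depend on your data; this toy batch repeats one event):

```python
import gzip
import json

# Build a repetitive JSON-lines batch, similar in shape to a HEC request body.
event = {"host": "edge-01", "sourcetype": "expanso:json",
         "event": {"level": "INFO", "message": "User session created"}}
payload = "\n".join(json.dumps(event) for _ in range(200)).encode()

compressed = gzip.compress(payload)
ratio = len(compressed) / len(payload)
print(f"{len(payload)} -> {len(compressed)} bytes ({ratio:.1%})")
```

On real, varied logs expect less dramatic but still substantial ratios; the CPU cost at the edge is usually cheaper than the WAN bandwidth it saves.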
3. Connection Pooling
http_client:
max_idle_conns: 100
max_idle_conns_per_host: 10
idle_conn_timeout: "90s"
What's Next?
Perfect! Your filtered data is now flowing directly to Splunk via HEC with intelligent routing. You've reduced indexing costs by 70% while maintaining full data integrity. Next, we'll explore advanced patterns like multi-destination routing, compliance features, and metrics extraction.
→ Next Step: Step 5: Advanced Splunk Patterns
Key Takeaway: HEC provides much more flexibility than traditional outputs.conf forwarding. You get dynamic routing, better error handling, and the ability to send different event types to different indexers - all while maintaining the familiar Splunk concepts you already know!