
Step 4: Route to Splunk HEC Like outputs.conf

Now that we've filtered out 70% of the noise, it's time to send the valuable remaining data to Splunk. We'll use Splunk's HTTP Event Collector (HEC), which is more flexible and reliable than traditional outputs.conf forwarding.

Traditional Splunk outputs.conf vs. Expanso HEC Output

Splunk outputs.conf Configuration

# outputs.conf - Traditional forwarder configuration
[tcpout]
defaultGroup = splunk_indexers

[tcpout:splunk_indexers]
server = splunk-indexer1:9997, splunk-indexer2:9997
compressed = true
useACK = true

# Limited routing - can't dynamically choose index
[tcpout:security_group]
server = splunk-indexer1:9997
compressed = true

Limitations:

  • ❌ Static routing - can't dynamically route based on content
  • ❌ No built-in retry logic for failed connections
  • ❌ Limited metadata control
  • ❌ Requires open ports (9997) and network configuration
  • ❌ TLS requires additional certificate setup

Expanso HEC Output (Better!)

# Dynamic routing with full control
output:
  switch:
    - condition: 'this.priority == "critical" || this.sourcetype == "cef"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:8088/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
          timeout: "30s"
          retry_policy:
            max_retries: 3
            backoff: "exponential"

    - condition: 'this.target_index == "security"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:8088/services/collector/event"
          # Route to dedicated security indexer cluster

Advantages:

  • ✅ Dynamic routing based on event content
  • ✅ Built-in retry and error handling
  • ✅ TLS by default
  • ✅ No firewall port management
  • ✅ Full control over index/sourcetype per event
  • ✅ Load balancing across multiple HEC endpoints
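
Under the hood, HEC accepts a small JSON envelope that wraps each event with its routing metadata. As a hedged illustration (the field values here are hypothetical, but the envelope keys are the ones HEC documents), here is that format built up in plain Python:

```python
import json
import time

def hec_envelope(event: dict, host: str, source: str,
                 sourcetype: str, index: str) -> str:
    """Wrap a raw event dict in the JSON envelope Splunk HEC expects."""
    payload = {
        "time": int(time.time()),  # epoch seconds; HEC also accepts fractional
        "host": host,
        "source": source,
        "sourcetype": sourcetype,
        "index": index,
        "event": event,            # the actual log record goes here
    }
    return json.dumps(payload)

body = hec_envelope(
    {"message": "Payment processing failed", "level": "ERROR"},
    host="edge-node-1",
    source="expanso-edge:edge-node-1:/var/log/app.log",
    sourcetype="expanso:json",
    index="main",
)
print(body)
```

Because `index` and `sourcetype` live in the envelope of each event, a single HEC endpoint can receive events destined for any index the token is allowed to write to - this is what makes the dynamic routing above possible.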

Complete Pipeline with Splunk HEC Output

Let's update our pipeline to send filtered data directly to Splunk:

cat > ~/splunk-edge-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-edge-complete"
  description: "Complete edge processing pipeline with Splunk HEC output"

input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true

processors:
  # 1. Basic metadata and parsing
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()

      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }

  # 2. JSON parsing
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }

        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }

  # 3. CEF parsing
  - conditional:
      condition: 'this.sourcetype == "cef"'
      mapping: |
        root.cef = this.string().parse_regex("CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")

        root.vendor = this.cef.vendor
        root.event_name = this.cef.name
        root.severity = this.cef.severity
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }

  # 4. Filtering (from Step 3)
  - conditional:
      condition: 'this.level == "DEBUG"'
      mapping: 'root = deleted()'

  - conditional:
      condition: 'this.is_health_check == true'
      mapping: |
        let hash = (this.host + this.timestamp).hash("xxhash64") % 10
        if hash != 0 { root = deleted() }

  # 5. Prepare for Splunk HEC - format according to HEC requirements
  - mapping: |
      # Determine target index based on content and risk
      root.splunk_index = match {
        this.sourcetype == "cef" => env("SECURITY_INDEX").string().catch("security")
        this.level == "ERROR" => env("MAIN_INDEX").string().catch("main")
        this.risk_score >= 7 => env("SECURITY_INDEX").string().catch("security")
        _ => env("MAIN_INDEX").string().catch("main")
      }

      # Set the appropriate sourcetype for Splunk
      root.splunk_sourcetype = match this.sourcetype {
        "json_logs" => "expanso:json"
        "cef" => "expanso:cef"
        "syslog" => "expanso:syslog"
        _ => "expanso:unknown"
      }

      # Add source information
      root.source = "expanso-edge:" + this.host + ":" + this.source_file

      # Ensure the timestamp is in epoch format for Splunk
      root.splunk_timestamp = timestamp().format_timestamp_unix()

  # 6. Create HEC-formatted event
  - mapping: |
      # Create Splunk HEC event format
      # See: https://docs.splunk.com/Documentation/Splunk/latest/Data/FormateventsforHTTPEventCollector
      root = {
        "time": this.splunk_timestamp,
        "host": this.host,
        "source": this.source,
        "sourcetype": this.splunk_sourcetype,
        "index": this.splunk_index,
        "event": this.without("splunk_timestamp", "splunk_index", "splunk_sourcetype", "source")
      }

# Multiple outputs with intelligent routing
output:
  # Route based on priority and content
  switch:
    # Critical security events - high-priority HEC endpoint
    - condition: 'this.event.risk_score >= 8 || this.event.level == "ERROR"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "critical-events"
          timeout: "10s"
          retry_policy:
            max_retries: 5
            initial_interval: "1s"
            max_interval: "30s"
            backoff_multiplier: 2.0
          # Batch for efficiency
          batch:
            count: 50
            period: "5s"
            byte_size: 1048576 # 1MB batches

    # Normal events - standard HEC endpoint
    - condition: 'this.event.priority != "critical"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "normal-events"
          timeout: "30s"
          retry_policy:
            max_retries: 3
            initial_interval: "2s"
            max_interval: "60s"
          batch:
            count: 100
            period: "10s"
            byte_size: 2097152 # 2MB batches

    # Fallback - write to a local file if HEC is unavailable
    - output:
        file:
          path: "/tmp/splunk-fallback-${timestamp().format('2006-01-02')}.json"
          codec: "lines"
EOF
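
The retry_policy on the critical route (initial_interval 1s, backoff_multiplier 2.0, max_interval 30s, max_retries 5) produces an exponential backoff schedule. A minimal sketch of that arithmetic, independent of any pipeline runtime:

```python
def backoff_schedule(max_retries: int, initial: float,
                     multiplier: float, cap: float) -> list:
    """Delays (seconds) before each retry: exponential growth, capped."""
    delays = []
    delay = initial
    for _ in range(max_retries):
        delays.append(min(delay, cap))  # never wait longer than the cap
        delay *= multiplier
    return delays

# Mirrors the critical-events route: 5 retries, 1s initial, x2, 30s cap
print(backoff_schedule(5, 1.0, 2.0, 30.0))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

The cap matters on longer outages: with enough retries the delay would otherwise grow unbounded, while here it plateaus at max_interval.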

Environment Configuration

Make sure your environment variables are set correctly:

# Verify HEC configuration
echo "HEC_TOKEN: $HEC_TOKEN"
echo "SPLUNK_HOST: $SPLUNK_HOST"
echo "SPLUNK_PORT: $SPLUNK_PORT"

# Set index names if not already configured
export MAIN_INDEX="main"
export SECURITY_INDEX="security"
export METRICS_INDEX="metrics"

Deploy and Test HEC Integration

1. Deploy the Complete Pipeline

# Deploy pipeline with HEC output
expanso pipeline deploy ~/splunk-edge-pipeline.yaml

# Monitor pipeline status
expanso pipeline status splunk-edge-complete

2. Verify HEC Connectivity

# Test HEC endpoint manually
curl -k "https://$SPLUNK_HOST:$SPLUNK_PORT/services/collector/event" \
-H "Authorization: Splunk $HEC_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"time": 1644505200,
"host": "test-host",
"source": "expanso-test",
"sourcetype": "expanso:test",
"index": "main",
"event": {"message": "HEC connectivity test", "test": true}
}'

# Expected response: {"text":"Success","code":0}

3. Generate Test Events and Verify in Splunk

# Generate mixed priority events
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Payment processing failed","user":"[email protected]","transaction_id":"tx_99999","amount":199.99}' >> $TEST_DATA_DIR/app.log

echo 'CEF:0|Company|WebApp|1.0|900|Account Takeover Attempt|Critical|src=203.0.113.200 suser=admin act=login outcome=failure attempts=100 dpt=443' >> $TEST_DATA_DIR/security.log

echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"INFO","message":"User session created","user":"normal.user","session_id":"sess_xyz789"}' >> $TEST_DATA_DIR/app.log

4. Search in Splunk to Verify Data

In your Splunk search interface:

# Search for Expanso-processed events
index=main sourcetype="expanso:*" earliest=-15m

# Verify critical events reached security index
index=security sourcetype="expanso:cef" risk_score>=8

# Check data volume reduction
index=main sourcetype="expanso:json"
| stats count by level
| eval expected_debug_count=0

Advanced HEC Configuration Patterns

1. Load Balancing Across Multiple Indexers

output:
  switch:
    - condition: 'this.event.host.hash("xxhash64") % 2 == 0'
      output:
        http_client:
          url: "https://splunk-indexer1.company.com:8088/services/collector/event"
    - condition: 'this.event.host.hash("xxhash64") % 2 == 1'
      output:
        http_client:
          url: "https://splunk-indexer2.company.com:8088/services/collector/event"
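
The idea behind hash-mod routing is that a stable hash of the host name sends each host to the same indexer every time, while spreading hosts roughly evenly. A sketch of the same logic (using stdlib `crc32` as a stand-in for the config's `xxhash64`, which is a third-party dependency in Python):

```python
from zlib import crc32

def pick_indexer(host: str, endpoints: list) -> str:
    """Stable hash-mod routing: the same host always lands on the same
    indexer, and distinct hosts spread roughly evenly across endpoints."""
    return endpoints[crc32(host.encode()) % len(endpoints)]

endpoints = [
    "https://splunk-indexer1.company.com:8088",
    "https://splunk-indexer2.company.com:8088",
]
# Same host -> same endpoint on every call (deterministic routing)
assert pick_indexer("edge-node-7", endpoints) == pick_indexer("edge-node-7", endpoints)
print(pick_indexer("edge-node-7", endpoints))
```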

2. Environment-Based Routing

processors:
  - mapping: |
      # Route based on environment
      root.target_cluster = match env("ENVIRONMENT") {
        "prod" => "https://prod-splunk.company.com:8088"
        "staging" => "https://staging-splunk.company.com:8088"
        _ => "https://dev-splunk.company.com:8088"
      }
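
The same environment-to-cluster lookup, sketched in Python so you can see the fallthrough behavior (any unrecognized or unset `ENVIRONMENT` falls back to the dev cluster):

```python
import os

def target_cluster(env):
    """Pick a Splunk cluster URL by environment; default to dev."""
    return {
        "prod": "https://prod-splunk.company.com:8088",
        "staging": "https://staging-splunk.company.com:8088",
    }.get(env or "", "https://dev-splunk.company.com:8088")

print(target_cluster(os.environ.get("ENVIRONMENT")))
```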

3. Circuit Breaker Pattern

output:
  http_client:
    # Circuit breaker prevents overwhelming Splunk during issues
    circuit_breaker:
      failure_threshold: 10  # Open after 10 failures
      success_threshold: 3   # Close after 3 successes
      timeout: "60s"         # Try again after 1 minute

    # Graceful degradation
    fallback_outputs:
      - file:
          path: "/var/lib/expanso/failed-events.log"
      - kafka:
          brokers: ["kafka1:9092", "kafka2:9092"]
          topic: "splunk-backup"
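
To make the thresholds concrete, here is a deliberately simplified breaker state machine (it omits the half-open/timeout phase, which the config's `timeout: "60s"` adds on top): the breaker opens after N consecutive failures and closes again after M consecutive successes.

```python
class CircuitBreaker:
    """Minimal open/closed breaker mirroring the thresholds above.
    Open = stop sending to Splunk and divert to the fallback outputs."""

    def __init__(self, failure_threshold=10, success_threshold=3):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.failures = 0
        self.successes = 0
        self.open = False

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            self.successes += 1
            if self.open and self.successes >= self.success_threshold:
                self.open = False  # recovered: resume sending
        else:
            self.successes = 0
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.open = True   # tripped: divert to fallback

breaker = CircuitBreaker(failure_threshold=10, success_threshold=3)
for _ in range(10):
    breaker.record(ok=False)  # 10 straight failures trip the breaker
print(breaker.open)           # True
```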

4. Compliance and Audit Logging

output:
  # Primary: send to Splunk
  http_client:
    url: "https://${SPLUNK_HOST}:8088/services/collector/event"

  # Secondary: audit trail for compliance
  file:
    path: "/var/audit/splunk-events-${timestamp().format('2006-01-02')}.log"
    condition: 'this.event.sourcetype == "expanso:cef" || this.event.level == "ERROR"'

Monitoring HEC Performance

1. Check Pipeline Health

# View HEC delivery metrics
expanso pipeline metrics splunk-edge-complete

# Check for HEC errors
expanso pipeline logs splunk-edge-complete --level error --filter "HEC"

2. Monitor Splunk HEC Token Usage

In Splunk, check HEC performance:

# HEC token usage and performance  
index=_internal source=*splunkd.log* component=HttpEventCollector
| stats count, avg(response_time) by token_name
| sort -count

# Look for HEC errors
index=_internal source=*splunkd.log* component=HttpEventCollector level=ERROR

3. Validate Data Integrity

# Ensure all expected fields are present
index=main sourcetype="expanso:*"
| eval missing_fields = case(
isnull(collection_timestamp), "collection_timestamp",
isnull(host), "host",
isnull(source_file), "source_file",
1=1, "none"
)
| stats count by missing_fields

Troubleshooting Common HEC Issues

1. Authentication Errors (401)

# Verify token is valid
curl -k "https://$SPLUNK_HOST:8088/services/collector/health" \
-H "Authorization: Splunk $HEC_TOKEN"

# Should return: {"text":"HEC is healthy","code":17}

2. Index Access Errors (403)

# Check if HEC token can write to target indexes
| rest /services/data/inputs/http/hec_tokens
| search title="your-token-name"
| eval allowed_indexes=split(allowedIndexes, ",")

3. Connection Timeouts

# Increase timeouts for slow networks
http_client:
  timeout: "120s"
  keep_alive: "300s"
  dial_timeout: "30s"

4. Certificate Issues

# Skip cert validation for self-signed certificates (dev only!)
http_client:
  tls:
    skip_verify: true # DO NOT use in production!

Performance Optimization

1. Optimal Batch Sizes

batch:
  count: 100         # Events per batch
  period: "10s"      # Max wait time
  byte_size: 1048576 # 1MB max batch size
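
These three limits combine with OR semantics: a batch ships as soon as any one of them is hit. A small sketch of that flush decision, with hypothetical event payloads:

```python
import json

def should_flush(events: list, elapsed_s: float,
                 count=100, period_s=10.0, byte_size=1_048_576) -> bool:
    """A batch ships when ANY limit is hit: event count, max wait, or bytes."""
    size = sum(len(json.dumps(e)) for e in events)
    return len(events) >= count or elapsed_s >= period_s or size >= byte_size

print(should_flush([{"m": "x"}] * 100, elapsed_s=1.0))  # True: count limit hit
print(should_flush([{"m": "x"}] * 5, elapsed_s=12.0))   # True: period elapsed
print(should_flush([{"m": "x"}] * 5, elapsed_s=1.0))    # False: no limit hit
```

The `period` acts as a latency bound for quiet sources, while `count` and `byte_size` bound request size under heavy load.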

2. Compression

http_client:
  compression: "gzip" # Reduce network overhead
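
Log batches compress very well because events repeat the same field names and message templates. A quick demonstration with synthetic events (sizes will vary with your data):

```python
import gzip
import json

# A batch of repetitive log events, newline-delimited as HEC batches are
batch = "\n".join(
    json.dumps({"level": "INFO", "message": "User session created", "seq": i})
    for i in range(1000)
).encode()

compressed = gzip.compress(batch)
ratio = len(compressed) / len(batch)
print(f"{len(batch)} -> {len(compressed)} bytes ({ratio:.0%} of original)")
```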

3. Connection Pooling

http_client:
max_idle_conns: 100
max_idle_conns_per_host: 10
idle_conn_timeout: "90s"

What's Next?

Perfect! Your filtered data is now flowing directly to Splunk via HEC with intelligent routing. You've reduced indexing costs by 70% while maintaining full data integrity. Next, we'll explore advanced patterns like multi-destination routing, compliance features, and metrics extraction.

Next Step: Step 5: Advanced Splunk Patterns


Key Takeaway: HEC provides much more flexibility than traditional outputs.conf forwarding. You get dynamic routing, better error handling, and the ability to send different event types to different indexers - all while maintaining the familiar Splunk concepts you already know!