
Complete Production Pipeline

This is the complete, production-ready pipeline that combines all the concepts from the previous steps. Use this as your starting point for real-world Splunk deployments.

📋 Ready to deploy? Copy the complete pipeline YAML and paste it into Expanso Cloud.

Production Pipeline Configuration

# production-splunk-pipeline.yaml
# Complete production-ready Expanso Edge pipeline for Splunk integration
# Includes: filtering, parsing, multi-destination routing, compliance, monitoring

apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-production-integration"
  description: "Production Splunk integration with edge processing - reduces indexing costs by 70%"
  version: "1.0.0"
  labels:
    environment: "${ENVIRONMENT}"
    team: "data-platform"
    cost-optimization: "enabled"

# Input: Monitor log files like Splunk inputs.conf, with richer rotation handling
input:
  file_watcher:
    # Monitor all log files in standard locations
    paths:
      - "/var/log/applications/*.log"
      - "/var/log/security/*.log"
      - "/var/log/system/*.log"
      - "/opt/app/logs/*.log"

    # Performance settings
    poll_interval: "1s"
    start_from_beginning: false # Only new data in production
    include_file_name: true

    # Handle log rotation properly
    ignore_older: "24h"
    close_inactive: "10m"
    clean_inactive: "24h"
    scan_frequency: "10s"

    # Multiline log support (stack traces, etc.)
    multiline:
      pattern: '^(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}|\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}|\{.*"timestamp")'
      # The pattern matches the FIRST line of an event, so lines that do NOT
      # match (e.g. stack trace continuations) must be appended to the
      # previous event: negate must be true with match: after
      negate: true
      match: after
      timeout: "10s"
      max_lines: 100

processors:
  # 1. METADATA AND CLASSIFICATION
  - mapping: |
      # Basic metadata (like Splunk source/host fields)
      root.source_file = file.name
      root.source_path = file.path
      root.collection_timestamp = timestamp()
      root.host = hostname()
      root.expanso_version = env("EXPANSO_VERSION").string().catch("unknown")

      # Determine sourcetype based on file path and content
      root.sourcetype = match {
        file.path.contains("/security/") => "security_logs"
        file.path.contains("/applications/") && this.string().contains('{"') => "app_json"
        file.path.contains("/applications/") => "app_text"
        file.path.contains("/system/") => "syslog"
        this.string().contains("CEF:") => "cef"
        this.string().contains('{"timestamp"') || this.string().contains('"@timestamp"') => "json_structured"
        _ => "unknown"
      }

      # Initial event classification for filtering decisions
      root.event_category = "unknown"
      root.event_priority = "normal"
      root.data_classification = "general"

      # Track original message size for cost calculations
      root.original_size_bytes = this.string().length()

  # 2. JSON LOG PARSING (like Splunk props.conf with KV_MODE=json)
  - conditional:
      condition: 'this.sourcetype.contains("json") || this.sourcetype == "app_json"'
      mapping: |
        # Parse JSON with error handling
        root = if this.string().parse_json().type() == "object" {
          this.string().parse_json().merge({
            "raw_message": this.string(),
            "parse_success": true
          })
        } else {
          {
            "raw_message": this.string(),
            "parse_success": false,
            "parse_error": "invalid_json"
          }
        }

        # Normalize timestamp formats
        root.normalized_timestamp = match {
          this.timestamp.type() == "string" => this.timestamp
          this."@timestamp".type() == "string" => this."@timestamp"
          this.time.type() == "string" => this.time
          _ => timestamp()
        }

        # Log level normalization and severity scoring
        root.log_level = this.level.string().catch(this.severity.string().catch("INFO")).uppercase()
        root.log_severity = match this.log_level {
          "FATAL", "CRITICAL" => 5
          "ERROR" => 4
          "WARN", "WARNING" => 3
          "INFO" => 2
          "DEBUG", "TRACE" => 1
          _ => 2
        }

        # Event categorization for filtering and routing
        root.event_category = match {
          this.log_level == "ERROR" || this.log_level == "FATAL" => "error"
          this.log_level == "DEBUG" || this.log_level == "TRACE" => "debug"
          this.message.contains("health") && this.message.contains("check") => "health_check"
          this.message.contains("login") || this.message.contains("auth") => "authentication"
          this.message.contains("payment") || this.message.contains("transaction") => "financial"
          this.endpoint.string().catch("").contains("/api/") => "api_call"
          _ => "application"
        }

        # Response time analysis
        if this.response_time_ms.type() == "number" {
          root.response_category = match {
            this.response_time_ms < 100 => "fast"
            this.response_time_ms < 500 => "normal"
            this.response_time_ms < 2000 => "slow"
            _ => "very_slow"
          }
        }

        # User domain extraction for analytics
        if this.user.string().catch("").contains("@") {
          root.user_domain = this.user.split("@")[1]
          root.user_local = this.user.split("@")[0]
        } else if this.user.string().catch("").contains(".") {
          root.user_domain = "internal"
          root.user_local = this.user
        }

  # 3. CEF SECURITY LOG PARSING
  - conditional:
      condition: 'this.sourcetype == "cef" || this.string().contains("CEF:")'
      mapping: |
        # Parse CEF format: CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
        root.cef_parsed = this.string().parse_regex("^CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")

        # Extract main CEF fields
        root.vendor = this.cef_parsed.vendor
        root.product = this.cef_parsed.product
        root.signature_id = this.cef_parsed.signature_id
        root.event_name = this.cef_parsed.name
        root.severity = this.cef_parsed.severity

        # Parse extensions into structured fields
        root.cef_extensions = {}
        for_each this.cef_parsed.extensions.split(" ") -> extension {
          let parts = extension.split("=")
          if parts.length() == 2 && parts[0] != "" {
            root.cef_extensions = root.cef_extensions.merge({parts[0]: parts[1]})
          }
        }

        # Extract key security fields
        root.source_ip = this.cef_extensions.src.string().catch("")
        root.destination_ip = this.cef_extensions.dst.string().catch("")
        root.source_port = this.cef_extensions.spt.number().catch(0)
        root.destination_port = this.cef_extensions.dpt.number().catch(0)
        root.username = this.cef_extensions.suser.string().catch("")

        # Security event classification
        root.event_category = "security"
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }

        # Set data classification based on security context
        root.data_classification = match {
          this.risk_score >= 8 => "sensitive"
          this.event_name.contains("login") || this.event_name.contains("auth") => "pii"
          _ => "security"
        }

  # 4. SYSLOG PARSING
  - conditional:
      condition: 'this.sourcetype == "syslog"'
      mapping: |
        # Parse traditional syslog format
        root.syslog_parsed = this.string().parse_regex("^(?P<month>\\w{3})\\s+(?P<day>\\d{1,2})\\s+(?P<time>\\d{2}:\\d{2}:\\d{2})\\s+(?P<hostname>\\S+)\\s+(?P<process>[^\\[:]+)(?:\\[(?P<pid>\\d+)\\])?:\\s*(?P<message>.*)")

        # Extract structured fields
        root.log_month = this.syslog_parsed.month
        root.log_day = this.syslog_parsed.day.number().catch(0)
        root.log_time = this.syslog_parsed.time
        root.syslog_hostname = this.syslog_parsed.hostname
        root.process_name = this.syslog_parsed.process
        root.process_id = this.syslog_parsed.pid.number().catch(0)
        root.message = this.syslog_parsed.message

        # Create ISO timestamp (current year assumed)
        root.normalized_timestamp = timestamp().format("2006") + "-" +
          match this.log_month {
            "Jan" => "01"
            "Feb" => "02"
            "Mar" => "03"
            "Apr" => "04"
            "May" => "05"
            "Jun" => "06"
            "Jul" => "07"
            "Aug" => "08"
            "Sep" => "09"
            "Oct" => "10"
            "Nov" => "11"
            "Dec" => "12"
            _ => "01"
          } + "-" +
          this.log_day.string().format_int(2, "0") + "T" +
          this.log_time + ".000Z"

        # System event classification
        root.event_category = match this.process_name {
          "sshd" => "authentication"
          "sudo" => "privilege_escalation"
          "systemd" => "system"
          "kernel" => "kernel"
          "nginx", "httpd" => "web_server"
          _ => "system"
        }

  # 5. COMPLIANCE AND PII DETECTION
  - mapping: |
      # Detect PII patterns and classify data
      let message_text = this.message.string().catch(this.raw_message.string().catch(""))

      # PII Detection patterns
      root.contains_email = message_text.test("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b")
      root.contains_ssn = message_text.test("\\b\\d{3}-\\d{2}-\\d{4}\\b")
      root.contains_credit_card = message_text.test("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b")
      root.contains_phone = message_text.test("\\b\\d{3}[-.]?\\d{3}[-.]?\\d{4}\\b")

      # Update data classification based on content
      root.data_classification = match {
        this.contains_ssn || this.contains_credit_card => "sensitive_pii"
        this.contains_email || this.contains_phone => "pii"
        this.event_category == "financial" => "financial"
        this.event_category == "security" && this.risk_score >= 7 => "sensitive"
        this.event_category == "authentication" => "pii"
        _ => this.data_classification.string().catch("general")
      }

      # Determine data residency requirements
      root.data_residency = match this.source_ip.string().catch("") {
        this.test("^10\\.1\\.") => "us_west"
        this.test("^10\\.2\\.") => "us_east"
        this.test("^10\\.3\\.") => "eu_west"
        this.test("^10\\.4\\.") => "asia_pacific"
        _ => env("DEFAULT_DATA_RESIDENCY").string().catch("us_east")
      }

  # 6. PII MASKING (GDPR/HIPAA/CCPA Compliance)
  - mapping: |
      # Create masked version of message for general use
      let original_message = this.message.string().catch(this.raw_message.string().catch(""))

      # Mask email addresses (note [A-Za-z], not [A-Z|a-z] which would also match "|")
      root.message_masked = original_message.replace_all_regex("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "***@***.***")

      # Mask credit card numbers
      root.message_masked = root.message_masked.replace_all_regex("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b", "****-****-****-****")

      # Mask SSNs
      root.message_masked = root.message_masked.replace_all_regex("\\b\\d{3}-\\d{2}-\\d{4}\\b", "***-**-****")

      # Mask phone numbers
      root.message_masked = root.message_masked.replace_all_regex("\\b\\d{3}[-.]?\\d{3}[-.]?\\d{4}\\b", "***-***-****")

      # Track masking for audit purposes
      root.pii_masked = original_message != root.message_masked
      root.original_message = if this.data_classification.contains("sensitive") {
        "[REDACTED - " + this.data_classification + "]"
      } else {
        original_message
      }

  # 7. INTELLIGENT FILTERING - The Cost Savings Engine

  # 7a. Drop DEBUG/TRACE entirely (40% volume reduction)
  - conditional:
      condition: 'this.log_level == "DEBUG" || this.log_level == "TRACE"'
      mapping: 'root = deleted()'

  # 7b. Sample health checks intelligently (20% volume reduction)
  - conditional:
      condition: 'this.event_category == "health_check"'
      mapping: |
        # Keep health checks based on:
        # - One per minute per service (time-based sampling)
        # - All failures (error conditions)
        # - Statistical sampling for normal checks

        let time_key = timestamp().format("2006-01-02-15-04") # Year-Month-Day-Hour-Minute
        let service_key = this.host + ":" + (this.service.string().catch("unknown"))
        let sample_key = "health_" + service_key + "_" + time_key

        # Track samples per service per minute
        let current_samples = meta(sample_key).number().catch(0)

        # Always keep first health check per minute per service
        if current_samples == 0 {
          meta(sample_key) = 1
          root.health_check_sampled = true
          root.sample_reason = "first_per_minute"
        # Always keep health check failures
        } else if this.log_level == "ERROR" || this.log_level == "WARN" {
          root.health_check_sampled = true
          root.sample_reason = "error_condition"
        # Sample 10% of remaining normal health checks
        } else {
          let hash = (this.host + this.message).hash("xxhash64") % 10
          if hash == 0 {
            root.health_check_sampled = true
            root.sample_reason = "statistical_sample"
          } else {
            root = deleted()
          }
        }

  # 7c. Deduplicate similar events (10% volume reduction)
  - dedup:
      cache_size: 10000
      drop_on: 'this.log_level + "|" + this.event_category + "|" + this.host + "|" + this.message_masked'
      dedupe_after: "5m"

  # 7d. Rate limiting for verbose applications (15% volume reduction)
  - throttle:
      key: 'this.host + ":" + this.process_name.string().catch("unknown") + ":" + this.event_category'
      limit: 1000 # Max 1000 events per minute per host+process+category
      interval: "1m"

  # 7e. Filter synthetic/test data
  - conditional:
      condition: |
        this.user.string().catch("").contains("test") ||
        this.user.string().catch("").contains("synthetic") ||
        this.source_ip.string().catch("").contains("127.0.0.1") ||
        this.source_ip.string().catch("").contains("::1") ||
        this.message.string().catch("").contains("test-") ||
        this.host.string().catch("").contains("test-")
      mapping: 'root = deleted()'

  # 8. FINAL ENRICHMENT AND ROUTING PREPARATION
  - mapping: |
      # Calculate final priority for routing decisions
      root.final_priority = match {
        this.data_classification.contains("sensitive") => "critical"
        this.event_category == "security" && this.risk_score >= 7 => "critical"
        this.log_severity >= 4 => "high"
        this.log_severity == 3 => "medium"
        this.event_category == "authentication" => "medium"
        this.event_category == "financial" => "medium"
        _ => "normal"
      }

      # Determine target Splunk index
      root.target_index = match {
        this.data_classification.contains("sensitive") => "compliance"
        this.event_category == "security" || this.sourcetype == "cef" => "security"
        this.log_severity >= 4 => "main"
        this.event_category == "system" => "infrastructure"
        _ => "main"
      }

      # Create Splunk-compatible sourcetype
      root.target_sourcetype = "expanso:" + this.sourcetype

      # Add processing metadata
      root.processed_timestamp = timestamp()
      root.processing_host = hostname()
      root.pipeline_version = "1.0.0"
      root.volume_reduction_estimate = 0.7 # 70% typical reduction

      # Cost tracking (original_size_bytes was captured at ingest, before processing)
      root.estimated_original_size = this.original_size_bytes
      root.cost_savings_usd_per_event = (root.estimated_original_size - this.string().length()) * 0.0000002 # ~$0.20 per MB ($200/GB)

  # 9. CREATE DESTINATION-SPECIFIC FORMATS
  - mapping: |
      # Splunk HEC format
      root.splunk_event = {
        "time": timestamp().format_timestamp_unix(),
        "host": this.host,
        "source": "expanso:" + this.source_file,
        "sourcetype": this.target_sourcetype,
        "index": this.target_index,
        "event": this.without("splunk_event", "s3_event", "metrics_data")
      }

      # S3 archival format with metadata for analytics
      root.s3_event = {
        "timestamp": timestamp().format_timestamp_unix(),
        "date_partition": timestamp().format("year=2006/month=01/day=02"),
        "hour_partition": timestamp().format("hour=15"),
        "data_classification": this.data_classification,
        "data_residency": this.data_residency,
        "event_priority": this.final_priority,
        "pipeline_version": "1.0.0",
        "retention_years": match this.data_classification {
          this.contains("pii") => 7
          "financial" => 10
          "security" => 5
          _ => 3
        },
        "event": this.without("splunk_event", "s3_event", "metrics_data")
      }

      # Metrics extraction for monitoring systems
      if this.response_time_ms.type() == "number" || this.log_severity.type() == "number" {
        root.metrics_data = [
          if this.response_time_ms.type() == "number" {
            {
              "metric": "http_request_duration_seconds",
              "value": this.response_time_ms / 1000,
              "timestamp": timestamp().format_timestamp_unix(),
              "labels": {
                "host": this.host,
                "endpoint": this.endpoint.string().catch("unknown"),
                "method": this.method.string().catch("unknown"),
                "status_code": this.status_code.string().catch("unknown")
              }
            }
          },
          {
            "metric": "log_events_total",
            "value": 1,
            "timestamp": timestamp().format_timestamp_unix(),
            "labels": {
              "host": this.host,
              "level": this.log_level,
              "category": this.event_category,
              "sourcetype": this.sourcetype,
              "priority": this.final_priority
            }
          }
        ]
      }

# PRODUCTION-GRADE MULTI-DESTINATION OUTPUT
output:
  # PRIMARY: Splunk HEC with intelligent routing
  switch:
    # Critical events - dedicated high-priority pipeline
    - condition: 'this.final_priority == "critical"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          method: "POST"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "expanso-critical"
          timeout: "10s"
          retry_policy:
            max_retries: 5
            initial_interval: "500ms"
            max_interval: "10s"
            backoff_multiplier: 2.0
          # Small batches for low latency
          batch:
            count: 20
            period: "2s"
            byte_size: 524288 # 512KB
          postmap: 'root = this.splunk_event'

    # Normal events - standard pipeline with larger batches
    - condition: 'this.final_priority != "critical"'
      output:
        http_client:
          url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
          method: "POST"
          headers:
            Authorization: "Splunk ${HEC_TOKEN}"
            Content-Type: "application/json"
            X-Splunk-Request-Channel: "expanso-normal"
          timeout: "30s"
          retry_policy:
            max_retries: 3
            initial_interval: "2s"
            max_interval: "30s"
            backoff_multiplier: 2.0
          # Larger batches for efficiency
          batch:
            count: 100
            period: "10s"
            byte_size: 2097152 # 2MB
          postmap: 'root = this.splunk_event'

    # SECONDARY: Long-term storage in S3 for analytics and compliance
    - condition: 'this.data_classification != "general"'
      output:
        s3:
          bucket: "${S3_BUCKET_PREFIX}-${this.data_residency}"
          path: "${s3_event.date_partition}/${s3_event.hour_partition}/expanso-${timestamp().format_timestamp_unix('nano')}.jsonl"
          region: "${AWS_REGION}"
          credentials:
            profile: "${AWS_PROFILE}"
          storage_class: "INTELLIGENT_TIERING"
          server_side_encryption: "aws:kms"
          kms_key_id: "${KMS_KEY_ID}"
          metadata:
            "data-classification": "${this.data_classification}"
            "retention-years": "${this.s3_event.retention_years}"
            "pipeline-version": "1.0.0"
          postmap: 'root = this.s3_event'

    # TERTIARY: Metrics to monitoring systems
    - condition: 'this.metrics_data.type() == "array"'
      output:
        prometheus:
          push_url: "${PROMETHEUS_PUSHGATEWAY_URL}/metrics/job/expanso-edge/instance/${hostname()}"
          push_interval: "30s"
          push_job_label: "expanso-splunk-integration"
          postmap: 'root = this.metrics_data'

    # QUATERNARY: Real-time alerts for critical events
    - condition: 'this.final_priority == "critical" || this.log_severity >= 4'
      output:
        http_client:
          url: "${ALERTING_WEBHOOK_URL}"
          method: "POST"
          headers:
            Content-Type: "application/json"
            Authorization: "Bearer ${ALERT_TOKEN}"
          timeout: "5s"
          retry_policy:
            max_retries: 2
            initial_interval: "1s"
            max_interval: "5s"
          postmap: |
            root = {
              "alert_type": "log_event",
              "severity": match this.final_priority {
                "critical" => "critical"
                _ => "warning"
              },
              "title": this.event_category + " event on " + this.host,
              "description": this.message_masked,
              "timestamp": timestamp(),
              "source": this.host + ":" + this.source_file,
              "classification": this.data_classification,
              "index": this.target_index,
              "metadata": {
                "log_level": this.log_level,
                "event_category": this.event_category,
                "risk_score": this.risk_score.number().catch(0)
              }
            }

    # FALLBACK: Local file storage if all outputs fail
    - output:
        file:
          path: "/var/lib/expanso/fallback/failed-events-${timestamp().format('2006-01-02')}.jsonl"
          codec: "lines"
          postmap: 'root = {"timestamp": timestamp(), "error": "all_outputs_failed", "event": this.splunk_event}'
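
Before deploying, it is worth sanity-checking the PII masking rules from step 6 against sample log lines. The pipeline applies its `replace_all_regex` substitutions in order (email, card, SSN, phone); a minimal Python sketch of the same four patterns, mirroring `message_masked` and `pii_masked`:

```python
import re

# Same patterns and order as the pipeline's step 6 masking rules.
MASKS = [
    (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "***@***.***"),  # email
    (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "****-****-****-****"),  # credit card
    (r"\b\d{3}-\d{2}-\d{4}\b", "***-**-****"),                               # SSN
    (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "***-***-****"),                      # phone
]

def mask_pii(message: str) -> tuple[str, bool]:
    """Return (masked_message, pii_was_masked) like message_masked / pii_masked."""
    masked = message
    for pattern, replacement in MASKS:
        masked = re.sub(pattern, replacement, masked)
    return masked, masked != message

masked, changed = mask_pii("login failed for alice@example.com from 555-867-5309")
print(masked)   # login failed for ***@***.*** from ***-***-****
print(changed)  # True
```

Note that card numbers are masked before phone numbers on purpose: the looser phone pattern could otherwise match inside a 16-digit card number.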

Environment Configuration

Create a production environment configuration file:

cat > .env.production << 'EOF'
# Splunk Configuration
SPLUNK_HOST=your-splunk-host.company.com
SPLUNK_PORT=8088
HEC_TOKEN=your-production-hec-token

# Index Configuration
MAIN_INDEX=main
SECURITY_INDEX=security
COMPLIANCE_INDEX=compliance
INFRASTRUCTURE_INDEX=infrastructure

# AWS Configuration (for S3 archival)
AWS_REGION=us-west-2
AWS_PROFILE=production
S3_BUCKET_PREFIX=company-logs
KMS_KEY_ID=arn:aws:kms:us-west-2:123456789012:key/your-kms-key

# Monitoring
PROMETHEUS_PUSHGATEWAY_URL=http://prometheus-pushgateway.monitoring.svc.cluster.local:9091
ALERTING_WEBHOOK_URL=https://hooks.slack.com/services/your/production/webhook
ALERT_TOKEN=your-production-alert-token

# Pipeline Configuration
ENVIRONMENT=production
DEFAULT_DATA_RESIDENCY=us_west
EXPANSO_VERSION=2.1.0
EOF
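
The sampling decision in step 7b (keep the first event per service per minute, keep all failures, hash-sample roughly 10% of the rest) is easy to prototype locally before trusting it with production volume. A sketch of the same logic in Python, using `zlib.crc32` in place of the pipeline's `xxhash64` (so the exact 10% bucket differs, but the mechanism is identical) and a plain set in place of the pipeline's per-key metadata counter:

```python
import zlib

seen_minutes: set[str] = set()  # stands in for the pipeline's meta(sample_key) counter

def keep_health_check(host: str, service: str, level: str,
                      message: str, minute: str) -> tuple[bool, str]:
    """Decide whether a health-check event survives sampling, and why."""
    sample_key = f"health_{host}:{service}_{minute}"
    if sample_key not in seen_minutes:     # first check this minute for this service
        seen_minutes.add(sample_key)
        return True, "first_per_minute"
    if level in ("ERROR", "WARN"):         # always keep failures
        return True, "error_condition"
    bucket = zlib.crc32((host + message).encode()) % 10
    if bucket == 0:                        # ~10% statistical sample
        return True, "statistical_sample"
    return False, "dropped"

print(keep_health_check("web-1", "api", "INFO", "health check ok", "2025-01-01-00-00"))
# → (True, 'first_per_minute'); later INFO checks in the same minute are mostly dropped
```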

Deployment Instructions

1. Pre-Deployment Checklist

# Validate pipeline configuration
expanso pipeline validate production-splunk-pipeline.yaml

# Test Splunk connectivity
curl -k "https://$SPLUNK_HOST:$SPLUNK_PORT/services/collector/health" \
  -H "Authorization: Splunk $HEC_TOKEN"

# Verify AWS credentials and S3 access
aws s3 ls "s3://${S3_BUCKET_PREFIX}-us_west/" --profile "$AWS_PROFILE" # suffix matches the pipeline's data_residency values (us_west, us_east, ...)

# Check Prometheus pushgateway
curl $PROMETHEUS_PUSHGATEWAY_URL/metrics

# Test alerting webhook
curl -X POST $ALERTING_WEBHOOK_URL \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $ALERT_TOKEN" \
-d '{"text": "Expanso pipeline deployment test"}'

2. Deploy Pipeline

# Load environment variables
source .env.production

# Deploy to production
expanso pipeline deploy production-splunk-pipeline.yaml --environment production

# Verify deployment
expanso pipeline status splunk-production-integration

3. Production Monitoring Setup

# Set up log rotation for fallback files
echo '#!/bin/bash
find /var/lib/expanso/fallback/ -name "*.jsonl" -mtime +7 -delete
' > /etc/cron.daily/expanso-cleanup

chmod +x /etc/cron.daily/expanso-cleanup

# Create monitoring dashboard queries for Splunk
cat > monitoring-queries.spl << 'EOF'
# Volume and cost tracking
index=main sourcetype="expanso:*" earliest=-24h
| stats
count as events_indexed,
sum(eval(len(_raw))) as bytes_indexed,
avg(volume_reduction_estimate) as avg_reduction,
sum(cost_savings_usd_per_event) as daily_savings
| eval
gb_indexed = round(bytes_indexed / 1024 / 1024 / 1024, 2),
estimated_original_gb = round(gb_indexed / (1 - avg_reduction), 2),
actual_cost = gb_indexed * 200,
estimated_original_cost = estimated_original_gb * 200,
total_savings = estimated_original_cost - actual_cost

# Error rate monitoring
index=main sourcetype="expanso:*" earliest=-1h
| stats count by final_priority, log_level
| eval error_rate = if(log_level="ERROR", count, 0)
| stats sum(error_rate) as errors, sum(count) as total
| eval error_percentage = round((errors/total)*100, 2)

# Processing performance
index=_internal source=*splunkd.log* component=HttpEventCollector earliest=-1h
| stats
count as hec_events,
avg(response_time) as avg_response_time,
max(response_time) as max_response_time
| eval health_status = if(avg_response_time < 100, "healthy", "degraded")
EOF

4. Health Checks

# Create health check script
cat > /usr/local/bin/expanso-health-check << 'EOF'
#!/bin/bash
set -e

echo "Expanso Pipeline Health Check - $(date)"
echo "=================================="

# Check pipeline status
echo "Pipeline Status:"
expanso pipeline status splunk-production-integration

# Check recent errors
echo -e "\nRecent Errors (last 5 minutes):"
expanso pipeline logs splunk-production-integration --level error --since 5m | head -10

# Check processing rates
echo -e "\nProcessing Metrics:"
expanso pipeline metrics splunk-production-integration --format json | jq -r '
"Events/minute: " + (.events_per_minute // 0 | tostring),
"Bytes/minute: " + (.bytes_per_minute // 0 | tostring),
"Error rate: " + (.error_rate // 0 | tostring) + "%"'

# Check Splunk connectivity
echo -e "\nSplunk Connectivity:"
if curl -s -k "https://$SPLUNK_HOST:$SPLUNK_PORT/services/collector/health" \
    -H "Authorization: Splunk $HEC_TOKEN" | grep -q "HEC is available"; then
    echo "✓ Splunk HEC is available"
else
    echo "✗ Splunk HEC is not available"
    exit 1
fi

# Check fallback file accumulation
echo -e "\nFallback Files:"
fallback_count=$(find /var/lib/expanso/fallback/ -name "*.jsonl" 2>/dev/null | wc -l)
if [ "$fallback_count" -eq 0 ]; then
    echo "✓ No fallback files (all outputs healthy)"
else
    echo "⚠ $fallback_count fallback files found (check output health)"
fi

echo -e "\nHealth check completed successfully!"
EOF

chmod +x /usr/local/bin/expanso-health-check

# Set up automated health checks
(crontab -l 2>/dev/null; echo "*/5 * * * * /usr/local/bin/expanso-health-check >> /var/log/expanso-health.log 2>&1") | crontab -

Cost Validation

After deployment, validate your cost savings:

1. Baseline Measurement (First Week)

# Splunk search for baseline metrics
index=main sourcetype="expanso:*" earliest=-7d
| eval day = strftime(_time, "%Y-%m-%d")
| stats
count as events,
sum(original_size_bytes) as original_bytes,
sum(eval(len(_raw))) as processed_bytes
by day
| eval
original_gb = round(original_bytes / 1024 / 1024 / 1024, 2),
processed_gb = round(processed_bytes / 1024 / 1024 / 1024, 2),
reduction_pct = round(((original_gb - processed_gb) / original_gb) * 100, 1),
daily_savings = (original_gb - processed_gb) * 200
| sort day
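
The arithmetic behind these searches is simple enough to double-check by hand. A sketch of the per-day savings computation at the $200/GB indexing cost assumed throughout this page (substitute your actual Splunk license rate):

```python
GB = 1024 ** 3
COST_PER_GB_USD = 200  # assumed indexing cost; use your license's actual rate

def daily_savings(original_bytes: int, processed_bytes: int) -> dict:
    """Mirror the SPL eval: reduction percentage and dollar savings per day."""
    original_gb = original_bytes / GB
    processed_gb = processed_bytes / GB
    reduction_pct = round((original_gb - processed_gb) / original_gb * 100, 1)
    savings = (original_gb - processed_gb) * COST_PER_GB_USD
    return {"reduction_pct": reduction_pct, "daily_savings_usd": round(savings, 2)}

# 100 GB/day raw reduced to 30 GB/day indexed -> 70% reduction, $14,000/day saved
print(daily_savings(100 * GB, 30 * GB))
```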

2. Monthly Cost Report

# Monthly cost analysis
index=main sourcetype="expanso:*" earliest=-30d
| eval month = strftime(_time, "%Y-%m")
| stats
sum(original_size_bytes) as original_bytes,
sum(eval(len(_raw))) as processed_bytes,
dc(_time) as active_days
by month
| eval
original_gb = round(original_bytes / 1024 / 1024 / 1024, 2),
processed_gb = round(processed_bytes / 1024 / 1024 / 1024, 2),
reduction_pct = round(((original_gb - processed_gb) / original_gb) * 100, 1),
monthly_savings = (original_gb - processed_gb) * 200,
annual_savings_projection = monthly_savings * 12
| table month, original_gb, processed_gb, reduction_pct, monthly_savings, annual_savings_projection

Performance Tuning

Based on your actual traffic patterns, you may need to adjust these settings:

High Volume Environments (greater than 10k events/minute)

# Increase batch sizes
batch:
  count: 500
  period: "5s"
  byte_size: 5242880 # 5MB

# Increase worker pools
input:
  file_watcher:
    max_concurrent_files: 20
    read_buffer_size: 65536

# More aggressive filtering
processors:
  - throttle:
      limit: 5000 # Higher rate limit

Low Volume Environments (Under 1k events/minute)

# Smaller batches for lower latency
batch:
  count: 10
  period: "30s"
  byte_size: 262144 # 256KB

# Less aggressive filtering to preserve more data
processors:
  - throttle:
      limit: 100
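
When tuning `batch.count` against `batch.period`, the question is which trigger fires first at your event rate: at high volume the count fills the batch, at low volume the period flushes a partial batch. A small sketch of that tradeoff (a hypothetical helper for reasoning, not part of the Expanso CLI):

```python
def batch_flush_interval(events_per_minute: float, count: int, period_s: float) -> float:
    """Seconds until a batch flushes: whichever of count-fill or period happens first."""
    if events_per_minute <= 0:
        return period_s                      # nothing arrives; the timer flushes
    time_to_fill = count / (events_per_minute / 60.0)
    return min(time_to_fill, period_s)

# High volume: 12,000 events/min fills a 500-event batch in 2.5s, under the 5s period
print(batch_flush_interval(12_000, 500, 5.0))  # 2.5
# Low volume: 10 events/min would take 60s to fill 10 events, so the 30s period wins
print(batch_flush_interval(10, 10, 30.0))      # 30.0
```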

Security Hardening

For production deployments, implement these security measures:

1. TLS Configuration

http_client:
  tls:
    min_version: "1.2"
    cert_file: "/etc/expanso/certs/client.crt"
    key_file: "/etc/expanso/certs/client.key"
    ca_file: "/etc/expanso/certs/ca.crt"

2. Network Security

# Firewall rules (allow only necessary outbound connections)
sudo ufw allow out 8088/tcp # Splunk HEC (matches SPLUNK_PORT)
sudo ufw allow out 443/tcp  # S3 archival and HTTPS webhooks
sudo ufw allow out 9091/tcp # Prometheus pushgateway
sudo ufw deny out 22/tcp    # Deny SSH outbound

3. File Permissions

# Secure configuration files
sudo chown expanso:expanso production-splunk-pipeline.yaml
sudo chmod 640 production-splunk-pipeline.yaml

# Secure environment file
sudo chown expanso:expanso .env.production
sudo chmod 600 .env.production



You now have a complete, production-ready Splunk integration! This pipeline will:

  • ✅ Reduce indexing costs by 70%+
  • ✅ Preserve all critical data
  • ✅ Ensure compliance with PII masking
  • ✅ Provide multi-destination routing
  • ✅ Enable real-time alerting
  • ✅ Support long-term analytics via S3

Your Splunk investment just became 3x more cost-effective while gaining capabilities that traditional Heavy Forwarders could never provide!