Complete Production Pipeline
This is the complete, production-ready pipeline that combines all the concepts from the previous steps. Use this as your starting point for real-world Splunk deployments.
📋
Ready to deploy?
Copy the complete pipeline YAML and paste it into Expanso Cloud.
Production Pipeline Configuration
# production-splunk-pipeline.yaml
# Complete production-ready Expanso Edge pipeline for Splunk integration
# Includes: filtering, parsing, multi-destination routing, compliance, monitoring
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-production-integration"
  description: "Production Splunk integration with edge processing - reduces indexing costs by 70%"
  version: "1.0.0"
  labels:
    # ${ENVIRONMENT} is substituted from .env.production at deploy time
    environment: "${ENVIRONMENT}"
    team: "data-platform"
    cost-optimization: "enabled"
# Input: Monitor log files like Splunk inputs.conf but better
input:
  file_watcher:
    # Monitor all log files in standard locations
    paths:
      - "/var/log/applications/*.log"
      - "/var/log/security/*.log"
      - "/var/log/system/*.log"
      - "/opt/app/logs/*.log"
    # Performance settings
    poll_interval: "1s"
    start_from_beginning: false  # Only new data in production
    include_file_name: true
    # Handle log rotation properly
    ignore_older: "24h"
    close_inactive: "10m"
    clean_inactive: "24h"
    scan_frequency: "10s"
    # Multiline log support (stack traces, etc.)
    multiline:
      # Pattern matches the START of a new event (timestamp or JSON prefix)
      pattern: '^(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}|\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}|\{.*"timestamp")'
      # negate: true + match: after = lines that do NOT match the event-start
      # pattern are appended to the preceding event (standard continuation-line
      # semantics; `negate: false` would group the event-start lines themselves
      # and split stack traces across events)
      negate: true
      match: after
      timeout: "10s"
      max_lines: 100
processors:
# 1. METADATA AND CLASSIFICATION
- mapping: |
    # Basic metadata (like Splunk source/host fields)
    root.source_file = file.name
    root.source_path = file.path
    root.collection_timestamp = timestamp()
    root.host = hostname()
    root.expanso_version = env("EXPANSO_VERSION").string().catch("unknown")

    # Determine sourcetype based on file path and content
    root.sourcetype = match {
      file.path.contains("/security/") => "security_logs"
      file.path.contains("/applications/") && this.string().contains('{"') => "app_json"
      file.path.contains("/applications/") => "app_text"
      file.path.contains("/system/") => "syslog"
      this.string().contains("CEF:") => "cef"
      this.string().contains('{"timestamp"') || this.string().contains('"@timestamp"') => "json_structured"
      _ => "unknown"
    }

    # Initial event classification for filtering decisions
    root.event_category = "unknown"
    root.event_priority = "normal"
    root.data_classification = "general"

    # Track original message size for cost calculations
    root.original_size_bytes = this.string().length()
# 2. JSON LOG PARSING (like Splunk props.conf with KV_MODE=json)
- conditional:
    condition: 'this.sourcetype.contains("json") || this.sourcetype == "app_json"'
    mapping: |
      # Parse JSON with error handling
      # NOTE(review): parse_json() on invalid JSON may raise before .type() is
      # evaluated — confirm the runtime returns an error value here rather than
      # aborting; otherwise wrap in .catch()
      root = if this.string().parse_json().type() == "object" {
        this.string().parse_json().merge({
          "raw_message": this.string(),
          "parse_success": true
        })
      } else {
        {
          "raw_message": this.string(),
          "parse_success": false,
          "parse_error": "invalid_json"
        }.merge(this.without(""))
      }

      # Normalize timestamp formats
      root.normalized_timestamp = match {
        this.timestamp.type() == "string" => this.timestamp
        this."@timestamp".type() == "string" => this."@timestamp"
        this.time.type() == "string" => this.time
        _ => timestamp()
      }

      # Log level normalization and severity scoring
      root.log_level = this.level.string().catch(this.severity.string().catch("INFO")).uppercase()
      # NOTE(review): the lines below read `this.log_level`, which was just
      # assigned to root above — confirm the runtime resolves this to the
      # normalized value and not a (missing) field on the input document
      root.log_severity = match this.log_level {
        "FATAL", "CRITICAL" => 5
        "ERROR" => 4
        "WARN", "WARNING" => 3
        "INFO" => 2
        "DEBUG", "TRACE" => 1
        _ => 2
      }

      # Event categorization for filtering and routing
      root.event_category = match {
        this.log_level == "ERROR" || this.log_level == "FATAL" => "error"
        this.log_level == "DEBUG" || this.log_level == "TRACE" => "debug"
        this.message.contains("health") && this.message.contains("check") => "health_check"
        this.message.contains("login") || this.message.contains("auth") => "authentication"
        this.message.contains("payment") || this.message.contains("transaction") => "financial"
        this.endpoint.string().catch("").contains("/api/") => "api_call"
        _ => "application"
      }

      # Response time analysis
      if this.response_time_ms.type() == "number" {
        root.response_category = match {
          this.response_time_ms < 100 => "fast"
          this.response_time_ms < 500 => "normal"
          this.response_time_ms < 2000 => "slow"
          _ => "very_slow"
        }
      }

      # User domain extraction for analytics
      if this.user.string().catch("").contains("@") {
        root.user_domain = this.user.split("@")[1]
        root.user_local = this.user.split("@")[0]
      } else if this.user.string().catch("").contains(".") {
        root.user_domain = "internal"
        root.user_local = this.user
      }
# 3. CEF SECURITY LOG PARSING
- conditional:
    condition: 'this.sourcetype == "cef" || this.string().contains("CEF:")'
    mapping: |
      # Parse CEF format: CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
      root.cef_parsed = this.string().parse_regex("^CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")

      # Extract main CEF fields
      root.vendor = this.cef_parsed.vendor
      root.product = this.cef_parsed.product
      root.signature_id = this.cef_parsed.signature_id
      root.event_name = this.cef_parsed.name
      root.severity = this.cef_parsed.severity

      # Parse extensions into structured fields
      # NOTE(review): naive split on " " breaks CEF extension values that
      # contain spaces (e.g. msg=foo bar) — confirm acceptable for your sources
      root.cef_extensions = {}
      for_each this.cef_parsed.extensions.split(" ") -> extension {
        let parts = extension.split("=")
        if parts.length() == 2 && parts[0] != "" {
          root.cef_extensions = root.cef_extensions.merge({parts[0]: parts[1]})
        }
      }

      # Extract key security fields
      root.source_ip = this.cef_extensions.src.string().catch("")
      root.destination_ip = this.cef_extensions.dst.string().catch("")
      root.source_port = this.cef_extensions.spt.number().catch(0)
      root.destination_port = this.cef_extensions.dpt.number().catch(0)
      root.username = this.cef_extensions.suser.string().catch("")

      # Security event classification
      root.event_category = "security"
      root.risk_score = match this.severity {
        "Critical" => 10
        "High" => 8
        "Medium" => 5
        "Low" => 2
        _ => 1
      }

      # Set data classification based on security context
      root.data_classification = match {
        this.risk_score >= 8 => "sensitive"
        this.event_name.contains("login") || this.event_name.contains("auth") => "pii"
        _ => "security"
      }
# 4. SYSLOG PARSING
- conditional:
    condition: 'this.sourcetype == "syslog"'
    mapping: |
      # Parse traditional syslog format
      root.syslog_parsed = this.string().parse_regex("^(?P<month>\\w{3})\\s+(?P<day>\\d{1,2})\\s+(?P<time>\\d{2}:\\d{2}:\\d{2})\\s+(?P<hostname>\\S+)\\s+(?P<process>[^\\[:]+)(?:\\[(?P<pid>\\d+)\\])?:\\s*(?P<message>.*)")

      # Extract structured fields
      root.log_month = this.syslog_parsed.month
      root.log_day = this.syslog_parsed.day.number().catch(0)
      root.log_time = this.syslog_parsed.time
      root.syslog_hostname = this.syslog_parsed.hostname
      root.process_name = this.syslog_parsed.process
      root.process_id = this.syslog_parsed.pid.number().catch(0)
      root.message = this.syslog_parsed.message

      # Create ISO timestamp (current year assumed — wrong across New Year
      # for December logs read in January)
      root.normalized_timestamp = timestamp().format("2006") + "-" +
        match this.log_month {
          "Jan" => "01"
          "Feb" => "02"
          "Mar" => "03"
          "Apr" => "04"
          "May" => "05"
          "Jun" => "06"
          "Jul" => "07"
          "Aug" => "08"
          "Sep" => "09"
          "Oct" => "10"
          "Nov" => "11"
          "Dec" => "12"
          _ => "01"
        } + "-" +
        this.log_day.string().format_int(2, "0") + "T" +
        this.log_time + ".000Z"

      # System event classification
      root.event_category = match this.process_name {
        "sshd" => "authentication"
        "sudo" => "privilege_escalation"
        "systemd" => "system"
        "kernel" => "kernel"
        "nginx", "httpd" => "web_server"
        _ => "system"
      }
# 5. COMPLIANCE AND PII DETECTION
- mapping: |
    # Detect PII patterns and classify data
    let message_text = this.message.string().catch(this.raw_message.string().catch(""))

    # PII Detection patterns
    # NOTE(review): "[A-Z|a-z]" also matches a literal "|" inside the character
    # class — likely meant "[A-Za-z]" (same pattern reused in the masking step)
    root.contains_email = message_text.test("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b")
    root.contains_ssn = message_text.test("\\b\\d{3}-\\d{2}-\\d{4}\\b")
    root.contains_credit_card = message_text.test("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b")
    root.contains_phone = message_text.test("\\b\\d{3}[-.]?\\d{3}[-.]?\\d{4}\\b")

    # Update data classification based on content
    root.data_classification = match {
      this.contains_ssn || this.contains_credit_card => "sensitive_pii"
      this.contains_email || this.contains_phone => "pii"
      this.event_category == "financial" => "financial"
      this.event_category == "security" && this.risk_score >= 7 => "sensitive"
      this.event_category == "authentication" => "pii"
      _ => this.data_classification.string().catch("general")
    }

    # Determine data residency requirements
    root.data_residency = match this.source_ip.string().catch("") {
      this.test("^10\\.1\\.") => "us_west"
      this.test("^10\\.2\\.") => "us_east"
      this.test("^10\\.3\\.") => "eu_west"
      this.test("^10\\.4\\.") => "asia_pacific"
      _ => env("DEFAULT_DATA_RESIDENCY").string().catch("us_east")
    }
# 6. PII MASKING (GDPR/HIPAA/CCPA Compliance)
- mapping: |
    # Create masked version of message for general use
    let original_message = this.message.string().catch(this.raw_message.string().catch(""))

    # Mask email addresses
    root.message_masked = original_message.replace_all_regex("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b", "***@***.***")
    # Mask credit card numbers
    root.message_masked = root.message_masked.replace_all_regex("\\b\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}[-\\s]?\\d{4}\\b", "****-****-****-****")
    # Mask SSNs
    root.message_masked = root.message_masked.replace_all_regex("\\b\\d{3}-\\d{2}-\\d{4}\\b", "***-**-****")
    # Mask phone numbers
    root.message_masked = root.message_masked.replace_all_regex("\\b\\d{3}[-.]?\\d{3}[-.]?\\d{4}\\b", "***-***-****")

    # Track masking for audit purposes
    root.pii_masked = original_message != root.message_masked
    # Keep the raw message only for non-sensitive classifications
    root.original_message = if this.data_classification.contains("sensitive") {
      "[REDACTED - " + this.data_classification + "]"
    } else {
      original_message
    }
# 7. INTELLIGENT FILTERING - The Cost Savings Engine
# 7a. Drop DEBUG/TRACE entirely (40% volume reduction)
- conditional:
    condition: 'this.log_level == "DEBUG" || this.log_level == "TRACE"'
    mapping: 'root = deleted()'
# 7b. Sample health checks intelligently (20% volume reduction)
- conditional:
    condition: 'this.event_category == "health_check"'
    mapping: |
      # Keep health checks based on:
      # - One per minute per service (time-based sampling)
      # - All failures (error conditions)
      # - Statistical sampling for normal checks
      let time_key = timestamp().format("2006-01-02-15-04") # Year-Month-Day-Hour-Minute
      let service_key = this.host + ":" + (this.service.string().catch("unknown"))
      let sample_key = "health_" + service_key + "_" + time_key

      # Track samples per service per minute
      # NOTE(review): this assumes metadata persists across messages on the
      # same pipeline instance — confirm meta() scope before relying on it
      let current_samples = meta(sample_key).number().catch(0)

      # Always keep first health check per minute per service
      if current_samples == 0 {
        meta(sample_key) = 1
        root.health_check_sampled = true
        root.sample_reason = "first_per_minute"
      # Always keep health check failures
      } else if this.log_level == "ERROR" || this.log_level == "WARN" {
        root.health_check_sampled = true
        root.sample_reason = "error_condition"
      # Sample 10% of remaining normal health checks
      } else {
        let hash = (this.host + this.message).hash("xxhash64") % 10
        if hash == 0 {
          root.health_check_sampled = true
          root.sample_reason = "statistical_sample"
        } else {
          root = deleted()
        }
      }
# 7c. Deduplicate similar events (10% volume reduction)
- dedup:
    cache_size: 10000
    drop_on: 'this.log_level + "|" + this.event_category + "|" + this.host + "|" + this.message_masked'
    dedupe_after: "5m"
# 7d. Rate limiting for verbose applications (15% volume reduction)
- throttle:
    key: 'this.host + ":" + this.process_name.string().catch("unknown") + ":" + this.event_category'
    limit: 1000  # Max 1000 events per minute per host+process+category
    interval: "1m"
# 7e. Filter synthetic/test data
- conditional:
    condition: |
      this.user.string().catch("").contains("test") ||
      this.user.string().catch("").contains("synthetic") ||
      this.source_ip.string().catch("").contains("127.0.0.1") ||
      this.source_ip.string().catch("").contains("::1") ||
      this.message.string().catch("").contains("test-") ||
      this.host.string().catch("").contains("test-")
    mapping: 'root = deleted()'
# 8. FINAL ENRICHMENT AND ROUTING PREPARATION
- mapping: |
    # Calculate final priority for routing decisions
    root.final_priority = match {
      this.data_classification.contains("sensitive") => "critical"
      this.event_category == "security" && this.risk_score >= 7 => "critical"
      this.log_severity >= 4 => "high"
      this.log_severity == 3 => "medium"
      this.event_category == "authentication" => "medium"
      this.event_category == "financial" => "medium"
      _ => "normal"
    }

    # Determine target Splunk index
    root.target_index = match {
      this.data_classification.contains("sensitive") => "compliance"
      this.event_category == "security" || this.sourcetype == "cef" => "security"
      this.log_severity >= 4 => "main"
      this.event_category == "system" => "infrastructure"
      _ => "main"
    }

    # Create Splunk-compatible sourcetype
    root.target_sourcetype = "expanso:" + this.sourcetype

    # Add processing metadata
    root.processed_timestamp = timestamp()
    root.processing_host = hostname()
    root.pipeline_version = "1.0.0"
    root.volume_reduction_estimate = 0.7 # 70% typical reduction

    # Cost tracking
    # NOTE(review): original_size_bytes is the event's OWN pre-processing size;
    # dividing it by 0.3 projects fleet-level volume reduction onto a single
    # event, so the per-event savings figure is a rough heuristic only
    root.estimated_original_size = this.original_size_bytes / 0.3 # Assume 70% reduction
    root.cost_savings_usd_per_event = (root.estimated_original_size - this.string().length()) * 0.0002 # $0.20 per MB
# 9. CREATE DESTINATION-SPECIFIC FORMATS
- mapping: |
    # Splunk HEC format
    root.splunk_event = {
      "time": timestamp().format_timestamp_unix(),
      "host": this.host,
      "source": "expanso:" + this.source_file,
      "sourcetype": this.target_sourcetype,
      "index": this.target_index,
      "event": this.without("splunk_event", "s3_event", "metrics_data")
    }

    # S3 archival format with metadata for analytics
    root.s3_event = {
      "timestamp": timestamp().format_timestamp_unix(),
      "date_partition": timestamp().format("year=2006/month=01/day=02"),
      "hour_partition": timestamp().format("hour=15"),
      "data_classification": this.data_classification,
      "data_residency": this.data_residency,
      "event_priority": this.final_priority,
      "pipeline_version": "1.0.0",
      # NOTE(review): this match mixes a subject value with a predicate arm
      # (`this.contains("pii")`) — confirm the runtime supports that form
      "retention_years": match this.data_classification {
        this.contains("pii") => 7
        "financial" => 10
        "security" => 5
        _ => 3
      },
      "event": this.without("splunk_event", "s3_event", "metrics_data")
    }

    # Metrics extraction for monitoring systems
    if this.response_time_ms.type() == "number" || this.log_severity.type() == "number" {
      # NOTE(review): when response_time_ms is absent, the first array element
      # is an `if` with no else — confirm it is omitted rather than emitted as
      # null (a null element would break the Prometheus postmap)
      root.metrics_data = [
        if this.response_time_ms.type() == "number" {
          {
            "metric": "http_request_duration_seconds",
            "value": this.response_time_ms / 1000,
            "timestamp": timestamp().format_timestamp_unix(),
            "labels": {
              "host": this.host,
              "endpoint": this.endpoint.string().catch("unknown"),
              "method": this.method.string().catch("unknown"),
              "status_code": this.status_code.string().catch("unknown")
            }
          }
        },
        {
          "metric": "log_events_total",
          "value": 1,
          "timestamp": timestamp().format_timestamp_unix(),
          "labels": {
            "host": this.host,
            "level": this.log_level,
            "category": this.event_category,
            "sourcetype": this.sourcetype,
            "priority": this.final_priority
          }
        }
      ]
    }
# PRODUCTION-GRADE MULTI-DESTINATION OUTPUT
# NOTE(review): the first two cases together match EVERY event (critical vs.
# not critical). If this switch is first-match-wins, the S3, Prometheus,
# alerting and fallback cases below are unreachable — confirm whether cases
# need an explicit continue/fallthrough flag, or whether a broker fan_out is
# the intended construct for multi-destination delivery.
output:
  switch:
  # PRIMARY: Splunk HEC with intelligent routing
  # Critical events - dedicated high-priority pipeline
  - condition: 'this.final_priority == "critical"'
    output:
      http_client:
        url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
        method: "POST"
        headers:
          Authorization: "Splunk ${HEC_TOKEN}"
          Content-Type: "application/json"
          X-Splunk-Request-Channel: "expanso-critical"
        timeout: "10s"
        retry_policy:
          max_retries: 5
          initial_interval: "500ms"
          max_interval: "10s"
          backoff_multiplier: 2.0
      # Small batches for low latency
      batch:
        count: 20
        period: "2s"
        byte_size: 524288  # 512KB
      postmap: 'root = this.splunk_event'
  # Normal events - standard pipeline with larger batches
  - condition: 'this.final_priority != "critical"'
    output:
      http_client:
        url: "https://${SPLUNK_HOST}:${SPLUNK_PORT}/services/collector/event"
        method: "POST"
        headers:
          Authorization: "Splunk ${HEC_TOKEN}"
          Content-Type: "application/json"
          X-Splunk-Request-Channel: "expanso-normal"
        timeout: "30s"
        retry_policy:
          max_retries: 3
          initial_interval: "2s"
          max_interval: "30s"
          backoff_multiplier: 2.0
      # Larger batches for efficiency
      batch:
        count: 100
        period: "10s"
        byte_size: 2097152  # 2MB
      postmap: 'root = this.splunk_event'
  # SECONDARY: Long-term storage in S3 for analytics and compliance
  # NOTE(review): this case and the two below omit the `output:` wrapper used
  # by the first two cases — confirm which shape the schema expects
  - condition: 'this.data_classification != "general"'
    s3:
      bucket: "${S3_BUCKET_PREFIX}-${this.data_residency}"
      path: "${s3_event.date_partition}/${s3_event.hour_partition}/expanso-${timestamp().format_timestamp_unix('nano')}.jsonl"
      region: "${AWS_REGION}"
      credentials:
        profile: "${AWS_PROFILE}"
      storage_class: "INTELLIGENT_TIERING"
      server_side_encryption: "aws:kms"
      kms_key_id: "${KMS_KEY_ID}"
      metadata:
        "data-classification": "${this.data_classification}"
        "retention-years": "${this.s3_event.retention_years}"
        "pipeline-version": "1.0.0"
    postmap: 'root = this.s3_event'
  # TERTIARY: Metrics to monitoring systems
  - condition: 'this.metrics_data.type() == "array"'
    prometheus:
      push_url: "${PROMETHEUS_PUSHGATEWAY_URL}/metrics/job/expanso-edge/instance/${hostname()}"
      push_interval: "30s"
      push_job_label: "expanso-splunk-integration"
    postmap: 'root = this.metrics_data'
  # QUATERNARY: Real-time alerts for critical events
  - condition: 'this.final_priority == "critical" || this.log_severity >= 4'
    http_client:
      url: "${ALERTING_WEBHOOK_URL}"
      method: "POST"
      headers:
        Content-Type: "application/json"
        Authorization: "Bearer ${ALERT_TOKEN}"
      timeout: "5s"
      retry_policy:
        max_retries: 2
        initial_interval: "1s"
        max_interval: "5s"
    postmap: |
      root = {
        "alert_type": "log_event",
        "severity": match this.final_priority {
          "critical" => "critical"
          _ => "warning"
        },
        "title": this.event_category + " event on " + this.host,
        "description": this.message_masked,
        "timestamp": timestamp(),
        "source": this.host + ":" + this.source_file,
        "classification": this.data_classification,
        "index": this.target_index,
        "metadata": {
          "log_level": this.log_level,
          "event_category": this.event_category,
          "risk_score": this.risk_score.number().catch(0)
        }
      }
  # FALLBACK: Local file storage if all outputs fail (no condition = catch-all)
  - output:
      file:
        path: "/var/lib/expanso/fallback/failed-events-${timestamp().format('2006-01-02')}.jsonl"
        codec: "lines"
      postmap: 'root = {"timestamp": timestamp(), "error": "all_outputs_failed", "event": this.splunk_event}'
Environment Configuration
Create a production environment configuration file:
# Write the production environment file consumed by the pipeline's ${VAR}
# placeholders. The quoted heredoc delimiter ('EOF') prevents any shell
# expansion, so the values below are written verbatim.
# NOTE(review): this file holds credentials (HEC_TOKEN, ALERT_TOKEN) — keep it
# out of version control and restrict its permissions (chmod 600).
cat > .env.production << 'EOF'
# Splunk Configuration
SPLUNK_HOST=your-splunk-host.company.com
SPLUNK_PORT=8088
HEC_TOKEN=your-production-hec-token
# Index Configuration
MAIN_INDEX=main
SECURITY_INDEX=security
COMPLIANCE_INDEX=compliance
INFRASTRUCTURE_INDEX=infrastructure
# AWS Configuration (for S3 archival)
AWS_REGION=us-west-2
AWS_PROFILE=production
S3_BUCKET_PREFIX=company-logs
KMS_KEY_ID=arn:aws:kms:us-west-2:123456789012:key/your-kms-key
# Monitoring
PROMETHEUS_PUSHGATEWAY_URL=http://prometheus-pushgateway.monitoring.svc.cluster.local:9091
ALERTING_WEBHOOK_URL=https://hooks.slack.com/services/your/production/webhook
ALERT_TOKEN=your-production-alert-token
# Pipeline Configuration
ENVIRONMENT=production
DEFAULT_DATA_RESIDENCY=us_west
EXPANSO_VERSION=2.1.0
EOF