
Step 2: Parse Data Like props.conf + transforms.conf

In Splunk, props.conf and transforms.conf handle parsing and field extraction. Expanso uses the Bloblang mapping language, which is more expressive and applies at collection time, in real time at the edge.

Traditional Splunk Parsing Configuration

props.conf + transforms.conf Setup

In Splunk, you'd create complex configurations like this:

# props.conf
[json_logs]
KV_MODE = json
TIMESTAMP_FIELDS = timestamp
TIME_FORMAT = %Y-%m-%dT%H:%M:%S.%3NZ
EXTRACT-user_info = (?<user>\w+\.\w+)
EXTRACT-response_time = response_time_ms=(?<response_time>\d+)
EVAL-log_severity = case(level=="ERROR", 3, level=="WARN", 2, level=="INFO", 1, 1=1, 0)

[cef]
EXTRACT-cef_fields = CEF:\d+\|(?<vendor>[^|]*)\|(?<product>[^|]*)\|(?<version>[^|]*)\|(?<signature_id>[^|]*)\|(?<name>[^|]*)\|(?<severity>[^|]*)\|(?<extensions>.*)
EVAL-risk_score = case(severity=="Critical", 10, severity=="High", 8, severity=="Medium", 5, severity=="Low", 2, 1=1, 1)

[syslog]
EXTRACT-syslog_fields = ^(?<month>\w+)\s+(?<day>\d+)\s+(?<time>\d+:\d+:\d+)\s+(?<host>\w+)\s+(?<process>[^[]*)\[?(?<pid>\d+)?\]?:\s*(?<message>.*)
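To see which fields the syslog extraction yields, here is the same pattern applied in Python (Splunk's `(?<name>)` groups become `(?P<name>)`; the log line is a made-up sample):

```python
import re

# The syslog EXTRACT regex from props.conf above, with Splunk's
# (?<name>) groups written in (?P<name>) form.
SYSLOG_RE = re.compile(
    r"^(?P<month>\w+)\s+(?P<day>\d+)\s+(?P<time>\d+:\d+:\d+)\s+"
    r"(?P<host>\w+)\s+(?P<process>[^[]*)\[?(?P<pid>\d+)?\]?:\s*(?P<message>.*)"
)

line = "Feb 10 10:30:15 web01 sshd[54321]: Failed password for invalid user hacker"
fields = SYSLOG_RE.match(line).groupdict()
print(fields)
```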

SPL Search-Time Field Extraction

# Extract fields during search (expensive!)
index=main sourcetype=json_logs
| rex field=message "Database connection timeout.*database=\"(?<db_name>[^\"]+)\""
| eval response_category = case(response_time_ms < 100, "fast", response_time_ms < 500, "normal", 1=1, "slow")
| eval user_domain = mvindex(split(user, "@"), 1)
| eval hour_of_day = strftime(_time, "%H")
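Splunk's inline rex accepts `(?<name>)` group syntax; Go-style regex engines such as the one Bloblang uses (and Python) expect `(?P<name>)`. For reference, the rex pattern above ported to Python and run against an assumed sample message:

```python
import re

# The rex extraction above, with (?P<name>) group syntax.
# The message text is an assumed sample, not from the original logs.
pattern = r'Database connection timeout.*database="(?P<db_name>[^"]+)"'
message = 'ERROR Database connection timeout after 30s database="users_db"'
m = re.search(pattern, message)
print(m.group("db_name"))
```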

Expanso Bloblang Mapping (Better!)

With Expanso, all parsing happens at collection time with more expressive syntax:

Update Pipeline with Parsing

cat > ~/splunk-edge-pipeline.yaml << 'EOF'
apiVersion: v1
kind: Pipeline
metadata:
  name: "splunk-edge-collection"
  description: "Parse data like props.conf but in real-time"

input:
  file_watcher:
    paths:
      - "/var/log/expanso-demo/app.log"
      - "/var/log/expanso-demo/security.log"
      - "/var/log/expanso-demo/system.log"
    poll_interval: "1s"
    include_file_name: true
    multiline:
      pattern: '^(\d{4}-\d{2}-\d{2}T|\w{3}\s+\d{1,2}\s|\{"timestamp")'
      negate: false
      match: after

processors:
  # 1. Basic metadata (same as Step 1)
  - mapping: |
      root.source_file = file.name
      root.collection_timestamp = timestamp()
      root.host = hostname()

      root.sourcetype = match file.name {
        this.contains("app.log") => "json_logs"
        this.contains("security.log") => "cef"
        this.contains("system.log") => "syslog"
        _ => "unknown"
      }

  # 2. Parse JSON logs (equivalent to KV_MODE=json)
  - conditional:
      condition: 'this.sourcetype == "json_logs"'
      mapping: |
        # Parse JSON if not already parsed
        root = if this.type() == "object" {
          this
        } else {
          this.parse_json().catch({"raw_message": this, "parse_error": true})
        }

        # Add computed fields (like Splunk EVAL)
        root.log_severity = match this.level {
          "ERROR" => 3
          "WARN" => 2
          "INFO" => 1
          _ => 0
        }

        # Response time categorization
        root.response_category = if this.response_time_ms.type() == "number" {
          match {
            this.response_time_ms < 100 => "fast"
            this.response_time_ms < 500 => "normal"
            _ => "slow"
          }
        }

        # Extract user domain (equivalent to SPL split)
        root.user_domain = if this.user.contains("@") {
          this.user.split("@")[1]
        } else if this.user.contains(".") {
          "internal"
        } else {
          "unknown"
        }

        # Time-based fields
        root.hour_of_day = timestamp().format("15")
        root.day_of_week = timestamp().format("Monday")

  # 3. Parse CEF logs (Security events)
  - conditional:
      condition: 'this.sourcetype == "cef"'
      mapping: |
        # Parse CEF format: CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
        root.cef = this.string().parse_regex("CEF:(?P<version>\\d+)\\|(?P<vendor>[^|]*)\\|(?P<product>[^|]*)\\|(?P<device_version>[^|]*)\\|(?P<signature_id>[^|]*)\\|(?P<name>[^|]*)\\|(?P<severity>[^|]*)\\|(?P<extensions>.*)")

        # Flatten main CEF fields to root level
        root.vendor = this.cef.vendor
        root.product = this.cef.product
        root.signature_id = this.cef.signature_id
        root.event_name = this.cef.name
        root.severity = this.cef.severity

        # Parse extensions (key=value pairs)
        root.extensions = {}
        for_each this.cef.extensions.split(" ") -> value {
          let kv = value.split("=")
          if kv.length() == 2 {
            root.extensions = root.extensions.merge({kv[0]: kv[1]})
          }
        }

        # Add risk scoring (like Splunk EVAL)
        root.risk_score = match this.severity {
          "Critical" => 10
          "High" => 8
          "Medium" => 5
          "Low" => 2
          _ => 1
        }

        # Extract IP addresses for geo enrichment
        root.source_ip = this.extensions.src.string().catch("")
        root.dest_ip = this.extensions.dst.string().catch("")

  # 4. Parse Syslog (traditional format)
  - conditional:
      condition: 'this.sourcetype == "syslog"'
      mapping: |
        # Parse syslog format
        root.syslog = this.string().parse_regex("(?P<month>\\w+)\\s+(?P<day>\\d+)\\s+(?P<time>\\d+:\\d+:\\d+)\\s+(?P<hostname>\\w+)\\s+(?P<process>[^\\[:]*)\\[?(?P<pid>\\d+)?\\]?:\\s*(?P<message>.*)")

        # Flatten and enhance
        root.log_month = this.syslog.month
        root.log_day = this.syslog.day.number()
        root.log_time = this.syslog.time
        root.log_hostname = this.syslog.hostname
        root.process_name = this.syslog.process
        root.process_id = this.syslog.pid.number().catch(0)
        root.message = this.syslog.message

        # Create structured timestamp
        root.timestamp = timestamp().format("2006") + "-01-" + this.log_day.string().format_int(2, "0") + "T" + this.log_time + ".000Z"

        # Process classification
        root.process_category = match this.process_name {
          "sshd" => "authentication"
          "nginx" => "web_server"
          "systemd" => "system"
          "kernel" => "kernel"
          _ => "other"
        }

  # 5. Final enrichment and routing
  - mapping: |
      # Determine target index (like outputs.conf)
      root.target_index = match this.sourcetype {
        "cef" => "security"
        "syslog" => if this.process_category == "authentication" { "security" } else { "main" }
        _ => "main"
      }

      # Add data volume tracking
      root.raw_size_bytes = this.string().length()

      # Classification for filtering (used in Step 3)
      root.event_class = match {
        this.sourcetype == "cef" => "security"
        this.level == "ERROR" => "error"
        this.level == "DEBUG" => "debug"
        this.process_name == "nginx" && this.message.contains("GET /api/health") => "health_check"
        _ => "normal"
      }

output:
  stdout:
    format: "json"
EOF
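The extension-parsing loop in processor 3 can be sketched in plain Python to show what it produces for a sample extension string (note this naive space-split breaks if a CEF value itself contains escaped spaces):

```python
# Plain-Python sketch of the CEF extension parsing in processor 3:
# split on spaces, split each pair on "=", keep well-formed pairs.
ext = "src=203.0.113.25 dst=192.168.1.5 suser=admin act=login outcome=failure dpt=443"
extensions = {}
for pair in ext.split(" "):
    kv = pair.split("=")
    if len(kv) == 2:
        extensions[kv[0]] = kv[1]
print(extensions)
```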

Side-by-Side: SPL vs Bloblang Translations

Here are common Splunk field extraction patterns and their Bloblang equivalents:

1. Regular Expression Extraction

SPL:

| rex field=message "response_time=(?<response_time>\d+)"

Bloblang:

root.response_time = this.message.parse_regex("response_time=(?P<response_time>\\d+)").response_time.number()
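Both engines use the same named-group idea; since `(?P<name>)` syntax is also valid in Python, the extraction is easy to sanity-check there (the sample message is assumed):

```python
import re

# Quick check of the named-group extraction; sample message assumed.
m = re.search(r"response_time=(?P<response_time>\d+)",
              "GET /api/users response_time=245 status=200")
print(int(m.group("response_time")))
```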

2. Conditional Field Creation

SPL:

| eval priority = case(level=="ERROR", "high", level=="WARN", "medium", 1=1, "low")

Bloblang:

root.priority = match this.level {
  "ERROR" => "high"
  "WARN" => "medium"
  _ => "low"
}

3. String Manipulation

SPL:

| eval user_domain = mvindex(split(user, "@"), 1)
| eval upper_user = upper(user)

Bloblang:

root.user_domain = this.user.split("@")[1]
root.upper_user = this.user.uppercase()

4. Time-Based Fields

SPL:

| eval hour = strftime(_time, "%H")
| eval is_business_hours = if(hour>=9 AND hour<=17, "yes", "no")

Bloblang:

root.hour = timestamp().format("15")
root.is_business_hours = if this.hour.number() >= 9 && this.hour.number() <= 17 { "yes" } else { "no" }

5. JSON Path Extraction

SPL:

| spath input=json_field path=user.preferences.theme output=theme

Bloblang:

root.theme = this.json_field.user.preferences.theme

Test Your Parsing

1. Deploy Updated Pipeline

# Deploy the enhanced pipeline
expanso pipeline deploy ~/splunk-edge-pipeline.yaml

# Monitor parsed output
expanso pipeline logs splunk-edge-collection -f

2. Generate Test Events

Add some complex events to test parsing:

# JSON with nested data
echo '{"timestamp":"'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'","level":"ERROR","message":"Database connection timeout","user":"jane.doe@company.com","database":"users_db","response_time_ms":2500,"retry_count":3}' >> $TEST_DATA_DIR/app.log

# CEF security event
echo 'CEF:0|Company|WebApp|1.0|700|Suspicious Login|High|src=203.0.113.25 dst=192.168.1.5 suser=admin act=login outcome=failure reason=multiple_attempts dpt=443' >> $TEST_DATA_DIR/security.log

# Syslog event
echo "$(date '+%b %d %H:%M:%S') web01 sshd[54321]: Failed password for invalid user hacker from 203.0.113.50 port 12345 ssh2" >> $TEST_DATA_DIR/system.log
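The CEF header regex from processor 3 can be verified directly against the security test event above (regex copied from the pipeline, with escaping adjusted for a Python raw string):

```python
import re

# The CEF header regex from processor 3, checked against the
# test event written to security.log above.
CEF_RE = re.compile(
    r"CEF:(?P<version>\d+)\|(?P<vendor>[^|]*)\|(?P<product>[^|]*)\|"
    r"(?P<device_version>[^|]*)\|(?P<signature_id>[^|]*)\|(?P<name>[^|]*)\|"
    r"(?P<severity>[^|]*)\|(?P<extensions>.*)"
)
event = ("CEF:0|Company|WebApp|1.0|700|Suspicious Login|High|"
         "src=203.0.113.25 dst=192.168.1.5 suser=admin act=login")
cef = CEF_RE.match(event).groupdict()
print(cef["severity"], cef["signature_id"])
```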

3. Verify Parsed Output

You should see enriched output like:

{
  "timestamp": "2024-02-10T10:30:15.123Z",
  "level": "ERROR",
  "message": "Database connection timeout",
  "user": "jane.doe@company.com",
  "database": "users_db",
  "response_time_ms": 2500,
  "retry_count": 3,
  "log_severity": 3,
  "response_category": "slow",
  "user_domain": "company.com",
  "hour_of_day": "10",
  "sourcetype": "json_logs",
  "target_index": "main",
  "event_class": "error",
  "raw_size_bytes": 187
}
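The enrichment rules from processor 2 can be sketched in plain Python to show how those derived fields are computed (sample values taken from the event above):

```python
import json

# Plain-Python sketch of the JSON-log enrichment rules in processor 2.
event = json.loads(
    '{"level":"ERROR","user":"jane.doe@company.com","response_time_ms":2500}'
)

# log_severity: like the Bloblang match / Splunk EVAL case()
event["log_severity"] = {"ERROR": 3, "WARN": 2, "INFO": 1}.get(event["level"], 0)

# response_category: threshold buckets
rt = event["response_time_ms"]
event["response_category"] = "fast" if rt < 100 else ("normal" if rt < 500 else "slow")

# user_domain: text after "@"
user = event["user"]
event["user_domain"] = user.split("@")[1] if "@" in user else "unknown"

print(event["log_severity"], event["response_category"], event["user_domain"])
```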

Performance Comparison: Search-Time vs Real-Time

| Aspect             | Splunk (Search-Time)       | Expanso (Real-Time) |
| ------------------ | -------------------------- | ------------------- |
| CPU Usage          | High during searches       | Low, distributed    |
| Search Performance | Slow (parse every search)  | Fast (pre-parsed)   |
| Storage            | Raw + indexed fields       | Structured JSON     |
| Memory             | Search-head intensive      | Edge processing     |
| Latency            | Parse delay on search      | Immediate           |

Advanced Bloblang Techniques

1. Error Handling

root.parsed_data = this.raw_json.parse_json().catch({
  "parse_error": true,
  "raw_data": this.raw_json
})
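Bloblang's `.catch()` fallback behaves like a try/except around the parse; sketched in Python:

```python
import json

# The .catch() fallback above, expressed as Python try/except.
raw = "{not valid json"
try:
    parsed = json.loads(raw)
except json.JSONDecodeError:
    parsed = {"parse_error": True, "raw_data": raw}
print(parsed["parse_error"])
```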

2. Conditional Processing

if this.level == "DEBUG" {
  root = deleted() # Drop debug logs entirely
}

3. Data Enrichment

# GeoIP lookup (if service available)
root.geo = this.source_ip.http_request("GET", "http://geoip-service/lookup/" + this.source_ip).parse_json().catch({})

4. Array Processing

# Process array fields
root.user_tags = this.tags.map_each(tag -> tag.uppercase())

What's Next?

Excellent! Your data is now being parsed in real-time at the edge, which is more efficient than Splunk's search-time parsing. Next, we'll implement the game-changing capability that Splunk can't do natively: filtering before data reaches the indexers.

Next Step: Step 3: Filter Before Indexing


Key Takeaway: Bloblang is more expressive than SPL and processes data in real-time. If you can write SPL searches, you can learn Bloblang — and your parsing will be faster and more efficient!