Parse & Validate Logs
After securing the input, the next critical step is ensuring the data we receive is readable and meets our quality standards. In this step, we'll implement JSON parsing and field validation.
Goal
Ensure all processed data:
- Is valid JSON (handling parsing errors gracefully)
- Contains required fields (timestamp, level, service, message)
- Conforms to the expected schema
Configuration
We'll add processors to the pipeline to handle parsing and validation.
1. Safe JSON Parsing
We parse the incoming raw byte message into a JSON object. We verify it is an object and not an array or primitive.
pipeline:
  processors:
    # Parse JSON
    - mapping: |
        root = content().parse_json()
        if root.type() != "object" {
          root = throw("input must be a JSON object")
        }
    # Handle parsing failures (optional): use a catch mechanism, as in the complete
    # configuration below, or route errored messages to a separate output (sketched next)
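The second option deserves a quick illustration. When a mapping throws, the message is flagged as errored rather than silently lost, and the output layer can route it elsewhere. This is a minimal sketch, assuming the pipeline runs on a Benthos / Redpanda Connect style engine and using a hypothetical failed-logs.jsonl dead-letter file:

output:
  switch:
    cases:
      # Messages whose mapping threw carry an error flag; send them to a dead-letter file
      - check: errored()
        output:
          file:
            path: "./failed-logs.jsonl"
            codec: lines
      # Everything else continues to the normal output
      - output:
          stdout: {}

This keeps malformed payloads inspectable without blocking the healthy stream.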
2. Field Validation
We verify that the critical fields exist. If they are missing, we can either drop the log or set default values. Here, we enforce their presence and reject the log with an error (a drop-based variant is sketched after the snippet below).
    # Validate required fields
    - mapping: |
        root = this
        # Check that every required field is present
        if !this.exists("timestamp") || !this.exists("level") || !this.exists("service") || !this.exists("message") {
          root = throw("missing required fields: timestamp, level, service, message")
        }
        # Normalize level
        root.level = this.level.uppercase()
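If silently dropping incomplete logs is preferable to rejecting them, the same check can delete the message instead of throwing. A minimal sketch of that variant (same mapping processor, with throw swapped for deleted()):

    # Alternative: drop logs that are missing required fields instead of rejecting them
    - mapping: |
        root = this
        if !this.exists("timestamp") || !this.exists("level") || !this.exists("service") || !this.exists("message") {
          # deleted() removes the message from the pipeline entirely
          root = deleted()
        }

Dropping is quieter but hides data-quality problems, so the throwing version above is usually the better default for production.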
Complete Step 2 Configuration
Note that the complete configuration takes the lenient path: parse failures are wrapped into an error object and missing fields are filled with defaults, so no message is rejected outright.
production-pipeline-step-2.yaml
input:
  http_server:
    address: "0.0.0.0:8080"
    path: /logs/ingest
    rate_limit: "1000/1s"
    auth:
      type: header
      header: "X-API-Key"
      required_value: "${LOG_API_KEY}"

pipeline:
  processors:
    # 1. Parse JSON, wrapping the raw payload in an error object when parsing fails
    - mapping: |
        root = content().parse_json().catch({
          "message": content().string(),
          "level": "ERROR",
          "service": "parser",
          "timestamp": now(),
          "error": "failed to parse json"
        })

    # 2. Validate: fill defaults for missing fields and normalize the level
    - mapping: |
        root = this
        if root.type() == "object" {
          if !root.exists("timestamp") { root.timestamp = now() }
          if !root.exists("level") { root.level = "INFO" }
          if !root.exists("service") { root.service = "unknown" }
          root.level = root.level.uppercase()
        }

output:
  stdout: {}
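The validation above only checks that fields are present. If you need strict conformance with the schema goal from earlier, a dedicated schema-validation processor can sit after the parse step. This is a hedged sketch, assuming a Benthos / Redpanda Connect json_schema processor and a hypothetical log-schema.json file:

pipeline:
  processors:
    # Flag any document that does not match the published log schema
    - json_schema:
        schema_path: "file://./log-schema.json"

Messages that fail the schema check are flagged as errored and can be routed to a dead-letter output, as sketched in the parsing section.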
Deployment & Verification
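Before sending test traffic, export the API key referenced by the config and start the pipeline. A minimal sketch, assuming a Benthos-compatible binary is on your PATH (adjust the command and key value to your own setup):

# The config reads the key from ${LOG_API_KEY}
export LOG_API_KEY="your-secret-key"

# Run the step 2 configuration
benthos -c production-pipeline-step-2.yaml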
- Test Valid Data:

  curl -X POST http://localhost:8080/logs/ingest \
    -H "X-API-Key: $LOG_API_KEY" \
    -d '{"timestamp": "2023-01-01T12:00:00Z", "level": "info", "service": "auth", "message": "Login success"}'

  The output should show the normalized level: "INFO" (example after this list).
- Test Invalid Data (Plain Text):

  curl -X POST http://localhost:8080/logs/ingest \
    -H "X-API-Key: $LOG_API_KEY" \
    -d 'Just a text log'

  The output should show the wrapped error object (example after this list).
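For reference, with the configuration above the two requests should print roughly the following to stdout; field order and the generated timestamp will vary:

  {"timestamp": "2023-01-01T12:00:00Z", "level": "INFO", "service": "auth", "message": "Login success"}
  {"message": "Just a text log", "level": "ERROR", "service": "parser", "timestamp": "2024-05-01T12:00:05Z", "error": "failed to parse json"}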
Next Steps
Now that we have valid structured data, let's add context to it.