## Step 2: Transform and Enrich Data
Use Bloblang to process raw O-RAN telemetry, adding derived metrics, compliance classifications, and site metadata before routing to destinations.
## Transformation Overview
Raw DU telemetry needs processing for effective analysis:
- Timestamp normalization: Convert to nanosecond precision
- Compliance classification: Apply PTP timing thresholds
- Derived metrics: Calculate efficiency and health scores
- Site enrichment: Add cell, sector, and regional metadata
- Format conversion: Prepare for destination-specific schemas
## Core Transformations

### Timestamp Normalization

Ensure consistent timestamp formats across all metrics:
```yaml
pipeline:
  processors:
    - mapping: |
        # Normalize timestamps to RFC3339 with nanosecond precision
        root = this

        # Convert various timestamp formats to a standard value
        root.timestamp_ns = match {
          this.timestamp.type() == "string" => this.timestamp.ts_parse("2006-01-02T15:04:05Z").ts_unix_nano()
          this.timestamp.type() == "number" => this.timestamp * 1000000000 # Convert seconds to nanoseconds
          _ => now().ts_unix_nano()
        }

        # Standardized ISO timestamp derived from the normalized value
        root.timestamp_iso = (root.timestamp_ns / 1000000000).ts_format("2006-01-02T15:04:05.000000000Z", "UTC")

        # Processing timestamp for pipeline tracking
        root.processed_at = now()
```
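As a quick sanity check outside the pipeline, the same normalization rules can be sketched in plain Python. This is a rough equivalent of the match cases above, not part of the config:

```python
from datetime import datetime, timezone

def normalize_timestamp_ns(value):
    """Mirror the mapping's match cases: RFC3339 strings are parsed,
    numbers are treated as unix seconds, anything else falls back to now."""
    if isinstance(value, str):
        dt = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
        dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp()) * 1_000_000_000
    if isinstance(value, (int, float)):
        return int(value * 1_000_000_000)
    return int(datetime.now(timezone.utc).timestamp() * 1_000_000_000)

print(normalize_timestamp_ns("2024-02-10T17:23:45Z"))  # 1707585825000000000
```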
### PTP Compliance Classification

Apply 5G timing requirements to PTP offset measurements:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this

        # PTP compliance thresholds for 5G networks,
        # based on ITU-T G.8271.1 and 3GPP TS 38.104
        let abs_offset = this.ptp4l_offset_ns.abs()

        root.ptp_compliance = match {
          $abs_offset < 100 => "compliant"    # Within ±100ns - fully compliant
          $abs_offset < 1000 => "degraded"    # ±100ns to ±1μs - degraded performance
          $abs_offset < 10000 => "critical"   # ±1μs to ±10μs - critical timing error
          _ => "non_compliant"                # Beyond ±10μs - non-compliant
        }

        # Compliance score (0-100)
        let score = match {
          $abs_offset < 100 => 100
          $abs_offset < 1000 => 100 - (($abs_offset - 100) / 90)   # 100 down to 90
          $abs_offset < 10000 => 90 - (($abs_offset - 1000) / 100) # 90 down to 0
          _ => 0
        }
        root.ptp_score = $score.round()

        # Alert level
        root.ptp_alert_level = match root.ptp_compliance {
          "compliant" => "none"
          "degraded" => "warning"
          "critical" => "major"
          "non_compliant" => "critical"
        }
```
### PRB Efficiency Calculations

Calculate resource utilization efficiency and congestion indicators:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this

        # PRB efficiency calculations
        root.prb_total_utilization = (this.prb_dl_pct + this.prb_ul_pct) / 2
        root.prb_efficiency = root.prb_total_utilization.round()

        # Spectral efficiency (bits/Hz) - typical values for 5G
        root.spectral_efficiency_dl = match {
          this.prb_dl_pct < 30 => 2.4 # Low load - high efficiency
          this.prb_dl_pct < 70 => 3.2 # Medium load - optimal efficiency
          this.prb_dl_pct < 90 => 2.8 # High load - decreasing efficiency
          _ => 1.8                    # Congestion - poor efficiency
        }

        # Congestion classification
        root.congestion_level = match {
          this.prb_dl_pct < 50 => "low"
          this.prb_dl_pct < 80 => "medium"
          this.prb_dl_pct < 95 => "high"
          _ => "critical"
        }

        # Capacity headroom (percentage remaining)
        root.capacity_headroom_pct = 100 - this.prb_dl_pct

        # Predicted time to congestion (minutes) - simple linear projection
        root.time_to_congestion_min = if this.prb_dl_pct > 70 {
          (95 - this.prb_dl_pct) / 0.5 # Assuming 0.5% growth per minute
        } else {
          null
        }
```
### RF Quality Assessment

Process RF measurements for coverage and quality analysis:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this

        # RSRP (Reference Signal Received Power) classification,
        # based on 3GPP TS 36.133 and typical deployment thresholds
        root.rsrp_quality = match {
          this.rsrp_dbm > -80 => "excellent"  # Above -80 dBm
          this.rsrp_dbm > -90 => "good"       # -80 to -90 dBm
          this.rsrp_dbm > -100 => "fair"      # -90 to -100 dBm
          this.rsrp_dbm > -110 => "poor"      # -100 to -110 dBm
          _ => "very_poor"                    # Below -110 dBm
        }

        # SINR (Signal-to-Interference-plus-Noise Ratio) classification
        root.sinr_quality = match {
          this.sinr_db > 20 => "excellent"  # Above 20 dB
          this.sinr_db > 13 => "good"       # 13 to 20 dB
          this.sinr_db > 0 => "fair"        # 0 to 13 dB
          this.sinr_db > -6 => "poor"       # -6 to 0 dB
          _ => "very_poor"                  # Below -6 dB
        }

        # Overall RF health score (0-100)
        let rsrp_score = match {
          this.rsrp_dbm > -80 => 100
          this.rsrp_dbm > -90 => 80
          this.rsrp_dbm > -100 => 60
          this.rsrp_dbm > -110 => 40
          _ => 20
        }
        let sinr_score = match {
          this.sinr_db > 20 => 100
          this.sinr_db > 13 => 80
          this.sinr_db > 0 => 60
          this.sinr_db > -6 => 40
          _ => 20
        }
        root.rf_health_score = (($rsrp_score + $sinr_score) / 2).round()

        # Coverage radius estimation (rough approximation)
        root.estimated_coverage_radius_m = match {
          this.rsrp_dbm > -80 => 500
          this.rsrp_dbm > -90 => 1000
          this.rsrp_dbm > -100 => 2000
          this.rsrp_dbm > -110 => 3000
          _ => 5000
        }
```
### Site Metadata Enrichment

Add contextual information about cell sites and network topology:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this

        # Site metadata from environment variables, with defaults
        root.region = env("REGION") | "us-west-2"
        root.market = env("MARKET") | "seattle"
        root.site_type = env("SITE_TYPE") | "urban-macro"

        # Generate site identifiers
        root.gnb_id = env("GNB_ID") | ("gnb-" + root.region + "-" + this.cell_id)
        root.enb_id = root.gnb_id # Backwards compatibility

        # Cell sector information
        root.sector = match this.cell_id {
          "cell-1" => "alpha" # 0-120 degrees
          "cell-2" => "beta"  # 120-240 degrees
          "cell-3" => "gamma" # 240-360 degrees
          _ => "omni"         # Omnidirectional
        }

        # Geographic coordinates (example - would come from a CMDB)
        root.coordinates = {
          "latitude": 47.6062,
          "longitude": -122.3321,
          "altitude_m": 15
        }

        # Network hierarchy
        root.network_hierarchy = {
          "region": root.region,
          "market": root.market,
          "site_id": root.gnb_id,
          "cell_id": this.cell_id,
          "sector": root.sector
        }

        # Deployment metadata
        root.deployment_info = {
          "vendor": "ericsson",
          "software_version": "22.Q4",
          "hardware_model": "AIR-3268",
          "deployment_date": "2023-08-15T00:00:00Z"
        }
```
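In production the sector and site identifiers would usually come from a lookup table rather than a match expression. This Python sketch uses a hypothetical hard-coded table (`SECTOR_BY_CELL`) standing in for a CMDB query, to illustrate the shape of that enrichment:

```python
# Hypothetical static lookup table standing in for a CMDB query;
# real deployments would fetch or cache this data, not hard-code it.
SECTOR_BY_CELL = {"cell-1": "alpha", "cell-2": "beta", "cell-3": "gamma"}

def enrich_site_metadata(record, region="us-west-2"):
    """Attach sector and site identifiers the way the mapping does."""
    cell_id = record["cell_id"]
    record["sector"] = SECTOR_BY_CELL.get(cell_id, "omni")
    record["gnb_id"] = f"gnb-{region}-{cell_id}"
    return record

print(enrich_site_metadata({"cell_id": "cell-1"}))
```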
## Multi-Destination Data Preparation

Format data appropriately for each destination:
```yaml
pipeline:
  processors:
    # Common enrichment for all destinations
    - mapping: |
        root = this

        # Add universal metadata
        root.pipeline_id = "oran-edge-telemetry"
        root.pipeline_version = "v1.2.0"
        root.edge_node_id = env("NODE_ID")
        root.collection_method = this.source | "api"

        # Generate unique record ID
        root.record_id = uuid_v4()

        # Add tags for downstream filtering
        root.tags = {
          "environment": "production",
          "data_source": "oran_du",
          "compliance_level": this.ptp_compliance,
          "congestion_level": this.congestion_level,
          "rf_quality": this.rsrp_quality
        }

    # Branch processing based on destination requirements
    - branch:
        request_map: 'root = this'
        result_map: |
          root.destinations = {
            "grafana": this.grafana,
            "parquet": this.parquet,
            "cloudera": this.cloudera
          }
        processors:
          # Grafana/OTEL format
          - mapping: |
              root = this
              root.grafana = {
                "timestamp": this.timestamp_ns,
                "metrics": {
                  "ptp_offset_ns": this.ptp4l_offset_ns,
                  "ptp_score": this.ptp_score,
                  "prb_dl_pct": this.prb_dl_pct,
                  "prb_ul_pct": this.prb_ul_pct,
                  "cpu_pct": this.cpu_pct,
                  "rf_health_score": this.rf_health_score
                },
                "labels": {
                  "du_id": this.du_id,
                  "cell_id": this.cell_id,
                  "gnb_id": this.gnb_id,
                  "region": this.region,
                  "ptp_compliance": this.ptp_compliance
                }
              }
          # Parquet format (flattened for columnar efficiency)
          - mapping: |
              root = this
              root.parquet = {
                "timestamp_ns": this.timestamp_ns,
                "du_id": this.du_id,
                "cell_id": this.cell_id,
                "gnb_id": this.gnb_id,
                "region": this.region,
                "ptp4l_offset_ns": this.ptp4l_offset_ns,
                "ptp_compliance": this.ptp_compliance,
                "ptp_score": this.ptp_score,
                "prb_dl_pct": this.prb_dl_pct,
                "prb_ul_pct": this.prb_ul_pct,
                "prb_efficiency": this.prb_efficiency,
                "cpu_pct": this.cpu_pct,
                "rsrp_dbm": this.rsrp_dbm,
                "sinr_db": this.sinr_db,
                "rf_health_score": this.rf_health_score
              }
          # Cloudera format (nested JSON for analytics)
          - mapping: |
              root = this
              root.cloudera = {
                "event_time": this.timestamp_iso,
                "source": {
                  "du_id": this.du_id,
                  "cell_id": this.cell_id,
                  "site_info": this.network_hierarchy
                },
                "metrics": {
                  "timing": {
                    "ptp_offset_ns": this.ptp4l_offset_ns,
                    "compliance": this.ptp_compliance,
                    "score": this.ptp_score
                  },
                  "resources": {
                    "prb_dl_pct": this.prb_dl_pct,
                    "prb_ul_pct": this.prb_ul_pct,
                    "efficiency": this.prb_efficiency,
                    "congestion": this.congestion_level
                  },
                  "performance": {
                    "cpu_pct": this.cpu_pct,
                    "rf_health": this.rf_health_score
                  }
                }
              }
```
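The Parquet view is flat because columnar formats handle single-level keys most efficiently. A generic flattener in Python illustrates the transformation being applied by hand in that mapping:

```python
def flatten(obj, prefix=""):
    """Flatten nested dicts into single-level, underscore-joined keys,
    the shape columnar formats such as Parquet store most efficiently."""
    flat = {}
    for key, value in obj.items():
        name = key if not prefix else f"{prefix}_{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

print(flatten({"timing": {"ptp_offset_ns": -85}, "cpu_pct": 76}))
# {'timing_ptp_offset_ns': -85, 'cpu_pct': 76}
```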
## Validation and Error Handling

Implement robust validation for transformed data:
```yaml
pipeline:
  processors:
    - mapping: |
        # Validate transformed data
        root = this

        # Check for required fields after transformation
        let required_fields = [
          this.timestamp_ns != null,
          this.du_id != "",
          this.ptp_compliance != null,
          this.prb_efficiency != null
        ]
        root.validation_passed = $required_fields.all(check -> check)

        # Add validation metadata
        root.validation = {
          "timestamp": now().ts_format("2006-01-02T15:04:05Z"),
          "pipeline_stage": "transform",
          "passed": root.validation_passed,
          "errors": if !root.validation_passed {
            $required_fields.enumerated().filter(pair -> !pair.value).map_each(pair -> "missing_field_" + pair.index.string())
          } else {
            []
          }
        }

    # Flag invalid records for routing to error handling
    - switch:
        - check: this.validation_passed == false
          processors:
            - mapping: |
                # Mark for dead letter queue routing
                root = this
                root.error_type = "validation_failed"
                root.destination = "error_queue"
```
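For unit testing the validation rules on their own, a Python sketch that reports missing fields by name (friendlier than the positional indices the Bloblang version emits; the field list mirrors the config):

```python
REQUIRED_FIELDS = ["timestamp_ns", "du_id", "ptp_compliance", "prb_efficiency"]

def validate(record):
    """Check the same required fields as the pipeline mapping,
    naming each missing field in the error list."""
    errors = [f"missing_field_{name}" for name in REQUIRED_FIELDS
              if record.get(name) in (None, "")]
    return {"passed": not errors, "errors": errors}

print(validate({"du_id": "du-001"}))
```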
## Testing Transformations

Verify your transformation logic by piping a sample record through the config (assuming it reads from stdin and writes to stdout):
```bash
# Run the pipeline against sample data
echo '{
  "timestamp": "2024-02-10T17:23:45Z",
  "du_id": "du-001",
  "cell_id": "cell-1",
  "ptp4l_offset_ns": -85,
  "prb_dl_pct": 73,
  "prb_ul_pct": 41,
  "cpu_pct": 76,
  "rsrp_dbm": -92,
  "sinr_db": 15
}' | benthos -c transform-config.yaml
```
## Next Steps
With data transformed and enriched, proceed to multi-destination routing to send processed telemetry to all your observability and analytics platforms.