Step 2: Transform and Enrich Data

Use Bloblang to process raw O-RAN telemetry, adding derived metrics, compliance classifications, and site metadata before routing to destinations.

Transformation Overview

Raw DU telemetry needs processing for effective analysis:

  • Timestamp normalization: Convert to nanosecond precision
  • Compliance classification: Apply PTP timing thresholds
  • Derived metrics: Calculate efficiency and health scores
  • Site enrichment: Add cell, sector, and regional metadata
  • Format conversion: Prepare for destination-specific schemas

Core Transformations

Timestamp Normalization

Ensure consistent timestamp formats across all metrics:

pipeline:
  processors:
    - mapping: |
        # Normalize timestamps to RFC3339 with nanosecond precision
        root = this

        # Convert various timestamp formats to a standard unix-nanosecond value
        root.timestamp_ns = match {
          this.timestamp.type() == "string" => this.timestamp.ts_parse("2006-01-02T15:04:05Z").ts_unix_nano()
          this.timestamp.type() == "number" => this.timestamp * 1000000000 # Convert seconds to nanoseconds
          _ => now().ts_unix_nano()
        }

        # Standardized ISO timestamp (ts_format reads numbers as unix seconds)
        root.timestamp_iso = (root.timestamp_ns / 1000000000).ts_format("2006-01-02T15:04:05.000000000Z")

        # Processing timestamp for pipeline tracking
        root.processed_at = now()
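Outside the pipeline, the same normalization rules are easy to sanity-check in plain code. A small Python mirror of the mapping above (illustrative only; the function name is an assumption, not part of Benthos):

```python
import calendar
import time

def normalize_timestamp(ts):
    """Return a unix-nanosecond integer from an RFC3339 string,
    a unix-seconds number, or anything else (falls back to now)."""
    if isinstance(ts, str):
        parsed = time.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
        return calendar.timegm(parsed) * 1_000_000_000
    if isinstance(ts, (int, float)):
        return int(ts * 1_000_000_000)  # seconds -> nanoseconds
    return time.time_ns()

# Both representations of the same instant agree:
print(normalize_timestamp("2024-02-10T17:23:45Z"))  # 1707585825000000000
print(normalize_timestamp(1707585825))              # 1707585825000000000
```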

PTP Compliance Classification

Apply 5G timing requirements to PTP offset measurements:

pipeline:
  processors:
    - mapping: |
        root = this

        # PTP compliance thresholds for 5G networks
        # Based on ITU-T G.8271.1 and 3GPP TS 38.104
        let abs_offset = this.ptp4l_offset_ns.abs()

        root.ptp_compliance = match {
          $abs_offset < 100 => "compliant"    # Within ±100ns - fully compliant
          $abs_offset < 1000 => "degraded"    # ±100-1000ns - degraded performance
          $abs_offset < 10000 => "critical"   # ±1-10μs - critical timing error
          _ => "non_compliant"                # >±10μs - non-compliant
        }

        # Compliance score (0-100)
        root.ptp_score = (match {
          $abs_offset < 100 => 100
          $abs_offset < 1000 => 100 - (($abs_offset - 100) / 90)    # 100 down to 90
          $abs_offset < 10000 => 90 - (($abs_offset - 1000) / 100)  # 90 down to 0
          _ => 0
        }).round()

        # Alert level
        root.ptp_alert_level = match root.ptp_compliance {
          "compliant" => "none"
          "degraded" => "warning"
          "critical" => "major"
          "non_compliant" => "critical"
        }
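Piecewise scores like this are easy to get wrong at the boundaries, so it helps to check them outside the pipeline. A hypothetical Python mirror of the classification and scoring logic:

```python
def classify_ptp(offset_ns):
    """Mirror of the PTP mapping: return (compliance, score, alert_level)."""
    a = abs(offset_ns)
    if a < 100:
        compliance, score = "compliant", 100
    elif a < 1000:
        compliance, score = "degraded", round(100 - (a - 100) / 90)   # 100 down to 90
    elif a < 10000:
        compliance, score = "critical", round(90 - (a - 1000) / 100)  # 90 down to 0
    else:
        compliance, score = "non_compliant", 0
    alert = {"compliant": "none", "degraded": "warning",
             "critical": "major", "non_compliant": "critical"}[compliance]
    return compliance, score, alert

print(classify_ptp(-85))    # ('compliant', 100, 'none')
print(classify_ptp(550))    # ('degraded', 95, 'warning')
print(classify_ptp(20000))  # ('non_compliant', 0, 'critical')
```

Note the score is continuous at the class boundaries: an offset of exactly 1000ns scores 90 whichever branch evaluates it.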

PRB Efficiency Calculations

Calculate resource utilization efficiency and congestion indicators:

pipeline:
  processors:
    - mapping: |
        root = this

        # PRB efficiency calculations
        root.prb_total_utilization = (this.prb_dl_pct + this.prb_ul_pct) / 2
        root.prb_efficiency = root.prb_total_utilization.round()

        # Spectral efficiency (bits/Hz) - typical values for 5G
        root.spectral_efficiency_dl = match {
          this.prb_dl_pct < 30 => 2.4 # Low load - high efficiency
          this.prb_dl_pct < 70 => 3.2 # Medium load - optimal efficiency
          this.prb_dl_pct < 90 => 2.8 # High load - decreasing efficiency
          _ => 1.8                    # Congestion - poor efficiency
        }

        # Congestion classification
        root.congestion_level = match {
          this.prb_dl_pct < 50 => "low"
          this.prb_dl_pct < 80 => "medium"
          this.prb_dl_pct < 95 => "high"
          _ => "critical"
        }

        # Capacity headroom (percentage remaining)
        root.capacity_headroom_pct = 100 - this.prb_dl_pct

        # Predicted time to congestion (minutes) - simple linear projection
        root.time_to_congestion_min = if this.prb_dl_pct > 70 {
          (95 - this.prb_dl_pct) / 0.5 # Assuming 0.5% growth per minute
        } else {
          null
        }
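The same utilization arithmetic, mirrored in Python for a quick check of the congestion boundaries (illustrative; the function and tuple layout are assumptions):

```python
def prb_summary(prb_dl_pct, prb_ul_pct):
    """Return (efficiency, congestion_level, headroom_pct, time_to_congestion_min)."""
    efficiency = round((prb_dl_pct + prb_ul_pct) / 2)
    if prb_dl_pct < 50:
        congestion = "low"
    elif prb_dl_pct < 80:
        congestion = "medium"
    elif prb_dl_pct < 95:
        congestion = "high"
    else:
        congestion = "critical"
    headroom = 100 - prb_dl_pct
    # Linear projection to the 95% congestion threshold at an assumed 0.5%/min growth
    ttc = (95 - prb_dl_pct) / 0.5 if prb_dl_pct > 70 else None
    return efficiency, congestion, headroom, ttc

print(prb_summary(73, 41))  # (57, 'medium', 27, 44.0)
```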

RF Quality Assessment

Process RF measurements for coverage and quality analysis:

pipeline:
  processors:
    - mapping: |
        root = this

        # RSRP (Reference Signal Received Power) classification
        # Based on 3GPP TS 36.133 and typical deployment thresholds
        root.rsrp_quality = match {
          this.rsrp_dbm > -80 => "excellent" # >-80 dBm
          this.rsrp_dbm > -90 => "good"      # -80 to -90 dBm
          this.rsrp_dbm > -100 => "fair"     # -90 to -100 dBm
          this.rsrp_dbm > -110 => "poor"     # -100 to -110 dBm
          _ => "very_poor"                   # <-110 dBm
        }

        # SINR (Signal-to-Interference+Noise Ratio) classification
        root.sinr_quality = match {
          this.sinr_db > 20 => "excellent" # >20 dB
          this.sinr_db > 13 => "good"      # 13-20 dB
          this.sinr_db > 0 => "fair"       # 0-13 dB
          this.sinr_db > -6 => "poor"      # -6-0 dB
          _ => "very_poor"                 # <-6 dB
        }

        # Overall RF health score (0-100)
        let rsrp_score = match {
          this.rsrp_dbm > -80 => 100
          this.rsrp_dbm > -90 => 80
          this.rsrp_dbm > -100 => 60
          this.rsrp_dbm > -110 => 40
          _ => 20
        }

        let sinr_score = match {
          this.sinr_db > 20 => 100
          this.sinr_db > 13 => 80
          this.sinr_db > 0 => 60
          this.sinr_db > -6 => 40
          _ => 20
        }

        root.rf_health_score = (($rsrp_score + $sinr_score) / 2).round()

        # Coverage area estimation (rough approximation)
        root.estimated_coverage_radius_m = match {
          this.rsrp_dbm > -80 => 500
          this.rsrp_dbm > -90 => 1000
          this.rsrp_dbm > -100 => 2000
          this.rsrp_dbm > -110 => 3000
          _ => 5000
        }
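Because RSRP and SINR use the same first-match-wins threshold pattern, the health score can be mirrored compactly. A hypothetical Python sketch:

```python
def rf_health(rsrp_dbm, sinr_db):
    """Mirror of the RF quality mapping: combined 0-100 health score."""
    def step_score(value, thresholds):
        # thresholds: descending (limit, score) pairs; first match wins
        for limit, score in thresholds:
            if value > limit:
                return score
        return 20  # everything below the last limit
    rsrp_score = step_score(rsrp_dbm, [(-80, 100), (-90, 80), (-100, 60), (-110, 40)])
    sinr_score = step_score(sinr_db, [(20, 100), (13, 80), (0, 60), (-6, 40)])
    return round((rsrp_score + sinr_score) / 2)

print(rf_health(-92, 15))  # 70  (fair RSRP, good SINR)
print(rf_health(-75, 25))  # 100
```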

Site Metadata Enrichment

Add contextual information about cell sites and network topology:

pipeline:
  processors:
    - mapping: |
        root = this

        # Site metadata from environment variables or lookup tables
        root.region = env("REGION") | "us-west-2"
        root.market = env("MARKET") | "seattle"
        root.site_type = env("SITE_TYPE") | "urban-macro"

        # Generate site identifiers
        root.gnb_id = env("GNB_ID") | ("gnb-" + root.region + "-" + this.cell_id.string().pad_left(3, "0"))
        root.enb_id = root.gnb_id # Backwards compatibility

        # Cell sector information
        root.sector = match this.cell_id {
          "cell-1" => "alpha" # 0-120 degrees
          "cell-2" => "beta"  # 120-240 degrees
          "cell-3" => "gamma" # 240-360 degrees
          _ => "omni"         # Omnidirectional
        }

        # Geographic coordinates (example - would come from a CMDB)
        root.coordinates = {
          "latitude": 47.6062,
          "longitude": -122.3321,
          "altitude_m": 15
        }

        # Network hierarchy
        root.network_hierarchy = {
          "region": root.region,
          "market": root.market,
          "site_id": root.gnb_id,
          "cell_id": this.cell_id,
          "sector": root.sector
        }

        # Deployment metadata
        root.deployment_info = {
          "vendor": "ericsson",
          "software_version": "22.Q4",
          "hardware_model": "AIR-3268",
          "deployment_date": "2023-08-15T00:00:00Z"
        }
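The sector mapping is a simple table-driven enrichment; in production the table would be loaded from a CMDB or config file rather than hard-coded. A minimal Python sketch of the idea (table contents are the example values from above):

```python
# Hypothetical lookup table; real deployments would load this from a CMDB
SECTOR_BY_CELL = {"cell-1": "alpha", "cell-2": "beta", "cell-3": "gamma"}

def sector_for(cell_id):
    """Map a cell ID to its sector name, defaulting to omnidirectional."""
    return SECTOR_BY_CELL.get(cell_id, "omni")

print(sector_for("cell-2"))   # beta
print(sector_for("cell-99"))  # omni
```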

Multi-Destination Data Preparation

Format data appropriately for each destination:

pipeline:
  processors:
    # Common enrichment for all destinations
    - mapping: |
        root = this

        # Add universal metadata
        root.pipeline_id = "oran-edge-telemetry"
        root.pipeline_version = "v1.2.0"
        root.edge_node_id = env("NODE_ID")
        root.collection_method = this.source | "api"

        # Generate unique record ID
        root.record_id = uuid_v4()

        # Add tags for downstream filtering
        root.tags = {
          "environment": "production",
          "data_source": "oran_du",
          "compliance_level": this.ptp_compliance,
          "congestion_level": this.congestion_level,
          "rf_quality": this.rsrp_quality
        }

    # Branch processing based on destination requirements
    - branch:
        request_map: 'root = this'
        result_map: |
          root.destinations = {
            "grafana": this.grafana,
            "parquet": this.parquet,
            "cloudera": this.cloudera
          }
        processors:
          # Grafana/OTEL format
          - mapping: |
              root = this # Keep source fields for the next mappings in this branch
              root.grafana = {
                "timestamp": this.timestamp_ns,
                "metrics": {
                  "ptp_offset_ns": this.ptp4l_offset_ns,
                  "ptp_score": this.ptp_score,
                  "prb_dl_pct": this.prb_dl_pct,
                  "prb_ul_pct": this.prb_ul_pct,
                  "cpu_pct": this.cpu_pct,
                  "rf_health_score": this.rf_health_score
                },
                "labels": {
                  "du_id": this.du_id,
                  "cell_id": this.cell_id,
                  "gnb_id": this.gnb_id,
                  "region": this.region,
                  "ptp_compliance": this.ptp_compliance
                }
              }

          # Parquet format (flattened for columnar efficiency)
          - mapping: |
              root = this
              root.parquet = {
                "timestamp_ns": this.timestamp_ns,
                "du_id": this.du_id,
                "cell_id": this.cell_id,
                "gnb_id": this.gnb_id,
                "region": this.region,
                "ptp4l_offset_ns": this.ptp4l_offset_ns,
                "ptp_compliance": this.ptp_compliance,
                "ptp_score": this.ptp_score,
                "prb_dl_pct": this.prb_dl_pct,
                "prb_ul_pct": this.prb_ul_pct,
                "prb_efficiency": this.prb_efficiency,
                "cpu_pct": this.cpu_pct,
                "rsrp_dbm": this.rsrp_dbm,
                "sinr_db": this.sinr_db,
                "rf_health_score": this.rf_health_score
              }

          # Cloudera format (nested JSON for analytics)
          - mapping: |
              root = this
              root.cloudera = {
                "event_time": this.timestamp_iso,
                "source": {
                  "du_id": this.du_id,
                  "cell_id": this.cell_id,
                  "site_info": this.network_hierarchy
                },
                "metrics": {
                  "timing": {
                    "ptp_offset_ns": this.ptp4l_offset_ns,
                    "compliance": this.ptp_compliance,
                    "score": this.ptp_score
                  },
                  "resources": {
                    "prb_dl_pct": this.prb_dl_pct,
                    "prb_ul_pct": this.prb_ul_pct,
                    "efficiency": this.prb_efficiency,
                    "congestion": this.congestion_level
                  },
                  "performance": {
                    "cpu_pct": this.cpu_pct,
                    "rf_health": this.rf_health_score
                  }
                }
              }
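The branch computes three destination-specific shapes from one enriched record. The fan-out can be mirrored in Python to make the restructuring explicit (illustrative only; the field subset is abbreviated and the function name is an assumption):

```python
def fan_out(rec):
    """Shape one enriched record for each destination."""
    grafana = {
        "timestamp": rec["timestamp_ns"],
        "metrics": {k: rec[k] for k in ("ptp4l_offset_ns", "prb_dl_pct", "cpu_pct")},
        "labels": {k: rec[k] for k in ("du_id", "cell_id")},
    }
    # Parquet prefers a flat, columnar-friendly layout
    parquet = {k: rec[k] for k in ("timestamp_ns", "du_id", "cell_id",
                                   "ptp4l_offset_ns", "prb_dl_pct", "cpu_pct")}
    return {"grafana": grafana, "parquet": parquet}

record = {"timestamp_ns": 1, "du_id": "du-001", "cell_id": "cell-1",
          "ptp4l_offset_ns": -85, "prb_dl_pct": 73, "cpu_pct": 76}
out = fan_out(record)
print(sorted(out["parquet"]))  # flat keys, one column each
```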

Validation and Error Handling

Implement robust validation for transformed data:

pipeline:
  processors:
    - mapping: |
        # Validate transformed data
        root = this

        # Check for required fields after transformation
        let required_fields = [
          this.timestamp_ns != null,
          this.du_id != "",
          this.ptp_compliance != null,
          this.prb_efficiency != null
        ]

        root.validation_passed = $required_fields.all(check -> check)

        # Add validation metadata
        root.validation = {
          "timestamp": now(),
          "pipeline_stage": "transform",
          "passed": root.validation_passed,
          "errors": if !root.validation_passed {
            $required_fields.enumerate().filter(pair -> !pair.value).map_each(pair -> "missing_field_" + pair.index.string())
          } else {
            []
          }
        }

    # Drop invalid records or route to error handling
    - switch:
        - check: this.validation_passed == false
          processors:
            - mapping: |
                # Log error and route to dead letter queue
                root = this
                root.error_type = "validation_failed"
                root.destination = "error_queue"
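The validation step reduces to a set of per-field checks plus a list of failures. A hypothetical Python mirror that reports missing fields by name rather than by index:

```python
def validate(rec):
    """Return (passed, errors) for the required post-transform fields."""
    checks = {
        "timestamp_ns": rec.get("timestamp_ns") is not None,
        "du_id": rec.get("du_id", "") != "",
        "ptp_compliance": rec.get("ptp_compliance") is not None,
        "prb_efficiency": rec.get("prb_efficiency") is not None,
    }
    errors = ["missing_field_" + name for name, ok in checks.items() if not ok]
    return not errors, errors

print(validate({"timestamp_ns": 1, "du_id": "du-001",
                "ptp_compliance": "compliant", "prb_efficiency": 57}))
print(validate({"du_id": "du-001"}))  # fails three checks
```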

Testing Transformations

Verify your transformation logic:

# Send a sample record through the pipeline. This assumes the config
# uses stdin input and stdout output; `benthos test` is reserved for
# unit-test definition files. The stdin input is line-delimited by
# default, so keep the JSON on a single line.
echo '{"timestamp":"2024-02-10T17:23:45Z","du_id":"du-001","cell_id":"cell-1","ptp4l_offset_ns":-85,"prb_dl_pct":73,"prb_ul_pct":41,"cpu_pct":76,"rsrp_dbm":-92,"sinr_db":15}' | benthos -c transform-config.yaml

Next Steps

With data transformed and enriched, proceed to multi-destination routing to send processed telemetry to all your observability and analytics platforms.