Skip to main content

Step 1: Collect O-RAN Metrics

Configure your Expanso Edge pipeline to collect telemetry from DU nodes using file-based or API-based inputs.

DU Telemetry Overview

O-RAN Distributed Units (DU) generate critical telemetry for network operations:

Metric CategoryDescriptionUpdate FrequencyUse Case
PTP TimingIEEE 1588 precision timing offset1Hz5G synchronization compliance
Resource BlocksPhysical Resource Block (PRB) utilization1HzCapacity planning, congestion detection
System PerformanceCPU, memory, interface utilization10sPerformance monitoring, SLA compliance
RF MeasurementsRSRP, SINR, CQI from UE reports1HzCoverage optimization, quality assurance

Collection Method 1: File-Based Input

Most DU implementations write telemetry to local log files. This approach reads files directly from mounted volumes.

DU File Structure

# Typical DU telemetry file structure
/opt/du/telemetry/
├── ptp4l/
│ └── offset.log # PTP offset measurements
├── scheduler/
│ ├── prb_dl.log # Downlink PRB utilization
│ └── prb_ul.log # Uplink PRB utilization
├── system/
│ ├── cpu.log # CPU utilization
│ └── memory.log # Memory usage
└── rf/
├── rsrp.log # Reference Signal Received Power
└── sinr.log # Signal-to-Interference+Noise Ratio

Pipeline Configuration

name: oran-file-collector
config:
input:
broker:
inputs:
# PTP timing data
- file:
paths: [ "/mnt/du-telemetry/ptp4l/offset.log" ]
scanner:
lines: {}
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "ptp_offset"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")

# Parse PTP offset from log line
# Example: "2024-02-10T17:23:45Z offset: -85ns"
let offset_match = this.re_find_all("offset: ([+-]?\\d+)ns")
root.ptp4l_offset_ns = offset_match.0.1.number()
root.raw_line = this

# PRB utilization data
- file:
paths: [ "/mnt/du-telemetry/scheduler/prb_*.log" ]
scanner:
lines: {}
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "prb_utilization"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")

# Parse PRB data from log line
# Example: "DL_PRB: 85%, UL_PRB: 42%"
let dl_match = this.re_find_all("DL_PRB: (\\d+)%")
let ul_match = this.re_find_all("UL_PRB: (\\d+)%")
root.prb_dl_pct = dl_match.0.1.number()
root.prb_ul_pct = ul_match.0.1.number()
root.raw_line = this

# System performance data
- file:
paths: [ "/mnt/du-telemetry/system/*.log" ]
scanner:
lines: {}
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "system_performance"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")

# Parse CPU utilization
# Example: "CPU: 76.4%, MEM: 8.2GB"
let cpu_match = this.re_find_all("CPU: ([\\d.]+)%")
let mem_match = this.re_find_all("MEM: ([\\d.]+)GB")
root.cpu_pct = cpu_match.0.1.number()
root.memory_gb = mem_match.0.1.number()
root.raw_line = this

# Continue to transformation step...

Collection Method 2: API-Based Input

For DUs that expose REST APIs, use HTTP polling for real-time collection.

DU API Endpoints

# Common O-RAN DU API patterns
GET /api/v1/metrics/ptp # PTP timing status
GET /api/v1/metrics/prb # Resource block utilization
GET /api/v1/metrics/system # System performance
GET /api/v1/metrics/rf # RF measurements

Pipeline Configuration

name: oran-api-collector
config:
input:
broker:
inputs:
# PTP metrics via API polling
- http_client:
url: "${DU_ENDPOINT}/api/v1/metrics/ptp"
verb: GET
headers:
Authorization: "Bearer ${DU_API_KEY}"
Accept: "application/json"
rate_limit: "1/s"
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "ptp_offset"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")
root.gnb_id = env("GNB_ID")

# Extract from JSON response
root.ptp4l_offset_ns = this.ptp.offset_nanoseconds
root.ptp_status = this.ptp.status
root.sync_source = this.ptp.master_id

# PRB metrics via API
- http_client:
url: "${DU_ENDPOINT}/api/v1/metrics/prb"
verb: GET
headers:
Authorization: "Bearer ${DU_API_KEY}"
rate_limit: "1/s"
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "prb_utilization"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")

# Extract PRB data from JSON
root.prb_dl_pct = this.scheduler.prb_dl_utilization_percent
root.prb_ul_pct = this.scheduler.prb_ul_utilization_percent
root.prb_total_available = this.scheduler.prb_total
root.active_ue_count = this.scheduler.active_ue_count

# System metrics via API
- http_client:
url: "${DU_ENDPOINT}/api/v1/metrics/system"
verb: GET
headers:
Authorization: "Bearer ${DU_API_KEY}"
rate_limit: "10s"
processors:
- mapping: |
root.timestamp = now()
root.metric_type = "system_performance"
root.du_id = env("DU_ID")
root.cell_id = env("CELL_ID")

# Extract system metrics from JSON
root.cpu_pct = this.system.cpu_utilization_percent
root.memory_gb = this.system.memory_used_bytes / 1000000000
root.memory_pct = this.system.memory_utilization_percent
root.disk_pct = this.system.disk_utilization_percent
root.network_rx_mbps = this.system.network_rx_bytes_per_sec / 1000000
root.network_tx_mbps = this.system.network_tx_bytes_per_sec / 1000000

# Continue to transformation step...

Collection Method 3: Hybrid Approach

Combine file and API collection for comprehensive telemetry:

name: oran-hybrid-collector
config:
input:
broker:
inputs:
# Real-time metrics via API (high frequency)
- http_client:
url: "${DU_ENDPOINT}/api/v1/metrics/realtime"
verb: GET
rate_limit: "1/s"
processors:
- mapping: 'root.source = "api_realtime"'

# Historical/batch data via files (lower frequency)
- file:
paths: [ "/mnt/du-telemetry/batch/*.json" ]
scanner:
lines: {}
processors:
- mapping: 'root.source = "file_batch"'

# Performance data via SNMP polling
- generate:
interval: "30s"
mapping: |
# Trigger SNMP collection
root = {}
root.source = "snmp_poll"
root.timestamp = now()

Data Validation

Implement validation to ensure data quality:

# Add validation processor after collection
pipeline:
processors:
- mapping: |
# Validate required fields
root = if [
this.timestamp.type() == "string",
this.du_id.type() == "string",
this.metric_type.type() == "string"
].all() {
this
} else {
deleted() # Drop invalid records
}

- mapping: |
# Validate PTP offset ranges (-1000000ns to +1000000ns)
root = if this.metric_type == "ptp_offset" {
if this.ptp4l_offset_ns >= -1000000 && this.ptp4l_offset_ns <= 1000000 {
this
} else {
# Log anomaly but keep record with error flag
root = this
root.validation_error = "ptp_offset_out_of_range"
root
}
} else {
this
}

Testing Your Collection

Verify metrics are being collected correctly:

# Test file-based collection
echo '{"ptp_offset": -42, "timestamp": "2024-02-10T17:23:45Z"}' >> /mnt/du-telemetry/test.log

# Test API endpoint
curl -H "Authorization: Bearer ${DU_API_KEY}" \
"${DU_ENDPOINT}/api/v1/metrics/ptp"

# Monitor pipeline logs
kubectl logs -f deployment/oran-collector -n expanso-pipelines

Next Steps

With O-RAN metrics flowing into your pipeline, proceed to transform and enrich the data with Bloblang processing.