# Step 1: Collect O-RAN Metrics
Configure your Expanso Edge pipeline to collect telemetry from DU nodes using file-based or API-based inputs.
## DU Telemetry Overview

O-RAN Distributed Units (DUs) generate critical telemetry for network operations:
| Metric Category | Description | Update Frequency | Use Case |
|---|---|---|---|
| PTP Timing | IEEE 1588 precision timing offset | 1Hz | 5G synchronization compliance |
| Resource Blocks | Physical Resource Block (PRB) utilization | 1Hz | Capacity planning, congestion detection |
| System Performance | CPU, memory, interface utilization | 10s | Performance monitoring, SLA compliance |
| RF Measurements | RSRP, SINR, CQI from UE reports | 1Hz | Coverage optimization, quality assurance |
## Collection Method 1: File-Based Input

Most DU implementations write telemetry to local log files. This approach reads files directly from mounted volumes.
### DU File Structure

```
# Typical DU telemetry file structure
/opt/du/telemetry/
├── ptp4l/
│   └── offset.log       # PTP offset measurements
├── scheduler/
│   ├── prb_dl.log       # Downlink PRB utilization
│   └── prb_ul.log       # Uplink PRB utilization
├── system/
│   ├── cpu.log          # CPU utilization
│   └── memory.log       # Memory usage
└── rf/
    ├── rsrp.log         # Reference Signal Received Power
    └── sinr.log         # Signal-to-Interference-plus-Noise Ratio
```
### Pipeline Configuration

```yaml
name: oran-file-collector
config:
  input:
    broker:
      inputs:
        # PTP timing data
        - file:
            paths: [ "/mnt/du-telemetry/ptp4l/offset.log" ]
            scanner:
              lines: {}
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "ptp_offset"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                # Parse the PTP offset from the raw log line
                # Example: "2024-02-10T17:23:45Z offset: -85ns"
                let offset_match = content().string().re_find_all_submatch("offset: ([+-]?\\d+)ns")
                root.ptp4l_offset_ns = $offset_match.0.1.number()
                root.raw_line = content().string()

        # PRB utilization data
        - file:
            paths: [ "/mnt/du-telemetry/scheduler/prb_*.log" ]
            scanner:
              lines: {}
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "prb_utilization"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                # Parse PRB data from the raw log line
                # Example: "DL_PRB: 85%, UL_PRB: 42%"
                let dl_match = content().string().re_find_all_submatch("DL_PRB: (\\d+)%")
                let ul_match = content().string().re_find_all_submatch("UL_PRB: (\\d+)%")
                root.prb_dl_pct = $dl_match.0.1.number()
                root.prb_ul_pct = $ul_match.0.1.number()
                root.raw_line = content().string()

        # System performance data
        - file:
            paths: [ "/mnt/du-telemetry/system/*.log" ]
            scanner:
              lines: {}
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "system_performance"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                # Parse CPU and memory utilization from the raw log line
                # Example: "CPU: 76.4%, MEM: 8.2GB"
                let cpu_match = content().string().re_find_all_submatch("CPU: ([\\d.]+)%")
                let mem_match = content().string().re_find_all_submatch("MEM: ([\\d.]+)GB")
                root.cpu_pct = $cpu_match.0.1.number()
                root.memory_gb = $mem_match.0.1.number()
                root.raw_line = content().string()

# Continue to the transformation step...
```
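The regular expressions in the mappings above are easy to sanity-check outside the pipeline. A quick Python sketch of the same extractions (illustrative only; the pipeline itself runs these as Bloblang):

```python
import re

# Sample lines in the formats the mappings expect
ptp_line = "2024-02-10T17:23:45Z offset: -85ns"
prb_line = "DL_PRB: 85%, UL_PRB: 42%"

# Same patterns as the Bloblang mappings
offset_ns = int(re.search(r"offset: ([+-]?\d+)ns", ptp_line).group(1))
prb_dl = int(re.search(r"DL_PRB: (\d+)%", prb_line).group(1))
prb_ul = int(re.search(r"UL_PRB: (\d+)%", prb_line).group(1))

print(offset_ns, prb_dl, prb_ul)  # -85 85 42
```

If your DU writes offsets in a different format (e.g. microseconds, or without a sign), adjust the pattern before deploying.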
## Collection Method 2: API-Based Input

For DUs that expose REST APIs, use HTTP polling for real-time collection.
### DU API Endpoints

```
# Common O-RAN DU API patterns
GET /api/v1/metrics/ptp      # PTP timing status
GET /api/v1/metrics/prb      # Resource block utilization
GET /api/v1/metrics/system   # System performance
GET /api/v1/metrics/rf       # RF measurements
```
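The extraction mappings below assume a response body shaped roughly like the following. The field names are an assumption made for illustration, not a published DU schema — adjust them to match your vendor's API:

```json
{
  "ptp": {
    "offset_nanoseconds": -85,
    "status": "LOCKED",
    "master_id": "00:11:22:33:44:55"
  }
}
```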
### Pipeline Configuration

Note that `rate_limit` on an `http_client` input references a named rate-limit resource, so the polling cadence is defined once under `rate_limit_resources`:

```yaml
name: oran-api-collector
config:
  rate_limit_resources:
    - label: du_api_1s
      local:
        count: 1
        interval: 1s
    - label: du_api_10s
      local:
        count: 1
        interval: 10s
  input:
    broker:
      inputs:
        # PTP metrics via API polling (one request per second)
        - http_client:
            url: "${DU_ENDPOINT}/api/v1/metrics/ptp"
            verb: GET
            headers:
              Authorization: "Bearer ${DU_API_KEY}"
              Accept: "application/json"
            rate_limit: du_api_1s
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "ptp_offset"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                root.gnb_id = env("GNB_ID")
                # Extract fields from the JSON response
                root.ptp4l_offset_ns = this.ptp.offset_nanoseconds
                root.ptp_status = this.ptp.status
                root.sync_source = this.ptp.master_id

        # PRB metrics via API (one request per second)
        - http_client:
            url: "${DU_ENDPOINT}/api/v1/metrics/prb"
            verb: GET
            headers:
              Authorization: "Bearer ${DU_API_KEY}"
            rate_limit: du_api_1s
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "prb_utilization"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                # Extract PRB data from the JSON response
                root.prb_dl_pct = this.scheduler.prb_dl_utilization_percent
                root.prb_ul_pct = this.scheduler.prb_ul_utilization_percent
                root.prb_total_available = this.scheduler.prb_total
                root.active_ue_count = this.scheduler.active_ue_count

        # System metrics via API (one request every 10 seconds)
        - http_client:
            url: "${DU_ENDPOINT}/api/v1/metrics/system"
            verb: GET
            headers:
              Authorization: "Bearer ${DU_API_KEY}"
            rate_limit: du_api_10s
          processors:
            - mapping: |
                root.timestamp = now()
                root.metric_type = "system_performance"
                root.du_id = env("DU_ID")
                root.cell_id = env("CELL_ID")
                # Extract system metrics from the JSON response
                root.cpu_pct = this.system.cpu_utilization_percent
                root.memory_gb = this.system.memory_used_bytes / 1000000000  # bytes -> decimal GB
                root.memory_pct = this.system.memory_utilization_percent
                root.disk_pct = this.system.disk_utilization_percent
                root.network_rx_mbps = this.system.network_rx_bytes_per_sec * 8 / 1000000  # bytes/s -> megabits/s
                root.network_tx_mbps = this.system.network_tx_bytes_per_sec * 8 / 1000000  # bytes/s -> megabits/s

# Continue to the transformation step...
```
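The unit conversions in the system mapping are easy to get wrong: dividing bytes/s by 1,000,000 yields mega*bytes* per second, while a field named `mbps` (megabits per second) needs an additional factor of 8. A quick check of the arithmetic:

```python
GB = 1_000_000_000  # decimal gigabyte, matching the mapping's divisor

def bytes_to_gb(b: int) -> float:
    return b / GB

def bytes_per_sec_to_mbps(bps: int) -> float:
    # megabits per second: 8 bits per byte, then scale to mega
    return bps * 8 / 1_000_000

print(bytes_to_gb(8_200_000_000))         # 8.2
print(bytes_per_sec_to_mbps(12_500_000))  # 100.0
```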
## Collection Method 3: Hybrid Approach

Combine file and API collection for comprehensive telemetry:

```yaml
name: oran-hybrid-collector
config:
  rate_limit_resources:
    - label: du_api_1s
      local:
        count: 1
        interval: 1s
  input:
    broker:
      inputs:
        # Real-time metrics via API (high frequency)
        - http_client:
            url: "${DU_ENDPOINT}/api/v1/metrics/realtime"
            verb: GET
            rate_limit: du_api_1s
          processors:
            - mapping: 'root.source = "api_realtime"'

        # Historical/batch data via files (lower frequency)
        - file:
            paths: [ "/mnt/du-telemetry/batch/*.json" ]
            scanner:
              lines: {}
          processors:
            - mapping: 'root.source = "file_batch"'

        # Periodic trigger for SNMP polling
        - generate:
            interval: "30s"
            mapping: |
              root = {}
              root.source = "snmp_poll"
              root.timestamp = now()
```
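The `generate` input only emits a trigger record every 30 seconds; the SNMP fetch itself still has to happen downstream. One possible sketch, assuming your runtime ships the `command` processor and net-snmp's `snmpget` is installed on the node (`DU_MGMT_IP`, the community string, and the OID are placeholders for your environment):

```yaml
pipeline:
  processors:
    - switch:
        - check: this.source == "snmp_poll"
          processors:
            # Shell out to snmpget for an interface counter; replace the OID
            # with whichever objects your DU actually exposes
            - command:
                name: snmpget
                args_mapping: '[ "-v2c", "-c", "public", env("DU_MGMT_IP"), "1.3.6.1.2.1.2.2.1.10.1" ]'
```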
## Data Validation

Implement validation to ensure data quality:

```yaml
# Add validation processors after collection
pipeline:
  processors:
    # Drop records that are missing any required string field
    - mapping: |
        root = if [ this.timestamp, this.du_id, this.metric_type ].all(field -> field.type() == "string") {
          this
        } else {
          deleted()
        }
    # Flag PTP offsets outside the expected range (-1,000,000ns to +1,000,000ns),
    # keeping the record so anomalies stay visible downstream
    - mapping: |
        root = this
        root.validation_error = if this.metric_type == "ptp_offset" && (this.ptp4l_offset_ns < -1000000 || this.ptp4l_offset_ns > 1000000) {
          "ptp_offset_out_of_range"
        } else {
          deleted()
        }
```
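The same rules sketched in Python for clarity (the pipeline enforces them in Bloblang; the field names match the mappings above):

```python
def validate(record: dict):
    """Drop records missing required string fields; flag out-of-range
    PTP offsets instead of dropping them."""
    required = ("timestamp", "du_id", "metric_type")
    if not all(isinstance(record.get(k), str) for k in required):
        return None  # equivalent of deleted(): drop the record

    if record["metric_type"] == "ptp_offset":
        offset = record.get("ptp4l_offset_ns", 0)
        if not (-1_000_000 <= offset <= 1_000_000):
            record["validation_error"] = "ptp_offset_out_of_range"
    return record

ok = validate({"timestamp": "t", "du_id": "du-1",
               "metric_type": "ptp_offset", "ptp4l_offset_ns": -85})
bad = validate({"timestamp": "t", "du_id": "du-1",
                "metric_type": "ptp_offset", "ptp4l_offset_ns": 5_000_000})
dropped = validate({"du_id": "du-1"})  # missing timestamp and metric_type
```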
## Testing Your Collection

Verify metrics are being collected correctly:

```bash
# Test file-based collection: append a line in the format the PTP mapping expects
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) offset: -42ns" >> /mnt/du-telemetry/ptp4l/offset.log

# Test the API endpoint directly
curl -H "Authorization: Bearer ${DU_API_KEY}" \
  "${DU_ENDPOINT}/api/v1/metrics/ptp"

# Monitor pipeline logs
kubectl logs -f deployment/oran-collector -n expanso-pipelines
```
## Next Steps
With O-RAN metrics flowing into your pipeline, proceed to transform and enrich the data with Bloblang processing.