Telemetry Data Collection & Processing
This guide explains how to collect, process, and manage IoT device telemetry data on the De. platform.
Telemetry Overview
Telemetry in IoT refers to the automatic transmission of data from remote devices to a central system for monitoring, analysis, and control purposes. De.IoTB provides a comprehensive framework for handling telemetry data throughout its lifecycle, from ingestion to storage and analysis.
Data Flow Architecture
The telemetry data flow involves the following steps:
- Data Ingestion: Device publishes telemetry data to MQTT topics
- Channel Management: MQTT client subscribes to topic patterns and receives messages
- Data Normalization: Active Rules engine parses and transforms raw data
- Event Routing: Event Manager directs normalized data to appropriate destinations
- Data Storage: Normalized data is stored in MongoDB collections
- Real-time Delivery: Events are delivered to subscribed clients via Socket.IO
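The steps above can be pictured as a small in-process pipeline. This is only an illustrative sketch: `handleMessage`, `normalize`, the in-memory `store` array, and the `EventEmitter` standing in for Socket.IO delivery are hypothetical names, not part of the De.IoTB API. Plain JSON parsing stands in for the Active Rules engine.

```javascript
const { EventEmitter } = require('events')

// Stand-ins for platform components (hypothetical, for illustration only)
const store = []                    // plays the role of the MongoDB collection
const realtime = new EventEmitter() // plays the role of Socket.IO delivery

// Normalization step: plain JSON parsing here; the real engine applies Active Rules
function normalize(raw) {
  try {
    return { status: 'SUCCESS', normalized: JSON.parse(raw) }
  } catch (err) {
    return { status: 'FAILED', normalized: {} }
  }
}

// Called for every message received on a subscribed topic pattern
function handleMessage(topic, raw) {
  const [workspaceId, , deviceId, , category] = topic.split('/')
  const record = { nsp: workspaceId, topic, raw, ...normalize(raw), timestamp: Date.now() }
  store.push(record) // storage step
  realtime.emit('telemetry', { deviceId, category, data: record.normalized }) // delivery step
  return record
}

// Usage: a subscribed client receives the normalized event
realtime.on('telemetry', (event) => console.log(event.deviceId, event.category, event.data))
handleMessage('ws_abc/devices/dev_123/telemetry/sensors', '{"temperature":22.5}')
```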
Telemetry Topic Structure
The De. platform uses a consistent topic structure for telemetry data:
{workspace_id}/devices/{device_id}/telemetry/{category}
Common Telemetry Categories
| Category | Description | Example Data |
|---|---|---|
| location | Device location data | { lat: 37.7749, lng: -122.4194, accuracy: 5 } |
| sensors | Sensor readings | { temperature: 22.5, humidity: 65.2 } |
| status | Device status information | { battery: 87, signal: 72, memory: 45 } |
| diagnostics | System diagnostics | { cpu: 12, memory: 45, storage: 68 } |
| network | Network connectivity | { strength: 82, type: "4G", latency: 120 } |
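The topic structure and categories above can be handled with two small helpers. This is a minimal sketch; `buildTopic`, `parseTopic`, and `isCommonCategory` are hypothetical names, not SDK functions. Because the table lists common categories, the sketch does not reject other category names.

```javascript
// Categories listed in the table above
const COMMON_CATEGORIES = ['location', 'sensors', 'status', 'diagnostics', 'network']

function isCommonCategory(category) {
  return COMMON_CATEGORIES.includes(category)
}

// Build a telemetry topic from its parts
function buildTopic(workspaceId, deviceId, category) {
  return `${workspaceId}/devices/${deviceId}/telemetry/${category}`
}

// Parse a topic back into its parts; returns null if it is not a telemetry topic
function parseTopic(topic) {
  const match = /^([^\/]+)\/devices\/([^\/]+)\/telemetry\/([^\/]+)$/.exec(topic)
  if (!match) return null
  const [, workspaceId, deviceId, category] = match
  return { workspaceId, deviceId, category }
}

console.log(buildTopic('ws_abc', 'dev_123', 'location'))
// prints "ws_abc/devices/dev_123/telemetry/location"
```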
Example Telemetry Topics
ws_abc/devices/dev_123/telemetry/location
ws_abc/devices/dev_123/telemetry/sensors
ws_abc/devices/dev_123/telemetry/status
ws_abc/devices/dev_123/telemetry/diagnostics
Telemetry Data Formats
De.IoTB supports multiple data formats to accommodate different device capabilities and use cases.
JSON Format (Recommended)
{
  "timestamp": 1737429743000,
  "value": 23.5,
  "unit": "celsius",
  "accuracy": 0.1,
  "metadata": {
    "sensorId": "temp-1",
    "location": "outdoor"
  }
}
CSV Format with Active Rules
For bandwidth-constrained devices, send telemetry in a compact CSV format:
TEMP,23.5,0.1,temp-1,outdoor$
This format requires an Active Rule configuration:
const rule = {
  head: ["TEMP", "TEMPERATURE"], // Message identifiers
  struct: "HEAD,{value},{accuracy},{sensorId},{location}$",
  schema: {
    value: { type: "float", unit: "celsius" },
    accuracy: { type: "float" },
    sensorId: { type: "string" },
    location: { type: "string" }
  }
}
Binary Data Support
For even more efficient transmission, De.IoTB supports binary data formats:
// On the device (using Protocol Buffers)
// `proto` stands for your Protocol Buffers encoder; `encode` is assumed to return a Buffer
const payload = proto.encode({
  timestamp: Date.now(),
  value: 23.5,
  accuracy: 0.1
})
// Publish binary data
client.publish(`${workspaceId}/devices/${deviceId}/telemetry/sensors`, payload)
To process binary data, you need a corresponding decoder in your Active Rule.
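To show what a matching encoder and decoder pair looks like, here is a minimal sketch that uses a hand-rolled fixed layout built on Node's `Buffer` instead of Protocol Buffers. The 24-byte layout and the `encodeReading` and `decodeReading` names are assumptions made for this example, not a De.IoTB wire format.

```javascript
// Fixed layout (an assumption for this sketch):
// bytes 0-7   timestamp, unsigned 64-bit big-endian (ms since epoch)
// bytes 8-15  value, 64-bit float big-endian
// bytes 16-23 accuracy, 64-bit float big-endian
const READING_SIZE = 24

function encodeReading({ timestamp, value, accuracy }) {
  const buf = Buffer.alloc(READING_SIZE)
  buf.writeBigUInt64BE(BigInt(timestamp), 0)
  buf.writeDoubleBE(value, 8)
  buf.writeDoubleBE(accuracy, 16)
  return buf
}

function decodeReading(buf) {
  if (buf.length !== READING_SIZE) {
    throw new Error(`Expected ${READING_SIZE} bytes, got ${buf.length}`)
  }
  return {
    timestamp: Number(buf.readBigUInt64BE(0)),
    value: buf.readDoubleBE(8),
    accuracy: buf.readDoubleBE(16)
  }
}

const encoded = encodeReading({ timestamp: 1737429743000, value: 23.5, accuracy: 0.1 })
console.log(encoded.length) // prints 24
console.log(decodeReading(encoded))
```

Whatever encoding the device uses, the decoder must agree with it on field order, width, and byte order.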
Publishing Telemetry
MQTT Publishing (Device-Side)
// JavaScript example (Node.js)
// Assumes workspaceId, deviceId, credentials, batteryLevel, and signalStrength are defined elsewhere
const mqtt = require('mqtt')
const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: `device_${deviceId}_${Date.now()}`,
  username: credentials.username,
  password: credentials.password
})
// Simple telemetry publishing
function publishTelemetry(category, data) {
  const topic = `${workspaceId}/devices/${deviceId}/telemetry/${category}`
  const payload = JSON.stringify({
    ...data,
    timestamp: Date.now()
  })
  client.publish(topic, payload, { qos: 1 }, (err) => {
    if (err) console.error(`Failed to publish to ${topic}:`, err)
  })
}
// Publish sensor data every minute
setInterval(() => {
  publishTelemetry('sensors', {
    temperature: 22.5 + (Math.random() * 2 - 1), // Random variation
    humidity: 65 + (Math.random() * 5 - 2.5),
    pressure: 1013.25 + (Math.random() * 1 - 0.5)
  })
}, 60000)
// Publish device status every 5 minutes
setInterval(() => {
  publishTelemetry('status', {
    battery: Math.floor(batteryLevel),
    signal: Math.floor(signalStrength),
    uptime: process.uptime(), // Seconds
    memory: process.memoryUsage().heapUsed / 1024 / 1024 // MB
  })
}, 300000)
HTTP API (Alternative)
For devices that can't use MQTT:
// Using fetch API
async function sendTelemetry(category, data) {
  try {
    const response = await fetch(`https://iot.dedot.io/devices/${deviceId}/telemetry/${category}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${accessToken}`
      },
      body: JSON.stringify({
        ...data,
        timestamp: Date.now()
      })
    })
    if (!response.ok) {
      throw new Error(`HTTP error ${response.status}`)
    }
    return await response.json()
  } catch (error) {
    console.error('Failed to send telemetry:', error)
  }
}
Telemetry Processing
De.IoTB processes telemetry data through the Active Rules engine.
Active Rules for Telemetry
The Active Rules engine provides schema-based parsing and normalization of device messages:
// Server-side rule definition
const temperatureRule = {
  head: ["TEMP", "TEMPERATURE"],
  struct: "HEAD,{value},{accuracy},{sensorId},{location}$",
  schema: {
    value: {
      type: "float",
      unit: "celsius",
      description: "Temperature reading"
    },
    accuracy: {
      type: "float",
      description: "Reading accuracy"
    },
    sensorId: {
      type: "string",
      description: "Sensor identifier"
    },
    location: {
      type: "string",
      description: "Sensor location"
    }
  },
  connectors: ["ws_abc"], // Applicable to specific workspace
  enabled: true
}
When a device sends a message like "TEMP,23.5,0.1,temp-1,outdoor$", the Active Rules engine:
- Identifies the message format from the "TEMP" header
- Applies the matching rule schema
- Extracts and converts fields to their specified types
- Normalizes the data to a structured JSON object
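The four steps above can be approximated in a few lines. This is a simplified sketch and not the engine's actual implementation: `applyRule` is a hypothetical helper, it assumes comma-separated fields with a trailing `$` terminator, and it only handles the `float` and `string` types used by this rule.

```javascript
const rule = {
  head: ['TEMP', 'TEMPERATURE'],
  struct: 'HEAD,{value},{accuracy},{sensorId},{location}$',
  schema: {
    value: { type: 'float', unit: 'celsius' },
    accuracy: { type: 'float' },
    sensorId: { type: 'string' },
    location: { type: 'string' }
  }
}

// Apply one rule to a raw message; returns null when the rule does not match
function applyRule(rule, message) {
  if (!message.endsWith('$')) return null
  const [head, ...values] = message.slice(0, -1).split(',')

  // Step 1: identify the message format from its header
  if (!rule.head.includes(head)) return null

  // Step 2: field names come from the {placeholders} in the struct
  const fields = [...rule.struct.matchAll(/\{(\w+)\}/g)].map((m) => m[1])
  if (fields.length !== values.length) return null

  // Steps 3 and 4: convert each field and build the normalized object
  const normalized = {}
  fields.forEach((name, i) => {
    const { type } = rule.schema[name]
    normalized[name] = type === 'float' ? parseFloat(values[i]) : values[i]
  })
  return normalized
}

console.log(applyRule(rule, 'TEMP,23.5,0.1,temp-1,outdoor$'))
// prints { value: 23.5, accuracy: 0.1, sensorId: 'temp-1', location: 'outdoor' }
```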
Type Transformations
The Active Rules engine performs automatic type conversion:
| Type | Description | Example Input | Transformed Output |
|---|---|---|---|
| string | No transformation | "hello" | "hello" |
| number | Parse as number | "42" | 42 |
| integer | Parse as integer | "42.9" | 42 |
| float | Parse as floating point | "42.9" | 42.9 |
| boolean | Convert to boolean | "true" or "1" | true |
| array | Split comma-separated values | "a,b,c" | ["a", "b", "c"] |
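The conversions in the table can be expressed as a single function. This is a sketch of the documented input and output pairs only. How the engine treats invalid input is not specified here, so treating every string other than "true" or "1" as `false`, and throwing on unknown types, are assumptions; `convert` is a hypothetical name.

```javascript
// Convert a raw string field to the type named in the rule schema
function convert(type, input) {
  switch (type) {
    case 'string':
      return input
    case 'number':
      return Number(input)
    case 'integer':
      return parseInt(input, 10) // "42.9" becomes 42
    case 'float':
      return parseFloat(input)
    case 'boolean':
      return input === 'true' || input === '1'
    case 'array':
      return input.split(',')
    default:
      throw new Error(`Unsupported type: ${type}`)
  }
}

console.log(convert('integer', '42.9')) // prints 42
console.log(convert('array', 'a,b,c'))  // prints [ 'a', 'b', 'c' ]
```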
Data Storage
De.IoTB stores telemetry data in MongoDB collections with optimized indexing.
Records Collection
Telemetry records are stored in the records collection with the following schema:
interface TelemetryRecord {
  _id: ObjectId                  // MongoDB document ID
  nsp: string                    // Namespace (workspace)
  chn: string                    // Channel ID
  type: 'REPORT'                 // Record type (telemetry is REPORT)
  status: 'SUCCESS' | 'FAILED'
  topic: string                  // Full MQTT topic
  rule?: string                  // Matched rule reference
  qos?: number                   // MQTT QoS level
  raw: string                    // Original raw message
  normalized: object             // Parsed data object
  timestamp: number              // Message timestamp
}
Query API
Retrieve telemetry data through the De. SDK:
// Get latest telemetry for a device
const latestTelemetry = await access.request({
  url: `/devices/${deviceId}/telemetry/latest`,
  method: 'GET',
  query: {
    category: 'sensors' // Optional filter by category
  }
})
console.log('Latest telemetry:', latestTelemetry.data)
// Query historical telemetry
const telemetryHistory = await access.request({
  url: `/devices/${deviceId}/telemetry`,
  method: 'GET',
  query: {
    category: 'sensors',
    startTime: '2026-01-01T00:00:00.000Z',
    endTime: '2026-01-19T23:59:59.999Z',
    limit: 100,
    page: 1,
    sort: 'timestamp:desc'
  }
})
console.log(`Retrieved ${telemetryHistory.data.length} records`)
Aggregation & Statistics
// Get statistics for device telemetry
const statistics = await access.request({
  url: `/devices/${deviceId}/telemetry/stats`,
  method: 'GET',
  query: {
    category: 'sensors',
    field: 'temperature',
    timeframe: '24h' // '1h', '24h', '7d', '30d'
  }
})
console.log('Temperature statistics:', statistics.data)
// Output:
// {
//   count: 1440,     // Number of data points
//   min: 19.2,       // Minimum value
//   max: 26.8,       // Maximum value
//   avg: 22.7,       // Average value
//   stdDev: 1.2,     // Standard deviation
//   latest: 23.5,    // Most recent value
//   trend: "+0.2/h"  // Trend (change per hour)
// }
Real-time Data Access
De.IoTB enables real-time access to telemetry data via Socket.IO.
Socket.IO Client Integration
import io from 'socket.io-client'
const socket = io('wss://iot.dedot.io', {
  auth: {
    token: accessToken,
    workspace: workspaceId
  }
})
// Subscribe to specific device telemetry
socket.emit('subscribe:device', {
  deviceId: 'dev_123',
  categories: ['sensors', 'location']
})
// Listen for telemetry events
socket.on('telemetry', (data) => {
  console.log(`Received ${data.category} data:`, data)
  // Update UI with real-time data
  updateDashboard(data)
})
// Subscribe to multiple devices
socket.emit('subscribe:devices', {
  deviceIds: ['dev_123', 'dev_456', 'dev_789'],
  categories: ['location'] // Just track locations
})
// You can also subscribe by group
socket.emit('subscribe:group', {
  groupId: 'delivery-fleet',
  categories: ['location', 'status']
})
// Unsubscribe when no longer needed
socket.emit('unsubscribe:device', { deviceId: 'dev_123' })
Rate Limiting
De.IoTB rate-limits real-time telemetry delivery. Throttling is configured per device and per category, as a minimum interval in seconds between messages:
// Configure rate limits when provisioning a device
const deviceConfig = await access.request({
  url: `/devices/${deviceId}/config`,
  method: 'PATCH',
  body: {
    telemetry: {
      throttling: {
        sensors: 60,   // One message per minute
        location: 30,  // One message per 30 seconds
        status: 300    // One message per 5 minutes
      },
      aggregation: {
        enabled: true, // Enable server-side aggregation
        interval: 60   // Aggregate into 60-second buckets
      }
    }
  }
})
Data Retention & Time-Series Storage
De.IoTB provides configurable data retention policies:
// Configure workspace-level data retention
const retention = await access.request({
  url: '/workspace/config/retention',
  method: 'PATCH',
  body: {
    telemetry: {
      default: '30d',   // Keep all telemetry for 30 days
      location: '90d',  // Keep location data for 90 days
      sensors: {
        raw: '7d',      // Keep raw data for 7 days
        hourly: '180d', // Keep hourly aggregates for 180 days
        daily: '3y'     // Keep daily aggregates for 3 years
      }
    }
  }
})
Advanced Features
Time-Series Analysis
// Perform time-series analysis on telemetry data
const analysis = await access.request({
  url: `/devices/${deviceId}/telemetry/analysis`,
  method: 'POST',
  body: {
    category: 'sensors',
    field: 'temperature',
    timeframe: '7d',
    operations: [
      { type: 'movingAverage', window: 5 },
      { type: 'outlierDetection', sensitivity: 2 },
      { type: 'forecast', horizon: 24 }
    ]
  }
})
console.log('Analysis results:', analysis.data)
Data Export
// Export telemetry data
const exportUrl = await access.request({
  url: '/telemetry/export',
  method: 'POST',
  body: {
    deviceIds: ['dev_123', 'dev_456'],
    categories: ['sensors', 'location'],
    timeframe: {
      start: '2026-01-01T00:00:00.000Z',
      end: '2026-01-31T23:59:59.999Z'
    },
    format: 'csv' // 'csv', 'json', or 'excel'
  }
})
console.log('Export ready at:', exportUrl.data.url)
Batch Imports
// Import historical telemetry data
const importJob = await access.request({
  url: `/devices/${deviceId}/telemetry/import`,
  method: 'POST',
  body: {
    category: 'sensors',
    format: 'csv',
    mapping: {
      timestamp: 'column_a',
      temperature: 'column_b',
      humidity: 'column_c'
    },
    fileUrl: 'https://storage.example.com/historical-data.csv'
  }
})
console.log('Import job started:', importJob.data.jobId)
Best Practices
- Optimize Message Size: Keep telemetry messages compact to reduce bandwidth
- Use Appropriate QoS: Select QoS levels based on data criticality
- Include Timestamps: Always include device-side timestamps in telemetry
- Batch Small Readings: Combine frequent small readings for efficiency
- Use Categories: Organize telemetry into logical categories
- Normalize Units: Use consistent units across your device fleet
- Set Retention Policies: Configure data retention based on business needs
- Use Data Aggregation: Enable server-side aggregation for high-frequency data
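As an illustration of the "Batch Small Readings" and "Include Timestamps" practices, the sketch below buffers readings on the device and hands them to a publish callback in groups. `createBatcher` and the `{ readings: [...] }` payload shape are hypothetical. Whether the platform accepts an array payload depends on your Active Rule configuration, so treat the shape as an assumption.

```javascript
// Collect readings and hand them to `publish` in groups of `maxSize`
function createBatcher(publish, maxSize = 10) {
  let pending = []

  // Send whatever is buffered, if anything
  function flush() {
    if (pending.length === 0) return
    const batch = pending
    pending = []
    publish({ readings: batch })
  }

  return {
    add(reading) {
      // Stamp each reading on the device unless it already carries a timestamp
      pending.push({ ...reading, timestamp: reading.timestamp ?? Date.now() })
      if (pending.length >= maxSize) flush()
    },
    flush
  }
}

// Usage: print each batch instead of publishing it over MQTT
const batcher = createBatcher((payload) => console.log(JSON.stringify(payload)), 3)
batcher.add({ temperature: 22.4 })
batcher.add({ temperature: 22.5 })
batcher.add({ temperature: 22.7 }) // third reading triggers a flush
batcher.flush()                    // nothing pending, so nothing is sent
```

In a real device you would also call `flush()` on a timer, so a partly filled batch is not held back indefinitely.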

