Command & Control Operations
This guide explains how to implement cloud-to-device commands in the De. IoT platform.
Command Overview
Command & Control (C2) operations enable applications to send instructions to IoT devices. De.IoTB provides a bidirectional communication channel for sending commands to devices and receiving acknowledgments or responses.
Command Flow Architecture
┌──────────────┐         ┌───────────────┐         ┌──────────────┐
│  Cloud App   │────1───▶│    De.IoTB    │────2───▶│  IoT Device  │
└──────┬───────┘         └───────┬───────┘         └──────┬───────┘
       │                         │                        │
       │                         │          ┌─────────────▼─────────────┐
       │                         │          │  Device executes command  │
       │                         │          └─────────────┬─────────────┘
       │                         │                        │
┌──────▼───────┐         ┌───────▼───────┐         ┌──────▼───────┐
│  Cloud App   │◀───4────│    De.IoTB    │◀───3────│  IoT Device  │
└──────────────┘         └───────────────┘         └──────────────┘
1. The client application sends a command via the REST API or WebSocket.
2. De.IoTB routes the command to the target device via MQTT.
3. The device executes the command and sends a response or acknowledgment.
4. De.IoTB routes the response back to the client application.
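As a rough illustration of the four steps above, the following sketch simulates the round trip inside a single Node.js process. An `EventEmitter` stands in for De.IoTB, and the `broker`, `pending`, and `sendCommand` names are hypothetical (they are not part of the De. SDK). A real deployment would go through the REST API or WebSocket and MQTT instead.

```javascript
const { EventEmitter } = require('events')

// In-memory stand-in for De.IoTB (assumption, for illustration only)
const broker = new EventEmitter()
const topicBase = 'ws_abc/devices/dev_123/commands'

// Step 3: the device executes the command and publishes a response
broker.on(`${topicBase}/config`, (command) => {
  broker.emit(`${topicBase}/response`, {
    commandId: command.commandId,
    status: 'success',
    data: { reportInterval: command.payload.reportInterval }
  })
})

// Step 4: the response is routed back and matched to its command by commandId
const pending = new Map()
broker.on(`${topicBase}/response`, (response) => {
  const resolve = pending.get(response.commandId)
  if (resolve) {
    pending.delete(response.commandId)
    resolve(response)
  }
})

// Steps 1 and 2: the client sends a command and the broker routes it to the device topic
let nextId = 0
function sendCommand(type, payload) {
  const commandId = `cmd_${++nextId}`
  let result = null
  pending.set(commandId, (response) => { result = response })
  broker.emit(`${topicBase}/${type}`, { commandId, payload })
  // EventEmitter dispatch is synchronous, so the response has already arrived here
  return result
}

const result = sendCommand('config', { reportInterval: 60 })
console.log(result.status, result.data)
```

Matching responses to commands by `commandId` is the part that carries over to a real client: the network hops are asynchronous there, so the `pending` map would typically hold promise resolvers and timeouts rather than plain callbacks.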
Command Topic Structure
Commands use a standardized topic structure:
{workspace_id}/devices/{device_id}/commands/{command_type}
Common Command Types
| Command Type | Description | Example Payload |
|---|---|---|
| config | Update device configuration | { "reportInterval": 60, "mode": "lowPower" } |
| reboot | Restart the device | { "delay": 5, "reason": "maintenance" } |
| update | Trigger firmware update | { "version": "1.2.3", "url": "https://..." } |
| query | Request specific information | { "fields": ["status", "diagnostics"] } |
| action | Execute specific action | { "action": "captureImage", "params": {...} } |
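The topic structure and command types above can be handled with two small helpers. This is a minimal sketch: `buildCommandTopic`, `parseCommandTopic`, and `COMMAND_TYPES` are hypothetical names, not SDK functions, and the segment check only rejects the characters MQTT reserves in topics (`/`, `+`, `#`).

```javascript
// Command types listed in the table above
const COMMAND_TYPES = ['config', 'reboot', 'update', 'query', 'action']

// Build {workspace_id}/devices/{device_id}/commands/{command_type}
function buildCommandTopic(workspaceId, deviceId, commandType) {
  for (const part of [workspaceId, deviceId, commandType]) {
    // '/', '+' and '#' have special meaning in MQTT topics, so reject them in a segment
    if (typeof part !== 'string' || part === '' || /[\/+#]/.test(part)) {
      throw new Error(`Invalid topic segment: ${JSON.stringify(part)}`)
    }
  }
  return `${workspaceId}/devices/${deviceId}/commands/${commandType}`
}

// Parse a topic back into its parts; returns null if it does not match the structure
function parseCommandTopic(topic) {
  const parts = topic.split('/')
  const wellFormed =
    parts.length === 5 &&
    parts[1] === 'devices' &&
    parts[3] === 'commands' &&
    parts.every((part) => part !== '')
  if (!wellFormed) return null
  const [workspaceId, , deviceId, , commandType] = parts
  return { workspaceId, deviceId, commandType, known: COMMAND_TYPES.includes(commandType) }
}

const topic = buildCommandTopic('ws_abc', 'dev_123', 'config')
console.log(topic)
console.log(parseCommandTopic(topic))
```

The `known` flag lets a caller decide how to treat custom command types instead of rejecting them outright.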
Example Command Topics
ws_abc/devices/dev_123/commands/config
ws_abc/devices/dev_123/commands/reboot
ws_abc/devices/dev_123/commands/update
Sending Commands
REST API Method
Send commands using the REST API:
javascript
import De from '@de./sdk'

const access = new De.Access({
  workspace: 'your-workspace-id',
  accessToken: 'your-access-token',
  env: 'prod'
})

// Send a command to a device
const commandResponse = await access.request({
  url: `/devices/${deviceId}/commands/config`,
  method: 'POST',
  body: {
    reportInterval: 60, // Update reporting interval to 60 seconds
    mode: 'balanced', // Set power mode to 'balanced'
    telemetry: {
      enabled: true,
      categories: ['sensors', 'location']
    }
  }
})

console.log('Command sent:', commandResponse.data)
// Output:
// {
//   commandId: "cmd_abc123def456",
//   status: "sent",
//   timestamp: "2026-01-19T15:30:22.000Z",
//   timeoutSeconds: 30,
//   acknowledgment: false
// }
WebSocket Method
Send commands via WebSocket for real-time applications:
javascript
import io from 'socket.io-client'

const socket = io('wss://iot.dedot.io', {
  auth: {
    token: accessToken,
    workspace: workspaceId
  }
})

// Send a command to a device
socket.emit('command', {
  deviceId: 'dev_123',
  type: 'config',
  payload: {
    reportInterval: 60,
    mode: 'balanced'
  },
  options: {
    timeout: 30, // Command timeout in seconds
    priority: 'normal', // 'low', 'normal', or 'high'
    ackRequired: true // Require acknowledgment
  }
}, (response) => {
  // Callback with command receipt confirmation
  console.log('Command registered:', response)
})

// Listen for command status updates
socket.on('command:status', (update) => {
  console.log(`Command ${update.commandId} status: ${update.status}`)
  console.log('Response data:', update.response)
})
Batch Commands
Send the same command to multiple devices:
javascript
// Batch command via REST API
const batchResponse = await access.request({
  url: '/devices/batch/commands',
  method: 'POST',
  body: {
    deviceIds: ['dev_123', 'dev_456', 'dev_789'],
    type: 'reboot',
    payload: {
      delay: 30, // Seconds before reboot
      reason: 'maintenance'
    },
    options: {
      timeout: 60
    }
  }
})

console.log(`Commands sent to ${batchResponse.data.successful} devices`)
console.log(`Failed: ${batchResponse.data.failed}`)
Command Options
De.IoTB supports various command options:
javascript
const commandOptions = {
  // Command timeout (seconds)
  timeout: 30,

  // Command priority (affects processing order)
  priority: 'normal', // 'low', 'normal', or 'high'

  // Require acknowledgment
  ackRequired: true,

  // Execute at specific time
  scheduledTime: '2026-01-20T08:00:00.000Z',

  // Expire command if not delivered
  expireAfter: 3600, // seconds

  // Retry configuration
  retry: {
    count: 3,
    interval: 10 // seconds between retries
  },

  // Conditional execution
  condition: {
    field: 'battery', // Device state field
    operator: '>', // Comparison operator
    value: 20 // Threshold value
  },

  // Command metadata (for tracking purposes)
  metadata: {
    initiator: 'admin@example.com', // placeholder address
    reason: 'performance optimization',
    ticketId: 'MAINT-123'
  }
}

// Send command with options
await access.request({
  url: `/devices/${deviceId}/commands/config`,
  method: 'POST',
  body: {
    ...configPayload,
    options: commandOptions
  }
})
Receiving Commands on Devices
JavaScript (Node.js)
javascript
const mqtt = require('mqtt')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: `device_${deviceId}_${Date.now()}`,
  username: credentials.username,
  password: credentials.password
})

// Subscribe to commands
client.on('connect', () => {
  // Subscribe to all command types
  client.subscribe(`${workspaceId}/devices/${deviceId}/commands/#`, { qos: 1 })
  // Alternatively, subscribe to specific command types
  // client.subscribe(`${workspaceId}/devices/${deviceId}/commands/config`, { qos: 1 })
  // client.subscribe(`${workspaceId}/devices/${deviceId}/commands/reboot`, { qos: 1 })
})

// Handle incoming commands
client.on('message', (topic, payload) => {
  console.log(`Received message on topic: ${topic}`)

  // Extract command type from topic
  const commandType = topic.split('/').pop()

  // The wildcard subscription also matches this device's own ack/response
  // topics, so skip them to avoid answering our own messages
  if (commandType === 'ack' || commandType === 'response') return

  // Declared outside the try block so the catch handler can use it
  let commandId

  try {
    // Parse command payload
    const command = JSON.parse(payload.toString())
    console.log(`Command type: ${commandType}`, command)

    // Extract command ID if present
    commandId = command.commandId

    // Acknowledge receipt before processing
    if (commandId) {
      sendCommandAcknowledgment(commandId, 'received')
    }

    // Process command based on type
    switch (commandType) {
      case 'config':
        handleConfigCommand(command)
        break
      case 'reboot':
        handleRebootCommand(command)
        break
      case 'update':
        handleUpdateCommand(command)
        break
      default:
        console.log(`Unknown command type: ${commandType}`)
    }

    // After processing is complete, send success status
    // sendCommandResponse(commandId, 'success', { result: 'Configuration updated' })
  } catch (error) {
    console.error('Error processing command:', error)
    // Send error response if command ID is available
    if (commandId) {
      sendCommandResponse(commandId, 'error', { error: error.message })
    }
  }
})

// Function to send command acknowledgment
function sendCommandAcknowledgment(commandId, status) {
  const topic = `${workspaceId}/devices/${deviceId}/commands/ack`
  const payload = JSON.stringify({
    commandId,
    status,
    timestamp: Date.now()
  })
  client.publish(topic, payload, { qos: 1 })
}

// Function to send command response
function sendCommandResponse(commandId, status, data = {}) {
  const topic = `${workspaceId}/devices/${deviceId}/commands/response`
  const payload = JSON.stringify({
    commandId,
    status, // 'success', 'error', 'in_progress'
    data,
    timestamp: Date.now()
  })
  client.publish(topic, payload, { qos: 1 })
}

// Handle specific command types
function handleConfigCommand(command) {
  console.log('Updating device configuration...')
  // Apply configuration changes
  if (command.reportInterval) {
    setReportingInterval(command.reportInterval)
  }
  if (command.mode) {
    setPowerMode(command.mode)
  }
  // Additional configuration parameters...
  console.log('Configuration updated successfully')
}

function handleRebootCommand(command) {
  console.log('Preparing to reboot...')
  // Report progress before rebooting
  if (command.commandId) {
    sendCommandAcknowledgment(command.commandId, 'in_progress')
  }
  // Delay if specified
  const delay = command.delay || 0
  setTimeout(() => {
    console.log(`Rebooting now (reason: ${command.reason || 'requested'})`)
    // Actual reboot code would be here
    process.exit(0) // Simulated reboot
  }, delay * 1000)
}

function handleUpdateCommand(command) {
  console.log(`Firmware update requested: ${command.version}`)
  // Firmware update implementation...
}
Python
python
import json
import time
import paho.mqtt.client as mqtt

# workspace_id, device_id, credentials, set_reporting_interval() and
# set_power_mode() are assumed to be provided by your application.

# Command handler functions
def handle_config(command):
    print(f"Updating configuration: {command}")
    # Apply configuration changes
    if 'reportInterval' in command:
        set_reporting_interval(command['reportInterval'])
    if 'mode' in command:
        set_power_mode(command['mode'])
    return {
        "status": "success",
        "message": "Configuration updated"
    }

def handle_reboot(command):
    print(f"Reboot requested: {command}")
    delay = command.get('delay', 0)
    # In a real implementation, you'd schedule a reboot
    print(f"Will reboot in {delay} seconds")
    return {
        "status": "in_progress",
        "message": f"Rebooting in {delay} seconds"
    }

# Command handlers map
command_handlers = {
    'config': handle_config,
    'reboot': handle_reboot
    # Add more handlers as needed
}

# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker!")
        # Subscribe to all commands
        topic = f"{workspace_id}/devices/{device_id}/commands/#"
        client.subscribe(topic, qos=1)
        print(f"Subscribed to {topic}")
    else:
        print(f"Failed to connect, return code: {rc}")

def on_message(client, userdata, msg):
    print(f"Message received on topic: {msg.topic}")
    # Extract command type from topic
    command_type = msg.topic.split('/')[-1]
    # The wildcard subscription also matches this device's own ack/response
    # topics; skip them so the device does not answer its own messages
    if command_type in ('ack', 'response'):
        return
    command_id = None  # Defined up front so the except block can use it
    try:
        # Parse command payload
        command = json.loads(msg.payload.decode())
        print(f"Command type: {command_type}, payload: {command}")
        # Get command ID if present
        command_id = command.get('commandId')
        # Process command if handler exists
        if command_type in command_handlers:
            # Send acknowledgment if needed
            if command_id:
                send_ack(client, command_id, 'received')
            # Process the command
            result = command_handlers[command_type](command)
            # Send response if command ID exists
            if command_id:
                send_response(client, command_id, result)
        else:
            print(f"Unknown command type: {command_type}")
            if command_id:
                send_response(client, command_id, {
                    "status": "error",
                    "message": f"Unsupported command type: {command_type}"
                })
    except json.JSONDecodeError as e:
        print(f"Invalid command format: {e}")
    except Exception as e:
        print(f"Error processing command: {e}")
        if command_id:
            send_response(client, command_id, {
                "status": "error",
                "message": str(e)
            })

def send_ack(client, command_id, status):
    """Send command acknowledgment"""
    topic = f"{workspace_id}/devices/{device_id}/commands/ack"
    payload = json.dumps({
        "commandId": command_id,
        "status": status,
        "timestamp": int(time.time() * 1000)
    })
    client.publish(topic, payload, qos=1)
    print(f"Acknowledgment sent for command {command_id}")

def send_response(client, command_id, result):
    """Send command response"""
    topic = f"{workspace_id}/devices/{device_id}/commands/response"
    payload = json.dumps({
        "commandId": command_id,
        "status": result.get("status", "success"),
        "data": result,
        "timestamp": int(time.time() * 1000)
    })
    client.publish(topic, payload, qos=1)
    print(f"Response sent for command {command_id}")

# Set up MQTT client
# With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first
# argument to keep the callback signatures used above
client = mqtt.Client(client_id=f"device_{device_id}_{int(time.time())}")
client.username_pw_set(credentials['username'], credentials['password'])
client.tls_set()  # Port 8883 is MQTT over TLS; uses the system CA certificates
client.on_connect = on_connect
client.on_message = on_message

# Connect to the broker
client.connect("broker.dedot.io", 8883, keepalive=60)

# Start network loop
client.loop_forever()
Command Tracking & History
Track command execution and history:
javascript
// Get command status
const commandStatus = await access.request({
  url: `/commands/${commandId}`,
  method: 'GET'
})

console.log('Command status:', commandStatus.data)
// Output:
// {
//   commandId: "cmd_abc123def456",
//   deviceId: "dev_123",
//   type: "config",
//   status: "delivered", // 'sent', 'delivered', 'acknowledged', 'executing', 'completed', 'failed', 'expired'
//   payload: { reportInterval: 60, mode: "balanced" },
//   sentAt: "2026-01-19T15:30:22.000Z",
//   deliveredAt: "2026-01-19T15:30:23.000Z",
//   acknowledgment: { status: "received", timestamp: "2026-01-19T15:30:23.100Z" },
//   response: null, // Will contain response data when completed
//   metadata: { initiator: "admin@example.com" } // placeholder address
// }

// Get command history for a device
const commandHistory = await access.request({
  url: `/devices/${deviceId}/commands/history`,
  method: 'GET',
  query: {
    type: 'config', // Filter by command type
    status: 'completed', // Filter by status
    startTime: '2026-01-01T00:00:00.000Z',
    endTime: '2026-01-31T23:59:59.999Z',
    limit: 50,
    page: 1
  }
})

console.log(`Found ${commandHistory.data.length} commands`)
Command Response Handling
Process command responses and set up automatic retries:
javascript
// Create retry policy for commands
const retryPolicy = await access.request({
  url: '/workspace/config/command-retry',
  method: 'PATCH',
  body: {
    policies: [
      {
        type: 'config',
        maxRetries: 3,
        retryInterval: 60, // seconds
        exponentialBackoff: true
      },
      {
        type: 'update',
        maxRetries: 5,
        retryInterval: 300, // 5 minutes
        exponentialBackoff: true
      }
    ],
    defaultPolicy: {
      maxRetries: 2,
      retryInterval: 30
    }
  }
})
Command Scheduling
Schedule commands for future execution:
javascript
// Schedule a command
const scheduledCommand = await access.request({
  url: `/devices/${deviceId}/commands/schedule`,
  method: 'POST',
  body: {
    type: 'config',
    payload: {
      mode: 'lowPower'
    },
    schedule: {
      time: '2026-01-20T22:00:00.000Z',
      recurring: 'daily',
      endAfter: 7 // days (optional)
    },
    options: {
      timeout: 60,
      retryCount: 3
    }
  }
})

console.log('Scheduled command:', scheduledCommand.data)

// Get scheduled commands
const scheduledCommands = await access.request({
  url: `/devices/${deviceId}/commands/schedule`,
  method: 'GET'
})

console.log(`Found ${scheduledCommands.data.length} scheduled commands`)

// Cancel scheduled command
await access.request({
  url: `/commands/schedule/${scheduleId}`,
  method: 'DELETE'
})
Command Authorization
De.IoTB provides fine-grained command authorization:
javascript
// Set command permissions for a role
const permissions = await access.request({
  url: '/workspace/roles/technician/permissions',
  method: 'PATCH',
  body: {
    commands: {
      config: {
        allowed: true,
        devices: ['dev_123', 'dev_456'], // Specific devices
        deviceGroups: ['sensors'], // Or device groups
        fields: ['reportInterval'], // Restricted fields
        maxValue: { // Field constraints
          reportInterval: 300
        }
      },
      reboot: {
        allowed: true,
        requireApproval: true, // Requires second approval
        approvers: ['admin']
      },
      update: {
        allowed: false // Disallow firmware updates
      },
      "*": {
        allowed: false // Default deny other commands
      }
    }
  }
})
Command Schema Validation
Define command schemas for validation:
javascript
// Define command schemas
const schemas = await access.request({
  url: '/workspace/config/command-schemas',
  method: 'POST',
  body: {
    schemas: [
      {
        type: 'config',
        properties: {
          reportInterval: {
            type: 'integer',
            minimum: 10,
            maximum: 3600,
            description: 'Reporting interval in seconds'
          },
          mode: {
            type: 'string',
            enum: ['lowPower', 'balanced', 'performance'],
            description: 'Device power mode'
          }
        },
        required: ['reportInterval']
      },
      {
        type: 'reboot',
        properties: {
          delay: {
            type: 'integer',
            minimum: 0,
            maximum: 300,
            description: 'Delay before reboot in seconds'
          },
          reason: {
            type: 'string',
            maxLength: 200,
            description: 'Reason for reboot'
          }
        }
      }
    ]
  }
})
Security Best Practices
- Command Authentication: Verify command authenticity using signatures or tokens
- Input Validation: Always validate command payloads against schemas
- Command Rate Limiting: Limit command frequency to prevent abuse
- Command Auditing: Log all commands for security analysis
- Least Privilege: Grant minimum required command permissions to users
- Secure Channel: Only send commands over encrypted connections
- Command Replay Prevention: Use nonce values to prevent replay attacks
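The authentication and replay-prevention items above could be combined roughly as follows. This is a sketch under stated assumptions, not a documented De.IoTB mechanism: it assumes a per-device shared secret (`SECRET`), and `signCommand`, `verifyCommand`, and `seenNonces` are hypothetical names. It uses HMAC-SHA256 from Node's built-in `crypto` module, signs the exact serialized body, and rejects any nonce the device has already seen.

```javascript
const crypto = require('crypto')

// Hypothetical per-device shared secret; key provisioning is out of scope here
const SECRET = 'device-shared-secret'

// Sender side: attach a random nonce and sign the exact serialized body
function signCommand(command, secret = SECRET) {
  const nonce = crypto.randomBytes(16).toString('hex')
  const body = JSON.stringify({ ...command, nonce })
  const signature = crypto.createHmac('sha256', secret).update(body).digest('hex')
  return { body, signature }
}

// Device side: nonces already accepted (a real device would also expire old entries)
const seenNonces = new Set()

function verifyCommand({ body, signature }, secret = SECRET) {
  const expected = crypto.createHmac('sha256', secret).update(body).digest()
  const received = Buffer.from(signature, 'hex')
  // timingSafeEqual throws on length mismatch, so compare lengths first
  if (received.length !== expected.length || !crypto.timingSafeEqual(received, expected)) {
    return { ok: false, reason: 'bad_signature' }
  }
  const command = JSON.parse(body)
  if (seenNonces.has(command.nonce)) {
    return { ok: false, reason: 'replay' }
  }
  seenNonces.add(command.nonce)
  return { ok: true, command }
}

const signed = signCommand({ commandId: 'cmd_abc123def456', type: 'reboot' })
console.log(verifyCommand(signed).ok) // true: first delivery is accepted
console.log(verifyCommand(signed).reason) // 'replay': same nonce seen again
```

Signing the serialized string rather than the parsed object avoids disagreements over JSON key ordering between sender and device. An unbounded nonce set is only acceptable in a sketch; pairing the nonce with a timestamp window keeps the set small.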

