
Active Rules Engine

The Active Rules Engine is a core component of De.IoTB that provides schema-driven message parsing, data transformation, and event-driven automation.

Overview

The Active Rules engine serves several critical functions:

  1. Message Parsing: Converts raw device messages into structured data
  2. Data Normalization: Transforms values into standardized formats
  3. Event Triggering: Initiates automated workflows based on rule conditions
  4. Data Routing: Directs processed data to appropriate destinations

Rule Architecture

The Active Rules engine sits between the Channel Manager and the Event Manager in the message pipeline:

┌─────────────────────────────────────────────────┐
│                  IoT Device                     │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                  MQTT Broker                    │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                Channel Manager                  │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│ ┌─────────────────────────────────────────────┐ │
│ │             Active Rules Engine             │ │
│ │                                             │ │
│ │  ┌─────────────┐       ┌─────────────────┐  │ │
│ │  │ Rule Lookup │──────▶│Schema Processor │  │ │
│ │  └─────────────┘       └─────────────────┘  │ │
│ │         │                      │            │ │
│ │         ▼                      ▼            │ │
│ │  ┌─────────────┐       ┌─────────────────┐  │ │
│ │  │  LRU Cache  │◀─────▶│ Type Conversion │  │ │
│ │  └─────────────┘       └─────────────────┘  │ │
│ └─────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                 Event Manager                   │
└─────────────────────────────────────────────────┘

Rule Structure

Rules are defined using the following schema:

typescript
interface Rule {
  id: string            // System-generated unique identifier
  reference: string     // User-provided reference code
  icode: string         // Internal workspace code
  type: 'REPORT' | 'COMMAND' // Message direction
  
  head: string[]        // Message identifiers (e.g., ["POS", "POSITION"])
  struct: string        // Template: "HEAD,{field1},{field2}$"
  
  schema: {
    [field: string]: {
      type: 'string' | 'number' | 'integer' | 'float' | 'boolean' | 'array'
      unit?: string
      description?: string
    }
  }
  
  connectors: string[]  // Associated workspace IDs
  tags: string[]        // Classification tags
  
  description: string   // Human-readable description
  tested: boolean       // Validation status
  enabled: boolean      // Rule activation status
  
  added: {
    at: number         // Timestamp
    by: string         // User identifier
  }
}
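
To illustrate how `struct` and `schema` relate, the sketch below pulls the `{field}` placeholders out of a template and reports any that have no schema entry. The helper names `structFields` and `missingSchemaFields` are hypothetical and not part of the SDK, and the placeholder syntax is assumed from the examples in this document.

```javascript
// Hypothetical helpers; not part of the De. SDK.
// Assumes placeholders are written as {name}, as in "HEAD,{field1},{field2}$".
function structFields(struct) {
  const fields = []
  const pattern = /\{([^{}]+)\}/g
  let match
  while ((match = pattern.exec(struct)) !== null) {
    fields.push(match[1])
  }
  return fields
}

// Returns the placeholder names that have no entry in the rule's schema.
function missingSchemaFields(rule) {
  return structFields(rule.struct).filter(
    (name) => !Object.prototype.hasOwnProperty.call(rule.schema, name)
  )
}

const draft = {
  struct: 'HEAD,{latitude},{longitude}$',
  schema: { latitude: { type: 'float' } }
}
console.log(missingSchemaFields(draft)) // [ 'longitude' ]
```

A check like this can catch a template and schema drifting apart before the rule is submitted.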

Rule Examples

Example 1: GPS Position Rule

javascript
{
  "reference": "position.report",
  "icode": "DEV-9X00-233",
  "type": "REPORT",
  "head": ["POS", "POSITION"],
  "struct": "HEAD,{latitude},{longitude},{speed},{heading}$",
  "schema": {
    "latitude": { 
      "type": "float", 
      "unit": "degrees",
      "description": "Latitude in decimal degrees"
    },
    "longitude": { 
      "type": "float", 
      "unit": "degrees",
      "description": "Longitude in decimal degrees"
    },
    "speed": { 
      "type": "number", 
      "unit": "km/h",
      "description": "Speed in kilometers per hour"
    },
    "heading": { 
      "type": "number", 
      "unit": "degrees",
      "description": "Heading in degrees (0-359)"
    }
  },
  "connectors": ["ws_abc"],
  "tags": ["gps", "location", "tracking"],
  "description": "Parse GPS position data from tracking devices",
  "tested": true,
  "enabled": true,
  "added": {
    "at": 1698765432000,
    "by": "[email protected]"
  }
}

Example 2: Sensor Reading Rule

javascript
{
  "reference": "sensor.environmental",
  "icode": "DEV-9X00-233",
  "type": "REPORT",
  "head": ["ENV", "ENVIRONMENTAL"],
  "struct": "HEAD,{temperature},{humidity},{pressure},{light},{battery}$",
  "schema": {
    "temperature": { 
      "type": "float", 
      "unit": "celsius",
      "description": "Temperature in degrees Celsius"
    },
    "humidity": { 
      "type": "float", 
      "unit": "%",
      "description": "Relative humidity percentage"
    },
    "pressure": { 
      "type": "float", 
      "unit": "hPa",
      "description": "Atmospheric pressure in hectopascals"
    },
    "light": { 
      "type": "integer", 
      "unit": "lux",
      "description": "Ambient light level in lux"
    },
    "battery": { 
      "type": "integer", 
      "unit": "%",
      "description": "Battery level percentage"
    }
  },
  "connectors": ["ws_abc"],
  "tags": ["sensors", "environmental"],
  "description": "Parse environmental sensor data",
  "tested": true,
  "enabled": true,
  "added": {
    "at": 1698765432000,
    "by": "[email protected]"
  }
}

Example 3: Device Command Rule

javascript
{
  "reference": "command.configuration",
  "icode": "DEV-9X00-233",
  "type": "COMMAND",
  "head": ["CFG", "CONFIG"],
  "struct": "HEAD,{interval},{mode},{led},{verbose}$",
  "schema": {
    "interval": { 
      "type": "integer", 
      "unit": "seconds",
      "description": "Reporting interval in seconds"
    },
    "mode": { 
      "type": "string", 
      "description": "Power mode: normal, eco, or power"
    },
    "led": { 
      "type": "boolean", 
      "description": "LED indicator enabled"
    },
    "verbose": { 
      "type": "boolean", 
      "description": "Verbose logging enabled"
    }
  },
  "connectors": ["ws_abc"],
  "tags": ["commands", "configuration"],
  "description": "Configuration command formatting",
  "tested": true,
  "enabled": true,
  "added": {
    "at": 1698765432000,
    "by": "[email protected]"
  }
}

Processing Algorithm

The Active Rules engine processes messages in the following steps:

  1. Message Reception: Raw message received from device via Channel Manager
  2. Header Extraction: First element identified as the message header
  3. Rule Lookup: Header matched against available rules in LRU cache
  4. Cache Management: If rule not in cache, loaded from database and cached
  5. Schema Application: Rule schema applied to message structure
  6. Field Extraction: Individual fields extracted based on structure pattern
  7. Type Conversion: Values converted according to schema type definitions
  8. Object Construction: Normalized object constructed from converted values
  9. Event Publishing: Processed data published to Event Manager
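
The steps above can be sketched roughly as follows. This is a simplified illustration, not the engine's implementation: every function name is hypothetical, a plain `Map` stands in for the LRU cache and database, `array` fields are left out because their commas would clash with the field separator, and step 9 (publishing) is omitted.

```javascript
// Illustrative sketch only; every name below is hypothetical.
// Assumes comma-separated fields and a trailing "$", as in the rule examples.
const rulesByHeader = new Map() // stand-in for the LRU cache and database

function registerRule(rule) {
  for (const header of rule.head) rulesByHeader.set(header, rule)
}

// Compact version of the type conversions listed in this document
function convert(raw, type) {
  if (type === 'integer') return Math.trunc(Number(raw))
  if (type === 'number' || type === 'float') return Number(raw)
  if (type === 'boolean') return raw === 'true' || raw === '1'
  return raw
}

function parseMessage(message) {
  // Step 2: drop the trailing "$" and split off the header
  const body = message.endsWith('$') ? message.slice(0, -1) : message
  const [header, ...values] = body.split(',')

  // Steps 3-4: find the rule registered for this header
  const rule = rulesByHeader.get(header)
  if (!rule) throw new Error(`Header not matched: ${header}`)

  // Steps 5-6: field names come from the {placeholders} in the template
  const names = [...rule.struct.matchAll(/\{([^{}]+)\}/g)].map((m) => m[1])
  if (names.length !== values.length) {
    throw new Error(`Expected ${names.length} fields, got ${values.length}`)
  }

  // Steps 7-8: convert each value and build the normalized object
  const data = {}
  names.forEach((name, i) => {
    data[name] = convert(values[i], rule.schema[name]?.type)
  })
  return { reference: rule.reference, data }
}

registerRule({
  reference: 'position.report',
  head: ['POS', 'POSITION'],
  struct: 'HEAD,{latitude},{longitude},{speed},{heading}$',
  schema: {
    latitude: { type: 'float' },
    longitude: { type: 'float' },
    speed: { type: 'number' },
    heading: { type: 'number' }
  }
})

const parsed = parseMessage('POS,48.8566,2.3522,42,180$')
console.log(parsed.data)
```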

Type Conversion Details

The Active Rules engine performs the following type conversions:

| Schema Type | Input Example | Output | Notes |
|-------------|---------------|--------|-------|
| string | "hello" | "hello" | No transformation |
| number | "42.5" | 42.5 | Parsed with Number() |
| integer | "42.9" | 42 | Truncated to integer |
| float | "42.9" | 42.9 | Maintained as floating-point |
| boolean | "true" or "1" | true | "false", "0", and empty strings become false |
| array | "a,b,c" | ["a", "b", "c"] | Comma-separated string split into array |
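
The conversions in the table can be approximated with a small function like the one below. `convertValue` is a hypothetical name. The table does not say how non-numeric input, unrecognized boolean strings, or empty arrays are handled, so throwing on bad numbers, treating every other boolean input as false, and returning `[]` for an empty string are assumptions of this sketch.

```javascript
// Hypothetical converter mirroring the conversion table.
// Assumptions: non-numeric input throws, any boolean input other than
// "true" or "1" is false, and an empty string yields an empty array.
function toNumber(raw, type) {
  const n = Number(raw)
  // Number('') is 0, so blank input is rejected explicitly
  if (raw.trim() === '' || Number.isNaN(n)) {
    throw new TypeError(`Cannot convert "${raw}" to ${type}`)
  }
  return n
}

function convertValue(raw, type) {
  switch (type) {
    case 'string':
      return raw
    case 'number':
    case 'float':
      return toNumber(raw, type)
    case 'integer':
      return Math.trunc(toNumber(raw, type))
    case 'boolean':
      return raw === 'true' || raw === '1'
    case 'array':
      return raw === '' ? [] : raw.split(',')
    default:
      throw new TypeError(`Unsupported schema type: ${type}`)
  }
}

console.log(convertValue('42.9', 'integer')) // 42
console.log(convertValue('1', 'boolean'))    // true
```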

Creating and Managing Rules

Create a New Rule

javascript
import De from '@de./sdk'

const access = new De.Access({
  workspace: 'your-workspace-id',
  accessToken: 'your-access-token',
  env: 'prod'
})

// Create a new rule
const rule = await access.request({
  url: '/rules',
  method: 'POST',
  body: {
    reference: 'fuel.level',
    type: 'REPORT',
    head: ['FUEL', 'FL'],
    struct: 'HEAD,{level},{range},{efficiency}$',
    schema: {
      level: {
        type: 'float',
        unit: '%',
        description: 'Fuel level percentage'
      },
      range: {
        type: 'integer',
        unit: 'km',
        description: 'Estimated remaining range'
      },
      efficiency: {
        type: 'float',
        unit: 'km/L',
        description: 'Current fuel efficiency'
      }
    },
    tags: ['vehicle', 'fuel'],
    description: 'Vehicle fuel level reporting'
  }
})

console.log('Rule created:', rule.data)

List Active Rules

javascript
// Get all rules
const rules = await access.request({
  url: '/rules',
  method: 'GET',
  query: {
    type: 'REPORT',            // Filter by type
    tags: 'vehicle,tracking',  // Filter by tags (comma-separated)
    enabled: true,             // Only enabled rules
    page: 1,
    limit: 50
  }
})

console.log(`Found ${rules.data.length} rules`)
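
When a workspace holds more rules than fit on one page, the `page` and `limit` parameters can be combined in a loop. The helper below is a hypothetical sketch: it takes any function with the same call shape as `access.request`, and it assumes that `data` is an array and that a page shorter than `limit` is the last one. Neither assumption is confirmed by this page.

```javascript
// Hypothetical helper; `request` is any function shaped like access.request.
// Assumes res.data is an array and a short page marks the end of the list.
async function fetchAllRules(request, filters = {}, limit = 50) {
  const all = []
  for (let page = 1; ; page++) {
    const res = await request({
      url: '/rules',
      method: 'GET',
      query: { ...filters, page, limit }
    })
    all.push(...res.data)
    if (res.data.length < limit) break
  }
  return all
}

// Usage with the SDK client from the earlier example:
// const allRules = await fetchAllRules((opts) => access.request(opts), { type: 'REPORT' })
```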

Update an Existing Rule

javascript
// Update a rule (ruleId is the id of an existing rule, for example one returned when it was created)
const updatedRule = await access.request({
  url: `/rules/${ruleId}`,
  method: 'PATCH',
  body: {
    head: ['FUEL', 'FL', 'FUELLEVEL'],  // Add alternative header
    schema: {
      level: {
        type: 'float',
        unit: '%',
        description: 'Updated description'
      },
      // Add new field
      timestamp: {
        type: 'integer',
        description: 'Timestamp in milliseconds'
      }
    },
    enabled: true  // Enable/disable rule
  }
})

console.log('Rule updated:', updatedRule.data)

Test a Rule

javascript
// Test rule against sample data
const testResult = await access.request({
  url: '/rules/test',
  method: 'POST',
  body: {
    ruleId: 'rule_123',  // Optional, specify rule ID
    // Or provide rule definition directly
    rule: {
      head: ['TEMP'],
      struct: 'HEAD,{value},{unit}$',
      schema: {
        value: { type: 'float' },
        unit: { type: 'string' }
      }
    },
    // Sample messages to test
    samples: [
      'TEMP,23.5,C$',
      'TEMP,72.1,F$',
      'TEMPERATURE,18.2,C$'  // Header not listed in `head`, so this sample is expected to fail
    ]
  }
})

console.log('Test results:', testResult.data)
// Output:
// {
//   success: 2,
//   failed: 1,
//   results: [
//     {
//       input: 'TEMP,23.5,C$',
//       success: true,
//       output: { value: 23.5, unit: 'C' },
//       matchedRule: 'rule_123'
//     },
//     {
//       input: 'TEMP,72.1,F$',
//       success: true,
//       output: { value: 72.1, unit: 'F' },
//       matchedRule: 'rule_123'
//     },
//     {
//       input: 'TEMPERATURE,18.2,C$',
//       success: false,
//       error: 'Header not matched',
//       matchedRule: null
//     }
//   ]
// }

Delete a Rule

javascript
// Delete a rule
await access.request({
  url: `/rules/${ruleId}`,
  method: 'DELETE'
})

Rule Templates

De.IoTB provides pre-built rule templates for common device types:

javascript
// Get available rule templates
const templates = await access.request({
  url: '/rules/templates',
  method: 'GET'
})

console.log('Available templates:', templates.data)

// Create rule from template
const newRule = await access.request({
  url: '/rules/from-template',
  method: 'POST',
  body: {
    templateId: 'gps-tracker',
    reference: 'custom.gps',
    customizations: {
      // Override specific fields
      schema: {
        altitude: {
          type: 'float',
          unit: 'meters',
          description: 'Altitude above sea level'
        }
      }
    }
  }
})

Advanced Features

Batch Operations

javascript
// Bulk create/update rules
const batchResult = await access.request({
  url: '/rules/batch',
  method: 'POST',
  body: {
    rules: [
      {
        reference: 'rule1',
        head: ['HEAD1'],
        struct: 'HEAD,{field1},{field2}$',
        schema: { /* ... */ }
      },
      {
        reference: 'rule2',
        head: ['HEAD2'],
        struct: 'HEAD,{fieldA},{fieldB}$',
        schema: { /* ... */ }
      }
    ],
    operation: 'create'  // 'create', 'update', or 'upsert'
  }
})

console.log(`Processed ${batchResult.data.successful} rules successfully`)

Import/Export

javascript
// Export rules
const exportedRules = await access.request({
  url: '/rules/export',
  method: 'GET',
  query: {
    format: 'json',  // 'json' or 'csv'
    tags: 'vehicle'  // Optional filter
  }
})

// Download export
console.log('Export URL:', exportedRules.data.url)

// Import rules from file
const importResult = await access.request({
  url: '/rules/import',
  method: 'POST',
  body: {
    fileUrl: 'https://example.com/rules.json',
    operation: 'upsert'  // 'create', 'update', or 'upsert'
  }
})

console.log(`Imported ${importResult.data.count} rules`)

Version Control

javascript
// Get rule history
const ruleHistory = await access.request({
  url: `/rules/${ruleId}/history`,
  method: 'GET'
})

console.log('Rule versions:', ruleHistory.data)

// Restore previous version
await access.request({
  url: `/rules/${ruleId}/restore`,
  method: 'POST',
  body: {
    versionId: 'v2'  // Version to restore
  }
})

Performance Optimization

The Active Rules engine uses two optimization techniques: an LRU rule cache and rule pre-compilation.

LRU Cache

The engine implements an LRU (Least Recently Used) cache for frequently accessed rules:

javascript
// Configure cache settings
await access.request({
  url: '/workspace/config/rules-cache',
  method: 'PATCH',
  body: {
    maxSize: 100,       // Maximum rules in cache
    ttl: 3600,          // Time-to-live in seconds
    cleanupInterval: 300 // Cache cleanup interval
  }
})

// Get cache statistics
const cacheStats = await access.request({
  url: '/rules/cache/stats',
  method: 'GET'
})

console.log('Cache stats:', cacheStats.data)
// Output:
// {
//   size: 47,          // Current cache size
//   maxSize: 100,      // Maximum cache size
//   hits: 12543,       // Cache hit count
//   misses: 378,       // Cache miss count
//   hitRate: 0.97,     // Cache hit rate (0-1)
//   avgAccessTime: 0.8 // Average access time (ms)
// }

// Clear cache (rarely needed)
await access.request({
  url: '/rules/cache',
  method: 'DELETE'
})
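
For readers unfamiliar with LRU eviction, here is a generic sketch built on `Map`, which iterates keys in insertion order. It is not the engine's implementation: the `LRUCache` class is hypothetical, and the `ttl` and `cleanupInterval` settings shown above are not modeled.

```javascript
// Generic LRU cache sketch; not the engine's actual implementation.
// TTL and periodic cleanup are intentionally left out.
class LRUCache {
  constructor(maxSize = 100) {
    this.maxSize = maxSize
    this.entries = new Map()
    this.hits = 0
    this.misses = 0
  }

  get(key) {
    if (!this.entries.has(key)) {
      this.misses++
      return undefined
    }
    this.hits++
    const value = this.entries.get(key)
    // Re-insert so the key becomes the most recently used
    this.entries.delete(key)
    this.entries.set(key, value)
    return value
  }

  set(key, value) {
    if (this.entries.has(key)) this.entries.delete(key)
    this.entries.set(key, value)
    if (this.entries.size > this.maxSize) {
      // The first key in iteration order is the least recently used
      const oldest = this.entries.keys().next().value
      this.entries.delete(oldest)
    }
  }

  stats() {
    const total = this.hits + this.misses
    return {
      size: this.entries.size,
      maxSize: this.maxSize,
      hits: this.hits,
      misses: this.misses,
      hitRate: total === 0 ? 0 : this.hits / total
    }
  }
}

const cache = new LRUCache(2)
cache.set('POS', 'rule-a')
cache.set('ENV', 'rule-b')
cache.get('POS')              // POS is now the most recently used
cache.set('CFG', 'rule-c')    // evicts ENV, the least recently used
console.log(cache.get('ENV')) // undefined
```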

Rule Compilation

The engine pre-compiles rules for faster execution:

javascript
// Compile all rules
await access.request({
  url: '/rules/compile',
  method: 'POST'
})
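
This page does not describe what compilation involves. One plausible reading is that each `struct` template is translated once into a reusable matcher instead of being re-parsed for every message. The sketch below illustrates that idea with a regular expression. `compileRule` and `escapeRegExp` are hypothetical names, and the sketch assumes that `HEAD` stands for any of the rule's headers and that field values never contain `,` or `$`.

```javascript
// Hypothetical sketch of rule pre-compilation; not the engine's implementation.
function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}

function compileRule(rule) {
  const names = []
  // Split the template into literal text and {placeholder} tokens
  const body = rule.struct
    .split(/(\{[^{}]+\})/)
    .map((part) => {
      const placeholder = /^\{([^{}]+)\}$/.exec(part)
      if (placeholder) {
        names.push(placeholder[1])
        return '([^,$]*)' // one capture group per field
      }
      return escapeRegExp(part)
    })
    .join('')
  // Replace the literal HEAD prefix with an alternation of the rule's headers
  const headers = `(?:${rule.head.map(escapeRegExp).join('|')})`
  const regex = new RegExp(`^${body.replace(/^HEAD/, () => headers)}$`)

  // The returned matcher yields raw string values, or null when nothing matches
  return (message) => {
    const match = regex.exec(message)
    if (!match) return null
    return Object.fromEntries(names.map((name, i) => [name, match[i + 1]]))
  }
}

const matchTemp = compileRule({
  head: ['TEMP'],
  struct: 'HEAD,{value},{unit}$'
})
console.log(matchTemp('TEMP,23.5,C$'))
console.log(matchTemp('TEMPERATURE,18.2,C$'))
```

The matcher returns raw strings; type conversion would still be applied afterwards according to the schema.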

Integration with Event System

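Rules can trigger events when a matched message satisfies a condition. Each binding pairs a condition on a parsed field with one or more actions: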

javascript
// Create event binding for rule
await access.request({
  url: '/rules/events',
  method: 'POST',
  body: {
    ruleId: 'rule_123',
    events: [
      {
        condition: {
          field: 'temperature',
          operator: '>',
          value: 30
        },
        actions: [
          {
            type: 'notification',
            config: {
              title: 'High Temperature Alert',
              message: 'Temperature exceeded threshold',
              severity: 'warning'
            }
          },
          {
            type: 'webhook',
            config: {
              url: 'https://example.com/webhook',
              method: 'POST',
              headers: {
                'Content-Type': 'application/json'
              }
            }
          }
        ]
      }
    ]
  }
})
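
To make the condition format concrete, the sketch below evaluates a `{ field, operator, value }` condition against a parsed message. `conditionMatches` is a hypothetical function. Only the `>` operator appears in this document, so the other operators listed are assumptions about what such a system might support.

```javascript
// Hypothetical evaluator for a { field, operator, value } condition.
// Only '>' appears in this document; the remaining operators are assumptions.
const operators = new Map([
  ['>', (a, b) => a > b],
  ['>=', (a, b) => a >= b],
  ['<', (a, b) => a < b],
  ['<=', (a, b) => a <= b],
  ['==', (a, b) => a === b],
  ['!=', (a, b) => a !== b]
])

function conditionMatches(condition, data) {
  const compare = operators.get(condition.operator)
  if (!compare) throw new Error(`Unsupported operator: ${condition.operator}`)
  // A missing field never satisfies a condition in this sketch
  if (!Object.prototype.hasOwnProperty.call(data, condition.field)) return false
  return compare(data[condition.field], condition.value)
}

const highTemperature = { field: 'temperature', operator: '>', value: 30 }
console.log(conditionMatches(highTemperature, { temperature: 31.5 })) // true
console.log(conditionMatches(highTemperature, { temperature: 25 }))   // false
```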

Monitoring

Monitor rule performance and errors:

javascript
// Get rule usage statistics
const ruleStats = await access.request({
  url: `/rules/${ruleId}/stats`,
  method: 'GET',
  query: {
    period: '24h'  // '1h', '24h', '7d', '30d'
  }
})

console.log('Rule statistics:', ruleStats.data)
// Output:
// {
//   matchCount: 5432,     // Number of successful matches
//   errorCount: 43,       // Number of processing errors
//   averageProcessingTime: 1.2, // Average processing time (ms)
//   matchRate: 0.99,      // Success rate (0-1): 5432 / (5432 + 43)
//   mostFrequentErrors: [
//     { type: 'type_conversion', field: 'temperature', count: 23 },
//     { type: 'missing_field', field: 'humidity', count: 12 }
//   ]
// }

// Get overall system statistics
const systemStats = await access.request({
  url: '/rules/stats',
  method: 'GET'
})

console.log('System statistics:', systemStats.data)

Security Considerations

Rule Validation

The Active Rules engine performs strict validation to prevent security issues:

  1. Input Validation: All rule definitions are validated against a strict schema
  2. Schema Enforcement: Only allowed types and properties are permitted
  3. Access Control: Rules are scoped to specific workspaces
  4. Performance Limits: Maximum complexity limits prevent DoS attacks
  5. Execution Timeouts: Rule processing has time limits
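
A client-side pre-check in the same spirit can catch obvious mistakes before a rule is submitted. The sketch below is illustrative only: `validateRule` is a hypothetical function, the allowed types are taken from the Rule interface in this document, and the two size limits are made-up values rather than the engine's real limits.

```javascript
// Illustrative pre-check; the limits are invented for this example.
const ALLOWED_TYPES = new Set(['string', 'number', 'integer', 'float', 'boolean', 'array'])
const MAX_FIELDS = 64           // assumed limit
const MAX_STRUCT_LENGTH = 1024  // assumed limit

function validateRule(rule) {
  const errors = []
  if (rule.type !== 'REPORT' && rule.type !== 'COMMAND') {
    errors.push('type must be REPORT or COMMAND')
  }
  if (!Array.isArray(rule.head) || rule.head.length === 0) {
    errors.push('head must be a non-empty array')
  }
  if (typeof rule.struct !== 'string' || rule.struct.length > MAX_STRUCT_LENGTH) {
    errors.push(`struct must be a string of at most ${MAX_STRUCT_LENGTH} characters`)
  }
  const schema = rule.schema !== null && typeof rule.schema === 'object' ? rule.schema : {}
  const fields = Object.keys(schema)
  if (fields.length > MAX_FIELDS) {
    errors.push(`schema must define at most ${MAX_FIELDS} fields`)
  }
  for (const name of fields) {
    if (!ALLOWED_TYPES.has(schema[name]?.type)) {
      errors.push(`schema.${name}.type is not an allowed type`)
    }
  }
  return errors
}

console.log(validateRule({
  type: 'REPORT',
  head: ['TEMP'],
  struct: 'HEAD,{value}$',
  schema: { value: { type: 'date' } }
}))
// [ 'schema.value.type is not an allowed type' ]
```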

Best Practices

  1. Rule Testing: Always test rules with sample data before deployment
  2. Field Validation: Define clear field types and validation rules
  3. Error Handling: Configure appropriate error handling for rule failures
  4. Monitoring: Set up alerts for rule processing errors
  5. Documentation: Document rule purposes and expected message formats
  6. Version Control: Track rule changes and enable rollbacks if needed

Next Steps