
Pipeline Architecture

Deep dive into the pipeline orchestration system's design, components, and execution model.

System Overview

De. Pipelines are built on a three-tier architecture that separates design, validation, and execution concerns. A background worker system drives execution:

┌─────────────────────────────────────────────────────────────┐
│                 De. Pipeline Architecture                    │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │         Design Layer - Pipeline Templates           │    │
│  │  • Template creation and versioning                 │    │
│  │  • Stage definitions and transitions                │    │
│  │  • Query configurations                             │    │
│  └──────────────────┬──────────────────────────────────┘    │
│                     │ Validation                             │
│                     ▼                                        │
│  ┌─────────────────────────────────────────────────────┐    │
│  │       Validation Layer - Deployed Pipelines         │    │
│  │  • Resource verification                            │    │
│  │  • Capability checks                                │    │
│  │  • Health monitoring setup                          │    │
│  │  • Performance baselining                           │    │
│  └──────────────────┬──────────────────────────────────┘    │
│                     │ Instantiation                          │
│                     ▼                                        │
│  ┌─────────────────────────────────────────────────────┐    │
│  │      Execution Layer - Pipeline Executions          │    │
│  │  • Active workflow instances                        │    │
│  │  • Stage progression tracking                       │    │
│  │  • Real-time state management                       │    │
│  │  • Results and metrics collection                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │         Worker System - Background Processing       │    │
│  │  • Transition Worker    (30s)                       │    │
│  │  • Monitoring Worker    (60s)                       │    │
│  │  • Health Worker        (300s)                      │    │
│  │  • Webhook Worker       (10s)                       │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                               │
└─────────────────────────────────────────────────────────────┘

Core Components

1. Pipeline Templates

Purpose: Reusable workflow blueprints

Schema:

typescript
interface PipelineTemplate {
  id: string
  name: string
  description?: string
  version: string
  
  // Workflow definition
  stages: Stage[]
  metadata: Record<string, any>
  
  // Configuration
  config: {
    timeout?: number
    retryPolicy?: RetryPolicy
    concurrency?: number
  }
  
  // Health monitoring
  health?: {
    simulationEnabled: boolean
    simulationInterval: number
    alertThresholds: HealthThresholds
  }
  
  // Lifecycle
  status: 'DRAFT' | 'PUBLISHED' | 'DEPRECATED'
  created: Timestamp
  updated: Timestamp
}

Lifecycle:

  1. DRAFT - Under development, can be edited
  2. PUBLISHED - Available for deployment
  3. DEPRECATED - Archived, no new deployments
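The lifecycle above can be enforced with a small guard. This is a sketch, not the platform's code. The helper names are hypothetical, and it assumes the lifecycle only moves forward: a published template is never returned to draft, and changes go into a new version.

```typescript
type TemplateStatus = 'DRAFT' | 'PUBLISHED' | 'DEPRECATED'

// Assumed forward-only lifecycle: DRAFT -> PUBLISHED -> DEPRECATED
const allowedNext: Record<TemplateStatus, TemplateStatus[]> = {
  DRAFT: ['PUBLISHED'],
  PUBLISHED: ['DEPRECATED'],
  DEPRECATED: [],
}

// Only drafts can be edited
function canEdit(status: TemplateStatus): boolean {
  return status === 'DRAFT'
}

// Only published templates accept new deployments
function canDeploy(status: TemplateStatus): boolean {
  return status === 'PUBLISHED'
}

// Hypothetical helper: validates a status change and returns the new status
function changeStatus(from: TemplateStatus, to: TemplateStatus): TemplateStatus {
  if (!allowedNext[from].includes(to)) {
    throw new Error(`Invalid template status change: ${from} -> ${to}`)
  }
  return to
}
```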

2. Stages

Purpose: Individual workflow steps with actions and transitions

Schema:

typescript
interface Stage {
  name: string
  description?: string
  
  // Action to perform
  action: 'QUERY' | 'WEBHOOK' | 'WAIT'
  
  // Action-specific config
  query?: QueryConfig
  webhook?: WebhookConfig
  wait?: WaitConfig
  
  // Transition logic
  transitions: {
    onSuccess?: Transition
    onFailure?: Transition
    onTimeout?: Transition
  }
  
  // Execution constraints
  timeout?: number
  retries?: number
  retryDelay?: number
}

Stage States:

  • PENDING - Awaiting execution
  • RUNNING - Currently executing
  • SUCCESS - Completed successfully
  • FAILED - Execution failed
  • TIMEOUT - Exceeded time limit
  • SKIPPED - Bypassed by transition logic
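One way to keep stage status updates consistent is to use an explicit transition table. The table below is an assumption inferred from the state list. In particular, it assumes that FAILED and TIMEOUT stages may re-enter RUNNING while retries remain. `canTransition` and `isTerminal` are hypothetical helpers.

```typescript
type StageStatus = 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'TIMEOUT' | 'SKIPPED'

// Assumed transition table; FAILED/TIMEOUT -> RUNNING models a retry
const stageTransitions: Record<StageStatus, StageStatus[]> = {
  PENDING: ['RUNNING', 'SKIPPED'],
  RUNNING: ['SUCCESS', 'FAILED', 'TIMEOUT'],
  FAILED: ['RUNNING'],
  TIMEOUT: ['RUNNING'],
  SUCCESS: [],
  SKIPPED: [],
}

function canTransition(from: StageStatus, to: StageStatus): boolean {
  return stageTransitions[from].includes(to)
}

// `attempts` counts runs so far; a failed stage is final once attempts exceed retries
function isTerminal(status: StageStatus, attempts: number, retries = 0): boolean {
  if (status === 'SUCCESS' || status === 'SKIPPED') return true
  if (status === 'FAILED' || status === 'TIMEOUT') return attempts > retries
  return false
}
```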

3. Query System

Purpose: Dynamic resource selection with fallback strategies

Architecture:

typescript
interface QueryConfig {
  resource: ResourceType
  filters: Record<string, any>
  strategy: SelectionStrategy
  fallbackOptions?: FallbackOption[]
  cacheDuration?: number
}

type ResourceType = 
  | 'WAREHOUSE' 
  | 'CARRIER' 
  | 'TERMINAL' 
  | 'ROUTE' 
  | 'INVENTORY'
  | 'VEHICLE'

type SelectionStrategy =
  | 'BEST_MATCH'      // Highest score
  | 'CLOSEST'         // Geographic proximity
  | 'FASTEST'         // Shortest time
  | 'CHEAPEST'        // Lowest cost
  | 'HIGHEST_CAPACITY'
  | 'HIGHEST_RATED'
  | 'RANDOM'          // Load balancing

Query Execution Flow:

1. Apply filters to resource collection
2. Score remaining candidates using strategy
3. Select highest-scoring resource
4. If none found, try first fallback option
5. Repeat until match found or all options exhausted
6. Cache result for configured duration
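The query flow can be sketched as a loop that tries the primary filters first and then each fallback in turn. The document does not define `FallbackOption`, so this sketch assumes each option supplies replacement filters and, optionally, a different strategy. It also simplifies filtering to exact field matches, passes in one scorer per strategy, and leaves out caching (step 6). All names here are hypothetical.

```typescript
interface Candidate { id: string; [field: string]: unknown }
type Filters = Record<string, unknown>
type Scorer = (candidate: Candidate) => number

// Assumed shape of a fallback option
interface FallbackOption { filters: Filters; strategy?: string }

// Exact-match filtering: every filter key must equal the candidate's field
function applyFilters(candidates: Candidate[], filters: Filters): Candidate[] {
  return candidates.filter(c =>
    Object.entries(filters).every(([key, value]) => c[key] === value))
}

function selectResource(
  candidates: Candidate[],
  filters: Filters,
  strategy: string,
  fallbacks: FallbackOption[],
  scorers: Record<string, Scorer>,
): Candidate | null {
  // Primary attempt first, then each fallback in order
  const attempts = [{ filters, strategy }, ...fallbacks.map(f => ({
    filters: f.filters,
    strategy: f.strategy ?? strategy,
  }))]

  for (const attempt of attempts) {
    const matches = applyFilters(candidates, attempt.filters)
    if (matches.length === 0) continue // nothing matched: try the next option

    const score = scorers[attempt.strategy]
    if (!score) throw new Error(`Unknown strategy: ${attempt.strategy}`)

    // Keep the highest-scoring candidate
    let best = matches[0]
    for (const m of matches.slice(1)) {
      if (score(m) > score(best)) best = m
    }
    return best
  }
  return null // all options exhausted
}
```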

Scoring Algorithm:

typescript
// Example: CLOSEST strategy
function scoreWarehouse(warehouse, execution) {
  const destination = execution.metadata.deliveryAddress
  const distance = calculateDistance(warehouse.location, destination)
  
  // Normalize distance to 0-100 score (closer = higher)
  const maxDistance = 500 // km
  const distanceScore = Math.max(0, 100 - (distance / maxDistance * 100))
  
  // Apply capability bonuses
  const capabilityBonus = warehouse.capabilities
    .filter(cap => execution.requiredCapabilities.includes(cap))
    .length * 5
  
  return Math.min(100, distanceScore + capabilityBonus)
}
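`calculateDistance` is not defined in the example. The sketch below uses the haversine formula and assumes the function returns the great-circle distance in kilometers between `{ lat, lng }` points. That location shape is hypothetical.

```typescript
interface GeoPoint { lat: number; lng: number }

const EARTH_RADIUS_KM = 6371 // mean Earth radius

function toRadians(degrees: number): number {
  return (degrees * Math.PI) / 180
}

// Haversine great-circle distance in km
function calculateDistance(a: GeoPoint, b: GeoPoint): number {
  const dLat = toRadians(b.lat - a.lat)
  const dLng = toRadians(b.lng - a.lng)
  const h =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRadians(a.lat)) * Math.cos(toRadians(b.lat)) * Math.sin(dLng / 2) ** 2
  // Clamp guards against floating-point drift slightly above 1
  return 2 * EARTH_RADIUS_KM * Math.asin(Math.min(1, Math.sqrt(h)))
}

// Same normalization as scoreWarehouse: 0 km -> 100, 500 km or more -> 0
function distanceScore(distanceKm: number, maxDistanceKm = 500): number {
  return Math.max(0, 100 - (distanceKm / maxDistanceKm) * 100)
}
```

With this sketch, a warehouse 250 km from the delivery address gets a distance score of 50 before capability bonuses.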

4. Pipeline Executions

Purpose: Runtime instances processing real data

Schema:

typescript
interface PipelineExecution {
  id: string
  pipelineId: string
  
  // Current state
  status: 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'TIMEOUT'
  currentStageIndex: number
  stages: StageExecution[]
  
  // Execution context
  context: Context
  metadata: Record<string, any>
  
  // Results
  results: Record<string, any>
  error?: ExecutionError
  
  // Performance
  startedAt: Timestamp
  completedAt?: Timestamp
  duration?: number
  
  // Retry management
  retryAttempts: number
  nextRetryAt?: Timestamp
}

interface StageExecution extends Stage {
  status: StageStatus
  startedAt?: Timestamp
  completedAt?: Timestamp
  result?: any
  error?: StageError
  attempts: number
}

Execution Flow:

1. Create execution from pipeline
2. Queue initial stage transition
3. Transition Worker picks up transition
4. Execute stage action
5. Evaluate transition conditions
6. Queue next stage or complete
7. Monitoring Worker checks for timeouts
8. Health Worker validates pipeline health

5. Worker System

Purpose: Background processing for autonomous operation

Worker Manager

Orchestrates all workers for a workspace:

typescript
class WorkerManager {
  private workers: Map<string, BaseWorker>
  
  register(worker: BaseWorker): void
  unregister(workerId: string): void
  startAll(): void
  stopAll(): Promise<void>
  getStats(): WorkerStats
}
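A minimal in-memory `WorkerManager` that follows the outline above might look like the sketch below. `ManagedWorker` is a hypothetical stand-in for `BaseWorker` that exposes only what the manager needs. Stats are simplified to numeric counters, keyed by worker id.

```typescript
// Hypothetical minimal worker contract used by the manager
interface ManagedWorker {
  id: string
  start(): void
  stop(): Promise<void>
  getStats(): Record<string, number>
}

class WorkerManager {
  private workers = new Map<string, ManagedWorker>()

  register(worker: ManagedWorker): void {
    if (this.workers.has(worker.id)) throw new Error(`Duplicate worker id: ${worker.id}`)
    this.workers.set(worker.id, worker)
  }

  unregister(workerId: string): void {
    this.workers.delete(workerId)
  }

  startAll(): void {
    for (const worker of this.workers.values()) worker.start()
  }

  // Stop every worker and wait for all of them, even if some fail to stop
  async stopAll(): Promise<void> {
    await Promise.allSettled([...this.workers.values()].map(w => w.stop()))
  }

  // Stats keyed by worker id
  getStats(): Record<string, Record<string, number>> {
    const stats: Record<string, Record<string, number>> = {}
    for (const [id, worker] of this.workers) stats[id] = worker.getStats()
    return stats
  }
}
```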

Base Worker

Foundation for all worker types:

typescript
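abstract class BaseWorker {
  protected intervalHandle?: NodeJS.Timeout
  private running = false // true while execute() is in flight
  
  constructor(protected config: WorkerConfig) {}
  
  // Implemented by each worker type
  abstract execute(): Promise<void>
  abstract getStats(): MonitoringStats
  
  start(): void {
    this.intervalHandle = setInterval(async () => {
      if (this.running) return // previous run still in progress: skip this tick
      this.running = true
      try {
        await this.execute()
      } catch (error) {
        // Log and keep the interval alive instead of leaking a rejected promise
        console.error('Worker run failed:', error)
      } finally {
        this.running = false
      }
    }, this.config.intervalMs)
  }
  
  async stop(): Promise<void> {
    clearInterval(this.intervalHandle)
    this.intervalHandle = undefined
  }
}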

Transition Worker

Responsibility: Execute stage transitions

Interval: 30 seconds

Process:

typescript
async execute() {
  // 1. Fetch pending transitions
  const transitions = await fetchQueuedTransitions()
  
  for (const transition of transitions) {
    try {
      // 2. Load execution context
      const execution = await loadExecution(transition.executionId)
      
      // 3. Execute stage action
      const result = await executeStageAction(
        execution.stages[execution.currentStageIndex]
      )
      
      // 4. Evaluate transition
      const nextStage = evaluateTransition(result, transition)
      
      // 5. Update execution state
      await updateExecution(execution, result, nextStage)
      
      // 6. Queue next transition if needed
      if (nextStage) {
        await queueTransition(execution.id, nextStage)
      }
      
      this.stats.transitionsProcessed++
    } catch (error) {
      await handleTransitionError(transition, error)
      this.stats.transitionErrors++
    }
  }
}

Monitoring Worker

Responsibility: Detect timeouts and stalled stages

Interval: 60 seconds

Process:

typescript
async execute() {
  const stats = {
    activeExecutions: 0,
    timedOutStages: 0,
    stalledStages: 0
  }
  
  // Check for timed-out stages
  await checkForTimedOutStages(this.App, this.context, stats)
  
  // Check for stalled stages
  await checkForStalledStages(this.App, this.context, stats)
  
  this.updateStats(stats)
}

Timeout Detection:

typescript
function checkStageTimeout(execution, stage) {
  const elapsed = Date.now() - stage.startedAt
  // Stage timeout, then pipeline timeout, then a 1-hour default (all in ms)
  const timeout = stage.timeout || execution.pipeline.config.timeout || 3600000
  
  return elapsed > timeout
}

Health Worker

Responsibility: Continuous testing and health scoring

Interval: 300 seconds (5 minutes)

Process:

typescript
async execute() {
  const stats = {
    pipelinesChecked: 0,
    healthScoresCalculated: 0,
    statusChanges: 0,
    alertsGenerated: 0
  }
  
  // Run continuous simulations and health checks
  await runContinuousTests(this.App, this.context, stats)
  
  this.updateStats(stats)
}

Health Score Calculation:

typescript
function calculateHealthScore(pipeline, executions) {
  const recentExecutions = executions.filter(
    e => e.completedAt > Date.now() - 86400000 // Last 24h
  )
  
  if (recentExecutions.length === 0) return 100
  
  // Success rate (40% weight)
  const successScore = recentExecutions.filter(e => e.status === 'SUCCESS').length 
    / recentExecutions.length * 40
  
  // Average duration vs baseline (30% weight): 30 at or below baseline, 0 at 2x baseline or slower
  const avgDuration = average(recentExecutions.map(e => e.duration))
  const durationRatio = avgDuration / pipeline.baseline.duration
  const durationScore = Math.min(30, Math.max(0, 30 - (durationRatio - 1) * 30))
  
  // Error rate (20% weight)
  const errorRate = recentExecutions.filter(e => e.error).length / recentExecutions.length
  const errorScore = Math.max(0, 20 - errorRate * 20)
  
  // Simulation success (10% weight)
  const simulationScore = pipeline.lastSimulation?.success ? 10 : 0
  
  return Math.min(100, successScore + durationScore + errorScore + simulationScore)
}
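Here is a worked example with hypothetical inputs. In the last 24 hours, 90% of executions succeeded, the average duration was 1.2 times the baseline, 10% of executions carried an error, and the last simulation passed.

```latex
\begin{aligned}
\text{success} &= 0.9 \times 40 = 36 \\
\text{duration} &= \max\bigl(0,\ 30 - (1.2 - 1) \times 30\bigr) = 24 \\
\text{error} &= \max(0,\ 20 - 0.1 \times 20) = 18 \\
\text{simulation} &= 10 \\
\text{score} &= \min(100,\ 36 + 24 + 18 + 10) = 88
\end{aligned}
```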

Webhook Worker

Responsibility: Process webhook queue

Interval: 10 seconds

Process:

typescript
async execute() {
  const webhooks = await fetchQueuedWebhooks()
  
  for (const webhook of webhooks) {
    try {
      const response = await sendWebhook(webhook)
      
      await markWebhookDelivered(webhook.id, response)
      this.stats.webhooksDelivered++
    } catch (error) {
      if (webhook.retries < webhook.maxRetries) {
        await requeueWebhook(webhook, error)
        this.stats.webhookRetries++
      } else {
        await markWebhookFailed(webhook.id, error)
        this.stats.webhookFailures++
      }
    }
  }
}

6. Transition System

Purpose: Manage stage progression logic

Transition Types:

typescript
interface Transition {
  type: TransitionType
  next?: string  // Stage name
  condition?: TransitionCondition
  retries?: number
  retryDelay?: number
}

type TransitionType = 
  | 'CONTINUE'   // Proceed to next stage
  | 'SKIP'       // Jump to specified stage
  | 'RETRY'      // Re-execute current stage
  | 'FAIL'       // Mark execution as failed
  | 'COMPLETE'   // End execution successfully
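The transition types can be resolved to a concrete next step roughly as follows. This sketch makes three assumptions: `SKIP` targets a stage by name, `CONTINUE` past the last stage completes the execution, and `RETRY` is bounded by the transition's `retries`. `resolveNextStep` and `NextStep` are hypothetical names.

```typescript
type TransitionType = 'CONTINUE' | 'SKIP' | 'RETRY' | 'FAIL' | 'COMPLETE'

interface Transition { type: TransitionType; next?: string; retries?: number }

// Either run a stage (by index) or finish the execution
type NextStep =
  | { kind: 'STAGE'; index: number }
  | { kind: 'DONE'; status: 'SUCCESS' | 'FAILED' }

function resolveNextStep(
  transition: Transition,
  stageNames: string[],
  currentIndex: number,
  attempts: number, // times the current stage has already run
): NextStep {
  switch (transition.type) {
    case 'CONTINUE':
      return currentIndex + 1 < stageNames.length
        ? { kind: 'STAGE', index: currentIndex + 1 }
        : { kind: 'DONE', status: 'SUCCESS' } // no stages left
    case 'SKIP': {
      const index = stageNames.indexOf(transition.next ?? '')
      if (index === -1) throw new Error(`Unknown stage: ${transition.next}`)
      return { kind: 'STAGE', index }
    }
    case 'RETRY':
      return attempts <= (transition.retries ?? 0)
        ? { kind: 'STAGE', index: currentIndex }
        : { kind: 'DONE', status: 'FAILED' } // retries exhausted
    case 'FAIL':
      return { kind: 'DONE', status: 'FAILED' }
    case 'COMPLETE':
      return { kind: 'DONE', status: 'SUCCESS' }
  }
}
```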

Condition Evaluation:

typescript
interface TransitionCondition {
  field: string              // e.g., "result.available"
  operator: ConditionOperator
  value: any
  logic?: 'AND' | 'OR'
  conditions?: TransitionCondition[]  // Nested conditions
}

type ConditionOperator =
  | 'EQUALS'
  | 'NOT_EQUALS'
  | 'GREATER_THAN'
  | 'LESS_THAN'
  | 'CONTAINS'
  | 'EXISTS'

Evaluation Engine:

typescript
// Compare a resolved field value against the condition's operand
function compare(fieldValue, operator, value) {
  switch (operator) {
    case 'EQUALS':
      return fieldValue === value
    case 'NOT_EQUALS':
      return fieldValue !== value
    case 'GREATER_THAN':
      return fieldValue > value
    case 'LESS_THAN':
      return fieldValue < value
    case 'CONTAINS':
      return Array.isArray(fieldValue) 
        ? fieldValue.includes(value)
        : String(fieldValue).includes(value)
    case 'EXISTS':
      return fieldValue !== undefined && fieldValue !== null
    default:
      throw new Error(`Unknown operator: ${operator}`)
  }
}

function evaluateCondition(condition, context) {
  const result = compare(getNestedValue(context, condition.field), condition.operator, condition.value)
  if (!condition.conditions?.length) return result
  
  // Combine this node's result with its nested conditions
  const results = [result, ...condition.conditions.map(c => evaluateCondition(c, context))]
  return condition.logic === 'AND' 
    ? results.every(r => r)
    : results.some(r => r)
}
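The evaluation engine relies on `getNestedValue`, which is not shown. The sketch below assumes fields are dot-separated paths (for example `result.available`) and that array elements are addressed by index.

```typescript
// Hypothetical helper: resolve a dot-separated path against an object
function getNestedValue(source: unknown, path: string): unknown {
  let current: unknown = source
  for (const key of path.split('.')) {
    // Stop at missing or non-object values instead of throwing
    if (current === null || typeof current !== 'object') return undefined
    current = (current as Record<string, unknown>)[key]
  }
  return current
}
```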

Data Flow

Pipeline Execution Lifecycle

┌─────────────────────────────────────────────────────────────┐
│                  Execution Lifecycle                         │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  1. CREATE EXECUTION                                         │
│     ├─ Load pipeline definition                             │
│     ├─ Initialize execution context                         │
│     ├─ Set metadata and parameters                          │
│     └─ Queue first stage transition                         │
│                                                               │
│  2. STAGE EXECUTION (Loop)                                   │
│     ├─ Transition Worker picks up queued transition         │
│     ├─ Load current stage definition                        │
│     ├─ Execute stage action:                                │
│     │  ├─ QUERY: Search and select resource                │
│     │  ├─ WEBHOOK: Send HTTP request                       │
│     │  └─ WAIT: Check condition or timer                   │
│     ├─ Store stage result                                   │
│     ├─ Evaluate transition conditions                       │
│     ├─ Determine next stage                                 │
│     └─ Queue next transition or complete                    │
│                                                               │
│  3. MONITORING (Parallel)                                    │
│     ├─ Monitoring Worker checks for timeouts                │
│     ├─ Handle timed-out stages                              │
│     └─ Detect stalled executions                            │
│                                                               │
│  4. COMPLETION                                               │
│     ├─ Mark execution status (SUCCESS/FAILED)               │
│     ├─ Calculate total duration                             │
│     ├─ Update pipeline metrics                              │
│     └─ Trigger completion webhooks                          │
│                                                               │
│  5. HEALTH TRACKING (Background)                             │
│     ├─ Health Worker calculates score                       │
│     ├─ Run periodic simulations                             │
│     ├─ Generate alerts if degraded                          │
│     └─ Update pipeline status                               │
│                                                               │
└─────────────────────────────────────────────────────────────┘

Scalability Design

Horizontal Scaling

De. Pipelines support horizontal scaling through workspace sharding:

typescript
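// Each service instance handles a subset of workspaces
// Environment variables are strings, so convert them before comparing with $mod results
const INSTANCE_ID = Number(process.env.INSTANCE_ID ?? 0)
const TOTAL_INSTANCES = Number(process.env.TOTAL_INSTANCES ?? 1)

// Shard workspaces by ID modulo (requires numeric workspace IDs; $toInt fails otherwise)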
const workspaces = await db.find({
  status: 'active',
  $expr: { 
    $eq: [
      { $mod: [{ $toInt: '$wid' }, TOTAL_INSTANCES] },
      INSTANCE_ID
    ]
  }
})

// Initialize workers only for this shard
for (const workspace of workspaces) {
  initializePipelineWorkers(App, workspace.context)
}
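The `$toInt` approach only works when workspace IDs are numeric strings. If they are not, one alternative (not part of the original design) is to assign shards in application code with a stable hash. This sketch uses 32-bit FNV-1a. `shardFor` and `ownsWorkspace` are hypothetical names, and every instance must use the same function for the shards to stay disjoint.

```typescript
// 32-bit FNV-1a over UTF-16 code units (matches byte-wise FNV-1a for ASCII IDs)
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5 // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0 // multiply by FNV prime, keep unsigned 32 bits
  }
  return hash
}

function shardFor(workspaceId: string, totalInstances: number): number {
  return fnv1a32(workspaceId) % totalInstances
}

// True if this instance should initialize workers for the workspace
function ownsWorkspace(workspaceId: string, instanceId: number, totalInstances: number): boolean {
  return shardFor(workspaceId, totalInstances) === instanceId
}
```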

Scaling Strategy:

  • 1-1,000 workspaces: single instance
  • 1,000-10,000: 10 instances (up to 1,000 workspaces each)
  • 10,000-100,000: 100 instances (up to 1,000 workspaces each)
  • 100,000+: dynamic auto-scaling

Performance Optimization

Query Caching:

typescript
// Cache query results to reduce database load
async function cachedQuery(resource, filters, cacheDuration) {
  const cacheKey = `query:${resource}:${hash(filters)}`
  const cached = await cache.get(cacheKey)

  // Serve from cache while the entry is younger than cacheDuration (ms)
  if (cached && Date.now() - cached.timestamp < cacheDuration) {
    return cached.result
  }

  const result = await executeQuery(resource, filters)
  await cache.set(cacheKey, { result, timestamp: Date.now() })
  return result
}
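The caching example depends on `hash(filters)` producing the same key for logically equal filters. One way to do that is with Node's `crypto` module, assuming filters are JSON-serializable: keys are sorted recursively before hashing, so property order does not change the key. `canonicalize` and `queryCacheKey` are hypothetical names.

```typescript
import { createHash } from 'node:crypto'

// Recursively sort object keys so equal filters serialize identically
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(canonicalize)
  if (value instanceof Date) return value.toISOString() // keep dates distinguishable
  if (value !== null && typeof value === 'object') {
    const sorted: Record<string, unknown> = {}
    for (const key of Object.keys(value).sort()) {
      sorted[key] = canonicalize((value as Record<string, unknown>)[key])
    }
    return sorted
  }
  return value
}

// Same key format as the caching example, with a SHA-256 digest of the filters
function queryCacheKey(resource: string, filters: Record<string, unknown>): string {
  const digest = createHash('sha256')
    .update(JSON.stringify(canonicalize(filters)))
    .digest('hex')
  return `query:${resource}:${digest}`
}
```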

Batch Processing:

typescript
// Process multiple transitions in parallel
const transitions = await fetchQueuedTransitions({ batchSize: 100 })
// allSettled: one failed transition does not reject the whole batch
const results = await Promise.allSettled(transitions.map(t => processTransition(t)))
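Running a whole batch through `Promise.all` or `Promise.allSettled` puts every transition in flight at once. If that is too much load, a bounded variant can cap parallelism, for example at a template's `config.concurrency`. The helper name in this sketch is hypothetical. Results keep input order, and failures are recorded rather than aborting the batch.

```typescript
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<PromiseSettledResult<R>[]> {
  const results: PromiseSettledResult<R>[] = new Array(items.length)
  let next = 0

  // Each runner repeatedly claims the next unprocessed index
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++
      try {
        results[index] = { status: 'fulfilled', value: await fn(items[index]) }
      } catch (reason) {
        results[index] = { status: 'rejected', reason }
      }
    }
  }

  // At most `limit` runners, and at least one so an empty batch resolves cleanly
  const runnerCount = Math.max(1, Math.min(limit, items.length))
  await Promise.all(Array.from({ length: runnerCount }, () => runner()))
  return results
}
```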

Connection Pooling:

typescript
// Database connection pools per context
const poolConfig = {
  minSize: 10,
  maxSize: 100,
  acquireTimeout: 30000,
  idleTimeout: 300000
}

Error Handling

Error Categories

Validation Errors (4xx)

  • Invalid pipeline configuration
  • Missing required fields
  • Resource not found
  • Permission denied

Execution Errors (5xx)

  • Stage action failure
  • Query timeout
  • Webhook delivery failure
  • Worker crash

System Errors

  • Database connection failure
  • Memory exhaustion
  • Network partition

Error Recovery

typescript
// Stage-level retry with exponential backoff
async function executeStageWithRetry(stage, execution) {
  const retries = stage.retries ?? 3          // retries after the first attempt; 0 disables retrying
  const baseDelay = stage.retryDelay ?? 1000  // ms
  
  for (let attempt = 0; ; attempt++) {
    try {
      return await executeStage(stage, execution)
    } catch (error) {
      if (attempt >= retries) throw error // out of retries
      
      // 1s, 2s, 4s, ... with the default base delay
      const delay = baseDelay * Math.pow(2, attempt)
      await sleep(delay)
    }
  }
}
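Pure exponential backoff can make many executions that failed together retry in lockstep. A common refinement, not part of the original design, is to cap the delay and add "full jitter". The 30-second cap below is an assumed value, and the function names are hypothetical.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxDelayMs
function backoffDelay(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  return Math.min(maxDelayMs, baseDelayMs * 2 ** attempt)
}

// Full jitter: pick uniformly in [0, capped delay) so concurrent retries spread out.
// `random` is injectable so the function can be tested deterministically.
function jitteredDelay(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffDelay(attempt, baseDelayMs, maxDelayMs))
}
```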

Security Considerations

Context Isolation

  • Each workspace has isolated pipeline executions
  • No cross-workspace data access
  • Separate database collections per context

Webhook Security

  • HTTPS only for production webhooks
  • Request signing for verification
  • Timeout limits to prevent hanging
  • Rate limiting on external calls
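Request signing can be done with an HMAC over the payload. The sketch below uses Node's `crypto` module and makes three assumptions: the signature covers `${timestamp}.${body}`, it is sent hex-encoded in a header such as `X-Signature` (a hypothetical name), and receivers reject timestamps more than five minutes old. The function names are also hypothetical.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Sender side: HMAC-SHA256 over timestamp and raw body, hex-encoded
function signWebhook(secret: string, timestamp: number, body: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex')
}

// Receiver side: reject stale requests, then compare signatures in constant time
function verifyWebhook(
  secret: string,
  timestamp: number,
  body: string,
  signature: string,
  toleranceMs = 5 * 60 * 1000,
  now = Date.now(),
): boolean {
  if (Math.abs(now - timestamp) > toleranceMs) return false // outside replay window
  const expected = Buffer.from(signWebhook(secret, timestamp, body), 'hex')
  const received = Buffer.from(signature, 'hex')
  // timingSafeEqual throws on length mismatch, so check length first
  return expected.length === received.length && timingSafeEqual(expected, received)
}
```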

Query Filtering

  • Resource access validated against workspace permissions
  • Filters sanitized to prevent injection
  • Results limited to authorized resources
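Filter sanitization depends on the query backend. The sharding example uses MongoDB operators, so this sketch assumes MongoDB-style filters and applies three rules:

  • Keys beginning with `$` or containing `.` are rejected.
  • Values must be primitives or arrays of primitives, which blocks nested operator objects such as `{ $ne: null }`.
  • Only allow-listed fields pass.

The allow-list and helper names are hypothetical.

```typescript
type Primitive = string | number | boolean | null

function isPrimitive(value: unknown): value is Primitive {
  return value === null || ['string', 'number', 'boolean'].includes(typeof value)
}

function sanitizeFilters(
  filters: Record<string, unknown>,
  allowedFields: ReadonlySet<string>,
): Record<string, Primitive | Primitive[]> {
  const safe: Record<string, Primitive | Primitive[]> = {}
  for (const [key, value] of Object.entries(filters)) {
    // Block operator injection ($where, $ne, ...) and dotted paths into nested fields
    if (key.startsWith('$') || key.includes('.')) throw new Error(`Illegal filter key: ${key}`)
    if (!allowedFields.has(key)) throw new Error(`Unknown filter field: ${key}`)
    if (isPrimitive(value)) safe[key] = value
    else if (Array.isArray(value) && value.every(isPrimitive)) safe[key] = value
    else throw new Error(`Unsupported value for ${key}`) // e.g. { $ne: null }
  }
  return safe
}
```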

Next Steps