Integration Guide
Step-by-step guide to building and deploying your first De. Pipeline for logistics automation.
Prerequisites
Before integrating De. Pipelines, ensure you have:
- De. Workspace - Active workspace with pipeline service enabled
- Authentication - Valid API credentials (see Authentication Guide)
- SDK Installed - De. SDK for your platform (Web or React Native)
- Context Understanding - Familiar with the pipeline context (wid, xcode, type)
Quick Start (15 minutes)
Let's build a complete order fulfillment pipeline from scratch.
Step 1: Define Pipeline Template
Create a template that automates order fulfillment:
import { DeClient } from '@dedot/sdk'
const client = new DeClient({
apiKey: 'your-api-key',
workspace: 'your-workspace-id'
})
// Define the pipeline template
const template = {
name: "Order Fulfillment Pipeline",
description: "Automate order fulfillment from validation to carrier assignment",
version: "1.0.0",
stages: [
{
name: "Validate Inventory",
description: "Check if products are in stock",
action: "QUERY",
query: {
resource: "INVENTORY",
filters: {
sku: "${execution.metadata.sku}",
quantity: { $gte: "${execution.metadata.quantity}" }
},
strategy: "BEST_MATCH"
},
transitions: {
onSuccess: {
type: "CONTINUE",
next: "Select Warehouse",
condition: {
field: "result.available",
operator: "EQUALS",
value: true
}
},
onFailure: {
type: "FAIL",
next: "Handle Out of Stock"
}
},
timeout: 30000,
retries: 2
},
{
name: "Select Warehouse",
description: "Find closest warehouse with capacity",
action: "QUERY",
query: {
resource: "WAREHOUSE",
filters: {
status: "OPERATIONAL",
"inventory.sku": "${execution.metadata.sku}",
"capacity.available": { $gt: 0 }
},
strategy: "CLOSEST",
fallbackOptions: [
{
strategy: "FASTEST",
filters: { status: "OPERATIONAL" }
}
]
},
transitions: {
onSuccess: {
type: "CONTINUE",
next: "Assign Carrier"
},
onFailure: {
type: "FAIL",
next: "Manual Review"
}
},
timeout: 30000
},
{
name: "Assign Carrier",
description: "Select optimal carrier for delivery",
action: "QUERY",
query: {
resource: "CARRIER",
filters: {
status: "ACTIVE",
serviceArea: "${execution.results.Select Warehouse.location.zone}",
vehicleType: ["VAN", "TRUCK"]
},
strategy: "CHEAPEST"
},
transitions: {
onSuccess: {
type: "CONTINUE",
next: "Notify Systems"
},
onFailure: {
type: "SKIP",
next: "Select Warehouse", // Try different warehouse
retries: 1
}
},
timeout: 30000
},
{
name: "Notify Systems",
description: "Send notifications to external systems",
action: "WEBHOOK",
webhook: {
url: "https://api.yourcompany.com/orders/fulfill",
method: "POST",
headers: {
"Authorization": "Bearer ${config.apiToken}",
"Content-Type": "application/json"
},
body: {
orderId: "${execution.metadata.orderId}",
warehouseId: "${execution.results.Select Warehouse.id}",
carrierId: "${execution.results.Assign Carrier.id}",
estimatedDelivery: "${execution.results.Assign Carrier.estimatedDelivery}"
},
timeout: 5000,
retries: 3
},
transitions: {
onSuccess: {
type: "COMPLETE"
},
onFailure: {
type: "RETRY",
retries: 2,
retryDelay: 5000
}
}
}
],
config: {
timeout: 300000, // 5 minutes max
concurrency: 10, // Max 10 parallel executions
retryPolicy: {
maxAttempts: 3,
backoffMultiplier: 2,
initialDelay: 1000
}
},
health: {
simulationEnabled: true,
simulationInterval: 600, // 10 minutes
alertThresholds: {
healthScore: 70,
errorRate: 0.1,
avgDuration: 120000 // 2 minutes
}
}
}
// Create the template
const createdTemplate = await client.pipelines.templates.create(template)
console.log('Template created:', createdTemplate.id)
Step 2: Validate & Deploy Pipeline
Validate the template and deploy it as an active pipeline:
// Validate template
const validation = await client.pipelines.templates.validate(
createdTemplate.id
)
if (!validation.valid) {
console.error('Validation errors:', validation.errors)
throw new Error('Template validation failed')
}
console.log('Validation passed:', validation.checks)
// Deploy pipeline from template
const pipeline = await client.pipelines.deploy({
templateId: createdTemplate.id,
name: "Production Order Fulfillment",
description: "Active fulfillment pipeline for all orders",
// Override template config if needed
config: {
concurrency: 20 // Higher concurrency for production
}
})
console.log('Pipeline deployed:', pipeline.id)
console.log('Status:', pipeline.status.current) // 'ACTIVE'
Step 3: Execute Pipeline
Trigger pipeline execution with order data:
// Execute pipeline for a specific order
const execution = await client.pipelines.execute(pipeline.id, {
// Execution metadata - available as ${execution.metadata.*}
metadata: {
orderId: "ORD-12345",
sku: "WIDGET-001",
quantity: 5,
customerId: "CUST-789",
deliveryAddress: {
lat: 40.7128,
lng: -74.0060,
city: "New York",
zip: "10001"
}
},
// Optional: Override pipeline config for this execution
config: {
timeout: 180000 // 3 minutes for this specific order
}
})
console.log('Execution started:', execution.id)
console.log('Current stage:', execution.stages[execution.currentStageIndex].name)
Step 4: Monitor Execution
Track execution progress in real-time:
// Poll execution status
/**
 * Polls an execution until its status is neither PENDING nor RUNNING and
 * returns the snapshot from the poll that observed that status.
 *
 * Each poll logs the status and, when the current stage index resolves to a
 * stage, the stage name; for RUNNING executions the stage status and any
 * stage result are logged as well.
 *
 * This helper has no overall timeout: it keeps polling for as long as the
 * execution reports PENDING or RUNNING.
 *
 * @param executionId - ID of the execution to poll.
 * @param pollIntervalMs - Delay between polls in milliseconds (default 2000).
 * @returns The execution object fetched by the final poll.
 */
async function waitForCompletion(executionId: string, pollIntervalMs = 2000) {
  for (;;) {
    const execution = await client.pipelines.executions.get(executionId)
    const status = execution.status
    // Guard the lookup: the index may not resolve to a stage, and an
    // unguarded `.name` access would throw instead of reporting the status.
    const currentStage = execution.stages?.[execution.currentStageIndex]

    console.log(`Status: ${status}`)
    if (currentStage) {
      console.log(`Current stage: ${currentStage.name}`)
      if (status === 'RUNNING') {
        console.log(` Stage status: ${currentStage.status}`)
        if (currentStage.result) {
          console.log(` Stage result:`, currentStage.result)
        }
      }
    }

    // Return as soon as the execution is done, without an extra sleep or a
    // redundant second fetch of the same execution.
    if (status !== 'PENDING' && status !== 'RUNNING') {
      return execution
    }

    // Wait before next check
    await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
  }
}
const completed = await waitForCompletion(execution.id)
if (completed.status === 'SUCCESS') {
console.log('✅ Order fulfilled successfully!')
console.log('Results:', completed.results)
console.log('Duration:', completed.duration, 'ms')
} else {
console.error('❌ Execution failed:', completed.error)
}
Step 5: View Pipeline Health
Check pipeline health and performance:
// Get pipeline with health metrics
const pipelineStatus = await client.pipelines.get(pipeline.id)
console.log('Pipeline Health:', {
status: pipelineStatus.status.current,
healthScore: pipelineStatus.healthScore,
lastHealthCheck: pipelineStatus.lastHealthCheck,
metrics: {
totalExecutions: pipelineStatus.metrics.totalExecutions,
successRate: pipelineStatus.metrics.successRate,
avgDuration: pipelineStatus.metrics.avgDuration,
errorRate: pipelineStatus.metrics.errorRate
}
})
// View recent simulations
if (pipelineStatus.lastSimulation) {
console.log('Last Simulation:', {
success: pipelineStatus.lastSimulation.success,
duration: pipelineStatus.lastSimulation.duration,
timestamp: pipelineStatus.lastSimulation.completedAt
})
}
// Check for alerts. `alerts` may be absent, so normalize it to an array once
// instead of comparing a possibly-undefined length and then dereferencing
// `alerts` again without a guard.
const activeAlerts = pipelineStatus.alerts ?? []
if (activeAlerts.length > 0) {
  console.warn('⚠️ Active Alerts:')
  for (const alert of activeAlerts) {
    console.warn(` - ${alert.severity}: ${alert.message}`)
  }
}
Complete Integration Example
Here's a production-ready integration with error handling:
import { DeClient } from '@dedot/sdk'
/**
 * Wrapper around a single order-fulfillment pipeline.
 *
 * Lifecycle: construct, `await initialize()` once, then call `fulfillOrder()`
 * per order and `getPipelineHealth()` for monitoring.
 */
class OrderFulfillmentService {
  /** Name used both to look up an existing pipeline and to deploy a new one. */
  private static readonly PIPELINE_NAME = "Order Fulfillment Pipeline"
  /** Delay between execution status polls, in milliseconds. */
  private static readonly POLL_INTERVAL_MS = 2000
  /** How long fulfillOrder() waits for an execution to finish, in milliseconds. */
  private static readonly EXECUTION_TIMEOUT_MS = 300000

  private readonly client: DeClient
  /** Set by initialize(); undefined until then. */
  private pipelineId: string | undefined

  /**
   * @param apiKey - API key passed to the SDK client.
   * @param workspaceId - Workspace ID passed to the SDK client as `workspace`.
   */
  constructor(apiKey: string, workspaceId: string) {
    this.client = new DeClient({ apiKey, workspace: workspaceId })
  }

  /**
   * Resolves the pipeline this service uses: reuses the first pipeline listed
   * under PIPELINE_NAME, otherwise creates a template, validates it and
   * deploys a new pipeline from it.
   *
   * @throws Error when template validation reports `valid: false`.
   */
  async initialize(): Promise<void> {
    // Check if pipeline already exists
    const pipelines = await this.client.pipelines.list({
      filters: { name: OrderFulfillmentService.PIPELINE_NAME }
    })
    if (pipelines.length > 0) {
      this.pipelineId = pipelines[0].id
      console.log('Using existing pipeline:', this.pipelineId)
      return
    }

    // Create and deploy new pipeline
    const template = await this.createTemplate()
    const validation = await this.client.pipelines.templates.validate(template.id)
    if (!validation.valid) {
      throw new Error(`Template validation failed: ${validation.errors.join(', ')}`)
    }
    const pipeline = await this.client.pipelines.deploy({
      templateId: template.id,
      name: OrderFulfillmentService.PIPELINE_NAME
    })
    this.pipelineId = pipeline.id
    console.log('Pipeline deployed:', this.pipelineId)
  }

  /**
   * Runs the pipeline for one order and waits for it to finish.
   *
   * Never rejects: SDK errors, timeouts, a missing initialize() call and
   * orders without items are all reported as `{ success: false, error }`.
   *
   * @param order - Order to fulfill; `id`, `items` and `deliveryAddress` are read.
   * @returns On success, the warehouse/carrier chosen by the pipeline stages
   *          and the execution duration; on failure, the error message and,
   *          when the execution itself failed, the name of the current stage.
   */
  async fulfillOrder(order: Order): Promise<FulfillmentResult> {
    try {
      const pipelineId = this.requirePipelineId()

      // NOTE(review): only the first line item is sent to the pipeline, as in
      // the original sample; multi-item orders are not covered — confirm intent.
      const firstItem = order.items?.[0]
      if (!firstItem) {
        return {
          success: false,
          error: `Order ${order.id} has no items`
        }
      }

      // Execute pipeline
      const execution = await this.client.pipelines.execute(pipelineId, {
        metadata: {
          orderId: order.id,
          sku: firstItem.sku,
          quantity: firstItem.quantity,
          deliveryAddress: order.deliveryAddress
        }
      })

      // Wait for completion (with timeout)
      const result = await this.waitForCompletion(
        execution.id,
        OrderFulfillmentService.EXECUTION_TIMEOUT_MS
      )
      if (result.status !== 'SUCCESS') {
        return {
          success: false,
          error: result.error?.message || 'Unknown error',
          failedStage: result.stages[result.currentStageIndex]?.name
        }
      }

      // Stage results are keyed by stage name.
      const carrier = result.results['Assign Carrier']
      return {
        success: true,
        warehouseId: result.results['Select Warehouse']?.id,
        carrierId: carrier?.id,
        estimatedDelivery: carrier?.estimatedDelivery,
        duration: result.duration
      }
    } catch (error: unknown) {
      console.error('Order fulfillment error:', error)
      return {
        success: false,
        // A thrown value is not guaranteed to be an Error instance.
        error: error instanceof Error ? error.message : String(error)
      }
    }
  }

  /**
   * Fetches the pipeline and returns its status, health score, metrics and
   * alerts (an empty array when the pipeline reports none).
   *
   * @throws Error when called before initialize().
   */
  async getPipelineHealth(): Promise<HealthReport> {
    const pipeline = await this.client.pipelines.get(this.requirePipelineId())
    return {
      status: pipeline.status.current,
      healthScore: pipeline.healthScore,
      metrics: pipeline.metrics,
      alerts: pipeline.alerts ?? []
    }
  }

  /** Returns the resolved pipeline ID or throws when initialize() has not run. */
  private requirePipelineId(): string {
    if (this.pipelineId === undefined) {
      throw new Error('OrderFulfillmentService is not initialized; call initialize() first')
    }
    return this.pipelineId
  }

  /**
   * Polls the execution every POLL_INTERVAL_MS until it reports SUCCESS or
   * FAILED, or until `timeout` milliseconds have elapsed.
   *
   * @throws Error('Execution timeout') when the deadline passes first.
   */
  private async waitForCompletion(executionId: string, timeout: number) {
    const startTime = Date.now()
    while (Date.now() - startTime < timeout) {
      const execution = await this.client.pipelines.executions.get(executionId)
      if (execution.status === 'SUCCESS' || execution.status === 'FAILED') {
        return execution
      }
      await new Promise(resolve =>
        setTimeout(resolve, OrderFulfillmentService.POLL_INTERVAL_MS)
      )
    }
    throw new Error('Execution timeout')
  }

  /**
   * Placeholder: create the template as shown in Step 1 and return it.
   * initialize() reads `id` from the returned value.
   */
  private async createTemplate() {
    // Template creation logic from Step 1
    // ...
  }
}
// Usage
// process.env values are `string | undefined`, while the constructor requires
// strings, so fail fast with a clear message when either variable is missing.
const apiKey = process.env.DE_API_KEY
const workspaceId = process.env.DE_WORKSPACE_ID
if (!apiKey || !workspaceId) {
  throw new Error('DE_API_KEY and DE_WORKSPACE_ID must be set')
}
const service = new OrderFulfillmentService(apiKey, workspaceId)
await service.initialize()
// Fulfill orders
const order = await getOrder('ORD-12345')
const result = await service.fulfillOrder(order)
if (result.success) {
  console.log('✅ Order fulfilled:', result)
} else {
  console.error('❌ Fulfillment failed:', result.error)
}
// Monitor health
const health = await service.getPipelineHealth()
console.log('Pipeline health:', health)
Advanced Features
Dynamic Query Filters
Use execution metadata in query filters:
{
name: "Select Warehouse",
action: "QUERY",
query: {
resource: "WAREHOUSE",
filters: {
// Access execution metadata
region: "${execution.metadata.deliveryAddress.region}",
// Access previous stage results
"inventory.sku": "${execution.results.Validate Inventory.sku}",
// Complex expressions
"capacity.available": {
$gte: "${execution.metadata.quantity * 1.2}" // 20% buffer
}
}
}
}
Conditional Transitions
Complex branching logic:
{
transitions: {
onSuccess: {
type: "CONTINUE",
next: "Express Shipping",
condition: {
logic: "AND",
conditions: [
{
field: "execution.metadata.priority",
operator: "EQUALS",
value: "HIGH"
},
{
field: "result.distance",
operator: "LESS_THAN",
value: 50
}
]
}
},
onFailure: {
type: "SKIP",
next: "Standard Shipping"
}
}
}
Webhook Authentication
Secure webhook requests:
{
action: "WEBHOOK",
webhook: {
url: "https://api.partner.com/fulfill",
method: "POST",
headers: {
"Authorization": "Bearer ${config.partnerApiKey}",
"X-Signature": "${generateSignature(body, config.webhookSecret)}",
"X-Request-ID": "${execution.id}"
},
body: {
orderId: "${execution.metadata.orderId}",
timestamp: "${Date.now()}"
}
}
}
Parallel Stage Execution
Execute multiple stages concurrently:
{
name: "Parallel Resource Selection",
action: "QUERY",
parallel: [
{
name: "Select Warehouse",
query: { resource: "WAREHOUSE", /* ... */ }
},
{
name: "Select Carrier",
query: { resource: "CARRIER", /* ... */ }
},
{
name: "Check Inventory",
query: { resource: "INVENTORY", /* ... */ }
}
],
// Wait for all parallel actions to complete
waitForAll: true,
timeout: 30000
}
Testing Pipelines
Manual Testing
Execute pipeline with test data:
const testExecution = await client.pipelines.execute(pipelineId, {
metadata: {
orderId: `TEST-${Date.now()}`,
sku: "TEST-WIDGET",
quantity: 1,
deliveryAddress: {
lat: 40.7128,
lng: -74.0060
}
},
tags: ['test', 'manual']
})
Simulation Mode
Enable automatic simulations:
await client.pipelines.update(pipelineId, {
health: {
simulationEnabled: true,
simulationInterval: 300, // Every 5 minutes
testData: {
orderId: "SIM-${timestamp}",
sku: "TEST-ITEM",
quantity: 1
}
}
})
// View simulation results
const pipeline = await client.pipelines.get(pipelineId)
console.log('Last simulation:', pipeline.lastSimulation)
Dry Run Mode
Test without side effects:
const dryRun = await client.pipelines.executions.dryRun(pipelineId, {
metadata: { /* test data */ }
})
console.log('Dry run results:')
dryRun.stages.forEach(stage => {
console.log(`${stage.name}: ${stage.predictedResult}`)
})
console.log('Estimated duration:', dryRun.estimatedDuration)
Production Best Practices
1. Error Handling
Always handle pipeline failures:
try {
const result = await fulfillOrder(order)
if (!result.success) {
// Log failure
await logFailure(order.id, result.error, result.failedStage)
// Trigger fallback
await fallbackFulfillment(order)
// Alert operations team
await sendAlert('Order fulfillment failed', order.id)
}
} catch (error) {
await handleCriticalError(error)
}
2. Monitoring & Alerts
Set up health monitoring:
// Check pipeline health every 5 minutes
setInterval(async () => {
  // setInterval discards the promise returned by an async callback, so errors
  // must be handled here; otherwise a failed health check or alert call
  // surfaces as an unhandled promise rejection.
  try {
    const health = await service.getPipelineHealth()
    if (health.healthScore < 70) {
      await sendAlert('Pipeline health degraded', {
        score: health.healthScore,
        status: health.status,
        alerts: health.alerts
      })
    }
  } catch (error) {
    console.error('Pipeline health check failed:', error)
  }
}, 300000)
3. Resource Limits
Configure appropriate limits:
const pipeline = await client.pipelines.deploy({
templateId: template.id,
config: {
concurrency: 50, // Max 50 parallel executions
timeout: 300000, // 5 minute timeout
maxExecutionsPerHour: 10000, // Rate limit
retryPolicy: {
maxAttempts: 3,
backoffMultiplier: 2
}
}
})
4. Version Management
Use template versioning:
// Create new version
const v2Template = await client.pipelines.templates.create({
...originalTemplate,
version: "2.0.0",
stages: updatedStages
})
// Deploy side-by-side
const v2Pipeline = await client.pipelines.deploy({
templateId: v2Template.id,
name: "Order Fulfillment v2"
})
// Gradually shift traffic
// ... migrate executions ...
// Disable old version
await client.pipelines.update(oldPipelineId, {
status: 'DISABLED'
})
Troubleshooting
Common Issues
Execution Stuck in PENDING
- Check worker status: Workers may be stopped
- Verify context: Ensure workspace has active workers
- Check logs: Look for initialization errors
High Failure Rate
- Review failed executions: Identify common failure patterns
- Check resource availability: Ensure warehouses/carriers are active
- Validate filters: Filters may be too restrictive
Slow Execution
- Check stage timeouts: May need adjustment
- Review query complexity: Optimize filters
- Monitor database performance: Add indexes if needed
Debug Mode
Enable detailed logging:
const execution = await client.pipelines.execute(pipelineId, {
metadata: { /* ... */ },
debug: true // Enable debug logging
})
// View detailed logs
const logs = await client.pipelines.executions.getLogs(execution.id)
console.log('Execution logs:', logs)
