Query System Architecture
Deep dive into De. Queries design, data architecture, and execution model.
System Overview
De. Queries provide intelligent service discovery and matching through a hybrid architecture that balances real-time data access with performance optimization.
┌─────────────────────────────────────────────────────────────┐
│ De. Query Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service Registry (Query Database) │ │
│ │ • Service metadata and capabilities │ │
│ │ • Location and coverage data │ │
│ │ • Performance metrics │ │
│ │ • Synced from LSP databases every 5 minutes │ │
│ └──────────────────┬──────────────────────────────────┘ │
│ │ Discovery & Matching │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Query Services (Business Logic) │ │
│ │ • NetworkDiscoveryService │ │
│ │ • MatchingService │ │
│ │ • CapacityQueryService │ │
│ │ • PerformanceQueryService │ │
│ │ • PricingQueryService │ │
│ └──────────────────┬──────────────────────────────────┘ │
│ │ Real-time Data │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ LSP Database (Real-time Queries) │ │
│ │ • Live capacity data │ │
│ │ • Current pricing rules │ │
│ │ • Operational status │ │
│ │ • Cached for 5 minutes (configurable) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘Core Components
1. Service Registry
Purpose: Centralized index of all logistics services with searchable metadata
Data Model:
interface ServiceRegistryEntry {
compositeId: string // Format: "lsp:serviceId"
lsp: string // LSP xcode
serviceId: string // Service identifier
serviceType: SPType // WAREHOUSE, CARRIER, HUB, etc.
name: string
// Location data
location: {
coordinates: [number, number] // [lat, lng]
address: Address
timezone: string
country: string
city: string
region?: string
}
// Service capabilities (flexible schema)
capabilities: Record<string, any>
// Coverage information
coverage: {
geographic: GeographicCoverage[]
serviceTypes: string[]
}
// Performance summary (cached from LSP)
performance?: {
onTimeDeliveryRate: number
orderAccuracy: number
overallScore: number
lastUpdated: Date
}
// Metadata
status: 'ACTIVE' | 'INACTIVE' | 'MAINTENANCE'
tags: string[]
created: Timestamp
updated: Timestamp
lastSynced: Timestamp
}Database Indexes:
// Geospatial index for proximity queries
db.service_registry.createIndex({ 'location.coordinates': '2dsphere' })
// Service type and status for filtering
db.service_registry.createIndex({ serviceType: 1, status: 1 })
// LSP and service ID for direct lookups
db.service_registry.createIndex({ lsp: 1, serviceId: 1 })
// Performance scores for sorting
db.service_registry.createIndex({ 'performance.overallScore': -1 })
// Full-text search on name and tags
db.service_registry.createIndex({ name: 'text', tags: 'text' })Sync Strategy:
// Periodic sync from LSP databases
class ServiceRegistrySyncService {
private syncInterval = 300000 // 5 minutes
async syncServices(context: Context) {
const lspDb = this.App.database.main(context)
// Fetch services from LSP collections
const warehouses = await lspDb.collection('warehouses').find({ status: 'OPERATIONAL' })
const carriers = await lspDb.collection('carriers').find({ status: 'ACTIVE' })
const hubs = await lspDb.collection('hubs').find({ status: 'OPERATIONAL' })
// Transform and upsert to Query registry
for (const warehouse of warehouses) {
await this.upsertService({
compositeId: `${context.xcode}:${warehouse.id}`,
lsp: context.xcode,
serviceId: warehouse.id,
serviceType: 'WAREHOUSE',
capabilities: warehouse.capabilities,
location: warehouse.location,
// ... other fields
})
}
}
}2. Network Discovery Service
Purpose: Find services matching geographic and capability criteria
Query Pipeline:
class NetworkDiscoveryService {
async discoverServices(criteria: QueryCriteria): Promise<ServiceRegistryEntry[]> {
// 1. Build MongoDB query
const query = this.buildQuery(criteria)
// 2. Execute query with geospatial filtering
let services = await this.realmDb.collection('service_registry')
.find(query)
.toArray()
// 3. Apply capability filters
if (criteria.capabilities) {
services = this.applyCapabilityFilters(services, criteria.capabilities)
}
// 4. Calculate proximity scores
if (criteria.location?.coordinates) {
services = this.filterByProximity(
services,
criteria.location.coordinates,
criteria.location.maxDistance
)
}
// 5. Sort by relevance
return services.sort((a, b) => b.score - a.score)
}
}Proximity Scoring:
private filterByProximity(
services: ServiceRegistryEntry[],
targetCoordinates: [number, number],
maxDistance?: number
): ServiceRegistryEntry[] {
return services
.map(service => {
const distance = this.calculateDistance(
targetCoordinates,
service.location.coordinates
)
// Score: 100 at 0km, 0 at maxDistance
const score = maxDistance
? Math.max(0, 100 - (distance / maxDistance * 100))
: 100
return {
...service,
distance,
proximityScore: score
}
})
.filter(s => !maxDistance || s.distance <= maxDistance)
.sort((a, b) => a.distance - b.distance)
}
private calculateDistance(
coord1: [number, number],
coord2: [number, number]
): number {
// Haversine formula for great-circle distance
const [lat1, lon1] = coord1
const [lat2, lon2] = coord2
const R = 6371 // Earth's radius in km
const dLat = this.toRad(lat2 - lat1)
const dLon = this.toRad(lon2 - lon1)
const a =
Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(this.toRad(lat1)) * Math.cos(this.toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2)
const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
return R * c
}3. Matching Service
Purpose: Intelligent order-to-service matching with multi-factor scoring
Scoring Algorithm:
class MatchingService {
async match(request: MatchRequest): Promise<MatchResult> {
// 1. Discover candidate services
const candidates = await this.discoverCandidates(request)
// 2. Score each candidate
const scoredCandidates = await Promise.all(
candidates.map(service => this.scoreService(service, request))
)
// 3. Apply strategy weights
const weighted = this.applyStrategyWeights(scoredCandidates, request.strategy)
// 4. Select top recommendations
const recommendations = this.selectRecommended(weighted, 5)
return {
recommendations,
totalCandidates: candidates.length,
strategy: request.strategy
}
}
private async scoreService(
service: ServiceRegistryEntry,
request: MatchRequest
): Promise<ScoredService> {
const scores = {
capability: await this.scoreCapabilityMatch(service, request),
proximity: this.scoreProximity(service, request),
capacity: await this.scoreCapacity(service, request),
performance: this.scorePerformance(service, request),
cost: await this.scoreCost(service, request)
}
// Calculate overall score (weighted average)
const weights = this.getStrategyWeights(request.strategy)
scores.overall =
scores.capability * weights.capability +
scores.proximity * weights.proximity +
scores.capacity * weights.capacity +
scores.performance * weights.performance +
scores.cost * weights.cost
return { service, scores }
}
}Scoring Factors:
1. Capability Score (0-100)
private scoreCapabilityMatch(
service: ServiceRegistryEntry,
request: MatchRequest
): number {
let score = 0
let matchedCount = 0
let totalRequired = 0
// Check required capabilities
for (const requirement of request.requirements || []) {
totalRequired++
if (this.hasCapability(service, requirement)) {
matchedCount++
score += 100 / totalRequired
}
}
// Check item-specific requirements
for (const item of request.items || []) {
if (item.requiresColdChain && service.capabilities?.environmental?.temperatureControlled) {
score += 10
}
if (item.requiresHazmatHandling && service.capabilities?.handling?.hazmat) {
score += 10
}
}
return Math.min(100, score)
}2. Proximity Score (0-100)
private scoreProximity(
service: ServiceRegistryEntry,
request: MatchRequest
): number {
if (!request.delivery?.coordinates) return 50 // Neutral if no location
const distance = this.calculateDistance(
request.delivery.coordinates,
service.location.coordinates
)
// Physical services: 100 at 0km, 0 at 500km
const maxDistance = 500
return Math.max(0, 100 - (distance / maxDistance * 100))
}3. Capacity Score (0-100)
private async scoreCapacity(
service: ServiceRegistryEntry,
request: MatchRequest
): Promise<number> {
// Query real-time capacity from LSP
const capacity = await this.capacityService.queryCapacity({
lsp: service.lsp,
serviceId: service.serviceId,
capacityType: 'STORAGE_SLOTS'
})
if (!capacity) return 50 // Unknown capacity = neutral
const required = this.calculateRequiredCapacity(request)
const utilizationAfter = (capacity.used + required) / capacity.total
// Prefer services with 20-80% utilization (not empty, not full)
if (utilizationAfter > 0.9) return 0 // Too full
if (utilizationAfter < 0.2) return 80 // Very available
return 100 // Optimal utilization
}4. Performance Score (0-100)
private scorePerformance(
service: ServiceRegistryEntry,
request: MatchRequest
): number {
if (!service.performance) return 70 // No data = slightly above neutral
return service.performance.overallScore || 70
}5. Cost Score (0-100)
private async scoreCost(
service: ServiceRegistryEntry,
request: MatchRequest
): Promise<number> {
// Get pricing estimate
const pricing = await this.pricingService.estimateCost({
lsp: service.lsp,
serviceId: service.serviceId,
serviceType: service.serviceType,
orderContext: {
items: request.items,
requirements: request.requirements,
deliveryLocation: request.delivery
}
})
// Normalize cost to 0-100 score (lower cost = higher score)
const maxCost = 1000 // Assume max reasonable cost
return Math.max(0, 100 - (pricing.totalCost / maxCost * 100))
}Strategy Weights:
private getStrategyWeights(strategy: MatchingStrategy) {
switch (strategy) {
case 'BALANCED':
return {
capability: 0.30,
proximity: 0.20,
capacity: 0.20,
performance: 0.15,
cost: 0.15
}
case 'COST_OPTIMIZED':
return {
capability: 0.25,
proximity: 0.10,
capacity: 0.15,
performance: 0.10,
cost: 0.40
}
case 'SPEED_OPTIMIZED':
return {
capability: 0.25,
proximity: 0.40,
capacity: 0.15,
performance: 0.15,
cost: 0.05
}
case 'QUALITY_OPTIMIZED':
return {
capability: 0.30,
proximity: 0.10,
capacity: 0.10,
performance: 0.40,
cost: 0.10
}
}
}4. Capacity Query Service
Purpose: Real-time capacity queries with intelligent caching
Caching Strategy:
class CapacityQueryService {
private cache = new Map<string, CachedCapacity>()
private defaultCacheDuration = 300000 // 5 minutes
async queryCapacity(request: CapacityQueryRequest): Promise<Capacity> {
const cacheKey = `${request.lsp}:${request.serviceId}:${request.capacityType}`
// Check cache first
const cached = this.cache.get(cacheKey)
if (cached && Date.now() - cached.timestamp < this.defaultCacheDuration) {
return cached.data
}
// Query LSP database
const lspDb = this.App.database.main({ xcode: request.lsp })
const capacity = await this.fetchCapacity(lspDb, request)
// Cache result
this.cache.set(cacheKey, {
data: capacity,
timestamp: Date.now()
})
return capacity
}
async batchQueryCapacity(
requests: CapacityQueryRequest[]
): Promise<Map<string, Capacity>> {
// Execute queries in parallel
const results = await Promise.all(
requests.map(req => this.queryCapacity(req))
)
// Map results by service ID
return new Map(
results.map((capacity, i) => [
`${requests[i].lsp}:${requests[i].serviceId}`,
capacity
])
)
}
invalidateCache(lsp: string, serviceId: string, capacityType?: string) {
if (capacityType) {
this.cache.delete(`${lsp}:${serviceId}:${capacityType}`)
} else {
// Invalidate all capacity types for this service
for (const key of this.cache.keys()) {
if (key.startsWith(`${lsp}:${serviceId}:`)) {
this.cache.delete(key)
}
}
}
}
}5. Pricing Query Service
Purpose: Cost estimation and price comparison
Pricing Calculation:
class PricingQueryService {
async estimateCost(request: PricingEstimateRequest): Promise<PricingEstimate> {
// 1. Fetch pricing rules from LSP
const lspDb = this.App.database.main({ xcode: request.lsp })
const pricingRules = await lspDb.collection('pricing')
.find({
serviceId: request.serviceId,
serviceType: request.serviceType,
status: 'ACTIVE'
})
.toArray()
// 2. Calculate base rate
const baseRate = this.calculateBaseRate(pricingRules, request.orderContext)
// 3. Apply surcharges
const surcharges = this.calculateSurcharges(pricingRules, request.orderContext)
// 4. Calculate taxes
const taxes = this.calculateTaxes(baseRate + surcharges, request.orderContext)
// 5. Add fees
const fees = this.calculateFees(pricingRules, request.orderContext)
return {
totalCost: baseRate + surcharges + taxes + fees,
currency: pricingRules[0]?.currency?.code || 'USD',
breakdown: { baseRate, surcharges, taxes, fees },
appliedRules: pricingRules.map(r => r.id)
}
}
private calculateBaseRate(
rules: PricingRule[],
context: OrderContext
): number {
// Find applicable base rate rule
const baseRateRule = rules.find(r =>
r.chargeType === 'BASE_RATE' &&
this.matchesConditions(r, context)
)
if (!baseRateRule) return 0
// Calculate based on pricing model
switch (baseRateRule.pricingModel) {
case 'FLAT_RATE':
return baseRateRule.rate
case 'PER_UNIT':
const totalUnits = context.items?.reduce((sum, item) => sum + item.quantity, 0) || 0
return baseRateRule.rate * totalUnits
case 'TIERED':
return this.calculateTieredRate(baseRateRule.tiers, context)
default:
return 0
}
}
}Performance Optimization
Query Optimization
1. Index Strategy
// Compound indexes for common queries
db.service_registry.createIndex({
serviceType: 1,
status: 1,
'location.coordinates': '2dsphere'
})
// Partial indexes for active services only
db.service_registry.createIndex(
{ serviceType: 1, 'performance.overallScore': -1 },
{ partialFilterExpression: { status: 'ACTIVE' } }
)2. Query Hints
// Use specific index for geospatial queries
await db.collection('service_registry')
.find(query)
.hint({ 'location.coordinates': '2dsphere' })
.toArray()3. Projection Optimization
// Only fetch needed fields
await db.collection('service_registry')
.find(query)
.project({
compositeId: 1,
serviceType: 1,
name: 1,
location: 1,
capabilities: 1,
performance: 1
})
.toArray()Caching Strategy
Multi-Level Caching:
// Level 1: In-memory cache (fastest)
private memoryCache = new Map<string, CachedData>()
// Level 2: Redis cache (shared across instances)
private redisCache: Redis
async getCached<T>(key: string): Promise<T | null> {
// Check memory first
const memoryCached = this.memoryCache.get(key)
if (memoryCached && Date.now() - memoryCached.timestamp < 300000) {
return memoryCached.data as T
}
// Check Redis
const redisCached = await this.redisCache.get(key)
if (redisCached) {
const data = JSON.parse(redisCached)
// Store in memory for next time
this.memoryCache.set(key, { data, timestamp: Date.now() })
return data as T
}
return null
}Parallel Query Execution
async batchDiscover(
criteria: QueryCriteria[]
): Promise<ServiceRegistryEntry[][]> {
// Execute queries in parallel
return Promise.all(
criteria.map(c => this.discoverServices(c))
)
}Scalability Considerations
Horizontal Scaling
Service Registry Sharding:
// Shard by LSP to distribute load
const shard = hash(lsp) % TOTAL_SHARDS
const db = this.getShardedDatabase(shard)Read Replicas:
// Use read replicas for query-heavy operations
const realmDb = this.App.database.realm(context, { readPreference: 'secondaryPreferred' })Load Balancing
// Round-robin across query service instances
const instance = instances[requestCount % instances.length]
await instance.discoverServices(criteria)
