Workers are the backbone of Activepieces, responsible for executing workflows, processing triggers, and handling background tasks. This guide explains worker architecture and configuration.

What are Workers?

Workers are Node.js processes that:
  • Consume jobs from Redis queue (BullMQ)
  • Execute workflows using the execution engine
  • Poll external APIs for scheduled triggers
  • Renew webhooks for trigger subscriptions
  • Process async tasks (emails, notifications)
Workers are separate from the API server, allowing independent scaling based on workload.

Worker Architecture

Worker Process Lifecycle

Job Types

Workers process different job types from the queue:

1. ExecuteFlowJob

Purpose: Execute a workflow instance
Triggered by:
  • Webhooks
  • Manual runs
  • Scheduled triggers
  • Child flows
Data:
{
  flowVersionId: string
  projectId: string
  payload: unknown
  executionType: ExecutionType
  httpRequestId?: string
  parentRunId?: string
}
Processing:
  1. Load flow version from database
  2. Spawn execution engine
  3. Pass flow definition and payload
  4. Collect execution result
  5. Save flow run to database
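The job data above maps naturally onto a TypeScript type, and a worker can guard against malformed queue entries before spawning the engine. A sketch (the type and guard are illustrative, not the actual Activepieces definitions):

```typescript
// Illustrative types mirroring the ExecuteFlowJob data shape above;
// these are sketches, not the real Activepieces definitions.
type ExecutionType = 'BEGIN' | 'RESUME'

interface ExecuteFlowJobData {
  flowVersionId: string
  projectId: string
  payload: unknown
  executionType: ExecutionType
  httpRequestId?: string
  parentRunId?: string
}

// Runtime guard a worker could apply before step 1 above, so a
// malformed queue entry fails fast instead of mid-execution.
function isExecuteFlowJobData(data: unknown): data is ExecuteFlowJobData {
  if (typeof data !== 'object' || data === null) return false
  const d = data as Record<string, unknown>
  return (
    typeof d.flowVersionId === 'string' &&
    typeof d.projectId === 'string' &&
    (d.executionType === 'BEGIN' || d.executionType === 'RESUME')
  )
}
```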

2. PollingJob

Purpose: Poll external API for new data
Triggered by: Cron schedule (default: every 5 minutes)
Data:
{
  flowVersionId: string
  projectId: string
  triggerType: TriggerType.POLLING
}
Processing:
  1. Load trigger configuration
  2. Call piece’s onEnable hook
  3. Fetch new items from API
  4. Deduplicate items
  5. Enqueue ExecuteFlowJob for each item
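Step 4, deduplication, typically tracks the IDs of previously seen items per trigger and drops repeats, so each new item produces exactly one ExecuteFlowJob. A minimal sketch (the ID extractor and in-memory `seenIds` set are stand-ins; a real worker would persist seen IDs per trigger):

```typescript
// Deduplicate polled items against IDs seen in previous polls.
// `seenIds` would be persisted per trigger (e.g. in the database);
// here it is an in-memory Set for illustration.
function deduplicate<T>(
  items: T[],
  getId: (item: T) => string,
  seenIds: Set<string>,
): T[] {
  const fresh: T[] = []
  for (const item of items) {
    const id = getId(item)
    if (!seenIds.has(id)) {
      seenIds.add(id)
      fresh.push(item) // only new items get an ExecuteFlowJob
    }
  }
  return fresh
}
```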

3. WebhookJob

Purpose: Process webhook payload
Triggered by: Incoming webhook HTTP request
Data:
{
  flowVersionId: string
  payload: unknown
  httpRequestId: string
}
Processing:
  1. Validate webhook signature
  2. Transform payload if needed
  3. Enqueue ExecuteFlowJob
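Step 1, signature validation, is commonly an HMAC over the raw request body compared in constant time. A generic sketch using Node's crypto module (header name and secret handling vary per piece; nothing here is Activepieces-specific):

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Verify an HMAC-SHA256 signature over the raw webhook body.
function verifySignature(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest()
  const received = Buffer.from(signatureHex, 'hex')
  // timingSafeEqual throws on length mismatch, so check lengths first.
  if (received.length !== expected.length) return false
  return timingSafeEqual(received, expected)
}
```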

4. RenewWebhookJob

Purpose: Renew webhook subscriptions
Triggered by: Cron schedule (before expiration)
Data:
{
  flowVersionId: string
  projectId: string
}
Processing:
  1. Call piece’s renewal endpoint
  2. Update webhook registration
  3. Store new webhook URL/token
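The renewal schedule has to fire before the subscription expires; a common pattern is to renew at a fraction of the subscription lifetime so failed renewal calls still have room for retries. A sketch (the 80% margin is an illustrative choice, not an Activepieces default):

```typescript
// Compute when to enqueue a RenewWebhookJob: renew at 80% of the
// subscription lifetime so there is slack if the renewal call fails
// and BullMQ has to retry.
function renewalDelayMs(issuedAt: Date, expiresAt: Date, margin = 0.8): number {
  const lifetimeMs = expiresAt.getTime() - issuedAt.getTime()
  return Math.max(0, Math.floor(lifetimeMs * margin))
}
```

The result would feed a delayed job, e.g. `queue.add('renew-webhook', data, { delay: renewalDelayMs(issuedAt, expiresAt) })`.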

5. UserInteractionJob

Purpose: Handle human-in-the-loop tasks
Data:
{
  flowRunId: string
  stepName: string
  action: 'APPROVE' | 'REJECT'
}
Processing:
  1. Resume paused flow run
  2. Continue from approval step
  3. Complete execution
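Resuming is essentially a state transition on the paused run. A minimal sketch (status names are illustrative; the action values mirror the job data above):

```typescript
type RunStatus = 'PAUSED' | 'RUNNING' | 'FAILED'

// Map a user decision onto a paused run: APPROVE resumes execution
// from the approval step, REJECT fails the run.
function resolveApproval(current: RunStatus, action: 'APPROVE' | 'REJECT'): RunStatus {
  if (current !== 'PAUSED') {
    throw new Error(`run is not paused (status: ${current})`)
  }
  return action === 'APPROVE' ? 'RUNNING' : 'FAILED'
}
```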

Worker Configuration

Container Type

Control what runs in each container:
Default: Both API and workers in one container
.env
AP_CONTAINER_TYPE=WORKER_AND_APP
Use for:
  • Development
  • Small deployments
  • Single-server setups
Implementation (from docker-entrypoint.sh):
node --enable-source-maps packages/server/api/dist/src/bootstrap.js

Worker Concurrency

AP_WORKER_CONCURRENCY
number
Number of jobs processed simultaneously per worker
Formula:
  • CPU-bound workflows: concurrency = CPU cores
  • I/O-bound workflows: concurrency = CPU cores × 2-4
Examples:
# 2 CPU cores, data processing
AP_WORKER_CONCURRENCY=2

# 4 CPU cores, API calls/webhooks
AP_WORKER_CONCURRENCY=12
Trade-offs:
  • Higher concurrency = more throughput
  • Higher concurrency = more memory usage
  • Too high = context switching overhead
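The sizing formula above can be captured in a small helper; here 3 is used as the midpoint of the 2-4 multiplier range for I/O-bound work (an illustrative choice):

```typescript
// Suggest an AP_WORKER_CONCURRENCY value from CPU count and workload
// profile. CPU-bound: one job per core. I/O-bound: cores x 3
// (midpoint of the 2-4 range above).
function suggestConcurrency(cpuCores: number, workload: 'cpu' | 'io'): number {
  return workload === 'cpu' ? cpuCores : cpuCores * 3
}
```

This reproduces the examples above: 2 cores of data processing gives 2, 4 cores of API calls gives 12.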

Worker Token (Dedicated Workers)

AP_WORKER_TOKEN
string
Authentication token for dedicated worker deployments
.env
# Main app
AP_CONTAINER_TYPE=APP

# Dedicated worker
AP_CONTAINER_TYPE=WORKER
AP_WORKER_TOKEN=secure_random_token
Used to authenticate worker API calls to the main app.

BullMQ Integration

Activepieces uses BullMQ for job queue management.

Queue Structure

Location: app/workers/queue/queue-manager.ts
Queue configuration:
const queue = new Queue('activepieces', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000  // Start with 1 second
    },
    removeOnComplete: {
      age: 3600  // Keep for 1 hour
    },
    removeOnFail: {
      age: 86400 * 7  // Keep for 7 days
    }
  }
})

Job Lifecycle

Job Priorities

// High priority (webhooks)
await queue.add('execute-flow', data, {
  priority: 1
})

// Normal priority (scheduled)
await queue.add('polling', data, {
  priority: 5
})

// Low priority (maintenance)
await queue.add('cleanup', data, {
  priority: 10
})

Delayed Jobs

// Execute after 1 hour
await queue.add('execute-flow', data, {
  delay: 3600000  // ms
})

Repeating Jobs

// Poll every 5 minutes
await queue.add('polling', data, {
  repeat: {
    every: 300000,  // 5 minutes in ms
    immediately: true
  }
})

// Cron schedule
await queue.add('cleanup', data, {
  repeat: {
    pattern: '0 2 * * *'  // 2 AM daily
  }
})

Worker Scaling

Horizontal Scaling

Add more worker containers:
docker-compose.yml
services:
  worker-1:
    image: ghcr.io/activepieces/activepieces:0.79.0
    environment:
      AP_CONTAINER_TYPE: WORKER
      AP_WORKER_CONCURRENCY: "4"
    env_file: .env
    
  worker-2:
    image: ghcr.io/activepieces/activepieces:0.79.0
    environment:
      AP_CONTAINER_TYPE: WORKER
      AP_WORKER_CONCURRENCY: "4"
    env_file: .env
    
  worker-3:
    image: ghcr.io/activepieces/activepieces:0.79.0
    environment:
      AP_CONTAINER_TYPE: WORKER
      AP_WORKER_CONCURRENCY: "4"
    env_file: .env

Kubernetes Scaling

apiVersion: apps/v1
kind: Deployment
metadata:
  name: activepieces-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: activepieces-worker
  template:
    metadata:
      labels:
        app: activepieces-worker
    spec:
      containers:
      - name: worker
        image: ghcr.io/activepieces/activepieces:0.79.0
        env:
        - name: AP_CONTAINER_TYPE
          value: "WORKER"
        - name: AP_WORKER_CONCURRENCY
          value: "4"
        resources:
          requests:
            cpu: 1000m
            memory: 2Gi
          limits:
            cpu: 2000m
            memory: 4Gi

Auto-scaling Rules

Scale based on queue depth:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: activepieces-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: redis_queue_length
      target:
        type: AverageValue
        averageValue: "100"  # Scale when >100 jobs/worker
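With an AverageValue target, the HPA converges toward ceil(total metric / target per pod), clamped to the replica bounds. A sketch of that arithmetic (not the actual HPA controller code):

```typescript
// Replica count an AverageValue-based HPA converges toward:
// ceil(queue length / target per worker), clamped to [min, max].
function desiredReplicas(
  queueLength: number,
  targetPerWorker: number,
  min: number,
  max: number,
): number {
  const raw = Math.ceil(queueLength / targetPerWorker)
  return Math.min(max, Math.max(min, raw))
}
```

With the manifest above (target 100, bounds 2-20), a backlog of 1500 jobs scales to 15 workers.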

Monitoring Workers

Queue Metrics

Check queue health:
# Queue depth (waiting jobs are stored in a list)
redis-cli LLEN bull:activepieces:wait

# Active jobs
redis-cli LLEN bull:activepieces:active

# Failed jobs (stored as a sorted set)
redis-cli ZCARD bull:activepieces:failed

# Completed jobs (stored as a sorted set)
redis-cli ZCARD bull:activepieces:completed

Queue UI

Enable BullMQ Board for visual monitoring:
.env
AP_QUEUE_UI_ENABLED=true
AP_QUEUE_UI_USERNAME=admin
AP_QUEUE_UI_PASSWORD=secure_password
Access at: http://your-domain/admin/queues
Features:
  • View waiting/active/failed jobs
  • Retry failed jobs
  • Remove jobs
  • View job data and stack traces
  • Queue statistics

Logs

Monitor worker logs:
# Docker Compose
docker compose logs -f worker

# Kubernetes
kubectl logs -f deployment/activepieces-worker

Metrics API

Activepieces exposes queue metrics:
curl http://localhost:3000/api/v1/admin/queue/metrics
Response:
{
  "waiting": 42,
  "active": 8,
  "completed": 1523,
  "failed": 12,
  "delayed": 0,
  "paused": 0
}
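These metrics feed naturally into a health check, for example flagging a backlog or a high failure ratio. A sketch with illustrative thresholds:

```typescript
// Shape of the metrics response shown above.
interface QueueMetrics {
  waiting: number
  active: number
  completed: number
  failed: number
}

// Flag unhealthy queues: a large backlog, or more than 5% of
// finished jobs failing. Both thresholds are illustrative.
function isQueueHealthy(m: QueueMetrics, maxWaiting = 1000): boolean {
  const finished = m.completed + m.failed
  const failureRate = finished === 0 ? 0 : m.failed / finished
  return m.waiting <= maxWaiting && failureRate <= 0.05
}
```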

Performance Tuning

Redis Configuration

Optimize Redis for queue performance:
redis.conf
# Memory
maxmemory 2gb
# BullMQ requires noeviction; policies like allkeys-lru can evict job data
maxmemory-policy noeviction

# Persistence (optional)
save 900 1
save 300 10
save 60 10000

# Networking
tcp-backlog 511
timeout 0

Worker Optimization

Increase Concurrency

For I/O-bound workflows:
AP_WORKER_CONCURRENCY=16

Pre-warm Cache

Load pieces into memory on startup:
AP_PRE_WARM_CACHE=true

Adjust Timeouts

Increase for long-running workflows:
AP_FLOW_TIMEOUT_SECONDS=1200

Resource Limits

Set Docker resource limits:
deploy:
  resources:
    limits:
      cpus: '2'
      memory: 4G

Error Handling

Retry Strategy

BullMQ retries failed jobs automatically:
{
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000  // 1s, 2s, 4s
  }
}
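With exponential backoff, BullMQ waits baseDelay × 2^(attempt − 1) before retry number attempt, which is where the 1s, 2s, 4s schedule in the comment comes from. The formula as a checkable helper (a sketch of the arithmetic, not BullMQ's internal code):

```typescript
// Delay before retry number `attempt` (1-based) under BullMQ's
// exponential backoff: delay, delay*2, delay*4, ...
function backoffDelayMs(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * 2 ** (attempt - 1)
}
```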

Failed Job Retention

Configure how long to keep failed jobs:
.env
AP_REDIS_FAILED_JOB_RETENTION_DAYS=7
AP_REDIS_FAILED_JOB_RETENTION_MAX_COUNT=100

Dead Letter Queue

After max retries, jobs move to failed queue:
# View failed jobs (failed jobs live in a sorted set)
redis-cli ZRANGE bull:activepieces:failed 0 -1

# Manually retry
curl -X POST http://localhost:3000/api/v1/admin/queue/retry/:jobId

Best Practices

Use dedicated containers for production.
Benefits:
  • Independent scaling
  • Resource optimization
  • Better fault isolation
  • Easier monitoring
Alert when queue grows:
# Alert if waiting > 1000
if [ $(redis-cli LLEN bull:activepieces:wait) -gt 1000 ]; then
  alert "Queue backlog detected"
fi
Prevent memory leaks:
resources:
  limits:
    memory: 4Gi
Visual debugging:
AP_QUEUE_UI_ENABLED=true

Troubleshooting

Symptoms: Jobs not processing
Check:
  1. Workers running: docker ps | grep worker
  2. Redis connection: redis-cli ping
  3. Worker logs for errors
Fix:
# Restart workers
docker compose restart worker
Symptoms: Workers consuming too much RAM
Check:
  1. Worker concurrency: AP_WORKER_CONCURRENCY
  2. Workflow complexity
  3. Memory leaks in custom code
Fix:
# Reduce concurrency
AP_WORKER_CONCURRENCY=2

# Set memory limits
AP_SANDBOX_MEMORY_LIMIT=128
Symptoms: Many jobs in failed queue
Check:
  1. Failed job details in Queue UI
  2. Worker logs for stack traces
  3. External API availability
Fix:
# View a failed job (failed jobs live in a sorted set)
redis-cli ZRANGE bull:activepieces:failed 0 0

# Increase timeout
AP_FLOW_TIMEOUT_SECONDS=600

Next Steps

Engine

Understand execution engine

Scaling

Scale workers horizontally

Architecture

System architecture overview

Monitoring

Setup monitoring and alerts