WorkflowExecutor API Reference
October 23, 2025
The WorkflowExecutor class is your main interface for managing workflows. It provides methods to register, start, monitor, and control workflow execution.
Constructor
new WorkflowExecutor(client: Client)
Creates a new WorkflowExecutor.
Parameters:
client (Client): An instance of Client.
Methods
registerWorkflow(override: boolean, workflow: WorkflowDef): Promise<void>
Registers a workflow definition with Conductor.
Parameters:
override (boolean): Whether to override the existing workflow definition.
workflow (WorkflowDef): The workflow definition.
Returns:
Promise<void>
Example:
import { WorkflowExecutor, workflow, simpleTask } from "@io-orkes/conductor-javascript";
// `client` is an already-configured Client instance
const executor = new WorkflowExecutor(client);
// Register a workflow, overriding any existing definition
await executor.registerWorkflow(
  true,
  workflow("email_workflow", [
    simpleTask("send_email", "email_task", { to: "user@example.com" }),
  ])
);
startWorkflow(workflowRequest: StartWorkflowRequest): Promise<string>
Starts a new workflow execution.
Parameters:
workflowRequest (StartWorkflowRequest): The request to start a workflow.
Returns:
Promise<string>: The ID of the workflow instance.
Example:
import { WorkflowExecutor } from "@io-orkes/conductor-javascript";
const executor = new WorkflowExecutor(client);
// Start a workflow
const executionId = await executor.startWorkflow({
  name: "email_workflow",
  version: 1,
  input: {
    to: "user@example.com",
    subject: "Welcome!",
    message: "Welcome to our platform!",
  },
});
console.log(`Workflow started with ID: ${executionId}`);
executeWorkflow(workflowRequest: StartWorkflowRequest, name: string, version: number, requestId: string, waitUntilTaskRef?: string): Promise<WorkflowRun>
executeWorkflow(workflowRequest: StartWorkflowRequest, name: string, version: number, requestId: string, waitUntilTaskRef: string, waitForSeconds: number, consistency: Consistency, returnStrategy: ReturnStrategy): Promise<EnhancedSignalResponse>
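Executes a workflow synchronously and waits for it to complete. The return type depends on the overload: the shorter signature resolves to a WorkflowRun, and the signature that also takes waitForSeconds, consistency, and returnStrategy resolves to an EnhancedSignalResponse.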
Parameters:
workflowRequest (StartWorkflowRequest): The request to start a workflow.
name (string): The name of the workflow.
version (number): The version of the workflow.
requestId (string): A unique ID for the request.
waitUntilTaskRef (string, optional): The reference name of the task to wait for.
waitForSeconds (number, optional): The number of seconds to wait for the task.
consistency (Consistency, optional): The consistency level for the read operations.
returnStrategy (ReturnStrategy, optional): The strategy for what data to return.
Returns:
Promise<WorkflowRun | EnhancedSignalResponse>: A WorkflowRun object or an EnhancedSignalResponse object.
Example:
import { WorkflowExecutor } from "@io-orkes/conductor-javascript";
const executor = new WorkflowExecutor(client);
// Execute the workflow synchronously; this call uses the WorkflowRun overload
const workflowRun = await executor.executeWorkflow(
  {
    name: "data_processing",
    version: 1,
    input: { fileId: "file_123" },
  },
  "data_processing",
  1,
  "req_123"
);
console.log(`Workflow completed with status: ${workflowRun.status}`);
startWorkflows(workflowsRequest: StartWorkflowRequest[]): Promise<string>[]
Starts multiple workflows at once.
Parameters:
workflowsRequest (StartWorkflowRequest[]): An array of workflow start requests.
Returns:
Promise<string>[]: An array of promises that resolve to the workflow instance IDs.
Example:
import { WorkflowExecutor } from "@io-orkes/conductor-javascript";
const executor = new WorkflowExecutor(client);
// Start multiple workflows
const workflowRequests = [
  { name: "email_workflow", version: 1, input: { to: "user1@example.com" } },
  { name: "email_workflow", version: 1, input: { to: "user2@example.com" } },
  { name: "email_workflow", version: 1, input: { to: "user3@example.com" } },
];
const promises = executor.startWorkflows(workflowRequests);
// Wait for every start request to resolve to its workflow instance ID
const executionIds = await Promise.all(promises);
console.log(`Started ${executionIds.length} workflows:`, executionIds);
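Because startWorkflows returns an array of promises, Promise.all rejects as soon as any single start request fails, and the IDs of the workflows that did start are lost. The sketch below shows one way to keep them by using Promise.allSettled instead. The helper name collectStartResults is hypothetical and not part of the SDK.

```typescript
// Hypothetical helper: waits for every start request to settle, then separates
// the workflow IDs that were returned from the errors that were raised.
async function collectStartResults(
  pending: Promise<string>[]
): Promise<{ started: string[]; failed: unknown[] }> {
  const settled = await Promise.allSettled(pending);
  const started: string[] = [];
  const failed: unknown[] = [];
  for (const result of settled) {
    if (result.status === "fulfilled") {
      started.push(result.value);
    } else {
      failed.push(result.reason);
    }
  }
  return { started, failed };
}
```

With a real executor this could be called as `await collectStartResults(executor.startWorkflows(workflowRequests))`.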
goBackToTask(workflowInstanceId: string, taskFinderPredicate: TaskFinderPredicate, rerunWorkflowRequestOverrides: Partial<RerunWorkflowRequest> = {}): Promise<void>
Reruns a workflow from a specific task.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
taskFinderPredicate (TaskFinderPredicate): A function to find the task to rerun from.
rerunWorkflowRequestOverrides (Partial<RerunWorkflowRequest>, optional): Overrides for the rerun request.
Returns:
Promise<void>
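A TaskFinderPredicate is a plain function over a task, so it can be written and checked without a server. The following is an illustrative sketch only: TaskLike is a hypothetical, trimmed-down stand-in for the Task type, and byReferenceName is a helper name invented here rather than something the SDK exports.

```typescript
// Hypothetical subset of the Task type documented under Type Definitions.
type TaskLike = {
  referenceTaskName?: string;
  taskType?: string;
  status?: string;
};

// Same shape as TaskFinderPredicate, restated over the trimmed type.
type TaskPredicate = (task: TaskLike) => boolean;

// Hypothetical helper: builds a predicate that matches one reference name.
const byReferenceName =
  (refName: string): TaskPredicate =>
  (task) =>
    task.referenceTaskName === refName;

// Sample task list standing in for the tasks of a workflow execution.
const sampleTasks: TaskLike[] = [
  { referenceTaskName: "validate_ref", taskType: "SIMPLE", status: "COMPLETED" },
  { referenceTaskName: "send_email_ref", taskType: "SIMPLE", status: "FAILED" },
];

// The predicate selects the task that a rerun would start from.
const rerunTarget = sampleTasks.find(byReferenceName("send_email_ref"));
```

With a real executor, the same predicate would be passed as the second argument, for example `await executor.goBackToTask(workflowInstanceId, byReferenceName("send_email_ref"))`.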
goBackToFirstTaskMatchingType(workflowInstanceId: string, taskType: string): Promise<void>
Reruns a workflow from the first task of a specific type.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
taskType (string): The type of the task to rerun from.
Returns:
Promise<void>
getWorkflow(workflowInstanceId: string, includeTasks: boolean, retry: number = 0): Promise<Workflow>
Gets the execution status of a workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
includeTasks (boolean): Whether to include the tasks in the response.
retry (number, optional): The number of times to retry on failure.
Returns:
Promise<Workflow>: The workflow execution status.
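A common use of getWorkflow is polling an execution until it finishes. The sketch below assumes only the signature documented above. WorkflowReader, WorkflowSnapshot, isTerminalStatus, and waitForTerminal are hypothetical names, and the set of terminal statuses is taken from the status values listed for the Workflow type, on the assumption that RUNNING and PAUSED are the only non-terminal ones.

```typescript
// Hypothetical structural subsets of Workflow and WorkflowExecutor.
type WorkflowSnapshot = { workflowId?: string; status?: string };
type WorkflowReader = {
  getWorkflow(
    workflowInstanceId: string,
    includeTasks: boolean,
    retry?: number
  ): Promise<WorkflowSnapshot>;
};

// Assumption: these four statuses mean the workflow will not progress further.
const isTerminalStatus = (status?: string): boolean =>
  status === "COMPLETED" ||
  status === "FAILED" ||
  status === "TIMED_OUT" ||
  status === "TERMINATED";

// Hypothetical helper: polls until the workflow is terminal or attempts run out,
// and returns the last snapshot that was read.
async function waitForTerminal(
  reader: WorkflowReader,
  workflowInstanceId: string,
  maxAttempts: number,
  delayMs: number
): Promise<WorkflowSnapshot> {
  let latest: WorkflowSnapshot = {};
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Tasks are not needed to read the status, so they are left out.
    latest = await reader.getWorkflow(workflowInstanceId, false);
    if (isTerminalStatus(latest.status)) return latest;
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return latest;
}
```

The caller should still check the returned status, since the loop also ends when the attempts are exhausted.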
getWorkflowStatus(workflowInstanceId: string, includeOutput: boolean, includeVariables: boolean): Promise<WorkflowStatus>
Gets a summary of the current workflow status.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
includeOutput (boolean): Whether to include the output in the response.
includeVariables (boolean): Whether to include the variables in the response.
Returns:
Promise<WorkflowStatus>: The workflow status summary.
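The two boolean flags let a caller fetch only the parts of the summary it needs. As a rough sketch under the signature above, the helper below asks for the output but not the variables and returns the output only once the workflow has completed. StatusReader, StatusSummary, and outputIfCompleted are hypothetical names.

```typescript
// Hypothetical structural subsets of WorkflowStatus and WorkflowExecutor.
type StatusSummary = {
  workflowId?: string;
  status?: string;
  output?: { [key: string]: unknown };
  variables?: { [key: string]: unknown };
};
type StatusReader = {
  getWorkflowStatus(
    workflowInstanceId: string,
    includeOutput: boolean,
    includeVariables: boolean
  ): Promise<StatusSummary>;
};

// Hypothetical helper: resolves to the output of a completed workflow,
// or to undefined while the workflow is in any other state.
async function outputIfCompleted(
  reader: StatusReader,
  workflowInstanceId: string
): Promise<{ [key: string]: unknown } | undefined> {
  // includeOutput = true, includeVariables = false
  const summary = await reader.getWorkflowStatus(workflowInstanceId, true, false);
  if (summary.status !== "COMPLETED") return undefined;
  return summary.output ?? {};
}
```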
getExecution(workflowInstanceId: string, includeTasks: boolean = true): Promise<Workflow>
Gets the execution status of a workflow, including tasks by default.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
includeTasks (boolean, optional): Whether to include the tasks in the response. Defaults to true.
Returns:
Promise<Workflow>: The workflow execution status.
pause(workflowInstanceId: string): Promise<void>
Pauses a running workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
Returns:
Promise<void>
reRun(workflowInstanceId: string, rerunWorkflowRequest: Partial<RerunWorkflowRequest> = {}): Promise<string>
Reruns a workflow with new parameters.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
rerunWorkflowRequest (Partial<RerunWorkflowRequest>, optional): Overrides for the rerun request.
Returns:
Promise<string>: The ID of the new workflow instance.
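As an illustration of the overrides argument, the sketch below reruns a workflow with replacement input. It relies only on the reRun signature above and the RerunWorkflowRequest shape listed under Type Definitions. Rerunner and rerunWithInput are hypothetical names, and how the server combines the overrides with the original execution is not specified here.

```typescript
// Local copy of the RerunWorkflowRequest shape from Type Definitions.
type RerunWorkflowRequest = {
  correlationId?: string;
  reRunFromTaskId?: string;
  reRunFromWorkflowId?: string;
  taskInput?: { [key: string]: unknown };
  workflowInput?: { [key: string]: unknown };
};

// Hypothetical structural subset of WorkflowExecutor.
type Rerunner = {
  reRun(
    workflowInstanceId: string,
    rerunWorkflowRequest?: Partial<RerunWorkflowRequest>
  ): Promise<string>;
};

// Hypothetical helper: reruns a workflow, overriding only its input.
// Resolves to the string ID that reRun returns.
async function rerunWithInput(
  executor: Rerunner,
  workflowInstanceId: string,
  workflowInput: { [key: string]: unknown }
): Promise<string> {
  const overrides: Partial<RerunWorkflowRequest> = { workflowInput };
  return executor.reRun(workflowInstanceId, overrides);
}
```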
restart(workflowInstanceId: string, useLatestDefinitions: boolean): Promise<void>
Restarts a workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
useLatestDefinitions (boolean): Whether to use the latest workflow definition.
Returns:
Promise<void>
resume(workflowInstanceId: string): Promise<void>
Resumes a paused workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
Returns:
Promise<void>
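pause and resume are naturally used as a pair. The sketch below wraps an action between the two calls and uses try/finally so that the workflow is resumed even if the action throws. PauseControl and whilePaused are hypothetical names, and the types mirror only the two signatures documented here.

```typescript
// Hypothetical structural subset of WorkflowExecutor.
type PauseControl = {
  pause(workflowInstanceId: string): Promise<void>;
  resume(workflowInstanceId: string): Promise<void>;
};

// Hypothetical helper: pauses the workflow, runs the action, and always resumes.
async function whilePaused<T>(
  executor: PauseControl,
  workflowInstanceId: string,
  action: () => Promise<T>
): Promise<T> {
  await executor.pause(workflowInstanceId);
  try {
    return await action();
  } finally {
    // Runs on both success and failure of the action.
    await executor.resume(workflowInstanceId);
  }
}
```

If pause itself rejects, resume is not called, because the workflow was never paused by this helper.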
retry(workflowInstanceId: string, resumeSubworkflowTasks: boolean): Promise<void>
Retries a workflow from the last failing task.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
resumeSubworkflowTasks (boolean): Whether to resume tasks in sub-workflows.
Returns:
Promise<void>
search(start: number, size: number, query: string, freeText: string, sort: string = "", skipCache: boolean = false): Promise<ScrollableSearchResultWorkflowSummary>
Searches for workflows.
Parameters:
start (number): The starting offset.
size (number): The number of results to return.
query (string): The search query.
freeText (string): The free text to search for.
sort (string, optional): The sort order.
skipCache (boolean, optional): Whether to skip the cache.
Returns:
Promise<ScrollableSearchResultWorkflowSummary>: The search results.
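The start and size parameters make it possible to page through results. The following is a minimal sketch of such a loop, assuming only the search signature above and the results field of the returned object. Searcher, SearchPage, SummaryLike, and searchAll are hypothetical names, and the query and freeText strings are passed through unchanged because their syntax is defined by the Conductor server.

```typescript
// Hypothetical structural subsets of the documented types.
type SummaryLike = { workflowId?: string; status?: string };
type SearchPage = { results?: SummaryLike[]; totalHits?: number };
type Searcher = {
  search(
    start: number,
    size: number,
    query: string,
    freeText: string,
    sort?: string,
    skipCache?: boolean
  ): Promise<SearchPage>;
};

// Hypothetical helper: fetches page after page until a short page is returned
// or maxResults summaries have been collected.
async function searchAll(
  searcher: Searcher,
  query: string,
  freeText: string,
  pageSize: number,
  maxResults: number
): Promise<SummaryLike[]> {
  if (pageSize <= 0) throw new RangeError("pageSize must be positive");
  const collected: SummaryLike[] = [];
  let start = 0;
  while (collected.length < maxResults) {
    const page = await searcher.search(start, pageSize, query, freeText);
    const results = page.results ?? [];
    collected.push(...results);
    // A page shorter than pageSize means there is nothing further to fetch.
    if (results.length < pageSize) break;
    start += pageSize;
  }
  return collected.slice(0, maxResults);
}
```

The maxResults cap keeps the loop bounded when a query matches far more workflows than expected.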
skipTasksFromWorkflow(workflowInstanceId: string, taskReferenceName: string, skipTaskRequest: Partial<SkipTaskRequest>): Promise<void>
Skips a task in a running workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
taskReferenceName (string): The reference name of the task to skip.
skipTaskRequest (Partial<SkipTaskRequest>): The request to skip the task.
Returns:
Promise<void>
terminate(workflowInstanceId: string, reason: string): Promise<void>
Terminates a running workflow.
Parameters:
workflowInstanceId (string): The ID of the workflow instance.
reason (string): The reason for termination.
Returns:
Promise<void>
updateTask(taskId: string, workflowInstanceId: string, taskStatus: TaskResultStatus, outputData: Record<string, any>): Promise<string>
Updates a task by its ID.
Parameters:
taskId (string): The ID of the task.
workflowInstanceId (string): The ID of the workflow instance.
taskStatus (TaskResultStatus): The new status of the task.
outputData (Record<string, any>): The output data of the task.
Returns:
Promise<string>
updateTaskByRefName(taskReferenceName: string, workflowInstanceId: string, status: TaskResultStatus, taskOutput: Record<string, any>): Promise<string>
Updates a task by its reference name.
Parameters:
taskReferenceName (string): The reference name of the task.
workflowInstanceId (string): The ID of the workflow instance.
status (TaskResultStatus): The new status of the task.
taskOutput (Record<string, any>): The output data of the task.
Returns:
Promise<string>
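updateTask and updateTaskByRefName both take two strings as their first arguments, with the workflow instance ID second, so the arguments are easy to swap by accident. One possible guard, sketched below, is a thin wrapper that takes named fields. TaskUpdater, CompleteByRefArgs, and completeByRefName are hypothetical names, and the types restate only the signature documented above.

```typescript
// Local copy of the TaskResultStatus union from Type Definitions.
type TaskResultStatus =
  | "IN_PROGRESS"
  | "FAILED"
  | "FAILED_WITH_TERMINAL_ERROR"
  | "COMPLETED";

// Hypothetical structural subset of WorkflowExecutor.
type TaskUpdater = {
  updateTaskByRefName(
    taskReferenceName: string,
    workflowInstanceId: string,
    status: TaskResultStatus,
    taskOutput: Record<string, any>
  ): Promise<string>;
};

// Hypothetical argument object: named fields cannot be passed in the wrong order.
type CompleteByRefArgs = {
  workflowInstanceId: string;
  taskReferenceName: string;
  output: Record<string, unknown>;
};

// Hypothetical helper: marks the referenced task as COMPLETED with the given output.
async function completeByRefName(
  executor: TaskUpdater,
  args: CompleteByRefArgs
): Promise<string> {
  return executor.updateTaskByRefName(
    args.taskReferenceName,
    args.workflowInstanceId,
    "COMPLETED",
    args.output
  );
}
```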
getTask(taskId: string): Promise<Task>
Gets a task by its ID.
Parameters:
taskId (string): The ID of the task.
Returns:
Promise<Task>: The task.
updateTaskSync(taskReferenceName: string, workflowInstanceId: string, status: TaskResultStatusEnum, taskOutput: Record<string, any>, workerId?: string): Promise<Workflow>
Updates a task by its reference name synchronously and returns the complete workflow.
Parameters:
taskReferenceName (string): The reference name of the task.
workflowInstanceId (string): The ID of the workflow instance.
status (TaskResultStatusEnum): The new status of the task.
taskOutput (Record<string, any>): The output data of the task.
workerId (string, optional): The ID of the worker.
Returns:
Promise<Workflow>: The updated workflow.
signal(workflowInstanceId: string, status: TaskResultStatusEnum, taskOutput: Record<string, any>, returnStrategy: ReturnStrategy = ReturnStrategy.TARGET_WORKFLOW): Promise<EnhancedSignalResponse>
Signals a workflow task and returns data based on the specified return strategy.
Parameters:
workflowInstanceId (string): The ID of the workflow instance to signal.
status (TaskResultStatusEnum): The task status to set.
taskOutput (Record<string, any>): The output data for the task.
returnStrategy (ReturnStrategy, optional): The strategy for what data to return. Defaults to TARGET_WORKFLOW.
Returns:
Promise<EnhancedSignalResponse>: The response from the signal.
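What an EnhancedSignalResponse contains depends on the return strategy, and the type exposes helper methods for telling the cases apart. The sketch below branches on those helpers. SignalResponseLike is a hypothetical subset of the interface listed under Type Definitions, describeSignalResponse is an invented name, and the pairing of each is... check with its accessor is an assumption based on the method names rather than documented behavior.

```typescript
// Hypothetical subset of EnhancedSignalResponse, limited to the helpers used here.
type SignalResponseLike = {
  isTargetWorkflow(): boolean;
  isBlockingWorkflow(): boolean;
  isBlockingTask(): boolean;
  isBlockingTaskInput(): boolean;
  getWorkflow(): { workflowId?: string; status?: string };
  getBlockingTask(): { taskId?: string; referenceTaskName?: string };
  getTaskInput(): Record<string, unknown>;
};

// Hypothetical helper: produces a one-line description of a signal response.
function describeSignalResponse(response: SignalResponseLike): string {
  if (response.isTargetWorkflow() || response.isBlockingWorkflow()) {
    const wf = response.getWorkflow();
    return `workflow ${wf.workflowId ?? "?"} is ${wf.status ?? "unknown"}`;
  }
  if (response.isBlockingTask()) {
    const task = response.getBlockingTask();
    return `blocked on task ${task.referenceTaskName ?? "?"}`;
  }
  if (response.isBlockingTaskInput()) {
    const keys = Object.keys(response.getTaskInput());
    return `blocking task input keys: ${keys.join(", ")}`;
  }
  return "unrecognized response type";
}
```

With a real executor, the value returned by `await executor.signal(...)` would be passed in directly.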
signalAsync(workflowInstanceId: string, status: TaskResultStatusEnum, taskOutput: Record<string, any>): Promise<void>
Signals a workflow task asynchronously (fire-and-forget).
Parameters:
workflowInstanceId (string): The ID of the workflow instance to signal.
status (TaskResultStatusEnum): The task status to set.
taskOutput (Record<string, any>): The output data for the task.
Returns:
Promise<void>
Type Definitions
Consistency
enum Consistency {
SYNCHRONOUS = "SYNCHRONOUS",
DURABLE = "DURABLE",
REGION_DURABLE = "REGION_DURABLE",
}
ReturnStrategy
enum ReturnStrategy {
TARGET_WORKFLOW = "TARGET_WORKFLOW",
BLOCKING_WORKFLOW = "BLOCKING_WORKFLOW",
BLOCKING_TASK = "BLOCKING_TASK",
BLOCKING_TASK_INPUT = "BLOCKING_TASK_INPUT",
}
TaskResultStatusEnum
enum TaskResultStatusEnum {
IN_PROGRESS = "IN_PROGRESS",
FAILED = "FAILED",
FAILED_WITH_TERMINAL_ERROR = "FAILED_WITH_TERMINAL_ERROR",
COMPLETED = "COMPLETED",
}
StartWorkflowRequest
type StartWorkflowRequest = {
correlationId?: string;
createdBy?: string;
externalInputPayloadStoragePath?: string;
idempotencyKey?: string;
idempotencyStrategy?: "FAIL" | "RETURN_EXISTING" | "FAIL_ON_RUNNING";
input?: {
[key: string]: unknown;
};
name: string;
priority?: number;
taskToDomain?: {
[key: string]: string;
};
version?: number;
workflowDef?: WorkflowDef;
};
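As a small illustration of the optional fields, the sketch below builds a request that carries a correlation ID and an idempotency key derived from a business identifier. StartRequestLike is a trimmed local copy of the type above, while orderRequest and the workflow name order_fulfillment are hypothetical. The exact behavior of each idempotencyStrategy value is defined by the server and is not described in this reference.

```typescript
// Trimmed local copy of StartWorkflowRequest (workflowDef and a few others omitted).
type StartRequestLike = {
  name: string;
  version?: number;
  correlationId?: string;
  idempotencyKey?: string;
  idempotencyStrategy?: "FAIL" | "RETURN_EXISTING" | "FAIL_ON_RUNNING";
  input?: { [key: string]: unknown };
  priority?: number;
};

// Hypothetical helper: the same orderId always yields the same idempotency key,
// so repeated submissions of one order carry a matching key.
function orderRequest(orderId: string): StartRequestLike {
  return {
    name: "order_fulfillment", // hypothetical workflow name
    version: 1,
    correlationId: orderId,
    idempotencyKey: `order-${orderId}`,
    idempotencyStrategy: "RETURN_EXISTING",
    input: { orderId },
  };
}

const firstAttempt = orderRequest("A1");
const secondAttempt = orderRequest("A1");
```

Either object could then be passed to `executor.startWorkflow(...)`.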
WorkflowDef
interface WorkflowDef {
name: string;
description?: string;
version?: number;
tasks: WorkflowTask[];
inputParameters?: string[];
outputParameters?: Record<string, any>;
failureWorkflow?: string;
schemaVersion?: number;
restartable?: boolean;
workflowStatusListenerEnabled?: boolean;
workflowStatusListenerSink?: string;
ownerEmail?: string;
ownerApp?: string;
timeoutPolicy?: "TIME_OUT_WF" | "ALERT_ONLY";
timeoutSeconds?: number;
variables?: Record<string, any>;
inputTemplate?: Record<string, any>;
inputSchema?: SchemaDef;
outputSchema?: SchemaDef;
enforceSchema?: boolean;
maskedFields?: string[];
rateLimitConfig?: RateLimitConfig;
cacheConfig?: CacheConfig;
metadata?: Record<string, any>;
createTime?: number;
updateTime?: number;
createdBy?: string;
updatedBy?: string;
}
CacheConfig
export type CacheConfig = {
key?: string;
ttlInSecond?: number;
};
RateLimitConfig
export type RateLimitConfig = {
concurrentExecLimit?: number;
rateLimitKey?: string;
};
WorkflowTask
interface WorkflowTask {
name: string;
taskReferenceName: string;
type: string;
description?: string;
optional?: boolean;
inputParameters?: Record<string, any>;
asyncComplete?: boolean;
startDelay?: number;
retryCount?: number;
evaluatorType?: string;
expression?: string;
decisionCases?: Record<string, WorkflowTask[]>;
defaultCase?: WorkflowTask[];
forkTasks?: WorkflowTask[][];
joinOn?: string[];
joinStatus?: string;
loopCondition?: string;
loopOver?: WorkflowTask[];
dynamicTaskNameParam?: string;
dynamicForkTasksParam?: string;
dynamicForkTasksInputParamName?: string;
defaultExclusiveJoinTask?: string[];
caseExpression?: string;
caseValueParam?: string;
sink?: string;
taskDefinition?: TaskDef;
rateLimited?: boolean;
permissive?: boolean;
cacheConfig?: CacheConfig;
onStateChange?: Record<string, StateChangeEvent[]>;
scriptExpression?: string;
subWorkflowParam?: SubWorkflowParams;
}
SubWorkflowParams
export type SubWorkflowParams = {
idempotencyKey?: string;
idempotencyStrategy?: "FAIL" | "RETURN_EXISTING" | "FAIL_ON_RUNNING";
name?: string;
taskToDomain?: {
[key: string]: string;
};
version?: number;
workflowDefinition?: WorkflowDef;
};
StateChangeEvent
export type StateChangeEvent = {
payload?: {
[key: string]: unknown;
};
type: string;
};
WorkflowRun
type WorkflowRun = {
correlationId?: string;
createTime?: number;
createdBy?: string;
input?: {
[key: string]: unknown;
};
output?: {
[key: string]: unknown;
};
priority?: number;
requestId?: string;
responseType?:
| "TARGET_WORKFLOW"
| "BLOCKING_WORKFLOW"
| "BLOCKING_TASK"
| "BLOCKING_TASK_INPUT";
status?:
| "RUNNING"
| "COMPLETED"
| "FAILED"
| "TIMED_OUT"
| "TERMINATED"
| "PAUSED";
targetWorkflowId?: string;
targetWorkflowStatus?: string;
tasks?: Array<Task>;
updateTime?: number;
variables?: {
[key: string]: unknown;
};
workflowId?: string;
};
EnhancedSignalResponse
interface EnhancedSignalResponse extends SignalResponse {
correlationId?: string;
input?: {
[key: string]: unknown;
};
output?: {
[key: string]: unknown;
};
requestId?: string;
responseType?:
| "TARGET_WORKFLOW"
| "BLOCKING_WORKFLOW"
| "BLOCKING_TASK"
| "BLOCKING_TASK_INPUT";
targetWorkflowId?: string;
targetWorkflowStatus?: string;
workflowId?: string;
priority?: number;
variables?: Record<string, unknown>;
tasks?: Task[];
createdBy?: string;
createTime?: number;
status?: string;
updateTime?: number;
taskType?: string;
taskId?: string;
referenceTaskName?: string;
retryCount?: number;
taskDefName?: string;
workflowType?: string;
isTargetWorkflow(): boolean;
isBlockingWorkflow(): boolean;
isBlockingTask(): boolean;
isBlockingTaskInput(): boolean;
getWorkflow(): Workflow;
getBlockingTask(): Task;
getTaskInput(): Record<string, unknown>;
getWorkflowId(): string;
getTargetWorkflowId(): string;
hasWorkflowData(): boolean;
hasTaskData(): boolean;
getResponseType(): string;
isTerminal(): boolean;
isRunning(): boolean;
isPaused(): boolean;
getSummary(): string;
toDebugJSON(): Record<string, unknown>;
toString(): string;
}
TaskFinderPredicate
type TaskFinderPredicate = (task: Task) => boolean;
RerunWorkflowRequest
type RerunWorkflowRequest = {
correlationId?: string;
reRunFromTaskId?: string;
reRunFromWorkflowId?: string;
taskInput?: {
[key: string]: unknown;
};
workflowInput?: {
[key: string]: unknown;
};
};
Workflow
type Workflow = {
correlationId?: string;
createTime?: number;
createdBy?: string;
endTime?: number;
event?: string;
externalInputPayloadStoragePath?: string;
externalOutputPayloadStoragePath?: string;
failedReferenceTaskNames?: Array<string>;
failedTaskNames?: Array<string>;
history?: Array<Workflow>;
idempotencyKey?: string;
input?: {
[key: string]: unknown;
};
lastRetriedTime?: number;
output?: {
[key: string]: unknown;
};
ownerApp?: string;
parentWorkflowId?: string;
parentWorkflowTaskId?: string;
priority?: number;
rateLimitKey?: string;
rateLimited?: boolean;
reRunFromWorkflowId?: string;
reasonForIncompletion?: string;
startTime?: number;
status?:
| "RUNNING"
| "COMPLETED"
| "FAILED"
| "TIMED_OUT"
| "TERMINATED"
| "PAUSED";
taskToDomain?: {
[key: string]: string;
};
tasks?: Array<Task>;
updateTime?: number;
updatedBy?: string;
variables?: {
[key: string]: unknown;
};
workflowDefinition?: WorkflowDef;
workflowId?: string;
workflowName?: string;
workflowVersion?: number;
};
WorkflowStatus
type WorkflowStatus = {
correlationId?: string;
output?: {
[key: string]: unknown;
};
status?:
| "RUNNING"
| "COMPLETED"
| "FAILED"
| "TIMED_OUT"
| "TERMINATED"
| "PAUSED";
variables?: {
[key: string]: unknown;
};
workflowId?: string;
};
ScrollableSearchResultWorkflowSummary
type ScrollableSearchResultWorkflowSummary = {
queryId?: string;
results?: Array<WorkflowSummary>;
totalHits?: number;
};
WorkflowSummary
export type WorkflowSummary = {
correlationId?: string;
createdBy?: string;
endTime?: string;
event?: string;
executionTime?: number;
externalInputPayloadStoragePath?: string;
externalOutputPayloadStoragePath?: string;
failedReferenceTaskNames?: string;
failedTaskNames?: Array<string>;
idempotencyKey?: string;
input?: string;
inputSize?: number;
output?: string;
outputSize?: number;
priority?: number;
reasonForIncompletion?: string;
startTime?: string;
status?:
| "RUNNING"
| "COMPLETED"
| "FAILED"
| "TIMED_OUT"
| "TERMINATED"
| "PAUSED";
taskToDomain?: {
[key: string]: string;
};
updateTime?: string;
version?: number;
workflowId?: string;
workflowType?: string;
};
SkipTaskRequest
type SkipTaskRequest = {
taskInput?: {
[key: string]: unknown;
};
taskOutput?: {
[key: string]: unknown;
};
};
TaskResultStatus
type TaskResultStatus =
| "IN_PROGRESS"
| "FAILED"
| "FAILED_WITH_TERMINAL_ERROR"
| "COMPLETED";
TaskDef
type TaskDef = {
backoffScaleFactor?: number;
baseType?: string;
concurrentExecLimit?: number;
createTime?: number;
createdBy?: string;
description?: string;
enforceSchema?: boolean;
executionNameSpace?: string;
inputKeys?: Array<string>;
inputSchema?: SchemaDef;
inputTemplate?: {
[key: string]: unknown;
};
isolationGroupId?: string;
name: string;
outputKeys?: Array<string>;
outputSchema?: SchemaDef;
ownerApp?: string;
ownerEmail?: string;
pollTimeoutSeconds?: number;
rateLimitFrequencyInSeconds?: number;
rateLimitPerFrequency?: number;
responseTimeoutSeconds?: number;
retryCount?: number;
retryDelaySeconds?: number;
retryLogic?: "FIXED" | "EXPONENTIAL_BACKOFF" | "LINEAR_BACKOFF";
timeoutPolicy?: "RETRY" | "TIME_OUT_WF" | "ALERT_ONLY";
timeoutSeconds: number;
totalTimeoutSeconds: number;
updateTime?: number;
updatedBy?: string;
};
Task
type Task = {
callbackAfterSeconds?: number;
callbackFromWorker?: boolean;
correlationId?: string;
domain?: string;
endTime?: number;
executed?: boolean;
executionNameSpace?: string;
externalInputPayloadStoragePath?: string;
externalOutputPayloadStoragePath?: string;
firstStartTime?: number;
inputData?: {
[key: string]: unknown;
};
isolationGroupId?: string;
iteration?: number;
loopOverTask?: boolean;
outputData?: {
[key: string]: unknown;
};
parentTaskId?: string;
pollCount?: number;
queueWaitTime?: number;
rateLimitFrequencyInSeconds?: number;
rateLimitPerFrequency?: number;
reasonForIncompletion?: string;
referenceTaskName?: string;
responseTimeoutSeconds?: number;
retried?: boolean;
retriedTaskId?: string;
retryCount?: number;
scheduledTime?: number;
seq?: number;
startDelayInSeconds?: number;
startTime?: number;
status?:
| "IN_PROGRESS"
| "CANCELED"
| "FAILED"
| "FAILED_WITH_TERMINAL_ERROR"
| "COMPLETED"
| "COMPLETED_WITH_ERRORS"
| "SCHEDULED"
| "TIMED_OUT"
| "SKIPPED";
subWorkflowId?: string;
subworkflowChanged?: boolean;
taskDefName?: string;
taskDefinition?: TaskDef;
taskId?: string;
taskType?: string;
updateTime?: number;
workerId?: string;
workflowInstanceId?: string;
workflowPriority?: number;
workflowTask?: WorkflowTask;
workflowType?: string;
};
SchemaDef
export type SchemaDef = {
createTime?: number;
createdBy?: string;
data?: {
[key: string]: unknown;
};
externalRef?: string;
name: string;
ownerApp?: string;
type: "JSON" | "AVRO" | "PROTOBUF";
updateTime?: number;
updatedBy?: string;
version: number;
};