指南:如何管理工作流

管理和运行工作流

现在,您已经在应用程序中编写了工作流及其活动,可以使用HTTP API调用来启动、终止和获取有关工作流的信息。 更多信息,请阅读 工作流 API 参考


在代码中管理工作流。 在编写工作流指南中的工作流示例中,工作流是使用以下API在代码中注册的:

  • start_workflow: 启动一个工作流实例
  • get_workflow: 获取工作流的状态信息
  • pause_workflow: 暂停或中止一个工作流实例,稍后可恢复该实例
  • resume_workflow: 恢复暂停的工作流实例
  • raise_workflow_event: 在工作流中引发事件
  • purge_workflow: 删除与特定工作流实例相关的所有元数据
  • terminate_workflow: 终止或停止工作流的特定实例
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

# Sane parameters
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

# Start the workflow
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)

# Get info on the workflow
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# Pause the workflow
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# Resume the workflow
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# Raise an event on the workflow. 
 d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

# Purge the workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# Terminate the workflow
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)

在代码中管理工作流。 在编写工作流指南中的工作流示例中,工作流是使用以下API在代码中注册的:

  • client.workflow.start: 启动一个工作流实例
  • client.workflow.get: 获取工作流的状态信息
  • client.workflow.pause: 暂停或中止一个工作流实例,稍后可恢复该实例
  • client.workflow.resume: 恢复暂停的工作流实例
  • client.workflow.purge: 删除与特定工作流实例相关的所有元数据
  • client.workflow.terminate: 终止或停止工作流的特定实例
import { DaprClient } from "@dapr/dapr";

async function printWorkflowStatus(client: DaprClient, instanceId: string) {
  const workflow = await client.workflow.get(instanceId);
  console.log(
    `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
      workflow.runtimeStatus
    }`,
  );
  console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
  console.log("--------------------------------------------------\n\n");
}

async function start() {
  const client = new DaprClient();

  // Start a new workflow instance
  const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
    Name: "Paperclips",
    TotalCost: 99.95,
    Quantity: 4,
  });
  console.log(`Started workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Pause a workflow instance
  await client.workflow.pause(instanceId);
  console.log(`Paused workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Resume a workflow instance
  await client.workflow.resume(instanceId);
  console.log(`Resumed workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Terminate a workflow instance
  await client.workflow.terminate(instanceId);
  console.log(`Terminated workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Wait for the workflow to complete, 30 seconds!
  await new Promise((resolve) => setTimeout(resolve, 30000));
  await printWorkflowStatus(client, instanceId);

  // Purge a workflow instance
  await client.workflow.purge(instanceId);
  console.log(`Purged workflow instance ${instanceId}`);
  // This will throw an error because the workflow instance no longer exists.
  await printWorkflowStatus(client, instanceId);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});

在代码中管理工作流。 在OrderProcessingWorkflow示例中,来自编写工作流指南,工作流在代码中注册。 现在,您可以启动、终止正在运行的工作流程,并获取相关信息:

string orderId = "exampleOrderId";
string workflowComponent = "dapr";
string workflowName = "OrderProcessingWorkflow";
OrderPayload input = new OrderPayload("Paperclips", 99.95);
Dictionary<string, string> workflowOptions; // This is an optional parameter

// Start the workflow. This returns back a "StartWorkflowResponse" which contains the instance ID for the particular workflow instance.
StartWorkflowResponse startResponse = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, input, workflowOptions);

// Get information on the workflow. This response contains information such as the status of the workflow, when it started, and more!
GetWorkflowResponse getResponse = await daprClient.GetWorkflowAsync(orderId, workflowComponent, eventName);

// Terminate the workflow
await daprClient.TerminateWorkflowAsync(orderId, workflowComponent);

// Raise an event (an incoming purchase order) that your workflow will wait for. This returns the item waiting to be purchased.
await daprClient.RaiseWorkflowEventAsync(orderId, workflowComponent, workflowName, input);

// Pause
await daprClient.PauseWorkflowAsync(orderId, workflowComponent);

// Resume
await daprClient.ResumeWorkflowAsync(orderId, workflowComponent);

// Purge the workflow, removing all inbox and history information from associated instance
await daprClient.PurgeWorkflowAsync(orderId, workflowComponent);

在代码中管理工作流。 在Java SDK的工作流示例中,工作流是使用以下API在代码中注册的:

  • scheduleNewWorkflow: 启动新的工作流实例
  • getInstanceState: 获取工作流的状态信息
  • waitForInstanceStart: 暂停或中止一个工作流实例,稍后可恢复该实例
  • raiseEvent: 为正在运行的工作流实例触发事件/任务
  • waitForInstanceCompletion: 等待工作流完成其任务
  • purgeInstance: 删除与特定工作流实例相关的所有元数据
  • terminateWorkflow: 终止工作流程
  • purgeInstance: 删除与特定工作流相关的所有元数据
package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

// ...
public class DemoWorkflowClient {

  // ...
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      // Start a workflow
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
      
      // Get status information on the workflow
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);

      // Wait or pause for the workflow instance start
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
      }

      // Raise an event for the workflow; you can raise several events in parallel
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");

      // Wait for workflow to complete running through tasks
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
      } 

      // Purge the workflow instance, removing all metadata associated with it
      boolean purgeResult = client.purgeInstance(instanceId);

      // Terminate the workflow instance
      client.terminateWorkflow(instanceToTerminateId, null);

    System.exit(0);
  }
}

在代码中管理工作流。 在Go SDK的工作流示例中,工作流是使用以下API在代码中注册的:

  • StartWorkflow: 启动新的工作流实例
  • GetWorkflow: 获取工作流的状态信息
  • PauseWorkflow: 暂停或中止一个工作流实例,稍后可恢复该实例
  • RaiseEventWorkflow: 为正在运行的工作流实例触发事件/任务
  • ResumeWorkflow: 等待工作流完成其任务
  • PurgeWorkflow:删除与特定工作流实例相关的所有元数据
  • TerminateWorkflow: 终止工作流程
// Start workflow
type StartWorkflowRequest struct {
	InstanceID        string // Optional instance identifier
	WorkflowComponent string
	WorkflowName      string
	Options           map[string]string // Optional metadata
	Input             any               // Optional input
	SendRawInput      bool              // Set to True in order to disable serialization on the input
}

type StartWorkflowResponse struct {
	InstanceID string
}

// Get the workflow status
type GetWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

type GetWorkflowResponse struct {
	InstanceID    string
	WorkflowName  string
	CreatedAt     time.Time
	LastUpdatedAt time.Time
	RuntimeStatus string
	Properties    map[string]string
}

// Purge workflow
type PurgeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Terminate workflow
type TerminateWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Pause workflow
type PauseWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Resume workflow
type ResumeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Raise an event for the running workflow
type RaiseEventWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
	EventName         string
	EventData         any
	SendRawData       bool // Set to True in order to disable serialization on the data
}

使用 HTTP 调用管理工作流。 下面的示例插入了 编写工作流示例 中的属性,并随机设置了实例 ID 号。

启动工作流

要使用 ID 12345678 启动工作流,请运行:

POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678

请注意,工作流实例 ID 只能包含字母数字字符、下划线和破折号。

终止工作流程

要使用 ID 12345678 终止工作流,请运行:

POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/terminate

引发事件

对于支持订阅外部事件的工作流组件(如 Dapr 工作流引擎),您可以使用以下 “raise event” API 向特定工作流实例发送已命名的事件。

POST http://localhost:3500/v1.0-beta1/workflows/<workflowComponentName>/<instanceID>/raiseEvent/<eventName>

eventName 可以是任何函数。

暂停或恢复工作流

若要规划停机时间、等待输入等,可以暂停,然后恢复工作流。 要暂停一个带有ID 12345678的工作流,直到触发恢复,请运行:

POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/pause

要恢复一个带有 ID 12345678 的工作流,请运行:

POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/resume

清除工作流

清除 API 可用于从底层状态存储中永久删除工作流元数据,包括任何已存储的输入、输出和工作流历史记录。 这通常有助于实施数据保留政策和释放资源。

只有处于 “已完成”、“已失败 “或 “已终止 “状态的工作流实例才能被清除。 如果工作流处于任何其他状态,调用清除会返回错误。

POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/purge

获取工作流程信息

要获取工作流信息(输出和输入)的 ID 为 12345678,请运行:

GET http://localhost:3500/v1.0-beta1/workflows/dapr/12345678

了解有关这些HTTP调用的更多信息,请参阅工作流API参考指南

下一步