使用 Dapr Workflow Python SDK 入门

如何使用 Dapr Python SDK 开始并运行工作流

我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:

此示例使用 dapr init 的默认配置在本地模式下运行。

在 Python 示例项目中,app.py 文件包含应用程序的设置,其中包括:

  • 工作流定义
  • 工作流活动定义
  • 工作流和工作流活动的注册

先决条件

设置环境

运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。

pip3 install -r demo_workflow/requirements.txt

克隆 [Python SDK 仓库]。

git clone https://github.com/dapr/python-sdk.git

从 Python SDK 根目录导航到 Dapr 工作流示例。

cd examples/demo_workflow

本地运行应用程序

要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:

dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py

注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用 python app.py 而不是 python3 app.py

预期输出

== APP == ==========根据输入开始计数器增加==========

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1!

== APP == 你好,计数器!
== APP == 新的计数器值是:11!

== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停

== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中

== APP == 你好,计数器!
== APP == 新的计数器值是:111!

== APP == 你好,计数器!
== APP == 实例成功清除

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1112!

== APP == 你好,计数器!
== APP == 新的计数器值是:1122!

== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除

发生了什么?

当您运行 dapr run 时,Dapr 客户端:

  1. 注册了工作流 (hello_world_wf) 及其活动 (hello_act)
  2. 启动了工作流引擎
def main():
    with DaprClient() as d:
        host = settings.DAPR_RUNTIME_HOST
        port = settings.DAPR_GRPC_PORT
        workflowRuntime = WorkflowRuntime(host, port)
        workflowRuntime = WorkflowRuntime()
        workflowRuntime.register_workflow(hello_world_wf)
        workflowRuntime.register_activity(hello_act)
        workflowRuntime.start()

        print("==========根据输入开始计数器增加==========")
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

然后 Dapr 暂停并恢复了工作流:

       # 暂停
        d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 恢复
        d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:

        # 触发事件
        d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

然后示例演示了通过以下步骤终止工作流:

  • 使用与已清除工作流相同的 instanceId 启动一个新的工作流。
  • 在关闭工作流之前终止并清除工作流。
        # 启动另一个工作流
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # 终止
        d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        sleep(1)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

下一步