开始使用 Dapr Python gRPC 服务扩展

如何启动并运行 Dapr Python gRPC 扩展

Dapr Python SDK 提供了一个用于创建 Dapr 服务的内置 gRPC 服务器扩展 dapr.ext.grpc

安装

您可以通过以下命令下载并安装 Dapr gRPC 服务器扩展:


pip install dapr-ext-grpc

pip3 install dapr-ext-grpc-dev

示例

您可以使用 App 对象来创建一个服务器。

监听服务调用请求

可以使用 InvokeMethodRequestInvokeMethodResponse 对象来处理传入的请求。

以下是一个简单的服务示例,它会监听并响应请求:

from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

app = App()

@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)

    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")

app.run(50051)

完整示例可以在这里找到。

订阅主题

在订阅主题时,您可以指示 dapr 事件是否已被接受,或者是否应该丢弃或稍后重试。

from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse

app = App()

# 默认的主题订阅
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
    print(event.Data(), flush=True)
    # 返回 None(或不显式返回)等同于返回 TopicEventResponse("success")。
    # 您还可以返回 TopicEventResponse("retry") 以便 dapr 记录消息并稍后重试交付,
    # 或者返回 TopicEventResponse("drop") 以丢弃消息
    return TopicEventResponse("success")

# 使用发布/订阅路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(), flush=True)

# 禁用主题验证的处理程序
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
def mytopic_wildcard(event: v1.Event) -> None:
    print(event.Data(), flush=True)

app.run(50051)

完整示例可以在这里找到。

设置输入绑定触发器

from dapr.ext.grpc import App, BindingRequest

app = App()

@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)

app.run(50051)

完整示例可以在这里找到。

相关链接