如何:实现可插拔组件

学习如何编写和实现可插拔组件

在本指南中,您将学习实现可插拔组件的原因和方法。要了解如何配置和注册可插拔组件,请参阅如何:注册可插拔组件

实现可插拔组件

要实现可插拔组件,需在组件中实现 gRPC 服务。实现 gRPC 服务需要三个步骤:

找到 proto 定义文件

每个支持的服务接口(如状态存储、发布订阅、绑定、密钥存储)都提供了 proto 定义。

目前支持以下组件 API:

  • 状态存储
  • 发布订阅
  • 绑定
  • 密钥存储
组件 类型 gRPC 定义 内置参考实现 文档
状态存储 state state.proto Redis 概念, 如何, API 规范
发布订阅 pubsub pubsub.proto Redis 概念, 如何, API 规范
绑定 bindings bindings.proto Kafka 概念, 输入如何, 输出如何, API 规范
密钥存储 secretstores secretstore.proto Hashicorp/Vault 概念, 如何-secrets, API 规范

以下是可插拔组件状态存储的 gRPC 服务定义片段([state.proto]):

// StateStore 服务为状态存储组件提供 gRPC 接口。
service StateStore {
  // 使用给定的元数据初始化状态存储组件。
  rpc Init(InitRequest) returns (InitResponse) {}
  // 返回已实现的状态存储功能列表。
  rpc Features(FeaturesRequest) returns (FeaturesResponse) {}
  // Ping 状态存储。用于活跃性目的。
  rpc Ping(PingRequest) returns (PingResponse) {}
  
  // 从状态存储中删除指定的键。
  rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  // 从给定的键获取数据。
  rpc Get(GetRequest) returns (GetResponse) {}
  // 设置指定键的值。
  rpc Set(SetRequest) returns (SetResponse) {}

  // 一次删除多个键。
  rpc BulkDelete(BulkDeleteRequest) returns (BulkDeleteResponse) {}
  // 一次检索多个键。
  rpc BulkGet(BulkGetRequest) returns (BulkGetResponse) {}
  // 一次设置多个键的值。
  rpc BulkSet(BulkSetRequest) returns (BulkSetResponse) {}
}

StateStore 服务接口总共公开了 9 个方法:

  • 2 个用于初始化和组件能力广告的方法(Init 和 Features)
  • 1 个用于健康或活跃性检查的方法(Ping)
  • 3 个用于 CRUD 的方法(Get、Set、Delete)
  • 3 个用于批量 CRUD 操作的方法(BulkGet、BulkSet、BulkDelete)

创建服务脚手架

使用 protocol buffers 和 gRPC 工具生成服务的脚手架。通过 gRPC 概念文档了解更多关于这些工具的信息。

这些工具生成针对任何 gRPC 支持的语言的代码。此代码作为您的服务器的基础,并提供:

  • 处理客户端调用的功能
  • 基础设施以:
    • 解码传入请求
    • 执行服务方法
    • 编码服务响应

生成的代码是不完整的。它缺少:

  • 您的目标服务定义的方法的具体实现(您可插拔组件的核心)。
  • 关于如何处理 Unix Socket Domain 集成的代码,这是 Dapr 特有的。
  • 处理与下游服务集成的代码。

在下一步中了解更多关于填补这些空白的信息。

定义服务

提供所需服务的具体实现。每个组件都有一个 gRPC 服务定义,用于其核心功能,与核心组件接口相同。例如:

  • 状态存储

    可插拔状态存储必须提供 StateStore 服务接口的实现。

    除了这个核心功能外,一些组件可能还会在其他可选服务下公开功能。例如,您可以通过定义 QueriableStateStore 服务和 TransactionalStateStore 服务的实现来添加额外功能。

  • 发布订阅

    可插拔发布订阅组件只有一个核心服务接口定义 pubsub.proto。它们没有可选的服务接口。

  • 绑定

    可插拔输入和输出绑定在 bindings.proto 上有一个核心服务定义。它们没有可选的服务接口。

  • 密钥存储

    可插拔密钥存储在 secretstore.proto 上有一个核心服务定义。它们没有可选的服务接口。

在使用 gRPC 和 protocol buffers 工具生成上述状态存储示例的服务脚手架代码后,您可以为 service StateStore 下定义的 9 个方法定义具体实现,以及初始化和与依赖项通信的代码。

这个具体实现和辅助代码是您可插拔组件的核心。它们定义了您的组件在处理来自 Dapr 的 gRPC 请求时的行为。

返回语义错误

返回语义错误也是可插拔组件协议的一部分。组件必须返回对用户应用程序具有语义意义的特定 gRPC 代码,这些错误用于从并发要求到仅信息的各种情况。

错误 gRPC 错误代码 源组件 描述
ETag 不匹配 codes.FailedPrecondition 状态存储 错误映射以满足并发要求
ETag 无效 codes.InvalidArgument 状态存储
批量删除行不匹配 codes.Internal 状态存储

状态管理概述中了解更多关于并发要求的信息。

以下示例演示了如何在您自己的可插拔组件中返回错误,并根据需要更改消息。


重要提示: 为了使用 .NET 进行错误映射,首先安装 Google.Api.CommonProtos NuGet 包

ETag 不匹配

var badRequest = new BadRequest();
var des = "提供的 ETag 字段与存储中的不匹配";
badRequest.FieldViolations.Add(    
   new Google.Rpc.BadRequest.Types.FieldViolation
       {        
         Field = "etag",
         Description = des
       });

var baseStatusCode = Grpc.Core.StatusCode.FailedPrecondition;
var status = new Google.Rpc.Status{    
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

ETag 无效

var badRequest = new BadRequest();
var des = "ETag 字段只能包含字母数字字符";
badRequest.FieldViolations.Add(
   new Google.Rpc.BadRequest.Types.FieldViolation
   {
      Field = "etag",
      Description = des
   });

var baseStatusCode = Grpc.Core.StatusCode.InvalidArgument;
var status = new Google.Rpc.Status
{
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

批量删除行不匹配

var errorInfo = new Google.Rpc.ErrorInfo();

errorInfo.Metadata.Add("expected", "100");
errorInfo.Metadata.Add("affected", "99");

var baseStatusCode = Grpc.Core.StatusCode.Internal;
var status = new Google.Rpc.Status{
    Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(errorInfo));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

就像 Dapr Java SDK 一样,Java 可插拔组件 SDK 使用 Project Reactor,它为 Java 提供了异步 API。

错误可以通过以下方式直接返回:

  1. 在您的方法返回的 MonoFlux 中调用 .error() 方法
  2. 提供适当的异常作为参数。

您也可以引发异常,只要它被捕获并反馈到您结果的 MonoFlux 中。

ETag 不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.FAILED_PRECONDITION.value())
    .setMessage("fake-err-msg-for-etag-mismatch")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("提供的 ETag 字段与存储中的不匹配")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 无效

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
    .setMessage("fake-err-msg-for-invalid-etag")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("ETag 字段只能包含字母数字字符")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

批量删除行不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INTERNAL.value())
    .setMessage("fake-err-msg-for-bulk-delete-row-mismatch")
    .addDetails(Any.pack(ErrorInfo.newBuilder()
        .putAllMetadata(Map.ofEntries(
            Map.entry("affected", "99"),
            Map.entry("expected", "100")
        ))
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 不匹配

st := status.New(codes.FailedPrecondition, "fake-err-msg")
desc := "提供的 ETag 字段与存储中的不匹配"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

ETag 无效

st := status.New(codes.InvalidArgument, "fake-err-msg")
desc := "ETag 字段只能包含字母数字字符"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

批量删除行不匹配

st := status.New(codes.Internal, "fake-err-msg")
br := &errdetails.ErrorInfo{}
br.Metadata = map[string]string{
	affected: "99",
	expected: "100",
}
st, err := st.WithDetails(br)

下一步