Client
The Dapr client package lets you interact with other Dapr applications from a Java application.
Note
If you haven't already, try out one of the quickstarts for a quick walk-through of how to use the Dapr Java SDK with an API building block.
Prerequisites
Initialize the client
You can initialize a Dapr client like this:
DaprClient client = new DaprClientBuilder().build();
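This connects to the default Dapr gRPC endpoint, localhost:50001.
Environment variables
Dapr sidecar endpoints
You can use the standardized DAPR_GRPC_ENDPOINT and DAPR_HTTP_ENDPOINT environment variables to specify a different gRPC or HTTP endpoint. When these variables are set, the client automatically uses them to connect to the Dapr sidecar.
The legacy environment variables DAPR_HTTP_PORT and DAPR_GRPC_PORT are still supported, but DAPR_GRPC_ENDPOINT and DAPR_HTTP_ENDPOINT take precedence.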
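The precedence rule above can be sketched in plain Java. This is only an illustration of the described behavior, not the SDK's actual resolution code: the class name `SidecarEndpointSketch` is hypothetical, and treating the legacy port as a port on `localhost` is an assumption.

```java
import java.util.Map;

/** Illustrative only: mirrors the precedence described in the text, not the SDK's code. */
final class SidecarEndpointSketch {
    // Default gRPC endpoint named in the text.
    static final String DEFAULT_GRPC_ENDPOINT = "localhost:50001";

    /** Picks the gRPC endpoint from a map of environment variables. */
    static String resolveGrpcEndpoint(Map<String, String> env) {
        String endpoint = env.get("DAPR_GRPC_ENDPOINT");
        if (endpoint != null && !endpoint.isBlank()) {
            return endpoint;            // the standardized variable takes precedence
        }
        String port = env.get("DAPR_GRPC_PORT");
        if (port != null && !port.isBlank()) {
            return "localhost:" + port; // assumption: the legacy port refers to localhost
        }
        return DEFAULT_GRPC_ENDPOINT;   // nothing set: fall back to the default
    }
}
```

Calling `SidecarEndpointSketch.resolveGrpcEndpoint(System.getenv())` would show which endpoint this rule selects for the current process environment.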
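Dapr API token
If your Dapr instance is configured to require the DAPR_API_TOKEN environment variable, you can set it in the environment and the client will use it automatically.
You can read more about Dapr API token authentication in the Dapr documentation.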
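For readers who call the sidecar without the SDK, the following sketch shows roughly what "using the token automatically" amounts to: reading DAPR_API_TOKEN and attaching it to each request as the `dapr-api-token` header. The class and method names are hypothetical, and this is not how the SDK itself is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: builds request headers from the environment. */
final class ApiTokenHeaderSketch {
    // Header (or gRPC metadata key) that carries the Dapr API token.
    static final String TOKEN_HEADER = "dapr-api-token";

    /** Returns the headers to attach to a sidecar request. */
    static Map<String, String> headersFor(Map<String, String> env) {
        Map<String, String> headers = new LinkedHashMap<>();
        String token = env.get("DAPR_API_TOKEN");
        if (token != null && !token.isEmpty()) {
            headers.put(TOKEN_HEADER, token); // only sent when a token is configured
        }
        return headers;
    }
}
```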
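Error handling
Initially, errors in Dapr followed the standard gRPC error model. To provide more detailed information, version 1.13 introduced an enhanced error model that aligns with the gRPC richer error model. In response, the Java SDK extended DaprException to include the error details that were added in Dapr.
Example of handling a DaprException and consuming the error details with the Dapr Java SDK: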
...
try {
client.publishEvent("unknown_pubsub", "mytopic", "mydata").block();
} catch (DaprException exception) {
System.out.println("Error code of Dapr exception: " + exception.getErrorCode());
System.out.println("Message of Dapr exception: " + exception.getMessage());
// DaprException now exposes more error details from the Dapr runtime through `getStatusDetails()`.
System.out.println("Reason of Dapr exception: " + exception.getStatusDetails().get(
DaprErrorDetails.ErrorDetailType.ERROR_INFO,
"reason",
TypeRef.STRING));
}
...
Building blocks
The Java SDK allows you to interface with all of the Dapr building blocks.
Invoke a service
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
try (DaprClient client = (new DaprClientBuilder()).build()) {
// Invoke a 'GET' method (HTTP), skipping serialization: the return type is Mono<byte[]>
// For gRPC, pass HttpExtension.NONE instead
byte[] response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"name\":\"World!\"}", HttpExtension.GET, byte[].class).block();
// Invoke a 'POST' method (HTTP), skipping serialization: the return type is Mono<byte[]>
response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"id\":\"100\", \"FirstName\":\"Value\", \"LastName\":\"Value\"}", HttpExtension.POST, byte[].class).block();
System.out.println(new String(response));
// Invoke a 'POST' method (HTTP) with serialization: the return type is Mono<Employee>
Employee newEmployee = new Employee("Nigel", "Guitarist");
Employee employeeResponse = client.invokeMethod(SERVICE_TO_INVOKE, "employees", newEmployee, HttpExtension.POST, Employee.class).block();
}
- For a full guide on service invocation, visit How-To: Invoke a service.
- Visit the Java SDK examples for code samples and instructions to try out service invocation.
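To make the HTTP flavor of the calls above more concrete, here is a small sketch of the sidecar URL that a service invocation corresponds to on the Dapr HTTP API (`/v1.0/invoke/<app-id>/method/<method-name>`). The helper name `InvokeUrlSketch` and the sample app ID are hypothetical, and the SDK builds its requests internally in its own way.

```java
import java.net.URI;

/** Illustrative only: composes a Dapr service-invocation URL. */
final class InvokeUrlSketch {
    /**
     * @param httpEndpoint base address of the sidecar's HTTP API, e.g. http://localhost:3500
     * @param appId        Dapr app ID of the target service
     * @param method       method (path) to call on the target service
     */
    static URI invokeUri(String httpEndpoint, String appId, String method) {
        // Normalize slashes so the pieces join with exactly one '/'.
        String base = httpEndpoint.endsWith("/")
                ? httpEndpoint.substring(0, httpEndpoint.length() - 1)
                : httpEndpoint;
        String path = method.startsWith("/") ? method.substring(1) : method;
        return URI.create(base + "/v1.0/invoke/" + appId + "/method/" + path);
    }
}
```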
Save & get application state
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import reactor.core.publisher.Mono;
try (DaprClient client = (new DaprClientBuilder()).build()) {
// Save state
client.saveState(STATE_STORE_NAME, FIRST_KEY_NAME, myClass).block();
// Get state
State<MyClass> retrievedMessage = client.getState(STATE_STORE_NAME, FIRST_KEY_NAME, MyClass.class).block();
// Delete state
client.deleteState(STATE_STORE_NAME, FIRST_KEY_NAME).block();
}
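- For a full list of state operations, visit How-To: Get & save state.
- Visit the Java SDK examples for code samples and instructions to try out state management.
Publish & subscribe to messages
Publish messages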
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;
try (DaprClient client = (new DaprClientBuilder()).build()) {
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message, singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
}
Subscribe to messages
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
@RestController
public class SubscriberController {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
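The two handlers registered for "testingtopic" differ only in the routing rule on the second one. The following plain-Java sketch illustrates the idea of rule-based routing: rules are tried in priority order and the handler without a rule acts as the default. It assumes that lower priority numbers are evaluated first, and it replaces the rule expression with a Java predicate over a map of event attributes. `TopicRoutingSketch` and `Route` are hypothetical names, not SDK types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative only: picks a handler path for an event using prioritized rules. */
final class TopicRoutingSketch {
    /** One routing rule: a match condition, its priority, and the target path. */
    static final class Route {
        final int priority;
        final Predicate<Map<String, String>> match;
        final String path;

        Route(int priority, Predicate<Map<String, String>> match, String path) {
            this.priority = priority;
            this.match = match;
            this.path = path;
        }
    }

    /** Returns the path of the first matching rule, or the default path. */
    static String selectPath(List<Route> rules, String defaultPath, Map<String, String> event) {
        List<Route> ordered = new ArrayList<>(rules);
        // Assumption: a lower priority number means the rule is evaluated earlier.
        ordered.sort(Comparator.comparingInt((Route r) -> r.priority));
        for (Route rule : ordered) {
            if (rule.match.test(event)) {
                return rule.path;
            }
        }
        return defaultPath; // no rule matched: use the handler without a rule
    }
}
```

With a single rule that matches events whose type is "myevent.v2", such events would go to "/testingtopicV2" and everything else to "/testingtopic".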
Bulk publish messages
Note: the API is in Alpha stage.
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
// Publish the list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
}
}
}
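- For a full guide on publishing messages and subscribing to a topic, visit How-To: Publish & subscribe.
- Visit the Java SDK examples for code samples and instructions to try out pub/sub.
Interact with output bindings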
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
try (DaprClient client = (new DaprClientBuilder()).build()) {
// Send a class with a message; BINDING_OPERATION="create"
client.invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass).block();
// Send a plain string
client.invokeBinding(BINDING_NAME, BINDING_OPERATION, message).block();
}
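- For a full guide on output bindings, visit How-To: Output bindings.
- Visit the Java SDK examples for code samples and instructions to try out output bindings.
Interact with input bindings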
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/")
public class myClass {
private static final Logger log = LoggerFactory.getLogger(myClass.class);
@PostMapping(path = "/checkout")
public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
return Mono.fromRunnable(() ->
log.info("Received Message: " + new String(body)));
}
}
- For a full guide on input bindings, visit How-To: Input bindings.
- Visit the Java SDK examples for code samples and instructions to try out input bindings.
Retrieve secrets
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import java.util.Map;
try (DaprClient client = (new DaprClientBuilder()).build()) {
Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block();
System.out.println(JSON_SERIALIZER.writeValueAsString(secret));
}
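- For a full guide on secrets, visit How-To: Retrieve secrets.
- Visit the Java SDK examples for code samples and instructions to try out retrieving secrets.
Actors
An actor is an isolated, independent unit of compute and state with single-threaded execution. Dapr provides an actor implementation based on the virtual actor pattern, which offers a single-threaded programming model and garbage-collects actors when they are not in use. With Dapr's implementation, you write your Dapr actors according to the actor model, and Dapr leverages the scalability and reliability that the underlying platform provides.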
import io.dapr.actors.ActorMethod;
import io.dapr.actors.ActorType;
import reactor.core.publisher.Mono;
@ActorType(name = "DemoActor")
public interface DemoActor {
void registerReminder();
@ActorMethod(name = "echo_message")
String say(String something);
void clock(String message);
@ActorMethod(returns = Integer.class)
Mono<Integer> incrementAndGet(int delta);
}
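- For a full guide on actors, visit How-To: Use virtual actors in Dapr.
- Visit the Java SDK examples for code samples and instructions to try out actors.
Get & subscribe to application configurations
Note: this is a preview API and is therefore only accessible through the DaprPreviewClient interface, not the regular DaprClient interface.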
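The single-threaded guarantee mentioned in the actor description can be pictured with a plain-Java sketch in which every call to an actor is queued on one dedicated thread, so its state is never touched concurrently. This is a conceptual model only: `CounterActorSketch` is a hypothetical class that does not use the io.dapr.actors API, and Dapr's runtime enforces turn-based access by its own means.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: an "actor" whose calls are serialized on a single thread. */
final class CounterActorSketch implements AutoCloseable {
    // One thread acts as the mailbox; a daemon thread so it never blocks JVM exit.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "actor-mailbox");
        thread.setDaemon(true);
        return thread;
    });

    // Only read and written on the mailbox thread, so no locking is needed.
    private int count;

    /** Queues an increment and completes with the new value once it has run. */
    CompletableFuture<Integer> incrementAndGet(int delta) {
        return CompletableFuture.supplyAsync(() -> {
            count += delta;
            return count;
        }, mailbox);
    }

    @Override
    public void close() {
        mailbox.shutdown();
    }
}
```

Because calls run one at a time in submission order, any number of callers can invoke `incrementAndGet` without losing updates.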
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Get the configuration for a single key
ConfigurationItem item = client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY).block();
// Get the configurations for multiple keys
Mono<Map<String, ConfigurationItem>> items =
client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);
// Subscribe to configuration changes
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);
outFlux.subscribe(configItems -> configItems.forEach(...));
// Unsubscribe from configuration changes
Mono<UnsubscribeConfigurationResponse> unsubscribe = client.unsubscribeConfiguration(SUBSCRIPTION_ID, CONFIG_STORE_NAME);
}
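- For a full list of configuration operations, visit How-To: Manage configuration from a store.
- Visit the Java SDK examples for code samples and instructions to try out different configuration operations.
Query saved state
Note: this is a preview API and is therefore only accessible through the DaprPreviewClient interface, not the regular DaprClient interface.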
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.client.domain.query.Sorting;
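import io.dapr.client.domain.query.filters.EqFilter;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
DaprClientBuilder builder = new DaprClientBuilder();
try (DaprClient client = builder.build(); DaprPreviewClient previewClient = builder.buildPreviewClient()) {
String searchVal = args.length == 0 ? "searchValue" : args[0];
// Create the JSON data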
Listing first = new Listing();
first.setPropertyType("apartment");
first.setId("1000");
...
Listing second = new Listing();
second.setPropertyType("row-house");
second.setId("1002");
...
Listing third = new Listing();
third.setPropertyType("apartment");
third.setId("1003");
...
Listing fourth = new Listing();
fourth.setPropertyType("apartment");
fourth.setId("1001");
...
Map<String, String> meta = new HashMap<>();
meta.put("contentType", "application/json");
// Save state
SaveStateRequest request = new SaveStateRequest(STATE_STORE_NAME).setStates(
new State<>("1", first, null, meta, null),
new State<>("2", second, null, meta, null),
new State<>("3", third, null, meta, null),
new State<>("4", fourth, null, meta, null)
);
client.saveBulkState(request).block();
// Create the query and the query state request
Query query = new Query()
.setFilter(new EqFilter<>("propertyType", "apartment"))
.setSort(Arrays.asList(new Sorting("id", Sorting.Order.DESC)));
QueryStateRequest queryRequest = new QueryStateRequest(STATE_STORE_NAME)
.setQuery(query);
// Use the preview client to call the query state API
QueryStateResponse<Listing> result = previewClient.queryState(queryRequest, Listing.class).block();
// View the query state response
System.out.println("Found " + result.getResults().size() + " items.");
for (QueryStateItem<Listing> item : result.getResults()) {
System.out.println("Key: " + item.getKey());
System.out.println("Data: " + item.getValue());
}
}
- For a full guide on querying state, visit How-To: Query state.
- Visit the Java SDK examples for a complete code sample.
Distributed lock
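To clarify what the query in the example asks for, the sketch below applies the same filter (propertyType equals "apartment") and sort (id, descending) to an in-memory list using Java streams. It is a stand-in for the state store's query engine, not a replacement for the API: the nested `Listing` class is a hypothetical two-field version of the example's type, and ids are compared as strings here, which may differ from how a given state store orders them.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative only: the example's filter and sort, evaluated in memory. */
final class QuerySemanticsSketch {
    /** Hypothetical, simplified stand-in for the Listing type in the example. */
    static final class Listing {
        final String id;
        final String propertyType;

        Listing(String id, String propertyType) {
            this.id = id;
            this.propertyType = propertyType;
        }
    }

    /** Keeps apartments only and returns their ids in descending order. */
    static List<String> apartmentIdsDescending(List<Listing> listings) {
        return listings.stream()
                .filter(listing -> "apartment".equals(listing.propertyType)) // EqFilter
                .map(listing -> listing.id)
                .sorted(Comparator.reverseOrder())                           // Sorting.Order.DESC
                .collect(Collectors.toList());
    }
}
```

For the four listings saved in the example, this yields the ids 1003, 1001 and 1000, in that order.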
package io.dapr.examples.lock.grpc;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import reactor.core.publisher.Mono;
public class DistributedLockGrpcClient {
private static final String LOCK_STORE_NAME = "lockstore";
/**
* Executes various methods to check the different APIs.
*
* @param args arguments
* @throws Exception if an error occurs
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
tryLock(client);
unlock(client);
}
}
/**
* Tries to acquire a lock.
*
* @param client DaprPreviewClient object
*/
public static void tryLock(DaprPreviewClient client) {
System.out.println("*******Trying to acquire a free distributed lock********");
try {
LockRequest lockRequest = new LockRequest(LOCK_STORE_NAME, "resource1", "owner1", 5);
Mono<Boolean> result = client.tryLock(lockRequest);
System.out.println("Lock result -> " + (Boolean.TRUE.equals(result.block()) ? "SUCCESS" : "FAIL"));
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
/**
* Releases the lock.
*
* @param client DaprPreviewClient object
*/
public static void unlock(DaprPreviewClient client) {
System.out.println("*******Unlocking a distributed lock********");
try {
UnlockRequest unlockRequest = new UnlockRequest(LOCK_STORE_NAME, "resource1", "owner1");
Mono<UnlockResponseStatus> result = client.unlock(unlockRequest);
System.out.println("Unlock result ->" + result.block().name());
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}
- For a full guide on distributed locks, visit How-To: Use a Lock.
- Visit the Java SDK examples for a complete code sample.
Workflow
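The lock request in the example carries a resource ID, an owner and an expiry in seconds. The in-memory sketch below shows one plausible reading of those semantics: a lock is granted only if the resource is free or its lease has expired, and only the owner can release it. All names (`InMemoryLockSketch`, `UnlockStatus` and its values) are hypothetical, the current time is passed in explicitly to keep the sketch deterministic, and the real behavior is defined by the Dapr runtime and the configured lock store.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: lease-based locking in memory, with an explicit clock. */
final class InMemoryLockSketch {
    /** Hypothetical status values for an unlock attempt. */
    enum UnlockStatus { SUCCESS, LOCK_DOES_NOT_EXIST, LOCK_BELONGS_TO_OTHERS }

    private static final class Lease {
        final String owner;
        final long expiresAtMillis;

        Lease(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();

    /** Grants the lock if the resource is free or its previous lease has expired. */
    synchronized boolean tryLock(String resourceId, String owner, int expiryInSeconds, long nowMillis) {
        Lease current = leases.get(resourceId);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // still held (this sketch does not support re-entrant locking)
        }
        leases.put(resourceId, new Lease(owner, nowMillis + expiryInSeconds * 1000L));
        return true;
    }

    /** Releases the lock only when it exists, has not expired, and belongs to the caller. */
    synchronized UnlockStatus unlock(String resourceId, String owner, long nowMillis) {
        Lease current = leases.get(resourceId);
        if (current == null || current.expiresAtMillis <= nowMillis) {
            leases.remove(resourceId);
            return UnlockStatus.LOCK_DOES_NOT_EXIST;
        }
        if (!current.owner.equals(owner)) {
            return UnlockStatus.LOCK_BELONGS_TO_OTHERS;
        }
        leases.remove(resourceId);
        return UnlockStatus.SUCCESS;
    }
}
```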
package io.dapr.examples.workflows;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* For setup instructions, see the README.
*/
public class DemoWorkflowClient {
/**
* The main method.
*
* @param args Input arguments (unused).
* @throws InterruptedException If the program has been interrupted.
*/
public static void main(String[] args) throws InterruptedException {
DaprWorkflowClient client = new DaprWorkflowClient();
try (client) {
String separatorStr = "*******";
System.out.println(separatorStr);
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);
System.out.println(separatorStr);
System.out.println("**GetInstanceMetadata:Running Workflow**");
WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
System.out.printf("Result: %s%n", workflowMetadata);
System.out.println(separatorStr);
System.out.println("**WaitForInstanceStart**");
try {
WorkflowInstanceStatus waitForInstanceStartResult =
client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceStartResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
}
System.out.println(separatorStr);
System.out.println("**SendExternalMessage**");
client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
System.out.println(separatorStr);
System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);
System.out.println(separatorStr);
System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "e2", "event 2 Payload");
System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);
System.out.println(separatorStr);
System.out.println("**WaitForInstanceCompletion**");
try {
WorkflowInstanceStatus waitForInstanceCompletionResult =
client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
}
System.out.println(separatorStr);
System.out.println("**purgeInstance**");
boolean purgeResult = client.purgeInstance(instanceId);
System.out.printf("purgeResult: %s%n", purgeResult);
System.out.println(separatorStr);
System.out.println("**raiseEvent**");
String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
client.raiseEvent(eventInstanceId, "TestException", null);
System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);
System.out.println(separatorStr);
String instanceToTerminateId = "terminateMe";
client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);
TimeUnit.SECONDS.sleep(5);
System.out.println("Terminate this workflow instance manually before the timeout is reached");
client.terminateWorkflow(instanceToTerminateId, null);
System.out.println(separatorStr);
String restartingInstanceId = "restarting";
client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId);
System.out.println("Sleeping 30 seconds to restart the workflow");
TimeUnit.SECONDS.sleep(30);
System.out.println("**SendExternalMessage: RestartEvent**");
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
TimeUnit.SECONDS.sleep(30);
client.terminateWorkflow(restartingInstanceId, null);
}
System.out.println("Exiting DemoWorkflowClient.");
System.exit(0);
}
}
- For a full guide on workflows, visit the Dapr workflow documentation.
- Learn more about how to use workflows with the Java SDK.
Sidecar API
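Wait for sidecar
DaprClient also provides a helper method to wait for the sidecar to become healthy (components only). When using this method, be sure to specify a timeout in milliseconds and call block() to wait for the result of the reactive operation.
// Wait for the Dapr sidecar to report healthy before attempting to use Dapr components.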
try (DaprClient client = new DaprClientBuilder().build()) {
System.out.println("Waiting for Dapr sidecar ...");
client.waitForSidecar(10000).block(); // Specify the timeout in milliseconds
System.out.println("Dapr sidecar is ready.");
...
}
// Perform Dapr component operations here, for example fetching secrets or saving state.
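The idea behind waiting with a timeout can be sketched as a generic polling loop: probe a health check repeatedly until it succeeds or the deadline passes. This is a conceptual sketch only. `HealthWaitSketch`, the probe and the poll interval are hypothetical, and waitForSidecar itself is reactive and implemented inside the SDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Illustrative only: polls a health probe until it passes or the timeout elapses. */
final class HealthWaitSketch {
    /**
     * @param probe         returns true once the dependency reports healthy
     * @param timeoutMillis maximum total time to wait
     * @param pollMillis    pause between two probes
     * @return true if the probe succeeded in time, false on timeout or interruption
     */
    static boolean waitUntilHealthy(BooleanSupplier probe, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (probe.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without a healthy response
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }
}
```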
Shut down the sidecar
try (DaprClient client = new DaprClientBuilder().build()) {
logger.info("Sending shutdown request.");
client.shutdown().block();
logger.info("Ensuring dapr has stopped.");
...
}
Learn more about the Dapr Java SDK packages available to add to your Java applications.
Related links