文章目录
  1. 1. 如何定义双向流呢?
  2. 2. 实操一把
    1. 2.1. 案例介绍
    2. 2.2. 服务IDL定义
    3. 2.3. 编写服务端–实现服务逻辑
    4. 2.4. 编写服务端–绑定服务实现类
    5. 2.5. 编写客户端–实现客户端逻辑
    6. 2.6. 案例运行
    7. 2.7. 客户端流式RPC机理
  3. 3. 总结

今天来实战一下gRPC的特性,双向流。

首先认识一下,什么是双向流?

所谓双向流式 RPC ,是由客户端调用方法来初始化,而服务端接收到客户端的元数据,方法名和截止时间。

并且服务端可以选择发送回它的初始元数据或等待客户端发送请求的一种通信方式。

双向流,特点在于双向,也就是请求响应如何起作用是完全取决于应用怎么处理,因为客户端和服务端能在任意顺序上读写 ,也就是说这些流的操作是完全独立的。

例如服务端可以一直等待,直到它接收到所有客户端的消息才写应答,或者服务端和客户端可以像”乒乓球”一样:服务端后得到一个请求就回送一个应答,接着客户端根据应答来发送另一个请求,以此类推。

通俗地说,客户端发送了 N 个请求,服务端返回 N 个或者 M 个响应,该特性能够充分利用 HTTP/2.0 的多路复用功能。

某个时刻,HTTP/2.0 链路上可以既有请求也有响应,实现了全双工通信(对比单行道和双向车道),双向流式RPC通信用一个简单的图表示如下:

grpc双向流.png

如何定义双向流呢?

一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:

比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。开发者可以通过在请求和响应前加 stream 关键字去制定方法的类型。

我们可以在服务的IDL定义文件proto中按照如下方式声明双向流式RPC接口。

1
2
3
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

声明方式为在请求和响应之前都添加关键字 stream

实操一把

有了基本的概念,我们还是直接上手实操一下。

案例介绍

本案例为一个简单的应答接口,客户端向服务端发送问候语,服务端接受问候并返回服务端的问候。

由于是双向流,所以我们可以在一次接口调用中,发送多次问候。

运行的效果就是客户端与服务端的日志是交替打印的,也就是说服务端在客户端调用接口的过程中就可以逐步发送响应结果给客户端,而不是像阻塞式请求一样,等待客户端请求发送完毕,再统一一次性返回接口。

这种双向流的接口处理模式的好处是显而易见的:

  • 如果传输的数据包过大,客户端可以将请求包拆分为多个小包发送至服务端。服务端依次处理小包,发送过程与处理过程互不干扰,互不依赖。
  • 服务端不需要等待客户端包全部发送,才能处理以及响应。

服务IDL定义

首先还是需要编写定义proto接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.snowalker.grpc.sdk.stream";
option java_outer_classname = "DoubleStreamProto";
// 服务IDL定义
service DoubleStreamService {
rpc chat (stream ChatRequest) returns (stream ChatResponse) {
}
}
// 请求
message ChatRequest {
int32 userId = 1;
string msg = 2;
}
// 响应
message ChatResponse {
int32 userId = 1;
string msg = 2;
}

重点关注chat接口定义,同时声明请求与响应为stream类型,标记这是一个双向流RPC。

对该proto文件进行编译,所在工程grpc-demo-sdk根路径下执行:

mvn clean compile -DskipTests

编写服务端–实现服务逻辑

服务端编写与普通的RPC接口相同,也是需要继承gRPC生成的XXXXGrpc.XXXXXImplBase,具体代码如下:

1
public class DoubleStreamServiceImpl extends DoubleStreamServiceGrpc.DoubleStreamServiceImplBase {

重写chat方法,实现服务端逻辑,可以看到,业务逻辑同阻塞式接口不同,这里将业务逻辑写在了onNext回调方法中,因此我们可以知道这是一种异步回调机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* @param responseObserver
* @return
*/
@Override
public StreamObserver<ChatRequest> chat(StreamObserver<ChatResponse> responseObserver) {
return new StreamObserver<ChatRequest>() {
@Override
public void onNext(ChatRequest chatRequest) {
int userId = chatRequest.getUserId();
String msg = chatRequest.getMsg();
logger.info("[DoubleStreamServiceImpl] 服务端处理开始....");
logger.info("[DoubleStreamServiceImpl] 客户端说: [" + msg + "]");
responseObserver.onNext(ChatResponse.newBuilder()
.setUserId(chatRequest.getUserId())
.setMsg("这是一条来自[服务端]的消息: 你好,收到了-" + userId + " 的消息. " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS").format(new Date()) + "\n")
.build());
}
@Override
public void onError(Throwable throwable) {
logger.warning("[DoubleStreamServiceImpl] gRPC dealing error");
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

构造 Streaming 响应对象 StreamObserver并实现 onNext 等接口,由于服务端也是 Streaming模式,因此响应是多个的,也就是说 onNext 会被调用多次。

在onNext方法中编写服务端业务逻辑,这里主要做的就是取出请求体ChatRequest中的userId,以及msg,打印输出,构造响应对象ChatResponse,并return。

实现 onCompleted 方法,调用 responseObserver.onCompleted() 将请求返回客户端。

编写服务端–绑定服务实现类

同一般的阻塞式RPC接口相同,想要使双向流式RPC生效,还需要注册服务实现类到服务端ServerBuilder中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SneakyThrows
private void startServer() {
int serverPort = 10881;
server = ServerBuilder.forPort(serverPort)
// 上文中的报价服务实现类
.addService(new OrderServiceImpl())
// 添加双向流式RPC实现
.addService(new DoubleStreamServiceImpl())
.build();
server.start();
logger.info("OrderServerBoot started, listening on:" + serverPort);
// 优雅停机
addGracefulShowdownHook();
}

编写客户端–实现客户端逻辑

接着编写客户端逻辑。

双向流式RPC的客户端实现方式与传统阻塞式客户端也是不同的。

首先定义接口ExtendResponseObserver,继承StreamObserver,用于返回文本格式的响应体,方便观察。

1
2
3
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}

接着编写客户端逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* @author snowalker
* @version 1.0
* @date 2022/3/16 23:49
* @className
* @desc
*/
public class DoubleStreamClient {
private static final Logger logger = Logger.getLogger(DoubleStreamClient.class.getName());
private final DoubleStreamServiceGrpc.DoubleStreamServiceStub doubleStreamServiceStub;
public DoubleStreamClient(ManagedChannel channel) {
doubleStreamServiceStub = DoubleStreamServiceGrpc.newStub(channel);
}
public String chat(String msg, int user, int count) {
ExtendResponseObserver<ChatResponse> chatResponseStreamObserver = new ExtendResponseObserver<ChatResponse>() {
@Override
public String getExtra() {
return stringBuilder.toString();
}
// 用stringBuilder保存所有来自服务端的响应
private StringBuilder stringBuilder = new StringBuilder();
@Override
public void onNext(ChatResponse chatResponse) {
logger.info("[DoubleStreamClient] onNext.....");
// 放入匿名类的成员变量中
System.out.println(chatResponse.getMsg());
stringBuilder.append(String.format("服务端响应:%s<br>, 用户:%d" , chatResponse.getMsg(), chatResponse.getUserId()));
}
@Override
public void onError(Throwable throwable) {
logger.warning("[DoubleStreamClient] gRPC request error");
stringBuilder.append("[DoubleStreamClient]chat gRPC error, " + throwable.getMessage());
}
@Override
public void onCompleted() {
logger.info("[DoubleStreamClient] onCompleted");
}
};
// 重点!!!! RPC调用发起
StreamObserver<ChatRequest> chatRequestStreamObserver = doubleStreamServiceStub.chat(chatResponseStreamObserver);
for(int i = 0; i < count; i++) {
// 每次执行onNext都会发送一笔数据到服务端,
// 服务端的onNext方法都会被执行一次
ChatRequest chatRequest = ChatRequest.newBuilder()
.setUserId(user)
.setMsg("这是一条来自客户端的消息: 你好," + user + new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS").format(new Date()))
.buildPartial();
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
chatRequestStreamObserver.onNext(chatRequest);
}
// 客户端告诉服务端:数据已经发完了
chatRequestStreamObserver.onCompleted();
logger.info("service finish");
return chatResponseStreamObserver.getExtra();
}
}

解释下代码:

  • 客户端首先定义ExtendResponseObserver实例,并实现onNext、onError、onCompleted、getExtra等回调方法,用于处理服务端响应、异常情况、请求完成、返回字符形式的响应;

  • 客户端通过在循环中调用 requestObserver 的 onNext 方法,发送请求消息到服务端;

    1
    2
    // 重点!!!! RPC调用发起
    StreamObserver<ChatRequest> chatRequestStreamObserver = doubleStreamServiceStub.chat(chatResponseStreamObserver);
  • 当RPC请求发送完成之后,通过调用 onCompleted() 通知服务端数据已经发送完成;

  • 需要特别注意,客户端发送请求的stub为流式stub为DoubleStreamServiceGrpc.newStub(channel); 而非newBlockingStub。

案例运行

案例运行完毕,我们运行看看效果。

本文开始,运行案例相关展示尝试通过gif动态展示:

run.gif

  • 先后启动了服务端与客户端,
  • 客户端在一次调用中循环发送chat请求到服务端
  • 服务端边处理请求边响应给客户端
  • 可以看到服务端请求日志在一次调用中是边处理边打印的,客户端日志也是逐步输出的
  • 表明双向流式RPC是异步的、高效的、非阻塞的。

客户端流式RPC机理

稍微翻看源码,可以发现,实际上客户端底层 onNext 方法调用了 ClientCall 的消息发送方法,代码如下(CallToStreamObserverAdapter 类):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private boolean frozen;
private final ClientCall<T, ?> call;
private Runnable onReadyHandler;
private boolean autoFlowControlEnabled = true;
public CallToStreamObserverAdapter(ClientCall<T, ?> call) {
this.call = call;
}
private void freeze() {
this.frozen = true;
}
@Override
public void onNext(T value) {
call.sendMessage(value);
}

特别注意的是,对于双向 Streaming 模式,只支持异步调用方式。

总结

本文我们主要了解了gRPC的双向流式调用,对于这种流模式调用,可以充分利用 HTTP/2.0 协议的多路复用功能,实现在在一条 HTTP 链路上并行双向传输数据(全双工),它可以有效解决 HTTP/1.X 的数据单向传输问题,在大幅减少 HTTP 连接的情况下充分利用单条链路的性能,其性能可以媲美传统的 RPC 私有长连接协议:即通过更少的链路,实现更高的性能。

下篇文章,我们将为我们的grpc通信加入基于Nacos的服务注册发现能力,不见不散。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 如何定义双向流呢?
  2. 2. 实操一把
    1. 2.1. 案例介绍
    2. 2.2. 服务IDL定义
    3. 2.3. 编写服务端–实现服务逻辑
    4. 2.4. 编写服务端–绑定服务实现类
    5. 2.5. 编写客户端–实现客户端逻辑
    6. 2.6. 案例运行
    7. 2.7. 客户端流式RPC机理
  3. 3. 总结
Fork me on GitHub