前言
在上一篇 grpc 从搭建到使用,详细了解了grpc的基础理论以及使用,本篇将继续深入了解grpc常用的通信模式;
grpc 常用通信模式
grpc为开发者提供了丰富的通信模式,以满足不同场景下业务的需求,其提供的常用的几种通信模式总结如下:
- 一元RPC通信;
- 服务端流式通信;
- 客户端RPC通信;
- 双向RPC通信;
不同的通信模式在实际使用过程中,编码上有所差别,接下来针对几种不同的模式逐一通过实际代码演示下使用过程;
一、一元RPC通信模式
客户端向服务端发送单个请求,并获得单个响应
简单来说,就是一个请求对应一个响应,这种通信模式最简单,也是实际业务中使用非常多的模式,在上一篇我们演示的demo案例,就是标准的一元RPC通信模式;
仍然以上一篇的案例工程为例进行说明,
1、在原始的proto描述文件中增加一个新的方法
2、重新编译并生成下服务相关的类
3、在自定义服务类中覆写 hello方法
@Override
public void hello(NewsProto.StringRequest request, StreamObserver<NewsProto.StringResponse> responseObserver) {
String name = request.getName();
NewsProto.StringResponse response = NewsProto.StringResponse.newBuilder().setResult("hello :" + name).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
4、客户端工程做同样的操作
- proto 文件补充一个方法;
- 重新编译生成服务相关的类文件;
5、编写客户端启动类并调用hello方法
import com.congge.news.proto.NewsProto;
import com.congge.news.proto.NewsServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
public class NewClientV2 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9988).usePlaintext().build();
try {
NewsServiceGrpc.NewsServiceBlockingStub blockingStub = NewsServiceGrpc.newBlockingStub(managedChannel);
NewsProto.StringResponse response = blockingStub.hello(NewsProto.StringRequest.newBuilder().setName("jerry").build());
System.out.println(response.getResult());
}finally {
managedChannel.shutdown();
}
}
}
6、测试效果
启动服务端
启动客户端执行方法调用
二、服务端流式RPC模式
定义:从客户端发起一次请求,服务端会产生多次响应,比如客户端提供一批数据到服务端,服务端按照一定的规则解析参数之后,处理之后将响应结果以流的形式将结果返回客户端,下面来看具体的操作步骤;
1、定义服务端的proto文件
在这种模式下,最大的特点就是在服务类中的返回值前面,需要添加关键字 stream,表明这是流式RPC;
syntax = "proto3";
option java_multiple_files = false;
option java_package = "com.congge.sms.proto";
option java_outer_classname = "SmsProto";
package sms;
service SmsService {
rpc sendSms(SmsRequest) returns (stream SmsResponse) {}
}
message SmsRequest{
repeated string phoneNumber = 1;
string content = 2;
}
message SmsResponse{
string result = 1;
}
2、生成服务端的相关类
按照上面同样的流程操作即可
3、自定义服务实现
public class SmsService extends SmsServiceGrpc.SmsServiceImplBase{
@Override
public void sendSms(SmsProto.SmsRequest request, StreamObserver<SmsProto.SmsResponse> responseObserver) {
ProtocolStringList phoneNumberList = request.getPhoneNumberList();
for(String phoneNum : phoneNumberList){
SmsProto.SmsResponse response =
SmsProto.SmsResponse.newBuilder().setResult(request.getContent() + ":" + phoneNum + " 已处理").build();
responseObserver.onNext(response);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
responseObserver.onCompleted();
}
}
4、服务启动类上添加上面的服务实现
public class GrpcServer {
public static void main(String[] args) {
try {
Server start = ServerBuilder.forPort(9988)
.addService(new NewsService())
.addService(new SmsService())
.build().start();
System.out.println("GRPC服务端启动,端口号为:" + 9988);
try {
start.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
5、客户端工程中重复同样的过程
- proto 文件补充一个方法(与服务端保持一致即可);
- 重新编译生成服务相关的类文件;
6、客户端工程服务调用
public class NewClientV3 {
public static void main(String[] args) {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9988).usePlaintext().build();
try {
SmsServiceGrpc.SmsServiceBlockingStub smsService = SmsServiceGrpc.newBlockingStub(managedChannel);
Iterator<SmsProto.SmsResponse> iterator = smsService.sendSms(SmsProto.SmsRequest.newBuilder()
.setContent("讨论事情")
.addPhoneNumber("133***551")
.addPhoneNumber("187***226")
.addPhoneNumber("189***772")
.build());
while(iterator.hasNext()){
SmsProto.SmsResponse next = iterator.next();
System.out.println(next.getResult());
}
}finally {
managedChannel.shutdown();
}
}
}
7、分别启动服务端和客户端程序
观察客户端控制台输出结果,即向服务端发起批处理请求,服务端批量响应结果
三、客户端流式RPC模式
定义:顾名思义,即客户端发起了不定次数的请求,但服务端产生一次响应,下面通过代码演示具体的操作过程;
1、服务端 proto文件中增加一个方法定义
和服务端流式RPC模式不同的是,可以从定义文件中发现,在请求的参数体,需要用 stream进行标识;
2、执行编译操作,生成服务类
和上面的同样操作即可
3、修改服务端实现类
重写一下新增的方法
@Override
public StreamObserver<SmsProto.PhoneNumberRequest> createPhone( StreamObserver<SmsProto.PhoneNumberResponse> responseObserver) {
final StreamObserver<SmsProto.PhoneNumberResponse> responseObserverInner = responseObserver;
//异步基于 responseObserver 事件回调
return new StreamObserver<SmsProto.PhoneNumberRequest>() {
int i=0;
public void onNext(SmsProto.PhoneNumberRequest phoneNumberRequest) {
System.out.println(phoneNumberRequest.getPhoneNumber() + " 的手机号已经处理了");
i = i+1;
}
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
//客户端传输完毕,进行消息统计
public void onCompleted() {
responseObserverInner.onNext(SmsProto.PhoneNumberResponse.newBuilder().setResult("本次处理" + i + "个电话号码" ).build());
responseObserverInner.onCompleted();
}
};
}
4、客户端重复上面的过程生成相关的服务类
5、客户端服务调用
public class NewClientV5 {
private SmsServiceGrpc.SmsServiceStub smsServiceStub = null;
public static void main(String[] args) {
NewClientV5 clientV5 = new NewClientV5();
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9988).usePlaintext().build();
clientV5.smsServiceStub = SmsServiceGrpc.newStub(managedChannel);
try{
clientV5.createPhone();
}catch (Exception e){
e.printStackTrace();
}
}
private void createPhone() throws InterruptedException {
StreamObserver<SmsProto.PhoneNumberRequest> requestStreamObserver = smsServiceStub.createPhone(streamObserver);
for(int i=1;i<=10;i++){
SmsProto.PhoneNumberRequest request = SmsProto.PhoneNumberRequest.newBuilder().setPhoneNumber("133***" + i).build();
requestStreamObserver.onNext(request);
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
requestStreamObserver.onCompleted();
Thread.sleep(1000);
}
StreamObserver<SmsProto.PhoneNumberResponse> streamObserver = new StreamObserver<SmsProto.PhoneNumberResponse>(){
public void onNext(SmsProto.PhoneNumberResponse phoneNumberResponse) {
System.out.println(phoneNumberResponse.getResult());
}
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
public void onCompleted() {
System.out.println("处理完成");
}
};
}
6、运行两边的程序进行测试
启动服务端,再启动客户端,观察控制台输出
四、双向流式RPC通信
定义:客户端和服务端在请求处理和响应上来说是多对多的关系,即客户端可能一次发来多个请求,而服务端可以同时处理多个请求,逻辑上就构成了一种双向的流式RPC通信模式;
1、服务端 proto 文件添加一个方法
很明显,这种模式下,需要在请求参数和响应结果前面使用 stream进行标识;
2、执行编译生成服务类
3、重写新增的服务方法
@Override
public StreamObserver<SmsProto.PhoneNumberRequest> createAndSendSms(StreamObserver<SmsProto.PhoneNumberResponse> responseObserver) {
final StreamObserver<SmsProto.PhoneNumberResponse> responseObserverInner = responseObserver;
//异步基于 responseObserver 事件回调
return new StreamObserver<SmsProto.PhoneNumberRequest>() {
int i=0;
public void onNext(SmsProto.PhoneNumberRequest phoneNumberRequest) {
System.out.println(phoneNumberRequest.getPhoneNumber() + " 的手机号已经处理了");
responseObserverInner.onNext(SmsProto.PhoneNumberResponse.newBuilder().setResult(phoneNumberRequest.getPhoneNumber() + "手机号已发送小A").build());
responseObserverInner.onNext(SmsProto.PhoneNumberResponse.newBuilder().setResult(phoneNumberRequest.getPhoneNumber() + "手机号已发送小B").build());
responseObserverInner.onNext(SmsProto.PhoneNumberResponse.newBuilder().setResult(phoneNumberRequest.getPhoneNumber() + "手机号已发送小C").build());
i = i+1;
}
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
//客户端传输完毕,进行消息统计
public void onCompleted() {
//responseObserverInner.onNext(SmsProto.PhoneNumberResponse.newBuilder().setResult("本次处理" + i + "个电话号码" ).build());
responseObserverInner.onCompleted();
}
};
}
4、客户端同样执行编译等步骤
5、客户端重写调用服务的方法
public class NewClientV6 {
private SmsServiceGrpc.SmsServiceStub smsServiceStub = null;
public static void main(String[] args) {
NewClientV6 clientV5 = new NewClientV6();
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9988).usePlaintext().build();
clientV5.smsServiceStub = SmsServiceGrpc.newStub(managedChannel);
try{
clientV5.createPhone();
}catch (Exception e){
e.printStackTrace();
}
}
private void createPhone() throws InterruptedException {
StreamObserver<SmsProto.PhoneNumberRequest> requestStreamObserver = smsServiceStub.createAndSendSms(streamObserver);
for(int i=1;i<=10;i++){
SmsProto.PhoneNumberRequest request = SmsProto.PhoneNumberRequest.newBuilder().setPhoneNumber("133***" + i).build();
requestStreamObserver.onNext(request);
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
requestStreamObserver.onCompleted();
Thread.sleep(1000);
}
StreamObserver<SmsProto.PhoneNumberResponse> streamObserver = new StreamObserver<SmsProto.PhoneNumberResponse>(){
public void onNext(SmsProto.PhoneNumberResponse phoneNumberResponse) {
System.out.println(phoneNumberResponse.getResult());
}
public void onError(Throwable throwable) {
}
public void onCompleted() {
System.out.println("处理完成");
}
};
}
6、分别启动服务端和客户端程序进行测试
观察服务端和客户端各自的输出结果
从效果来说,客户端连续不断的发来多次请求,对于服务端来说,通过 StreamObserver 这个流式对象返回每一次的请求结果,而内部则是通过异步的处理方式完成每次请求的处理。
以上,通过代码演示了gRPC常用的4种通信模式,实际使用中需要结合业务场景具体选择,本篇到此结束,感谢观看!