responseObserver.onNext(DeductReply.newBuilder()
.setCode(code)
.setMessage(message)
.build());
}
@Override
public void onError(Throwable t) {
log.error(“批量减扣库存异常”, t);
}
@Override
public void onCompleted() {
log.info(“批量减扣库存完成,共计[{}]件商品”, totalCount);
responseObserver.onCompleted();
}
};
}
}
开发客户端应用- 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:
plugins {
id ‘org.springframework.boot’
}
dependencies {
implementation ‘org.projectlombok:lombok’
implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.boot:spring-boot-starter-web’
implementation ‘net.devh:grpc-client-spring-boot-starter’
implementation project(’:grpc-lib’)
}
- 配置文件application.yml,设置自己的web端口号和服务端地址:
server:
port: 8082
spring:
application:
name: double-stream-client-side
grpc:
client:
gRPC配置的名字,GrpcClient注解会用到double-stream-server-side:
gRPC服务端地址address: ‘static://127.0.0.1:9901’
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
-
启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;
-
正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver extends StreamObserver {
String getExtra();
}
- 重头戏来了,看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient(“double-stream-server-side”)
private StockServiceGrpc.StockServiceStub stockServiceStub;
public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted会在另一个线程中被执行,
// ExtendResponseObserver继承自StreamObserver
ExtendResponseObserver responseObserver = new ExtendResponseObserver() {
// 用stringBuilder保存所有来自服务端的响应
private StringBuilder stringBuilder = new StringBuilder();
@Override
public String getExtra() {
return stringBuilder.toString();
}
@Override
public void onNext(DeductReply value) {
log.info(“batch deduct on next”);
// 放入匿名类的成员变量中
stringBuilder.append(String.format(“返回码[%d],返回信息:%s
” , value.getCode(), value.getMessage()));
}
@Override
public void onError(Throwable t) {
log.error(“batch deduct gRPC request error”, t);
stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
countDownLatch.countDown();
}
@Override
public void onCompleted() {
log.info(“batch deduct on complete”);
// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
// 会继续往下执行
countDownLatch.countDown();
}
};
// 远程调用,此时数据还没有给到服务端
StreamObserver requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i // 每次执行onNext都会发送一笔数据到服务端, // 服务端的onNext方法都会被执行一次 requestObserver.onNext(build(101 + i, 1 + i)); } // 客户端告诉服务端:数据已经发完了 requestObserver.onCompleted(); try { // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行, // 那里会执行countD 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》 【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享 ownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了, // await的超时时间设置为2秒 countDownLatch.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(“countDownLatch await error”, e); } log.info(“service finish”); // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得 return responseObserver.getExtra(); } private static ProductOrder build(int productId, int num) { return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build(); } } package grpctutorials; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class GrpcClientController { @Autowired private GrpcClientService grpcClientService; @RequestMapping("/") public String printMessage(@RequestParam(defaultValue = “1”) int count) { return grpcClientService.batchDeduct(count); } } 欢迎分享,转载请注明来源:内存溢出
验证
你不孤单,欣宸原创一路相伴
评论列表(0条)