java版gRPC实战之五:双向流,rabbitmq持久化原理

java版gRPC实战之五:双向流,rabbitmq持久化原理,第1张

java版gRPC实战之五:双向流,rabbitmq持久化原理

responseObserver.onNext(DeductReply.newBuilder()

.setCode(code)

.setMessage(message)

.build());

}

@Override

public void onError(Throwable t) {

log.error(“批量减扣库存异常”, t);

}

@Override

public void onCompleted() {

log.info(“批量减扣库存完成,共计[{}]件商品”, totalCount);

responseObserver.onCompleted();

}

};

}

}

开发客户端应用
  • 在父工程grpc-turtorials下面新建名为double-stream-server-side的模块,其build.gradle内容如下:

plugins {

id ‘org.springframework.boot’

}

dependencies {

implementation ‘org.projectlombok:lombok’

implementation ‘org.springframework.boot:spring-boot-starter’

implementation ‘org.springframework.boot:spring-boot-starter-web’

implementation ‘net.devh:grpc-client-spring-boot-starter’

implementation project(’:grpc-lib’)

}

  • 配置文件application.yml,设置自己的web端口号和服务端地址:

server:

port: 8082

spring:

application:

name: double-stream-client-side

grpc:

client:

gRPC配置的名字,GrpcClient注解会用到

double-stream-server-side:

gRPC服务端地址

address: ‘static://127.0.0.1:9901’

enableKeepAlive: true

keepAliveWithoutCalls: true

negotiationType: plaintext

  • 启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;

  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:

package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver extends StreamObserver {

String getExtra();

}

  • 重头戏来了,看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:

package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;

import com.bolingcavalry.grpctutorials.lib.ProductOrder;

import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;

import io.grpc.stub.StreamObserver;

import lombok.extern.slf4j.Slf4j;

import net.devh.boot.grpc.client.inject.GrpcClient;

import org.springframework.stereotype.Service;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

@Service

@Slf4j

public class GrpcClientService {

@GrpcClient(“double-stream-server-side”)

private StockServiceGrpc.StockServiceStub stockServiceStub;

public String batchDeduct(int count) {

CountDownLatch countDownLatch = new CountDownLatch(1);

// responseObserver的onNext和onCompleted会在另一个线程中被执行

// ExtendResponseObserver继承自StreamObserver

ExtendResponseObserver responseObserver = new ExtendResponseObserver() {

// 用stringBuilder保存所有来自服务端的响应

private StringBuilder stringBuilder = new StringBuilder();

@Override

public String getExtra() {

return stringBuilder.toString();

}

@Override

public void onNext(DeductReply value) {

log.info(“batch deduct on next”);

// 放入匿名类的成员变量中

stringBuilder.append(String.format(“返回码[%d],返回信息:%s
” , value.getCode(), value.getMessage()));

}

@Override

public void onError(Throwable t) {

log.error(“batch deduct gRPC request error”, t);

stringBuilder.append("batch deduct gRPC error, " + t.getMessage());

countDownLatch.countDown();

}

@Override

public void onCompleted() {

log.info(“batch deduct on complete”);

// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,

// 会继续往下执行

countDownLatch.countDown();

}

};

// 远程调用,此时数据还没有给到服务端

StreamObserver requestObserver = stockServiceStub.batchDeduct(responseObserver);

for(int i=0; i

// 每次执行onNext都会发送一笔数据到服务端,

// 服务端的onNext方法都会被执行一次

requestObserver.onNext(build(101 + i, 1 + i));

}

// 客户端告诉服务端:数据已经发完了

requestObserver.onCompleted();

try {

// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,

// 那里会执行countD

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

ownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,

// await的超时时间设置为2秒

countDownLatch.await(2, TimeUnit.SECONDS);

} catch (InterruptedException e) {

log.error(“countDownLatch await error”, e);

}

log.info(“service finish”);

// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得

return responseObserver.getExtra();

}

private static ProductOrder build(int productId, int num) {

return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();

}

}

  • 最后做个web接口,可以通过web请求验证远程调用:

package grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class GrpcClientController {

@Autowired

private GrpcClientService grpcClientService;

@RequestMapping("/")

public String printMessage(@RequestParam(defaultValue = “1”) int count) {

return grpcClientService.batchDeduct(count);

}

}

  • 编码完成,开始验证;
验证
  • 启动服务端DoubleStreamServerSideApplication:

  • 启动客户端DoubleStreamClientSideApplication:

  • 这里要改:浏览器输入http://localhost:8083/?count=10,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:

  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:

  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:

  • 至此,四种类型的gRPC服务及其客户端开发就完成了,一般的业务场景咱们都能应付自如,接下来的文章咱们会继续深入学习,了解复杂场景下的gRPC *** 作;
你不孤单,欣宸原创一路相伴

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5682324.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存