Http REST服务改为Thrift服务

Http REST服务改为Thrift服务,第1张

最近的一个项目中,来来回回折腾了几次技术方案,其中的改为Thrift的过程稍微记录一下。

本来是个业务过程很简单的项目,分为两个网络大区的两个服务端,其中的一个大区是传统的B/S架构,基于springboot开发,浏览器访问,没什么问题。另一个大区是C/S架构,我们一开始把服务端用SpringBoot实现了,用的是Springboot+JPA+MySQL,提供的是HTTP REST接口,客户端用的是CentOS+QT+libCurl实现,用libCurl对接服务端的REST接口。两个网络大区间是隔离的,通过正反向隔离装置可以进行文件的收发,因此两个服务端都有定时任务去扫描发送过来的文件。

问题出在CS的服务端,要去部署时,甲方大爷说了HTTP/HTTPS不符合网安的要求,不能用。没办法只能重写,但是又想尽量把之前的东西复用上。查了一下初步确定gRPC和Thrift两个方案,pRPC是HTTP2的,怕还是不行,最终选择了Thrift方案。

比较了一下gPRC和Thrift,其实定位还是有差别的。从最基础的TCP往上,Netty是把基础的TCP做了封装,而Thrift的最基础的网络通信部分和netty有点类似,在上面又加上了序列化和多语言支持的部分,而gRPC是工作在HTTP2协议之上的,主要是序列化和多语言支持的部分。

改造时想把数据库访问、网络大区消息同步这些重用起来,因此还是在原来的Springboot工程之上进行的修改。

首先进行Thrift接口的定义

namespace java com.sifang.ngauto.thrift

struct CommonResult {
  1:bool suc,
  2:string message
}

struct LoginResult {
  1:CommonResult result,
  2:i32 uid,
  3:i32 rid,
  4:string uname,
  5:string nameCh,
  6:i32 pid,
  7:string pname,
  8:string token
}

struct StationInfo {
  1:i32 id,
  2:string name,
  4:string comment
}

//此处省略三千字……

service OpsService {
  CommonResult deviceRegister(1:string devid, 2:string sign, 3:string cert),
  LoginResult userLogin(1:string username, 2:string password, 3:string devid),
  //此处省略三千字……
}

再用thrift工具生成java和C++代码,C++代码留给改造QT客户端用,

thrift-0.16.0.exe -r -gen cpp maintance_service.thrift

thrift-0.16.0.exe -r -gen java maintance_service.thrift

然后来改造Springboot工程,首先将生成的java代码放到工程里面。

新建java类,继承自生成的类,实现业务代码,主要是把之前Service层实现的代码拷贝过来,根据接口的传出类型修改,之前的Repository层的类都可以Autowired进来。

@Service
public class OpsServiceImpl implements OpsService.Iface{

    @Autowired
    InfoSyncUtil infoSyncUtil;
    @Autowired
    private UserRepository userRepository;

    @Override
    public LoginResult userLogin(java.lang.String username, java.lang.String password, java.lang.String devid) throws org.apache.thrift.TException {
    //……业务代码,把之前的Service层的代码考过来
    }

    @Override
    public CommonResult deviceRegister(java.lang.String devid, java.lang.String sign, java.lang.String cert) throws org.apache.thrift.TException {
    }

再把之前的Controller层、Service层的代码都去掉,保留数据库访问Repository和网络消息同步的代码。

然后pom文件中去掉web、swagger等,加上thrift。







    org.apache.thrift
    libthrift
    0.16.0

application.yml中去掉web的端口server.port,加上thrift的端口

#server:
#  port: 8080
thrift:
  port: 8090

 添加Thrift Server,用PostConstruct启动

@Configuration
public class ThriftServer {
    @Value("${thrift.port}")
    private Integer port;

    @Autowired
    private OpsServiceImpl opsService;

    @PostConstruct
    public void start() {
            TProcessor tprocessor = new OpsService.Processor(opsService);
            // 传输通道 - 非阻塞方式
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);

            //多线程半同步半异步
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            tArgs.processor(tprocessor);

            tArgs.transportFactory(new TFramedTransport.Factory());
            //二进制协议
            tArgs.protocolFactory(new TBinaryProtocol.Factory());

            int numThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
            int selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
            tArgs.workerThreads(numThreads);
            tArgs.selectorThreads(selectorThreads);

            // 多线程半同步半异步的服务模型
            TServer server = new TThreadedSelectorServer(tArgs);
            log.info("Thrift TThreadedSelectorServer start at port: {}, workerThreads: {}, selectorThreads: {} ......", port, numThreads, selectorThreads);
            server.serve(); // 启动服务
        } catch (TTransportException e) {
            log.error("Thrift TThreadedSelectorServer start fail : {}", e.getMessage());
        }
    }
}

服务端改造完成,写个java客户端测试一下,把生成的java代码包含进来,然后写个Client类。

public class TestClient {
    public static void main(String[] args) {
        try {
            TFramedTransport transport =
                    new TFramedTransport(new TSocket("localhost", 8090,2000));
            //TSocket m_transport = new TSocket("127.0.0.1", 8080,2000);//创建一个传输层对象(TTransport),具体采用的传输方式是TFramedTransport,要与服务器端保持一致,
            TBinaryProtocol protocol = new TBinaryProtocol(transport);//创建一个通信协议对象(TProtocol),具体采用的通信协议是二进制协议
            OpsService.Client testClient = new OpsService.Client(protocol);//创建一个Thrift客户端对象(TestThriftService.Client),Thrift的客户端类TestThriftService.Client已经在文件TestThriftService.java中,由Thrift编译器自动为我们生成

            System.out.println("连接服务器");
            transport.open();//打开socket,建立与服务器直接的socket连接

            String token = null;

            System.out.println("1、测试设备注册");
            CommonResult res = testClient.deviceRegister("123456789", "", "");

            if(res.isSuc()) {
                System.out.println("--设备注册成功");
            }
            else {
                System.out.println("--设备注册失败: " + res.getMessage());
            }

            System.out.println("2、用户登录");
            LoginResult res1 = testClient.userLogin("nfdwzd", "fa376b17f37342c3e61d1e06a86c29306fe12bb743b2b1c677dc9ec157242dd4", "123456789");

            if(res1.getResult().isSuc()) {
                token = res1.getToken();
                System.out.println("--用户登录成功,用户id:"+res1.getUid()+",用户单位:"+res1.getPname());
            }
            else {
                System.out.println("--登录失败: " + res1.getResult().getMessage());
            }

            // 此处省略三千字……

            transport.close();//使用完成关闭socket
            }
        catch (TException e){
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }
}

测试居然失败了!!!

跟踪进服务端,发现进了相应的服务端函数,但是Repository的数据库调用的时候就停止了。然后就是各种调试,分析,包冲突?版本?注入有问题?几个小时没结果,又把JPA换成Mybatis测试一下,这下可以了,但是原先的@Scheduled定时任务又没有执行,突然想到了ThriftServer开始了之后会一直accept导致线程阻塞,把Server修改成一个单独的线程执行。

@Configuration
public class ThriftServer {
    @Value("${thrift.port}")
    private Integer port;

    @Autowired
    private OpsServiceImpl opsService;

    @PostConstruct
    public void start() {
        new Thread() {
            @Override
            public void run() {
                try {
                    TProcessor tprocessor = new OpsService.Processor(opsService);
                    // 传输通道 - 非阻塞方式
                    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);

                    //多线程半同步半异步
                    TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
                    tArgs.processor(tprocessor);

                    tArgs.transportFactory(new TFramedTransport.Factory());
                    //二进制协议
                    tArgs.protocolFactory(new TBinaryProtocol.Factory());

                    int numThreads = Math.max(2, Runtime.getRuntime().availableProcessors());
                    int selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
                    tArgs.workerThreads(numThreads);
                    tArgs.selectorThreads(selectorThreads);

                    // 多线程半同步半异步的服务模型
                    TServer server = new TThreadedSelectorServer(tArgs);
                    log.info("Thrift TThreadedSelectorServer start at port: {}, workerThreads: {}, selectorThreads: {} ......", port, numThreads, selectorThreads);
                    server.serve(); // 启动服务
                } catch (TTransportException e) {
                    log.error("Thrift TThreadedSelectorServer start fail : {}", e.getMessage());
                }
            }
        }.start();
    }
}

测试通过!!!

还遗留了一些问题,单线程的时候JPA调用被阻塞了,Mybatis却没有,这个有空研究一下。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/876999.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存