为什么一个星期工作量的工作,我做了一个多月,还没结束 (基于socket的分布式数据处理程序Java版)

为什么一个星期工作量的工作,我做了一个多月,还没结束 (基于socket的分布式数据处理程序Java版),第1张

概述为什么一个星期工作量的工作,我做了一个多月,还没结束 为什么一个简单的小任务,我遇到这么多难题 这是一个HIK平台WIFI数据接入的工作,先看下我的代码提交记录: 首先有这两方面的原因:1、初学Jav

    为什么一个星期工作量的工作,我做了一个多月,还没结束

    为什么一个简单的小任务,我遇到这么多难题

    这是一个HIK平台WIFI数据接入的工作,先看下我的代码提交记录:

     首先有这两方面的原因:1、初学Java时间不长,不够熟练,这个原因浪费的时间并不多。2、与数据提供方沟通浪费的时间,因数据有问题,即deviceid和数据库中的deviceid对不上,导致程序筛选不到数据,需要对方修改相关配置,由于我感觉事情并不急,所以是隔几天催一下,浪费了不少天的时间,最后deviceid依然对不上,我是怎么解决的呢?由于基本上是一个设备布在一个地点,所以我是根据数据中的经纬度来筛选数据的,这种办法其实还是有一点点问题的。

    再说写代码花费的时间,服务部署到现网后,就开始发现BUG了,程序处理数据的速度不够,导致FTP上的ZIP包处理不完大量积压,因为我用的是单线程处理数据,遂改成多线程处理数据。

    经过优化和测试,发现数据处理速度还是不够,FTP上数据产生的速度大约是380条每秒,可能会更多,而单机每秒最多只能处理200多条数据,继续尝试优化,但还是不行。所以我面临第一个重要问题:把程序部署到多台机器,提高数据处理速度,程序该怎么写?

    对于这个问题,经过探索,我最终的解决办法是:把程序部署在5台机器上,程序跑起来后,能拿到机器名称,FTP上的ZIP文件名带有时间戳,根据机器名和时间戳的最后一位,把ZIP文件分配到5台机器处理,这样就解决了数据处理速度不够的问题。

    我想解决的第二个重要问题是:由于我目前做的是大数据维护,学了点Spark和Flink知识,所以我想用Spark改写,已经写好的程序是SpringBoot的,所以我要做的是SpringBoot整合Spark。周末我在家搭了一个Hadoop+Spark的分布式集群环境,写了个纯Spark的Demo跑,正常。然后写了个SpringBoot整合Spark的Demo,本地模式跑,即.setMaster("local[3]"),正常。但是提交到集群跑,web API和swagger在线文档正常,任务跑起来后在Spark Web页面可以看到,但日志报错。我试了3种方式,一种是打war包部署在tomcat里,一种是用命令java -jar运行jar包,一种是用spark-submit命令运行jar包,任务跑起来后都报错了,三种方式错误也不相同,搞到凌晨4点多,没有解决,放弃。不过,就算我成功了,接下来的问题,可能依然无法解决,就是数据处理完后,要推送到Kafka,网上的教学都是教Spark怎么处理Kafka流数据,Kafka数据是作为数据源的,不是作为目的地的,所以我的想法可能本身就是个问题。

    解释一下为什么要SpringBoot整合Spark,只用Spark不用SpringBoot不行吗?实际上之前同事就是这么干的,要么用SpringBoot,要么用Spark或Flink,没有把SpringBoot和二者在一起用过,为什么我要这么做?因为我要读MysqL数据库,我要用JDBC或者Mybatis等,如果不用SpringBoot,有些东西可能要自己搞,不太方便。

    既然SpringBoot整合Spark没有成功,那数据分配不均匀的问题怎么解决(FTP上ZIP文件5分种产生5个文件,2个数据量大,3个不是我需要的数据可以直接删了,但不管是按时间戳这个特征分配,还是按序号这个特征分配,都无法均匀分配,可能会导致一个节点数据积压,另外3个节点没有要处理的数据,即一个节点有难,3个节点围观,虽然时间尺度拉大后,数据分配是均匀的,但是数据处理延迟大了,5分钟产生一个ZIP文件,意味着,有的数据已经延迟处理了5分钟,而我这边还要延迟几分钟到几十分钟不等)?这是我准备解决的第三个重要问题。

    我用Socket解决了这个问题,我把程序部署在5个节点上,一个作为master节点,4个作为worker节点,程序启动后根据机器名判断,确定master节点,master节点从FTP上下载ZIP文件,FTP上ZIP文件可能很多,先只下载一个文件进行处理,一个文件中可能有多达10万条数据,也可能就几万条数据,还可能不是我需要的数据,直接删掉该ZIP文件即可,然后解析数据,再然后把数据集合平均分成4份,通过Socket发送给4个worker节点,worker节点收到数据集合,进行筛选和处理,然后推送到Kafka,数据处理完后,给master节点发送一条消息,可以重复发送几次,再加上是局域网,以确保master节点能收到消息(这个很重要,但这里也有问题),master节点收到4个节点数据处理完成的消息后,删除FTP上的该ZIP文件。然后进行下一次处理,直到FTP上的ZIP文件全部处理完成并删除。我今天写完后,把写ES日志和发Kafka以及删除ZIP文件的代码注释掉,耗时的地方用Thread.sleep代替,部署到真实环境,看能不能稳定跑上一天。我感觉用Socket实现分布式数据处理,虽然能解决数据分配不均匀的问题,但是程序稳定性变差了,如果master节点收不到worker节点发来的数据处理完成的消息怎么办?假设其中一个worker节点的程序挂掉了呢?

    虽然只是一个简单的小任务,我真的是非常努力,如果我最初的设想成功,以后的类似服务都可以这么写,意义重大,可惜没搞成功,退而求其次,用Socket写分布式处理程序,我好像迷迷糊糊明白为什么Spark要依赖Hadoop了,我自己用Socket写漏洞很多啊,没有把数据持久化,万一漰了,数据就丢了。

    有没有大佬给指点一下,我努力的思路是不是有问题,有没有代码又容易写,又不容易写错,程序又稳定可靠的方案?

 

====================== 分隔线 ==============================================================================================

上面的使用Socket的方案存在的问题:1.master节点把大量数据发送到worker节点处理,占用大量带宽,并且耗时。2.master节点需要等待所有worker节点全部处理完成后,才能进行下一次处理,因网络传输耗时和各机器性能差异,导致某些worker节点空闲时间长,浪费大量时间

下面是新的使用Socket的方案,有下列优点:1.master节点和worker节点通信,只传输命令,不传输数据内容,节省带宽,省时可靠。2.worker节点处理完数据,立即请求下一次处理,每一个worker节点得到充分利用

 

最终实现方案(基于socket的分布式数据处理程序Java版):

通过分布式锁,把FTP上的zip文件,均匀的分配到7个worker主机节点进行处理,master节点不处理数据,只负责处理7个woker节点的锁请求与数据处理请求,7个worker节点只要有3个正常工作,就能跟上FTP上数据产生的速度,不会造成zip文件积压,该方式数据处理延迟很小,能够及时处理FTP上产生的数据


大致流程:

1.任务启动,判断当前worker节点在FTP上是否存在私有文件,如果存在,则进入直接处理流程
2.如果私有文件不存在,则worker节点向master节点请求锁
3.master节点收到请求,发放锁
4.worker节点收到锁,重命名FTP上的一个文件为私有文件名,该文件名对其它节点不可见(各worker节点通过正则表达式判断处理),然后释放锁
5.master节点收到释放锁的请求,释放信号量,使其能够继续处理下一个节点的锁请求
6.worker节点释放锁后,下载文件并处理数据,数据处理完,删除私有文件,并再次请求锁,以进行下一个文件的处理

主要实现代码:

1.ReadFtpfileService类:任务启动入口,判断FTP上私有文件是否存在,向master节点请求锁或者请求直接处理

package com.suncreate.wifi.service;import com.suncreate.wifi.hikmodel.socketData; com.suncreate.wifi.tool.@R_404_4807@s; com.suncreate.wifi.tool.socketUtil; org.slf4j.Logger; org.slf4j.LoggerFactory; org.springframework.beans.factory.annotation.Value; org.springframework.stereotype.Service; java.util.ArrayList; java.util.List;@Servicepublic class ReadFtpfileService {    private static final Logger log = LoggerFactory.getLogger(ReadFtpfileService.);    @Value("${ftp.host}")    private String ftpHost;    @Value("${ftp.port}"int ftpPort;    @Value("${ftp.username}" String ftpUsername;    @Value("${ftp.password}" String ftpPassword;    @Value("${node.name}" String nodename;    @Value("${master.ip}" String masterIp;    @Value("${master.name}" String mastername;    /**     * 处理FTP上的zip数据     */    voID ProcessZip() {        try {            if (nodename.equals(mastername)) return; //master节点不处理数据,跳过            log.info("ProcessZip() 开始");            List<String> ftpfileList = getFtpfileList();            if (ftpfileList != null && ftpfileList.size() > 0) {                boolean bl = false;                String filename = null;                for (String ftpfilename : ftpfileList) {                    if (ftpfilename.endsWith(nodename + ".zip")) {                        bl = true;                        filename = ftpfilename;                    }                }                if (bl) {                    SocketUtil.Send(masterIp,new SocketData(nodename,3,filename)); 3:直接处理请求                    ZipProcesstime.updateTime();                } else {                    SocketUtil.Send(masterIp,0)); 0:请求锁                    ZipProcesstime.updateTime();                }            }  {                ZipProcesstime.updateTime();                RunProcessZipThread thread = new RunProcessZipThread(this);                thread.start();            }            log.info("ProcessZip() 结束");        } catch (Exception e) {            log.error("ProcessZip 出错",e);        }    }    private List<String> getFtpfileList() {        List<String> result = new ArrayList<>();        @R_404_4807@s @R_404_4807@s = new @R_404_4807@s();         {            @[email protected](ftpHost,ftpPort,ftpUsername,ftpPassword);            @[email protected]("/");            result = @[email protected]();        }  (Exception e) {            log.error("getFtpfileList() Failed"finally {            @[email protected]();        }        return result;    }}
VIEw Code

2.socketServer类:用于启动Socket服务端线程

 com.suncreate.wifi.model.HotspotInfoCollected; com.suncreate.wifi.model.TermInfocharacteristics; com.suncreate.wifi.task.CheckScheduleConfig; com.suncreate.wifi.tool.SerializeUtil; org.springframework.beans.factory.annotation.autowired;import java.io.*; java.net.ServerSocket; java.net.socket; java.util.List; java.util.concurrent.CountDownLatch; java.util.concurrent.Executors; java.util.concurrent.ThreadPoolExecutor;@Service SocketServer {    final Logger log = LoggerFactory.getLogger(SocketServer.);    @autowired     KafkaSendService kafkaSendService;    @autowired     ReadFtpfileService readFtpfileService;    @Value("${ftp.host}" String ftpPassword;    @Value("${master.ip}" String masterIp;    @Value("${node.name}" String nodename;    @Value("${master.name}"voID start() throws IOException {        ServerSocket serverSocket = new ServerSocket(18060);        log.info("当前节点主机名称:" + nodename);        if (nodename.equals(mastername)) { master节点多起几个线程与worker节点通信            for (int i = 0; i < 7; i++) {                SocketThread socketThread =  SocketThread(serverSocket,kafkaSendService,readFtpfileService,ftpHost,ftpPassword,masterIp,nodename);                socketThread.start();            }        } else { worker节点起一个线程与master通信即可            SocketThread socketThread = VIEw Code

3.socketThread类:Socket服务端接收命令,处理命令

 com.suncreate.wifi.hikmodel.ZipData; org.slf4j.LoggerFactory;import java.util.concurrent.* java.util.regex.Pattern;class SocketThread extends Thread {    final Logger log = LoggerFactory.getLogger(SocketThread.);    static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(256static Semaphore semaphore = new Semaphore(1 ServerSocket serverSocket;     KafkaSendService kafkaSendService;     ReadFtpfileService readFtpfileService;     String ftpHost;     ftpPort;     String ftpUsername;     String ftpPassword;    private String masterIp; master节点IP     String nodename;    public SocketThread(ServerSocket serverSocket,KafkaSendService kafkaSendService,ReadFtpfileService readFtpfileService,String ftpHost, ftpPort,String ftpUsername,String ftpPassword,String masterIp,String nodename) {        this.serverSocket = serverSocket;        this.kafkaSendService = kafkaSendService;        this.readFtpfileService = readFtpfileService;        this.ftpHost = ftpHost;        this.ftpPort = ftpPort;        this.ftpUsername = ftpUsername;        this.ftpPassword = ftpPassword;        this.masterIp = masterIp;        this.nodename = nodename;    }     run() {        while ( {                    Socket socket = serverSocket.accept();                    SocketData socketData = ;                     {                        inputStream inputStream = socket.getinputStream();                        DatainputStream datainputStream =  DatainputStream(inputStream);                        BufferedinputStream bufferedinputStream =  BufferedinputStream(datainputStream);                        ByteArrayOutputStream byteArrayOutputStream =  ByteArrayOutputStream();                        byte[] bytes = new byte[10240];                         len;                        while ((len = bufferedinputStream.read(bytes)) > 0) {                            byteArrayOutputStream.write(bytes,0 (SocketData) SerializeUtil.Deserialize(byteArrayOutputStream.toByteArray());                    }  (Exception e2) {                        log.error("SocketServer socket 出错" {                        socket.close();                    }                    if (socketData == ) {                        continue;                    }                    0:请求锁                    if (socketData.getCommand().equals(0)) {                        log.info("收到节点 " + socketData.getNodename() + " " + socket.getInetAddress().getHostAddress() + " 的命令 0:请求锁");                        semaphore.tryAcquire(301:发放锁                        log.info("向节点 " + socket.getInetAddress().getHostAddress() + " 发放锁");                    }                    1:发放锁                    if (socketData.getCommand().equals(1)) {                        log.info("收到节点 " + socketData.getNodename() + " " + socket.getInetAddress().getHostAddress() + " 的命令 1:发放锁");                        List<String> ftpfileList = getFtpfileList();                        筛选                        String pattern = "^[\\S\\s]*-[0-9]*-[0-9]*-[0-9]*.zip$";                        int i = ftpfileList.size() - 1; i >= 0; i--) {                            String ftpfilename = ftpfileList.get(i);                            boolean bl = Pattern.matches(pattern,ftpfilename);                            if (!bl) {                                ftpfileList.remove(i);                            }                        }                        ) {                            String newFtpfilename = ftpfileList.get(0).replace(".zip","") + "-" + nodename + ".zip";                            renameFtpfile(ftpfileList.get(0),newFtpfilename);                            SocketUtil.Send(socket.getInetAddress().getHostAddress(),2)); 2:释放锁                            byte[] file = downloadFtpfile(newFtpfilename);                            if (file != ) {                                ZipData zipData =  ZipData(file);                                if (zipData.getHotsoptList().size() > 0 || zipData.getTerminfoList().size() > 0) {                                    processZipData(zipData);                                    removeFtpfile(ftpHost,newFtpfilename); 删除FTP上的zip文件                                    SocketUtil.Send(masterIp,1)">0:再次请求锁                                    ZipProcesstime.updateTime();                                }  {                                    removeFtpfile(ftpHost,1)">                                    RunProcessZipThread thread =  RunProcessZipThread(readFtpfileService);                                    thread.start();                                }                            }  {                                RunProcessZipThread thread =  RunProcessZipThread(readFtpfileService);                                thread.start();                            }                        }  {                            SocketUtil.Send(socket.getInetAddress().getHostAddress(),1)">2:释放锁                            RunProcessZipThread thread =  RunProcessZipThread(readFtpfileService);                            thread.start();                        }                    }                    2:释放锁                    if (socketData.getCommand().equals(2)) {                        log.info("收到节点 " + socketData.getNodename() + " " + socket.getInetAddress().getHostAddress() + " 的命令 2:释放锁");                        semaphore.release();                    }                    3:直接处理请求                    if (socketData.getCommand().equals(3)) {                        log.info("收到节点 " + socketData.getNodename() + " " + socket.getInetAddress().getHostAddress() + " 的命令 3:直接处理请求");                        SocketUtil.Send(socket.getInetAddress().getHostAddress(),4,socketData.getFtpfilename())); 4:直接处理命令                    }                    4:直接处理命令                    if (socketData.getCommand().equals(4)) {                        log.info("收到节点 " + socketData.getNodename() + " " + socket.getInetAddress().getHostAddress() + " 的命令 4:直接处理命令");                        String ftpfilename = socketData.getFtpfilename();                         downloadFtpfile(ftpfilename);                        ) {                            ZipData zipData =  ZipData(file);                            ) {                                processZipData(zipData);                                removeFtpfile(ftpHost,ftpfilename);                                 SocketUtil.Send(masterIp,1)">                                ZipProcesstime.updateTime();                            }  {                                removeFtpfile(ftpHost,1)">                                RunProcessZipThread thread =  {                            RunProcessZipThread thread =  RunProcessZipThread(readFtpfileService);                            thread.start();                        }                    }                }  (Exception e) {                    log.error("SocketServer while 出错" (Exception e) {            log.error("SocketServer run 出错" processZipData(ZipData zipData) {        CountDownLatch countDownLatch = new CountDownLatch(zipData.getHotsoptList().size() + zipData.getTerminfoList().size());         (HotspotInfoCollected hotspot : zipData.getHotsoptList()) {            RunnableSendHotspot runnableSendHotspot =  RunnableSendHotspot(kafkaSendService,hotspot,countDownLatch);            threadPool.submit(runnableSendHotspot);        }         (TermInfocharacteristics terminfo : zipData.getTerminfoList()) {            RunnableSendTerm runnableSendTerm =  RunnableSendTerm(kafkaSendService,terminfo,countDownLatch);            threadPool.submit(runnableSendTerm);        }         {            countDownLatch.await();        }  (InterruptedException e) {            log.error("countDownLatch.await() Failed" renameFtpfile(String oldname,String newname) {        @R_404_4807@s @R_404_4807@s =  (Exception e) {            log.error("RenameFtpfile() Failed" {            @[email protected]();        }    }     result;    }    byte[] downloadFtpfile(String filePath) {        byte[] result = ;        @R_404_4807@s @R_404_4807@s =  @[email protected](filePath);            int tryCount = 1;            while (result == null && tryCount < 5) {                log.info("下载FTP文件 " + filePath + " 失败,尝试再次下载" + tryCount);                result = @[email protected](filePath);                Thread.sleep(3000);                tryCount++;            }        }  (Exception e) {            log.error("downloadFtpfile(filePath) Failed"boolean removeFtpfile(String ftpHost,String filePath) {        boolean result =  @[email protected](filePath);        }  (Exception e) {            log.error("removeFtpfile() Failed" result;    }}
VIEw Code

4.RunProcessZipThread类:负责任务的重启

 * 用于重启数据处理任务 */class RunProcessZipThread  Thread {    public RunProcessZipThread(ReadFtpfileService readFtpfileService) {         readFtpfileService;    }     {            Thread.sleep(20000 (InterruptedException e) {            e.printstacktrace();        }        readFtpfileService.ProcessZip();    }}
VIEw Code

实际处理数据的代码:

1.RunnableSendHotspot类:

 com.Google.gson.Gson; com.suncreate.logback.elasticsearch.metric.DataType; com.suncreate.logback.elasticsearch.metric.ProcPhase; com.suncreate.logback.elasticsearch.metric.ProcStatus; com.suncreate.logback.elasticsearch.metric.SinkType; com.suncreate.logback.elasticsearch.util.MetricUtil; java.util.HashMap; java.util.concurrent.CountDownLatch;class RunnableSendHotspot implements Runnable {    final Logger log = LoggerFactory.getLogger(RunnableSendHotspot. HotspotInfoCollected hotspot;     CountDownLatch countDownLatch;     RunnableSendHotspot(KafkaSendService kafkaSendService,HotspotInfoCollected hotspot,CountDownLatch countDownLatch) {        this.hotspot = hotspot;        this.countDownLatch = countDownLatch;    }    @OverrIDe     run() {        log2ES(ProcPhase.collect.toString(),ProcStatus.suc.toString(),1);        kafkaSendService.sendHotspotInfoCollected(hotspot);        countDownLatch.countDown();    }     log2ES(String procPhase,String procStatus,Integer count) {        HashMap<String,Object> logMap;        logMap = (HashMap<String,Object>) MetricUtil.getMap("wifi_probe","hik"new Gson().toJson(logMap) + "    Count:" + countDownLatch.getCount());    }}
VIEw Code

2.RunnableSendTerm类:

class RunnableSendTerm final Logger log = LoggerFactory.getLogger(RunnableSendTerm. TermInfocharacteristics termInfo;     RunnableSendTerm(KafkaSendService kafkaSendService,TermInfocharacteristics termInfo,1)">this.termInfo = termInfo;        );        kafkaSendService.sendTermInfocharacteristics(termInfo);        countDownLatch.countDown();    }     countDownLatch.getCount());    }}
VIEw Code

辅助代码:

1.ZipProcesstime类:记录数据处理任务最后活动的时间

 java.util.Date; * 记录数据处理任务最后活动的时间  ZipProcesstime {    final Logger log = LoggerFactory.getLogger(ZipProcesstime.long time = System.currentTimeMillis();     updateTime() {        time = System.currentTimeMillis();        log.info("ZipProcesstime 已更新,时间戳=" + time);    }    long getTime() {         time;    }}
VIEw Code

2.CheckScheduleConfig类:配置一个定时任务用于监控数据处理任务是否存活,如果长时间不存活,则重启任务(实际运行过程中,这种情况未出现过)

 com.suncreate.wifi.task; com.suncreate.wifi.service.ZipProcesstime; com.suncreate.wifi.service.ReadFtpfileService; org.springframework.context.annotation.Configuration; org.springframework.scheduling.annotation.EnableScheduling; org.springframework.scheduling.annotation.SchedulingConfigurer; org.springframework.scheduling.config.ScheduledTaskRegistrar; org.springframework.scheduling.support.crontrigger; * master节点与worker节点之间的socket通信有可能失败,所以需要一个监控,以重启数据处理任务 */@Configuration@EnableSchedulingclass CheckScheduleConfig  SchedulingConfigurer {    final Logger log = LoggerFactory.getLogger(CheckScheduleConfig.);    @Value("${checkQuarter}" String checkQuarter;    @autowired     ReadFtpfileService readFtpfileService;    @Value("${node.name}" String mastername;    @OverrIDe     configureTasks(ScheduledTaskRegistrar taskRegistrar) {        taskRegistrar.addTriggerTask(() ->if (!nodename.equals(mastername)) { master节点不处理数据,跳过                double sec = (System.currentTimeMillis() - ZipProcesstime.getTime()) / 1000.0;                log.info("ZipProcesstime 已经 " + sec + " 秒没有更新");                if (sec > 1800) {                    log.info("ZipProcesstime 已经长时间没有更新,重启 ProcessZip");                    readFtpfileService.ProcessZip();                }            }        },triggerContext ->  crontrigger(checkQuarter).nextExecutionTime(triggerContext));    }}
VIEw Code

 

====================== 分隔线 ==============================================================================================

目前程序已在现网连续稳定运行一个月

 

====================== 分隔线 ==============================================================================================

2021年5月27日,突然想到一个更加稳定并且容易实现的方法:每台机器上跑的服务,各处理各的,不再相互通信,通过重命名FTP文件的方式实现并发处理,以增强服务稳定性

主要代码如下:

ReadFtpfileService类代码:

 java.util.regex.Pattern;@Service String ftpPassword;    @Value("${hostname}" String hostname;     {            log.info("ProcessZip() 开始" (String ftpfilename : ftpfileList) {                String newFtpfilename = ;                if (ftpfilename.endsWith(hostname + ".zip")) {                    newFtpfilename = ftpfilename;                }  {                     (isOriginalfilename(ftpfilename)) {                        newFtpfilename = ftpfilename.replace(".zip","") + "-" + hostname + ".zip" renameFtpfile(ftpfilename,newFtpfilename);                        bl) {                            newFtpfilename = ;                        }                    }                }                if (newFtpfilename != ) {                     downloadFtpfile(newFtpfilename);                    ) {                        ZipData zipData =  ZipData(file);                        CountDownLatch countDownLatch =  zipData.getTerminfoList().size());                         (HotspotInfoCollected hotspot : zipData.getHotsoptList()) {                            RunnableSendHotspot runnableSendHotspot =  (TermInfocharacteristics terminfo : zipData.getTerminfoList()) {                            RunnableSendTerm runnableSendTerm =  {                            countDownLatch.await();                        }  (InterruptedException e) {                            log.error("countDownLatch.await() Failed"删除ftp中的数据                        removeFtpfile(ftpHost,newFtpfilename);                    }                }            }            log.info("ProcessZip() 结束" (Exception e) {            log.error("ProcessZip() Failed"     * 判断FTP文件是否是未改过名的原始文件     boolean isOriginalfilename(String ftpfilename) {        String pattern = "^[\\S\\s]*-[0-9]*-[0-9]*-[0-9]*.zip$";         @[email protected](oldname,1)"> bl;    }     result;    }}
VIEw Code

 

====================== 分隔线 ==============================================================================================

2021年6月16日,距第一次提交代码已经有半年了,怎么又要修改了呢?同事说你部署的节点太多,容易出问题。“因为部署的节点多,所以容易出问题”这句话逻辑上对吗?

所以我又有想法了,最好能优化一下,只部署在一台机器上,一个进程搞定。其实几个月前我就怀疑代码中kafka生产者性能不行,但是修改测试其实是麻烦的,以前的同事写的好几个服务都是这样写的,按说不会有什么问题,所以始终没有花时间去验证。

原kafka生产者代码:

KafkaConfig.java:

 com.suncreate.wifi.config; org.apache.kafka.clIEnts.producer.ProducerConfig; org.apache.kafka.common.serialization.StringSerializer; org.springframework.context.annotation.Bean; org.springframework.kafka.annotation.EnableKafka; org.springframework.kafka.core.DefaultKafkaProducerFactory; org.springframework.kafka.core.KafkaTemplate; org.springframework.kafka.core.ProducerFactory; java.util.Map;@Configuration@EnableKafka KafkaConfig {    final Logger log = LoggerFactory.getLogger(KafkaConfig.);    @Value("${kafka.bootstrap-servers}" String bootstrapServers;    @Bean    public ProducerFactory<String,String> producerFactory() {        return new DefaultKafkaProducerFactory<>(producerConfigs());    }    @Bean    public Map<String,1)"> producerConfigs() {        Map<String,Object> props = new HashMap<>();        props.put(ProducerConfig.bootstrap_SERVERS_CONfig,bootstrapServers);        props.put(ProducerConfig.RETRIES_CONfig,"3");        props.put(ProducerConfig.liNGER_MS_CONfig,"1");        props.put(ProducerConfig.BATCH_SIZE_CONfig,"65536");        props.put(ProducerConfig.BUFFER_MEMORY_CONfig,"524288");        props.put(ProducerConfig.KEY_SERIAliZER_CLASS_CONfig,StringSerializer.);        props.put(ProducerConfig.VALUE_SERIAliZER_CLASS_CONfig,1)">);         props;    }    @Bean    public KafkaTemplate<String,1)"> kafkaTemplate() {        new KafkaTemplate<>(producerFactory());    }}
VIEw Code

KafkaSendService.java:

 java.sql.Date; java.util.HashMap; com.alibaba.fastJson.JsONObject; com.Google.gson.Gson; com.suncreate.wifi.model.DeviceInfo; com.suncreate.wifi.model.WifIData; org.springframework.kafka.core.ProducerFactory; org.springframework.kafka.support.SendResult; org.springframework.stereotype.Service; org.springframework.util.concurrent.ListenableFuture; org.springframework.util.concurrent.ListenableFutureCallback;@Service KafkaSendService {    final Logger log = LoggerFactory.getLogger(KafkaSendService.);    @autowired    KafkaTemplate<String,1)"> kafkaTemplate;    @autowired     BuildBaseinfo buildBaseinfo;    @Value("${kafka.topic}" String topic;     sendHotspotInfoCollected(HotspotInfoCollected data) {         {            DeviceInfo di = buildBaseinfo.GetDeviceInfo(data.getCollectionEquipmentLon(),data.getCollectionEquipmentLat());            di.Verify()) {                log2ES(ProcPhase.clear.toString(),ProcStatus.fail.toString(),1)">);                System.out.println("Equipment ID=" + data.getCollectionEquipmentID() + " can't get baseinfo";            }            WifIData wd =  WifIData();            wd.setdeviceid(di.getCameraNo());            wd.setdiscoverMacAddr(data.getApMac().replace('-',':'));            wd.setStartTime();            wd.setEndTime();            wd.setPower(Integer.parseInt(data.getApfIEldStrength()));            wd.setMacAppearTimes(1);            wd.setIpAddr(di.getIp());            wd.setLatitude(di.getLatitude());            wd.setLongitude(di.getLongitude());            wd.setDataSource("4");  数据来源,1-宇视,2-科达,3-大华,4-海康            wd.setCaptureTime(new Date(Long.parseLong(data.getCaptureTime()) * 1000));            wd.setInsertTime( Date(System.currentTimeMillis()));            wd.setBrand("");            wd.setXCoordinate(0d);            wd.setYCoordinate(0d);            wd.setCaptureType(1);            wd.setApSSID(data.getApSsID());            wd.setApEncryptType(data.getEncryptAlgorithmType());            wd.setApMac("");            log2ES(ProcPhase.clear.toString(),wd.getIpAddr());            sendMessage(JsONObject.toJsONString(wd));        }  (Exception e) {            log2ES(ProcPhase.clear.toString(),1)">);            log.error("Failed to clear HotspotInfo" sendTermInfocharacteristics(TermInfocharacteristics data) {         WifIData();            wd.setdeviceid(di.getCameraNo());            wd.setdiscoverMacAddr(data.getMac().replace('-',1)">);            wd.setPower(Integer.parseInt(data.getTerminalFIEldStrength()));            wd.setMacAppearTimes(1 Date(System.currentTimeMillis()));            wd.setBrand(data.getBrand());            wd.setXCoordinate(0d);            wd.setYCoordinate(0d);            wd.setCaptureType(0);            wd.setApSSID(data.getSsIDPisition());            wd.setApEncryptType("");            wd.setApMac(data.getAccessApMac() == null ? "" : data.getAccessApMac().replace('-',1)">));            log2ES(ProcPhase.clear.toString(),1)">);            log.error("Failed to clear TermInfo" sendMessage(String message) {        ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topic,message);        kafkaTemplate.flush();        future.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {            @OverrIDe            voID onSuccess(SendResult<String,1)"> result) {                log2ES("transport",1);            }            @OverrIDe             onFailure(Throwable ex) {                log2ES("transport",1)">);                log.error("Failed to send to kafka" Gson().toJson(logMap));    }     Gson().toJson(logMap));    }}
VIEw Code

优化后的kafka生产者代码:

KafkaProducer.java(注意和KafkaConfig.java对比):

 javax.annotation.postconstruct; java.util.Map; java.util.Random;@Service KafkaProducer {    final Logger log = LoggerFactory.getLogger(KafkaProducer. String bootstrapServers;    private Map<Integer,KafkaTemplate<String,1)"> map;    private Random random =  Random();    int producerCount = 10;    @postconstruct     init() {        map = ();        int i = 0; i < producerCount; i++) {            map.put(i,1)">new KafkaTemplate<>((getProducerConfigs())));        }    }    private Map<String,1)"> getProducerConfigs() {        Map<String,1)"> props;    }     getKafkaTemplate() {         map.get(random.nextInt(producerCount));    }}
VIEw Code

KafkaSendService.java:

);    @autowired    KafkaProducer kafkaProducer;    @autowired     sendMessage(String message) {        KafkaTemplate<String,String> kafkaTemplate = kafkaProducer.getKafkaTemplate();        ListenableFuture<SendResult<String,1)"> Gson().toJson(logMap));    }}
VIEw Code

修改后的结果:

单个程序进程生产kafka消息的速度从300多每秒提高到了3000多每秒,性能提升了一个数量级,因为我new了10个KafkaTemplate。但是我想不明白的是,按说DefaultKafkaProducerFactory应该是有连接池的啊,为什么只new一个DefaultKafkaProducerFactory性能这么差呢?

贴一下git记录(master分支,socket分支的记录就不贴了),这么单纯的一个小需求,前前后后花费了多少心思,标题我就不改了,不然过于夸张(新标题:本来一个星期工作量的工作,我干了半年):

 

总结

以上是内存溢出为你收集整理的为什么一个星期工作量的工作,我做了一个多月,还没结束 (基于socket的分布式数据处理程序Java版)全部内容,希望文章能够帮你解决为什么一个星期工作量的工作,我做了一个多月,还没结束 (基于socket的分布式数据处理程序Java版)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1212628.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存