storm分布式计算与问题connection refuse排查。

storm分布式计算与问题connection refuse排查。,第1张

由于项目需要,需要用到storm做分布式计算与数据处理,storm的原理和相关介绍就不在此赘叙了。

项目中storm下发的bolt有2层:

首先编写一个topology:

public class HomeBandToplogy {

private static final String TOPOLOGY_NAME = "HomeBandToplogy"

private static final String KAFKA_SPOUT = "kafkaSpout"

private static final String KAFKA_BOLT = "kafkaBolt"

private static final String ANYNASIS_BOLT = "AnynasisBolt"

private static final Log log = LogFactory.getLog(HomeBandToplogy.class)

}

然后编写一个kafkabolt和一个AnynasisBolt,如下:

kafkabolt:

public class KafkaBolt extends BaseRichBolt {

OutputCollector collector

Log logger

}

AnynasisBolt:

public class AnynasisBolt extends BaseRichBolt {

private OutputCollector collector

Log logger

}

工厂类:

public class BoxFactory {

}

接口类:

public interface BoxService extends Serializable {

}

抽象类:

**

@override

public Boolean executeRedis( return null)

@override

public Boolean executeHbase( return null)

}

storm程序启动以后,小批量数据运行正常。

继续加大数据测试,数据量达到几十万的时候,出现异常,异常如下:

[ERROR] connection attempt 9 to Netty-Client-node5/172.16.1.100:6700 failed: java.net.ConnectException: Connection refused: node5/172.16.1.100:6700

2018-11-16 17:46:11.533 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 420ms (MAX)

同时storm程序大量ack失败。

开始以后是线程数过多,以及环境资源紧张导致此种异常。

后来经过重重排查,将接口去掉,将抽象类中的方法变为抽象方法后,程序运行正常。

为何出现这样的异常呢?

原因在于,storm在处理的时候只会处理当前进程下的任务,跨进程的调度是无法实现的,故产生这样的故障。

如何基于eclipse+maven调试storm程序,步骤如下:

1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)

2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)

Github上的pom.xml,引入的依赖太多,有些不需要,详细可以参考:

https://github.com/nathanmarz/storm-starter/blob/master/m2-pom.xml

3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount

重要的是LocalCluster cluster = new LocalCluster()这一句

具体参考:http://my.oschina.net/cloudcoder/blog/200105

最近公司有个需求,需要在后端应用服务器上实时获取STORM集群的运行信息和topology相关的提交和控制,经过几天对STORM UI和CMD源码的分析,得出可以通过其thrift接口调用实现这些功能。先下载一个thrift库进行编码和安装。关于thrift可以参见这个地方。安装完成后,从STORM源码中将storm.thrift拷贝到thrift目录下。输入:

hrift -gen cpp storm.thrift

会得到一个gen-cpp目录,里面就是thrift先关脚本的C++实现。我们先看storm.thrift文件接口:

view sourceprint?

01.service Nimbus

02.{

03.//TOPOLOGY上传接口

04.void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf,4: StormTopology topology)

05.void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options)

06.void killTopology(1: string name)

07.void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e)

08.void activate(1: string name)

09.void deactivate(1: string name)

10.void rebalance(1: string name, 2: RebalanceOptions options)

11.

12.//TOPOLOGY JAR包上传接口

13.string beginFileUpload()

14.void uploadChunk(1: string location, 2: binary chunk)

15.void finishFileUpload(1: string location)

16.string beginFileDownload(1: string file)

17.binary downloadChunk(1: string id)

18.

19.//获取NIMBUS的配置信息

20.string getNimbusConf()

21.//获取STORM集群运行信息

22.ClusterSummary getClusterInfo()

23.//获取TOPOLOGY的运行状态信息

24.TopologyInfo getTopologyInfo(1: string id)

25.//获取TOPOLOGY对象信息

26.string getTopologyConf(1: string id)

27.StormTopology getTopology(1: string id)

28.StormTopology getUserTopology(1: string id)

29.}

生成C++文件后,我们就可以对其接口进行调用,由于thrift c++框架是使用boost库实现的,必须安装boost库依赖。实现的代码如下:

view sourceprint?

01.#define HAVE_NETDB_H //使用网络模块的宏必须打开

02.#include "Nimbus.h"

03.#include "storm_types.h"

04.

05.#include <string>

06.#include <iostream>

07.#include <set>

08.

09.#include <transport/TSocket.h>

10.#include <transport/TBufferTransports.h>

11.#include <protocol/TBinaryProtocol.h>

12.int test_storm_thrift()

13.{

14.boost::shared_ptr<TSocket>tsocket(new TSocket("storm-nimbus-server", 6627))

15.boost::shared_ptr<TTransport>ttransport(new TFramedTransport(tsocket, 1024 * 512))//此处必须使用TFramedTransport

16.boost::shared_ptr<TProtocol>tprotocol(new TBinaryProtocol(ttransport))

17.try{

18.//创建一个nimbus客户端对象

19.NimbusClient client(tprotocol)

20.//打开通道

21.ttransport->open()

22.

23.ClusterSummary summ

24.std::string conf

25.//对STORM的RPC调用,直接获取信息,同步进行的。

26.client.getNimbusConf(conf)

27.client.getClusterInfo(summ)

28.//关闭通道

29.ttransport->close()

30.}catch(TException &tx){

31.printf("InvalidOperation: %s

32.", tx.what())

33.}

34.}

以上代码就可以直接获取nimbus的配置和集群信息,其他接口以此类推。值得注意的是storm.thrift to C++生成的storm_types.h文件里其中operator <函数都未实现,所以必须手动进行添加实现,否则编译会有问题。

不仅仅C++可以实现STORM的控制,PHP和其他的语言也可以实现,只要thrift支持就OK。有兴趣可以实现一下试试看。

转载


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/7883266.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-11
下一篇 2023-04-11

发表评论

登录后才能评论

评论列表(0条)

保存