项目中storm下发的bolt有2层:
首先编写一个topology:
public class HomeBandToplogy {
private static final String TOPOLOGY_NAME = "HomeBandToplogy"
private static final String KAFKA_SPOUT = "kafkaSpout"
private static final String KAFKA_BOLT = "kafkaBolt"
private static final String ANYNASIS_BOLT = "AnynasisBolt"
private static final Log log = LogFactory.getLog(HomeBandToplogy.class)
}
然后编写一个kafkabolt和一个AnynasisBolt,如下:
kafkabolt:
public class KafkaBolt extends BaseRichBolt {
OutputCollector collector
Log logger
}
AnynasisBolt:
public class AnynasisBolt extends BaseRichBolt {
private OutputCollector collector
Log logger
}
工厂类:
public class BoxFactory {
}
接口类:
public interface BoxService extends Serializable {
}
抽象类:
**
@override
public Boolean executeRedis( return null)
@override
public Boolean executeHbase( return null)
}
storm程序启动以后,小批量数据运行正常。
继续加大数据测试,数据量达到几十万的时候,出现异常,异常如下:
[ERROR] connection attempt 9 to Netty-Client-node5/172.16.1.100:6700 failed: java.net.ConnectException: Connection refused: node5/172.16.1.100:6700
2018-11-16 17:46:11.533 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 420ms (MAX)
同时storm程序大量ack失败。
开始以后是线程数过多,以及环境资源紧张导致此种异常。
后来经过重重排查,将接口去掉,将抽象类中的方法变为抽象方法后,程序运行正常。
为何出现这样的异常呢?
原因在于,storm在处理的时候只会处理当前进程下的任务,跨进程的调度是无法实现的,故产生这样的故障。
如何基于eclipse+maven调试storm程序,步骤如下:1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)
2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)
Github上的pom.xml,引入的依赖太多,有些不需要,详细可以参考:
https://github.com/nathanmarz/storm-starter/blob/master/m2-pom.xml
3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount
重要的是LocalCluster cluster = new LocalCluster()这一句
具体参考:http://my.oschina.net/cloudcoder/blog/200105
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)