目录
一 说明
二 步骤
1.创建项目
2.引入依赖
3.主方法
4.创建Spout类
5.创建Bolt01
6.创建Bolt02
7.本地运行结果
8. 提交到Storm集群
三 总结
一 说明
通过最简单的方法,一分钟构建一个apache storm程序,初探storm原理。在此基础上再去探究strom深层机制,或许更加容易。
本文使用FirstSpout产生数据,Bolt01进行单词切分,Bolt02进行单词统计。
二 步骤 1.创建项目使用IDEA创建Maven项目。步骤File->New->Project。
2.引入依赖在POM文件中添加。
3.主方法org.apache.storm storm-core1.0.1
创建类TestMain,添加main方法
public static void main(String[] args) { //创建任务拓扑 TopologyBuilder builder = new TopologyBuilder(); //设置拓扑关系 builder.setSpout("firstSpout",new FirstSpout()); builder.setBolt("bolt01",new Bolt01()).shuffleGrouping("firstSpout"); builder.setBolt("bolt02",new Bolt02()).shuffleGrouping("bolt01"); //启动Topology Config conf = new Config(); StormTopology topology = builder.createTopology(); if(args != null && args.length > 0) { try { //提交到storm集群 StormSubmitter.submitTopology(args[0],conf,topology); } catch (Exception e) { e.printStackTrace(); } }else { //本地启动 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("storm-test", conf, topology); } }4.创建Spout类
创建类FirstSpout,产生源数据。
public class FirstSpout extends baseRichSpout { private SpoutOutputCollector collector; private int num = 0; //初始化时调用 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; } //业务逻辑,产生源数据 public void nextTuple() { try { num++; this.collector.emit(new Values(num,"First Storm Project")); System.out.println("spout send: " + num); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //定义向下传递的数据格式声明 outputFieldsDeclarer.declare(new Fields("number","message")); } }5.创建Bolt01
业务逻辑处理,本文用作单词切分。
public class Bolt01 extends baseRichBolt { private OutputCollector collector; //Bolt启动前执行 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { collector = outputCollector; } //数据处理方法 public void execute(Tuple tuple) { //接收数据 String msg = tuple.getStringByField("message"); System.out.println("Bolt01 accepts:"+ tuple.getInteger(0)+" "+msg); if (msg!=null && msg.length()>0){ //单词切分 String[] words = msg.split(" "); //将切分后的单词发送到下一个Bolt for(String word : words){ this.collector.emit(new Values(word)); } } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //向下传递的数据格式声明 outputFieldsDeclarer.declare(new Fields("bNumber")); } }
Bolt获取上一步数据常用方法有两种,以String类型为例,
6.创建Bolt02tuple.getString(0); //获取第一个数据
tuple.getStringByField("message"); //“message”为上层传递的数据格式声明
业务逻辑处理,本文用作单词统计。
public class Bolt02 extends baseRichBolt { private OutputCollector collector; private int sum = 0; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { collector = outputCollector; } public void execute(Tuple tuple) { String word = tuple.getStringByField("bNumber"); //单词数量统计 if (word !=null && word.length()>0){ sum++; } System.out.println("words statistics:"+sum); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }7.本地运行结果
8. 提交到Storm集群spout send 1
Bolt01 accepts:1 My First Project
words statistics:1
words statistics:2
words statistics:3
spout send 2
Bolt01 accepts:2 My First Project
words statistics:4
words statistics:5
words statistics:6
spout send 3
Bolt01 accepts:3 My First Project
words statistics:7
words statistics:8
words statistics:9
spout send 4
Bolt01 accepts:4 My First Project
words statistics:10
words statistics:11
words statistics:12
程序打包,运行mvn package,打出storm-test-1.0-SNAPSHOT.jar包。
jar包上次到服务器。
使用以下命令提交。
三 总结storm jar storm-test-1.0-SNAPSHOT.jar com.weichai.TestMain storm-test
本文提供了一种简单的storm程序构建方法,没有涉及多个workers,tasks,以及ack可靠性等。但以本文为基础,进而研究更多storm参数,或许会更加轻松。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)