一分钟构建Apache Storm程序

一分钟构建Apache Storm程序,第1张

一分钟构建Apache Storm程序

目录

一 说明

二 步骤 

1.创建项目

2.引入依赖

3.主方法

4.创建Spout类

5.创建Bolt01

6.创建Bolt02

7.本地运行结果

8. 提交到Storm集群

三 总结


 

一 说明

通过最简单的方法,一分钟构建一个apache storm程序,初探storm原理。在此基础上再去探究strom深层机制,或许更加容易。

本文使用FirstSpout产生数据,Bolt01进行单词切分,Bolt02进行单词统计。

二 步骤  1.创建项目

使用IDEA创建Maven项目。步骤File->New->Project。

2.引入依赖

在POM文件中添加。provided如果本地运行需要注释掉,如果提交到storm集群,需要取消注释。


    
        org.apache.storm
        storm-core
        1.0.1
        
    
3.主方法

创建类TestMain,添加main方法

public static void main(String[] args) {
	//创建任务拓扑
	TopologyBuilder builder = new TopologyBuilder();
	//设置拓扑关系
	builder.setSpout("firstSpout",new FirstSpout());
	builder.setBolt("bolt01",new Bolt01()).shuffleGrouping("firstSpout");
	builder.setBolt("bolt02",new Bolt02()).shuffleGrouping("bolt01");
    
    //启动Topology
	Config conf = new Config();
	StormTopology topology = builder.createTopology();
    if(args != null && args.length > 0) {
		try {
            //提交到storm集群
			StormSubmitter.submitTopology(args[0],conf,topology);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}else {
		//本地启动
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("storm-test", conf, topology);
	}
}

4.创建Spout类

创建类FirstSpout,产生源数据。

public class FirstSpout extends baseRichSpout {
    private SpoutOutputCollector collector;
    private int num = 0;
    //初始化时调用
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }
    //业务逻辑,产生源数据
    public void nextTuple() {
        try {
            num++;
            this.collector.emit(new Values(num,"First Storm Project"));
            System.out.println("spout send: " + num);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //定义向下传递的数据格式声明
        outputFieldsDeclarer.declare(new Fields("number","message"));
    }
}
5.创建Bolt01

业务逻辑处理,本文用作单词切分

public class Bolt01 extends baseRichBolt {
    private OutputCollector collector;
    //Bolt启动前执行
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
    }
    //数据处理方法
    public void execute(Tuple tuple) {
        //接收数据
        String msg = tuple.getStringByField("message");
        System.out.println("Bolt01 accepts:"+ tuple.getInteger(0)+" "+msg);
        if (msg!=null && msg.length()>0){
            //单词切分
            String[] words = msg.split(" ");
            //将切分后的单词发送到下一个Bolt
            for(String word : words){
                this.collector.emit(new Values(word));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //向下传递的数据格式声明
        outputFieldsDeclarer.declare(new Fields("bNumber"));
    }
}

Bolt获取上一步数据常用方法有两种,以String类型为例,

tuple.getString(0);       //获取第一个数据

tuple.getStringByField("message");    //“message”为上层传递的数据格式声明

6.创建Bolt02

业务逻辑处理,本文用作单词统计。

public class Bolt02 extends baseRichBolt {
    private OutputCollector collector;
    private int sum = 0;
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
    }
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("bNumber");
        //单词数量统计
        if (word !=null && word.length()>0){
            sum++;
        }
        System.out.println("words statistics:"+sum);
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

7.本地运行结果

spout send 1
Bolt01 accepts:1 My First Project
words statistics:1
words statistics:2
words statistics:3
spout send 2
Bolt01 accepts:2 My First Project
words statistics:4
words statistics:5
words statistics:6
spout send 3
Bolt01 accepts:3 My First Project
words statistics:7
words statistics:8
words statistics:9
spout send 4
Bolt01 accepts:4 My First Project
words statistics:10
words statistics:11
words statistics:12

8. 提交到Storm集群

 程序打包,运行mvn package,打出storm-test-1.0-SNAPSHOT.jar包。

jar包上次到服务器。

使用以下命令提交。

storm jar storm-test-1.0-SNAPSHOT.jar com.weichai.TestMain storm-test

三 总结

本文提供了一种简单的storm程序构建方法,没有涉及多个workers,tasks,以及ack可靠性等。但以本文为基础,进而研究更多storm参数,或许会更加轻松。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5719807.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存