Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies.
Most developers write their programs on Windows, which raises the question of how to develop and debug a Storm program without installing a Storm environment on the local machine. If this problem is bothering you, try the method described in this article.
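The phrase "simulating worker nodes with threads" can be pictured with plain Java. The following is a minimal sketch that uses only java.util.concurrent; it is not Storm's actual implementation, and the names ThreadWorkersSketch and runWorkers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: this is not Storm code.
class ThreadWorkersSketch {

    // Runs `workers` simulated worker nodes as threads inside the current JVM
    // and returns the name each simulated worker reports, in submission order.
    static List<String> runWorkers(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                final int id = i;
                // Each task stands in for one worker node.
                pending.add(pool.submit(() -> "worker-" + id));
            }
            List<String> names = new ArrayList<>();
            for (Future<String> f : pending) {
                try {
                    names.add(f.get()); // wait for this simulated worker to finish
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return names;
        } finally {
            pool.shutdown(); // stop the in-process "cluster"
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(2));
    }
}
```

Everything runs in one process, so breakpoints set in an IDE such as Eclipse are hit directly. That is the property that makes local mode convenient for debugging.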
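II. Implementation steps
The steps for debugging a Storm program with Eclipse + Maven are as follows:
1. Set up the development environment (Eclipse + Maven; I used Eclipse Kepler and Maven 3.1.1).
2. Create a Maven project and edit pom.xml so that its content matches the pom.xml file listed later in this article (the machine must be online so that Maven can download the required dependency JARs).
The pom.xml on GitHub pulls in too many dependencies, some of which are not needed.
3. Write the Storm program and set it to run in local mode. The program provided in this article is a word count.
The important line is LocalCluster cluster = new LocalCluster();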
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
pom.xml file
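<project xmlns="" xmlns:xsi=""
    xsi:schemaLocation=" ">
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm.starter</groupId>
    <artifactId>storm-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <repositories>
        <repository>
            <id>github-releases</id>
        </repository>
        <repository>
            <id>clojars.org</id>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.9.0.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>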
Storm program
package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings with a word count.
 */
public class WordCountTopology {
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] s = msg.split(" ");
                    for (String string : s) {
                        collector.emit(new Values(string));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    public static class WordCount extends BaseBasicBolt {
        // Running count per word, kept in this bolt task's memory.
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(
                "spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",
                new Fields("word"));
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Distributed mode: the first argument is the topology name.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf,
                    builder.createTopology());
        } else {
            // Local mode: run in process for 10 seconds, then shut down.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
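The topology above routes words to the count bolt with fieldsGrouping on "word", so every occurrence of a word reaches the same task and that task's in-memory map holds the full count for it. The following Storm-free sketch illustrates the idea. The class FieldsGroupingSketch, the methods taskFor and run, and the hash-modulo routing are assumptions made for illustration, not Storm's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free illustration of split -> fields grouping -> count.
class FieldsGroupingSketch {

    // Picks a task index from the grouping field, so equal words always map
    // to the same task. Hash-modulo is an assumption made for this sketch.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Splits each sentence on spaces, routes every word to one of numTasks
    // counters, and returns the per-task count maps.
    static List<Map<String, Integer>> run(List<String> sentences, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                tasks.get(taskFor(word, numTasks)).merge(word, 1, Integer::sum);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "the cow jumped over the moon",
                "snow white and the seven dwarfs");
        List<Map<String, Integer>> tasks = run(input, 3);
        // All occurrences of "the" were counted by a single task.
        System.out.println(tasks.get(taskFor("the", 3)).get("the"));
    }
}
```

With a shuffle grouping instead, occurrences of the same word could be spread over several tasks, and no single map would hold the complete count.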
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
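The spout above picks sentences with an unseeded Random, so each local run emits a different sequence. When stepping through a topology in a debugger, it may help to replay the same input. One possible approach, sketched below in plain Java, is to seed the generator. The class SeededSentencePicker and its seed parameter are hypothetical and are not part of the original program.

```java
import java.util.Random;

// Hypothetical helper: picks sentences reproducibly from a fixed seed.
class SeededSentencePicker {
    static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    private final Random rand;

    // The same seed always yields the same sequence of sentences.
    SeededSentencePicker(long seed) {
        this.rand = new Random(seed);
    }

    String next() {
        return SENTENCES[rand.nextInt(SENTENCES.length)];
    }

    public static void main(String[] args) {
        SeededSentencePicker picker = new SeededSentencePicker(42L);
        for (int i = 0; i < 3; i++) {
            System.out.println(picker.next());
        }
    }
}
```

In a spout, the same effect could be had by creating the Random in open() with a fixed seed instead of new Random(). This is only worth doing while debugging.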
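Installing Storm takes five steps:
1. Install ZooKeeper.
2. Install Storm's dependencies: Java and Python.
3. Download and unpack the Storm release package.
4. Edit the necessary Storm configuration files.
5. Start Storm.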