A question about starting Storm in local mode


I. Introduction

Storm provides two modes of operation: local mode and distributed mode. Local mode is very useful for developing and debugging Storm topologies.

As the Storm documentation puts it: "Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies."

Most developers write their code on Windows, so it is convenient to be able to develop and debug Storm programs without installing a Storm environment on the local machine. If that is the problem you are facing, the approach described below should help.

II. Implementation steps

The steps for debugging a Storm program based on Eclipse + Maven are as follows:

1. Set up the development environment (Eclipse + Maven; I use Eclipse Kepler and Maven 3.1.1).

2. Create a Maven project and edit its pom.xml as shown in the pom.xml listing below (the machine needs Internet access so the required dependency JARs can be downloaded).

The pom.xml on GitHub pulls in too many dependencies, some of which are not needed here.

3. Write the Storm program and configure it to run in local mode. The program used in this article is a word count.

The important part is the line LocalCluster cluster = new LocalCluster():

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
// builder is the TopologyBuilder created in the full program below
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);          // let the topology run for ten seconds
cluster.killTopology("test");
cluster.shutdown();

The pom.xml file:

<project xmlns="" xmlns:xsi=""

xsi:schemaLocation=" ">

4.0.0

storm.starter

storm-starter

0.0.1-SNAPSHOT

jar

UTF-8

github-releases

clojars.org

junit

junit

4.11

test

storm

storm

0.9.0.1

provided

commons-collections

commons-collections

3.2.1

The Storm program (WordCountTopology, followed by the RandomSentenceSpout it uses):

package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {

    // Splits each incoming sentence into individual words.
    public static class SplitSentence extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] s = msg.split(" ");
                    for (String string : s) {
                        collector.emit(new Values(string));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Keeps a running count per word and emits (word, count) pairs.
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // With arguments: submit to a real cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Without arguments: run in local mode inside this JVM.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
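With conf.setDebug(true), every emitted tuple is written to the log, which is usually enough to verify the word counts. If you would rather see only the final pairs on the console, one option (not part of the original storm-starter code; the class name PrinterBolt is made up here for illustration) is to attach a simple sink bolt after the "count" bolt, along these lines:

package storm.starter;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// Illustrative sink bolt: prints each (word, count) pair it receives and emits nothing.
public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.getString(0) + " -> " + tuple.getInteger(1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, declares no output fields
    }
}

It would be wired into the topology with builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count"). The RandomSentenceSpout used as the topology's data source is listed next.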

package storm.starter.spout;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Spout that emits a randomly chosen sentence roughly every 100 ms.
public class RandomSentenceSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
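Since the pom.xml above already includes JUnit with test scope, the same local-mode pattern can also be wrapped in a JUnit test and launched from Eclipse's JUnit runner instead of a main() method. The following is only a sketch under that assumption; the test class name is illustrative and the file would go under src/test/java:

package storm.starter;

import org.junit.Test;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountLocalModeTest {

    @Test
    public void runsInLocalMode() throws Exception {
        // Reuse the spout and bolts from the listings above.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new WordCountTopology.SplitSentence(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountTopology.WordCount(), 1).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        // Run the topology in-process for ten seconds, then shut everything down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count-test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("word-count-test");
        cluster.shutdown();
    }
}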

Installing the Storm software itself (only needed for distributed mode; local mode as used above does not require it) takes five steps: install ZooKeeper; install Storm's dependencies, Java and Python; download and unpack the Storm release package; edit the necessary Storm configuration files; and start the Storm processes.


