当前位置:K88软件开发文章中心大数据Apache Storm → 文章内容

Apache Storm工作实例

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:05:58

入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。cleanup cleanup方法的签名如下 - cleanup() declareOutputFields declareOutputFields方法的签名如下- declareOutputFields(OutputFieldsDeclarer declarer)这里的参数declarer用于声明输出流id,输出字段等。此方法用于指定元组的输出模式。呼叫日志创建者bolt呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。编码 - CallLogCreatorBolt.java //import util packagesimport java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;//import Storm IRichBolt packageimport backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//Create a class CallLogCreatorBolt which implement IRichBolt interfacepublic class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}呼叫日志计数器Bolt呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。完整的代码如下。编码 - CallLogCounterBolt.java import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}创建拓扑Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 - TopologyBuilder builder = new TopologyBuilder();builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout");builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); shuffleGrouping和fieldsGrouping方法有助于为spout和bolts设置流分组。本地集群为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 - 编码 - LogAnalyserStorm.java import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import storm configuration packagesimport backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;//Create main class LogAnalyserStorm submit topology.public class LogAnalyserStorm { public static void main(String[] args) throws Exc

上一页  [1] [2] [3]  下一页


Apache Storm工作实例