当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 起步

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:37

this.collector.emit(new Values(str)); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } NOTE: Values 是一个 ArrarList 实现,它的元素就是传入构造器的参数。nextTuple() 会在同一个循环内被 ack() 和 fail() 周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。因此 nextTuple 的第一行就要检查是否已处理完成。如果完成了,为了降低处理器负载,会在返回前休眠一毫秒。如果任务完成了,文件中的每一行都已被读出并分发了。NOTE:元组(tuple)是一个具名值列表,它可以是任意 java 对象(只要它是可序列化的)。默认情况,Storm 会序列化字符串、字节数组、ArrayList、HashMap 和 HashSet 等类型。Bolts现在我们有了一个 spout,用来按行读取文件并每行发布一个元组,还要创建两个 bolts,用来处理它们(看图2-1)。bolts 实现了接口 backtype.storm.topology.IRichBolt。bolt最重要的方法是void execute(Tuple input),每次接收到元组时都会被调用一次,还会再发布若干个元组。NOTE: 只要必要,bolt 或 spout 会发布若干元组。当调用 nextTuple 或 execute 方法时,它们可能会发布0个、1个或许多个元组。你将在第五章学习更多这方面的内容。第一个 bolt,WordNormalizer,负责得到并标准化每行文本。它把文本行切分成单词,大写转化成小写,去掉头尾空白符。首先我们要声明 bolt 的出参: public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } 这里我们声明 bolt 将发布一个名为 “word” 的域。下一步我们实现 public void execute(Tuple input),处理传入的元组: public void execute(Tuple input){ String sentence=input.getString(0); String[] words=sentence.split(" "); for(String word : words){ word=word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //发布这个单词 collector.emit(new Values(word)); } } //对元组做出应答 collector.ack(input); } 第一行从元组读取值。值可以按位置或名称读取。接下来值被处理并用collector对象发布。最后,每次都调用collector 对象的 ack() 方法确认已成功处理了一个元组。例2-2是这个类的完整代码。 //例2-2 src/main/java/bolts/WordNormalizer.java package bolts; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{ private OutputCollector collector; public void cleanup(){} / bolt从单词文件接收到文本行,并标准化它。 文本行会全部转化成小写,并切分它,从中得到所有单词。 / public void execute(Tuple input){ String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //发布这个单词 List a = new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } //对元组做出应答 collector.ack(input); } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } / 这个bolt只会发布“word”域 / public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } NOTE:通过这个例子,我们了解了在一次 execute 调用中发布多个元组。如果这个方法在一次调用中接收到句子 “This is the Storm book”,它将会发布五个元组。下一个bolt,WordCounter,负责为单词计数。这个拓扑结束时(cleanup() 方法被调用时),我们将显示每个单词的数量。NOTE: 这个例子的 bolt 什么也没发布,它把数据保存在 map 里,但是在真实的场景中可以把数据保存到数据库。package bolts;import java.util.HashMap;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class WordCounter implements IRichBolt{ Integer id; String name; Map counters; private OutputCollector collector; / 这个spout结束时(集群关闭的时候),我们会显示单词数量 / @Override public void cleanup(){ System.out.println("-- 单词数 【"+name+"-"+id+"】 --"); for(Map.Entry entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } } / 为每个单词计数 /@Overridepublic void execute(Tuple input) { String str=input.getString(0); /** 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1 / if(!counters.containsKey(str)){

上一页  [1] [2] [3] [4]  下一页


Storm 起步