当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 拓扑

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:38

由 小路依依 创建, 最后一次修改 2016-08-12 拓扑在这一章,你将学到如何在同一个 Storm 拓扑结构内的不同组件之间传递元组,以及如何向一个运行中的 Storm 集群发布一个拓扑。数据流组设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被 bolts 消费的)。一个据数流组指定了每个 bolt 会消费哪些数据流,以及如何消费它们。NOTE:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个。数据流组在定义拓扑时设置,就像我们在第二章看到的:··· builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader");··· 在前面的代码块里,一个 bolt 由 TopologyBuilder 对象设定, 然后使用随机数据流组指定数据源。数据流组通常将数据源组件的 ID 作为参数,取决于数据流组的类型不同还有其它可选参数。NOTE:每个 InputDeclarer 可以有一个以上的数据源,而且每个数据源可以分到不同的组。随机数据流组随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的 bolt 发送元组,保证每个消费者收到近似数量的元组。随机数据流组用于数学计算这样的原子操作。然而,如果操作不能被随机分配,就像第二章为单词计数的例子,你就要考虑其它分组方式了。域数据流组域数据流组允许你基于元组的一个或多个域控制如何把元组发送给 bolts。 它保证拥有相同域组合的值集发送给同一个 bolt。 回到单词计数器的例子,如果你用 word 域为数据流分组,word-normalizer bolt 将只会把相同单词的元组发送给同一个 word-counterbolt 实例。··· builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word"));··· NOTE: 在域数据流组中的所有域集合必须存在于数据源的域声明中。全部数据流组全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向 bolts 发送信号。比如,你要刷新缓存,你可以向所有的 bolts 发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能(见拓扑示例) public void execute(Tuple input) { String str = null; try{ if(input.getSourceStreamId().equals("signals")){ str = input.getStringByField("action"); if("refreshCache".equals(str)) counters.clear(); } }catch (IllegalArgumentException e){ //什么也不做 } ··· } 我们添加了一个 if 分支,用来检查源数据流。 Storm 允许我们声明具名数据流(如果你不把元组发送到一个具名数据流,默认发送到名为 ”default“ 的数据流)。这是一个识别元组的极好的方式,就像这个例子中,我们想识别 signals 一样。 在拓扑定义中,你要向 word-counter bolt 添加第二个数据流,用来接收从 signals-spout 数据流发送到所有 bolt 实例的每一个元组。 builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals"); signals-spout的实现请参考git仓库。自定义数据流组你可以通过实现 backtype.storm.grouping.CustormStreamGrouping 接口创建自定义数据流组,让你自己决定哪些 bolt 接收哪些元组。让我们修改单词计数器示例,使首字母相同的单词由同一个 bolt 接收。 public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } } 这是一个 CustomStreamGrouping 的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的 bolt。按下述方式 word-normalizer 修改即可使用这个自定义数据流组。 builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping()); 直接数据流组这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个 bolt 接收元组。要使用直接数据流组,在 WordNormalizer bolt 中,使用 emitDirect 方法代替 emit。 public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //对元组做出应答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } } 在 prepare 方法中计算任务数 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); } 在拓扑定义中指定数据流将被直接分组: builder.setBolt("word-counter", new WordCounter(),

[1] [2]  下一页


Storm 拓扑