当前位置:K88软件开发文章中心大数据Apache Storm → 文章内容

Apache Storm Trident

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:05:59

由 chalex 创建,小路依依 最后一次修改 2016-12-12 Trident是Storm的延伸。像Storm,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm上提供高级抽象,以及状态流处理和低延迟分布式查询。Trident使用spout和bolt,但是这些低级组件在执行之前由Trident自动生成。 Trident具有函数,过滤器,联接,分组和聚合。Trident将流处理为一系列批次,称为事务。通常,这些小批量的大小将是大约数千或数百万个元组,这取决于输入流。这样,Trident不同于Storm,它执行元组一元组处理。批处理概念非常类似于数据库事务。每个事务都分配了一个事务ID。该事务被认为是成功的,一旦其所有的处理完成。然而,处理事务的元组中的一个的失败将导致整个事务被重传。对于每个批次,Trident将在事务开始时调用beginCommit,并在结束时提交。Trident拓扑Trident API公开了一个简单的选项,使用“TridentTopology”类创建Trident拓扑。基本上,Trident拓扑从流出接收输入流,并对流上执行有序的操作序列(滤波,聚合,分组等)。Storm元组被替换为Trident元组,bolt被操作替换。一个简单的Trident拓扑可以创建如下 - TridentTopology topology = new TridentTopology();Trident TuplesTrident Tuples是一个命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可由Trident拓扑处理的数据的基本单位。Trident SpoutTrident spout与类似于Storm spout,附加选项使用Trident的功能。实际上,我们仍然可以使用IRichSpout,我们在Storm拓扑中使用它,但它本质上是非事务性的,我们将无法使用Trident提供的优点。具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事务和不透明的事务语义。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。除了这些通用spouts,Trident有许多样品实施trident spout。其中之一是FeederBatchSpout输出,我们可以使用它发送trident tuples的命名列表,而不必担心批处理,并行性等。 FeederBatchSpout创建和数据馈送可以如下所示完成 - TridentTopology topology = new TridentTopology();FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));topology.newStream("fixed-batch-spout", testSpout)testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));Trident操作Trident依靠“Trident操作”来处理trident tuples的输入流。Trident API具有多个内置操作来处理简单到复杂的流处理。这些操作的范围从简单验证到复杂的trident tuples分组和聚合。让我们经历最重要和经常使用的操作。过滤过滤器是用于执行输入验证任务的对象。Trident过滤器获取trident tuples字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回true,则该元组保存在输出流中;否则,从流中移除元组。过滤器将基本上继承自BaseFilter类并实现isKeep方法。这里是一个滤波器操作的示例实现 - public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; }}input[1, 2][1, 3][1, 4]output[1, 2][1, 4]可以使用“each”方法在拓扑中调用过滤器功能。“Fields”类可以用于指定输入(trident tuple的子集)。示例代码如下 - TridentTopology topology = new TridentTopology();topology.newStream("spout", spout).each(new Fields("a", "b"), new MyFilter())函数 函数是用于对单个trident tuple执行简单操作的对象。它需要一个trident tuple字段的子集,并发出零个或多个新的trident tuple字段。 函数基本上从BaseFunction类继承并实现execute方法。下面给出了一个示例实现:public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); }}input[1, 2][1, 3][1, 4]output[1, 2, 3][1, 3, 4][1, 4, 5]与过滤操作类似,可以使用每个方法在拓扑中调用函数操作。示例代码如下 - TridentTopology topology = new TridentTopology();topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));聚合聚合是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下 - aggregate -单独聚合每批trident tuple。在聚合过程期间,首先使用全局分组将元组重新分区,以将同一批次的所有分区组合到单个分区中。 partitionAggregate -聚合每个分区,而不是整个trident tuple。分区集合的输出完全替换输入元组。分区集合的输出包含单个字段元组。 persistentaggregate -聚合所有批次中的所有trident tuple,并将结果存储在内存或数据库中。TridentTopology topology = new TridentTopology();// aggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”))// partitionAggregate operationtopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count"))// persistentAggregate - saving the count to memorytopology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“计数”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,实现如下 - public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long

[1] [2] [3]  下一页


Apache Storm Trident