当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 事务性拓扑

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:44

由 小路依依 创建, 最后一次修改 2016-08-12 事务性拓扑正如书中之前所提到的,使用 Storm 编程,可以通过调用 ack 和 fail 方法来确保一条消息的处理成功或失败。不过当元组被重发时,会发生什么呢?你又该如何砍不会重复计算? Storm0.7.0 实现了一个新特性——事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次。在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以空错性上得到保证的前提下完成计算。NOTE:事务性拓扑是一个构建于标准 Storm spout 和 bolt 之上的抽象概念。设计在事务性拓扑中,Storm 以并行和顺序处理混合的方式处理元组。spout 并行分批创建供 bolt 处理的元组(译者注:下文将这种分批创建、分批处理的元组称做批次)。其中一些 bolt 作为提交者以严格有序的方式提交处理过的批次。这意味着如果你有每批五个元组的两个批次,将有两个元组被 bolt 并行处理,但是直到提交者成功提交了第一个元组之后,才会提交第二个元组。NOTE: 使用事务性拓扑时,数据源要能够重发批次,有时候甚至要重复多次。因此确认你的数据源——你连接到的那个 spout ——具备这个能力。 这个过程可以被描述为两个阶段: 处理阶段 纯并行阶段,许多批次同时处理。 提交阶段 严格有序阶段,直到批次一成功提交之后,才会提交批次二。 这两个阶段合起来称为一个 Storm 事务。 NOTE: Storm 使用 zookeeper 储存事务元数据,默认情况下就是拓扑使用的那个 zookeeper。你可以修改以下两个配置参数键指定其它的 zookeeper——transactional.zookeeper.servers 和transactional.zookeeper.port。事务实践下面我们要创建一个 Twitter 分析工具来了解事务的工作方式。我们从一个 Redis 数据库读取tweets,通过几个 bolt 处理它们,最后把结果保存在另一个 Redis 数据库的列表中。处理结果就是所有话题和它们的在 tweets 中出现的次数列表,所有用户和他们在 tweets 中出现的次数列表,还有一个包含发起话题和频率的用户列表。 这个工具的拓扑图。 图 拓扑概览正如你看到的,TweetsTransactionalSpout 会连接你的 tweet 数据库并向拓扑分发批次。UserSplitterBolt 和 HashTagSplitterBolt 两个 bolt,从 spout 接收元组。UserSplitterBolt 解析 tweets 并查找用户——以 @ 开头的单词——然后把这些单词分发到名为 users 的自定义数据流组。HashtagSplitterBolt 从 tweet 查找 # 开头的单词,并把它们分发到名为 hashtags 的自定义数据流组。第三个 bolt,UserHashtagJoinBolt,接收前面提到的两个数据流组,并计算具名用户的一条 tweet 内的话题数量。为了计数并分发计算结果,这是个 BaseBatchBolt(稍后有更多介绍)。最后一个 bolt——RedisCommitterBolt—— 接收以上三个 bolt 的数据流组。它为每样东西计数,并在对一个批次完成处理时,把所有结果保存到 redis。这是一种特殊的 bolt,叫做提交者,在本章后面做更多讲解。用 TransactionalTopologyBuilder 构建拓扑,代码如下:01TransactionalTopologyBuilder builder=02 new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());0304builder.setBolt("users-splitter", new UserSplitterBolt(), 4).shuffleGrouping("spout");05buildeer.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4).shuffleGrouping("spout");0607builder.setBolt("users-hashtag-manager", new UserHashtagJoinBolt(), r)08 .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))09 .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));1011builder.setBolt("redis-commiter", new RedisCommiterBolt())12 .globalGrouping("users-splitter", "users")13 .globalGrouping("hashtag-splitter", "hashtags")14 .globalGrouping("user-hashtag-merger"); 接下来就看看如何在一个事务性拓扑中实现 spout。Spout一个事务性拓扑的 spout 与标准 spout 完全不同。1public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata>{ 正如你在这个类定义中看到的,TweetsTransactionalSpout 继承了带范型的BaseTransactionalSpout。指定的范型类型的对象是事务元数据集合。它将在后面的代码中用于从数据源分发批次。在这个例子中,TransactionMetadata 定义如下:01public class TransactionMetadata implements Serializable {02 private static final long serialVersionUID = 1L;03 long from;04 int quantity;0506 public TransactionMetadata(long from, int quantity) {07 this.from = from;08 this.quantity = quantity;09 }10} 该类的对象维护着两个属性 from 和 quantity,它们用来生成批次。spout 的最后需要实现下面的三个方法:01@Override02public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(03 Map conf, TopologyContext context) {04 return new TweetsTransactionalSpoutCoordinator();05}0607@Override08public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {09 return new TweetsTransactionalSpoutEmitter();10}1112@Override13public void declareOutputFields(OuputFieldsDeclarer declarer) {14 declarer.declare(new Fields("txid", "tweet_id", "tweet"));15} getCoordinator 方法,告诉 Storm 用来协调生成批次的类。getEmitter,负责读取批次并把它们分发到拓扑中的数据流组。最后,就像之前做过的,需要声明要分发的域。RQ 类为了让例子简单点,我们决定用一个类封装所有对 Redis 的操作。01public class RQ {02 public static final String NEXT_READ = "NEXT_READ";03 public static final String NEXT_WRITE = "NEXT_WRITE";0405 Jedis jedis;0607 public RQ() {08 jedis = new Jedis("localhost");09 }1011 public long getavailableToRead(long current) {12 retur

[1] [2] [3] [4] [5]  下一页


Storm 事务性拓扑