当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 事务性拓扑

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:44

atchNew,和 emitPartitionBatch。对于 emitPartitionBatchNew,从 Storm 接收分区参数,该参数决定应该从哪个分区读取批次。在这个方法中,决定获取哪些 tweets,生成相应的元数据对象,调用 emitPartitionBatch,返回元数据对象,并且元数据对象会在方法返回时立即保存到 zookeeper。 Storm 会为每一个分区发送相同的事务 ID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch 读取分区中的 tweets,并向拓扑分发批次。如果批次处理失败了,Storm 将会调用 emitPartitionBatch 利用保存下来的元数据重复这个批次。NOTE: 完整的源码请见:https://github.com/storm-book/examples-ch08-transactional-topologies(译者注:原文如此,实际上这个仓库里什么也没有)模糊的事务性拓扑到目前为止,你可能已经学会了如何让拥有相同事务 ID 的批次在出错时重播。但是在有些场景下这样做可能就不太合适了。然后会发生什么呢?事实证明,你仍然可以实现在语义上精确的事务,不过这需要更多的开发工作,你要记录由 Storm 重复的事务之前的状态。既然能在不同时刻为相同的事务 ID 得到不同的元组,你就需要把事务重置到之前的状态,并从那里继续。比如说,如果你为收到的所有 tweets 计数,你已数到5,而最后的事务 ID 是321,这时你多数了8个。你要维护以下三个值 ——previousCount=5,currentCount=13,以及lastTransactionId=321。假设事物 ID321 又发分了一次,而你又得到了4个元组,而不是之前的8个,提交器会探测到这是相同的事务 ID,它将会把结果重置到 previousCount 的值5,并在此基础上加4,然后更新 currentCount 为9。另外,在之前的一个事务被取消时,每个并行处理的事务都要被取消。这是为了确保你没有丢失任何数据。你的 spout 可以实现 IOpaquePartitionedTransactionalSpout,而且正如你看到的,协调器和分发器也很简单。01public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {02 @Override03 public boolean isReady() {04 return true;05 }06}0708public static class TweetsOpaquePartitionedTransactionalSpoutEmitter09 implements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {10 PartitionedRQ rq = new PartitionedRQ();1112 @Override13 public TransactionMetadata emitPartitionBatch(TransactionAttempt tx,14 BatchOutputCollector collector, int partion,15 TransactionMetadata lastPartitonMeta) {16 long nextRead;1718 if(lastPartitionMeta == null) {19 nextRead = rq.getNextRead(partition);20 }else{21 nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;22 rq.setNextRead(partition, nextRead);//移动游标23 }2425 long quantity = rq.getAvailabletoRead(partition, nextRead);26 quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;27 TransactionMetadata metadata = new TransactionMetadata(nextRead, (int)quantity);28 emitMessages(tx, collector, partition, metadata);29 return metadata;30 }3132 private void emitMessage(TransactionAttempt tx, BatchOutputCollector collector,33 int partition, TransactionMetadata partitionMeta) {34 if(partitionMeta.quantity <= 0){return;}3536 List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);37 long tweetId = partitionMeta.from;38 for(String msg : messages) {39 collector.emit(new Values(tx, ""+tweetId, msg));40 tweetId++;41 }42 }4344 @Override45 public int numPartitions() {46 return 4;47 }4849 @Override50 public void close() {}51} 最有趣的方法是 emitPartitionBatch,它获取之前提交的元数据。你要用它生成批次。这个批次不需要与之前的那个一致,你可能根本无法创建完全一样的批次。剩余的工作由提交器 bolts借助之前的状态完成。

上一页  [1] [2] [3] [4] [5] 


Storm 事务性拓扑