
Storm Transactional Topologies


// ... continuation of the committer bolt: the count() helper, execute(), and finishBatch().

private void count(HashMap<String, Long> map, String key, int count) {
    Long value = map.get(key);
    if (value == null) { value = (long) 0; }
    value += count;
    map.put(key, value);
}

@Override
public void execute(Tuple tuple) {
    // Route each tuple by the component that emitted it and accumulate
    // the counts in memory; nothing is written to Redis yet.
    String origin = tuple.getSourceComponent();
    if ("users-splitter".equals(origin)) {
        String user = tuple.getStringByField("user");
        count(users, user, 1);
    } else if ("hashtag-splitter".equals(origin)) {
        String hashtag = tuple.getStringByField("hashtag");
        count(hashtags, hashtag, 1);
    } else if ("user-hashtag-merger".equals(origin)) {
        String hashtag = tuple.getStringByField("hashtag");
        String user = tuple.getStringByField("user");
        String key = user + ":" + hashtag;
        Integer count = tuple.getIntegerByField("count");
        count(usersHashtags, key, count);
    }
}

@Override
public void finishBatch() {
    String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
    String currentTransaction = "" + id.getTransactionId();

    // If this transaction was already committed (i.e., it is being replayed), do nothing.
    if (currentTransaction.equals(lastCommitedTransaction)) { return; }

    Transaction multi = jedis.multi();

    multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);

    Set<String> keys = hashtags.keySet();
    for (String hashtag : keys) {
        Long count = hashtags.get(hashtag);
        multi.hincrBy("hashtags", hashtag, count);
    }

    keys = users.keySet();
    for (String user : keys) {
        Long count = users.get(user);
        multi.hincrBy("users", user, count);
    }

    keys = usersHashtags.keySet();
    for (String key : keys) {
        Long count = usersHashtags.get(key);
        multi.hincrBy("users_hashtags", key, count);
    }

    multi.exec();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

The implementation is straightforward, but there is one important detail in finishBatch:

...
multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
...

Here the ID of the last committed transaction is written to the database. Why do this? Remember that if a transaction fails, Storm will replay it as many times as necessary. If you do not make sure the transaction has already been processed, you will over-count, and the transactional topology becomes useless. So remember: store the ID of the last committed transaction, and check against it before committing.

Partitioned Transactional Spouts

It is quite common for a spout to read batches of tuples from a set of partitions. Continuing the example, you could have several Redis databases, with the tweets split among them. By implementing IPartitionedTransactionalSpout, Storm offers facilities to manage the state of each partition and to guarantee the ability to replay.

Let's modify TweetsTransactionalSpout so that it can handle partitioned data. First, extend BasePartitionedTransactionalSpout, which implements IPartitionedTransactionalSpout:

public class TweetsPartitionedTransactionalSpout extends
        BasePartitionedTransactionalSpout<TransactionMetadata> {
    ...
}

Then tell Storm which class is your coordinator:

public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
    @Override
    public int numPartitions() {
        return 4;
    }

    @Override
    public boolean isReady() {
        return true;
    }

    @Override
    public void close() {}
}

In this example the coordinator is very simple. The numPartitions method tells Storm how many partitions there are. Also note that no metadata is returned here: with IPartitionedTransactionalSpout, the metadata is managed directly by the emitter.
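That metadata is an instance of TransactionMetadata, the type used as the generic parameter above; it is defined earlier in the chapter and not repeated on this page. As a reference, here is a minimal sketch of what such a class looks like. The from and quantity field names are taken from how the emitter below uses them; everything else is an assumption.

import java.io.Serializable;

// Hypothetical reconstruction of the batch metadata holder: "from" is the
// offset of the first message in the batch and "quantity" is how many
// messages the batch contains. It must be serializable so Storm can persist
// it between transaction attempts.
public class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;

    long from;     // offset of the first message to read
    int quantity;  // number of messages in this batch

    public TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}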
Here is the emitter implementation:

public static class TweetsPartitionedTransactionalEmitter
        implements Emitter<TransactionMetadata> {
    PartitionedRQ rq = new PartitionedRQ();

    @Override
    public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
            BatchOutputCollector collector, int partition,
            TransactionMetadata lastPartitionMeta) {
        long nextRead;

        if (lastPartitionMeta == null) {
            // First batch for this partition.
            nextRead = rq.getNextRead(partition);
        } else {
            // Continue right after the previous batch.
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
            rq.setNextRead(partition, nextRead); // move the cursor
        }

        long quantity = rq.getAvailableToRead(partition, nextRead);
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity);

        emitPartitionBatch(tx, collector, partition, metadata);
        return metadata;
    }

    @Override
    public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,
            int partition, TransactionMetadata partitionMeta) {
        if (partitionMeta.quantity <= 0) {
            return;
        }

        List<String> messages = rq.getMessages(partition, partitionMeta.from,
                partitionMeta.quantity);

        long tweetId = partitionMeta.from;
        for (String msg : messages) {
            collector.emit(new Values(tx, "" + tweetId, msg));
            tweetId++;
        }
    }

    @Override
    public void close() {}
}

There are two important methods here: emitPartitionBatchNew and emitPartitionBatch.
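To see how these pieces fit together, here is a hedged sketch of the spout class wiring the coordinator and emitter shown above. The getCoordinator and getEmitter methods come from IPartitionedTransactionalSpout; the package names assume the pre-Apache backtype.storm API used in this chapter, and the declared output fields ("txid", "tweet_id", "tweet") are an assumption that simply mirrors the values the emitter emits.

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BasePartitionedTransactionalSpout;
import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator;
import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter;
import backtype.storm.tuple.Fields;

// Sketch only: in the chapter, the coordinator and emitter above are declared
// as static nested classes of this spout, which is why they can simply
// implement Coordinator and Emitter<TransactionMetadata>.
public class TweetsPartitionedTransactionalSpout extends
        BasePartitionedTransactionalSpout<TransactionMetadata> {

    @Override
    public Coordinator getCoordinator(Map conf, TopologyContext context) {
        return new TweetsPartitionedTransactionalCoordinator();
    }

    @Override
    public Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext context) {
        return new TweetsPartitionedTransactionalEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Assumed field names; they mirror the (tx, tweetId, msg) values
        // emitted in emitPartitionBatch above.
        declarer.declare(new Fields("txid", "tweet_id", "tweet"));
    }
}

Splitting the spout into a coordinator, which only decides how many partitions exist and when a batch may be emitted, and per-partition emitters, which own the partition metadata, is what allows Storm to replay an individual partition's batch with exactly the same boundaries.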
