当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 事务性拓扑

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:44

ers = new HashSet<String>();2324 while(strTok.hasMoreTokens()) {25 String user = strTok.nextToken();2627 //确保这是个真实的用户,并且在这个tweet中没有重复28 if(user.startsWith("@") && !users.contains(user)) {29 collector.emit("users", new Values(tx, tweetId, user));30 users.add(user);31 }32 }33 }3435 @Override36 public void cleanup(){}37} 正如本章前面提到的,UserSplitterBolt 接收元组,解析 tweet 文本,分发 @ 开头的单词————tweeter 用户。HashtagSplitterBolt 的实现也非常相似。01public class HashtagSplitterBolt implements IBasicBolt{02 private static final long serialVersionUID = 1L;0304 @Override05 public void declareOutputFields(OutputFieldsDeclarer declarer) {06 declarer.declareStream("hashtags", new Fields("txid","tweet_id","hashtag"));07 }0809 @Override10 public Map<String, Object> getComponentConfiguration() {11 return null;12 }1314 @Override15 public void prepare(Map stormConf, TopologyContext context) {}1617 @Oerride18 public void execute(Tuple input, BasicOutputCollector collector) {19 String tweet = input.getStringByField("tweet");20 String tweetId = input.getStringByField("tweet_id");21 StringTokenizer strTok = new StringTokenizer(tweet, " ");22 TransactionAttempt tx = (TransactionAttempt)input.getValueByField("txid");23 HashSet<String> words = new HashSet<String>();2425 while(strTok.hasMoreTokens()) {26 String word = strTok.nextToken();2728 if(word.startsWith("#") && !words.contains(word)){29 collector.emit("hashtags", new Values(tx, tweetId, word));30 words.add(word);31 }32 }33 }3435 @Override36 public void cleanup(){}37} 现在看看 UserHashTagJoinBolt 的实现。首先要注意的是它是一个 BaseBatchBolt。这意味着,execute 方法会操作接收到的元组,但是不会分发新的元组。批次完成时,Storm 会调用 finishBatch 方法。01public void execute(Tuple tuple) {02 String source = tuple.getSourceStreamId();03 String tweetId = tuple.getStringByField("tweet_id");0405 if("hashtags".equals(source)) {06 String hashtag = tuple.getStringByField("hashtag");07 add(tweetHashtags, tweetId, hashtag);08 } else if("users".equals(source)) {09 String user = tuple.getStringByField("user");10 add(userTweets, user, tweetId);11 }12} 既然要结合 tweet 中提到的用户为出现的所有话题计数,就需要加入前面的 bolts 创建的两个数据流组。这件事要以批次为单位进程,在批次处理完成时,调用 finishBatch 方法。01@Override02public void finishBatch() {03 for(String user:userTweets.keySet()){04 Set<String> tweets = getUserTweets(user);05 HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();06 for(String tweet:tweets){07 Set<String> hashtags=getTweetHashtags(tweet);08 if(hashtags!=null){09 for(String hashtag:hashtags){10 Integer count=hashtagsCounter.get(hashtag);11 if(count==null){count=0;}12 count++;13 hashtagsCounter.put(hashtag,count);14 }15 }16 }17 for(String hashtag:hashtagsCounter.keySet()){18 int count=hashtagsCounter.get(hashtag);19 collector.emit(new Values(id,user,hashtag,count));20 }21 }22} 这个方法计算每对用户-话题出现的次数,并为之生成和分发元组。你可以在 GitHub 上找到并下载完整代码。(译者注:https://github.com/storm-book/examples-ch08-transactional-topologies 这个仓库里没有代码,谁知道哪里有代码麻烦说一声。)提交者 bolts我们已经学习了,批次通过协调器和分发器怎样在拓扑中传递。在拓扑中,这些批次中的元组以并行的,没有特定次序的方式处理。协调者 bolts 是一类特殊的批处理 bolts,它们实现了 IComh mitter 或者通过TransactionalTopologyBuilder 调用 setCommiterBolt 设置了提交者 bolt。它们与其它的批处理 bolts 最大的不同在于,提交者 bolts的finishBatch 方法在提交就绪时执行。这一点发生在之前所有事务都已成功提交之后。另外,finishBatch 方法是顺序执行的。因此如果同时有事务 ID1 和事务 ID2 两个事务同时执行,只有在 ID1 没有任何差错的执行了 finishBatch 方法之后,ID2 才会执行该方法。下面是这个类的实现01public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter {02 public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";03 TransactionAttempt id;04 BatchOutputCollector collector;05 Jedis jedis;0607 @Override08 public void prepare(Map conf, TopologyContext context,09 BatchOutputCollector collector, TransactionAttempt id) {10 this.id = id;11 this.collector = collector;12 this.jedis = new Jedis("localhost");13 }1415 HashMap<String, Long> hashtags = new HashMap<String,Long>();16 HashMap<String, Long> users = new HashMap<String, Long>();17 HashMap<String, Long> usersHashtags = new HashMap<String, Long>();1819 private void count(HashMap<String, Long> map, String key, int count) {20 L

上一页  [1] [2] [3] [4] [5]  下一页


Storm 事务性拓扑