当前位置:K88软件开发文章中心编程语言SQLStorm → 文章内容

Storm 事务性拓扑

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-18 8:37:44

n getNextWrite() - current;13 }1415 public long getNextRead() {16 String sNextRead = jedis.get(NEXT_READ);17 if(sNextRead == null) {18 return 1;19 }20 return Long.valueOf(sNextRead);21 }2223 public long getNextWrite() {24 return Long.valueOf(jedis.get(NEXT_WRITE));25 }2627 public void close() {28 jedis.disconnect();29 }3031 public void setNextRead(long nextRead) {32 jedis.set(NEXT_READ, ""+nextRead);33 }3435 public List<String> getMessages(long from, int quantity) {36 String[] keys = new String[quantity];37 for (int i = 0; i < quantity; i++) {38 keys[i] = ""+(i+from);39 }40 return jedis.mget(keys);41 }42} 仔细阅读每个方法,确保自己理解了它们的用处。协调者 Coordinator下面是本例的协调者实现。 01public static class TweetsTransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata> {02 TransactionMetadata lastTransactionMetadata;03 RQ rq = new RQ();04 long nextRead = 0;0506 public TweetsTransactionalSpoutCoordinator() {07 nextRead = rq.getNextRead();08 }0910 @Override11 public TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {12 long quantity = rq.getAvailableToRead(nextRead);13 quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;14 TransactionMetadata ret = new TransactionMetadata(nextRead, (int)quantity);15 nextRead += quantity;16 return ret;17 }1819 @Override20 public boolean isReady() {21 return rq.getAvailableToRead(nextRead) > 0;22 }2324 @Override25 public void close() {26 rq.close();27 }28} 值得一提的是,在整个拓扑中只会有一个提交者实例。创建提交者实例时,它会从 redis 读取一个从1开始的序列号,这个序列号标识要读取的 tweet 下一条。第一个方法是 isReady。在 initializeTransaction 之前调用它确认数据源已就绪并可读取。此方法应当相应的返回 true 或 false。在此例中,读取 tweets 数量并与已读数量比较。它们之间的不同就在于可读 tweets 数。如果它大于0,就意味着还有 tweets 未读。最后,执行 initializeTransaction。正如你看到的,它接收 txid 和 prevMetadata作为参数。第一个参数是 Storm 生成的事务 ID,作为批次的惟一性标识。prevMetadata 是协调器生成的前一个事务元数据对象。在这个例子中,首先确认有多少 tweets 可读。只要确认了这一点,就创建一个TransactionMetadata 对象,标识读取的第一个 tweet(译者注:对象属性 from ),以及读取的 tweets 数量(译者注:对象属性 quantity )。元数据对象一经返回,Storm 把它跟 txid 一起保存在 zookeeper。这样就确保了一旦发生故障,Storm 可以利用分发器(译者注:Emitter,见下文)重新发送批次。Emitter创建事务性 spout 的最后一步是实现分发器(Emitter)。实现如下:01public static class TweetsTransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata> {0203</pre>04<pre> RQ rq = new RQ();</pre>05<pre> public TweetsTransactionalSpoutEmitter() {}</pre>06<pre> @Override07 public void emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {08 rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);09 List<String> messages = rq.getMessages(coordinatorMeta.from, <span style="font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;">coordinatorMeta.quantity);10</span> long tweetId = coordinatorMeta.from;11 for (String message : messages) {12 collector.emit(new Values(tx, ""+tweetId, message));13 tweetId++;14 }15 }1617 @Override18 public void cleanupBefore(BigInteger txid) {}1920 @Override21 public void close() {22 rq.close();23 }</pre>24<pre>25} 分发器从数据源读取数据并从数据流组发送数据。分发器应当问题能够为相同的事务 id 和事务元数据发送相同的批次。这样,如果在处理批次的过程中发生了故障,Storm 就能够利用分发器重复相同的事务 id 和事务元数据,并确保批次已经重复过了。Storm 会在TransactionAttempt 对象里为尝试次数增加计数(译者注:attempt id )。这样就能知道批次已经重复过了。在这里 emitBatch 是个重要方法。在这个方法中,使用传入的元数据对象从 redis 得到tweets,同时增加 redis 维持的已读 tweets 数。当然它还会把读到的 tweets 分发到拓扑。Bolts首先看一下这个拓扑中的标准 bolt:01public class UserSplitterBolt implements IBasicBolt{02 private static final long serialVersionUID = 1L;0304 @Override05 public void declareOutputFields(OutputFieldsDeclarer declarer) {06 declarer.declareStream("users", new Fields("txid","tweet_id","user"));07 }0809 @Override10 public Map<String, Object> getComponentConfiguration() {11 return null;12 }1314 @Override15 public void prepare(Map stormConf, TopologyContext context) {}1617 @Override18 public void execute(Tuple input, BasicOutputCollector collector) {19 String tweet = input.getStringByField("tweet");20 String tweetId = input.getStringByField("tweet_id");21 StringTokenizer strTok = new StringTokenizer(tweet, " ");22 HashSet<String> us

上一页  [1] [2] [3] [4] [5]  下一页


Storm 事务性拓扑