
Apache Storm in Twitter

Author: Anonymous  Source: collected from the web  Published: 2019-1-26 10:06:00

nf.*;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class HashtagReaderBolt implements IRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // The incoming tuple carries a twitter4j Status in its "tweet" field.
            Status tweet = (Status) tuple.getValueByField("tweet");
            // Emit one tuple per hashtag found in the tweet.
            for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
                System.out.println("Hashtag: " + hashtag.getText());
                this.collector.emit(new Values(hashtag.getText()));
            }
        }

        @Override
        public void cleanup() {}

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("hashtag"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

Hashtag Counter Bolt

The hashtags emitted by HashtagReaderBolt are forwarded to HashtagCounterBolt. This bolt processes every hashtag and keeps each hashtag and its count in memory in a Java Map object. The complete program code follows.

Coding: HashtagCounterBolt.java

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class HashtagCounterBolt implements IRichBolt {
        Map<String, Integer> counterMap;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.counterMap = new HashMap<String, Integer>();
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // The hashtag is the first (and only) field of the incoming tuple.
            String key = tuple.getString(0);

            if (!counterMap.containsKey(key)) {
                // First occurrence of this hashtag.
                counterMap.put(key, 1);
            } else {
                Integer c = counterMap.get(key) + 1;
                counterMap.put(key, c);
            }

            collector.ack(tuple);
        }

        @Override
        public void cleanup() {
            // Print the final counts when the topology shuts down.
            for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
                System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt emits nothing in execute(), so the declared field is unused.
            declarer.declare(new Fields("hashtag"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

Submitting the Topology

The main application builds and submits the topology. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit it.

Coding: TwitterHashtagStorm.java

    import java.util.*;

    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class TwitterHashtagStorm {
        public static void main(String[] args) throws Exception {
            // The first four arguments are the Twitter credentials.
            String consumerKey = args[0];
            String consumerSecret = args[1];
            String accessToken = args[2];
            String accessTokenSecret = args[3];

            // All remaining arguments are the keywords to track.
            String[] arguments = args.clone();
            String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

            Config config = new Config();
            config.setDebug(true);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
                consumerSecret, accessToken, accessTokenSecret, keyWords));

            builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
                .shuffleGrouping("twitter-spout");

            // Fields grouping routes equal hashtags to the same counter task.
            builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
                .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology());

            // Let the topology run for 10 seconds, then stop the local cluster.
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

Building and Running the Application

The complete application consists of four Java source files:

    TwitterSampleSpout.java
    HashtagReaderBolt.java
    HashtagCounterBolt.java
    TwitterHashtagStorm.java

You can compile the application with the following command:

    javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java

Run the application with the following command (note that it uses java, not javac, and that the current directory "." is on the classpath):

    java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>

Output

The application prints the hashtags seen during the run together with their counts. The output should look similar to the following:

    Result: jazztastic : 1
    Result: foodie : 1
    Result: Redskins : 1
    Result: Recipe : 1
    Result: cook : 1
    Result: android : 1
    Result: food : 2
    Result: NoToxicHorseMeat : 1
    Result: Purrs4Peace : 1
    Result: livemusic : 1
    Result: VIPremium : 1
    Result: Frome : 1
    Result: SundayRoast : 1
    Result: Millennials : 1
    Result: HealthWithKier : 1
    Result: LPs30DaysofGratitude : 1
    Result: cooking : 1
    Result: gameinsight : 1
    Result: Countryfile : 1
    Result: androidgames : 1
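
The in-memory counting that HashtagCounterBolt performs can be tried out on its own, without a Storm cluster or Twitter credentials. The following is only a minimal sketch of that counting step: the class name HashtagCountSketch, the count method, and the sample hashtags are hypothetical and are not part of the original application. It also assumes Java 8 or later, because it uses Map.merge in place of the containsKey/put pair shown in the bolt.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; it only mirrors the counting logic of HashtagCounterBolt.
public class HashtagCountSketch {

    // Counts how often each hashtag occurs, keeping first-seen order.
    public static Map<String, Integer> count(List<String> hashtags) {
        Map<String, Integer> counterMap = new LinkedHashMap<>();
        for (String tag : hashtags) {
            // merge() stores 1 for a new key, otherwise adds 1 to the existing count.
            counterMap.merge(tag, 1, Integer::sum);
        }
        return counterMap;
    }

    public static void main(String[] args) {
        // Made-up sample input standing in for the tuples delivered to the bolt.
        List<String> sample = Arrays.asList("food", "cook", "food", "android");
        for (Map.Entry<String, Integer> entry : count(sample).entrySet()) {
            // Same line format as the cleanup() method of the bolt.
            System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

With the sample input above, "food" is counted twice and the other two hashtags once each. In the real topology, this per-key counting is only correct across parallel tasks because the fields grouping on "hashtag" sends equal hashtags to the same bolt instance.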
