当前位置:K88软件开发文章中心大数据Apache Kafka → 文章内容

Apache Kafka 整合 Storm

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:04:52

ic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple input) { String str = input.getString(0); if(!counters.containsKey(str)){ counters.put(str, 1); }else { Integer c = counters.get(str) +1; counters.put(str, c); } collector.ack(input); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counters.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; }}提交拓扑Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。 TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。 最后,TopologyBuilder有createTopology来创建to-pology。 shuffleGrouping和fieldsGrouping方法有助于为喷头和Bolt设置流分组。本地集群 - 为了开发目的,我们可以使用 LocalCluster 对象创建本地集群,然后使用 LocalCluster的 submitTopology 类。KafkaStormSample.javaimport backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import java.util.ArrayList;import java.util.List;import java.util.UUID;import backtype.storm.spout.SchemeAsMultiScheme;import storm.kafka.trident.GlobalPartitionInformation;import storm.kafka.ZkHosts;import storm.kafka.Broker;import storm.kafka.StaticHosts;import storm.kafka.BrokerHosts;import storm.kafka.SpoutConfig;import storm.kafka.KafkaConfig;import storm.kafka.KafkaSpout;import storm.kafka.StringScheme;public class KafkaStormSample { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); String zkConnString = "localhost:2181"; String topic = "my-first-topic"; BrokerHosts hosts = new ZkHosts(zkConnString); SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic, UUID.randomUUID().toString()); kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.forceFromStart = true; kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig)); builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout"); builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("KafkaStormSample", config, builder.create-Topology()); Thread.sleep(10000); cluster.shutdown(); }}在移动编译之前,Kakfa-Storm集成需要策展人ZooKeeper客户端java库。 策展人版本2.9.1支持Apache Storm 0.9.5版(我们在本教程中使用)。 下载下面指定的jar文件并将其放在java类路径中。curator-client-2.9.1.jarcurator-framework-2.9.1.jar在包括依赖文件之后,使用以下命令编译程序,javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java执行启动Kafka Producer CLI(在上一章节中解释),创建一个名为 my-first-topic 的新主题,并提供一些样本消息,如下所示 - hellokafkastormsparktest messageanother test message现在使用以下命令执行应用程序 - java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample此应用程序的示例输出如下所示 - storm : 1test : 2spark : 1another : 1kafka : 1hello : 1message : 2

上一页  [1] [2] 


Apache Kafka 整合 Storm