当前位置:K88软件开发文章中心大数据Apache Kafka → 文章内容

Apache Kafka 快速指南

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:04:58

行 - 可以使用以下命令执行应用程序。java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>输出Message sent successfullyTo check the above output open new terminal and type Consumer CLI command to receive messages.>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning12345678910简单消费者示例到目前为止,我们已经创建了一个发送消息到Kafka集群的生产者。 现在让我们创建一个消费者来消费Kafka集群的消息。 KafkaConsumer API用于消费来自Kafka集群的消息。 KafkaConsumer类的构造函数定义如下。public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) configs - 返回消费者配置的地图。KafkaConsumer类具有下表中列出的以下重要方法。S.No方法和说明1 public java.util.Set< TopicPar- tition> assignment()获取由用户当前分配的分区集。2 public string subscription()订阅给定的主题列表以获取动态签名的分区。3 public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener)订阅给定的主题列表以获取动态签名的分区。4 public void unsubscribe()从给定的分区列表中取消订阅主题。5 public void sub-scribe(java.util.List< java.lang.String> topics)订阅给定的主题列表以获取动态签名的分区。 如果给定的主题列表为空,则将其视为与unsubscribe()相同。6 public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)参数模式以正则表达式的格式引用预订模式,而侦听器参数从预订模式获取通知。7 public void as-sign(java.util.List< TopicPartion> partitions)向客户手动分配分区列表。8 poll()使用预订/分配API之一获取指定的主题或分区的数据。 如果在轮询数据之前未预订主题,这将返回错误。9 public void commitSync()提交对主题和分区的所有子编制列表的最后一次poll()返回的提交偏移量。 相同的操作应用于commitAsyn()。10 public void seek(TopicPartition partition,long offset)获取消费者将在下一个poll()方法中使用的当前偏移值。11 public void resume()恢复暂停的分区。12 public void wakeup()唤醒消费者。ConsumerRecord APIConsumerRecord API用于从Kafka集群接收记录。 此API由主题名称,分区号(从中接收记录)和指向Kafka分区中的记录的偏移量组成。 ConsumerRecord类用于创建具有特定主题名称,分区计数和< key,value>的消费者记录。 对。 它有以下签名。public ConsumerRecord(string topic,int partition, long offset,K key, V value)主题 - 从Kafka集群接收的使用者记录的主题名称。分区 - 主题的分区。键 - 记录的键,如果没有键存在null将被返回。值 - 记录内容。ConsumerRecords APIConsumerRecords API充当ConsumerRecord的容器。 此API用于保存特定主题的每个分区的ConsumerRecord列表。 它的构造器定义如下。public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<Consumer-Record>K,V>>> records) TopicPartition - 返回特定主题的分区地图。记录 - ConsumerRecord的返回列表。ConsumerRecords类定义了以下方法。S.No方法和描述1 public int count()所有主题的记录数。2 public Set partitions()在此记录集中具有数据的分区集(如果没有返回数据,则该集为空)。3 public Iterator iterator()迭代器使您可以循环访问集合,获取或重新移动元素。4 public List records()获取给定分区的记录列表。配置设置Consumer客户端API主配置设置的配置设置如下所示 - S.No设置和说明1引导代理列表。2 group.id 将单个消费者分配给组。3 enable.auto.commit 如果值为true,则为偏移启用自动落实,否则不提交。4 auto.commit.interval.ms 返回更新的消耗偏移量写入ZooKeeper的频率。5 session.timeout.ms 表示Kafka在放弃和继续消费消息之前等待ZooKeeper响应请求(读取或写入)多少毫秒。SimpleConsumer应用程序生产者应用程序步骤在此保持不变。 首先,启动你的ZooKeeper和Kafka代理。 然后使用名为 SimpleCon-sumer.java 的Java类创建一个 SimpleConsumer 应用程序,并键入以下代码。import java.util.Properties;import java.util.Arrays;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.ConsumerRecord;public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("of

上一页  [1] [2] [3] [4] [5] [6] [7] [8]  下一页


Apache Kafka 快速指南