当前位置:K88软件开发文章中心大数据Apache Kafka → 文章内容

Apache Kafka 快速指南

减小字体 增大字体 作者:佚名  来源:网上搜集  发布时间:2019-1-26 10:04:58

fset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } }}编译 - 可以使用以下命令编译应用程序。javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java执行 - 可以使用以下命令执行应用程序java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>输入 - 打开生成器CLI并向主题发送一些消息。 你可以把smple输入为\'Hello Consumer\'。输出 - 以下是输出。Subscribed to topic Hello-Kafkaoffset = 3, key = null, value = Hello ConsumerApache Kafka - 用户组示例消费群是多线程或多机器的Kafka主题。消费者群体消费者可以使用相同的 group.id 加入群组一个组的最大并行度是组中的消费者数量←不是分区。Kafka将主题的分区分配给组中的使用者,以便每个分区仅由组中的一个使用者使用。Kafka保证消息只能被组中的一个消费者读取。消费者可以按照消息存储在日志中的顺序查看消息。重新平衡消费者添加更多进程/线程将导致Kafka重新平衡。 如果任何消费者或代理无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。 在此重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程。import java.util.Properties;import java.util.Arrays;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.ConsumerRecord;public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }汇编javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java执行>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group在这里,我们为两个消费者创建了一个示例组名称为 my-group 。 同样,您可以在组中创建您的组和消费者数量。输入打开生产者CLI并发送一些消息 - Test consumer group 01Test consumer group 02第一个过程的输出Subscribed to topic Hello-kafkaoffset = 3, key = null, value = Test consumer group 01第二个过程的输出Subscribed to topic Hello-kafkaoffset = 3, key = null, value = Test consumer group 02现在希望你能通过使用Java客户端演示了解SimpleConsumer和ConsumeGroup。 现在,您了解如何使用Java客户端发送和接收消息。 让我们在下一章继续Kafka与大数据技术的集成。Apache Kafka - 与Storm集成在本章中,我们将学习如何将Kafka与Apache Storm集成。关于StormStorm最初由Nathan Marz和BackType的团队创建。 在短时间内,Apache Storm成为分布式实时处理系统的标准,允许您处理大量数据。 Storm是非常快的,并且一个基准时钟为每个节点每秒处理超过一百万个元组。 Apache Storm持续运行,从配置的源(Spouts)消耗数据,并将数据传递到处理管道(Bolts)。 联合,spout和Bolt做一个拓扑。与Storm集成Kafka和Storm自然互补,它们强大的合作能够实现快速移动的大数据的实时流分析。 Kafka和Storm集成是为了使开发人员更容易地从Storm拓扑获取和发布数据流。概念流spout是流的源。 例如,一个spout可以从Kafka Topic读取元组并将它们作为流发送。 Bolt消耗输入流,处理并可能发射新的流。 Bolt可以从运行函数,过滤元组,执行流聚合,流连接,与数据库交谈等等做任何事情。 Storm拓扑中的每个节点并行执行。 拓扑无限运行,直到终止它。 Storm将自动重新分配任何失败的任务。 此外,Storm保证没有数据丢失,即使机器停机和消息被丢弃。让我们详细了解Kafka-Storm集成API。 有三个主要类集成Kafka与Storm。 他们如下 - 经纪人 - ZkHosts&amp; 静态主机BrokerHosts是一个接口,ZkHosts和StaticHosts是它的两个主要实现。 ZkHosts用于通过在ZooKeeper中维护细节来动态跟踪Kafka代理,而StaticHosts用于手动/静态设置Kafka代理及其详细信息。 ZkHosts是访问Kafka代理的简单快捷的方式。ZkHosts的签名如下 - public ZkHosts(String brokerZkStr, String brokerZkPath)public ZkHosts(String brokerZkStr)其中brokerZkStr是ZooKeeper主机,brokerZkPath是ZooKeeper路径以维护Kafka代理详细信息。KafkaConfig API此API用于定义Kafka集群的配置设置。 Kafka Con-fig的签名定义如下public KafkaConfig(BrokerHosts hosts, string topic)主机 - BrokerHosts可以是ZkHosts / StaticHosts。主题 - 主题名称。SpoutConfig APISpoutconfig是KafkaConfig的扩展,支持额外的ZooKeeper信息

上一页  [1] [2] [3] [4] [5] [6] [7] [8]  下一页


Apache Kafka 快速指南