简介
Kafka是一个分布式流处理平台。Kafka于2009年源自Linkedin,随后于2011年初开源,并于2012年10月23由Apache Incubator孵化出站。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。
流处理平台三个关键功能
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 以容错持久的方式存储记录流。
- 处理记录发生的流。
Kafka通常用于两大类应用
- 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
- 构建实时流应用程序,用于转换或相应数据流
几个基本概念
- Kafka作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。
- Kafka集群在称为主题的类别中存储记录流。
- 每个记录由一个键,一个值和一个时间戳组成。
Kafka 的架构
Kafka架构的主要术语包括Topic、Record和Broker。Topic由Record组成,Record持有不同的信息,而Broker则负责复制消息。
四个核心 API
- 生产者API:支持应用发布Record流。
- 消费者API:支持应用程序订阅Topic和处理Record流。
- Stream API:将输入流转换为输出流,并产生结果。
- Connector API:执行可重用的生产者和消费者API,可将Topic链接到现有应用程序。
安装及使用
基于Unix平台上使用
bin/
,脚本扩展名为.sh
。WIndows平台上使用
bin\windows\
,并且脚本扩展名为.bat
。以下命令均在Windows平台执行。
第1步:下载代码
下载 2.3.0版本并解压它。Windows平台直接解压。
1 | >cd kafka_2.12-2.3.0 |
第2步:启动服务器
Kafka使用ZooKeeper,首先启动ZooKeeper服务器,使用Kafka打包在一起的便捷脚本使用单节点的ZooKeeper实例。
1 | >bin\windows\zookeeper-server-start.bat config\zookeeper.properties |
ZooKeeper成功启动,并绑定到端口2181
。该端口是ZooKeeper的默认端口,可以在config\zookeeper.properties
中修改clientPort
来修改监听端口。
启动Kafka服务器:
1 | >bin\windows\kafka-server-start.bat config\server.properties |
windows环境下启动命令中的配置文件路径
.properties
需要..\..\xx.properties
。
第3步:创建一个主题
创建一个名为“HelloWord”的主题:
1 | >bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloWord |
通过运行list topic
命令查询创建的主题:
1 | >bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 |
或者也可以将代理配置设置为发布不存在的主题是自动创建主题。
第4步:启动一个生产者并发送消息
1 | >bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic HelloWord |
第5步:启动一个消费者并接收消息
1 | >bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic HelloWord --from-beginning |
Kafka 集群配置
- 配置 Kafka 的 Zookeeper 地址
zookeeper.connect
。Zookeeper集群可通过,
分开。
192.168.1.100
和 192.168.1.101
Zookeeper 配置:
1 | ############################# Zookeeper ############################# |
- 修改
broker.id
,broker.id
在集群中必须是唯一的。
192.168.1.100
配置 :
1 | # The id of the broker. This must be set to a unique integer for each broker. |
192.168.1.101
配置:
1 | # The id of the broker. This must be set to a unique integer for each broker. |
listeners
写入本机IP,集群中完成节点间通讯使用。
192.168.1.100
配置:
1 | # The address the socket server listens on. It will get the value returned from |
192.168.1.101
配置:
1 | # The address the socket server listens on. It will get the value returned from |
- 查询 Kafka 集群节点部署情况。
1 | zookeeper-shell.sh 192.168.1.200:2181 |
ls /brokers/ids
查询注册了 zookeeper
节点的 broker.id
。
get /controller
查询 leader,master 节点。
异常及处理
- 启动Kafka服务,命令窗口提示错误:
1 | >bin\windows\kafka-server-start.bat config\server.properties |
网上查找解决办法,修改kafka-server-satrt.bat
:
1 | set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %* |
给上述代码段的%CLASSPATH%
添加双引号""
。
- 启动生产者时Kafka报错:
1 | >bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic HelloWord |
因为配置文件conf\server.properties
没有启用PLAINTEXT
,修改配置文件:
1 | #listeners=PLAINTEXT://:9092 |