Apache kafka(一)简介及入门

简介

Kafka是一个分布式流处理平台。Kafka于2009年源自Linkedin,随后于2011年初开源,并于2012年10月23由Apache Incubator孵化出站。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

流处理平台三个关键功能

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错持久的方式存储记录流。
  • 处理记录发生的流。

Kafka通常用于两大类应用

  • 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
  • 构建实时流应用程序,用于转换或相应数据流

几个基本概念

  • Kafka作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。
  • Kafka集群在称为主题的类别中存储记录流。
  • 每个记录由一个键,一个值和一个时间戳组成。

Kafka 的架构

Kafka架构的主要术语包括Topic、Record和Broker。Topic由Record组成,Record持有不同的信息,而Broker则负责复制消息。

四个核心 API

  • 生产者API:支持应用发布Record流。
  • 消费者API:支持应用程序订阅Topic和处理Record流。
  • Stream API:将输入流转换为输出流,并产生结果。
  • Connector API:执行可重用的生产者和消费者API,可将Topic链接到现有应用程序。

Kafka-apis

安装及使用

基于Unix平台上使用bin/,脚本扩展名为.sh

WIndows平台上使用bin\windows\,并且脚本扩展名为.bat

以下命令均在Windows平台执行。

第1步:下载代码

下载 2.3.0版本并解压它。Windows平台直接解压。

1
>cd kafka_2.12-2.3.0

第2步:启动服务器

Kafka使用ZooKeeper,首先启动ZooKeeper服务器,使用Kafka打包在一起的便捷脚本使用单节点的ZooKeeper实例。

1
>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

ZooKeeper成功启动,并绑定到端口2181。该端口是ZooKeeper的默认端口,可以在config\zookeeper.properties中修改clientPort来修改监听端口。

启动Kafka服务器:

1
>bin\windows\kafka-server-start.bat config\server.properties

windows环境下启动命令中的配置文件路径 .properties 需要 ..\..\xx.properties

第3步:创建一个主题

创建一个名为“HelloWord”的主题:

1
>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloWord

通过运行list topic命令查询创建的主题:

1
>bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

或者也可以将代理配置设置为发布不存在的主题是自动创建主题。

第4步:启动一个生产者并发送消息

1
2
3
>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic HelloWord
This is a message
hello,my is producer

第5步:启动一个消费者并接收消息

1
2
3
>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic HelloWord --from-beginning
This is a message
hello,my is producer

Kafka 集群配置

  1. 配置 Kafka 的 Zookeeper 地址 zookeeper.connect 。Zookeeper集群可通过 , 分开。

192.168.1.100192.168.1.101 Zookeeper 配置:

1
2
3
4
5
6
7
8
9
10
11
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.1.200:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
  1. 修改 broker.idbroker.id 在集群中必须是唯一的。

192.168.1.100 配置 :

1
2
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

192.168.1.101 配置:

1
2
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
  1. listeners 写入本机IP,集群中完成节点间通讯使用。

192.168.1.100 配置:

1
2
3
4
5
6
7
# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.100:9092

192.168.1.101 配置:

1
2
3
4
5
6
7
# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.101:9092
  1. 查询 Kafka 集群节点部署情况。
1
2
3
4
5
6
>zookeeper-shell.sh 192.168.1.200:2181
...
>ls /brokers/ids
[0, 1]
>get /controller
...

ls /brokers/ids 查询注册了 zookeeper 节点的 broker.id

get /controller 查询 leader,master 节点。

异常及处理

  1. 启动Kafka服务,命令窗口提示错误:
1
2
>bin\windows\kafka-server-start.bat config\server.properties
错误: 找不到或无法加载主类 Files\Java\jdk1.7.0_75\lib\dt.jar;C:\Program

网上查找解决办法,修改kafka-server-satrt.bat

1
2
3
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
修改为:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

给上述代码段的%CLASSPATH%添加双引号""

  1. 启动生产者时Kafka报错:
1
2
>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic HelloWord
WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available.

因为配置文件conf\server.properties没有启用PLAINTEXT,修改配置文件:

1
2
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://localhost:9092

相关参考

0%