Kafka

Kafka

http://kafka.apache.org/

Kafka是Apache开发的一款开源 流处理平台(网络信息流,日志流, 采样流), 由Scala和Java编写. Kafka是一种 高吞吐量的分布式发布订阅消息系统, 一般用作系统间解耦, 异步通讯, 削峰填谷等作用. 此外还提供了流处理插件 Kaka Streaming(类似Storm, Spark, Flink), 并且运行在应用端. 具有简单 , 入门要求低, 部署方便等优点.

工作形式

生产者向Kafka集群发送消息(Record), 每个Record 只能属于一个Topic(是消息的分类,比如短信主题, 用户主题), 但是一个消费者可以消费多个Topic. 每个Topic底层都会对应一组分区的日志, 这些分区日志用来持久化Record消息,并分片存储, 默认的分区策略为 hash(msgKey)%集群数量.

集群中的每个Kafka进程又称作Broker, 每个分区的数据将会分散存储到各个Broker中, 并且总有一个充当Leader(负责分区读写), 其他Broker充当Follower(负责分区的数据备份). 如果Leader宕机, Zookeeper就会重新在Follower中选举一个Broker充当新的Leader.

集群中的Leader的监控, 以及Topic的部分元数据是存储在Zookeeper集群中的.

Topic分区&日志

Kafka中所有消息是通过Topic为单位进行管理, 每个Kafka中的Topic通常会有多个订阅者, 负责订阅发送到该Topic中的数据, Kafka负责管理集群中每个Topic的一组日志分区.

生产者将数据发送到相应的Topic, 负责选择将哪条记录发送到哪个Partition. 不仅可以使用轮询平衡负载, 可以根据某些语义进行分区.

每组日志分区是一个有序的不可变的日志队列, 分区中的每个Record都被分配了唯一的序列编号成为offset, Kafka集群会持久化所有发布到Topic中的Record信息, 每个Record的持久化时间由配置: log.retention.hours=168 决定的, 默认是168小时, 也就是7天.

Kafka地层会定期检查日志文件, 然后将过期数据从log移除, 由于Kafka 使用硬盘存储日志文件, 因此使用Kafka缓存一些日志文件不存在问题.

由上图可知:

  1. Topic 包含 多个分区 Partion

  2. 一个Partition分区是一个队列

  3. 分区左侧是最早的消息, 右侧则是最新的消息

  4. Kafka只能保证同一分区中的消息的顺序, 而无法保证不同分区之间的顺序, 如果要保证Topic中的所有顺序, 可以只设置一个分区

  5. 每个Record在分区中都有一个唯一的标志: offset, 值越小, 进入队列的时间就越早

为什么要设置分区:

  1. 打破单机存储的容量限制

  2. 分区数将数据拆分后给多个Broker处理, 从某种程度上,提高了Kafka的写入性能, 所以可以应用到高并法以及大数据方向

生产者&消费者

Kafka是如何记录每一个消费者 读取到哪个偏移量的?

在消费者消费Topic中的数据的时候, 每个消费者会维护本次消费者对应的分区的偏移量, 消费者消费完一个分区的数据后, 会将本次的消费偏移量提交给Kafka集群. 因此每个消费者都可以随意控制更改偏移量. 因此多个消费者之间彼此相互独立.

其中每个分区又分为Leader与Follower, 集群的负载可以得到很好的平衡.

消费者和消费者组:

在Kafka中,消费者使用Consumer Group名称来标记自己, 并且发布到Topic的每条记录都会传递到每个订阅Consumer Group的一个消费者实例. 如果所有的Consumer实例具有不同的ConsumerGroup, 则每条记录将广播到所有的Consumer进程. 如果所有的Consumer示例具有相同的ConsumerGroup, 则Topic中的消息记录会在ConsumerGroup中被多个Conusmer均分消费(如上图, 消费组A内每个Consumer均分消费了两个分区).

每个Topic中含有相对较少的ConsumerGroup, 一个ConsumerGroup可以理解为一个’ 逻辑订阅者‘, 每个ConsumerGroup又由多个Conusmer组成, 实现了消费者的伸缩以及容错能力, 提高了消费者的可用性. ConsumerGroup中的所有Consumer实例,往往是一个服务集群. 每个消费组中的消费者的数量往往不会超过Topic中的分区的数量, 如果消费组中的Consumer实例超过了该Topic的分区的数量, 那么多余的Consumer会闲着. 但是当ConsumerGroup中的某个服务出现故障, 那么空闲的Consumer就会去顶替该服务进行服务消费.

所以Kafka不仅可以提高消息通讯的写入能力,还可以提升消费者的消费能力, 以及消息存储的分区能力.

顺序写入&ZeroCopy

Kafka具有高吞吐率, 即使工作在普通服务器上, Kafka也可以轻松支持美妙百万的级的写入请求, 超过了绝大多说的消息中间件. 所以他在日志处理等海量数据场景广泛应用. Kafka还会将信息收集到磁盘中,防止数据丢失.为了优化写入速度 Kafka采用了两个技术: 顺序写入 和 MMFile. (Memery Mapped File 内存映射文件) .

顺序写入:

因为硬盘是机械结构, 每次读写都会 寻址-> 写入,其中寻址是一个机械动作. 他是最耗时的. 所以硬盘最讨厌随机I/O. 最喜欢顺序IO. 这样省略了大量内存开销以及节省了IO寻址时间. 但是单独的使用顺序写入, Kafka的内存性能是无法与内存比较的.

零拷贝:

Kafka服务器在响应客户端读取的时候, 底层使用另拷贝技术, 直接将数据通过内核空间传递输出, 数据并没有抵达用户空间.

环境搭建

单机环境

下载Kafka二进制安装文件, 解压到指定路径, 可以看到Kafka目录结构如下:

bin
config
libs
site-docs

在config中配置server.properties:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# 集群的kafka进程结点, 也就是broker的唯一标志, 由于是单机环境, 所有无需修改
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# kafka服务器底层监听的地址, 这里要填写主机名
listeners=PLAINTEXT://server1:9092

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# log.dirs=/tmp/kafka-logs
# kafka 日志存储路径
log.dirs=/var/kafka

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
# 日志会保留168小时,也就是七天
log.retention.hours=168

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 配置zookeeper服务, 单台
zookeeper.connect=server1:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

启动Kafka, 运行 ./bin/kafka-server-start.sh -daemon config/server.properties 命令 , 其中 -daemon 代表后台运行服务. 运行 ./bin/kafka-server-stop.sh 即可优雅关闭kafka服务.

非后态启动可以看日志,方便看报错信息

集群环境

同单机环境类似, 但是需要修改Brokerid, 比如四台机器, 其Brokerid为 0,1,2,3, 依次启动每个结点的BrokerId即可启动Kafka集群环境.

Topic管理

创建Topic

# kafka-topics.sh 该命令用于Topic管理
# --bootstrap-server 用来指定kafka服务地址

# --create 创建操作
# --topic 目标主题
# --partitions 3 三个分区, 因为这里是三台机器
# --replication-factor 2, 副本因子为2,每个分区在整个集群中共有两个备份, 并且是分散的
bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --create --topic Topic01 --partitions 3  --replication-factor 2
Created topic Topic01. # 返回此信息代表创建成功

查看集群中Topic列表

bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --list

查看分区备份

# 进入kafaka日志目录,这里我配置的是  /var/kafka-logs/
cd /var/kafka-logs/

# borker0
ll
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-0
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-2

# broker1
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-0
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-1

# broker2
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-1
drwxr-xr-x 2 root root 141 1月  24 18:20 Topic01-2

查看主题的详细信息

bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --describe --topic Topic01

# Topic01 共有3个分区, 两个副本因子
Topic: Topic01  PartitionCount: 3   ReplicationFactor: 2    Configs: segment.bytes=1073741824
    # 分区0, 的备份的主结点也就是master 存储在 broker2, 共有两个副本, broker0 和 broker2
    Topic: Topic01  Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
    # 分区1, 的备份的主结点也就是master 存储在 broker1, 共有两个副本, broker1和 broker2
    Topic: Topic01  Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2
    # 分区2, 的备份的主结点也就是master 存储在 broker0, 共有两个副本, broker0 和 broker1
    Topic: Topic01  Partition: 2    Leader: 0   Replicas: 0,1   Isr: 0,1

修改Topic01分区数量

# 将topic01的分区数量由3改为2
bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --alter --topic Topic01 --partitions 2

Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 2.
[2021-01-24 20:56:06,670] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
 (kafka.admin.TopicCommand$)

注意, Topic的分区数量只能增大, 不能减小.

删除Topic

bin/kafka-topics.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --delete --topic Topic01

发布与订阅消息

组 g1 订阅消息

./bin/kafka-console-consumer.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --group g1 --topic Topic01 --property print.key=true --property print.value=true --property key.separator=,

稍后会进入阻塞状态,用来接受Topic01的消息.

  • --group g1 指定消费组

  • --property print.key=true 打印消息的key

  • --property print.value=true 打印消息的alue

  • --property key.separator=,,指定key和alue的分割符为逗号

发布消息到主题 Topic01

./bin/kafka-console-producer.sh --broker-list s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --topic Topic01

会进入控制台输入状态, 发送消息内容.

消费者组列表查看

./bin/kafka-consumer-groups.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --list
g1

查看消费者组g1组的详细信息

./bin/kafka-consumer-groups.sh --bootstrap-server s2.svc.com:9092,s3.svc.com:9092,s4.svc.com:9092 --describe --group g1

# 组名 | 主题 | 分区 | 数据消费情况:当前位置 | 数据消费情况:下一个位置 | 上两个之值之间的差值,代表未消费的数量 |  消费者id  | 消费者主机ip |  客户端id
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                        HOST            CLIENT-ID
g1              Topic01         0          1               1               0               consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112  consumer-g1-1
g1              Topic01         1          2               2               0               consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112  consumer-g1-1
g1              Topic01         2          0               0               0               consumer-g1-1-cffe946e-eb01-48ae-99c9-28d2499bca41 /192.168.1.112  consumer-g1-1

消费偏移量offset控制

初始订阅消费偏移量策略

当消费者第一次订阅某个Topic时, 在系统中并不存在该消费者的消费分区的偏移量。Kafka通过属性 auto.offset.reset 属性来确定第一次的偏移量:

  • latest, 默认值,将偏移量重置为最新的偏移量(也就是订阅时,分区的长度),设置此模式时,消费者只能消费到订阅后的消息

  • earliest, 自动将偏移量设置为最早的偏移量, 设置此模式时, 新消费者会从头消费指定消费分区的消息

  • none,如果未找到消费者之前的偏移量,则抛出异常

prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

消费者自动提交offset维护偏移量

Kafka消费者在消费数据时,默认会提交消费的偏移量, 这样就 可以保证所有消息至少可以被消费者消费一次, 可以通过以下两个参数对偏移量同步进行配置:

enable.auto.commit = true // 自动提交
auto.commit.interval.ms = 5000 // 每隔5s提交一次offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

手动提交偏移量

关闭自动提交策略:

prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交偏移量:

// 此次消费完毕
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
// 消费者异步提交offset,需要一个OffsetCommitCallback回调
consumer.commitAsync(offsets, (offsets1, exception) -> System.out.println("完成:" + record.offset() + "提交!"));

Acks应答与Retries

Kafka 生产者在发送完一个的消息之后要求Broker在规定的额时间Ack应答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。

Acks应答策略

  • acks=0: 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。(一个不确认都行,性能高,比如日志收集)

  • acks=1: Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。(有个Leader确认就行了)

  • acks=all:意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

Retries重试策略

如果 生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制, 生产者开始重试发送。默认策略如下:

# 默认,如果brokers的leader在30s内不给回复
request.timeout.ms = 30000
# 生产者就会重试2147483647次,持续发送
retries = 2147483647

acks&retries作用

用来最大程度的保证生产者的数据发送到Broker上。但是可能会导致重复数据的产生,以及数据发送的效率。

配置acks和retries策略

prop.put(ProducerConfig.ACKS_CONFIG, "all"); // Ack策略
prop.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // 如果1ms内没有ack回应,则重试
prop.put(ProducerConfig.RETRIES_CONFIG, "10"); // 重试十次

保证幂等性

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对 生产者角度 的特性。 幂等可以保证生产者发送的消息,不会丢失(acks和retries),而且不会重复(唯一标识)。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识

要想区分请求是否重复, 请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识。记录下已处理过的请求标识:光有唯一标识还不够,还需要 记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉

Kafka幂等

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

配置Kafka幂等

# 默认不开启,为false
# enable.idempotence= false
enable.idempotence= true

# 开启幂等,必须设置acks为all以及开启重试
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // acks机制必须设置为all
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试必须大于0,3是代表不算第一次,如果第一次失败了就再试三次
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 生产者在发送数据时如果多于n个记录未被应答,则客户端(生产者)会被阻塞。 这个值默认是5.  kafka幂等配置需要此数值小于等于5

Kafka-Eagle 监控

安装文档:

安装

下载kafka-eagle安装包,解压到目标路径,这里是 /root/kafka-eagle-web-1.4.0

  1. 设置环境变量: export KE_HOME=/root/kafka-eagle-web-1.4.0, export PATH=$PATH:$KE_HOME/bin

  2. 配置 conf/system-config.properties, kafka连接的zookeeper集群,用来读取kafka集群信息:

    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=s1.svc.com:2181,s2.svc.com:2181,s3.svc.com:2181,s3.svc.com:2181
  3. 开启报表图,需要kafka开启jmx端口,让其监听:

    kafka.eagle.metrics.charts=true

    修改kafka的 kafka-server-start.sh,开启jmx端口,让其检测性能指标使用:

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
       export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
       # 增加此行
       export JMX_PORT="7788"
    fi

    重启kafka。

  4. 配置topic删除密码:

    kafka.eagle.topic.token=keadmin
  5. 配置kafka-eagle连接的数据库, 默认使用sqllite,可以替换成mysql:

    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org
    
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    #kafka.eagle.driver=com.mysql.jdbc.Driver
    #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNu
    ll
    #kafka.eagle.username=root
    #kafka.eagle.password=123456

启动与使用

ke.sh start

访问界面即可,日志会打印访问地址。 默认用户名密码为 admin, 123456

Dashboard:


Topic:


Consumers:


Cluster:

最后更新于