Kafka

Kafka

http://kafka.apache.org/

Kafka是Apache开发的一款开源 流处理平台(网络信息流,日志流, 采样流), 由Scala和Java编写. Kafka是一种 高吞吐量的分布式发布订阅消息系统, 一般用作系统间解耦, 异步通讯, 削峰填谷等作用. 此外还提供了流处理插件 Kaka Streaming(类似Storm, Spark, Flink), 并且运行在应用端. 具有简单 , 入门要求低, 部署方便等优点.

工作形式

生产者向Kafka集群发送消息(Record), 每个Record 只能属于一个Topic(是消息的分类,比如短信主题, 用户主题), 但是一个消费者可以消费多个Topic. 每个Topic底层都会对应一组分区的日志, 这些分区日志用来持久化Record消息,并分片存储, 默认的分区策略为 hash(msgKey)%集群数量.

集群中的每个Kafka进程又称作Broker, 每个分区的数据将会分散存储到各个Broker中, 并且总有一个充当Leader(负责分区读写), 其他Broker充当Follower(负责分区的数据备份). 如果Leader宕机, Zookeeper就会重新在Follower中选举一个Broker充当新的Leader.

集群中的Leader的监控, 以及Topic的部分元数据是存储在Zookeeper集群中的.

image-20210117212057579

Topic分区&日志

Kafka中所有消息是通过Topic为单位进行管理, 每个Kafka中的Topic通常会有多个订阅者, 负责订阅发送到该Topic中的数据, Kafka负责管理集群中每个Topic的一组日志分区.

生产者将数据发送到相应的Topic, 负责选择将哪条记录发送到哪个Partition. 不仅可以使用轮询平衡负载, 可以根据某些语义进行分区.

每组日志分区是一个有序的不可变的日志队列, 分区中的每个Record都被分配了唯一的序列编号成为offset, Kafka集群会持久化所有发布到Topic中的Record信息, 每个Record的持久化时间由配置: log.retention.hours=168 决定的, 默认是168小时, 也就是7天.

Kafka地层会定期检查日志文件, 然后将过期数据从log移除, 由于Kafka 使用硬盘存储日志文件, 因此使用Kafka缓存一些日志文件不存在问题.

image-20210117220055179

由上图可知:

  1. Topic 包含 多个分区 Partion

  2. 一个Partition分区是一个队列

  3. 分区左侧是最早的消息, 右侧则是最新的消息

  4. Kafka只能保证同一分区中的消息的顺序, 而无法保证不同分区之间的顺序, 如果要保证Topic中的所有顺序, 可以只设置一个分区

  5. 每个Record在分区中都有一个唯一的标志: offset, 值越小, 进入队列的时间就越早

为什么要设置分区:

  1. 打破单机存储的容量限制

  2. 分区数将数据拆分后给多个Broker处理, 从某种程度上,提高了Kafka的写入性能, 所以可以应用到高并法以及大数据方向

生产者&消费者

Kafka是如何记录每一个消费者 读取到哪个偏移量的?

在消费者消费Topic中的数据的时候, 每个消费者会维护本次消费者对应的分区的偏移量, 消费者消费完一个分区的数据后, 会将本次的消费偏移量提交给Kafka集群. 因此每个消费者都可以随意控制更改偏移量. 因此多个消费者之间彼此相互独立.

其中每个分区又分为Leader与Follower, 集群的负载可以得到很好的平衡.

image-20210118143412749

消费者和消费者组:

在Kafka中,消费者使用Consumer Group名称来标记自己, 并且发布到Topic的每条记录都会传递到每个订阅Consumer Group的一个消费者实例. 如果所有的Consumer实例具有不同的ConsumerGroup, 则每条记录将广播到所有的Consumer进程. 如果所有的Consumer示例具有相同的ConsumerGroup, 则Topic中的消息记录会在ConsumerGroup中被多个Conusmer均分消费(如上图, 消费组A内每个Consumer均分消费了两个分区).

每个Topic中含有相对较少的ConsumerGroup, 一个ConsumerGroup可以理解为一个’ 逻辑订阅者‘, 每个ConsumerGroup又由多个Conusmer组成, 实现了消费者的伸缩以及容错能力, 提高了消费者的可用性. ConsumerGroup中的所有Consumer实例,往往是一个服务集群. 每个消费组中的消费者的数量往往不会超过Topic中的分区的数量, 如果消费组中的Consumer实例超过了该Topic的分区的数量, 那么多余的Consumer会闲着. 但是当ConsumerGroup中的某个服务出现故障, 那么空闲的Consumer就会去顶替该服务进行服务消费.

所以Kafka不仅可以提高消息通讯的写入能力,还可以提升消费者的消费能力, 以及消息存储的分区能力.

顺序写入&ZeroCopy

Kafka具有高吞吐率, 即使工作在普通服务器上, Kafka也可以轻松支持美妙百万的级的写入请求, 超过了绝大多说的消息中间件. 所以他在日志处理等海量数据场景广泛应用. Kafka还会将信息收集到磁盘中,防止数据丢失.为了优化写入速度 Kafka采用了两个技术: 顺序写入 和 MMFile. (Memery Mapped File 内存映射文件) .

顺序写入:

因为硬盘是机械结构, 每次读写都会 寻址-> 写入,其中寻址是一个机械动作. 他是最耗时的. 所以硬盘最讨厌随机I/O. 最喜欢顺序IO. 这样省略了大量内存开销以及节省了IO寻址时间. 但是单独的使用顺序写入, Kafka的内存性能是无法与内存比较的.

零拷贝:

Kafka服务器在响应客户端读取的时候, 底层使用另拷贝技术, 直接将数据通过内核空间传递输出, 数据并没有抵达用户空间.

image-20210120204055459

环境搭建

单机环境

下载Kafka二进制安装文件, 解压到指定路径, 可以看到Kafka目录结构如下:

在config中配置server.properties:

启动Kafka, 运行 ./bin/kafka-server-start.sh -daemon config/server.properties 命令 , 其中 -daemon 代表后台运行服务. 运行 ./bin/kafka-server-stop.sh 即可优雅关闭kafka服务.

非后态启动可以看日志,方便看报错信息

集群环境

同单机环境类似, 但是需要修改Brokerid, 比如四台机器, 其Brokerid为 0,1,2,3, 依次启动每个结点的BrokerId即可启动Kafka集群环境.

Topic管理

创建Topic

查看集群中Topic列表

查看分区备份

查看主题的详细信息

修改Topic01分区数量

注意, Topic的分区数量只能增大, 不能减小.

删除Topic

发布与订阅消息

组 g1 订阅消息

稍后会进入阻塞状态,用来接受Topic01的消息.

  • --group g1 指定消费组

  • --property print.key=true 打印消息的key

  • --property print.value=true 打印消息的alue

  • --property key.separator=,,指定key和alue的分割符为逗号

发布消息到主题 Topic01

会进入控制台输入状态, 发送消息内容.

消费者组列表查看

查看消费者组g1组的详细信息

消费偏移量offset控制

初始订阅消费偏移量策略

当消费者第一次订阅某个Topic时, 在系统中并不存在该消费者的消费分区的偏移量。Kafka通过属性 auto.offset.reset 属性来确定第一次的偏移量:

  • latest, 默认值,将偏移量重置为最新的偏移量(也就是订阅时,分区的长度),设置此模式时,消费者只能消费到订阅后的消息

  • earliest, 自动将偏移量设置为最早的偏移量, 设置此模式时, 新消费者会从头消费指定消费分区的消息

  • none,如果未找到消费者之前的偏移量,则抛出异常

消费者自动提交offset维护偏移量

Kafka消费者在消费数据时,默认会提交消费的偏移量, 这样就 可以保证所有消息至少可以被消费者消费一次, 可以通过以下两个参数对偏移量同步进行配置:

手动提交偏移量

关闭自动提交策略:

手动提交偏移量:

Acks应答与Retries

Kafka 生产者在发送完一个的消息之后要求Broker在规定的额时间Ack应答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。

Acks应答策略

  • acks=0: 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。(一个不确认都行,性能高,比如日志收集)

  • acks=1: Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。(有个Leader确认就行了)

  • acks=all:意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

Retries重试策略

如果 生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制, 生产者开始重试发送。默认策略如下:

acks&retries作用

用来最大程度的保证生产者的数据发送到Broker上。但是可能会导致重复数据的产生,以及数据发送的效率。

配置acks和retries策略

保证幂等性

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对 生产者角度 的特性。 幂等可以保证生产者发送的消息,不会丢失(acks和retries),而且不会重复(唯一标识)。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识

要想区分请求是否重复, 请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识。记录下已处理过的请求标识:光有唯一标识还不够,还需要 记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉

Kafka幂等

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

1611994338386

配置Kafka幂等

Kafka-Eagle 监控

安装文档:

安装

下载kafka-eagle安装包,解压到目标路径,这里是 /root/kafka-eagle-web-1.4.0

  1. 设置环境变量: export KE_HOME=/root/kafka-eagle-web-1.4.0, export PATH=$PATH:$KE_HOME/bin

  2. 配置 conf/system-config.properties, kafka连接的zookeeper集群,用来读取kafka集群信息:

  3. 开启报表图,需要kafka开启jmx端口,让其监听:

    修改kafka的 kafka-server-start.sh,开启jmx端口,让其检测性能指标使用:

    重启kafka。

  4. 配置topic删除密码:

  5. 配置kafka-eagle连接的数据库, 默认使用sqllite,可以替换成mysql:

启动与使用

访问界面即可,日志会打印访问地址。 默认用户名密码为 admin, 123456

Dashboard:

image-20210131132646280

Topic:

image-20210131133301115

Consumers:

image-20210131133452946

Cluster:

image-20210131133626556
image-20210131134006447

最后更新于

这有帮助吗?