阿里云消息队列Kafka版(kafka集群消息同步)

admin 107 2022-12-19

阿里云服务器优惠多,折扣错,惊喜多,请咨询:www.wqiis.com

本文目录一览:

kafka消费者和offset的关系,以及异常处理问题

earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

简单来说,如果partition里已经有数据,但还没有消费,earliest就会从没消费的起始点来消费,反观latest就不会去消费;如果partition已经有已消费的数据,再放新的数据进去,那么它们都会从新的数据开始消费。

offset会保存在kafka内部,一开始发送数据到kafka的时候就有offset,只是有没有提交而已。而使用spring-kafka时,客户端在监听topic的时候,它有2种提交offset的方式:

1、自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。

2、手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

spring-kafka版本2.5.5,官网 ,设置的是批量消费。

因为是批量消费,所以@KafkaListener需要使用list来接收消息,如果使用单个bean会报错。正常不设置异常处理,它会不断循环重复消费这条数据,不像别的地方说有一定数量的重试。

实现接口new BatchErrorHandler自定义属于自己的批量异常处理,但只会到:

public void handle(Exception thrownException, ConsumerRecords?, ? data)

而不到

public void handle(Exception thrownException, ConsumerRecords?, ? data, Consumer?, ? consumer, MessageListenerContainer container)

再定义逻辑自定处理。如果像官网那样seek回开始的offset,也是无限循环,不太了解所以不采用。

实现接口ConsumerAwareListenerErrorHandler,注意区别是有个Listener的,

如果同时存在局部和全局,在@KafkaListener注解中标注了这个局部的异常处理器,会优先使用局部的。

如果发生异常,来的是一批数据,如果头部发生了异常,那么后面的都会略过。按照参考链接中的异常处理,定义一个死信,来接收这些失败的msg,如果异常处理在全局异常处理器中,那么它们都被发送到死信,后续就算数据是正确的,都不会处理,所以 还是建议个人使用try catch来包裹 处理,个人尝试在kafka处理业务远程插入两条数据,第一条错误,第二条正确,try catch中第一条自定发到死信,第二条会正确入库。

参考:

kafka之consumer参数auto.offset.reset

kafka 消费者offset记录位置和方式

消息队列-kafka消费异常问题

Kafka - 异常处理 待试

阿里云支持goldengate吗

kafka是分布式消息队列或者叫分布式消息中间件,有时候会叫做一种MQ产品(Message Queue),同类型的有RabbitMQ,ActiveMQ等等。 MQTT是一种即时消息传输协议,Message Queuing Telemetry Transport,也就是一种即时信息传输的一种格式约定,与其...

SpringCloudAlibaba(一):概述与重要组件

前一篇提到了我们为什么要替换PHP语言采用Java语言。而Java语言的框架选型上来说有太多的选择,常见的有Dubbo,SpringCloud等。我们选择了SpringCloudAlibaba。替换PHP语言到SpringCloudAlibaba是个大工程,主要是业务迁移部分。讨论之初我也确认过是否迁移原有的业务,得到的明确答复是 迁移 。 那么这么来说也就简单了,复杂的就是工期问题了。

SpringCloud Alibaba是Alibaba结合自身的微服务实践开源的一套微服务全家桶 ,在SpringCloud项目中进行孵化并且毕业。既然是SpringCloud的项目那么阿里云其实包含其商业化的产品。 例如Nacos在阿里云就有其商业化的版本 MSE 。 同时SpringCloud Alibaba的相关组件是经历过双十一大促考验的产品。稳定性较高。

SpringCloud Alibaba是SpringCloud的子项目,其实很多相关的文章都提到了SpringCloud Alibaba与SpringCloud的关系,其中有很多的论点都比较有意思。大家可以去搜索一下。

SpringCloud Alibaba是依赖SpringCloud相关的标准实现的一套微服务的架构。结合阿里巴巴的相关实践与阿里云的相关服务实现的一些组件得以更快的实现相关产品业务。

Nacos实现了服务的配置中心与服务注册发现的功能,Nacos可以通过可视化的配置降低相关的学习与维护成本,实现动态的配置管理与分环境的配置中心控制。 同时Nacos提供了基于http/RCP的服务注册与发现功能。

Sentinel是面向分布式微服务架构的轻量级高可用的流控组件,以流量作为切入点,从流量控制,熔断降级,系统负载保护等维度帮助用户保证服务的稳定性。常用与实现限流、熔断降级等策略。

RocketMQ基于Java的高性能、高吞吐量的消息队列,在SpringCloud Alibaba生态用于实现消息驱动的业务开发,常见的消息队列有Kafka、RocketMQ、RabbitMQ等,相关的比较文档可以自行去翻阅。

既然是微服务的产品,那么肯定会用到分布式事物。Seata就是阿里巴巴开源的一个高性能分布式事物的解决方案。

Dubbo已经在圈内很火了,SpringCloud Alibaba基于上面提到的Nacos服务注册中心也同样整合了Dubbo。

SpringCloud Alibaba还有一些其他的组件选择,例如schedulerX、SMS、OSS等。但是由于其主要是阿里云的商业化产品就不再过多的进行介绍。集成其商业化产品时才能用到。

SpringCloud Alibaba是基于SpringCloud标准由阿里巴巴实现的微服务全家桶,可插拔的方式实现组件的替换,在某些场景中我们需要的组件可以自由进行选择。例如需要分布式链路跟踪我们可以增加sleuth组件用于实现分布式链路跟踪业务等。

很多人提到SpringCloudAlibaba的商业问题,记得当年SpringCloudAlibaba推出第一版的时候我也评论了...卖产品全家桶。不可否认是有那么一些,但是其实它本身的很多组件又不一定非要选择商业版本。这个可以自由进行选择。

Kafka压缩

首先说明一点kafka的压缩和kafka的compact是不同的,compact就是相同的key只保留一条,是数据清理方式的一种和kafka的定期删除策略是并列的;而kafka的压缩是指数据不删除只是采用压缩算法进行压缩。

kafka从0.7版本就开始支持压缩功能:

1)kafka的发送端将消息按照批量(如果批量设置一条或者很小,可能有相反的效果)的方式进行压缩。

2)服务器端直接将压缩消息保存(特别注意,如果kafka的版本不同,那么就存在broker需要先解压缩再压缩的问题,导致消耗资源过多)。

3)消费端自动解压缩,测试了下,发送端无论采用什么压缩模式,消费端无论设置什么解压模式,都可以自动完成解压缩功能。

4)压缩消息可以和非压缩消息混存,也就是说如果你kafka里面先保存的是非压缩消息,后面改成压缩,不用担心,kafka消费端自动支持。

测试的kafka版本:kafka_2.12-1.1.1

测试的kafka客户端版本:0.10.2.1

测试数据的条数:20000

kafka支持三种压缩算法,lz4、snappy、gzip,

通过上面数据来看,gzip的压缩效果最好,但是生成耗时更长,snappy和lz4的数据差不多,更倾向于lz4,具huxi大神的书上所说kafka里面对snappy做了硬编码,所以性能上最好的是lz4,推荐使用此压缩算法。

压缩率对比:

性能对比图:

很简单:

消费端无论设置什么压缩模式,都可以正确的解压kafka的消息,也就是说消费端可以不设置解压缩,

不过可能性能有所下降。

Kafka consumer 解析

上一篇说了Kafka consumer的处理逻辑、实现原理及相关的特点,本篇来看看Kafka 另一个client Consumer,作为生产者消费者的另一端,consumer提供了消费消息的能力,下面来看看Kafka中的consumer 应该如何正确使用及实现原理。

常见的消息引擎中通常有 经典的生产者消费者模式 、 发布订阅模式 两种

是一种点对点的方式,消息不会被重复消费,可以粗暴的理解为消息被消费后就被标记删除或者已删除了,这是常见的消息队列通常的模式。比如说进程间通信,这种基于队列实现消息传输服务的。

相对于生产者 消费者模式,消息可能会被多方消费,可以简单的理解为一份报纸的内容,订阅它的人都可以读到它,当一个人读完之后也就没必要再次去读了。并且在发布订阅模式中,通常有个概念叫做topic,每个topic 有对应的发布者(publisher)、订阅者(subsciber)。

那Kafka是如何实现生产者消费者两种模式的呢?往后看~

kafka中有一个概念叫做consumer group,每个group 去订阅对应的topic,topic的每条消息只能发送到订阅它的消费者组的其中一个实例上,并且每个消费者至多使用一个消费者组来标示自己。这样不难得出,当某个topic 仅有一个group来消费时(组内有一个或者多个consumer),这个topic的消息的消费模式就是典型的生产者消费者模式。

而当某个topic 被多个消费者组订阅,而每个组仅有一个消费者时,每条消息就会被广播到每个消费者上。

这里需要注意下,还有个叫做独立消费者(standalone consumer)的概念,对于consumer group 是以group 为单位进行消息消费的,而standalone 会单独的执行消费,以consumer 实例为单位进行消费的。

是时候来看看Kafka consumer 端的实现原理了,先从最基础的group 开始,当前较新版本的consumer是依赖于broker端的coordinator来完成组的管理的(主要是把分配方案通知到每个consumer实例上),当然了这里涉及一个一致性策略,当无法达成这个策略是,就直接抛异常请求人工介入处理了。

coordinator 实现组的管理,依赖的主要是consumer group的状态,仅有 Empty(组内没有任何active consumer)、PreparingRebalance(group 正在准备进行rebalance)、AwaitingSync(所有组员已经加入组并等待leader consumer发送分区的分配方案)、Stable(group开始正常消费)、Dead(该group 已经被废弃)这五个状态,那他们是如何轮转的可以简单的看一下状态机。

就整个过程来说,可以大致分为加入组阶段、状态同步阶段。

加入组阶段:当明确group的coordinator之后,组内成员需要显式的发送JoinGroup请求(主要包括 订阅信息、成员id等元数据信息)给对应的coordinator,然后coordinator选择对应的consumer 作为leader,然后再给其他成员产生响应(一个空数组)。当然啦,如果某个consumer 指定的分配策略是其他consumer 不支持的,那么这个实例是不被接受的。现有的分区策略主要有:range、round-robin、sticky,其中sticky是其中可以最大限度保证分区的负载的均衡分配机rebalance之后的最少分配变动。

offset 概念这里需要单独抽出来说一下,因为在Kafka 里面存在两个offset的概念,一个指的是consumer 中的offset,一个是broker中的offset

concumer offset 用来记录当前消费了多少条消息,这个offset的状态是由consumer group来维护的,通过检查点机制对于offset的值进行持久化(内部就是一个map)

broker offset 消息在broker 端的位移值,根据之前说过的几个概念可以大致的理解为一个topic,partition,offset可以唯一的标示到一条消息。

因为新版本和旧版本Kafka 所采用的offset保存策略是不同的,旧版本中主要依赖于Zookeeper,但是zookeeper不是干这事儿的啊,所以kafka 在数量很大的消费发生时,zookeeper读写会异常的频繁,导致很容易成为整个Kafka系统的瓶颈。所以新版本对这种方式作出了重大更新,不再依赖于Zookeeper 来进行状态的保存,而是在broker 端直接开一个内部使用的topic,也就是_consumer_offsets topic,并且kafka 为了兼容老版本的consumer 还提供了 offsets.storage=kafka这样一个适配参数。

最后要说的一点就是consumer 端的Rebalance 过程(rebalance是针对consumer group来说的,如果是standalone consumer 则没有这个概念),rebalance也就是如何达成一致来分配订阅topic的所有分区。这个rebalance的代价还是不小的,我们是需要避免高频的rebalance的。常见的rebalance 场景有:新成员加入组、组内成员崩溃(这种场景无法主动通知,需要被动的检测才行,并且需要一个session.timeout 才检测到)、成员主动离组。

consumer 是可以执行任意次rebalance的,为了区分两次rebalance上的数据(防止无效或者延迟的offset提交),consumer 设计了一个叫做rebalance generation的标示。

对应常见的rebalance请求有:

JoinGroup:consumer 请求加入组

SyncGroup:group leader把分配方案同步给组内所有成员

Heartbeat:consumer 定期向coordination汇报心跳表示自己还存活

LeaveGroup:consumer 主动通知coordinator该consumer即将离组

DescribeGroup:查看组的所有信息。

Consumer端常见的概念大致就这么多。

上一篇:华为云gpu(华为云gpu服务器特点)
下一篇:华为云社区(华为的社区)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~