应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

如何学习kafka?

  作者:sinxu,腾讯CSIG后台开发
  本文主要从Kafka消费、堆积、稳定性、预案、成本控制等角度等最佳实践。
  引言:
  要确保Kafka在使用过程中的稳定性,需要从kafka在业务中的使用周期进行依次保障。主要可以分为:事先预防(通过规范的使用、开发,预防问题产生)、运行时监控(保障集群稳定,出问题能及时发现)、故障时解决(有完整的应急预案)这三阶段。一、事先预防
  事先预防即通过规范的使用、开发,预防问题产生。主要包含集群生产端消费端的一些最佳实践、上线前测试以及一些针对紧急情况(如消息积压等)的临时开关功能。Kafka调优原则:确定优化目标,并且定量给出目标(Kafka常见的优化目标是吞吐量、延时、持久性和可用性);确定了目标之后,需要明确优化的维度:通用性优化:操作系统、JVM等;针对性优化:优化Kafka的TPS、处理速度、延时等。
  1。生产端最佳实践1。1参数调优使用Java版的Client;使用kafkaproducerperftest。sh测试你的环境;设置内存、CPU、batch压缩;batch。size:该值设置越大,吞吐越大,但延迟也会越大;linger。ms:表示batch的超时时间,该值越大,吞吐越大、但延迟也会越大;max。in。flight。requests。per。connection:默认为5,表示client在blocking之前向单个连接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;compression。type:压缩设置,会提高吞吐量;acks:数据durability的设置;避免大消息(占用过多内存、降低broker处理速度);broker调整:增加num。replica。fetchers,提升Follower同步TPS,避免BrokerFullGC等;当吞吐量小于网络带宽时:增加线程、提高batch。size、增加更多producer实例、增加partition数;设置acks1时,如果延迟增大:可以增大num。replica。fetchers(follower同步数据的线程数)来调解;跨数据中心的传输:增加socket缓冲区设置以及OStcp缓冲区设置。1。2开发实践(1)做好Topic隔离
  根据具体场景(是否允许一定延迟、实时消息、定时周期任务等)区分kafkatopic,避免挤占或阻塞实时业务消息的处理。(2)做好消息流控
  如果下游消息消费存在瓶颈或者集群负载过高等,需要在生产端(或消息网关)实施流量生产速率的控制或者延时暂定消息发送等策略,避免短时间内发送大量消息。(3)做好消息补推
  手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。(4)做好消息顺序性保障
  如果需要在保证Kafka在分区内严格有序的话(即需要保证两个消息是有严格的先后顺序),需要设置key,让某类消息根据指定规则路由到同一个topic的同一个分区中(能解决大部分消费顺序的问题)。但是,需要避免分区内消息倾斜的问题(例如,按照店铺Id进行路由,容易导致消息不均衡的问题)。生产端:消息发送指定key,确保相同key的消息发送到同一个partition。消费端:单线程消费或者写N个内存queue,具有相同key的数据都到同一个内存queue;然后对于N个线程,每个线程分别消费一个内存queue;(5)适当提高消息发送效率批量发送:kafka先将消息缓存在内存中的双端队列(buffer)中,当消息量达到batchsize指定大小时进行批量发送,减少了网络传输频次,提高了传输效率;端到端压缩消息:将一批消息打包后进行压缩,发送给Broker服务器后,但频繁的压缩和解压也会降低性能,最终还是以压缩的方式传递到消费者的手上,在Consumer端进行解压;异步发送:将生产者改造为异步的方式,可以提升发送效率,但是如果消息异步产生过快,会导致挂起线程过多,内存不足,最终导致消息丢失;索引分区并行消费:当一个时间相对长的任务在执行时,它会占用该消息所在索引分区被锁定,后面的任务不能及时派发给空闲的客户端处理,若服务端如果启用索引分区并行消费的特性,就可以及时的把后面的任务派发给其他的客户端去执行,同时也不需要调整索引的分区数(但此类消息仅适用于无需保证消息顺序关系的消息)(6)保证消息发送可靠性Producer:如果对数据可靠性要求很高的话,在发送消息的时候,需要选择带有callBack的api进行发送,并设置acks、retries、factor等等些参数来保证Producer发送的消息不丢失。Broker:kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中,并采用了批量刷盘的做法,如果对数据可靠性要求很高的话,可以修改为同步刷盘的方式提高消息的可靠性。2。消费端最佳实践2。1参数调优吞吐量:调整partition数、OSpagecache(分配足够的内存来缓存数据);offsettopic(consumeroffsets):offsets。topic。replication。factor(默认为3)、offsets。retention。minutes(默认为1440,即1day);offsetcommit较慢:异步commit或手动commitfetch。min。bytes、fetch。max。wait。msmax。poll。interval。ms:调用poll()之后延迟的最大时间,超过这个时间没有调用poll()的话,就会认为这个consumer挂掉了,将会进行rebalancemax。poll。records:当调用poll()之后返回最大的record数,默认为500session。timeout。msConsumerRebalance:checktimeouts、checkprocessingtimeslogic、GCIssues网络配置2。2开发实践(1)做好消息消费幂等
  消息消费的幂等主要根据业务逻辑做调整。
  以处理订单消息为例:由订单编号订单状态唯一的幂等key,并存入redis;在处理之前,首先会去查Redis是否存在该Key,如果存在,则说明已经处理过了,直接丢掉;如果Redis没处理过,则将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上;
  简而言之,即通过Redis做前置处理DB唯一索引做最终保证来实现幂等性。(2)做好Consumer隔离
  在消息量非常大的情况下,实时和离线消费者同时消费一个集群,离线数据繁重的磁盘IO操作会直接影响实时业务的实时性和集群的稳定性。
  根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。实时消费者:对数据实时性要求较高;在实时消费的场景下,Kafka会利用系统的pagecache缓存,直接从内存转发给实时消费者(热读),磁盘压力为零,适合广告、推荐等业务场景。离线消费者(定时周期性消费者):通常是消费数分钟前或是数小时前的消息,这类消息通常存储在磁盘中,消费时会触发磁盘的IO操作(冷读),适合报表计算、批量计算等周期性执行的业务场景。(3)避免消息消费堆积延迟处理、控制速度,时间范围内分摊消息(针对实时性不高的消息);生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;避免很重的消费逻辑,优化consumerTPS:是否有大量DB操作;下游外部服务接口调用超时;是否有lock操作(导致线程阻塞);需要特别关注kafka异步链路中的涉及消息放大的逻辑;如果有较重的消费逻辑,需要调整xx参数,避免消息没消费完时,消费组退出,造成reblance等问题确保consumer端没有因为异常而导致消费hang住;如果使用的是消费者组,确保没有频繁地发生rebalance多线程消费,批量拉取处理;
  注:批量拉取处理时,需注意下kafka版本,springkafka2。2。11。RELEASE版本以下,如果配置kafka。batchListenertrue,但是将消息接收的元素设置为单个元素(非批量List),可能会导致kafka在拉取一批消息后,仅仅消费了头部的第一个消息。(4)避免Rebalance问题
  A。触发条件:消费者数量变化:新消费者加入、消费者下线(未能及时发送心跳,被踢出Group)、消费者主动退出消费组(Consumer消费时间过长导致)消费组内订阅的主题或者主题的分区数量发生变化;消费组对应的GroupCoorinator节点发生变化
  B。如何避免非必要rebalance(消费者下线、消费者主动退出消费组导致的reblance):需要仔细地设置session。timeout。ms(决定了Consumer存活性的时间间隔)和heartbeat。interval。ms(控制发送心跳请求频率的参数)的值。max。poll。interval。ms参数配置:控制Consumer实际消费能力对Rebalance的影响,限定了Consumer端应用程序两次调用poll方法的最大时间间隔。默认值是5分钟,表示Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起离开组的请求,Coordinator也会开启新一轮Rebalance。具体可以统计下历史的时间花费,把最长的时间为参考进行设置。(5)保证消息消费可靠性
  一般情况下,还是client消费broker丢消息的场景比较多,想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。
  Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了,则此时消息就丢失了。
  (6)保证消息消费顺序性不同topic(乱序消息):如果支付与订单生成对应不同的topic,只能在consumer层面去处理了。同一个topic(乱序消息):一个topic可以对应多个分区,分别对应了多个consumer,与不同topic没什么本质上的差别。(可以理解为我们的服务有多个pod,生产者顺序发送消息,但被路由到不同分区,就可能变得乱序了,服务消费的就是无序的消息)同一个topic,同一个分区(顺序消息):Kafka的消息在分区内是严格有序的,例如把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。
  针对乱序消息:
  例如:订单和支付分别封装了各自的消息,但是消费端的业务场景需要按订单消息支付消息的顺序依次消费消息。宽表(业务主题相关的指标、维度、属性关联在一起的一张数据库表):消费消息时,只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的。例如订单,支付有自己的状态字段,订单有自己的状态字段,售后有自己的状态字段,就不需要保证支付、订单、售后消息的有序,即使消息无序,也只会更新自己的状态字段,不会影响到其他状态;消息补偿机制:将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理,保证最终一致性;MQ队列:一个中间方(比如redis的队列)来维护MQ的顺序;业务保证:通过业务逻辑保障消费顺序;
  针对顺序消息:
  两者都是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。
  A。Consumer单线程顺序消费
  生产者在发送消息时,已保证消息在分区内有序,一个分区对应了一个消费者,保证了消息消费的顺序性。
  B。Consumer多线程顺序消费(具体策略在后面章节)
  单线程顺序消费的扩展能力很差。为了提升消费者的处理速度,除了横向扩展分区数,增加消费者外,还可以使用多线程顺序消费。
  将接收到的kafka数据进行hash取模(注意:如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。
  此外,这里通过配置中心进行开关、动态扩容缩容线程池。(7)处理Consumer的事务
  通过事务消息,可以很好的保证一些业务场景的事务逻辑,不会因为网络不可用等原因出现系统之间状态不一致。
  当更新任何一个服务出现故障时就抛出异常,事务消息不会被提交或回滚,消息服务器会回调发送端的事务查询接口,确定事务状态,发送端程序可以根据消息的内容对未做完的任务重新执行,然后告诉消息服务器该事务的状态。3。集群配置最佳实践3。1集群配置Broker评估:每个Broker的Partition数不应该超过2k、控制partition大小(不要超过25GB);集群评估(Broker的数量根据以下条件配置):数据保留时间、集群的流量大小;集群扩容:磁盘使用率应该在60以下、网络使用率应该在75以下;集群监控:保持负载均衡、确保topic的partition均匀分布在所有Broker上、确保集群的阶段没有耗尽磁盘或带宽3。2Topic评估Partition数:Partition数应该至少与最大consumergroup中consumer线程数一致;对于使用频繁的topic,应该设置更多的partition;控制partition的大小(25GB左右);考虑应用未来的增长(可以使用一种机制进行自动扩容);使用带key的topic;partition扩容:当partition的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)。3。3分区配置
  设置多个分区在一定程度上是可以提高消费者消费的并发度,但是分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能增加端到端的延迟、影响系统可用性、故障恢复时间较长等问题。
  根据吞吐量的要求设置partition数:假设Producer单partition的吞吐量为Pconsumer消费一个partition的吞吐量为C而要求的吞吐量为T那么partition数至少应该大于TP、Tc的最大值4。性能调优
  调优目标:高吞吐量、低延时。4。1分层调优
  自上而下分为应用程序层、框架层、JVM层和操作系统层,层级越靠上,调优的效果越明显。
  4。2吞吐量(TPS)调优
  4。3延时调优
  5。稳定性测试
  kafka的稳定性测试主要在业务上线前针对Kafka实例集群健康性、高可用性的测试。5。1健康性检查
  (1)检查实例:查看Kafka实例对象中拿到所有的信息(例如IP、端口等);
  (2)测试可用性:访问生产者和消费者,测试连接。5。2高可用测试A。单节点异常测试:重启Leader副本或Follower副本所在Pod
  步骤:查看topic的副本信息删除相应pod脚本检测Kafka的可用性
  预期:对生产者和消费者的可用性均无影响。B。集群异常测试:重启所有pod
  步骤:删除所有pod脚本检测Kafka的可用性
  预期:所有brokerready后服务正常。二、运行时监控
  运行时监控主要包含集群稳定性配置与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与异常。
  1。集群稳定性监控1。1腾讯云CKafka集群配置
  合理进行kafka实例配,主要关注这几个数据:磁盘容量和峰值带宽消息保留时长;动态保留策略;A。磁盘容量和峰值带宽
  可根据实际业务的消息内容大小、发送消息qps等进行预估,可以尽量设置大点;具体数值可根据实例监控查看,如果短时间内磁盘使用百分比就达到较高值,则需扩容。
  峰值带宽最大生产流量副本数B。消息保留时长
  消息即使被消费,也会持久化到磁盘存储保留时长的时间。该设置会占用磁盘空间,如果每天消息量很大的话,可适当缩短保留时间。C。动态保留策略
  推荐开启动态保留设置。当磁盘容量达到阈值,则删除最早的消息,最多删除到保底时长范围外的消息(淘汰策略),可以很大程度避免磁盘被打满的情况。
  但有调整时不会主动通知,但我们可以通过配置告警感知磁盘容量的变化。1。2自建Kafka集群配置设置日志配置参数以使日志易于管理;了解kafka的(低)硬件需求;充分利用ApacheZooKeeper;以正确的方式设置复制和冗余;注意主题配置;使用并行处理;带着安全性思维配置和隔离Kafka;通过提高限制避免停机;保持低网络延迟;利用有效的监控和警报。1。3资源隔离A。Broker级别物理隔离
  如果不同业务线的topic会共享一块磁盘,若某个consumer出现问题而导致消费产生lag,进而导致频繁读盘,会影响在同一块磁盘的其他业务线TP的写入。
  解决:Broker级别物理隔离:创建Topic、迁移Topic、宕机恢复流程B。RPC队列隔离
  KafkaRPC队列缺少隔离,一旦某个topic处理慢,会导致所有请求hang住。
  解决:需要按照控制流、数据流分离,且数据流要能够按照topic做隔离。将call队列按照拆解成多个,并且为每个call队列都分配一个线程池。一个队列单独处理controller请求的队列(隔离控制流),其余多个队列按照topic做hash的分散开(数据流之间隔离)。
  如果一个topic出现问题,则只会阻塞其中的一个RPC处理线程池,以及call队列,可以保障其他的处理链路是畅通的。1。4智能限速
  整个限速逻辑实现在RPC工作线程处理的末端,一旦RPC处理完毕,则通过限速控制模块进行限速检测。配置等待时间,之后放入到delayedqueue中,否则放到responsequeue中。放入到delayedqueue中的请求,等待时间达到后,会被delayed线程放入到responsequeue中。最终在responsequeue中的请求被返回给consumer。2。Kafka监控白盒监控:服务或系统自身指标,如CPU负载、堆栈信息、连接数等;黑盒监控:一般是通过模拟外部用户对其可见的系统功能进行监控的一种监控方式,相关指标如消息的延迟、错误率和重复率等性能和可用性指标。
  2。1腾讯云CKafka告警
  针对CKafka,需要配置告警(此类告警一般为消息积压、可用性、集群机器健康性等检查)。A。指标
  如:实例健康状态、节点数量、健康节点数量、问题分区数、生产消息数、消费请求数、jvm内存利用率、平均生产响应时间、分区消费偏移量等。
  具体指标可以参考:https:cloud。tencent。comdocumentproduct59754514B。配置
  配置文档:https:cloud。tencent。comdocumentproduct59757244
  选择监控实例,配置告警内容和阈值。
  一般会对当前服务自身的kafka集群做告警配置,但是如果是依赖自身消息的下游服务出现消费问题,我们是感知不到了;而且针对消费端服务不共用同一个集群的情况,出现消息重复发送的问题,服务自身是很难发现的。C。预案
  在业务上线前,最好梳理下自身服务所涉及的topic消息(上游生产端和下游消费端),并细化告警配置,如果出现上游kafka异常或者下游kafka消息堆积可以及时感知。特别需要把可能有瞬时大量消息的场景(如批量数据导入、定时全量数据同步等)做一定的告警或者预案,避免服务不可用或者影响正常业务消息。2。2自建告警平台
  通过自建告警平台配置对服务自身的异常告警,其中包括对框架在使用kafka组件时抛出与kafka消费逻辑过程中抛出的业务异常。
  其中,可能需要异常升级的情况(由于)单独做下处理(针对springkafka):自定义kafka异常处理器:实现KafkaListenerErrorHandler接口的方法,注册自定义异常监听器,区分业务异常并抛出;消费Kafka消息时,将KafkaListener的errorHandler参数设置为定义的Kafka异常处理器;此后,指定的业务异常会被抛出,而不会被封装成Springkafka的框架异常,导致不能清晰地了解具体异常信息。2。3Kafka监控组件
  目前业界并没有公认的解决方案,各家都有各自的监控之道。KafkaManager:应该算是最有名的专属Kafka监控框架了,是独立的监控系统。KafkaMonitor:LinkedIn开源的免费框架,支持对集群进行系统测试,并实时监控测试结果。CruiseControl:也是LinkedIn公司开源的监控框架,用于实时监测资源使用率,以及提供常用运维操作等。无UI界面,只提供RESTAPI。JMX监控:由于Kafka提供的监控指标都是基于JMX的,因此,市面上任何能够集成JMX的框架都可以使用,比如Zabbix和Prometheus。已有大数据平台自己的监控体系:像Cloudera提供的CDH这类大数据平台,天然就提供Kafka监控方案。JMXTool:社区提供的命令行工具,能够实时监控JMX指标。答上这一条,属于绝对的加分项,因为知道的人很少,而且会给人一种你对Kafka工具非常熟悉的感觉。如果你暂时不了解它的用法,可以在命令行以无参数方式执行一下kafkarunclass。shkafka。tools。JmxTool,学习下它的用法。2。4KafkaMonitor
  其中,KafkaMonitor通过模拟客户端行为,生产和消费数据并采集消息的延迟、错误率和重复率等性能和可用性指标,可以很好地发现下游的消息消费情况进而可以动态地调整消息的发送。(使用过程中需注意对样本覆盖率、功能覆盖率、流量、数据隔离、时延的控制)
  KakfaMonitor优势:通过为每个Partition启动单独的生产任务,确保监控覆盖所有Partition。在生产的消息中包含了时间戳、序列号,KafkaMonitor可以依据这些数据对消息的延迟、丢失率和重复率进行统计。通过设定消息生成的频率,来达到控制流量的目的。生产的消息在序列化时指定为一个可配置的大小(验证对不同大小数据的处理能力、相同消息大小的性能比较)通过设定单独的Topic和ProducerID来操作Kafka集群,可避免污染线上数据,做到一定程度上的数据隔离。
  基于KafkaMonitor的设计思想,可以针对业务特点引入对消息的延迟、错误率和重复率等性能的监控告警指标。三、故障时解决
  防微杜渐,遇到问题故障时有完整的应急预案,以快速定位并解决问题。
  1。Kafka消息堆积紧急预案
  问题描述:消费端产生消息积压,导致依赖该消息的服务不能及时感知业务变化,导致一些业务逻辑、数据处理出现延迟,容易产生业务阻塞和数据一致性问题。
  方案:问题排查、扩容升配策略、消息Topic转换策略、可配置多线程的消费策略。1。1问题排查
  遇到消息积压时,具体可以从以下几个角度去定位问题原因:消息生产端数据量是否存在陡升的情况消息消费端消费能力是否有下降消息积压是发生在所有的partition还是所有的partition都有积压情况
  对于第1、2点导致的消息积压:为暂时性的消息积压,通过扩分区、扩容升配、多线程消费、批量消费等方式提高消费速度能在一定程度上解决这类问题。
  对于第3点导致的消息积压:可以采用消息Topic中转策略。1。2扩容升配策略检查生产端消费发送情况(主要检查是否继续有消息产生、是否存在逻辑缺陷、是否有重复消息发送)观察消费端的消费情况(预估下堆积消息的处理清理以及是否有降低趋势)若为生产端问题,则评估是否可以通过增加分区数、调整偏移量、删除topic(需要评估影响面)等解决消费端新增机器及依赖资源,提高消费能力;如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。1。3配置多线程的消费策略
  简而言之,即线程池消费动态线程池配置策略:将接收到的kafka数据进行hash取模(如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。
  设计思路:在应用启动时初始化对应业务的顺序消费线程池(demo中为订单消费线程池)订单监听类拉取消息提交任务至线程池中对应的队列线程池的线程处理绑定队列中的任务数据每个线程处理完任务后增加待提交的offsets标识数监听类中校验待提交的offsets数与拉取到的记录数是否相等,如果相等则手动提交offset(关闭kafka的自动提交,待本次拉取到的任务处理完成之后再提交位移)
  另外,可以根据业务流量调整的线程配置与pod的配置,如高峰期设置一个相对较高的并发级别数用来快速处理消息,平峰期设置一个较小的并发级别数来让出系统资源。这里,可以参考美团提供的一种配置中心修改配置动态设置线程池参数的思路,实现动态的扩容或者缩容。
  实现了动态扩容与缩容:通过配置中心刷新OrderKafkaListener监听类中的配置concurrent的值,通过set方法修改concurrent的值时,先修改stopped的值去停止当前正在执行的线程池。执行完毕后通过新的并发级别数新建一个新的线程池,实现了动态扩容与缩容。
  此外,还可以新增开关,它设置为true是可以中断启动中的线程池,故障时进行功能开关。
  注意:如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。1。4Topic中转策略
  当消息积压是发生在所有的partition还是所有的partition都有积压情况时,只能操作临时扩容,以更快的速度去消费数据了。
  设计思路:临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。
  改进:consumer程序可以写在服务里面;指定一个预案topic,在服务中预先写好对预案topic采用策略模式进行业务topic预案topic的转换;
  注意:如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。需要有个单独的topic转换服务,或修改服务代码,或在事前将多线程逻辑写好或者2。Kafka消费异常导致消费阻塞
  问题描述:某个消息消费异常或者某个操作较为耗时,导致单个pod的消费能力下降,甚至产生阻塞。
  方案:设置偏移量;开关多线程的消费策略;2。1设置偏移量调整偏移量:联系运维,将offset后移一位;消息补推:针对跳过的消息或某个时间段内的数据进行消息补推;如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。2。2开关多线程的消费策略
  参考上面的可配置多线程的消费策略,在发生阻塞时开启多线程消费开关。
  注:需要修改代码或者在事前将多线程逻辑写好3。Kafka消息丢失预案
  问题描述:服务没有按照预期消费到kafka消息,导致业务产生问题
  方案:根因分析;消息补推;3。1根因分析
  (1)生产端是否成功发送消费(源头丢失)Broker丢失消息:Kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中,异步刷盘有肯能造成源头数据丢失;Producer丢失消息:发送逻辑存在Bug,导致消息为发送成功。
  解决:需要检查生产端与集群健康性;消息补发。
  (2)是否被成功消费
  Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。
  此外,如果消费逻辑有bug,也导致消息丢失的假象。
  解决:修复问题,视情况修改消费确认机制。
  (3)是否有其他服务共用了同一个消费组
  多服务误用同一个消费组会导致消息一定比率或规律性丢失。
  例如,创建用户的kafka消息,可能价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分消息,导致一些问题出现偶现的情况。
  解决:修改配置,重启服务,各种建立的消费组;事前需要有检查是否有多个服务共用一个消费的情况(检测比对);3。2消息补推通过业务影响查询影响的数据信息;构建kafka消息,进行消息补偿;如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。
  针对每个对外发送的服务,生产端一般都需要有较为完善的消息补推接口,并且消费端也需要保障消息消费的幂等)四、其他
  1。Kafka成本控制
  机器、存储和网络1。1机器
  需要重新评估你的实例类型决策:你的集群是否饱和?在什么情况下饱和?是否存在其他实例类型,可能比你第一次创建集群时选择的类型更合适?EBS优化实例与GP23或IO2驱动器的混合是否真的比i3或i3en机器(及其带来的优势)有更好的性价比?1。2存储与网络
  压缩在Kafka中并不新鲜,大多数用户已经知道了自己可以在GZIP、Snappy和LZ4之间做出选择。但自从KIP110被合并进Kafka,并添加了用于Zstandard压缩的压缩器后,它已实现了显著的性能改进,并且是降低网络成本的完美方式。
  以生产者端略高的CPU使用率为代价,你将获得更高的压缩率并在线上挤进更多信息。
  Amplitude在他们的帖子中介绍,在切换到Zstandard后,他们的带宽使用量减少了三分之二,仅在处理管道上就可以节省每月数万美元的数据传输成本。
  1。3集群
  不平衡的集群可能会损害集群性能,导致某些borker比其他broker的负载更大,让响应延迟更高,并且在某些情况下会导致这些broker的资源饱和,从而导致不必要的扩容,进而会影响集群成本。
  此外,不平衡集群还面临一个风险:在一个broker出故障后出现更高的MTTR(例如当该broker不必要地持有更多分区时),以及更高的数据丢失风险(想象一个复制因子为2的主题,其中一个节点由于启动时要加载的segment过多,于是难以启动)。2。消息消费的幂等
  定义:
  所谓幂等性,数学概念就是:f(f(x))f(x)。f函数表示对消息的处理。通俗点来讲就是,在消费者收到重复消息进行重复处理时,也要保证最终结果的一致性。
  比如,银行转账、下单等,不管重试多少次,都要保证最终结果一定是一致的。2。1利用数据库的唯一约束
  将数据库中的多个字段联合,创建一个唯一约束,即使多次操作也能保证表里至多存在一条记录(如创建订单、创建账单、创建流水等)。
  此外,只要是支持类似INSERTIFNOTEXIST语义的存储类系统(如Redis的SETNX)都可以用于实现幂等消费。2。2设置前置条件给数据变更设置一个前置条件(版本号version、updateTime);如果满足条件就更新数据,否则拒绝更新数据;在更新数据的时候,同时变更前置条件中的数据(版本号1、更新updateTime)。2。3记录并检查操作给每条消息都记录一个全局唯一ID;消费时,先根据这个全局唯一ID检查这条消息是否有被消费过;如果没有消费过,则更新数据,并将消费状态置为已消费状态。
  其中,在检查消费状态,然后更新数据并且设置消费状态中,三个操作必须作为一组操作保证原子性。
  参考:
  〔1〕https:www。infoq。cnarticleucSru1uKkSswLXPcjQgC?sourceappshare
  〔2〕https:blog。csdn。netqq32179907articledetails122599769
  〔3〕https:blog。csdn。netqq32179907articledetails122599769
  〔4〕https:zhuanlan。zhihu。comp513559802?utmsourcewechatsessionutmmediumsocialutmoi689250073002930176utmcampaignshareopn
  〔5〕https:blog。csdn。netphilip502articledetails118997899?utmmediumdistribute。waprelevant。nonetaskblog2defaultbaidujsbaidulandingworddefault0118997899blog125192952。waprelevantmultiplatformwhitelistv1spm1001。2101。3001。4242。1utmrelevantindex1
  〔6〕https:www。zhihu。comquestion483747691answer2392949203?utmsourcewechatsessionutmmediumsocialutmoi689250073002930176utmcontentgroup3Answerutmcampaignshareopn
  〔7〕https:zhuanlan。zhihu。comp354772550?utmsourcewechatsessionutmmediumsocialutmoi689250073002930176utmcampaignshareopn
  〔8〕https:www。infoq。cnarticlecontrastwithkafkaandjingdongjmq?sourceappshare
  〔9〕https:www。infoq。cnarticleBF3mm9haDscdHCXOLlf?sourceappshare
  〔10〕https:www。infoq。cnarticlewmM8WXzLEgfGMKYpbF0N?sourceappshare
  〔11〕https:www。infoq。cnarticleQ0oQzLQiay31MWiOBJH?sourceappshare

足三里健脾和胃消食补气足三里,足阳明胃经的下合穴,取之以健脾化痰,止咳平喘,胃经通络,宁心安神,调理气血,增强人体的免疫力。定位:在小腿外侧,犊鼻下3寸,胫骨前嵴外1横指处,犊鼻与解溪连线上。……都要坚持今天包子的几件小事:1。早上坐电梯,停在6楼,一家三口打算进来,孩子觉得太挤不肯进。电梯里的外卖小哥退了出去,想让多一点的空间给他们。突然心里百味杂陈,有点感动,有点心酸……梅罗反差!C罗再次错失良机,梅西替补3分钟2球阿根廷30牙买梅罗是永恒的话题,本赛季C罗由于个人原因未能与曼联参与季前赛,这直接导致37岁的C罗在身体以及状态均未达预期,本赛季俱乐部国家队0运动战进球;反观梅西本赛季渐入佳境进球如麻!……体育总局国家队严禁所有球员新纹身,已有需自行清除喜欢本频道的别忘了回复三连哦,你的支持是我努力的希望众所周知,现在的国字号球员不仅身体素质和球技与参加过02年韩日世界杯那一代国足球员相比差得太多,拿着高额的年薪不说,他……京东副总裁蔡磊43岁身患渐冻症,投千万寻药,羡慕路边拾荒老人蔡磊对于已经四十三岁的京东副总裁蔡磊来说,2019年是他人生中最糟糕的一年。因为他得知自己患上了罕见的渐冻症(ALS),受渐冻症折磨直至死去的病人不计其数。在……沉淀自己,是一个人了不起的能力喜欢美食的朋友都知道,成就一盘美味的是一份反复烹煮的卤水。不断地沉淀,就是为了那一刻的香飘四溢。人生也需要沉淀,生命中的很多事情,有时需要慢下来,用时间去沉淀美好。沉淀不是消沉……豪威发布超小型2亿像素传感器据悉,OVB0A采用了豪威自家的PureCelPlusS晶片堆叠技术,在11。4英寸的光学结构中,完成了2亿像素的封装,单个像素仅有0。56微米。在设计上,这颗传感器专为……季节限定的落日咖啡馆,也是天台露营电影院食无定味负不起任何人的责只是某个点刚好打动我或许你也是小翠的第978篇推荐请先欣赏在这里喝咖啡、喝酒时会看到的周围天台与繁华的小……冰箱怎么选才合适?建议大家记住这5点参数,真的很实用冰箱是我们家家户户都必备的电器,一般买一台要用好久好久,那么,到底该怎么选,才能选到一台性价比高又实用的冰箱呢?其实,冰箱的选购方法并不困难,只需要大家根据这5点参数来就……今年流行这4款九分裤,每一条都显瘦显高,够你搭配这个夏天时尚圈的变化实在是太快了,每天都有不同的流行单品出现,而以前流行的爆款单品可能在今年早就过时了,不过有一件单品每年夏天都会流行,那就是九分裤,相比短裤更遮肉,对微胖身材很友好,……曝湖凯正在商讨4换2交易方案!浓眉辅助塔图姆,布朗联手詹皇冲对于湖人当下的阵容来说,必须要做出变革,才有机会在新赛季打出更好的战绩,否则对于詹姆斯来说终究还是巧妇难为无米之炊。他的身边需要克莱这样攻防兼备的3D,而不是一群能得分却不能防……医生提醒心脏病患者要注意这5个饮食细节,别不当回事心脏相当于人体的中枢工作室,只有心脏维持正常的运作,全身的器官组织才能得到充足的氧气和血液供应。一旦心脏出现问题,所有器官都会因为缺血缺氧而出现问题。正是由于心脏的责任重大,心……
新春走基层丨世界冠军登榜激励队伍再出发升国旗、奏国歌,从小队员手中接过鲜花,将自己的照片挂到世界冠军墙上,这是每一名进入国家体操队的运动员所追求的目标。以登榜仪式开启冬训,以榜样激励年轻队员,是体操队坚持的传统,也……一加11和一加AcePro价格差距不大,买哪款更合适?这两款价格差距在500元,升级的配置还算比较多的,个人还是更建议配置更好的一加11:1、性能配置大升级。芯片以及运存都升级,骁龙8Gen2相比骁龙8性能提升也比较大,加上LPD……新赵灵儿路透来了,曾因扫黑风暴出圈,网友蛮有灵气最近,《新仙剑奇侠传》赵灵儿路透曝光,据悉,这个版本的赵灵儿是由22岁的新人杨雨潼扮演,通过路透图来看,姑娘扮相蛮美的,虽然图片不是高清,但还是感觉到这个角色的清纯气质。……为霞尚满天记老骥水庆霞17岁高龄接触足球,35岁高龄退役,55岁接起中国女足教鞭,56岁率领中国女足时隔16年重夺亚洲杯。球员时代5次参加亚洲杯,5次夺冠,主教练时代延续亚洲杯冠军之路,一直保……女孩12岁身高154,方案一能长到168,方案二能长到165这个身高要求,是孩子将来要从事舞蹈,模特等职业吗?也没有,就是我的身高还可以嘛,另外她们这个年龄的孩子普遍都比较高,这个孩子要是长得偏矮的话,我觉得有的时候跟朋友在一起呀……2023中国经济开局十问!国家统计局相关部门负责人问答录国家统计局18日公布今年一季度国民经济运行情况。一季度,随着疫情防控较快平稳转段,各项稳增长稳就业稳物价政策举措靠前发力,积极因素累积增多,国民经济企稳回升,开局良好。经……我国建成国家氢能动力质量监督检验中心来源:央视新闻客户端昨天(9日),我国首个国家氢能动力质量监督检验中心在重庆投入使用,这个中心的检测内容覆盖新能源汽车领域中,氢燃料电池汽车的全产业链。新建成的检验……开盘A股三大指数高开沪指涨0。06,全聚德复牌跌停金融界1月10日消息今日A股三大指数集体小幅高开,沪指涨0。06,报3178。02点,深成指涨0。05,报11455。6点,创业板指涨0。05,报2441。5点;科创50指数涨……部分苹果iPhone14手机将采用iPhone13同款A15IT之家12月21日消息,近期苹果iPhone14系列的爆料开始增多,而对于每年正常数字迭代的A系列芯片在明年可能出现变化。据微博博主手机晶片达人透露,不是所有2022年的iP……盘点娱乐圈中71对长得像的明星,大家看看,够全了吧这两位是娱乐圈中公认的长得像的,不过感觉章子怡的气质更好,气场更大蒋勤勤跟孙俪,自然不用说,她俩也是超像王珞丹,白百何,这对姐妹花也是很像啊董璇,佟丽娅,他俩……一贯的极简主义,HelmutLang2022早秋系列HelmutLangPreFall2022美国高端服饰品牌HELMUTLANG发布了其2022秋季系列,带回了品牌的经典服饰,采用上衣与下装融洽的搭配,包括从休闲以及时尚……CBA处罚杨鸣,众说纷纭中篮联公布了一张罚单,辽宁男篮主教练杨鸣赛后缺席赛后新闻发布会,给予批评警告,并且罚款一万元。看来辽宁男篮没有和CBA公司搞好关系啊。联赛处罚杨鸣,意思是为严肃联赛纪律,……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网