应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

Python神器Celery源码阅读(4)

  本篇我们继续聊聊kombu这个python实现的消息库中的一些常用算法实现,和各种排序算法不一样,都是解决一些具体的业务问题,非常有用。本文包括下面几个部分:LRU缓存淘汰算法令牌桶限流算法RoundRobin调度算法LamportClock时间戳算法LaxBoundedSemaphore有限信号量算法LRU缓存淘汰算法
  缓存,顾名思义,就是将计算结果暂时存起来,以供后期使用,这样可以省去重复计算的工作。比如我们计算斐波那契数列的递归算法:根据定义递归求解deffib(n):ifn1:returnnreturnfib(n1)fib(n2)
  我们求n为5的数,展开数学公式大概如下(这里简化python函数fib名称为数学函数f):f(5)f(4)f(3)f(3)f(2)f(2)f(1)f(2)f(1)f(1)f(0)f(1)f(0)f(1)f(1)f(0)f(1)f(1)f(0)f(1)f(0)f(1)5
  根据数学公式,我们可以知道,在执行f(5)过程中,重复执行了5次f(1),3次f(0)。要提高执行效率,就可以用到缓存。最简单的实现版本:根据定义递归求解cache{}deffibv1(n):ifnincache:returncache〔n〕ifn1:resultnelse:resultfib(n1)fib(n2)cache〔n〕resultreturnresult
  这种实现方式有2个弊端,一个是依赖一个外部的cache变量,另一个是cache功能和fib函数绑定,还需要修改fib函数。我们可以通过一个装饰器实现这个cache,而不用改动fib函数:defcachedecorator(fun):cache{}defwrapper(args,kwargs):ifargsincache:returncache〔args〕else:retfun(args,kwargs)cache〔args〕retreturnretreturnwrapper
  使用的时候可以直接给fib函数添加上装饰器:cachedecoratordeffib(n):。。。
  这种缓存实现实现方式,还是会有问题:无法进行清理,内存会持续增长。编程中有一句话是:命名和缓存失效是计算机科学里面最难应对的两件事。关于缓存淘汰有各种算法,请见参考链接,我这里重点介绍一下LRU和LFU。LRU(Leastrecentlyused)最早使用淘汰算法,核心特点是:最早的数先淘汰LFU(Leastfrequentlyused)最少使用淘汰算法,核心特点是:最少的数先淘汰
  关于LRU,在我之前介绍tinydb时候有过介绍。其中的实现如下:classLRUCache(abc。MutableMapping,Generic〔K,V〕):definit(self,capacityNone):self。capacitycapacity缓存容量self。cacheOrderedDict()有序字典defget(self,key:K,default:DNone)Optional〔Union〔V,D〕〕:valueself。cache。get(key)从换成获取ifvalueisnotNone:delself。cache〔key〕self。cache〔key〕value更新缓存顺序returnvaluereturndefaultdefset(self,key:K,value:V):ifself。cache。get(key):delself。cache〔key〕self。cache〔key〕value更新缓存顺序及值else:self。cache〔key〕valueifself。capacityisnotNoneandself。lengthself。capacity:self。cache。popitem(lastFalse)淘汰最古老的数据
  LRU的特点只要保持缓存数据是有序的,我们甚至不需要自己实现,使用系统functools中的实现:fromfunctoolsimportlrucachelrucache()deffib(n):。。。
  kombu中给我们提供了一个线程安全的版本,主要实现:kombu5。0。0kombuutilsfunctional。pyclassLRUCache(UserDict):LRUCacheimplementationusingadoublylinkedlisttotrackaccess。definit(self,limitNone):self。limitlimitself。mutexthreading。RLock()self。dataOrderedDict()defgetitem(self,key):withself。mutex:valueself〔key〕self。data。pop(key)returnvaluedefsetitem(self,key,value):removeleastrecentlyusedkey。withself。mutex:ifself。limitandlen(self。data)self。limit:self。data。pop(next(iter(self。data)))self。data〔key〕value。。。
  上面代码在设置和获取数据时候都先获取锁,然后再进行数据操作。
  关于缓存使用,除了通过业务场景判断适用那种淘汰算法外,还可以使用具体的缓存命中率指标进行分析:defmemoize(maxsizeNone,keyfunNone,CacheLRUCache):Decoratortocachefunctionreturnvalue。defmemoize(fun):mutexthreading。Lock()cacheCache(limitmaxsize)wraps(fun)defM(args,kwargs):ifkeyfun:keykeyfun(args,kwargs)else:keyargs(KEYWORDMARK,)tuple(sorted(kwargs。items()))try:withmutex:valuecache〔key〕exceptKeyError:valuefun(args,kwargs)未命中需要执行函数M。misses1withmutex:cache〔key〕valueelse:命中率增加M。hits1returnvaluedefclear():Clearthecacheandresetcachestatistics。清理缓存及统计cache。clear()M。hitsM。misses0统计信息M。hitsM。misses0M。clearclearM。originalfuncfunreturnMreturnmemoize
  memoize的实现并不复杂,增加了hitsmisses数据,可以统计分析缓存的命中率,帮助正确使用LRU缓存。还添加了clear接口,可以在需要的时候对缓存直接进行清理。
  注意memoize使用了一个锁,在LRUCache还是使用了一个锁,这个锁的使用,我们以后再讲。令牌桶限流算法
  限流是指在系统面临高并发、大流量请求的情况下,限制新的流量对系统的访问,从而保证系统服务的安全性。常用的限流算法有计数器、漏斗算法和令牌桶算法。其中计数器算法又分固定窗口算法、滑动窗口算法,后者我们在TCP协议中经常会碰到。
  算法中存在一个令牌桶,以恒定的速率向令牌桶中放入令牌。当请求来时,会首先到令牌桶中去拿令牌,如果拿到了令牌,则该请求会被处理,并消耗掉令牌;如果拿不到令牌,则该请求会被丢弃。当然令牌桶也有一定的容量,如果满了令牌就无法放进去了,这样算法就有限流作用。
  又因为令牌产生的速率是很定的,如果消费速率较低,桶里会额外缓存一部分令牌,用于应对流量突发时候的消耗。下面是算法的示意图:
  TokenbucketDiagram
  我们具体看看kombu中提供的实现。TokenBucket类:classTokenBucket::Therateintokenssecondthatthebucketwillberefilled。fillrateNone:Maximumnumberoftokensinthebucket。capacity1:Timestampofthelasttimeatokenwastakenoutofthebucket。timestampNonedefinit(self,fillrate,capacity1):容量上限self。capacityfloat(capacity)剩余令牌数,初始等于容量上限self。tokenscapacity填充率self。fillratefloat(fillrate)self。timestampmonotonic()数据容器self。contentsdeque()defadd(self,item):self。contents。append(item)defpop(self):先进先出returnself。contents。popleft()
  代码包括:令牌速率fillrate桶的容量上限一个时间戳剩余令牌数算法提供了一个基于双端队列的数据容器,可以对容器进行先进先出操作
  令牌桶是否可用的判断:defcanconsume(self,tokens1):Checkifoneormoretokenscanbeconsumed。Returns:bool:trueifthenumberoftokenscanbeconsumedfromthebucket。Iftheycanbeconsumed,acallwillalsoconsumetherequestednumberoftokensfromthebucket。Callswillonlyconsumetokens(thenumberrequested)orzerotokensitwillneverconsumeapartialnumberoftokens。iftokensself。gettokens():消费n个令牌self。tokenstokensreturnTruereturnFalsedefgettokens(self):ifself。tokensself。capacity:记录当前时间nowmonotonic()计算已经流失的令牌数量deltaself。fillrate(nowself。timestamp)更新容量上限或者剩余令牌和流失数量之和self。tokensmin(self。capacity,self。tokensdelta)self。timestampnowreturnself。tokens
  我们可以看到,算法在进行令牌消费判断的同时,还会对桶的剩余流量进行自校正,很巧妙。
  TokenBucket的使用在ConsumerMixin的run方法中。创建了一个速率为1的令牌桶,然后持续的进行消费。如果有令牌则消费消费者上的消息;如果没有令牌则进行休眠ch23celerykombu5。0。0kombumixins。py:240classConsumerMixin:defrun(self,tokens1,kwargs):restartlimitTokenBucket(1)。。。无限循环whilenotself。shouldstop:try:有令牌消费ifrestartlimit。canconsume(tokens):pragma:nocoverforinself。consume(limitNone,kwargs):passelse:没浪费休眠sleep(restartlimit。expectedtime(tokens))excepterrors:。。。
  其中的休眠时间,是由令牌桶根据期望值计算得来:defexpectedtime(self,tokens1):Returnestimatedtimeoftokenavailability。Returns:float:thetimeinseconds。tokensself。gettokens()tokensmax(tokens,tokens)return(tokenstokens)self。fillrateRoundRobin调度算法
  RoundRobin调度算法,最常见的大概是在nginx。RoundRobin方式可让nginx将请求按顺序轮流地分配到后端服务器上,它均衡地对待后端的每一台服务器,而不关心服务器实际的连接数和当前的系统负载,循环往复。在kombu中也提供了几种类似的调度算法:轮询调度公平调度
  我们先看RoundRobin方式:classroundrobincycle:Iteratorthatcyclesbetweenitemsinroundrobin。轮询调度算法definit(self,itNone):self。itemsitifitisnotNoneelse〔〕defupdate(self,it):Updateitemsfromiterable。更新列表self。items〔:〕itdefconsume(self,n):Consumenitems。消费n个元素returnself。items〔:n〕defrotate(self,lastused):Movemostrecentlyuseditemtoendoflist。旋转:把最后一个元素放到列表某尾itemsself。itemstry:items。append(items。pop(items。index(lastused)))exceptValueError:passreturnlastused
  算法实现很简单,就是一个有序队列,可以每次消费前n个有序元素,并且可以将最近使用的元素旋转到队尾。下面是旋转的单元测试:deftestroundrobincycle():itcyclebyname(roundrobin)(〔A,B,C〕)assertit。consume(3)〔A,B,C〕it。rotate(B)assertit。consume(3)〔A,C,B〕it。rotate(A)assertit。consume(3)〔C,B,A〕it。rotate(A)assertit。consume(3)〔C,B,A〕it。rotate(C)assertit。consume(3)〔B,A,C〕
  还有一种公平循环的调度算法:classFairCycle:Cyclebetweenresources。Consumefromasetofresources,whereeachresourcegetsanequalchancetobeconsumedfrom。Arguments:fun(Callable):Callbacktocall。resources(Sequence〔Any〕):Listofresources。predicate(type):Exceptionpredicate。definit(self,fun,resources,predicateException):self。funfunself。resourcesresourcesself。predicatepredicate初始位置self。pos0
  FairCycle是一种资源之间公平循环的调度算法,构造函数中:利用资源的函数多个资源的集合
  使用的方式是使用get方法传入回调:defnext(self):while1:try:resourceself。resources〔self。pos〕位置加1self。pos1returnresourceexceptIndexError:到尾部后,重置位置self。pos0ifnotself。resources:raiseself。predicate()defget(self,callback,kwargs):Getfromnextresource。无限重试fortriedincount(0):forinfinity获取资源resourceself。next()try:利用资源returnself。fun(resource,callback,kwargs)exceptself。predicate:reraisewhenretriesexchausted。容错上限iftriedlen(self。resources)1:raise
  调度主要体现再获取资源的next函数上,没次获取资源后位置标志进行后移,到尾部后在重置到0,继续下一轮循环。算法还可以对资源进行容错,也就是如果获取到的资源无法正常使用,还可以尝试使用下一个资源进行重试。LamportClock算法
  兰波特时间戳算法(LamportClock),使用逻辑时间戳作为值的版本以允许跨服务器对值进行排序,是解决分布式系统时间一致的重要算法。
  服务器上的系统时间,使用物理的晶体振荡测量,会有不准的情况。我们会经常遇到服务器或者快或者慢的情况,一般使用NTP服务,来和互联网上的某个时间源进行同步。如果本地时间提前了,进行联网校时后,会出现本地时间倒退的问题。而对于两台不同的服务器上,要进行时间统一,就更不能使用系统时间。
  兰波特时间戳算法,原理如下:维护一个数字来表示时间戳,并且在每个集群节点都维护一个Lamport时钟的实例。如果事件在节点内发生,时间戳加1事件要发送到远端,则在消息总带上时间戳接收到远端的消息,时间戳Max(本地时间戳,消息中的时间戳)1(进行校正跳跃)
  这个过程,可以看下面的图示:
  从图中可以看到下面两点:对于每个节点的事件时间,都是递增有序的,比如A是〔4,5,7,10〕,B节点是〔2,3,4,6,7〕,C节点是〔1,5,6,8,9〕时间戳不是全局唯一,不同节点之间会存在序号重复,比如4号消息在A和B节点都存在,5号消息在A和C节点存在
  了解算法的场景和原理后,我们再来看算法的实现。classLamportClock::Theclockscurrentvalue。value0definit(self,initialvalue0,LockLock):self。valueinitialvalueself。mutexLock()defadjust(self,other):withself。mutex:valueself。valuemax(self。value,other)1returnvaluedefforward(self):withself。mutex:self。value1returnself。value
  算法的实现其实非常简单,就是转发的时候时间戳1;收到消息后进行校正,这个过程中使用线程锁,保证本地的有序。LaxBoundedSemaphore有限信号量算法
  前面讲的几种算法,都是基于线程锁实现。使用锁会降低效率,如果在协程中,可以使用无锁的方案,会更高效。kombu的LaxBoundedSemaphore实现,可以作为一种参考。
  我们先看使用示例:fromfutureimportprintstatementasprintfignore:justfoolingstupidpyflakesxLaxBoundedSemaphore(2)x。acquire(printf,HELLO1)HELLO1x。acquire(printf,HELLO2)HELLO2x。acquire(printf,HELLO3)x。waitersprivate,donotaccessdirectly〔print,(HELLO3,)〕x。release()HELLO3
  示例展示了几步:创建一个大小为2的LaxBoundedSemaphore信号量申请信号,并且执行print函数,可以立即执行继续申请信号执行print函数,也可以立即执行再申请信号执行print函数,这时候由于信号超标,函数不会立即执行手工释放信号量,最后一次申请的print函数自动执行
  下面是具体的实现,LaxBoundedSemaphore的构造函数:classLaxBoundedSemaphore:definit(self,value):信号容量self。initialvalueself。valuevalue使用双端队列,FIFOself。waitingdeque()self。addwaiterself。waiting。appendself。popwaiterself。waiting。popleft
  申请执行回调函数,会进行信号判断,信号充足会执行行回调并消减一次信号量;信号量不足则将函数及参数放入代办的队列:defacquire(self,callback,partialargs,partialkwargs):Acquiresemaphore。Thiswillimmediatelyapplycallbackiftheresourceisavailable,otherwisethecallbackissuspendeduntilthesemaphoreisreleased。Arguments:callback(Callable):Thecallbacktoapply。partialargs(Any):partialargumentstocallback。valueself。valueifvalue0:容量不够的时候先暂存执行函数,并不更改可用数量self。addwaiter((callback,partialargs,partialkwargs))returnFalseelse:可用数量1self。valuemax(value1,0)直接执行函数callback(partialargs,partialkwargs)returnTrue
  使用release时候会取出头部的代办函数,并进行执行,此时信号量不增不减。如果代办全部执行完成后,则逐步恢复信号量到默认值:defrelease(self):Releasesemaphore。Note:Ifthereareanywaitersthiswillapplythefirstwaiterthatiswaitingfortheresource(FIFOorder)。try:waiter,args,kwargsself。popwaiter()exceptIndexError:无缓存则只增加可用数量self。valuemin(self。value1,self。initialvalue)else:有缓存则执行第一个缓存,可用数量不变还是小于0waiter(args,kwargs)小结
  本篇文章,我们学习了5种实用的业务算法。LRU缓存淘汰算法,可以对缓存中最早的数据进行淘汰。令牌桶限流算法,可以协助进行服务流量限流,较好的保护后端服务,避免突发流量的到时的崩溃。RoundRobin调度算法,可以进行负载的均衡,保障资源的平衡使用。LamportClock时间戳算法,可以在分布式系统中,进行不同服务之间的有序时间戳同步。LaxBoundedSemaphore有限信号量算法,是一种无锁算法,可高效的提供资源使用控制。小技巧
  kombu中提供了一个自动重试算法,可以作为重试算法的模版:kombu5。0。0kombuutilsfunctional。pydefretryovertime(fun,catch,argsNone,kwargsNone,errbackNone,maxretriesNone,intervalstart2,intervalstep2,intervalmax30,callbackNone,timeoutNone):kwargs{}ifnotkwargselsekwargsargs〔〕ifnotargselseargsintervalrangefxrange(intervalstart,intervalmaxintervalstart,intervalstep,repeatlastTrue)超时时间endtime()timeoutiftimeoutelseNoneforretriesincount():try:returnfun(args,kwargs)exceptcatchasexc:超过次数ifmaxretriesisnotNoneandretriesmaxretries:raise超过时间ifendandtime()end:raise。。。休眠sleep(1。0)
  从模版可以看到重试时候使用次数和超时时间两个维度进行跳出(不可能无限重试):使用count()进行无限循环使用time()进行超时限定使用maxretries容错上限次数限定每次错误后,都休眠一段时间,给被调用方机会,提高下一次成功的概率。

可能发现生命的恒星德尔塔帕沃尼斯(DeltaPavonis)这个黄星和我们的太阳一样古老,但又和太阳非常相似。它是寻找类地球行星和生命的主要目标之一。星座:帕沃距离:19。92……写作金句摘抄1。心中燃烧真理之火让信念流彩,头脑深植信仰之根让忠诚如磐,行动汇聚奋斗之力让梦想成真。2。点燃了理想信念之灯,筑牢了学习成长之路,强化了实干担当之本,高举了创新作为之斧……7款珍藏已久的神仙级Windows软件,款款深入人心,让电脑相信每个人的电脑中都有自己喜欢的软件,这些软件往往会在关键时刻帮助到你,而作为一个软件爱好者,我也收藏了许多实用的电脑软件,今天就来给大家分享7个我私藏已久的Windows软件……S25终极边路敲定,强度超过木兰,成为当之无愧边路一霸大家好,我是超大神兽。原创不易,期待您的关注。大家都知道,在王者荣耀中,有各种各样的边路英雄。随着S26赛季的即将来临,不少玩家都会选择适合自己的位置来进行上分。而在王者……尊重每个人的选择有人说这个世界上没有完全相同的两片树叶,这话有道理,就像我们在草原上看羊群,我们看每只羊长得都一样,而在牧羊者看来都有区别。所以,千姿百态是这个世界的常态,整齐划一是人的一种妄……不仅干好本职,须眉剃须刀新玩法,居然可以当充电宝手电和风扇剃胡须是每个男人都要干的事,毕竟几天不剃胡须,胡子渣又长长了,作为须眉(SMATE)的粉丝,须眉剃须刀T6pro推出后,小编及时入手,想抢先体验一把这剃须刀的新玩法。从产……2021西藏青海自驾游(川进青出)西藏是我一直想去的地方,为此平时经常查找有关进藏的资料,可以说酝酿了好久,准备了好多年,终于在2021年7月孩子放假时成行!西藏是一种毒,不去解不了!由于是首次进藏……这个低调的墨西哥镇子,色彩缤纷文艺可爱,让人念念不忘最近想给大家讲讲给我记忆最深刻的一些环球故事,一起回顾下曾经的美丽世界。第一个故事,想给大家讲讲关于国外跨年,虽然一直在各国游历着,但鲜少有在国外跨年的经历。人生第一次也……和平精英全球总决赛槽点多,INFIN选手单局狂搜26个大包和平精英全球总决赛PMGC在北京时间2021年11月30日12月24日火热开战。四十支来自世界各地的最强和平精英职业战队齐聚一堂争夺冠军。届时整场比赛将会在虎牙实时直播,……陈欧旗下共享充电现身鱿鱼游戏,想不到这才是绝招?如果要问最近有什么热剧流行,韩剧《鱿鱼游戏》绝对位列第一。除了多次登上国内微博热搜、坐拥15亿阅读量的热度,《鱿鱼游戏》在国外也是一炮而红。YouTube、Ins、微博、知乎等……发行商成立30周年Steam消逝的光芒初代免费玩以消逝的光芒系列被玩家所熟知的Techland迎来了成立30周年,该厂商近日在Steam平台开启了30周年特卖和优惠活动,也开放了《消逝的光芒》初代免费给所有用户体验,本次免费……NH成PCL最后牌面,GEX刷新观众认知,证明自己不逊色于S如果你也喜欢看《绝地求生》的比赛,那么肯定知道,在11月19日12月19日期间,PGC全球总决赛正在火热进行中。而HY也会对这个比赛进行全程直播,给观众们带来这场不容错过的视觉……
10余年追寻记录!自驾游好地分享山水诗之茗岙梯田梯田与村舍依恋,云雾与炊烟交融;彩霞与银光相映,线条与色块相映。这是摄影家徐道浙对永嘉茗岙的赞词!春满山田追寻茗岙还要从2010年说起,当时因一篇摄影家……瓜子脸适合什么发型(女生瓜子脸适合什么发型)当红模特和女演员等海外名人的短发一次性介绍!你可能想为一个成熟的夏天打造完美的风格。对于轻薄的短发,剪掉头发的末端,并以少女风格完成。剪成一个短发,使你的健康活力魅……春日生活打卡春日生活打卡季由于疫情,很少带家人和孩子一起出去,今年疫情出去春日踏青又泡汤了。不过我觉得只要有发现美的眼睛,到处都是美景。如今小区成了我们踏青的好去处哈哈!这次疫情也让我有更……亿联银行荣获第五届数字金融创新大赛双项金奖2022年6月27日,由中国金融认证中心(CFCA)、中国电子银行联合宣传年主办的第五届(2022)数字金融创新大赛,在历时2个多月的精彩角逐后终于落下帷幕,亿联银行在214个……先考是什么意思(故显考某公讳是什么意思)我老太爷原来是一个秀才,我的父亲临终以前,已经将太爷、太奶奶、爷爷、奶奶的坟墓都一次性立了4个墓碑,在典礼仪式上,我为了投其所好,主动承诺了为父母建造合墓和立碑的事宜。父母先后……满五唯一是什么意思(房子70万满2年过户费多少钱)满五唯一是二手房市场里房子满五年,以家庭为单位的唯一住房。满五是如何界定的呢,一般是指从房产证或者契税票出证时间开始算起,时间满五年或超过五年。房屋是否满五年税局参……海狗丸是什么(老公吃了海狗丸厉害)近期娱乐圈的瓜层出不穷,平时无人问津的海狗丸一夜爆火,海狗被碰瓷了,海狗招谁惹谁了!其实海狗丸的主要成分是海马和狗鞭,和海狗没半毛钱关系,况且海狗还是国家二级保护动物。平时无人……南昌有什么好玩的地方(南昌红谷滩有什么好玩的地方推荐)经过双十一血拼后的你还在吃土么?手上没钱还按捺不住一颗出去游玩的心该怎么办?今天大城哥梳理了南昌几个风景很美还免费的好去处关键地……h7n9是什么病毒(甲型h1n1事件始末)(专家:宋秀明,医学博士,上海中医药大学附属医院呼吸科副主任医师,科普中国微平台原创首发)近日,据国家卫计委网站公布2017年1月全国法定传染病疫情情况,显示1月份全国人……胆囊炎吃什么好(胆囊炎的克星食物)先来说说胆囊是个什么东东。胆囊是储存浓缩胆汁的器官,胆汁是消化脂肪必不可少的东西。当我们吃了含有脂肪的食物,胆囊就会收缩,通过胆囊管和胆总管将胆汁挤入十二指肠。顺便说一句,胆囊……国际经济与贸易专业学什么(十大后悔的专业)高考是大众认知中最重要的考试,分数的高低会决定考生选择的范围,考得好就可以去好学校,考得不好甚至可能上不了大学。但是对于分数的过度,会让很多人忽略了志愿报考的重要性。要知……辽篮再次大胜!不过球迷建议杨鸣,这3人赶紧休息吧辽篮再次大胜!不过球迷建议杨鸣,这3人赶紧休息吧亲情提示:亲爱的读者,如何能每天能读到这样的体育资讯呢?点击右上角的关注按钮即可,您的关注将是我创作的最大动力……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网