本篇我们继续聊聊kombu这个python实现的消息库中的一些常用算法实现,和各种排序算法不一样,都是解决一些具体的业务问题,非常有用。本文包括下面几个部分:LRU缓存淘汰算法令牌桶限流算法RoundRobin调度算法LamportClock时间戳算法LaxBoundedSemaphore有限信号量算法LRU缓存淘汰算法 缓存,顾名思义,就是将计算结果暂时存起来,以供后期使用,这样可以省去重复计算的工作。比如我们计算斐波那契数列的递归算法:根据定义递归求解deffib(n):ifn1:returnnreturnfib(n1)fib(n2) 我们求n为5的数,展开数学公式大概如下(这里简化python函数fib名称为数学函数f):f(5)f(4)f(3)f(3)f(2)f(2)f(1)f(2)f(1)f(1)f(0)f(1)f(0)f(1)f(1)f(0)f(1)f(1)f(0)f(1)f(0)f(1)5 根据数学公式,我们可以知道,在执行f(5)过程中,重复执行了5次f(1),3次f(0)。要提高执行效率,就可以用到缓存。最简单的实现版本:根据定义递归求解cache{}deffibv1(n):ifnincache:returncache〔n〕ifn1:resultnelse:resultfib(n1)fib(n2)cache〔n〕resultreturnresult 这种实现方式有2个弊端,一个是依赖一个外部的cache变量,另一个是cache功能和fib函数绑定,还需要修改fib函数。我们可以通过一个装饰器实现这个cache,而不用改动fib函数:defcachedecorator(fun):cache{}defwrapper(args,kwargs):ifargsincache:returncache〔args〕else:retfun(args,kwargs)cache〔args〕retreturnretreturnwrapper 使用的时候可以直接给fib函数添加上装饰器:cachedecoratordeffib(n):。。。 这种缓存实现实现方式,还是会有问题:无法进行清理,内存会持续增长。编程中有一句话是:命名和缓存失效是计算机科学里面最难应对的两件事。关于缓存淘汰有各种算法,请见参考链接,我这里重点介绍一下LRU和LFU。LRU(Leastrecentlyused)最早使用淘汰算法,核心特点是:最早的数先淘汰LFU(Leastfrequentlyused)最少使用淘汰算法,核心特点是:最少的数先淘汰 关于LRU,在我之前介绍tinydb时候有过介绍。其中的实现如下:classLRUCache(abc。MutableMapping,Generic〔K,V〕):definit(self,capacityNone):self。capacitycapacity缓存容量self。cacheOrderedDict()有序字典defget(self,key:K,default:DNone)Optional〔Union〔V,D〕〕:valueself。cache。get(key)从换成获取ifvalueisnotNone:delself。cache〔key〕self。cache〔key〕value更新缓存顺序returnvaluereturndefaultdefset(self,key:K,value:V):ifself。cache。get(key):delself。cache〔key〕self。cache〔key〕value更新缓存顺序及值else:self。cache〔key〕valueifself。capacityisnotNoneandself。lengthself。capacity:self。cache。popitem(lastFalse)淘汰最古老的数据 LRU的特点只要保持缓存数据是有序的,我们甚至不需要自己实现,使用系统functools中的实现:fromfunctoolsimportlrucachelrucache()deffib(n):。。。 kombu中给我们提供了一个线程安全的版本,主要实现:kombu5。0。0kombuutilsfunctional。pyclassLRUCache(UserDict):LRUCacheimplementationusingadoublylinkedlisttotrackaccess。definit(self,limitNone):self。limitlimitself。mutexthreading。RLock()self。dataOrderedDict()defgetitem(self,key):withself。mutex:valueself〔key〕self。data。pop(key)returnvaluedefsetitem(self,key,value):removeleastrecentlyusedkey。withself。mutex:ifself。limitandlen(self。data)self。limit:self。data。pop(next(iter(self。data)))self。data〔key〕value。。。 上面代码在设置和获取数据时候都先获取锁,然后再进行数据操作。 关于缓存使用,除了通过业务场景判断适用那种淘汰算法外,还可以使用具体的缓存命中率指标进行分析:defmemoize(maxsizeNone,keyfunNone,CacheLRUCache):Decoratortocachefunctionreturnvalue。defmemoize(fun):mutexthreading。Lock()cacheCache(limitmaxsize)wraps(fun)defM(args,kwargs):ifkeyfun:keykeyfun(args,kwargs)else:keyargs(KEYWORDMARK,)tuple(sorted(kwargs。items()))try:withmutex:valuecache〔key〕exceptKeyError:valuefun(args,kwargs)未命中需要执行函数M。misses1withmutex:cache〔key〕valueelse:命中率增加M。hits1returnvaluedefclear():Clearthecacheandresetcachestatistics。清理缓存及统计cache。clear()M。hitsM。misses0统计信息M。hitsM。misses0M。clearclearM。originalfuncfunreturnMreturnmemoize memoize的实现并不复杂,增加了hitsmisses数据,可以统计分析缓存的命中率,帮助正确使用LRU缓存。还添加了clear接口,可以在需要的时候对缓存直接进行清理。 注意memoize使用了一个锁,在LRUCache还是使用了一个锁,这个锁的使用,我们以后再讲。令牌桶限流算法 限流是指在系统面临高并发、大流量请求的情况下,限制新的流量对系统的访问,从而保证系统服务的安全性。常用的限流算法有计数器、漏斗算法和令牌桶算法。其中计数器算法又分固定窗口算法、滑动窗口算法,后者我们在TCP协议中经常会碰到。 算法中存在一个令牌桶,以恒定的速率向令牌桶中放入令牌。当请求来时,会首先到令牌桶中去拿令牌,如果拿到了令牌,则该请求会被处理,并消耗掉令牌;如果拿不到令牌,则该请求会被丢弃。当然令牌桶也有一定的容量,如果满了令牌就无法放进去了,这样算法就有限流作用。 又因为令牌产生的速率是很定的,如果消费速率较低,桶里会额外缓存一部分令牌,用于应对流量突发时候的消耗。下面是算法的示意图: TokenbucketDiagram 我们具体看看kombu中提供的实现。TokenBucket类:classTokenBucket::Therateintokenssecondthatthebucketwillberefilled。fillrateNone:Maximumnumberoftokensinthebucket。capacity1:Timestampofthelasttimeatokenwastakenoutofthebucket。timestampNonedefinit(self,fillrate,capacity1):容量上限self。capacityfloat(capacity)剩余令牌数,初始等于容量上限self。tokenscapacity填充率self。fillratefloat(fillrate)self。timestampmonotonic()数据容器self。contentsdeque()defadd(self,item):self。contents。append(item)defpop(self):先进先出returnself。contents。popleft() 代码包括:令牌速率fillrate桶的容量上限一个时间戳剩余令牌数算法提供了一个基于双端队列的数据容器,可以对容器进行先进先出操作 令牌桶是否可用的判断:defcanconsume(self,tokens1):Checkifoneormoretokenscanbeconsumed。Returns:bool:trueifthenumberoftokenscanbeconsumedfromthebucket。Iftheycanbeconsumed,acallwillalsoconsumetherequestednumberoftokensfromthebucket。Callswillonlyconsumetokens(thenumberrequested)orzerotokensitwillneverconsumeapartialnumberoftokens。iftokensself。gettokens():消费n个令牌self。tokenstokensreturnTruereturnFalsedefgettokens(self):ifself。tokensself。capacity:记录当前时间nowmonotonic()计算已经流失的令牌数量deltaself。fillrate(nowself。timestamp)更新容量上限或者剩余令牌和流失数量之和self。tokensmin(self。capacity,self。tokensdelta)self。timestampnowreturnself。tokens 我们可以看到,算法在进行令牌消费判断的同时,还会对桶的剩余流量进行自校正,很巧妙。 TokenBucket的使用在ConsumerMixin的run方法中。创建了一个速率为1的令牌桶,然后持续的进行消费。如果有令牌则消费消费者上的消息;如果没有令牌则进行休眠ch23celerykombu5。0。0kombumixins。py:240classConsumerMixin:defrun(self,tokens1,kwargs):restartlimitTokenBucket(1)。。。无限循环whilenotself。shouldstop:try:有令牌消费ifrestartlimit。canconsume(tokens):pragma:nocoverforinself。consume(limitNone,kwargs):passelse:没浪费休眠sleep(restartlimit。expectedtime(tokens))excepterrors:。。。 其中的休眠时间,是由令牌桶根据期望值计算得来:defexpectedtime(self,tokens1):Returnestimatedtimeoftokenavailability。Returns:float:thetimeinseconds。tokensself。gettokens()tokensmax(tokens,tokens)return(tokenstokens)self。fillrateRoundRobin调度算法 RoundRobin调度算法,最常见的大概是在nginx。RoundRobin方式可让nginx将请求按顺序轮流地分配到后端服务器上,它均衡地对待后端的每一台服务器,而不关心服务器实际的连接数和当前的系统负载,循环往复。在kombu中也提供了几种类似的调度算法:轮询调度公平调度 我们先看RoundRobin方式:classroundrobincycle:Iteratorthatcyclesbetweenitemsinroundrobin。轮询调度算法definit(self,itNone):self。itemsitifitisnotNoneelse〔〕defupdate(self,it):Updateitemsfromiterable。更新列表self。items〔:〕itdefconsume(self,n):Consumenitems。消费n个元素returnself。items〔:n〕defrotate(self,lastused):Movemostrecentlyuseditemtoendoflist。旋转:把最后一个元素放到列表某尾itemsself。itemstry:items。append(items。pop(items。index(lastused)))exceptValueError:passreturnlastused 算法实现很简单,就是一个有序队列,可以每次消费前n个有序元素,并且可以将最近使用的元素旋转到队尾。下面是旋转的单元测试:deftestroundrobincycle():itcyclebyname(roundrobin)(〔A,B,C〕)assertit。consume(3)〔A,B,C〕it。rotate(B)assertit。consume(3)〔A,C,B〕it。rotate(A)assertit。consume(3)〔C,B,A〕it。rotate(A)assertit。consume(3)〔C,B,A〕it。rotate(C)assertit。consume(3)〔B,A,C〕 还有一种公平循环的调度算法:classFairCycle:Cyclebetweenresources。Consumefromasetofresources,whereeachresourcegetsanequalchancetobeconsumedfrom。Arguments:fun(Callable):Callbacktocall。resources(Sequence〔Any〕):Listofresources。predicate(type):Exceptionpredicate。definit(self,fun,resources,predicateException):self。funfunself。resourcesresourcesself。predicatepredicate初始位置self。pos0 FairCycle是一种资源之间公平循环的调度算法,构造函数中:利用资源的函数多个资源的集合 使用的方式是使用get方法传入回调:defnext(self):while1:try:resourceself。resources〔self。pos〕位置加1self。pos1returnresourceexceptIndexError:到尾部后,重置位置self。pos0ifnotself。resources:raiseself。predicate()defget(self,callback,kwargs):Getfromnextresource。无限重试fortriedincount(0):forinfinity获取资源resourceself。next()try:利用资源returnself。fun(resource,callback,kwargs)exceptself。predicate:reraisewhenretriesexchausted。容错上限iftriedlen(self。resources)1:raise 调度主要体现再获取资源的next函数上,没次获取资源后位置标志进行后移,到尾部后在重置到0,继续下一轮循环。算法还可以对资源进行容错,也就是如果获取到的资源无法正常使用,还可以尝试使用下一个资源进行重试。LamportClock算法 兰波特时间戳算法(LamportClock),使用逻辑时间戳作为值的版本以允许跨服务器对值进行排序,是解决分布式系统时间一致的重要算法。 服务器上的系统时间,使用物理的晶体振荡测量,会有不准的情况。我们会经常遇到服务器或者快或者慢的情况,一般使用NTP服务,来和互联网上的某个时间源进行同步。如果本地时间提前了,进行联网校时后,会出现本地时间倒退的问题。而对于两台不同的服务器上,要进行时间统一,就更不能使用系统时间。 兰波特时间戳算法,原理如下:维护一个数字来表示时间戳,并且在每个集群节点都维护一个Lamport时钟的实例。如果事件在节点内发生,时间戳加1事件要发送到远端,则在消息总带上时间戳接收到远端的消息,时间戳Max(本地时间戳,消息中的时间戳)1(进行校正跳跃) 这个过程,可以看下面的图示: 从图中可以看到下面两点:对于每个节点的事件时间,都是递增有序的,比如A是〔4,5,7,10〕,B节点是〔2,3,4,6,7〕,C节点是〔1,5,6,8,9〕时间戳不是全局唯一,不同节点之间会存在序号重复,比如4号消息在A和B节点都存在,5号消息在A和C节点存在 了解算法的场景和原理后,我们再来看算法的实现。classLamportClock::Theclockscurrentvalue。value0definit(self,initialvalue0,LockLock):self。valueinitialvalueself。mutexLock()defadjust(self,other):withself。mutex:valueself。valuemax(self。value,other)1returnvaluedefforward(self):withself。mutex:self。value1returnself。value 算法的实现其实非常简单,就是转发的时候时间戳1;收到消息后进行校正,这个过程中使用线程锁,保证本地的有序。LaxBoundedSemaphore有限信号量算法 前面讲的几种算法,都是基于线程锁实现。使用锁会降低效率,如果在协程中,可以使用无锁的方案,会更高效。kombu的LaxBoundedSemaphore实现,可以作为一种参考。 我们先看使用示例:fromfutureimportprintstatementasprintfignore:justfoolingstupidpyflakesxLaxBoundedSemaphore(2)x。acquire(printf,HELLO1)HELLO1x。acquire(printf,HELLO2)HELLO2x。acquire(printf,HELLO3)x。waitersprivate,donotaccessdirectly〔print,(HELLO3,)〕x。release()HELLO3 示例展示了几步:创建一个大小为2的LaxBoundedSemaphore信号量申请信号,并且执行print函数,可以立即执行继续申请信号执行print函数,也可以立即执行再申请信号执行print函数,这时候由于信号超标,函数不会立即执行手工释放信号量,最后一次申请的print函数自动执行 下面是具体的实现,LaxBoundedSemaphore的构造函数:classLaxBoundedSemaphore:definit(self,value):信号容量self。initialvalueself。valuevalue使用双端队列,FIFOself。waitingdeque()self。addwaiterself。waiting。appendself。popwaiterself。waiting。popleft 申请执行回调函数,会进行信号判断,信号充足会执行行回调并消减一次信号量;信号量不足则将函数及参数放入代办的队列:defacquire(self,callback,partialargs,partialkwargs):Acquiresemaphore。Thiswillimmediatelyapplycallbackiftheresourceisavailable,otherwisethecallbackissuspendeduntilthesemaphoreisreleased。Arguments:callback(Callable):Thecallbacktoapply。partialargs(Any):partialargumentstocallback。valueself。valueifvalue0:容量不够的时候先暂存执行函数,并不更改可用数量self。addwaiter((callback,partialargs,partialkwargs))returnFalseelse:可用数量1self。valuemax(value1,0)直接执行函数callback(partialargs,partialkwargs)returnTrue 使用release时候会取出头部的代办函数,并进行执行,此时信号量不增不减。如果代办全部执行完成后,则逐步恢复信号量到默认值:defrelease(self):Releasesemaphore。Note:Ifthereareanywaitersthiswillapplythefirstwaiterthatiswaitingfortheresource(FIFOorder)。try:waiter,args,kwargsself。popwaiter()exceptIndexError:无缓存则只增加可用数量self。valuemin(self。value1,self。initialvalue)else:有缓存则执行第一个缓存,可用数量不变还是小于0waiter(args,kwargs)小结 本篇文章,我们学习了5种实用的业务算法。LRU缓存淘汰算法,可以对缓存中最早的数据进行淘汰。令牌桶限流算法,可以协助进行服务流量限流,较好的保护后端服务,避免突发流量的到时的崩溃。RoundRobin调度算法,可以进行负载的均衡,保障资源的平衡使用。LamportClock时间戳算法,可以在分布式系统中,进行不同服务之间的有序时间戳同步。LaxBoundedSemaphore有限信号量算法,是一种无锁算法,可高效的提供资源使用控制。小技巧 kombu中提供了一个自动重试算法,可以作为重试算法的模版:kombu5。0。0kombuutilsfunctional。pydefretryovertime(fun,catch,argsNone,kwargsNone,errbackNone,maxretriesNone,intervalstart2,intervalstep2,intervalmax30,callbackNone,timeoutNone):kwargs{}ifnotkwargselsekwargsargs〔〕ifnotargselseargsintervalrangefxrange(intervalstart,intervalmaxintervalstart,intervalstep,repeatlastTrue)超时时间endtime()timeoutiftimeoutelseNoneforretriesincount():try:returnfun(args,kwargs)exceptcatchasexc:超过次数ifmaxretriesisnotNoneandretriesmaxretries:raise超过时间ifendandtime()end:raise。。。休眠sleep(1。0) 从模版可以看到重试时候使用次数和超时时间两个维度进行跳出(不可能无限重试):使用count()进行无限循环使用time()进行超时限定使用maxretries容错上限次数限定每次错误后,都休眠一段时间,给被调用方机会,提高下一次成功的概率。