PostgreSQL数据库进程Latchinterproce
Unix实现使用所谓的自管道(selfpipe)技巧来克服与poll()(或linux上的epollwait())和在信号处理程序中设置全局标志相关的竞争条件。当设置了闩锁并且当前进程正在等待它时,信号处理程序通过向管道写入一个字节来唤醒WaitLatch中的poll()。信号本身不会在所有平台上中断poll(),即使在它中断的平台上,在poll()调用之前到达的信号也不会阻止poll()进入睡眠。然而,管道上的传入字节可靠地中断睡眠,并导致poll()立即返回,即使信号在poll()开始之前到达。
当从拥有锁存器的同一进程调用SetLatch时,SetLatch将字节直接写入管道。如果它由另一个进程拥有,则发送SIGUSR1并且等待进程中的信号处理程序代表信号进程将字节写入管道。Windows实现使用由所有postmaster子进程继承的Windows事件。那里不需要selfpipe技巧。初始化processlocallatch设施InitializeLatchSupport
InitializeLatchSupport函数初始化processlocallatch设施。该函数必须在任何进程启动过程中InitLactch或OwnLatch函数调用之前调用。如果是在postmaster子进程中,selfpipeownerpid是用于设置该进程是否拥有selfpipe的标志(拥有selfpipe的进程pid)。如果selfpipeownerpid不为零,说明现在我们子进程继承了postmaster创建的selfpipe的连接。我们想要关闭postmaster创建的selfpipe的连接,然后才可以创建自己的selfpipes。建立运行信号处理函数唤醒WaitLatch函数中的poll、epollwait的selfpipe。
voidInitializeLatchSupport(void){
intpipefd〔2〕;
if(IsUnderPostmaster){在postmaster子进程中
Wemighthaveinheritedconnectionstoaselfpipecreatedbythe
postmaster。Itscriticalthatchildprocessescreatetheirown
selfpipes,ofcourse,andwereallywantthemtoclosethe
inheritedFDsforsafetyssake。
if(selfpipeownerpid!0){
Assertwegothroughherebutonceinachildprocess
Assert(selfpipeownerpid!MyProcPid);
ReleasepostmasterspipeFDs;ignoreanyerror释放postmaster的pipeFD
(void)close(selfpipereadfd);
(void)close(selfpipewritefd);
Cleanup,justforsafetyssake;wellsetthesebelow
selfpipereadfdselfpipewritefd1;
selfpipeownerpid0;
}
else
{
Postmasterdidntcreateaselfpipe。。。orelsewereinan
EXECBACKENDbuild,inwhichcaseitdoesntmattersincethe
postmasterspipeFDswereclosedbytheactionofFDCLOEXEC。
Assert(selfpipereadfd1);
}
}else{
Inpostmasterorstandalonebackend,assertwedothisbutonce
Assert(selfpipereadfd1);
Assert(selfpipeownerpid0);
}
Setuptheselfpipethatallowsasignalhandlertowakeupthe
poll()epollwait()inWaitLatch。Makethewriteendnonblocking,so
thatSetLatchwontblockiftheeventhasalreadybeensetmanytimes
fillingthekernelbuffer。Makethereadendnonblockingtoo,sothat
wecaneasilyclearthepipebyreadinguntilEAGAINorEWOULDBLOCK。
Also,makebothFDscloseonexec,sincewesurelydonotwantany
childprocessesmessingwiththem。
if(pipe(pipefd)0)elog(FATAL,pipe()failed:m);
if(fcntl(pipefd〔0〕,FSETFL,ONONBLOCK)1)elog(FATAL,fcntl(FSETFL)failedonreadendofselfpipe:m);
if(fcntl(pipefd〔1〕,FSETFL,ONONBLOCK)1)elog(FATAL,fcntl(FSETFL)failedonwriteendofselfpipe:m);
if(fcntl(pipefd〔0〕,FSETFD,FDCLOEXEC)1)elog(FATAL,fcntl(FSETFD)failedonreadendofselfpipe:m);
if(fcntl(pipefd〔1〕,FSETFD,FDCLOEXEC)1)elog(FATAL,fcntl(FSETFD)failedonwriteendofselfpipe:m);
selfpipereadfdpipefd〔0〕;
selfpipewritefdpipefd〔1〕;
selfpipeownerpidMyProcPid;
}初始化单个LatchInitLatch
InitLatch函数初始化进程本地的latch(自己初始化自己的latch)。Latch结构体包含isset、isstared、ownerpid三个成员。
typedefstructLatch{
sigatomictisset;
boolisshared;
intownerpid;
}Latch;
InitLatch函数初始化Latch结构体,isset设置为false,ownerpid为自己的进程pid,isshared设置为false。
voidInitLatch(Latchlatch){
latchissetfalse;
latchownerpidMyProcPid;
latchissharedfalse;
AssertInitializeLatchSupporthasbeencalledinthisprocess
Assert(selfpipereadfd0selfpipeownerpidMyProcPid);
}InitSharedLatch
InitSharedLatch初始化一个共享latch(可以从其他进程set这个latch),初始化完latch不属于任何进程,使用OwnLatch将该latch和当前进程关联。InitSharedLatch必须在postmassterfork子进程前由postmaster调用,通常在ShmemInitStruct分配包含Latch的共享内存块之后调用。Notethatotherhandlescreatedinthismodulearenevermarkedasinheritable。ThuswedonotneedtoworryaboutcleaningupchildprocessreferencestopostmasterprivatelatchesorWaitEventSets。
voidInitSharedLatch(Latchlatch){
latchissetfalse;
latchownerpid0;
latchissharedtrue;
}OwnLatch
OwnLatch将一个共享latch关联到当前进程上,允许该进程在该latch上等待。尽管有latch是否有所属的检查,但是我们在这里没有采用任何锁机制,所以我们不能在两个进程在同时对同一个锁竞争所属权时检测出来错误。这种情况下,调用者必须提供一个interlock来包含latch的所属权。任何进程调用OwnLatch函数,必须确保latchsigusr1handler函数由SIGUSR1信号处理handler调用,因为sharedlatch使用SIGUSER1作为进程间通信机制。
voidOwnLatch(Latchlatch){
Sanitychecks
Assert(latchisshared);
AssertInitializeLatchSupporthasbeencalledinthisprocess
Assert(selfpipereadfd0selfpipeownerpidMyProcPid);
if(latchownerpid!0)elog(ERROR,latchalreadyowned);
latchownerpidMyProcPid;
}DisownLatch
DisownLatch将一个共享latch和当前进程解关联
voidDisownLatch(Latchlatch){
Assert(latchisshared);
Assert(latchownerpidMyProcPid);
latchownerpid0;
}在Latch上等待WaitLatch
WaitLatch等待给定的latch被设置,或者是postmasterdeath或者是超时。wakeEvents是用于指定等待哪种类型事件的掩码。如果latch已经被设置(WLLATCHSET已经给定),函数立即返回。
超时时间以毫秒为单位,如果WLTIMEOUT标志设置了的话,超时时间必须大于等于0。尽管超时时间的类型为log,我们并不支持超时时间长于INTMAX毫秒。
latch必须由当前进程拥有,才能Wait。比如,itmustbeaprocesslocallatchinitializedwithInitLatch,orasharedlatchassociatedwiththecurrentprocessbycallingOwnLatch。
返回指示哪种条件导致当前进程在该latch上被唤醒的掩码。如果是多个唤醒条件,我们不能保证在单次调用中返回所有的条件,但是会至少返回一条。
intWaitLatch(Latchlatch,intwakeEvents,longtimeout,uint32waiteventinfo){
returnWaitLatchOrSocket(latch,wakeEvents,PGINVALIDSOCKET,timeout,waiteventinfo);
}WaitLatchOrSocket
WaitLatch就是调用WaitLatchOrSocket函数,WaitLatchOrSocket比WaitLatch多了一个pgsocket形参(WLSOCKET)。WaitLatch调用时使用PGINVALIDSOCKET,表示是在latch上wait。
当在一个socket上等待时,EOF和errorconditions总是会导致socket被反馈为readablewritableconnected,所以调用者必须处理这种情况。
wakeEvents必须包含或者WLEXITONPMDEATH或者是WLPOSTMASTERDEATH。WLEXITONPMDEATH用于在postmasterdie时自动退出。
WLPOSTMASTERDEATH用于在postmasterdie时在WaitLatchOrSocket函数返回掩码中设置WLPOSTMASTERDEATH标志,以表明postmasterdie。
intWaitLatchOrSocket(Latchlatch,intwakeEvents,pgsocketsock,longtimeout,uint32waiteventinfo){
intret0;
intrc;
WaitEventevent;
WaitEventSetsetCreateWaitEventSet(CurrentMemoryContext,3);
if(wakeEventsWLTIMEOUT)Assert(timeout0);
elsetimeout1;
if(wakeEventsWLLATCHSET)
AddWaitEventToSet(set,WLLATCHSET,PGINVALIDSOCKET,latch,NULL);
Postmastermanagedcallersmusthandlepostmasterdeathsomehow。
Assert(!IsUnderPostmaster(wakeEventsWLEXITONPMDEATH)(wakeEventsWLPOSTMASTERDEATH));
if((wakeEventsWLPOSTMASTERDEATH)IsUnderPostmaster)
AddWaitEventToSet(set,WLPOSTMASTERDEATH,PGINVALIDSOCKET,NULL,NULL);
if((wakeEventsWLEXITONPMDEATH)IsUnderPostmaster)
AddWaitEventToSet(set,WLEXITONPMDEATH,PGINVALIDSOCKET,NULL,NULL);
if(wakeEventsWLSOCKETMASK){
intev;
evwakeEventsWLSOCKETMASK;
AddWaitEventToSet(set,ev,sock,NULL,NULL);
}
rcWaitEventSetWait(set,timeout,event,1,waiteventinfo);
if(rc0)retWLTIMEOUT;
elseretevent。events(WLLATCHSETWLPOSTMASTERDEATHWLSOCKETMASK);
FreeWaitEventSet(set);
returnret;
}设置LatchSetLatch
SetLatch函数设置一个latch,唤醒等待的任何进程。如果在信号处理函数中调用该函数,确保在该函数之前和之后保存和恢复errno。主要工作就是设置latch中的isset,然后唤醒等待的进程(如果有的话)。
如果是当前进程在等待该latch,说明我们是在信号处理函数中设置的Latch,我们使用selfpipe唤醒poll或epollwait。如果是其他进程在等待该latch,则发送一个SIGUSR1信号。
voidSetLatch(Latchlatch){
pidtownerpid;
Thememorybarrierhastobeplacedheretoensurethatanyflagvariablespossiblychangedbythisprocesshavebeenflushedtomainmemory,beforewechecksetisset。
pgmemorybarrier();
Quickexitifalreadyset如果已经设置,直接返回
if(latchisset)return;
latchissettrue;
Seeifanyoneswaitingforthelatch。Itcanbethecurrentprocessif
wereinasignalhandler。Weusetheselfpipetowakeupthe
poll()epollwait()inthatcase。Ifitsanotherprocess,senda
signal。
Fetchownerpidonlyonce,incasethelatchisconcurrentlygetting
ownedordisowned。XXX:Thisassumesthatpidtisatomic,whichisnt
guaranteedtobetrue!Inpractice,theeffectiverangeofpidtfits
ina32bitinteger,andsoshouldbeatomic。Intheworstcase,we
mightendupsignalingthewrongprocess。Eventhen,yourevery
unluckyifaprocesswiththatboguspidexistsandbelongsto
Postgres;andPGdatabaseprocessesshouldhandleexcessSIGUSR1
interruptswithoutaproblemanyhow。
Anothersortofraceconditionthatspossiblehereisforanew
processtoownthelatchimmediatelyafterwelook,sowedontsignal
it。ThisisokaysolongasallcallersofResetLatchWaitLatchfollow
thestandardcodingconventionofwaitingatthebottomoftheirloops,
notthetop,sothattheyllcorrectlyprocesslatchsettingevents
thathappenbeforetheyentertheloop。
ownerpidlatchownerpid;
if(ownerpid0)return;
elseif(ownerpidMyProcPid){
if(waiting)sendSelfPipeByte();
}
elsekill(ownerpid,SIGUSR1);
}
仅获取ownerpid一次,以防闩锁同时被拥有或被剥夺。XXX:这假设pidt是原子的,这不能保证是真的!实际上,pidt的有效范围适合32位整数,因此应该是原子的。在最坏的情况下,我们可能最终会发出错误的过程信号。即便如此,如果存在带有该伪造pid的进程并且属于Postgres,那你就很不走运了;PG数据库进程应该可以毫无问题地处理过多的SIGUSR1中断。另一种可能的竞争条件是新进程在我们查看后立即拥有锁存器,因此我们不发出信号。只要ResetLatchWaitLatch的所有调用者都遵循在循环底部而不是顶部等待的标准编码约定,这样就可以正确处理在进入循环之前发生的闩锁设置事件。latchsigusr1handler
SetLatch使用SIGUSR1唤醒在latch上等待的进程。如果我们在等待,唤醒WaitLatch。
voidlatchsigusr1handler(void){
if(waiting)sendSelfPipeByte();
}
ResetLatch
清除latch的isset。在该函数调用之后调用WaitLatch会进入睡眠,除非在调用WaitLatch之前latch又被设置了。
voidResetLatch(Latchlatch){
Assert(latchownerpidMyProcPid);Onlytheownershouldresetthelatch
latchissetfalse;
Ensurethatthewritetoissetgetsflushedtomainmemorybeforewe
examineanyflagvariables。OtherwiseaconcurrentSetLatchmight
falselyconcludethatitneedntsignalus,eventhoughwehavemissed
seeingsomeflagupdatesthatSetLatchwassupposedtoinformusof。
pgmemorybarrier();
}