应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

SRS之RTMP推拉流分析

  SRS是一个简单高效的实时视频服务器,支持RTMPWebRTCHLSHTTPFLVSRTGB28181;本文以SRS4。0版本进行分析RTMP推拉流架构,SRS整体架构如下图(官网图片)所示:
  有图可知SRS支持多种客户端以不同的媒流体协议进行推流、拉流,内部还包括了不同协议的转换,同时还支持SRS的集群。推荐视频:SRS4。0RTMP推流读取数据,拉流转发数据
  本文主要分析在SRS中RTMP的推流、拉流源码分析,其核心类如下:SrsServerSRS流媒体服务
  SrsBufferListener监听器,主要是TCP的监听SrsTcpListenerTCP监听器SrsRtmpConnRTMP连接,
  对应了SrsStSocket和SrsCoroutineSrsRtmpServer提供与客户端之间的RTMP命令协议消息的交互服务,使SrsRtmpConn提供的socket读写数据SrsLiveSource描述路播放源,包括推流和拉流的描述SrsLiveConsumer拉流消费者,每路拉流客户端对应个SrsLiveConsumerSrsStSocket经过封装的socket接SrsRecvThread负责接收数据,但是要注意的是他这并不是从IO读取数据,从SrsRtmpServer类拉取数据,然后推送到SrsPublishRecvThread(推流),或者SrsQueueRecvThread(拉流)SrsQueueRecvThread主要于拉流,对应的是客户端服务器的控制消息,和视频消息没有关系。客户端读取数据还是从consumer的queue去读取。
  SrsPublishRecvThread主要于推流,内部封装了协程
  RTMP推拉流代码流程如下:
  C音视频学习资料免费获取方法:关注音视频开发T哥,点击链接即可免费获取2023年最新C音视频开发进阶独家免费学习大礼包!
  SRS网络模型分析
  在主函数runhybridserver中开始于srshybridrun()轮询,通过流体服务SrsServer::listen()进入服务端监听,这里分别对不同的协议进行了不同的监听处理,代码如下:srserrortSrsServer::listen(){srserrorterrsrssuccess;rtmp的listenif((errlistenrtmp())!srssuccess){returnsrserrorwrap(err,rtmplisten);}if((errlistenhttpapi())!srssuccess){returnsrserrorwrap(err,httpapilisten);}if((errlistenhttpsapi())!srssuccess){returnsrserrorwrap(err,httpsapilisten);}if((errlistenhttpstream())!srssuccess){returnsrserrorwrap(err,httpstreamlisten);}if((errlistenhttpsstream())!srssuccess){returnsrserrorwrap(err,httpsstreamlisten);}if((errlistenstreamcaster())!srssuccess){returnsrserrorwrap(err,streamcasterlisten);}if((errconnmanagerstart())!srssuccess){returnsrserrorwrap(err,connectionmanager);}returnerr;}
  进入RTMP对应的listen,这里主要通过SrsBufferListener进一步封装了listen,包括httpapi、httpsapi的监听都是用SrsBufferListener统一的封装类;
  C音视频学习资料免费获取方法:关注音视频开发T哥,点击链接即可免费获取2023年最新C音视频开发进阶独家免费学习大礼包!
  srserrortSrsBufferListener::listen(stringi,intp){srserrorterrsrssuccess;ipi;portp;srsfreep(listener);listenernewSrsTcpListener(this,ip,port);new一个SrsTcpListener对象,传一个指针if((errlistenerlisten())!srssuccess){returnsrserrorwrap(err,bufferedtcplisten);}stringvsrslistenertype2string(type);srstrace(slistenattcp:s:d,fdd,v。cstr(),ip。cstr(),port,listenerfd());returnerr;}
  在newSrsTcpListener时传入了this,其实是在构造的时候给handler赋值,继续进入SrsTcpListener::listen()每一个监听,对应一个协程srserrortSrsTcpListener::listen(){srserrorterrsrssuccess;rtmp使用的是tcp,开始listenif((errsrstcplisten(ip,port,lfd))!srssuccess){returnsrserrorwrap(err,listenats:d,ip。cstr(),port);}srsfreep(trd);trdnewSrsSTCoroutine(tcp,this);创建一个协程,传一个用户(SrsTcpListener)指针,如果协程需要回调,可以通过指针找到对应的对象if((errtrdstart())!srssuccess){启动协程,执行SrsSTCoroutine::cycle(),即handlecycle(),最终是SrsTcpListener::cycle()returnsrserrorwrap(err,startcoroutine);}returnerr;}
  启动协程进行监听,执行cycle(),代码如下:srserrortSrsTcpListener::cycle(){srserrorterrsrssuccess;while(true){if((errtrdpull())!srssuccess){读取错误码,判断协程是否结束,不为srssuccess时,说明该协程要退出returnsrserrorwrap(err,tcplistener);}srsnetfdtfdsrsaccept(lfd,NULL,NULL,SRSUTIMENOTIMEOUT);检测新连接if(fdNULL){returnsrserrornew(ERRORSOCKETACCEPT,acceptatfdd,srsnetfdfileno(lfd));}if((errsrsfdcloseexec(srsnetfdfileno(fd)))!srssuccess){returnsrserrorwrap(err,setcloseexec);}if((errhandlerontcpclient(fd))!srssuccess){handle就是new一个SrsTcpListener对象时,传入的ISrsTcpHandler指针,即SrsBufferListener(SrsBufferListener继承了ISrsTcpHandler)returnsrserrorwrap(err,handlefdd,srsnetfdfileno(fd));}}returnerr;}
  这里的ontcpclient实际执行的就是构造函数时传入this,即SrsBufferListener的成员函数,代码如下:监听新的连接srserrortSrsBufferListener::ontcpclient(srsnetfdtstfd){srserrorterrserveracceptclient(type,stfd);if(err!srssuccess){srswarn(acceptclientfailed,erriss,srserrordesc(err)。cstr());srsfreep(err);}returnsrssuccess;}
  进入acceptclient代码如下:type传递了对应的连接类型srserrortSrsServer::acceptclient(SrsListenerTypetype,srsnetfdtstfd){srserrorterrsrssuccess;ISrsStartableConnecitonconnNULL;将fd和一个conn绑定,并返回一个连接connif((errfdtoresource(type,stfd,conn))!srssuccess){if(srserrorcode(err)ERRORSOCKETGETPEERIPsrsconfigemptyipok()){srsclosestfd(stfd);srserrorreset(err);returnsrssuccess;}returnsrserrorwrap(err,fdtoresource);}srsassert(conn);directlyenqueue,thecyclethreadwillremovetheclient。connmanageradd(conn);把连接添加到connmanager进行管理启动类型对应的协程,比如启动rtmp连接对应的协程,每个SrsRtmpConn都有1:1对应的协程if((errconnstart())!srssuccess){returnsrserrorwrap(err,startconncoroutine);}returnerr;}
  此处首先将fd和一个conn绑定,并返回一个连接conn,代码如下:srserrortSrsServer::fdtoresource(SrsListenerTypetype,srsnetfdtstfd,ISrsStartableConnecitonpr){srserrorterrsrssuccess;intfdsrsnetfdfileno(stfd);stringipsrsgetpeerip(fd);intportsrsgetpeerport(fd);。。。。。。。。。。最大连接数判断处理。。。。。。。。。。Thecontextidmaychangeduringcreatingthebellowobjects。SrsContextRestore(srscontextgetid());new一个类型对应的连接if(typeSrsListenerRtmpStream){prnewSrsRtmpConn(this,stfd,ip,port);}elseif(typeSrsListenerHttpApi){prnewSrsHttpApi(false,this,stfd,httpapimux,ip,port);}elseif(typeSrsListenerHttpsApi){prnewSrsHttpApi(true,this,stfd,httpapimux,ip,port);}elseif(typeSrsListenerHttpStream){prnewSrsResponseOnlyHttpConn(false,this,stfd,httpserver,ip,port);}elseif(typeSrsListenerHttpsStream){prnewSrsResponseOnlyHttpConn(true,this,stfd,httpserver,ip,port);}else{srswarn(closefornoservicehandler。fdd,ips:d,fd,ip。cstr(),port);srsclosestfd(stfd);returnerr;}returnerr;}
  其次时将连接conn添加到connmanager进行管理,最后connstart()启动协程进行接收发送数据的处理,这里每一个SrsRtmpConn连接都有1:1对应SrsCoroutine协程,启动后进入SrsRtmpConn::docycle()轮询,代码如下:rtmp接收数据处理srserrortSrsRtmpConn::docycle(){srserrorterrsrssuccess;srstrace(RTMPclientips:d,fdd,ip。cstr(),port,srsnetfdfileno(stfd));设置收发超时时间rtmpsetrecvtimeout(SRSCONSTSRTMPTIMEOUT);rtmpsetsendtimeout(SRSCONSTSRTMPTIMEOUT);rtmp握手if((errrtmphandshake())!srssuccess){returnsrserrorwrap(err,rtmphandshake);}rtmp代理相关uint32triprtmpproxyrealip();if(rip0){srstrace(RTMPproxyrealclientipd。d。d。d,uint8t(rip24),uint8t(rip16),uint8t(rip8),uint8t(rip));}SrsRequestreqinforeq;if((errrtmpconnectapp(req))!srssuccess){握手成功后,处理client发送的connectreturnsrserrorwrap(err,rtmpconnecttcUrl);}setclientiptorequest。reqipip;保存客户端IPsrstrace(connectapp,tcUrls,pageUrls,swfUrls,schemas,vhosts,portd,apps,argss,reqtcUrl。cstr(),reqpageUrl。cstr(),reqswfUrl。cstr(),reqschema。cstr(),reqvhost。cstr(),reqport,reqapp。cstr(),(reqargs?(obj):null));showclientidentityif(reqargs){std::stringsrsversion;std::stringsrsserverip;intsrspid0;intsrsid0;SrsAmf0AnypropNULL;if((propreqargsensurepropertystring(srsversion))!NULL){srsversionproptostr();}if((propreqargsensurepropertystring(srsserverip))!NULL){srsserveripproptostr();}if((propreqargsensurepropertynumber(srspid))!NULL){srspid(int)proptonumber();}if((propreqargsensurepropertynumber(srsid))!NULL){srsid(int)proptonumber();}if(srspid0){srstrace(edgesrsips,versions,pidd,idd,srsserverip。cstr(),srsversion。cstr(),srspid,srsid);}}if((errservicecycle())!srssuccess){errsrserrorwrap(err,servicecycle);}srserrortr0srssuccess;if((r0ondisconnect())!srssuccess){errsrserrorwrap(err,ondisconnects,srserrordesc(r0)。cstr());srsfreep(r0);}Ifclientisredirecttootherservers,wealreadyloggedtheevent。if(srserrorcode(err)ERRORCONTROLREDIRECT){srserrorreset(err);}returnerr;}
  开始进行RTMP正常的握手交互过程、设置收发超时、rtmp代理,握手成功(处理client发送的connect请求);进入servicecycle(),继续数据交互,设置窗口大小、带宽大小、chunk大小、连接成功响应客户端。{srserrorterrsrssuccess;SrsRequestreqinforeq;窗口大小设置intoutacksizesrsconfiggetoutacksize(reqvhost);if(outacksize(errrtmpsetwindowacksize(outacksize))!srssuccess){returnsrserrorwrap(err,rtmp:setoutwindowacksize);}intinacksizesrsconfiggetinacksize(reqvhost);if(inacksize(errrtmpsetinwindowacksize(inacksize))!srssuccess){returnsrserrorwrap(err,rtmp:setinwindowacksize);}带宽设置if((errrtmpsetpeerbandwidth((int)(2。510001000),2))!srssuccess){returnsrserrorwrap(err,rtmp:setpeerbandwidth);}gettheipwhichclientconnected。std::stringlocalipsrsgetlocalip(srsnetfdfileno(stfd));dobandwidthtestifconnecttothevhostwhichisforbandwidthcheck。if(srsconfiggetbwcheckenabled(reqvhost)){if((errbandwidthbandwidthcheck(rtmp,skt,req,localip))!srssuccess){returnsrserrorwrap(err,rtmp:bandwidthcheck);}returnerr;}setchunksizetolarger。setthechunksizebeforeanylargerresponsegreaterthan128,tomakeOBShappy,seehttps:github。comossrssrsissues454intchunksizesrsconfiggetchunksize(reqvhost);从配置文件读取chunksize大小,进行设置,一般设置60k,如果太小就得拆分if((errrtmpsetchunksize(chunksize))!srssuccess){returnsrserrorwrap(err,rtmp:setchunksized,chunksize);}responsetheclientconnectok。if((errrtmpresponseconnectapp(req,localip。cstr()))!srssuccess){连接成功,响应客户端returnsrserrorwrap(err,rtmp:responseconnectapp);}if((errrtmponbwdone())!srssuccess){returnsrserrorwrap(err,rtmp:onbwdown);}真正的循环while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,rtmp:threadquit);}errstreamservicecycle();。。。。。。。。。。。。。。。。。。}returnerr;}
  来到streamservicecycle(),才是真正推流、拉流处理,值得注意的是,还对cachegop是否开启的设置。srserrortSrsRtmpConn::streamservicecycle(){srserrorterrsrssuccess;。。。。。。。。。。。。findasourcetoserve。SrsLiveSourcesourceNULL;一个直播对应一个SrsLiveSource,一个推流,0~N个拉流if((errsrssourcesfetchorcreate(req,server,source))!srssuccess){查找创建一个sourcereturnsrserrorwrap(err,rtmp:fetchsource);}srsassert(source!NULL);读取配置文件,设置是否需要cachegopboolenabledcachesrsconfiggetgopcache(reqvhost);默认是开的srstrace(sourceurls,ips,cached,isedged,sourceidss,reqgetstreamurl()。cstr(),ip。cstr(),enabledcache,infoedge,sourcesourceid()。cstr(),sourcepresourceid()。cstr());sourcesetcache(enabledcache);设置推流、拉流处理switch(infotype){caseSrsRtmpConnPlay:{responseconnectionstartplayif((errrtmpstartplay(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startplay);}if((errhttphooksonplay())!srssuccess){returnsrserrorwrap(err,rtmp:callbackonplay);}拉流errplaying(source);httphooksonstop();returnerr;}caseSrsRtmpConnFMLEPublish:{RTMP基本走这里if((errrtmpstartfmlepublish(inforesstreamid))!srssuccess){接收客户端相应的消息,并返回对应的响应returnsrserrorwrap(err,rtmp:startFMLEpublish);}returnpublishing(source);}caseSrsRtmpConnHaivisionPublish:{if((errrtmpstarthaivisionpublish(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startHAIVISIONpublish);}returnpublishing(source);}caseSrsRtmpConnFlashPublish:{if((errrtmpstartflashpublish(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startFLASHpublish);}returnpublishing(source);}default:{returnsrserrornew(ERRORSYSTEMCLIENTINVALID,rtmp:unknownclienttyped,infotype);}}returnerr;}
  推流流程
  推流流程主要是dopublishing,需要注意的是使用SrsPublishRecvThread封装好的协程与拉流使用的SrsQueueRecvThread区分开来,其代码如下:推流流程srserrortSrsRtmpConn::publishing(SrsLiveSourcesource){srserrorterrsrssuccess;SrsRequestreqinforeq;。。。。。。。。。。。。。。TODO:FIXME:Shouldrefinethestateofpublishing。if((erracquirepublish(source))srssuccess){协程实际是SrsPublishRecvThread内部封装的SrsRecvThread的SrsCoroutine成员变量trd,主要看docycle()的流程参数:rtmp:在协程中有一些rtmp接收数据的处理,req:URL相关,SrsPublishRecvThreadrtrd(rtmp,req,srsnetfdfileno(stfd),0,this,source,srscontextgetid());errdopublishing(source,rtrd);实际推流流程,source就是直播对应的那个sourcertrd。stop();}。。。。。。。。。。。returnerr;}srserrortSrsRtmpConn::dopublishing(SrsLiveSourcesource,SrsPublishRecvThreadrtrd){srserrorterrsrssuccess;SrsRequestreqinforeq;SrsPithyPrintpprintSrsPithyPrint::creatertmppublish();SrsAutoFree(SrsPithyPrint,pprint);updatethestatisticwhensourcedisconveried。SrsStatisticstatSrsStatistic::instance();if((errstatonclient(srscontextgetid()。cstr(),req,this,infotype))!srssuccess){returnsrserrorwrap(err,rtmp:statclient);}startisolaterecvthread。TODO:FIXME:Passthecallbackhere。if((errrtrdstart())!srssuccess){启动协程,SrsRecvThread::docycle()轮询读取数据returnsrserrorwrap(err,rtmp:receivethread);}initializethepublishtimeout。publish1stpkttimeoutsrsconfiggetpublish1stpkttimeout(reqvhost);publishnormaltimeoutsrsconfiggetpublishnormaltimeout(reqvhost);setthesockoptions。setsockoptions();if(true){boolmrsrsconfiggetmrenabled(reqvhost);srsutimetmrsleepsrsconfiggetmrsleep(reqvhost);srstrace(startpublishmrdd,p1stptd,pntd,tcpnodelayd,mr,srsu2msi(mrsleep),srsu2msi(publish1stpkttimeout),srsu2msi(publishnormaltimeout),tcpnodelay);}int64tnbmsgs0;uint64tnbframes0;while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,rtmp:threadquit);}pprintelapse();condwaitfortimeout。if(nbmsgs0){whennotgotmsgs,waitforalargertimeout。seehttps:github。comossrssrsissues441rtrdwait(publish1stpkttimeout);}else{rtrdwait(publishnormaltimeout);}checkthethreaderrorcode。if((errrtrderrorcode())!srssuccess){returnsrserrorwrap(err,rtmp:receivethread);}whennotgotanymessages,timeout。超时处理if(rtrdnbmsgs()nbmsgs){returnsrserrornew(ERRORSOCKETTIMEOUT,rtmp:publishtimeoutdms,nbmsgsd,nbmsgs?srsu2msi(publishnormaltimeout):srsu2msi(publish1stpkttimeout),(int)nbmsgs);}nbmsgsrtrdnbmsgs();收到消息数量Updatethestatforvideofps。remarkhttps:github。comossrssrsissues851SrsStatisticstatSrsStatistic::instance();if((errstatonvideoframes(req,(int)(rtrdnbvideoframes()nbframes)))!srssuccess){returnsrserrorwrap(err,rtmp:statvideoframes);}nbframesrtrdnbvideoframes();视频帧数量reportableif(pprintcanprint()){kbpssample();boolmrsrsconfiggetmrenabled(reqvhost);srsutimetmrsleepsrsconfiggetmrsleep(reqvhost);srstrace(SRSCONSTSLOGCLIENTPUBLISHtimed,okbpsd,d,d,ikbpsd,d,d,mrdd,p1stptd,pntd,(int)pprintage(),kbpsgetsendkbps(),kbpsgetsendkbps30s(),kbpsgetsendkbps5m(),kbpsgetrecvkbps(),kbpsgetrecvkbps30s(),kbpsgetrecvkbps5m(),mr,srsu2msi(mrsleep),srsu2msi(publish1stpkttimeout),srsu2msi(publishnormaltimeout));码率的计算,s,30s,5min的码率}}returnerr;}
  看看SrsPublishRecvThread的成员SrsRecvThreadtrd的docycle()的处理,这里主要是rtmprecvmessage(msg)接收消息,pumperconsume(msg)把消息推送给消费者。srserrortSrsRecvThread::docycle(){srserrorterrsrssuccess;while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,recvthread);}Whenthepumperisinterrupted,waitthenretry。if(pumperinterrupted()){srsusleep(timeout);continue;}SrsCommonMessagemsgNULL;Processthereceivedmessage。处理收到的消息,rtmp由SrsPublishRecvThread的构造函数传进来if((errrtmprecvmessage(msg))srssuccess){errpumperconsume(msg);推送给消费者,pumper也是从SrsPublishRecvThread的SrsRecvThread成员变量trd的构造函数传进来的}if(err!srssuccess){Interruptthereceivethreadforanyerror。trdinterrupt();Notifythepumpertoquitforerror。pumperinterrupt(err);returnsrserrorwrap(err,recvthread);}}returnerr;}
  consume内部进行消息数量、视频帧数量的统计,然后connhandlepublishmessage(source,msg)对消息的处理,最终执行函数processpublishmessage()。audio、video、metaData处理srserrortSrsRtmpConn::processpublishmessage(SrsLiveSourcesource,SrsCommonMessagemsg){srserrorterrsrssuccess;foredge,directlyproxymessagetoorigin。if(infoedge){if((errsourceonedgeproxypublish(msg))!srssuccess){returnsrserrorwrap(err,rtmp:proxypublish);}returnerr;}processaudiopacketRTMPMSGAudioMessage8if(msgheader。isaudio()){if((errsourceonaudio(msg))!srssuccess){audio的处理returnsrserrorwrap(err,rtmp:consumeaudio);}returnerr;}processvideopacketRTMPMSGVideoMessage9if(msgheader。isvideo()){if((errsourceonvideo(msg))!srssuccess){video处理returnsrserrorwrap(err,rtmp:consumevideo);}returnerr;}processaggregatepacketif(msgheader。isaggregate()){if((errsourceonaggregate(msg))!srssuccess){returnsrserrorwrap(err,rtmp:consumeaggregate);}returnerr;}processonMetaDataMetaData处理RTMPMSGAMF0DataMessage18或RTMPMSGAMF3DataMessage15if(msgheader。isamf0data()msgheader。isamf3data()){SrsPacketpktNULL;if((errrtmpdecodemessage(msg,pkt))!srssuccess){returnsrserrorwrap(err,rtmp:decodemessage);}SrsAutoFree(SrsPacket,pkt);if(dynamiccastSrsOnMetaDataPacket(pkt)){SrsOnMetaDataPacketmetadatadynamiccastSrsOnMetaDataPacket(pkt);将packet转成metaDataif((errsourceonmetadata(msg,metadata))!srssuccess){returnsrserrorwrap(err,rtmp:consumemetadata);}returnerr;}returnerr;}returnerr;}
  processpublishmessage对音频、视频、metaData进行处理;先看看音频处理,把msg发送给每一个拉流端消费者,这里的consumers容器保存所有拉流端消费者,在拉流流程中,新建消费者时添加的。音频数据处理srserrortSrsLiveSource::onaudio(SrsCommonMessagesharedaudio){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。convertsharedaudiotomsg,usershouldnotusesharedaudioagain。通过引用计数的方式,创建一个消息SrsSharedPtrMessagemsg;类似智能指针,数据拷贝实际上是浅拷贝,通过引用计数方式,为0释放内存if((errmsg。create(sharedaudio))!srssuccess){returnsrserrorwrap(err,createmessage);}directlyprocesstheaudiomessage。if(!mixcorrect){默认不做校正,就直接处理,就是不用放到map进行排序returnonaudioimp(msg);}insertmsgtothequeue。mixqueuepush(msg。copy());把流消息都插入到队列中,内部并按时间戳做了排序fetchsomeonefrommixqueue。从map中取出来SrsSharedPtrMessagemmixqueuepop();pop时间戳最小的出来if(!m){returnerr;}consumethemonotonicallyincreasemessage。if(misaudio()){erronaudioimp(m);}else{erronvideoimp(m);}srsfreep(m);returnerr;}srserrortSrsLiveSource::onaudioimp(SrsSharedPtrMessagemsg){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。copytoallconsumer把msg拷贝到消费者对象的队列中,即把数据发给每个拉流端if(!dropforreduce){for(inti0;i(int)consumers。size();i){SrsLiveConsumerconsumerconsumers。at(i);if((errconsumerenqueue(msg,atc,jitteralgorithm))!srssuccess){把消息放到消费者队列returnsrserrorwrap(err,consumemessage);}}}cachethesequenceheaderofaac,orfirstpacketofmp3。forexample,themp3isusedforhlstowritetherightaudiocodec。TODO:FIXME:torefinethestreaminfosystem。if(isaacsequenceheader!metaash()){if((errmetaupdateash(msg))!srssuccess){更新audiosequencereturnsrserrorwrap(err,metaconsumeaudio);}}whensequenceheader,donotpushtogopcacheandadjustthetimestamp。if(issequenceheader){returnerr;}cachethelastgoppacketsif((errgopcachecache(msg))!srssuccess){returnsrserrorwrap(err,gopcacheconsumeaudio);}。。。。。。。。。。。。。returnerr;}
  类似的视频处理,如下:srserrortSrsLiveSource::onvideo(SrsCommonMessagesharedvideo){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。convertsharedvideotomsg,usershouldnotusesharedvideoagain。thepayloadistransfertomsg,andsettoNULLinsharedvideo。SrsSharedPtrMessagemsg;智能指针的封装if((errmsg。create(sharedvideo))!srssuccess){returnsrserrorwrap(err,createmessage);}directlyprocessthevideomessage。if(!mixcorrect){returnonvideoimp(msg);}insertmsgtothequeue。mixqueuepush(msg。copy());把流消息都插入到队列中,内部并按时间戳做了排序fetchsomeonefrommixqueue。SrsSharedPtrMessagemmixqueuepop();pop时间戳最小的消息出来if(!m){returnerr;}consumethemonotonicallyincreasemessage。if(misaudio()){erronaudioimp(m);}else{erronvideoimp(m);}srsfreep(m);returnerr;}srserrortSrsLiveSource::onvideoimp(SrsSharedPtrMessagemsg){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。cachethesequenceheaderifh264donotcachethesequenceheadertogopcache,returnhere。if(issequenceheader(errmetaupdatevsh(msg))!srssuccess){更新videosequencereturnsrserrorwrap(err,metaupdatevideo);}Copytohubtoallutilities。if((errhubonvideo(msg,issequenceheader))!srssuccess){returnsrserrorwrap(err,hubconsumevideo);}Forbridgertoconsumethemessage。if(bridger(errbridgeronvideo(msg))!srssuccess){returnsrserrorwrap(err,bridgerconsumevideo);}copytoallconsumer把数据发给拉流端的消费者(队列中)if(!dropforreduce){for(inti0;i(int)consumers。size();i){SrsLiveConsumerconsumerconsumers。at(i);if((errconsumerenqueue(msg,atc,jitteralgorithm))!srssuccess){把消息放到消费者队列中returnsrserrorwrap(err,consumevideo);}}}whensequenceheader,donotpushtogopcacheandadjustthetimestamp。if(issequenceheader){returnerr;}cachethelastgoppacketscachegop如果是I帧,就会清空掉,重新push新的数据if((errgopcachecache(msg))!srssuccess){returnsrserrorwrap(err,gopcacheconsumevdieo);}。。。。。。。。。returnerr;}
  metaData的处理如下:srserrortSrsLiveSource::onmetadata(SrsCommonMessagemsg,SrsOnMetaDataPacketmetadata){srserrorterrsrssuccess;。。。。。。。。。。。。。。Updatethemetacache。更新metaData保存起来boolupdatedfalse;if((errmetaupdatedata(msgheader,metadata,updated))!srssuccess){returnsrserrorwrap(err,updatemetadata);}if(!updated){returnerr;}whenalreadygotmetadata,dropwhenreducesequenceheader。booldropforreducefalse;if(metadata()srsconfiggetreducesequenceheader(reqvhost)){dropforreducetrue;srswarn(dropforreduceshmetadata,sized,msgsize);}copytoallconsumer把推流端发的metaData也插入消费队列中,便于拉流者知道if(!dropforreduce){std::vectorSrsLiveConsumer::iteratorit;for(itconsumers。begin();it!consumers。end();it){SrsLiveConsumerconsumerit;if((errconsumerenqueue(metadata(),atc,jitteralgorithm))!srssuccess){returnsrserrorwrap(err,consumemetadata);}}}Copytohubtoallutilities。returnhubonmetadata(metadata(),metadata);}
  拉流流程
  首先每一个拉流端都会绑定一个SrsConsumer消费者,每一个消费者对应一个SrsQueueRecvThread协程,执行doplayingsrserrortSrsRtmpConn::playing(SrsLiveSourcesource){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。Createaconsumerofsource。SrsLiveConsumerconsumerNULL;消费者,每个拉流都会绑定一个SrsConsumerSrsAutoFree(SrsLiveConsumer,consumer);if((errsourcecreateconsumer(consumer))!srssuccess){returnsrserrorwrap(err,rtmp:createconsumer);}if((errsourceconsumerdumps(consumer))!srssuccess){returnsrserrorwrap(err,rtmp:dumpsconsumer);}每一个消费者独立一个协程SrsQueueRecvThreadtrd(consumer,rtmp,SRSPERFMWSLEEP,srscontextgetid());if((errtrd。start())!srssuccess){returnsrserrorwrap(err,rtmp:startreceivethread);}Deliverpacketstopeer。wakableconsumer;errdoplaying(source,consumer,trd);每个流source绑定一个消费者SrsConsumerwakableNULL;trd。stop();Dropallpacketsinreceivingthread。if(!trd。empty()){srswarn(dropthereceiveddmessages,trd。size());}returnerr;}srserrortSrsRtmpConn::doplaying(SrsLiveSourcesource,SrsLiveConsumerconsumer,SrsQueueRecvThreadrtrd){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。while(true){whensourceissettoexpired,disconnectit。if((errtrdpull())!srssuccess){判断协程是否退出returnsrserrorwrap(err,rtmp:threadquit);}collectelapseforpithyprint。pprintelapse();touseisolatethreadtorecv,canimproveabout33performance。while(!rtrdempty()){SrsCommonMessagemsgrtrdpump();if((errprocessplaycontrolmsg(consumer,msg))!srssuccess){播放控制处理returnsrserrorwrap(err,rtmp:playcontrolmessage);}}quitwhenrecvthreaderror。if((errrtrderrorcode())!srssuccess){returnsrserrorwrap(err,rtmp:recvthread);}ifdefSRSPERFQUEUECONDWAITwaitformessagetoincoming。seehttps:github。comossrssrsissues257consumerwait(mwmsgs,mwsleep);等数据累积一段时间攒一定数据,再发送endifgetmessagesfromconsumer。eachmsginmsgs。msgsmustbefree,fortheSrsMessageArrayneverfreethem。remarkwhenenablesendmininterval,onlyfetchonemessageatime。intcount(sendmininterval0)?1:0;if((errconsumerdumppackets(msgs,count))!srssuccess){从消费队列中一次读取出来,数据从SrsConsumerqueue来,实际是从source给过来的returnsrserrorwrap(err,rtmp:consumerdumppackets);}。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。sendoutmessages,allmessagesarefreedbysendandfreemessages()。noneedtoassertmsg,forthertmpwillassertit。if(count0(errrtmpsendandfreemessages(msgs。msgs,count,inforesstreamid))!srssuccess){发送数据,给到客户端,最终调用protocol封装好的socketapireturnsrserrorwrap(err,rtmp:senddmessages,count);}ifdurationspecified,andexceedit,stopplaylive。see:https:github。comossrssrsissues45if(userspecifieddurationtostop){if(durationreqduration){returnsrserrornew(ERRORRTMPDURATIONEXCEED,rtmp:timedupd,srsu2msi(duration),srsu2msi(reqduration));}}applytheminimalintervalfordeliverystreaminsrsutimet。if(sendmininterval0){srsusleep(sendmininterval);}Yieldtoanothercoroutines。seehttps:github。comossrssrsissues2194issuecomment777437476srsthreadyield();让出cpu,让其他协程继续运行}returnerr;}
  doplaying内部processplaycontrolmsg播放控制处理,consumerdumppackets(msgs,count)从消费队列读取数据,最终rtmpsendandfreemessages(msgs。msgs,count,inforesstreamid)发送到play客户端。作者:MrJuJu
  原文链接:https:www。cnblogs。comjujugop17039564。html
  音视频开发流媒体程序员

路口堵车太烦人,自带涂料P车道?滴滴大数据给大家指个明路路口堵车基本上每个人都遇见过吧,开车的人都怕碰上堵车,碰上赶时间那更是又着急又上火。连云港就有这么一位开车的小伙子,被广大网友称为连云港粉刷匠。被路口堵车弄得烦不胜烦的小伙子直……学生证火车票优惠时间(寒假学生优惠票开始全面发售)学生证火车票优惠时间(寒假学生优惠票开始全面发售)资料图来自中国铁路成都局集团公司消息:2020年寒假学生火车票已经开始全面开售,广大学生旅客可以购买今年12月1日……在古代,你知道串个门有多累吗作者:秦筱来源:《意林》穿衣梳妆停当,出发拜会朋友,这才是真正考验普通人个人风度的时候。别急,登门之前准备好礼物先。这倒不费什么脑筋,《周礼》早就规定好了见什么样的……改签的票能退吗(看懂火车票退改签新规)改签的票能退吗(看懂火车票退改签新规)今天,2021年春运正式开启。铁路出台多项春运服务举措,其中火车票退改签新规备受关注。线上线下各渠道售票预售期统一调整为15天……房屋常识翡翠国际城房产证什么时候办理下来谁知道呢很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……房屋常识首套房需要交契税吗税是怎么算的呢很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……房屋常识问下住房公积金封存是怎么回事很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……汉洁环境参加中国石化泄漏管理专题研讨会参会详情2018年5月3031日,上海汉洁环境工程有限公司参加了在中国石化管道储运有限公司徐州会议中心举行的中国石化泄漏管理专题研讨会,本次研讨会由中国石化安全监管局主办……房屋常识想问下公积金贷年限是怎么算的很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……房屋常识70年产权是怎么算的现在算还是交房算起很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……打折怎么算(打折如何计算)打折怎么算(打折如何计算)什么是打折计算?打折计算考点有哪些?中公网校2019012114:30:14一、什么是打折计算?打折,即按售价的一定比例销售。打八折……别让目的颤抖毁了你作者:马志国来源:《意林》什么叫目的颤抖?有心理学家做过这样一个实验:让人给小小的绣花针引线,事先激发他们不同强度的目的性,比如设置价值不同的奖品。结果发现,目的性……
少数民族的风俗有哪些(56个民族的风土人情和风俗习惯)少数民族的风俗有哪些(56个民族的风土人情和风俗习惯)1、蒙古族(mnggz)蒙古族是一个历史悠久而又富有传奇色彩的民族,过着逐水草而迁徙的游牧生活。中国的大部分草原都留……侗族的传统节日(侗族传统的节日都有哪些)侗族的传统节日(侗族传统的节日都有哪些)每一个民族都有代表自己民族特色的传统节日,侗族当然也不例外。他们的一些节日可能是你平时没见过的,但却非常有特色,那是经过长时间的文化积淀……房屋常识离婚后按揭房如何更名怎么办理呢应该怎么办呢很多朋友们现在对于房产方面的知识了解的偏少,不管是从购房的角度,或者是装修房子的角度,我们都需要对房产方面的一些知识进行了解,所以小编今天就在网上搜集了一些房产方面相关的知识来……他曾任广东省委书记,89岁逝世成就不凡,三个儿子却无一人为官他们凭着这共同的信仰,开辟出东方净土,他们追随进步理念,解放思想,他们与人民群众戚戚相关,经过不断的努力,不断的坚持,使我们的祖国繁荣昌盛,而这其中就有一人,他在58岁曾任广东……国庆手抄报文字内容(国庆节手抄报文字内容资料大全)国庆手抄报文字内容(国庆节手抄报文字内容资料大全)国庆节手抄报文字内容资料大全,国庆节作文、祝福短信、爱国诗歌10月1日是国庆节,我们又迎来了祖国母亲的生日了。为此小编为……国庆节手抄报文字内容(国庆节手抄报文字内容资料大全)国庆节手抄报文字内容(国庆节手抄报文字内容资料大全)国庆节手抄报文字内容资料大全,国庆节作文、祝福短信、爱国诗歌10月1日是国庆节,我们又迎来了祖国母亲的生日了。为此小编……正确与错误都是相对的吗?正确与错误都是绝对的,要么是真理,要么是谬误。所谓相对,那是我们在逻辑认识上出了问题,没有掌握到分辨正确与错误的方式方法。逻辑学是联合国教科文组织认定的仅次于数学的第二大……告别2020的句子(再见2020你好2021句子寄语)告别2020的句子(再见2020你好2021句子寄语)今天是2020年的最后一天,2020年即将结束,迎来新的一年,也就是2021年。对即将过去的2020年有什么想说的吗……迎接12月份到来优美的句子(描写12月唯美句子)迎接12月份到来优美的句子(描写12月唯美句子)描写12月唯美句子迎接新的一个月说说2018最后一个月加油十二月,落于一年中最后一季,十二月,一年中最后一个月,所有……英国监管机构调查亚马逊和谷歌,是否存在虚假商品评论英国管制员星期五表示,他们正在研究谷歌和Amazon,因为他们担心在线Galias没有做必要的事情来停止对其基础上的项目和管理部门进行虚假调查。竞争和市场管理局说,这只是……八爪鱼8条腿,韩国却发现32条腿的章鱼,是日本核泄漏引起的吗对于生物而言,稳定的繁衍是最重要的前提,繁衍可以保证种群持续地扩张,而也是在繁衍的过程中,生物获得的可遗传的变异传播开来,导致后代种群基因库的改变,而这是生物进化的原因。生物的……生活知识科普情人节是几月几日在我们的生活当中有很多的常识性的知识大多数人都是不知道的,就好比最近就有很多小伙伴咨询小编问小编关于情人节是几月几日的这些知识点,这就说明了现在人们也开开渐渐关注起了情人节是几……
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网