应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

湖仓一体电商项目(十)业务实现之编写写入DWD层业务代码

  头条创作挑战赛业务实现之编写写入DWD层业务代码
  DWD层数据主要存储干净的明细数据,这里针对ODS层KAFKAODSTOPIC数据编写代码进行清洗写入对应的Kafkatopic和IcebergDWD层中。代码功能中有以下几点重要方面:针对KafkaODS层中的数据进行清洗,写入IcebergDWD层中。将数据除了写入IcebergDWD层中之外,还要写入Kafka中方便后续处理得到DWS层数据。一、代码编写
  编写处理KafkaODS层数据写入IcebergDWD层数据时,由于在KafkaKAFKAODSTOPICtopic中每条数据都已经有对应写入kafka的topic信息,所以这里我们只需要读取KAFKAODSTOPICtopic中的数据写入到IcebergDWD层中,另外动态获取每条数据写入Kafkatopic信息将每条数据写入到对应的topic即可。
  具体代码参照ProduceODSDataToDWD。scala,大体代码逻辑如下:caseclassDwdInfo(icebergodstblname:String,kafkadwdtopic:String,browseproductcode:String,browseproducttpcode:String,userip:String,obtainpoints:String,userid1:String,userid2:String,frontproducturl:String,logtime:String,browseproducturl:String,id:String,ip:String,logintm:String,logouttm:String)objectProduceODSDataToDWD{privatevalkafkaBrokers:StringConfigUtil。KAFKABROKERSdefmain(args:Array〔String〕):Unit{1。准备环境valenv:StreamExecutionEnvironmentStreamExecutionEnvironment。getExecutionEnvironmentvaltblEnv:StreamTableEnvironmentStreamTableEnvironment。create(env)env。enableCheckpointing(5000)importorg。apache。flink。streaming。api。scala。2。需要预先创建Catalog创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持createtableifnotexists。。。语法tblEnv。executeSql(createcataloghadoopicebergwith(typeiceberg,catalogtypehadoop,warehousehdfs:myclusterlakehousedata)。stripMargin)2。创建KafkaConnector,连接消费Kafkaods中数据tblEnv。executeSql(createtablekafkaodstbl(icebergodstblnamestring,kafkadwdtopicstring,datastring)with(connectorkafka,topicKAFKAODSTOPIC,properties。bootstrap。serversnode1:9092,node2:9092,node3:9092,scan。startup。modelatestoffset,也可以指定earliestoffset、latestoffsetproperties。group。idmygroupid,formatjson)。stripMargin)valodsTbl:TabletblEnv。sqlQuery(selecticebergodstblname,data,kafkadwdtopicfromkafkaodstbl。stripMargin)valodsDS:DataStream〔Row〕tblEnv。toAppendStream〔Row〕(odsTbl)3。设置Sink到Kafka数据输出到侧输出流标记valkafkaDataTagnewOutputTag〔JSONObject〕(kafkadata)4。表准换成对应的DataStream数据处理,清洗ODS中的数据,存入Iceberg{icebergodstblname:ODSBROWSELOG,data:{browseProductCode:yyRAteviDb,browseProductTpCode:120,userIp:117。233。5。190,obtainPoints:24,userId:uid464936,frontProductUrl:https:1P2RQbHFS2,logTime:1647065858856,browseProductUrl:https:RXmiOUxRTliu9TE0},kafkadwdtopic:KAFKADWDBROWSELOGTOPIC}{icebergodstblname:ODSUSERLOGIN,data:{database:lakehousedb,xid:14942,userid:uid283876,ip:215。148。233。254,commit:true,id:10052,type:insert,logouttm:1647066506140,table:mcuserlogin,ts:1647066504,logintm:1647051931534},kafkadwdtopic:KAFKADWDUSERLOGINTOPIC}这里将数据转换成DataStream后再转换成表写入Iceberg对数据只是时间进行清洗,转换成DwdInfo类型DataStream返回,先过滤一些数据为null的valdwdDS:DataStream〔DwdInfo〕odsDS。filter(row{row。getField(0)!nullrow。getField(1)!nullrow。getField(2)!null})。process(newProcessFunction〔Row,DwdInfo〕(){overridedefprocessElement(row:Row,context:ProcessFunction〔Row,DwdInfo〕Context,collector:Collector〔DwdInfo〕):Unit{valicebergodstblname:Stringrow。getField(0)。toStringvaldata:Stringrow。getField(1)。toStringvalkafkadwdtopic:Stringrow。getField(2)。toStringvaljsonObj:JSONObjectJSON。parseObject(data)清洗日期数据jsonObj。put(logTime,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logTime)))jsonObj。put(logintm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logintm)))jsonObj。put(logouttm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logouttm)))解析json嵌套数据valbrowseproductcode:StringjsonObj。getString(browseProductCode)valbrowseproducttpcode:StringjsonObj。getString(browseProductTpCode)valuserip:StringjsonObj。getString(userIp)valobtainpoints:StringjsonObj。getString(obtainPoints)valuserid1:StringjsonObj。getString(userid)valuserid2:StringjsonObj。getString(userId)valfrontproducturl:StringjsonObj。getString(frontProductUrl)vallogtime:StringjsonObj。getString(logTime)valbrowseproducturl:StringjsonObj。getString(browseProductUrl)valid:StringjsonObj。getString(id)valip:StringjsonObj。getString(ip)vallogintm:StringjsonObj。getString(logintm)vallogouttm:StringjsonObj。getString(logouttm)往各类数据datajson对象中加入sinkdwdtopic的信息jsonObj。put(kafkadwdtopic,kafkadwdtopic)context。output(kafkaDataTag,jsonObj)collector。collect(DwdInfo(icebergodstblname,kafkadwdtopic,browseproductcode,browseproducttpcode,userip,obtainpoints,userid1,userid2,frontproducturl,logtime,browseproducturl,id,ip,logintm,logouttm))}})valpropsnewProperties()props。setProperty(bootstrap。servers,kafkaBrokers)6。将以上数据写入到Kafka各自DWD层topic中,这里不再使用SQL方式,而是直接使用DataStream代码方式Sink到各自的DWD层代码中dwdDS。getSideOutput(kafkaDataTag)。addSink(newFlinkKafkaProducer〔JSONObject〕(KAFKADWDDEFAULTTOPIC,newKafkaSerializationSchema〔JSONObject〕{overridedefserialize(jsonObj:JSONObject,aLong:lang。Long):ProducerRecord〔Array〔Byte〕,Array〔Byte〕〕{valsinkDwdTopic:StringjsonObj。getString(kafkadwdtopic)newProducerRecord〔Array〔Byte〕,Array〔Byte〕〕(sinkDwdTopic,null,jsonObj。toString。getBytes())}},props,FlinkKafkaProducer。Semantic。ATLEASTONCE))env。execute()}}
  二、创建IcebergDWD层表
  代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:1、在Hive中添加Iceberg表格式需要的包
  启动HDFS集群,node1启动Hivemetastore服务,在Hive客户端启动Hive添加Iceberg依赖包:node1节点启动Hivemetastore服务〔rootnode1〕hiveservicemetastore在hive客户端node3节点加载两个jar包addjarsoftwarehive3。1。2libiceberghiveruntime0。12。1。jar;addjarsoftwarehive3。1。2liblibfb3030。9。3。jar;
  2、创建Iceberg表
  这里创建IcebergDWD表有DWDUSERLOGIN,创建语句如下:CREATETABLEDWDUSERLOGIN(idstring,useridstring,ipstring,logintmstring,logouttmstring)STOREDBYorg。apache。iceberg。mr。hive。HiveIcebergStorageHandlerLOCATIONhdfs:myclusterlakehousedataicebergdbDWDUSERLOGINTBLPROPERTIES(iceberg。cataloglocationbasedtable,write。metadata。deleteaftercommit。enabledtrue,write。metadata。previousversionsmax3);
  三、代码测试
  以上代码编写完成后,代码执行测试步骤如下:1、在Kafka中创建对应的topic在Kafka中创建KAFKADWDUSERLOGINTOPICtopic。kafkatopics。shzookeepernode3:2181,node4:2181,node5:2181createtopicKAFKADWDUSERLOGINTOPICpartitions3replicationfactor3监控以上topic数据〔rootnode1bin〕。kafkaconsoleconsumer。shbootstrapservernode1:9092,node2:9092,node3:9092topicKAFKADWDUSERLOGINTOPIC
  2、将代码中消费Kafka数据改成从头开始消费
  代码中KafkaConnector中属性scan。startup。mode设置为earliestoffset,从头开始消费数据。
  这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码RTMockDBData。java代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。3、执行代码,查看对应结果
  以上代码执行后在,在对应的KafkaKAFKADWDUSERLOGINTOPICtopic中都有对应的数据。在IcebergDWD层中对应的表中也有数据。
  Kafka中结果如下:
  IcebergDWD层表DWDUSERLOGIN中的数据如下:
  四、架构图

女子坚持每天喝小米粥,润肠养胃调节睡眠,半年后身体如何?黄敏(化名)是一名白领,半年前查出慢性胃炎,医生叮嘱她注意饮食,不要吃过多坚硬、刺激性的食物,以免对胃造成进一步的伤害。听到医生的话,黄敏回想起小时候的经历,那时只要一生……76人118117爵士!无解的不是末节18分,而是哈登赛后这在今天结束的一场NBA比赛中,76人以118117险胜爵士,此役两队打得非常激烈,实际上76人首节领先了17分,但次节和第三节都是爵士占据上风,末节两队僵持了很长时间,结果哈登……Nature王春生最新锂电池成果!一、研究背景基于碳酸酯的最先进的电解质不能满足极端锂(Li)离子电池(LIBs)的大部分要求,因为它们的电压窗口被限制在4。3V,它们的工作温度范围很窄,从20到50,而……情满九道弯5组家庭结局杨树茂一家圆满,史小娜一家悲惨《情满九道弯》是由刘家成执导,韩东君、陈瑶、热依扎、种丹妮领衔主演,萨日娜、毕彦君、罗京民、王莎莎、鲍晓、海一天等特别出演的一部年代剧。该剧一方面讲述了杨树茂、叶菲、史小……女人真不容易,没睡过一个整觉,作为男人,我只能尽力的让老公快醒醒,我涨奶了。那我去给你拿吸奶器。已经半夜两点,母乳喂养真的很累,月子里没让老婆照顾宝宝。但是每晚她涨奶还是要醒来三到五次。孕晚期晚上本来就没睡好,卸货了还是睡不好。她……全球招募体验官!寻宝闵行十佳文旅特色酒店旅行未必远方,诗意就在近乡给热爱生活的你一个机会一场免费的闵行特色酒店之旅一站式食住行游娱的专属服务这里有米其林黑珍珠双料餐厅的至尊服务豪华……被日本街拍惊艳到了!裙子帆布鞋搭配时髦又有女人味,真美时尚不分国界,爱美不分地点。曾经以为只有在中国的街拍中才能看到千姿百态的街头时尚,没想到日本女生在街拍中的表现也如此让人惊艳!从轻熟的OL风裙装到休闲元气的牛仔裙,日本女……美国纽约时报就经济意义而言,互联网曾让我们失望来源:环球时报美国《纽约时报》4月4日文章,原题:就经济意义而言,互联网曾让我们失望关于互联网曾对经济带来的影响,我或许有过不正确的判断。但在过去的几十年中,那些断言信息……白夜极光评测二游养成类的立绘天花板之作白夜极光在2021年6月17日上线国际服,这让很多二游玩家痛心疾首,这么一款高质量的二游不能在国服玩到属实遗憾。之前不在国服上线的主要原因是没有版号,好消息是这款游戏在今年终于……同时的相对性,狭义相对论的核心,弄懂了你会发现相对论很简单!提到狭义相对论,少不了讲同时的相对性,必然要讲到经典的雷击火车实验:两道闪电击中车头车尾,我们通过分析,得出了地面系觉得这两个事件是同时发生的,而火车系并不这么认为。于是……配色对于女生而言,重不重要?看完这3组对比图后,你就明白了听说彩色和充满美感的配色能促进多巴胺释放,原来配色不仅能带来时尚,还附赠快乐和幸福,难怪时髦精都把配色看得比穿搭还重要!事实上,女生的美不只是用好身材、高颜值写就,配色也……看展去!一群摄影爱好者把沙漠搬到了南京3月26日,内蒙古奈曼旗宝古图沙漠摄影巡展在南京金陵图书馆开幕,100余幅反映奈曼旗宝古图沙漠自然风光、人文风情、历史风貌的摄影作品集中展出。一进展厅,浩浩渺渺的沙漠摄影……
优雅气质修炼之如何做减法为什么我们觉得有的人气质好,或者气质不一样。那肯定是他们看了许多书,走了许多路,经历了和我们不一样的事,才有了那份独一无二的气质。有时候我们想要的东西太多,就难得有淡定从……豪砸1。2亿欧!英超领头羊引进7大强援,誓要撼动2强争霸格局今天凌晨,英超又一笔重磅转会基本敲定,据罗马诺报道,目前的领头羊热刺从乌迪内斯引进19岁小将乌多吉,花费超过2000万欧元,乌多吉将租借回乌迪内斯一年,在明年夏季回归热刺。在乌……2022年六一儿童节快乐祝福语母爱是一个圆六一国际儿童节即将到来,在这个全世界儿童的节日里,为儿童朋友们分享一些快乐儿童节的祝福语,祝你们开心快乐,健康成长!1、转眼又到儿童节,送份问候祝福你:……云南腾冲甘蔗寨走上乡村振兴民族团结之路图为游客拍摄甘蔗寨壁画。吴鸿摄中新网腾冲9月18日电(吴鸿)沿着蜿蜒的山路,穿过古桥雄关,甘蔗寨的入口便在绵延而上的梯田顶端。这个拥有600多年历史的传统村落如今走上乡村……向太自曝30年没下厨,如今成煮饭婆,向佐郭碧婷疑似分居熟悉向华强和向太的都知道,俩人结婚多年来一直都恩爱如初,向太也备受向华强的宠爱,从来没有受到过恋情绯闻的影响,可以说是豪门里不得多的的恩爱夫妻。结婚后的向太堪称人生赢家,……生完孩子之后来聊一聊生孩子这件事01自我介绍本人和媳妇均是92年人,去年7月喜得男宝一枚,目前媳妇待业在家,美名其曰灵活就业、自由职业者。02没生孩子之前的想法其实我们俩都不抵触生孩子的,总……景山奶茶故宫下午茶你都吃过吗有了这些点心店、咖啡店我又多了一个逛公园的理由天坛公园天坛拾光天坛西门附近的甜品店,主打各种天坛元素,我们点的春色满园上面是蓝色的天坛形状冰淇淋,点缀了可食用……AG瓜VV张角二选一,初晨没了!一诺全面转型打野KPL吃瓜篇:【AG瓜:VV张角二选一,初晨没了!一诺全面转型打野】AG秋季赛的征程止步十强,是回归KPL之后最差的一次成绩。菲菲表示面临最长的休赛期,同时也暗示AG大洗……高纤热量低,农历年前后吃冬笋最佳时,3大族群要慎食高纤、热量低,农历年前后吃冬笋最对时竹笋热量低又富有营养价值,是相当受欢迎的美食。农历年前后是冬笋(孟宗笋)产季,冬笋风味独特,质地细密,香脆可口,许多人在过年期间会料理象征着……奥运史上第一个云奥运要来了!北京冬奥会都有哪些数字创新?知道吗?百年奥运史上的第一个云上奥运,就要来啦!全球新冠疫情持续蔓延的当下,对百年奥运的举办提出了挑战,为了保证赛事的顺利推进,保障观众的观赛体验,中国用数字科技,回答了如何在……2022互联网公司年会奖品大赏阳光普照都是iPhone13就在裁员的消息在互联网圈蔓延的时候,游戏圈却开始吹响了年会奖品内卷的号角。前不久,沐瞳率先曝光了他们年会奖品,不管是苹果全家桶,还是游戏设备,还是8000块旅游基金,奖品……专访超时空之轮2加藤正人22年后,你的心情怎么样在各方的努力下,在奇迹的帮助下,《超时空之轮2》和《另一个伊甸:超越时空的猫》终于完成了。要知道这件事的来龙去脉,最好的办法就是加藤正人(以下简称加藤),有兴趣的人可别错过。……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网