Lakehouse E-commerce Project (10): Business Implementation — Writing the DWD Layer Code
The DWD layer stores cleaned detail data. Here we write code that takes the ODS-layer data in the KAFKA-ODS-TOPIC topic, cleans it, and writes it into the corresponding Kafka topics and into the Iceberg DWD layer. The code has two important aspects: it cleans the data coming from the Kafka ODS layer and writes it into the Iceberg DWD layer; and, besides writing to Iceberg, it also writes the data back into Kafka so that it can later be processed into the DWS layer.

I. Writing the Code
When writing the code that moves the Kafka ODS-layer data into the Iceberg DWD layer, note that every record in the KAFKA-ODS-TOPIC topic already carries the Kafka topic it should be written to. So we only need to read KAFKA-ODS-TOPIC and write the records into the Iceberg DWD layer, and in addition read the target Kafka topic dynamically from each record and write the record to that topic.
The full code can be found in ProduceODSDataToDWD.scala; the general logic is as follows:

```scala
// Imports needed by this snippet; ConfigUtil and DateUtil are project utility classes
// that are not shown in this article.
import java.lang
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord

case class DwdInfo(iceberg_ods_tbl_name: String, kafka_dwd_topic: String,
                   browse_product_code: String, browse_product_tpcode: String,
                   user_ip: String, obtain_points: String,
                   userid1: String, userid2: String,
                   front_product_url: String, log_time: String, browse_product_url: String,
                   id: String, ip: String, logintm: String, logouttm: String)

object ProduceODSDataToDWD {
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS

  def main(args: Array[String]): Unit = {
    // 1. Prepare the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.enableCheckpointing(5000)

    import org.apache.flink.streaming.api.scala._

    // 2. Create the catalog. The Iceberg tables must be created in Hive beforehand rather
    //    than in code, because creating an Iceberg table in Flink does not support the
    //    "create table if not exists ..." syntax.
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    // 3. Create the Kafka connector table that consumes the ODS data
    tblEnv.executeSql(
      """
        |create table kafka_ods_tbl (
        | iceberg_ods_tbl_name string,
        | kafka_dwd_topic string,
        | data string
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-ODS-TOPIC',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='latest-offset', -- earliest-offset / latest-offset can also be specified
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    val odsTbl: Table = tblEnv.sqlQuery(
      """
        |select iceberg_ods_tbl_name, data, kafka_dwd_topic from kafka_ods_tbl
      """.stripMargin)

    val odsDS: DataStream[Row] = tblEnv.toAppendStream[Row](odsTbl)

    // 4. Side-output tag marking the data that will be sunk to Kafka
    val kafkaDataTag = new OutputTag[JSONObject]("kafka_data")

    // 5. Convert the table into a DataStream, clean the ODS data and prepare it for Iceberg.
    //    Incoming records look like:
    //    {"iceberg_ods_tbl_name":"ODS_BROWSE_LOG","data":{"browseProductCode":"yyRAteviDb","browseProductTpCode":"120","userIp":"117.233.5.190","obtainPoints":"24","userId":"uid464936","frontProductUrl":"https://1P2RQbHFS2","logTime":"1647065858856","browseProductUrl":"https://RXmiOUxRTliu9TE0"},"kafka_dwd_topic":"KAFKA-DWD-BROWSE-LOG-TOPIC"}
    //    {"iceberg_ods_tbl_name":"ODS_USER_LOGIN","data":{"database":"lakehousedb","xid":"14942","userid":"uid283876","ip":"215.148.233.254","commit":"true","id":"10052","type":"insert","logouttm":"1647066506140","table":"mc_user_login","ts":"1647066504","logintm":"1647051931534"},"kafka_dwd_topic":"KAFKA-DWD-USER-LOGIN-TOPIC"}
    //    Only the time fields are cleaned; rows with null key fields are filtered out first,
    //    then each row is converted into a DwdInfo and returned.
    val dwdDS: DataStream[DwdInfo] = odsDS.filter(row => {
      row.getField(0) != null && row.getField(1) != null && row.getField(2) != null
    }).process(new ProcessFunction[Row, DwdInfo]() {
      override def processElement(row: Row, context: ProcessFunction[Row, DwdInfo]#Context,
                                  collector: Collector[DwdInfo]): Unit = {
        val iceberg_ods_tbl_name: String = row.getField(0).toString
        val data: String = row.getField(1).toString
        val kafka_dwd_topic: String = row.getField(2).toString

        val jsonObj: JSONObject = JSON.parseObject(data)

        // Clean the date fields
        jsonObj.put("logTime", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")))
        jsonObj.put("logintm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logintm")))
        jsonObj.put("logouttm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logouttm")))

        // Parse the nested json data
        val browse_product_code: String = jsonObj.getString("browseProductCode")
        val browse_product_tpcode: String = jsonObj.getString("browseProductTpCode")
        val user_ip: String = jsonObj.getString("userIp")
        val obtain_points: String = jsonObj.getString("obtainPoints")
        val userid1: String = jsonObj.getString("userid")
        val userid2: String = jsonObj.getString("userId")
        val front_product_url: String = jsonObj.getString("frontProductUrl")
        val log_time: String = jsonObj.getString("logTime")
        val browse_product_url: String = jsonObj.getString("browseProductUrl")
        val id: String = jsonObj.getString("id")
        val ip: String = jsonObj.getString("ip")
        val logintm: String = jsonObj.getString("logintm")
        val logouttm: String = jsonObj.getString("logouttm")

        // Add the sink dwd topic to each data json object and emit it to the side output
        jsonObj.put("kafka_dwd_topic", kafka_dwd_topic)
        context.output(kafkaDataTag, jsonObj)

        collector.collect(DwdInfo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code,
          browse_product_tpcode, user_ip, obtain_points, userid1, userid2,
          front_product_url, log_time, browse_product_url, id, ip, logintm, logouttm))
      }
    })

    val props = new Properties()
    props.setProperty("bootstrap.servers", kafkaBrokers)

    // 6. Write the side-output data into the corresponding DWD-layer Kafka topics. SQL is not
    //    used here; the DataStream API sinks each record to the topic name carried in the record.
    dwdDS.getSideOutput(kafkaDataTag).addSink(new FlinkKafkaProducer[JSONObject](
      "KAFKA-DWD-DEFAULT-TOPIC",
      new KafkaSerializationSchema[JSONObject] {
        override def serialize(jsonObj: JSONObject, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
          val sinkDwdTopic: String = jsonObj.getString("kafka_dwd_topic")
          new ProducerRecord[Array[Byte], Array[Byte]](sinkDwdTopic, null, jsonObj.toString.getBytes())
        }
      },
      props,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))

    env.execute()
  }
}
```
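The cleaning step relies on DateUtil.getDateYYYYMMDDHHMMSS, a project utility whose source is not included in this article. Below is a minimal hypothetical sketch of what such a helper could look like, assuming the raw values are epoch-millisecond strings like the logTime in the sample records; the real project implementation may differ.

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical sketch of the DateUtil helper (assumption, not the project's actual code):
// converts an epoch-millisecond string into a "yyyy-MM-dd HH:mm:ss" timestamp.
// Null values are passed through so records that lack the field are not broken.
object DateUtil {
  def getDateYYYYMMDDHHMMSS(millis: String): String = {
    if (millis == null) return null
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    sdf.format(new Date(millis.toLong))
  }
}
```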
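Note that the snippet above stops at sinking the side-output stream to Kafka; the step that actually writes dwdDS into the Iceberg DWD tables is not shown. A minimal sketch of how that insert could look with the Flink Table API follows. The view name dwd_data_view is hypothetical, and the table path and column mapping are inferred from the Hive DDL in section II, so the project's actual code may differ.

```scala
// Sketch (assumption): register the cleaned stream as a view and insert the
// user-login fields into the Iceberg DWD table created in Hive (section II).
tblEnv.createTemporaryView("dwd_data_view", dwdDS)

tblEnv.executeSql(
  """
    |insert into hadoop_iceberg.icebergdb.DWD_USER_LOGIN
    |select id, userid1 as userid, ip, logintm, logouttm
    |from dwd_data_view
    |where kafka_dwd_topic = 'KAFKA-DWD-USER-LOGIN-TOPIC'
  """.stripMargin)
```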
II. Creating the Iceberg DWD Layer Table
Before the code is executed, the corresponding Iceberg tables must be created in Hive beforehand. The Iceberg tables are created as follows:
1. Add the jars needed by the Iceberg table format in Hive
Start the HDFS cluster, start the Hive metastore service on node1, then start the Hive client and add the Iceberg dependency jars:

```shell
# Start the Hive metastore service on node1
[root@node1 ~]# hive --service metastore

# Load the two jars in the Hive client on node3
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
```
2. Create the Iceberg table
The Iceberg DWD table created here is DWD_USER_LOGIN; the create statement is as follows:

```sql
CREATE TABLE DWD_USER_LOGIN (
  id string,
  userid string,
  ip string,
  logintm string,
  logouttm string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_USER_LOGIN'
TBLPROPERTIES (
  'iceberg.catalog'='location_based_table',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='3'
);
```
III. Testing the Code
After the code above is written, the steps to test it are as follows:
1. Create the corresponding topic in Kafka
Create the KAFKA-DWD-USER-LOGIN-TOPIC topic in Kafka:

```shell
kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-USER-LOGIN-TOPIC --partitions 3 --replication-factor 3
```

Monitor the data in this topic:

```shell
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-USER-LOGIN-TOPIC
```
2. Change the code to consume the Kafka data from the beginning
In the code, set the scan.startup.mode property of the Kafka connector to earliest-offset so that the data is consumed from the beginning.
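For reference, this is the only change to the Kafka connector definition from the code above; everything else stays the same:

```scala
// Same Kafka connector table as in ProduceODSDataToDWD.scala, with only the startup mode changed
tblEnv.executeSql(
  """
    |create table kafka_ods_tbl (
    | iceberg_ods_tbl_name string,
    | kafka_dwd_topic string,
    | data string
    |) with (
    | 'connector' = 'kafka',
    | 'topic' = 'KAFKA-ODS-TOPIC',
    | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
    | 'scan.startup.mode'='earliest-offset', -- read the topic from the beginning
    | 'properties.group.id' = 'my-group-id',
    | 'format' = 'json'
    |)
  """.stripMargin)
```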
Alternatively, instead of consuming the Kafka data from the beginning, you can directly start RTMockDBData.java, the code that writes data into the MySQL tables in real time. In that case Maxwell must be running to monitor MySQL, so that the business data written to MySQL is captured in real time.
3. Run the code and check the results
After the code above runs, the corresponding Kafka topic KAFKA-DWD-USER-LOGIN-TOPIC contains the expected data, and the corresponding table in the Iceberg DWD layer contains data as well.
The results in Kafka are as follows:
The data in the Iceberg DWD layer table DWD_USER_LOGIN is as follows:
IV. Architecture Diagram