ApacheDoris整合IcebergFlinkCDC构建
导读:这是一篇非常完整全面的应用技术干货,手把手教你如何使用DorisIcebergFlinkCDC构建实时湖仓一体的联邦查询分析架构。按照本文中步骤一步步完成,完整体验搭建操作的完整过程。
作者ApacheDorisPMC成员张家锋1。概览
这篇教程将展示如何使用DorisIcebergFlinkCDC构建实时湖仓一体的联邦查询分析,Doris1。1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,同时本教程整个环境是都基于伪分布式环境搭建,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。1。1软件环境
本教程的演示环境如下:Centos7Apahcedoris1。1Hadoop3。3。3hive3。1。3Fink1。14。4flinksqlconnectormysqlcdc2。2。1ApacheIceberg0。13。2JDK1。8。0311MySQL8。0。29wgethttps:archive。apache。orgdisthadoopcorehadoop3。3。3hadoop3。3。3。tar。gzwgethttps:archive。apache。orgdisthivehive3。1。3apachehive3。1。3bin。tar。gzwgethttps:dlcdn。apache。orgflinkflink1。14。4flink1。14。4binscala2。12。tgzwgethttps:search。maven。orgremotecontent?filepathorgapacheicebergicebergflinkruntime1。140。13。2icebergflinkruntime1。140。13。2。jarwgethttps:repository。cloudera。comartifactoryclouderareposorgapacheflinkflinkshadedhadoop3uber3。1。1。7。2。9。01739。0flinkshadedhadoop3uber3。1。1。7。2。9。01739。0。jar1。2系统架构
我们整理架构图如下
首先我们从Mysql数据中使用Flink通过Binlog完成数据的实时采集然后再Flink中创建Iceberg表,Iceberg的元数据保存在hive里最后我们在Doris中创建Iceberg外表在通过Doris统一查询入口完成对Iceberg里的数据进行查询分析,供前端应用调用,这里iceberg外表的数据可以和Doris内部数据或者Doris其他外部数据源的数据进行关联查询分析
Doris湖仓一体的联邦查询架构如下:
Doris通过ODBC方式支持:MySQL,Postgresql,Oracle,SQLServer同时支持Elasticsearch外表1。0版本支持Hive外表1。1版本支持Iceberg外表1。2版本支持Hudi外表2。环境安装部署2。1安装Hadoop、Hivetarzxvfhadoop3。3。3。tar。gztarzxvfapachehive3。1。3bin。tar。gz
配置系统环境变量exportHADOOPHOMEdatahadoop3。3。3exportHADOOPCONFDIRHADOOPHOMEetchadoopexportHADOOPHDFSHOMEHADOOPHOMEexportHIVEHOMEdatahive3。1。3exportPATHPATH:HADOOPHOMEbin:HIVEHOMEbin:HIVEHOMEconf2。2配置hdfs2。2。1coresite。xml
vietchadoopcoresite。xmlconfigurationpropertynamefs。defaultFSnamevaluehdfs:localhost:9000valuepropertyconfiguration2。2。2hdfssite。xml
vietchadoophdfssite。xmlconfigurationpropertynamedfs。replicationnamevalue1valuepropertypropertynamedfs。namenode。name。dirnamevaluedatahdfsnamenodevaluepropertypropertynamedfs。datanode。data。dirnamevaluedatahdfsdatanodevaluepropertyconfiguration2。2。3修改Hadoop启动脚本
sbinstartdfs。sh
sbinstopdfs。sh
在文件开始加上下面的内容HDFSDATANODEUSERrootHADOOPSECUREDNUSERhdfsHDFSNAMENODEUSERrootHDFSSECONDARYNAMENODEUSERroot
sbinstartyarn。sh
sbinstopyarn。sh
在文件开始加上下面的内容YARNRESOURCEMANAGERUSERrootHADOOPSECUREDNUSERyarnYARNNODEMANAGERUSERroot2。3配置yarn
这里我改变了Yarn的一些端口,因为我是单机环境和Doris的一些端口冲突。你可以不启动yarn
vietchadoopyarnsite。xmlpropertynameyarn。resourcemanager。addressnamevaluejiafengtest:50056valuepropertypropertynameyarn。resourcemanager。scheduler。addressnamevaluejiafengtest:50057valuepropertypropertynameyarn。resourcemanager。resourcetracker。addressnamevaluejiafengtest:50058valuepropertypropertynameyarn。resourcemanager。admin。addressnamevaluejiafengtest:50059valuepropertypropertynameyarn。resourcemanager。webapp。addressnamevaluejiafengtest:9090valuepropertypropertynameyarn。nodemanager。localizer。addressnamevalue0。0。0。0:50060valuepropertypropertynameyarn。nodemanager。webapp。addressnamevalue0。0。0。0:50062valueproperty
vietchadoopmapredsite。xmpropertynamemapreduce。jobhistory。addressnamevalue0。0。0。0:10020valuepropertypropertynamemapreduce。jobhistory。webapp。addressnamevalue0。0。0。0:19888valuepropertypropertynamemapreduce。shuffle。portnamevalue50061valueproperty2。2。4启动hadoopsbinstartall。sh2。4配置Hive2。4。1创建hdfs目录hdfsdfsmkdirpuserhivewarehousehdfsdfsmkdirtmphdfsdfschmodgwuserhivewarehousehdfsdfschmodgwtmp2。4。2配置hivesite。xmllt;?xmlversion1。0?lt;?xmlstylesheettypetextxslhrefconfiguration。xsl?configurationpropertynamejavax。jdo。option。ConnectionURLnamevaluejdbc:mysql:localhost:3306hive?createDatabaseIfNotExisttruevaluepropertypropertynamejavax。jdo。option。ConnectionDriverNamenamevaluecom。mysql。jdbc。Drivervaluepropertypropertynamejavax。jdo。option。ConnectionUserNamenamevaluerootvaluepropertypropertynamejavax。jdo。option。ConnectionPasswordnamevalueMyNewPass4!valuepropertypropertynamehive。metastore。warehouse。dirnamevalueuserhivewarehousevaluedescriptionlocationofdefaultdatabaseforthewarehousedescriptionpropertypropertynamehive。metastore。urisnamevaluedescriptionThriftURIfortheremotemetastore。Usedbymetastoreclienttoconnecttoremotemetastore。descriptionpropertypropertynamejavax。jdo。PersistenceManagerFactoryClassnamevalueorg。datanucleus。api。jdo。JDOPersistenceManagerFactoryvaluepropertypropertynamehive。metastore。schema。verificationnamevaluefalsevaluepropertypropertynamedatanucleus。schema。autoCreateAllnamevaluetruevaluepropertyconfiguration2。4。3配置hiveenv。sh
加入以下内容HADOOPHOMEdatahadoop3。3。32。4。4hive元数据初始化schematoolinitSchemadbTypemysql2。4。5启动hivemetaservice
后台运行nohupbinhiveservicemetaservice1devnull21
验证lsofi:9083COMMANDPIDUSERFDTYPEDEVICESIZEOFFNODENAMEjava20700root567uIPv6546053480t0TCP:emcppmgmtsvc(LISTEN)2。5安装MySQL
具体请参照这里:
使用FlinkCDC实现MySQL数据实时入ApacheDoris2。5。1创建MySQL数据库表并初始化数据CREATEDATABASEdemo;USEdemo;CREATETABLEuserinfo(idintNOTNULLAUTOINCREMENT,nameVARCHAR(255)NOTNULLDEFAULTflink,addressVARCHAR(1024),phonenumberVARCHAR(512),emailVARCHAR(255),PRIMARYKEY(id))ENGINEInnoDB;INSERTINTOuserinfoVALUES(10001,user110,Shanghai,13347420870,NULL);INSERTINTOuserinfoVALUES(10002,user111,xian,13347420870,NULL);INSERTINTOuserinfoVALUES(10003,user112,beijing,13347420870,NULL);INSERTINTOuserinfoVALUES(10004,user113,shenzheng,13347420870,NULL);INSERTINTOuserinfoVALUES(10005,user114,hangzhou,13347420870,NULL);INSERTINTOuserinfoVALUES(10006,user115,guizhou,13347420870,NULL);INSERTINTOuserinfoVALUES(10007,user116,chengdu,13347420870,NULL);INSERTINTOuserinfoVALUES(10008,user117,guangzhou,13347420870,NULL);INSERTINTOuserinfoVALUES(10009,user118,xian,13347420870,NULL);2。6安装Flinktarzxvfflink1。14。4binscala2。12。tgz
然后需要将下面的依赖拷贝到Flink安装目录下的lib目录下,具体的依赖的lib文件如下:
下面将几个Hadoop和Flink里没有的依赖下载地址放在下面wgethttps:repo1。maven。orgmaven2comververicaflinksqlconnectormysqlcdc2。2。1flinksqlconnectormysqlcdc2。2。1。jarwgethttps:repo1。maven。orgmaven2orgapachethriftlibfb3030。9。3libfb3030。9。3。jarwgethttps:search。maven。orgremotecontent?filepathorgapacheicebergicebergflinkruntime1。140。13。2icebergflinkruntime1。140。13。2。jarwgethttps:repository。cloudera。comartifactoryclouderareposorgapacheflinkflinkshadedhadoop3uber3。1。1。7。2。9。01739。0flinkshadedhadoop3uber3。1。1。7。2。9。01739。0。jar
其他的:hadoop3。3。3sharehadoopcommonlibcommonsconfiguration22。1。1。jarhadoop3。3。3sharehadoopcommonlibcommonslogging1。1。3。jarhadoop3。3。3sharehadooptoolslibhadooparchivelogs3。3。3。jarhadoop3。3。3sharehadoopcommonlibhadoopauth3。3。3。jarhadoop3。3。3sharehadoopcommonlibhadoopannotations3。3。3。jarhadoop3。3。3sharehadoopcommonhadoopcommon3。3。3。jaradoop3。3。3sharehadoophdfshadoophdfs3。3。3。jarhadoop3。3。3sharehadoopclienthadoopclientapi3。3。3。jarhive3。1。3libhiveexec3。1。3。jarhive3。1。3libhivemetastore3。1。3。jarhive3。1。3libhivehcatalogcore3。1。3。jar2。6。1启动Flinkbinstartcluster。sh
启动后的界面如下:
2。6。2进入FlinkSQLClientbinsqlclient。shembedded
开启checkpoint,每隔3秒做一次checkpoint
Checkpoint默认是不开启的,我们需要开启Checkpoint来让Iceberg可以提交事务。并且,mysqlcdc在binlog读取阶段开始前,需要等待一个完整的checkpoint来避免binlog记录乱序的情况。注意:
这里是演示环境,checkpoint的间隔设置比较短,线上使用,建议设置为35分钟一次checkpoint。FlinkSQLSETexecution。checkpointing。interval3s;〔INFO〕Sessionpropertyhasbeenset。2。6。3创建IcebergCatalogCREATECATALOGhivecatalogWITH(typeiceberg,catalogtypehive,urithrift:localhost:9083,clients5,propertyversion1,warehousehdfs:localhost:8020userhivewarehouse);
查看catalogFlinkSQLshowcatalogs;catalognamedefaultcataloghivecatalog2rowsinset2。6。4创建MysqlCDC表CREATETABLEusersource(databasenameSTRINGMETADATAVIRTUAL,tablenameSTRINGMETADATAVIRTUAL,idDECIMAL(20,0)NOTNULL,nameSTRING,addressSTRING,phonenumberSTRING,emailSTRING,PRIMARYKEY(id)NOTENFORCED)WITH(connectormysqlcdc,hostnamelocalhost,port3306,usernameroot,passwordMyNewPass4!,databasenamedemo,tablenameuserinfo);
查询CDC表:selectfromusersource;
2。6。5创建Iceberg表查看catalogshowcatalogs;使用catalogusecataloghivecatalog;创建数据库CREATEDATABASEiceberghive;使用数据库useiceberghive;2。6。5。1创建表CREATETABLEallusersinfo(databasenameSTRING,tablenameSTRING,idDECIMAL(20,0)NOTNULL,nameSTRING,addressSTRING,phonenumberSTRING,emailSTRING,PRIMARYKEY(databasename,tablename,id)NOTENFORCED)WITH(catalogtypehive);
从CDC表里插入数据到Iceberg表里usecatalogdefaultcatalog;insertintohivecatalog。iceberghive。allusersinfoselectfromusersource;
在web界面可以看到任务的运行情况
然后停掉任务,我们去查询iceberg表selectfromhivecatalog。iceberghive。allusersinfo
可以看到下面的结果
我们去hdfs上可以看到hive目录下的数据及对应的元数据
我们也可以通过Hive建好Iceberg表,然后通过Flink将数据插入到表里
下载IcebergHive运行依赖wgethttps:repo1。maven。orgmaven2orgapacheicebergiceberghiveruntime0。13。2iceberghiveruntime0。13。2。jar
在hiveshell下执行:SETengine。hive。enabledtrue;SETiceberg。engine。hive。enabledtrue;SETiceberg。mr。cataloghive;addjarpathtoiiceberghiveruntime0。13。2。jar;
创建表CREATEEXTERNALTABLEiceberghive(idint,namestring)STOREDBYorg。apache。iceberg。mr。hive。HiveIcebergStorageHandlerLOCATIONhdfs:localhost:8020userhivewarehouseiceberdbiceberghiveTBLPROPERTIES(iceberg。mr。cataloghadoop,iceberg。mr。catalog。hadoop。warehouse。locationhdfs:localhost:8020userhivewarehouseiceberdbiceberghive);
然后再FlinkSQLClient下执行下面语句将数据插入到Iceber表里INSERTINTOhivecatalog。iceberghive。iceberghivevalues(2,c);INSERTINTOhivecatalog。iceberghive。iceberghivevalues(3,zhangfeng);
查询这个表selectfromhivecatalog。iceberghive。iceberghive
可以看到下面的结果
3。Doris查询Iceberg
ApacheDoris提供了Doris直接访问Iceberg外部表的能力,外部表省去了繁琐的数据导入工作,并借助Doris本身的OLAP的能力来解决Iceberg表的数据分析问题:支持Iceberg数据源接入Doris支持Doris与Iceberg数据源中的表联合查询,进行更加复杂的分析操作3。1安装Doris
这里我们不在详细讲解Doris的安装,如果你不知道怎么安装Doris请参照官方文档:快速入门3。2创建Iceberg外表CREATETABLEallusersinfoENGINEICEBERGPROPERTIES(iceberg。databaseiceberghive,iceberg。tableallusersinfo,iceberg。hive。metastore。uristhrift:localhost:9083,iceberg。catalog。typeHIVECATALOG);参数说明:ENGINE需要指定为ICEBERGPROPERTIES属性:iceberg。hive。metastore。uris:HiveMetastore服务地址iceberg。database:挂载Iceberg对应的数据库名iceberg。table:挂载Iceberg对应的表名,挂载Icebergdatabase时无需指定。iceberg。catalog。type:Iceberg中使用的catalog方式,默认为HIVECATALOG,当前仅支持该方式,后续会支持更多的Icebergcatalog接入方式。mysqlCREATETABLEallusersinfoENGINEICEBERGPROPERTIES(iceberg。databaseiceberghive,iceberg。tableallusersinfo,iceberg。hive。metastore。uristhrift:localhost:9083,iceberg。catalog。typeHIVECATALOG);QueryOK,0rowsaffected(0。23sec)mysqlselectfromallusersinfo;databasenametablenameidnameaddressphonenumberemaildemouserinfo10004user113shenzheng13347420870NULLdemouserinfo10005user114hangzhou13347420870NULLdemouserinfo10002user111xian13347420870NULLdemouserinfo10003user112beijing13347420870NULLdemouserinfo10001user110Shanghai13347420870NULLdemouserinfo10008user117guangzhou13347420870NULLdemouserinfo10009user118xian13347420870NULLdemouserinfo10006user115guizhou13347420870NULLdemouserinfo10007user116chengdu13347420870NULL9rowsinset(0。18sec)3。3同步挂载
当Iceberg表Schema发生变更时,可以通过REFRESH命令手动同步,该命令会将Doris中的Iceberg外表删除重建。同步Iceberg表REFRESHTABLEticeberg;同步Iceberg数据库REFRESHDATABASEicebergtestdb;3。4Doris和Iceberg数据类型对应关系
支持的Iceberg列类型与Doris对应关系如下表:
ICEBERG
DORIS
描述
BOOLEAN
BOOLEAN
INTEGER
INT
LONG
BIGINT
FLOAT
FLOAT
DOUBLE
DOUBLE
DATE
DATE
TIMESTAMP
DATETIME
Timestamp转成Datetime会损失精度
STRING
STRING
UUID
VARCHAR
使用VARCHAR来代替
DECIMAL
DECIMAL
TIME
不支持
FIXED
不支持
BINARY
不支持
STRUCT
不支持
LIST
不支持
MAP
不支持3。5注意事项Iceberg表Schema变更不会自动同步,需要在Doris中通过REFRESH命令同步Iceberg外表或数据库。当前默认支持的Iceberg版本为0。12。0,0。13。x,未在其他版本进行测试。后续后支持更多版本。3。6DorisFE配置
下面几个配置属于Iceberg外表系统级别的配置,可以通过修改fe。conf来配置,也可以通过ADMINSETCONFIG来配置。icebergtablecreationstrictmode创建Iceberg表默认开启strictmode。strictmode是指对Iceberg表的列类型进行严格过滤,如果有Doris目前不支持的数据类型,则创建外表失败。icebergtablecreationintervalsecond自动创建Iceberg表的后台任务执行间隔,默认为10s。maxicebergtablecreationrecordsizeIceberg表创建记录保留的最大值,默认为2000。仅针对创建Iceberg数据库记录。4。总结
这里DorisOnIceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。
自此我们完整从搭建Hadoop,hive、flink、Mysql、Doris及DorisOnIceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。
大爆冷!国乒全运亚军22仍惊天逆转WTT卡塔尔乒乓球支线赛中,国乒派出了一批年轻选手为主的参赛阵容,单打冠军将有150分的积分。在资格赛决胜轮的比赛中,国乒折损2员大将,2021年陕西全运会混双亚军曹巍意外输给……
摩尔特里空砍2418沈梓捷16分献准绝杀深圳终结新疆3连胜北京时间3月14日,CBA常规赛,深圳以105104险胜新疆,终结新疆的3连胜势头。新疆队摩尔特里24分16篮板4助攻,唐才育18分7助攻,阿不都沙拉木23分7篮板4助攻……
读书带来的幸福2022该来的都在路上,我是江西嫁广西奔五的兔兔。姻缘天注定。相由心生,境由心造,一个人的容颜四十岁以后由自己负责,与经历有关,与读书有关。都说书是最好的美容剂,我总觉得爱读书……
经常吃三七粉,对心脑血管有好处?这3类人建议少喝想要养生来促进机体健康得找对方法,有的人会利用某些药材,食材等来获取丰富的营养,觉得摄入量越多身体条件越好,能增强抵抗力,提高器官功能,却不知道某些药物是有副作用的,需要自身条……
睡得晚,孩子注意力差来源:【生命时报】多动症即注意缺陷与多动障碍(ADHD),以儿童期注意力不集中、活动过度和情绪冲动为主要特征,与遗传和环境等因素有关。近日,日本名古屋大学医学部附属……
天才少女谷爱凌刷新时尚圈,冬奥冠军穿搭个性又飒气这两天冬奥冠军谷爱凌霸屏无数社交平台,天才少女不仅实力强、颜值高,而且时尚品味也是一绝啊看她的出片精彩程度不亚于明星博主。青蛙公主的穿搭甜酷,飒气,基本上单品里出现最多的……
剧本杀魔王去死去死团复盘解析剧透结局凶手是谁真相答案剧本名称:魔王去死去死团人数:6人(3男3女)可反串阅读量:1w字(单人阅读体量)时长:4H发行:司家侦探工作室《魔王去死去死团》剧本杀故事简介复……
拳王约书亚悬了?泰森富里直言约书亚二战乌西克会被KO2月14日,据美国拳击媒体披露,WBC重量级世界拳王泰森富里不看好约书亚复仇乌西克,尽管约书亚在一番战中输得差距不大,很多人觉得他有机会在二番战复仇,不过富里却坚持自己的观点,……
网传小鹏汽车因订单储备不足,开始变相降价7月16日,博主孙少军09发文称,因为产能爆发、订单储备不足,小鹏从今天起新订车客户以尾款减免形式可以优惠50001万元。同时,他还透露,除了小鹏汽车,还有几家也在偷偷放政策了……
外星人降落大楼楼顶我并不是自愿地来到这颗星球上的,现在想来那时候好像上一秒还躺在家乡的金黄色田野上(那时候正值家乡的阿伊节),下一秒就躺在大楼天台的雨水中。就在现在,用手指从半个一数到一的功夫。……
信仰有多烧钱?你有信仰吗?你有虔诚的追求吗?有为某一存在不辞辛劳、不计花销吗?今天坐车时,有幸认识了一位大哥,他是一位虔诚的佛教徒,从他嘴里,我得知了一个坚持数十年,千里奔波,只为烧香的故事……
戏耍皇马!老佛爷愤怒决定10亿续约维尼修斯,永不录用姆巴佩皇马追逐了1年多时间的世界第一身价球星姆巴佩,撕毁此前与皇马达成加盟协议,最终以一种莫名其妙的方式续约巴黎至2025年,让全世界足球迷都大吃一惊!而见惯了江湖风起云涌的皇……