如何同步mysql数据到Doris中

如何同步mysql数据到Doris中,第1张

Doris官网定义 mysql原始表结构 1.doris中关联mysql外表 结果如下: 2.doris中关联kafka导入数据 查看作业 State为RUNNING,表示已经成功。 停止作业 3.通过flink导入mysql数据到doris 方法1:通过mysql-cdc写入kafka,kafka关联doris表。 方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表 如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。 如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。

注 修改配置之后,需要通过 metadata_failure_recovery=true 来清空 bdbje 的元数据, 因此当修改 FE 端口等都可以通过在 fe/conf/fe.conf 的方式来使其生效即可,然后将 fe.conf 文件中的 metadata_failure_recovery=true 删除防止下次再次清除咯。(源码: org.apache.doris.journal.bdbje.BDBDebugger )

修改 fe.conf 文件增加 enable_bdbje_debug_mode=true , 然后通过 sh start_fe.sh --daemon 启动 FE 即可进入 debug 模式。 进入 debug 模式之后,仅会启动 http server 和 MySQLServer 并打开 BDBJE 实例,但不会进入任何元数据的加载及后续其他启动流程。

进入 FE web ui 或 MySQL 客户端访问 Doris 服务:

当硬盘损坏之后,磁盘上 image 文件可能损坏,但内存中元数据完好的情况下可以从内存中 dump 元数据,在替换磁盘上的 image 文件,来恢复元数据,整个不停查询服务的 *** 作如下:

FE 启动脚本中指定了 -helper 参数,并且指向了正确的 leader FE 节点,那么该 FE 首先会通过 http 向 leader 节点询问自身的角色(即 ROLE )和 cluster_id 。然后拉取最新的 image 文件。读取 image 文件,生成元数据镜像后,启动 bdbje ,开始进行 bdbje 日志同步。同步完成后,开始回放 bdbje 中, image 文件之后的日志,完成最终的元数据镜像生成。

Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

下图展示了 Stream load 的主要流程,省略了一些导入细节。

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。

导入的最终结果由 Coordinator BE 返回给用户。

目前 Stream Load 支持两个数据格式:CSV(文本) 和 JSON

Stream load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。

用户也可以通过其他 HTTP client 进行 *** 作。

示例:

user/passwd

Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。

Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。

这里以列类型为 TinyInt 来举例:

(注:当表中的列允许导入空值时)

这里以列类型为 Decimal(1,0) 举例:

(注:当表中的列允许导入空值时)

注意:

10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode 过滤。

由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

示例:

下面主要解释了 Stream load 导入结果参数:

用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。

stream_load_default_timeout_second

导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。

默认的 timeout 时间为 600 秒。如果导入的源文件无法在规定时间内完成导入,用户可以在 stream load 请求中设置单独的超时时间。

或者调整 FE 的参数stream_load_default_timeout_second 来设置全局的默认超时时间。

streaming_load_max_mb

Stream load 的最大导入大小,默认为 10G,单位是 MB。如果用户的原始文件超过这个值,则需要调整 BE 的参数 streaming_load_max_mb。

示例1:

以 "table1_20211207" 为 Label,使用本地文件 table1_data 导入 table1 表。

本地文件 table1_data 以 , 作为数据之间的分隔,具体内容如下:

最终导入命令:

示例2:

以 "table2_20211207" 为 Label,使用本地文件 table2_data 导入 table2 表。

本地文件 table2_data 以 | 作为数据之间的分隔,具体内容如下:


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/7303570.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-04
下一篇 2023-04-04

发表评论

登录后才能评论

评论列表(0条)

保存