数据集成

    数据集成包括离线同步和实时同步,本文为您介绍离线和实时同步支持的数据源类型。

    1.离线同步支持数据源

    2.实时同步支持数据源

    数据源类型 Source Sink
    Mysql 支持 支持
    Mysql_cdc 支持 支持
    Kafka 支持 支持
    Redis 支持 支持
    Minio 支持 支持
    Oracle 支持 支持
    1. 选择数据源管理页面点击新建数据源。
    2. 填写数据源配置信息。
    1. 填写参数完毕后,点击右上角测试连接进行数据平台与将连接数据库进行连通性测试。
    2. 点击测试成功后,如果是MYSQL类型数据源可点击右上角自动探知,进行查看,同步数据库下对应表模型信息。

    3. 展示列表中,已勾选模型表示已创建源数据模型,可对未勾选模型进行创建源数据模型至源模型管理界面查看。

    4. 点击页面右侧编辑和删除可对已创建数据源进行管理。

    注意:

    1. 删除数据源时,如该数据源存在关联源数据模型及工作流,该数据源无法删除;
    2. INTERNAL(内部)/EXTERNAL(外部)区别:
    • 面向对象:INTERNAL主要针对企业内部系统数据源(已废弃),EXTERNAL针对企业外部系统数据源。
    • 功能:内部数据源在数据平台中创建的内部数据模型对应数据库有相同的表随之创建,外部数据源对应外部数据模型只拥有读取权限;
    1. 进入源模型管理页面选择创建INTERNAL(内部)/EXTERNAL(外部)源数据模型。源数据模型的类型取决去数据源的类型,即INTERNAL(内部)数据源对应INTERNAL(内部)源数据模型,EXTERNAL(外部)数据源对应EXTERNAL(外部)源数据模型;
    2. 点击右上角开始新增源数据模型(非mysql类型可以新建,mysql类型的直接通过数据源探知获取);
    参数 说明 是否必填项
    数据模型名称 数据模型名称
    数据源名称 选择数据模型所在的数据源
    数据模型标识 配置方式(参考4.3.3.2)
    描述 对模型进行描述
    1. SQL编译模式可写SQL对源数据模型进行定义表结构;

    1. 点击页面右侧编辑和删除可对已创建源数据模型进行管理;

    注意:

    1. 在内部模型中仅支持ALTER TABLE语法。
    2. 操作演示。

    数据集成 - 图2

    1. 离线数据集成任务配置时,任务节点所属工作流周期选择单次任务或周期任务。
    2. 拖拽锚点可创建新的节点任务,选择数据集成。

    3. 填写数据集成节点基础信息

    1. 填写配置信息参数
    模块 参数 说明
    模型同步来源 数据源 选择已配置完成的业务数据源
    数据模型 选择已配置完成的业务源数据模型
    加载策略 全量/增量
    加载时间 分/时/天(选择增量)
    过滤条件 对数据源中数据根据业务规则进行过滤
    模型同步目标 数据源 默认数据平台存储组件
    数据模型 对应数据平台ods表(如是第一次同步,点击加号进行新建ods表)
    是否去重 是否对抽取的数据集进行去重操作
    1. 完成基础信息和配置信息参数填写后,可在左下角进行指标配置字段配置,左侧字段可选择性移动到右侧ods表中。
    2. 可对每一个字段进行质量规则的配置,质量规则详情参考上述质量规则操作。

    3. 最后点击确认按钮完成数据集成节点配置。

    4. 操作演示。

    1. 实时任务配置时,任务节点所属工作流周期选择实时任务。
    2. 拖拽锚点可创建新的节点任务,选择数据集成,目前实时数据集成仅支持自定义编辑操作。

    3. 实时数据集成采用flink-connector,书写语法及参数配置可参考flink官网。

    4. 操作演示。

    数据集成 - 图4

    1. CREATE TABLE `rt_test1`
    2. (
    3. `freezeQty` DOUBLE COMMENT '冻结库存 '
    4. ,`distributionCode` STRING COMMENT '分区编码 '
    5. ,`id` BIGINT
    6. ) with (
    7. 'connector' = 'mysql-cdc',
    8. 'hostname' = 'rm-bp1p4wb6181in436c33150.mysql.rds.aliyuncs.com',
    9. 'username' = 'sync_binlog',
    10. 'database-name' = 'Demo_test',
    11. 'table-name' = 'rt_test1'
    12. );
    13. -- sink
    14. CREATE TABLE `ods_rt_test1`
    15. (
    16. `freezeQty` DOUBLE COMMENT '冻结库存 ',
    17. `distributionCode` STRING COMMENT '分区编码 ',
    18. `id` BIGINT,
    19. ) with (
    20. 'connector' = 'upsert-kafka',
    21. 'topic' = 'ods_rt_test1',
    22. 'properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092',
    23. 'key.format' = 'json',
    24. 'value.format' = 'json'
    25. );
    26. -- exec
    27. insert into ods_rt_test1
    28. from rt_test1;