如何实现一个新引擎

    1.2 实现ECP的主要接口

    • EngineConnPlugin: 启动EngineConn时,先找到对应的EngineConnPlugin类,以此为入口,获取其它核心接口的实现,是必须实现的主要接口。

    • EngineConnFactory: 实现如何启动一个引擎连接器,和如何启动一个引擎执行器的逻辑,是必须实现的接口。

      • 实现createEngineConn方法:返回一个EngineConn对象,其中,getEngine返回一个封装了与底层引擎连接信息的对象,同时包含Engine类型信息。
      • 对于只支持单一计算场景的引擎,继承SingleExecutorEngineConnFactory,实现createExecutor,返回对应的Executor。
      • 对于支持多计算场景的引擎,需要继承MultiExecutorEngineConnFactory,并为每种计算类型实现一个ExecutorFactory。EngineConnPlugin会通过反射获取所有的ExecutorFactory,根据实际情况返回对应的Executor。
    • EngineConnResourceFactory: 用于限定启动一个引擎所需要的资源,引擎启动前,将以此为依 据 向 Linkis Manager 申 请 资 源。非必须,默认可以使用GenericEngineResourceFactory。

    • EngineLaunchBuilder: 用于封装EngineConnManager可以解析成启动命令的必要信息。非必须,可以直接继承JavaProcessEngineConnLaunchBuilder。

    1.3 实现引擎Executor执行器逻辑

    Executor为执行器,作为真正的计算场景执行器,是实际的计算逻辑执行单元,也是对引擎各种具体能力的抽象,提供加锁、访问状态、获取日志等多种不同的服务。并根据实际的使用需要,Linkis默认提供以下的派生Executor基类,其类名和主要作用如下:

    • SensibleExecutor:
      • Executor存在多种状态,允许Executor切换状态
      • Executor切换状态后,允许做通知等操作
    • YarnExecutor: 指Yarn类型的引擎,能够获取得到applicationId和applicationURL和队列。
    • ResourceExecutor: 指引擎具备资源动态变化的能力,配合提供requestExpectedResource方法,用于每次希望更改资源时,先向RM申请新的资源;而resourceUpdate方法,用于每次引擎实际使用资源发生变化时,向RM汇报资源情况。
    • AccessibleExecutor: 是一个非常重要的Executor基类。如果用户的Executor继承了该基类,则表示该Engine是可以被访问的。这里需区分SensibleExecutor的state()和 AccessibleExecutor 的 getEngineStatus()方法:state()用于获取引擎状态,getEngineStatus()会获取引擎的状态、负载、并发等基础指标Metric数据。
    • 同时,如果继承了 AccessibleExecutor,会触发Engine 进程实例化多个EngineReceiver方法。EngineReceiver用于处理Entrance、EM和LinkisMaster的RPC请求,使得该引擎变成了一个可被访问的引擎,用户如果有特殊的RPC需求,可以通过实现RPCService接口,进而实现与AccessibleExecutor通信。
    • ExecutableExecutor: 是一个常驻型的Executor基类,常驻型的Executor包含:生产中心的Streaming应用、提交给Schedulis后指定要以独立模式运行的脚步、业务用户的业务应用等。
    • StreamingExecutor: Streaming为流式应用,继承自ExecutableExecutor,需具备诊断、do checkpoint、采集作业信息、监控告警的能力。
    • ComputationExecutor: 是常用的交互式引擎Executor,处理交互式执行任务,并且具备状态查询、任务kill等交互式能力。
    • ConcurrentComputationExecutor: 用户并发引擎Executor,常用于JDBC类型引擎,执行脚本时,由管理员账户拉起引擎实例,所有用户共享引擎实例。

    本章节以JDBC引擎举例,详解新引擎的实现过程,包括引擎代码的编译、安装、数据库配置,管理台引擎标签适配,以及Scripts中新引擎脚本类型扩展和工作流新引擎的任务节点扩展等。

    2.1 并发引擎设置默认启动用户

    JDBC引擎中的核心类继承的抽象类是ConcurrentComputationExecutor,计算类引擎中的核心类XXXEngineConnExecutor继承的抽象类是ComputationExecutor。这导致两者最大的一个区别是:JDBC引擎实例由管理员用户启动,被所有用户共享,以提高机器资源利用率;而计算引擎类型的脚本在提交时,每个用户下会启动一个引擎实例,用户间引擎实例互相隔离。这个在此处暂不细说,因为无论是并发引擎还是计算引擎,下文提到的额外修改流程应是一致的。

    相应的,如果你新增的引擎是并发引擎,那么你需要关注下这个类:AMConfiguration.scala,如果你新增的引擎是计算类引擎,则可忽略。

    1. object AMConfiguration {
    2. // 如果你的引擎是多用户并发引擎,那么这个配置项需要关注下
    3. val MULTI_USER_ENGINE_TYPES = CommonVars("wds.linkis.multi.user.engine.types", "jdbc,ck,es,io_file,appconn")
    4. private def getDefaultMultiEngineUser(): String = {
    5. // 此处应该是为了设置并发引擎拉起时的启动用户,默认jvmUser即是引擎服务Java进程的启动用户
    6. val jvmUser = Utils.getJvmUser
    7. s"""{jdbc:"$jvmUser", presto: "$jvmUser" es: "$jvmUser", ck:"$jvmUser", appconn:"$jvmUser", io_file:"root"}"""
    8. }
    9. }

    实现ComputationSingleExecutorEngineConnFactory接口的类JDBCEngineConnFactory中,下面两个方法需要实现:

    1. override protected def getEngineConnType: EngineType = EngineType.JDBC
    2. override protected def getRunType: RunType = RunType.JDBC

    因此需要在EngineType和RunType中增加JDBC对应的变量。

    1. // EngineType.scala中类似已存在引擎的变量定义,增加JDBC相关变量或代码
    2. object EngineType extends Enumeration with Logging {
    3. val JDBC = Value("jdbc")
    4. }
    5. def mapStringToEngineType(str: String): EngineType = str match {
    6. case _ if JDBC.toString.equalsIgnoreCase(str) => JDBC
    7. }
    8. // RunType.scla中
    9. object RunType extends Enumeration {
    10. val JDBC = Value("jdbc")
    11. }

    2.3 JDBC引擎标签中的版本号设置

    1. // 在LabelCommonConfig中增加JDBC的version配置
    2. public class LabelCommonConfig {
    3. public final static CommonVars<String> JDBC_ENGINE_VERSION = CommonVars.apply("wds.linkis.jdbc.engine.version", "4");
    4. }
    5. // 在EngineTypeLabelCreator的init方法中补充jdbc的匹配逻辑
    6. // 如果这一步不做,代码提交到引擎上时,引擎标签信息中会缺少版本号
    7. public class EngineTypeLabelCreator {
    8. private static void init() {
    9. defaultVersion.put(EngineType.JDBC().toString(), LabelCommonConfig.JDBC_ENGINE_VERSION.getValue());
    10. }
    11. }

    2.4 允许脚本编辑器打开的脚本文件类型

    在FileSource.scala中的fileType数组中增加jdbc引擎的脚本类型,如果不加,Scripts文件列表中不允许打开JDBC引擎的脚本类型

    1. // FileSource.scala中
    2. object FileSource {
    3. private val fileType = Array("......", "jdbc")
    4. }

    2.5 配置JDBC脚本变量存储和解析

    如果这个操作不做,JDBC的脚本中变量不能被正常存储和解析,脚本中直接使用${变量}时代码会执行失败!

    变量解析

    1. // QLScriptCompaction.scala
    2. class QLScriptCompaction private extends CommonScriptCompaction{
    3. override def belongTo(suffix: String): Boolean = {
    4. suffix match {
    5. ...
    6. case "jdbc" => true
    7. case _ => false
    8. }
    9. }
    10. }
    11. // QLScriptParser.scala
    12. override def belongTo(suffix: String): Boolean = {
    13. suffix match {
    14. case "jdbc" => true
    15. case _ => false
    16. }
    17. }
    18. }
    19. // CustomVariableUtils.scala中
    20. object CustomVariableUtils extends Logging {
    21. def replaceCustomVar(jobRequest: JobRequest, runType: String): (Boolean, String) = {
    22. runType match {
    23. ......
    24. case "hql" | "sql" | "fql" | "jdbc" | "hive"| "psql" => codeType = SQL_TYPE
    25. case _ => return (false, code)
    26. }
    27. }
    28. }

    web/src/dss/module/resourceSimple/engine.vue

    最终呈现给用户的效果:

    2.7 JDBC引擎的编译打包和安装部署

    JDBC引擎模块编译的示例命令如下:

    1. cd /linkis-project/linkis-engineconn-plugins/engineconn-plugins/jdbc
    2. mvn clean install -DskipTests

    assembly-combined-package/assembly-combined/src/main/assembly/assembly.xml

    1. <!--jdbc-->
    2. <fileSets>
    3. ......
    4. <fileSet>
    5. <directory>
    6. ../../linkis-engineconn-plugins/engineconn-plugins/jdbc/target/out/
    7. </directory>
    8. <outputDirectory>lib/linkis-engineconn-plugins/</outputDirectory>
    9. <includes>
    10. <include>**/*</include>
    11. </includes>
    12. </fileSet>
    13. </fileSets>

    然后对在项目根目录运行编译命令:

    1. mvn clean install -DskipTests

    编译成功后在assembly-combined-package/target/apache-linkis-1.x.x-incubating-bin.tar.gz和linkis-engineconn-plugins/engineconn-plugins/jdbc/target/目录下找到out.zip。

    上传out.zip文件到Linkis的部署节点,解压缩到Linkis安装目录/lib/linkis-engineconn-plugins/下面:

    引擎安装

    解压后别忘记删除out.zip,至此引擎编译和安装完成。

    2.8 JDBC引擎数据库配置

    在管理台选择添加引擎

    如果您希望在管理台支持引擎参数配置,可以按照JDBC引擎SQL示例修改数据库。

    此处以JDBC引擎为例,引擎安装完之后,要想运行新的引擎代码,还需对引擎进行数据库配置,以JDBC引擎为例,按照你自己实现的新引擎的情况,请按需修改。

    SQL参考如下:

    1. SET @JDBC_LABEL="jdbc-4";
    2. SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
    3. SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
    4. SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);
    5. -- JDBC
    6. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'jdbc引擎最大并发数', '2', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'jdbc');
    7. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.driver', '取值范围:对应JDBC驱动名称', 'jdbc驱动名称','', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
    8. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址', 'jdbc:hive2://127.0.0.1:10000', 'None', '', '0', '0', '1', '数据源配置', 'jdbc');
    9. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', '[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'jdbc');
    10. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'jdbc');
    11. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.auth.type', '取值范围:SIMPLE,USERNAME,KERBEROS', 'jdbc认证方式', 'USERNAME', 'OFT', '[\"SIMPLE\",\"USERNAME\",\"KERBEROS\"]', '0', '0', '1', '用户配置', 'jdbc');
    12. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
    13. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
    14. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.principal', '例如:hadoop/host@KDC.COM', '用户principal', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
    15. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.keytab.location', '例如:/data/keytab/hadoop.keytab', '用户keytab文件路径', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
    16. insert into `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.jdbc.proxy.user.property', '例如:hive.server2.proxy.user', '用户代理配置', '', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
    17. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.cores', '取值范围:1-8,单位:个', 'jdbc引擎初始化核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1', 'jdbc引擎设置', 'jdbc');
    18. INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-8,单位:G', 'jdbc引擎初始化内存大小', '1g', 'Regex', '^([1-8])(G|g)$', '0', '0', '1', 'jdbc引擎设置', 'jdbc');
    19. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_ALL, 'OPTIONAL', 2, now(), now());
    20. insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
    21. (select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);
    22. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_IDE, 'OPTIONAL', 2, now(), now());
    23. insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType',@JDBC_NODE, 'OPTIONAL', 2, now(), now());
    24. select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_IDE;
    25. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
    26. select @label_id := id from linkis_cg_manager_label where `label_value` = @JDBC_NODE;
    27. insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);
    28. -- jdbc default configuration
    29. insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
    30. (select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);

    如果想重置引擎的数据库配置数据,参考文件如下,请按需进行修改使用:

    1. -- 清除jdbc引擎的初始化数据
    2. SET @JDBC_ALL=CONCAT('*-*,',@JDBC_LABEL);
    3. SET @JDBC_IDE=CONCAT('*-IDE,',@JDBC_LABEL);
    4. SET @JDBC_NODE=CONCAT('*-nodeexecution,',@JDBC_LABEL);
    5. delete from `linkis_ps_configuration_config_value` where `config_label_id` in
    6. (select `relation`.`engine_type_label_id` AS `config_label_id` FROM `linkis_ps_configuration_key_engine_relation` relation INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id = label.id AND label.label_value = @JDBC_ALL);
    7. delete from `linkis_ps_configuration_key_engine_relation`
    8. where `engine_type_label_id` in
    9. (select label.id FROM `linkis_ps_configuration_config_key` config
    10. INNER JOIN `linkis_cg_manager_label` label
    11. ON config.engine_conn_type = 'jdbc' and label_value = @JDBC_ALL);
    12. delete from `linkis_ps_configuration_category`
    13. where `label_id` in (select id from `linkis_cg_manager_label` where `label_value` in(@JDBC_IDE, @JDBC_NODE));
    14. delete from `linkis_ps_configuration_config_key` where `engine_conn_type` = 'jdbc';
    15. delete from `linkis_cg_manager_label` where `label_value` in (@JDBC_ALL, @JDBC_IDE, @JDBC_NODE);

    最终的效果:

    JDBC引擎

    这样配置完之后,linkis-cli以及Scripts提交引擎脚本时,才能正确匹配到引擎的标签信息和数据源的连接信息,然后才能拉起你新加的引擎。

    2.9 DSS Scripts中新增JDBC脚本类型以及图标等信息

    如果你使用到了DSS的Scripts功能,还需要对dss项目中web的前端文件进行一些小小的改动,改动的目的是为了在Scripts中支持新建、打开、执行JDBC引擎脚本类型,以及实现引擎对应的图标、字体等。

    2.9.1 scriptis.js

    web/src/common/config/scriptis.js

    1. {
    2. rule: /\.jdbc$/i,
    3. lang: 'hql',
    4. executable: true,
    5. application: 'jdbc',
    6. runType: 'jdbc',
    7. ext: '.jdbc',
    8. scriptType: 'jdbc',
    9. abbr: 'jdbc',
    10. logo: 'fi-jdbc',
    11. color: '#444444',
    12. isCanBeNew: true,
    13. label: 'JDBC',
    14. isCanBeOpen: true
    15. },

    2.9.2 脚本复制支持

    web/src/apps/scriptis/module/workSidebar/workSidebar.vue

    2.9.3 logo与字体配色

    1. data() {
    2. return {
    3. isHover: false,
    4. iconColor: {
    5. 'fi-jdbc': '#444444',
    6. },
    7. };
    8. },

    web/src/apps/scriptis/module/workbench/modal.js

    1. let logoList = [
    2. { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
    3. ];

    web/src/components/tree/support.js

    1. export const supportTypes = [
    2. // 这里大概没用到
    3. { rule: /\.jdbc$/i, logo: 'fi-jdbc' },
    4. ]

    引擎图标展示

    web/src/dss/module/resourceSimple/engine.vue

    1. methods: {
    2. calssifyName(params) {
    3. switch (params) {
    4. case 'jdbc':
    5. return 'JDBC';
    6. ......
    7. }
    8. }
    9. // 图标过滤
    10. supportIcon(item) {
    11. const supportTypes = [
    12. ......
    13. { rule: 'jdbc', logo: 'fi-jdbc' },
    14. ];
    15. }
    16. }

    web/src/dss/assets/projectIconFont/iconfont.css

    1. .fi-jdbc:before {
    2. content: "\e75e";
    3. }

    此处控制的应该是:

    找一个引擎图标的svg文件

    web/src/components/svgIcon/svg/fi-jdbc.svg

    如果新引擎后续需要贡献社区,那么新引擎对应的svg图标、字体等需要确认其所属的开源协议,或获取其版权许可。

    最终达成的效果:

    工作流适配

    在dss_workflow_node表中保存新加JDBC引擎的定义数据,参考SQL:

    1. # 引擎任务节点基本信息定义
    2. insert into `dss_workflow_node` (`id`, `name`, `appconn_name`, `node_type`, `jump_url`, `support_jump`, `submit_to_scheduler`, `enable_copy`, `should_creation_before_node`, `icon`) values('18','jdbc','-1','linkis.jdbc.jdbc',NULL,'1','1','1','0','svg文件');
    3. # svg文件对应新引擎任务节点图标
    4. # 引擎任务节点分类划分
    5. insert into `dss_workflow_node_to_group`(`node_id`,`group_id`) values (18, 2);
    6. # 引擎任务节点的基本信息(参数属性)绑定
    7. INSERT INTO `dss_workflow_node_to_ui`(`workflow_node_id`,`ui_id`) VALUES (18,45);

    web/src/apps/workflows/service/nodeType.js

    在web/src/apps/workflows/module/process/images/newIcon/目录下增加新引擎的图标

    web/src/apps/workflows/module/process/images/newIcon/jdbc

    同样贡献社区时,请考虑svg文件的lincese或版权。

    上述内容记录了新引擎的实现流程,以及额外需要做的一些引擎配置。目前,一个新引擎的扩展流程还是比较繁琐的,希望能在后续版本中,优化新引擎的扩展、以及安装等过程。