Modules

    For example, users can define their own geo functions and plug them into Flink as built-in functions for use in Flink SQL and the Table API. As another example, users can load an off-the-shelf Hive module to use Hive built-in functions as Flink built-in functions.

    CoreModule contains all of Flink’s system (built-in) functions and is loaded and enabled by default.

    The HiveModule provides Hive built-in functions as Flink’s system functions to SQL and Table API users. Flink’s Hive documentation provides full details on setting up the module.

    Users can develop custom modules by implementing the Module interface. To use custom modules in the SQL CLI, users should develop both a module and its corresponding module factory, which implements the ModuleFactory interface.

    A module factory defines a set of properties for configuring the module when the SQL CLI bootstraps. These properties are passed to a discovery service, which tries to match them to a ModuleFactory and instantiate the corresponding module.
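    The property matching described above can be pictured with a small, self-contained model. This is only an illustrative sketch, not Flink's discovery service: `ToyModuleFactory`, `discover`, the `geo` module type, and the `unit` property are all hypothetical names invented for the example.

```python
from typing import Callable, Dict, List


class ToyModuleFactory:
    """Hypothetical stand-in for a module factory; not Flink's ModuleFactory."""

    def __init__(self, required: Dict[str, str],
                 create: Callable[[Dict[str, str]], str]) -> None:
        self.required = required  # properties that must match exactly
        self.create = create      # builds the module from all properties


def discover(factories: List[ToyModuleFactory], properties: Dict[str, str]) -> str:
    """Return the module built by the single factory whose required properties match."""
    matches = [
        factory for factory in factories
        if all(properties.get(key) == value for key, value in factory.required.items())
    ]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one matching factory, found {len(matches)}")
    return matches[0].create(properties)


# Two hypothetical factories, each identified by its "type" property.
factories = [
    ToyModuleFactory({"type": "core"}, lambda props: "CoreModule()"),
    ToyModuleFactory({"type": "geo"}, lambda props: f"GeoModule(unit={props['unit']})"),
]

# The "type" property selects the factory; the remaining properties configure the module.
module = discover(factories, {"type": "geo", "unit": "km"})
```

    In this model, a property set that matches no factory (or more than one) fails with a `LookupError` instead of silently picking a module.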

    A module can be loaded, enabled, disabled, and unloaded. When TableEnvironment first loads a module, it enables the module by default. Flink supports multiple modules and keeps track of their loading order to resolve metadata. Flink resolves functions only among enabled modules. For example, when two functions with the same name reside in two different modules, there are three cases:

    • If both of the modules are enabled, then Flink resolves the function according to the resolution order of the modules.
    • If one of them is disabled, then Flink resolves the function to the enabled module.
    • If both of the modules are disabled, then Flink cannot resolve the function.
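    The three cases above can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Flink's implementation: `ToyModuleManager`, its methods, and the function names are invented for illustration and only mimic the resolution rule described here.

```python
from typing import Dict, List, Optional, Set


class ToyModuleManager:
    """Hypothetical model of module resolution; not Flink's actual class."""

    def __init__(self) -> None:
        self.loaded: Dict[str, Set[str]] = {}  # module name -> function names
        self.used: List[str] = []              # enabled modules, in resolution order

    def load_module(self, name: str, functions: Set[str]) -> None:
        if name in self.loaded:
            raise ValueError(f"module '{name}' is already loaded")
        self.loaded[name] = set(functions)
        self.used.append(name)  # a newly loaded module is enabled by default

    def use_modules(self, *names: str) -> None:
        for name in names:
            if name not in self.loaded:
                raise ValueError(f"module '{name}' is not loaded")
        self.used = list(names)  # modules that are not named become disabled

    def resolve(self, function: str) -> Optional[str]:
        """Return the first enabled module that provides the function, if any."""
        for name in self.used:
            if function in self.loaded[name]:
                return name
        return None


manager = ToyModuleManager()
manager.load_module("core", {"concat", "upper"})
manager.load_module("hive", {"concat", "get_json_object"})

both_enabled = manager.resolve("concat")   # resolution order decides
manager.use_modules("hive")
one_disabled = manager.resolve("concat")   # only the enabled module is searched
manager.use_modules()                      # the toy model allows an empty list
both_disabled = manager.resolve("concat")  # no enabled module is left to search
```

    Here `both_enabled` is `"core"` because `core` was loaded first, `one_disabled` is `"hive"`, and `both_disabled` is `None`.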

    Users can also disable modules by not declaring them in a USE MODULES statement. For example, USE MODULES hive tells Flink to disable the core module (disabling the core module is strongly discouraged). Disabling a module does not unload it, and users can enable it again by using it. For example, USE MODULES core, hive brings back the core module and places it first in the resolution order. A module can be enabled only if it is already loaded; using a module that is not loaded throws an exception. Finally, users can unload a module.

    The difference between disabling and unloading a module is that TableEnvironment still keeps the disabled modules, and users can list all loaded modules to view the disabled modules.
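    The difference can be made concrete with a minimal sketch. It assumes, as the listings in this section suggest, that the full module list shows enabled modules first (in resolution order) followed by loaded-but-disabled ones. The helper `list_full_modules` and the two plain lists are hypothetical stand-ins, not Flink APIs.

```python
from typing import List, Tuple


def list_full_modules(loaded: List[str], used: List[str]) -> List[Tuple[str, bool]]:
    """Enabled modules first (in resolution order), then loaded-but-disabled ones."""
    disabled = [name for name in loaded if name not in used]
    return [(name, True) for name in used] + [(name, False) for name in disabled]


loaded = ["core", "hive"]  # modules in load order
used = ["hive"]            # state after USE MODULES hive: core is disabled, not unloaded
after_disable = list_full_modules(loaded, used)

loaded.remove("hive")      # state after UNLOAD MODULE hive: hive is gone entirely
used.remove("hive")
after_unload = list_full_modules(loaded, used)
```

    After disabling, `core` is still listed with `used = False`; after unloading, `hive` no longer appears at all.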

    Objects provided by modules are considered part of Flink’s system (built-in) objects; thus, they don’t have any namespaces.

    Users can use SQL to load, unload, use, and list modules in both the Table API and the SQL CLI.

    Scala

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val settings = EnvironmentSettings.inStreamingMode()
    val tableEnv = TableEnvironment.create(settings)

    // Show initially loaded and enabled modules
    tableEnv.executeSql("SHOW MODULES").print()
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // +-------------+
    tableEnv.executeSql("SHOW FULL MODULES").print()
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        core | true |
    // +-------------+------+

    // Load a hive module
    tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')")

    // Show all enabled modules
    tableEnv.executeSql("SHOW MODULES").print()
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // |        hive |
    // +-------------+

    // Show all loaded modules with both name and use status
    tableEnv.executeSql("SHOW FULL MODULES").print()
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        core | true |
    // |        hive | true |
    // +-------------+------+

    // Change resolution order
    tableEnv.executeSql("USE MODULES hive, core")
    tableEnv.executeSql("SHOW MODULES").print()
    // +-------------+
    // | module name |
    // +-------------+
    // |        hive |
    // |        core |
    // +-------------+
    tableEnv.executeSql("SHOW FULL MODULES").print()
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        hive | true |
    // |        core | true |
    // +-------------+------+

    // Disable core module
    tableEnv.executeSql("USE MODULES hive")
    tableEnv.executeSql("SHOW MODULES").print()
    // +-------------+
    // | module name |
    // +-------------+
    // |        hive |
    // +-------------+
    tableEnv.executeSql("SHOW FULL MODULES").print()
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        hive |  true |
    // |        core | false |
    // +-------------+-------+

    // Unload hive module; core stays loaded but disabled
    tableEnv.executeSql("UNLOAD MODULE hive")
    tableEnv.executeSql("SHOW MODULES").print()
    // Empty set
    tableEnv.executeSql("SHOW FULL MODULES").print()
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        core | false |
    // +-------------+-------+

    SQL Client

    -- Show initially loaded and enabled modules
    Flink SQL> SHOW MODULES;
    +-------------+
    | module name |
    +-------------+
    |        core |
    +-------------+
    1 row in set
    Flink SQL> SHOW FULL MODULES;
    +-------------+------+
    | module name | used |
    +-------------+------+
    |        core | true |
    +-------------+------+
    1 row in set

    -- Load a hive module
    Flink SQL> LOAD MODULE hive WITH ('hive-version' = '...');
    Flink SQL> SHOW MODULES;
    +-------------+
    | module name |
    +-------------+
    |        core |
    |        hive |
    +-------------+
    2 rows in set

    -- Show all loaded modules with both name and use status
    Flink SQL> SHOW FULL MODULES;
    +-------------+------+
    | module name | used |
    +-------------+------+
    |        core | true |
    |        hive | true |
    +-------------+------+
    2 rows in set

    -- Change resolution order
    Flink SQL> USE MODULES hive, core;
    Flink SQL> SHOW MODULES;
    +-------------+
    | module name |
    +-------------+
    |        hive |
    |        core |
    +-------------+
    2 rows in set
    Flink SQL> SHOW FULL MODULES;
    +-------------+------+
    | module name | used |
    +-------------+------+
    |        hive | true |
    |        core | true |
    +-------------+------+
    2 rows in set

    -- Disable core module
    Flink SQL> USE MODULES hive;
    Flink SQL> SHOW MODULES;
    +-------------+
    | module name |
    +-------------+
    |        hive |
    +-------------+
    1 row in set
    Flink SQL> SHOW FULL MODULES;
    +-------------+-------+
    | module name |  used |
    +-------------+-------+
    |        hive |  true |
    |        core | false |
    +-------------+-------+
    2 rows in set

    -- Unload hive module; core stays loaded but disabled
    Flink SQL> UNLOAD MODULE hive;
    Flink SQL> SHOW MODULES;
    Empty set
    Flink SQL> SHOW FULL MODULES;
    +-------------+-------+
    | module name |  used |
    +-------------+-------+
    |        core | false |
    +-------------+-------+
    1 row in set

    YAML

    All modules defined using YAML must provide a type property that specifies the type. The following types are supported out of the box.

    Users can use Java, Scala, or Python to load, unload, use, and list modules programmatically.

    Java

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.module.hive.HiveModule;

    EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // Show initially loaded and enabled modules
    tableEnv.listModules();
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // +-------------+
    tableEnv.listFullModules();
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        core | true |
    // +-------------+------+

    // Load a hive module
    tableEnv.loadModule("hive", new HiveModule());

    // Show all enabled modules
    tableEnv.listModules();
    // +-------------+
    // | module name |
    // +-------------+
    // |        core |
    // |        hive |
    // +-------------+

    // Show all loaded modules with both name and use status
    tableEnv.listFullModules();
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        core | true |
    // |        hive | true |
    // +-------------+------+

    // Change resolution order
    tableEnv.useModules("hive", "core");
    tableEnv.listModules();
    // +-------------+
    // | module name |
    // +-------------+
    // |        hive |
    // |        core |
    // +-------------+
    tableEnv.listFullModules();
    // +-------------+------+
    // | module name | used |
    // +-------------+------+
    // |        hive | true |
    // |        core | true |
    // +-------------+------+

    // Disable core module
    tableEnv.useModules("hive");
    tableEnv.listModules();
    // +-------------+
    // | module name |
    // +-------------+
    // |        hive |
    // +-------------+
    tableEnv.listFullModules();
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        hive |  true |
    // |        core | false |
    // +-------------+-------+

    // Unload hive module; core stays loaded but disabled
    tableEnv.unloadModule("hive");
    tableEnv.listModules();
    // Empty set
    tableEnv.listFullModules();
    // +-------------+-------+
    // | module name |  used |
    // +-------------+-------+
    // |        core | false |
    // +-------------+-------+

    Python

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.module import HiveModule

    # environment configuration
    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)

    # Show initially loaded and enabled modules
    t_env.list_modules()
    # +-------------+
    # | module name |
    # +-------------+
    # |        core |
    # +-------------+
    t_env.list_full_modules()
    # +-------------+------+
    # | module name | used |
    # +-------------+------+
    # |        core | true |
    # +-------------+------+

    # Load a hive module
    t_env.load_module("hive", HiveModule())

    # Show all enabled modules
    t_env.list_modules()
    # +-------------+
    # | module name |
    # +-------------+
    # |        core |
    # |        hive |
    # +-------------+

    # Show all loaded modules with both name and use status
    t_env.list_full_modules()
    # +-------------+------+
    # | module name | used |
    # +-------------+------+
    # |        core | true |
    # |        hive | true |
    # +-------------+------+

    # Change resolution order
    t_env.use_modules("hive", "core")
    t_env.list_modules()
    # +-------------+
    # | module name |
    # +-------------+
    # |        hive |
    # |        core |
    # +-------------+
    t_env.list_full_modules()
    # +-------------+------+
    # | module name | used |
    # +-------------+------+
    # |        hive | true |
    # |        core | true |
    # +-------------+------+

    # Disable core module
    t_env.use_modules("hive")
    t_env.list_modules()
    # +-------------+
    # | module name |
    # +-------------+
    # |        hive |
    # +-------------+
    t_env.list_full_modules()
    # +-------------+-------+
    # | module name |  used |
    # +-------------+-------+
    # |        hive |  true |
    # |        core | false |
    # +-------------+-------+

    # Unload hive module; core stays loaded but disabled
    t_env.unload_module("hive")
    t_env.list_modules()
    # Empty set
    t_env.list_full_modules()
    # +-------------+-------+
    # | module name |  used |
    # +-------------+-------+
    # |        core | false |