插件

    Airflow内置了一个简单的插件管理器,可以通过简单地删除文件夹中的文件,将外部功能集成到其核心。

    plugins文件夹中的python模块将被导入,钩子操作符传感器执行器和Web视图将集成到Airflow的主要集合中,并可供使用。

    插件可以简便地用作编写,共享和激活新功能集。

    当然还需要一组更复杂的应用程序来与不同风格的数据和元数据进行交互。

    • 一组用于解析Hive日志和公开Hive元数据(CPU/IO/阶段/倾斜/…)的工具
    • 异常检测框架,允许人们收集指标,设置阈值和警报
    • 审计工具,帮助了解谁访问了什么
    • 配置驱动的SLA监控工具,允许您设置受监控的表以及应该在何时着陆,提醒人员并公开停机的可视化

    Airflow有许多组件可以在构建应用程序时重用:

    • 可用于呈现视图的Web服务器
    • 用于存储模型的元数据数据库
    • 访问您的数据库,以及如何连接到它们
    • 应用程序可以将工作负载推送到的一组Workers
    • 部署了Airflow,您可以专注于后面的工作
    • 基本的图表功能,底层库和抽象

    要创建插件,您需要派生airflow.plugins_manager.AirflowPlugin类并引用要插入Airflow的对象。以下是类似您需要派生的类:

    1. # This is the class you derive to create a plugin
    2. from airflow.plugins_manager import AirflowPlugin
    3. from flask import Blueprint
    4. from flask_admin import BaseView, expose
    5. from flask_admin.base import MenuLink
    6. # Importing base classes that we need to derive
    7. from airflow.hooks.base_hook import BaseHook
    8. from airflow.models import BaseOperator
    9. from airflow.sensors.base_sensor_operator import BaseSensorOperator
    10. from airflow.executors.base_executor import BaseExecutor
    11. # Will show up under airflow.hooks.test_plugin.PluginHook
    12. class PluginHook(BaseHook):
    13. pass
    14. # Will show up under airflow.operators.test_plugin.PluginOperator
    15. class PluginOperator(BaseOperator):
    16. pass
    17. # Will show up under airflow.sensors.test_plugin.PluginSensorOperator
    18. class PluginSensorOperator(BaseSensorOperator):
    19. pass
    20. class PluginExecutor(BaseExecutor):
    21. pass
    22. # Will show up under airflow.macros.test_plugin.plugin_macro
    23. def plugin_macro():
    24. pass
    25. # Creating a flask admin BaseView
    26. class TestView(BaseView):
    27. @expose('/')
    28. def test(self):
    29. # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
    30. return self.render("test_plugin/test.html", content="Hello galaxy!")
    31. v = TestView(category="Test Plugin", name="Test View")
    32. # Creating a flask blueprint to integrate the templates and static folder
    33. bp = Blueprint(
    34. "test_plugin", __name__,
    35. template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
    36. static_folder='static',
    37. static_url_path='/static/test_plugin')
    38. ml = MenuLink(
    39. name='Test Menu Link',
    40. url='https://airflow.incubator.apache.org/')
    41. class TestAppBuilderBaseView(AppBuilderBaseView):
    42. @expose("/")
    43. def test(self):
    44. return self.render("test_plugin/test.html", content="Hello galaxy!")
    45. v_appbuilder_view = TestAppBuilderBaseView()
    46. v_appbuilder_package = {"name": "Test View",
    47. "category": "Test Plugin",
    48. "view": v_appbuilder_view}
    49. # Creating a flask appbuilder Menu Item
    50. appbuilder_mitem = {"name": "Google",
    51. "category": "Search",
    52. "category_icon": "fa-th",
    53. "href": "https://www.google.com"}
    54. # Defining the plugin class
    55. class AirflowTestPlugin(AirflowPlugin):
    56. name = "test_plugin"
    57. operators = [PluginOperator]
    58. sensors = [PluginSensorOperator]
    59. hooks = [PluginHook]
    60. executors = [PluginExecutor]
    61. macros = [plugin_macro]
    62. admin_views = [v]
    63. flask_blueprints = [bp]
    64. menu_links = [ml]
    65. appbuilder_views = [v_appbuilder_package]

    Airflow 1.10使用FlaskAppBuilder引入了基于角色的视图。您可以通过设置rbac = True来配置使用的UI。为了支持两个版本的UI的插件视图和链接并保持向后兼容性,请将字段appbuilder_views和appbuilder_menu_items添加到AirflowTestPlugin类中。