The code is on GitHub.

Summary

Apache Airflow is a Python-based workflow scheduler, so you need to know Python to use it. I will focus on how to write a DAG, the workflow definition, in Python. Then I will adapt the code into an Airflow plugin to make the DAG file more readable.

DAGs

The Python workflow definition files are stored in a subdirectory set in airflow.cfg; by default it is named “dags”. At startup, Airflow fills its “DagBag” from the files in that directory.
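The relevant settings live in the [core] section of airflow.cfg. The paths shown below are the defaults in the puckel/docker-airflow image used later in this post; yours may differ:

```ini
[core]
dags_folder = /usr/local/airflow/dags
plugins_folder = /usr/local/airflow/plugins
```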

The test DAG: complete source code

import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# fetch, load_db and the flickr_url/json_file/db_file constants are
# defined earlier in the file (see the complete source)

default_args = {'start_date': datetime.datetime(2019, 4, 11),
                'retries': 2,
                'retry_delay': datetime.timedelta(minutes=2),
                'email': [],
                'email_on_failure': True}


dag = DAG('discover',
          default_args=default_args,
          schedule_interval='30 */12 * * *',
          catchup=False)

with dag:
    task_fetch = PythonOperator(task_id='fetch_data',
                                python_callable=fetch,
                                op_args=[flickr_url, json_file],
                                provide_context=True)
    task_load = PythonOperator(task_id='load_data',
                               python_callable=load_db,
                               op_args=[db_file, json_file],
                               provide_context=True)
    # set up load as fetch's downstream task
    task_fetch >> task_load
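The callables the DAG references, fetch and load_db, live in the complete source. A minimal sketch of what they might look like is below; the function signatures match the op_args above, but the table schema and record fields are my own assumptions:

```python
import json
import sqlite3
import urllib.request


def fetch(flickr_url, json_file, **context):
    """Download the URL's response and save it to a local JSON file."""
    with urllib.request.urlopen(flickr_url) as resp:
        data = resp.read().decode('utf-8')
    with open(json_file, 'w') as f:
        f.write(data)


def load_db(db_file, json_file, **context):
    """Read the saved JSON and insert one row per record into SQLite."""
    with open(json_file) as f:
        records = json.load(f)
    conn = sqlite3.connect(db_file)
    conn.execute('CREATE TABLE IF NOT EXISTS photos (id TEXT, title TEXT)')
    conn.executemany('INSERT INTO photos VALUES (?, ?)',
                     [(r.get('id'), r.get('title')) for r in records])
    conn.commit()
    conn.close()
```

Because the tasks pass provide_context=True, Airflow hands each callable its task context as keyword arguments, which the **context parameter absorbs.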

Creating an Apache Airflow plugin:

Plugins let you organize and re-use code outside your DAG files; see the Airflow plugin documentation for details.

complete source code

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class FlickrOperator(BaseOperator):
    """
    A test operator that makes an HTTP request and saves the results in a SQLite db.
    :param flickr_url: URL to fetch
    :type flickr_url: str
    :param sqlite_db_file: SQLite database file name
    :type sqlite_db_file: str
    """
    @apply_defaults
    def __init__(self, flickr_url, sqlite_db_file, *args, **kwargs):
        super(FlickrOperator, self).__init__(*args, **kwargs)
        self.flickr_url = flickr_url
        self.sqlite_db_file = sqlite_db_file

    def execute(self, context):
        pass  # the fetch-and-load logic lives in the complete source


# Defining the required plugin class
class FlickrPlugin(AirflowPlugin):
    name = "flickr_plugin"
    operators = [FlickrOperator]
    flask_blueprints = []
    hooks = []
    executors = []
    admin_views = []
    menu_links = []
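Once the plugin is registered, Airflow 1.10.x exposes the operator under a module named after the plugin's name attribute, so a DAG file can import and use it like this (a sketch; the constructor arguments assume the operator signature above):

```python
from airflow.operators.flickr_plugin import FlickrOperator

task = FlickrOperator(task_id='fetch_and_load',
                      flickr_url=flickr_url,
                      sqlite_db_file=db_file,
                      dag=dag)
```

This is what lets the DAG file shrink to a single operator instead of the two PythonOperator tasks and their callables.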

Testing the plugin and DAGs

Clone my repo: git clone https://github.com/brunoyin/test-drive.git

  1. git clone https://github.com/puckel/docker-airflow.git
  2. Copy dags/fetch_load.py to the dags directory
  3. Copy plugins/flickr_dag.py to the dags directory
  4. Copy plugins/flickrplugin.py to the plugins directory
  5. Edit docker-compose-LocalExecutor.yml to mount the plugins directory, then run: docker-compose -f docker-compose-LocalExecutor.yml up -d
  6. Open http://linux-host-ip:8080 in a browser
  7. Locate the “discover” DAG, toggle it “On”, then click trigger to run it
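The setup steps above can be consolidated into a short script, assuming my test-drive repo and puckel's docker-airflow are cloned side by side:

```shell
git clone https://github.com/puckel/docker-airflow.git
git clone https://github.com/brunoyin/test-drive.git
cd docker-airflow
cp ../test-drive/dags/fetch_load.py dags/
cp ../test-drive/plugins/flickr_dag.py dags/
cp ../test-drive/plugins/flickrplugin.py plugins/
# after editing docker-compose-LocalExecutor.yml to mount ./plugins:
docker-compose -f docker-compose-LocalExecutor.yml up -d
```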

docker-compose-LocalExecutor.yml:

version: '2.1'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow

    webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        depends_on:
            - postgres
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
        volumes:
            - ./dags:/usr/local/airflow/dags
            # custom plugins mounted (step 5 above)
            - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3