Summary
Apache Airflow is a Python-based workflow scheduler, so you need to know Python to use it. I will focus on how to write a DAG, the workflow definition, in Python. Then I will adapt the code into an Airflow plugin to make the DAG file more readable.
DAGs
The Python workflow definition files are stored in a subdirectory defined in airflow.cfg; by default it is named "dags". At startup, Airflow fills its "DagBag" from the files in that directory.
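For reference, the setting lives in the [core] section of airflow.cfg; the path below matches the puckel Docker image used later in this post:

[core]
# Airflow scans this folder for DAG definition files
dags_folder = /usr/local/airflow/dags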
The test DAG: complete source code
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# fetch, load_db, flickr_url, json_file and db_file are defined
# earlier in the complete source linked above.
default_args = {'start_date': datetime.datetime(2019, 4, 11),
                'retries': 2,
                'retry_delay': datetime.timedelta(minutes=2),
                'email': [],
                'email_on_failure': True}

dag = DAG('discover',
          default_args=default_args,
          schedule_interval='30 */12 * * *',  # minute 30 of every 12th hour
          catchup=False)

with dag:
    task_fetch = PythonOperator(task_id='fetch_data',
                                python_callable=fetch,
                                op_args=[flickr_url, json_file],
                                # op_kwargs={},
                                provide_context=True)

    task_load = PythonOperator(task_id='load_data',
                               python_callable=load_db,
                               op_args=[db_file, json_file],
                               provide_context=True)

    # set up load as fetch's downstream task
    task_fetch >> task_load
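The real fetch and load_db callables are in the complete source linked above; the sketch below only illustrates the signature such a callable needs. Because provide_context=True, Airflow passes its template context (execution_date, ti, and so on) as keyword arguments, so the function has to accept them, here via **context:

import json
import urllib.request

def fetch(flickr_url, json_file, **context):
    """Hypothetical stand-in for the real fetch callable."""
    # op_args supplies flickr_url and json_file; **context absorbs the
    # keyword arguments injected by provide_context=True.
    with urllib.request.urlopen(flickr_url) as response:
        data = json.load(response)
    with open(json_file, 'w') as f:
        json.dump(data, f)
    return json_file  # PythonOperator pushes the return value to XCom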
Creating an Apache Airflow plugin:
Plugins are used to better organize and re-use code; see the Airflow plugins documentation.
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin

class FlickrOperator(BaseOperator):
    """
    A test operator that makes an HTTP request and saves the results in a SQLite db.

    :param flickr_url: url
    :type flickr_url: string
    :param sqlite_db_file: sqlite db file name
    :type sqlite_db_file: string
    """
    pass  # body elided here; see plugins/flickrplugin.py in the repo

# Defining the required plugin class
class FlickrPlugin(AirflowPlugin):
    name = "flickr_plugin"
    operators = [FlickrOperator]
    flask_blueprints = []
    hooks = []
    executors = []
    admin_views = []
    menu_links = []
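Once the plugin is loaded, Airflow 1.10 exposes its operators under airflow.operators.<plugin name>, which is what makes the DAG file shorter. Below is a minimal sketch of what a DAG using the plugin could look like; the dag_id, URL, and file name are placeholders, not the repo's actual values:

import datetime

from airflow import DAG
# operators registered by flickr_plugin are importable under its plugin name
from airflow.operators.flickr_plugin import FlickrOperator

dag = DAG('discover_plugin',  # hypothetical dag_id
          start_date=datetime.datetime(2019, 4, 11),
          schedule_interval='30 */12 * * *',
          catchup=False)

with dag:
    discover = FlickrOperator(task_id='discover',
                              flickr_url='https://api.flickr.com/...',  # placeholder
                              sqlite_db_file='flickr.db')               # placeholder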
Testing the plugin and DAGs
- Clone my repo: git clone https://github.com/brunoyin/test-drive.git
- git clone https://github.com/puckel/docker-airflow.git
- Copy dags/fetch_load.py to the dags directory
- Copy plugins/flickr_dag.py to the dags directory
- Copy plugins/flickrplugin.py to the plugins directory
- Edit docker-compose-LocalExecutor.yml to load plugins, then run: docker-compose -f docker-compose-LocalExecutor.yml up -d
- Use a browser to open linux-host-ip:8080
- Locate the "discover" DAG, toggle it "On", then click trigger to run (CLI alternatives are shown after this list)
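If the DAG does not show up, the webserver logs usually say why, and the DAG can also be listed and triggered from the CLI. The container name below is a guess based on the compose project name, so check docker ps for yours:

# check the webserver logs for plugin/DAG import errors
docker-compose -f docker-compose-LocalExecutor.yml logs webserver

# confirm "discover" is in the DagBag, then trigger it
docker exec -it docker-airflow_webserver_1 airflow list_dags
docker exec -it docker-airflow_webserver_1 airflow trigger_dag discover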
docker-compose-LocalExecutor.yml:
version: '2.1'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

  webserver:
    image: puckel/docker-airflow:1.10.2
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    volumes:
      - ./dags:/usr/local/airflow/dags
      # uncommented to include custom plugins (the edit step above)
      - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
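When you are finished, stop and remove the containers with the matching down command:

docker-compose -f docker-compose-LocalExecutor.yml down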