Airflow
Nuvolos supports Airflow 2 as a self-service application
For researchers who require scheduled workflows, Nuvolos supports Airflow (2.2.1) as a self-service application. Airflow runs inside a JupyterLab application, making it easy to edit Airflow DAG files, install packages and use the Nuvolos filesystem for data processing.
The JupyterLab application is collaborative, so DAGs can be worked on simultaneously by multiple users in a "Google Docs"-like fashion.

Configuration

DAGs should be created as Python files in the /files/airflow/dags folder; refer to the Airflow documentation for an example.
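For orientation, here is a minimal sketch of what such a file could look like (the file name, DAG id and task are illustrative, not part of the Nuvolos setup):

# /files/airflow/dags/hello_nuvolos.py -- illustrative file name
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "hello_nuvolos",               # DAG id shown on the Airflow UI
    schedule_interval=None,        # only runs when triggered manually
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # A single task that writes into the shared Nuvolos filesystem
    hello = BashOperator(
        task_id="hello",
        bash_command="echo 'hello from Airflow' > /files/airflow_hello.txt",
    )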

Setting up your first DAG

  1. Create a new Python file named /files/airflow/dags/tutorial.py and copy the contents of the tutorial DAG from the Airflow tutorial.
  2. Click on the Airflow tab and select the All DAGs filter on the UI; the DAG should show up on the list like on the screenshot below. It can take up to a minute for the DAG to appear, as Airflow periodically scans Python files in the /files/airflow/dags folder for new DAG definitions.
  3. Click on the slider toggle next to the tutorial DAG name to enable the DAG and start the first execution.
  4. You should quickly see that the DAG has executed successfully: a 1 appears in a green circle in the Runs column.
Airflow Connections and Variables can be configured on the Airflow UI.
Airflow on Nuvolos uses a CeleryExecutor back-end to be able to execute tasks in parallel.
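As an illustration of that parallelism, tasks with no dependency between them can be picked up by Celery workers at the same time. A minimal sketch (DAG and task names are made up for this example):

# /files/airflow/dags/parallel_example.py -- illustrative
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "parallel_example",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    clean_a = BashOperator(task_id="clean_a", bash_command="echo clean A")
    clean_b = BashOperator(task_id="clean_b", bash_command="echo clean B")

    # clean_a and clean_b only depend on extract, so the CeleryExecutor
    # can run them concurrently once extract has finished.
    extract >> [clean_a, clean_b]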

Installing packages

To install packages used in DAGs, simply open a JupyterLab terminal and pip / conda / mamba install the required package. Please refer to the Install a software package chapter of our documentation for detailed instructions.

Logs

Task execution, scheduler and DAG bag update logs are in /files/airflow/logs.

Saving data to Nuvolos

The following example illustrates how to create a DAG that downloads CSV data from an API, saves the data as a compressed Parquet file and uploads the data as a Nuvolos table.

Prerequisites

  1. Create a new Airflow application in your working instance and start the application.
  2. Once Airflow starts, open a new terminal tab and run the following commands to install package dependencies:
    1. mamba install -y --freeze-installed -c conda-forge pandas-datareader
    2. mamba install -y --freeze-installed -c conda-forge pyarrow
  3. To be able to create Nuvolos tables and load data from the shared Airflow application, the following variables need to be set in Airflow, using values from the Connection Guide (a programmatic alternative is sketched after this list):
    1. NUVOLOS_DATABASE
    2. NUVOLOS_SCHEMA
    3. NUVOLOS_USERNAME
    4. NUVOLOS_PASSWORD
Add Airflow variables
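The variables are set on the Airflow UI as shown above. As an alternative, since the JupyterLab application shares the Airflow environment, they can also be set programmatically with Airflow's Variable API, for example from a notebook or terminal in the same application. A sketch, with placeholder values that you would replace with those from the Connection Guide:

# Run once from a JupyterLab notebook or terminal inside the Airflow application.
# The values below are placeholders; copy the real ones from the Connection Guide.
from airflow.models import Variable

Variable.set("NUVOLOS_DATABASE", "<your_database>")
Variable.set("NUVOLOS_SCHEMA", "<your_schema>")
Variable.set("NUVOLOS_USERNAME", "<your_username>")
Variable.set("NUVOLOS_PASSWORD", "<your_password>")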
Once the setup is complete, save the following script as the file /files/airflow/dags/csv_to_nuvolos.py:
""" Example DAG to demonstrate how to download a time series as a CSV file,
convert it to Parquet then upload it to Nuvolos. """
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable


def export_to_parquet(series, start, end):
    """ Downloads a time-series from St. Louis FRED and exports it as a Parquet file. """
    import pandas_datareader as pdr

    df = pdr.get_data_fred(series, start=datetime.strptime(start, "%Y-%m-%d"), end=datetime.strptime(end, "%Y-%m-%d"))
    df.reset_index(inplace=True)
    df.to_parquet("/files/fred_data.parquet")


def upload_data():
    from nuvolos import get_connection, to_sql
    import pandas as pd

    df = pd.read_parquet("/files/fred_data.parquet")
    with get_connection(dbname=Variable.get("NUVOLOS_DATABASE", default_var=""),
                        schemaname=Variable.get("NUVOLOS_SCHEMA", default_var="master/development"),
                        username=Variable.get("NUVOLOS_USERNAME", default_var="<YOUR_USER>"),
                        password=Variable.get("NUVOLOS_PASSWORD", default_var="dummy")) as conn:
        to_sql(df=df, name="fred_data", con=conn, if_exists='replace', index=False)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'csv_to_nuvolos',
    default_args=default_args,
    description='CSV upload to Nuvolos example DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    t1 = PythonOperator(
        task_id='export_to_parquet',
        python_callable=export_to_parquet,
        op_kwargs={
            "series": ['TB3MS'],
            "start": "1934-01-01",
            "end": "2021-10-01"
        },
    )
    t1.doc_md = """#### FRED data download
Downloads time-series data from FRED and saves them to /files/fred_data.parquet.
"""

    t2 = PythonOperator(
        task_id='upload_to_nuvolos',
        python_callable=upload_data
    )
    t2.doc_md = """#### Data upload to Nuvolos
Uses the [to_sql function](https://docs.nuvolos.cloud/data/upload-data-to-nuvolos#1.-python) of the Nuvolos connector to upload the data as a Nuvolos table.
"""
    t1 >> t2
Save the file; the new DAG should show up on the Airflow tab within a couple of seconds. Click on the slider toggle next to the csv_to_nuvolos DAG name to enable the DAG:
Click on the blue "play" icon to trigger the execution of the DAG. Click on the name of the DAG to see the progress:
When all steps run to success, they show up dark green in Airflow. You can now check the resulting table in the Tables view:
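The result can also be checked programmatically with the Nuvolos connector, for example from a JupyterLab notebook in the same instance. A sketch, assuming the fred_data table created by the DAG above and placeholder credentials taken from the Connection Guide:

# Quick sanity check that the table created by the DAG exists.
from nuvolos import get_connection

# Credentials are needed here as well when connecting from the shared
# Airflow application; replace the placeholders with Connection Guide values.
with get_connection(dbname="<your_database>", schemaname="<your_schema>",
                    username="<your_username>", password="<your_password>") as conn:
    cur = conn.cursor()
    # SHOW TABLES matches names case-insensitively, so this works regardless
    # of how the identifier was cased when the table was created.
    cur.execute("SHOW TABLES LIKE 'fred_data'")
    print(cur.fetchall())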