""" Example DAG to demonstrate how to download a time series as a CSV file,
convert it to Parquet then upload it to Nuvolos. """
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def export_to_parquet(series, start, end,
                      output_path="/files/fred_data.parquet"):
    """Download a time series from St. Louis FRED and export it as Parquet.

    Args:
        series: FRED series identifier, passed unchanged to
            ``pandas_datareader.get_data_fred``.
        start: First date to fetch, as a ``YYYY-MM-DD`` string.
        end: Last date to fetch, as a ``YYYY-MM-DD`` string.
        output_path: Where the Parquet file is written. The default is the
            path the upload step of this DAG reads from.

    Raises:
        ValueError: If ``start`` or ``end`` is not a ``YYYY-MM-DD`` string.
    """
    # Imported inside the function, so importing this module does not itself
    # require pandas_datareader to be installed.
    import pandas_datareader as pdr

    date_format = "%Y-%m-%d"
    start_date = datetime.strptime(start, date_format)
    end_date = datetime.strptime(end, date_format)

    df = pdr.get_data_fred(series, start=start_date, end=end_date)
    # Move the index (presumably the observation dates) into a regular column
    # so it is kept when the frame is later uploaded with index=False.
    df = df.reset_index()
    df.to_parquet(output_path)
# Upload step: loads the Parquet file written by export_to_parquet and sends it
# to Nuvolos through the nuvolos connector.
# NOTE(review): the `def` line for this body is not visible here; it is
# presumably `upload_data`, the callable used by the `upload_to_nuvolos` task
# -- confirm against the full file.
from nuvolos import get_connection, to_sql
# NOTE(review): `pd` is not bound by any import visible in this file -- confirm
# that `import pandas as pd` is present before this line.
df = pd.read_parquet("/files/fred_data.parquet")
# Connection settings are read from Airflow Variables; each `default_var` is
# the fallback value used when the Variable is not defined.
with get_connection(dbname=Variable.get("NUVOLOS_DATABASE", default_var=""),
schemaname=Variable.get("NUVOLOS_SCHEMA", default_var="master/development"),
username=Variable.get("NUVOLOS_USERNAME", default_var="<YOUR_USER>"),
password=Variable.get("NUVOLOS_PASSWORD", default_var="dummy")) as conn:
# Writes the frame to the table named "fred_data". Presumably
# if_exists='replace' overwrites an existing table and index=False leaves the
# DataFrame index out, as with pandas' to_sql -- confirm in the connector docs.
to_sql(df=df, name="fred_data", con=conn, if_exists='replace', index=False)
# NOTE(review): the lines below are fragments; the statements that open this
# mapping and the calls that follow are not visible here -- confirm against
# the full file.
# Presumably entries of the `default_args` mapping that is passed to the DAG.
'depends_on_past': False,
'email_on_failure': False,
# Five-minute delay value for retries.
'retry_delay': timedelta(minutes=5),
# Presumably keyword arguments of the DAG constructor -- TODO confirm.
default_args=default_args,
description='CSV upload to Nuvolos example DAG',
start_date=datetime(2021, 1, 1),
# Presumably keyword arguments of the PythonOperator bound to `t1`; the task
# runs export_to_parquet.
task_id='export_to_parquet',
python_callable=export_to_parquet,
t1.doc_md = """#### FRED data download
Downloads time-series data from FRED and saves them to /files/fred_data.parquet.
task_id='upload_to_nuvolos',
python_callable=upload_data
t2.doc_md = """#### Data upload to Nuvolos
Uses the [to_sql function](https://docs.nuvolos.cloud/data/upload-data-to-nuvolos#1.-python) of the Nuvolos connector to upload the data as a Nuvolos table.