from __future__ import annotations

import io
import logging
import os
from urllib.parse import quote_plus

import pandas as pd
import pendulum
from airflow.decorators import dag, task
from sqlalchemy import create_engine
# CSV export endpoints on the gisdata-csj.opendata.arcgis.com portal, one per
# dataset loaded by the DAGs below. Every URL carries the same ``outSR`` query
# parameter, which percent-decodes to {"latestWkid":2227,"wkid":102643}.

# Crash locations dataset.
crash_uri = 'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::crash-locations.csv?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'

# Vehicles involved in crashes dataset.
vehicles_uri = 'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::crash-vehicles-involved.csv?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'

# Vision Zero safety corridors dataset.
safety_corridors_uri = 'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::vision-zero-safety-corridors.csv?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'
@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_crash():
    """Load the crash-locations CSV into the ``vision_zero_crash`` table.

    The DAG has no schedule (``schedule=None``) and consists of a single
    ``etl`` task.
    """

    @task()
    def etl():
        """Download the CSV at ``crash_uri`` and replace the target table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(crash_uri)

        # DataFrame.info() writes its summary to a buffer and returns None;
        # logging the bound method ``df.info`` would only log its repr.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_crash'
        # Percent-encode the password so characters such as '@', ':' or '/'
        # cannot corrupt the connection URL.
        dbpasswd = quote_plus(os.environ['SHARED_PASSWORD'])
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            # if_exists='replace' drops and recreates the table on every run.
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections even when the load fails.
            engine.dispose()

    etl()


vision_zero_crash()
@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_vehicles():
    """Load the crash-vehicles-involved CSV into the ``vision_zero_vehicles`` table.

    The DAG has no schedule (``schedule=None``) and consists of a single
    ``etl`` task.
    """

    @task()
    def etl():
        """Download the CSV at ``vehicles_uri`` and replace the target table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(vehicles_uri)

        # DataFrame.info() writes its summary to a buffer and returns None;
        # logging the bound method ``df.info`` would only log its repr.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_vehicles'
        # Percent-encode the password so characters such as '@', ':' or '/'
        # cannot corrupt the connection URL.
        dbpasswd = quote_plus(os.environ['SHARED_PASSWORD'])
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            # if_exists='replace' drops and recreates the table on every run.
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections even when the load fails.
            engine.dispose()

    etl()


vision_zero_vehicles()
@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_safety_corridors():
    """Load the safety-corridors CSV into the ``vision_zero_safety_corridors`` table.

    The DAG has no schedule (``schedule=None``) and consists of a single
    ``etl`` task.
    """

    @task()
    def etl():
        """Download the CSV at ``safety_corridors_uri`` and replace the target table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(safety_corridors_uri)

        # DataFrame.info() writes its summary to a buffer and returns None;
        # logging the bound method ``df.info`` would only log its repr.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_safety_corridors'
        # Percent-encode the password so characters such as '@', ':' or '/'
        # cannot corrupt the connection URL.
        dbpasswd = quote_plus(os.environ['SHARED_PASSWORD'])
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            # if_exists='replace' drops and recreates the table on every run.
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections even when the load fails.
            engine.dispose()

    etl()


vision_zero_safety_corridors()