# In [ ]:
from __future__ import annotations


import io
import logging
import os

import pandas as pd
import pendulum
from airflow.decorators import dag, task
from sqlalchemy import create_engine


# CSV export endpoints read by the DAGs below. Each URL carries the same
# URL-encoded ``outSR`` query parameter, which decodes to
# {"latestWkid":2227,"wkid":102643}.
crash_uri = (
    'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::crash-locations.csv'
    '?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'
)
vehicles_uri = (
    'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::crash-vehicles-involved.csv'
    '?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'
)
safety_corridors_uri = (
    'https://gisdata-csj.opendata.arcgis.com/datasets/CSJ::vision-zero-safety-corridors.csv'
    '?outSR=%7B%22latestWkid%22%3A2227%2C%22wkid%22%3A102643%7D'
)


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_crash():
    """Unscheduled DAG that reloads the crash CSV into Postgres.

    Contains a single task that downloads ``crash_uri`` and writes it to the
    ``vision_zero_crash`` table, replacing the table if it already exists.
    """

    @task()
    def etl():
        """Download the crash CSV and replace the ``vision_zero_crash`` table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(crash_uri)

        # DataFrame.info() writes its summary to a buffer (stdout by default)
        # and returns None, so capture the text to get it into the log.
        # Passing the bare method, as before, only logged the method object.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_crash'
        dbpasswd = os.environ['SHARED_PASSWORD']
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections whether or not the load succeeded.
            engine.dispose()

    etl()


vision_zero_crash()



@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_vehicles():
    """Unscheduled DAG that reloads the crash-vehicles CSV into Postgres.

    Contains a single task that downloads ``vehicles_uri`` and writes it to
    the ``vision_zero_vehicles`` table, replacing the table if it already
    exists.
    """

    @task()
    def etl():
        """Download the vehicles CSV and replace the ``vision_zero_vehicles`` table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(vehicles_uri)

        # DataFrame.info() writes its summary to a buffer (stdout by default)
        # and returns None, so capture the text to get it into the log.
        # Passing the bare method, as before, only logged the method object.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_vehicles'
        dbpasswd = os.environ['SHARED_PASSWORD']
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections whether or not the load succeeded.
            engine.dispose()

    etl()


vision_zero_vehicles()



@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 11, 10, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def vision_zero_safety_corridors():
    """Unscheduled DAG that reloads the safety-corridors CSV into Postgres.

    Contains a single task that downloads ``safety_corridors_uri`` and writes
    it to the ``vision_zero_safety_corridors`` table, replacing the table if
    it already exists.
    """

    @task()
    def etl():
        """Download the corridors CSV and replace ``vision_zero_safety_corridors``.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        df = pd.read_csv(safety_corridors_uri)

        # DataFrame.info() writes its summary to a buffer (stdout by default)
        # and returns None, so capture the text to get it into the log.
        # Passing the bare method, as before, only logged the method object.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'vision_zero_safety_corridors'
        dbpasswd = os.environ['SHARED_PASSWORD']
        engine = create_engine(f'postgresql://shared:{dbpasswd}@postgres:5432/shared')
        try:
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections whether or not the load succeeded.
            engine.dispose()

    etl()


vision_zero_safety_corridors()