from __future__ import annotations
from sqlalchemy import create_engine
import pendulum
import os
import pandas as pd
import logging
from airflow.decorators import dag, task
# Download URL for the publicplansdata.org API (a=data-download, q=QVariables).
# The `variables` parameter is a %2C-separated (URL-encoded comma) list of the
# columns requested; the request also sets filtereegroupid=0, filtertierid=0
# and filename=ppd-data-latest.
# The value is written as adjacent string literals, which Python joins into a
# single string at compile time, so the URL contains no whitespace or newlines.
ppd_uri = (
    'http://publicplansdata.org/api/?a=data-download&q=QVariables&variables=ppd_id%2CPlanName%2Cfy%2Csystem_id%2CPlanFullName%2Csource_PlanBasics%2CFiscalYearType%2CPlanInceptionYear%2CPlanClosed%2CPlanYearClosed%2CAdministeringGovt%2CStateAbbrev%2CStateName%2CGovtName%2CPlanType%2CEmployeeTypeCovered%2CSocSecCovered%2CSocSecCovered_verbatim%2CCostStructure%2CEmployerType%2CCostSharing%2CBenefitsWebsite%2CEEGroupID%2CTierID%2CActRptDate%2Cfye%2CActCostMeth_GASB%2CAssetValMeth_GASB%2CFundingMeth_GASB%2CInflationAssumption_GASB%2CInvestmentReturnAssumption_GASB%2CActCostMethCode_GASB%2CAssetValMethCode_GASB%2CAssetSmoothingPeriod_GASB%2CFundingMethCode1_GASB%2CFundingMethCode2_GASB%2CUAALAmortPeriod_GASB%2CBlendedDiscountRate%2CAssetValMeth%2CPhaseIn%2CAssetValMeth_note%2CActCostMeth%2CActCostMeth_note%2CFundingMeth%2CFundingMeth_note%2CMktAssets_Smooth%2CActAssets_Smooth%2CNetFlows_smooth%2CAssetValMethCode%2CSmoothingReset%2CGainlossConcept%2CGainLossBase_1%2CGainLossBase_2%2CGainLoss%2CGainLossPeriod%2CPhaseInPercent%2CPhaseInPeriods%2CPhaseInType%2CGainLossRecognition%2CAssetSmoothingBaseline%2CExpectedReturnMethod%2CAddSubtractGainLoss%2CUpperCorridor%2CLowerCorridor%2CActCostMethCode%2CFundMethCode_1%2CFundMethCode_2%2CPayrollGrowthAssumption%2CTotAmortPeriod%2CRemainingAmortPeriod%2CUAALYearEstablished%2CWageInflation%2CActAssets_GASB%2CActLiabilities_GASB%2CActFundedRatio_GASB%2CUAAL_GASB%2CActLiabilities_other%2Cpayroll%2CRequiredContribution%2CPercentReqContPaid%2CTotalPensionLiability%2CNetPosition%2CNetPensionLiability%2CActAssets_est%2CActLiabilities_est%2CActFundedRatio_est%2CRequiredContribution_est%2CActFundedRatio_GASB67%2CInvestmentConsultantName%2CInvestmentConsultantCode%2CInvestmentReturn_1yr%2CInvestmentReturn_2yr%2CInvestmentReturn_3yr%2CInvestmentReturn_4yr%2CInvestmentReturn_5yr%2CInvestmentReturn_7yr%2CInvestmentReturn_8yr%2CInvestmentReturn_10yr%2CInvestmentReturn_12yr%2CInvestmentReturn_15yr%2CInvestmentReturn_20yr%2CInvestmentReturn_'
    '25yr%2CInvestmentReturn_30yr%2CInvestmentReturn_LongTerm%2CInvestmentReturn_LTStartYear%2CGrossReturns%2CGeoReturn_est%2CGeoGrowth_est%2CInvestmentReturn_1yr_est%2CInvestmentReturn_5yr_est%2CInvestmentReturn_10yr_est%2CAvgReturn_3yr%2CAvgReturn_5yr%2CAvgReturn_10yr%2Cexpense_SecLendMgmtFees%2Ccontrib_EE_regular%2Ccontrib_ER_regular%2Ccontrib_ER_state%2Ccontrib_EE_PurchaseService%2Ccontrib_EE_other%2Ccontrib_ER_other%2Ccontrib_other%2CPOB_Flag%2CPOB_Amount%2Ccontrib_tot%2CFairValueChange_investments%2CFairValueChange_RealEstate%2Cincome_interest%2Cincome_dividends%2Cincome_InterestAndDividends%2Cincome_RealEstate%2Cincome_PrivateEquity%2Cincome_alternatives%2Cincome_international%2Cincome_OtherInvestments%2Cexpense_RealEstate%2Cexpense_PrivateEquity%2Cexpense_alternatives%2Cexpense_OtherInvestments%2Cexpense_investments%2CFVChange_SecLend%2Cincome_SecuritiesLending%2Cexpense_SecuritiesLending%2Cincome_SecuritiesLendingRebate%2CFVChange_SecLend_UG%2Cincome_OtherAdditions%2Cincome_net%2Cexpense_TotBenefits%2Cexpense_RetBenefits%2Cexpense_DisabilityBenefits%2Cexpense_DeathBenefits%2Cexpense_DROPBenefits%2Cexpense_SurvivorBenefits%2Cexpense_COLABenefits%2Cexpense_LumpSumBenefits%2Cexpense_OtherBenefits%2Cexpense_refunds%2Cexpense_AdminExpenses%2Cexpense_Depreciation%2Cexpense_OtherDeductions%2Cexpense_net%2Cadjustment_MktAssets%2CMktAssets_net%2CBegMktAssets_net%2Ccontrib_ER_tot%2Ccontrib_other_tot%2CFairValueChange_tot%2Cincome_interest_dividends_tot%2Cexpense_investments_tot%2CSecLend_tot%2Cinvestments_net%2Cexpense_other_tot%2CContributionFY%2CActuarialFirm%2CActuarialFirmCode%2CNormCostRate_tot%2CNormCostRate_EE%2CNormCostRate_ER%2CReqContRate_ER%2CReqContRate_tot%2CReqContRate_ER_Stat%2CReqContRate_tot_Stat%2CNormCostAmount_tot%2CNormCostAmount_EE%2CNormCostAmount_ER%2CReqContAmount_ER%2CReqContAmount_tot%2CReqContAmount_ER_Stat%2CReqContAmount_tot_Stat%2CNormCostRate_tot_est%2CNormCostRate_EE_est%2CNormCostRate_ER_est%2CReqContRate_ER_est%2CProjectedPayroll%2CUAALR'
    'ate%2Cbeneficiaries_DisabilityRetirees%2Cbeneficiaries_DependentSurvivors%2Cactives_tot%2CActiveSalaries%2CActiveAge_avg%2CActiveTenure_avg%2CActiveSalary_avg%2CInactiveVestedMembers%2CInactiveNonVested%2Cbeneficiaries_tot%2Cbenefits_tot%2CBeneficiaryAge_avg%2CBeneficiaryBenefit_avg%2Cbeneficiaries_ServiceRetirees%2Cbenefits_ServiceRetirees%2CServiceRetireeAge_avg%2CServiceRetireeBenefit_avg%2CServiceRetAge_avg%2CServiceRetTenure_avg%2Cbenefits_DisabilityRetirees%2Cbeneficiaries_survivors%2Cbeneficiaries_SpousalSurvivors%2Cbeneficiaries_other%2CDROPMembers%2COtherMembers%2CTotMembership%2CActiveSalary_avg_est%2CBeneficiaryBenefit_avg_est%2CPVFB_InactiveNonVested%2CPVFB_active%2CPVFB_InactiveVested%2CPVFB_retiree%2CPVFB_other%2CPVFB_tot%2CPVFNC_tot%2CPVFNC_EE%2CPVFNC_ER%2CPVFS%2CMktAssets_ActRpt%2CActAssets_AVA%2CActLiabilities_EAN%2CActLiabilities_PUC%2CTotFund_BnchmrkRtrn%2CLeverage_Flag%2CLeverage_Total_Actl%2CLeverage_Total_Trgt%2CEQTotal_Rtrn%2CEQTotal_Actl%2CEQTotal_Trgt%2CFITotal_Rtrn%2CFITotal_Actl%2CFITotal_Trgt%2CRETotal_Rtrn%2CRETotal_Actl%2CRETotal_Trgt%2CAltMiscTotal_Rtrn%2CAltMiscTotal_Actl%2CAltMiscTotal_Trgt%2CPETotal_Rtrn%2CPETotal_Actl%2CPETotal_Trgt%2CHFTotal_Rtrn%2CHFTotal_Actl%2CHFTotal_Trgt%2CCOMDTotal_Rtrn%2CCOMDTotal_Actl%2CCOMDTotal_Trgt%2CCashTotal_Rtrn%2CCashTotal_Actl%2CCashTotal_Trgt%2COtherTotal_Rtrn%2COtherTotal_Actl%2COtherTotal_Trgt&filtereegroupid=0&filtertierid=0&filename=ppd-data-latest'
)
@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 4, 9, tz="US/Pacific"),
    catchup=False,
    tags=['project'],
)
def public_pensions_data():
    """Define a DAG that loads the CSV at ``ppd_uri`` into Postgres.

    The DAG has no schedule (``schedule=None``) and catchup is disabled. It
    contains a single task, ``etl``.
    """

    @task()
    def etl():
        """Read the CSV at ``ppd_uri`` and replace the ``public_pensions`` table.

        Raises:
            KeyError: If the ``SHARED_PASSWORD`` environment variable is unset.
        """
        # Local imports keep this block self-contained; both are stdlib.
        import io
        from urllib.parse import quote

        df = pd.read_csv(ppd_uri, encoding='unicode_escape', engine='python')

        # DataFrame.info() writes its summary to a buffer and returns None, so
        # capture the text explicitly. Logging the bound method `df.info`
        # (without calling it) would only log the method's repr.
        summary = io.StringIO()
        df.info(buf=summary)
        logging.info("%s", summary.getvalue())

        table_name = 'public_pensions'
        dbpasswd = os.environ['SHARED_PASSWORD']
        # Percent-encode the password so characters such as '@', '/' or ':'
        # cannot be misread as URL delimiters when the connection URL is parsed.
        engine = create_engine(
            f"postgresql://shared:{quote(dbpasswd, safe='')}@postgres:5432/shared"
        )
        try:
            # if_exists='replace' drops and recreates the table on every run.
            df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        finally:
            # Release pooled connections once the load has finished or failed.
            engine.dispose()

    etl()


# Calling the decorated function creates the DAG when this module is imported.
public_pensions_data()