S&P 500 Screener Project

Python: ETL of S&P 500 financials

  • Data Source: Yahoo Finance
  • Import Tool: yfinance
  • Export Option: PostgreSQL database (via SQLAlchemy)
  • Data Transformations: drop/edit columns, create features for regression analysis (sketched below)
  • Future Changes: add portfolio risk analysis and aggregate economic health indicators
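The regression features themselves live downstream of this notebook; a minimal sketch of the idea, assuming the long-format income statement frame built below (df2) and the yfinance line items 'Total Revenue' and 'Net Income', whose labels vary across yfinance versions:

def add_regression_features(income):
    # income: long-format frame with 'Ticker', 'Date' and line-item columns
    out = income.sort_values(['Ticker', 'Date']).copy()
    out['net_margin'] = out['Net Income'] / out['Total Revenue']                 # profitability
    out['revenue_growth'] = out.groupby('Ticker')['Total Revenue'].pct_change()  # growth per ticker
    return out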
In [2]:
import pandas as pd
import numpy as np
import yfinance as yf
from tqdm import notebook
from bs4 import BeautifulSoup
from datetime import timedelta
import requests
import os
In [3]:
# Set current directory
os.chdir('/home/jovyan/work')

# Pandas settings
pd.set_option("display.max_columns", None)

# Constants
tickers_url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
In [4]:
# Fetch S&P 500 ticker list
def read_tickers():
    page = requests.get(tickers_url)
    soup = BeautifulSoup(page.content, 'html.parser')
    table = soup.find("table", {"class": "wikitable sortable"})

    # Fail now if we haven't found the right table
    header = table.find_all("th")
    if header[0].text.rstrip() != "Symbol" or header[1].text.rstrip() != "Security":
        raise ValueError("Can't parse Wikipedia's table!")

    ticker_arr = []
    for row in table.find_all("tr"):
        fields = row.find_all("td")
        if fields:
            # Yahoo uses '-' where Wikipedia uses '.' (e.g. BRK.B -> BRK-B)
            ticker = fields[0].text.rstrip().replace('.', '-')
            ticker_arr.append(ticker)

    ticker_str = ' '.join(ticker_arr)
    return ticker_str, ticker_arr
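An equivalent, shorter variant leans on pandas' built-in HTML table parser (requires lxml or html5lib, and assumes the constituents table is the first one on the page, true at the time of writing):

def read_tickers_via_pandas():
    table = pd.read_html(tickers_url)[0]
    ticker_arr = table['Symbol'].str.replace('.', '-', regex=False).tolist()
    return ' '.join(ticker_arr), ticker_arr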
In [4]:
# Read tickers and initialise the Yahoo Finance scraper
tickers, ticker_arr = read_tickers()
sp500 = yf.Tickers(tickers)

ticker_range = len(ticker_arr)
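Before kicking off the full extraction, a one-symbol sanity check confirms the scraper is wired up ('MMM' and the 'shortName' field are illustrative; info keys vary across yfinance versions):

sp500.tickers.get('MMM').info.get('shortName')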
In [5]:
# Extract

def extract_info(yfd):
    df = pd.DataFrame()

    for i in notebook.tqdm(range(ticker_range), total=ticker_range):
        data = yfd.tickers.get(ticker_arr[i]).info        # one dict of summary fields per ticker
        partial = pd.json_normalize(data, max_level=1)    # dict -> single-row frame
        df = pd.concat([df, partial], ignore_index=True, sort=False)
    print('Company info data loaded successfully')
    return df

def transform_helper(partial, i, T=True):
    # yfinance returns statements with dates as columns; transpose so each
    # row is one reporting date, then tag every row with its ticker
    if T:
        partial = partial.T
    partial.reset_index(inplace=True)
    partial.columns = ['Date', *partial.columns[1:]]
    partial['Ticker'] = ticker_arr[i]

    try:
        partial['Date'] = pd.to_datetime(partial['Date']).dt.date
    except Exception:
        # Unparseable dates are flagged as NaN and retried further down
        partial['Date'] = np.nan
    return partial

def get_income(yfd):
    df = pd.DataFrame()

    for i in notebook.tqdm(range(ticker_range), total=ticker_range):
        org = yfd.tickers.get(ticker_arr[i]).financials   # .financials is the income statement
        partial = transform_helper(org.copy(), i)
        df = pd.concat([df, partial], ignore_index=True, sort=False)
    print('Company income statement data loaded successfully')
    return df

def get_balance_sheet(yfd):
    df = pd.DataFrame()

    for i in notebook.tqdm(range(ticker_range), total=ticker_range):
        org = yfd.tickers.get(ticker_arr[i]).balance_sheet
        partial = transform_helper(org.copy(),i)
        df = pd.concat([df,partial], ignore_index=True, sort=False)
    print('Company balance sheet data loaded successfully')
    return df

def get_cash_flow(yfd):
    df = pd.DataFrame()

    for i in notebook.tqdm(range(ticker_range), total=ticker_range):
        org = yfd.tickers.get(ticker_arr[i]).cashflow
        partial = transform_helper(org.copy(),i)
        df = pd.concat([df,partial], ignore_index=True, sort=False)
    print('Company cashflow statement data loaded successfully')
    return df

def extract_financials(yfd):
    df_pnl = get_income(yfd)
    df_bs = get_balance_sheet(yfd)
    df_cf = get_cash_flow(yfd)
    return df_pnl, df_bs, df_cf

def extract_recommendations(yfd):
    df = pd.DataFrame()

    for i in notebook.tqdm(range(ticker_range), total=ticker_range):
        org = yfd.tickers.get(ticker_arr[i]).recommendations
        partial = transform_helper(org.copy(), i, T=False)
        # Keep only the last two quarters (26 weeks) of analyst recommendations
        twoQs = pd.to_datetime('today').date() - timedelta(weeks=26)
        partial = partial[partial['Date'] > twoQs]
        df = pd.concat([df, partial], ignore_index=True, sort=False)
    print('Company recommendation data loaded successfully')
    return df
In [6]:
df1 = extract_info(sp500)
Company info data loaded successfully
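One note on the extract loops above: calling pd.concat inside the loop copies the accumulated frame on every iteration. A behavior-equivalent pattern collects the per-ticker pieces in a list and concatenates once, sketched here for extract_info:

def extract_info_fast(yfd):
    parts = []
    for t in notebook.tqdm(ticker_arr, total=ticker_range):
        parts.append(pd.json_normalize(yfd.tickers.get(t).info, max_level=1))
    return pd.concat(parts, ignore_index=True, sort=False)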
In [7]:
df2, df3, df4 = extract_financials(sp500)
Company income statement data loaded successfully
Company balance sheet data loaded successfully
Company cashflow statement data loaded successfully
In [8]:
df5 = extract_recommendations(sp500)
Company recommendation data loaded successfully
In [10]:
# Transform

# Drop empty or unnecessary columns
null_cols = df1.isna().sum() > 500        # columns that are NaN for more than 500 of the ~502 rows
drop_cols = null_cols[null_cols].index
df1.drop(columns=drop_cols, inplace=True)
df1.drop(columns=['maxAge','fax','quoteType',
                  # '52WeekChange', 'SandP52WeekChange', 'impliedSharesOutstanding', 'trailingPegRatio'
                 'tradeable','address2','companyOfficers','priceHint',
                 'financialCurrency','exchangeTimezoneName','exchangeTimezoneShortName',
                 'market','uuid','targetLowPrice','recommendationKey','targetMedianPrice',
                 'numberOfAnalystOpinions','targetMeanPrice','targetHighPrice','recommendationMean',
                 'gmtOffSetMilliseconds'], inplace=True)
print("df1 cleaning successful")
df1 cleaning successful
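To audit what survives the drop (or to tune the 500-NaN threshold), a quick look at the remaining null counts helps:

df1.isna().sum().sort_values(ascending=False).head(10)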
In [12]:
# Retry tickers whose statements failed to parse (flagged with NaN dates above)
redo_arr = df4[df4['Date'].isna()]['Ticker'].unique().tolist()
if len(redo_arr) > 0:
    full_ticker_arr = ticker_arr          # keep the full list for the checks below
    redo = ' '.join(redo_arr)
    missing = yf.Tickers(redo)
    ticker_arr = redo_arr                 # the extract helpers read these globals
    ticker_range = len(redo_arr)

    df2r, df3r, df4r = extract_financials(missing)

    # Swap the failed rows for the re-extracted ones
    df2 = df2[~df2['Ticker'].isin(redo_arr)]
    df2 = pd.concat([df2, df2r], ignore_index=True, sort=False)
    df3 = df3[~df3['Ticker'].isin(redo_arr)]
    df3 = pd.concat([df3, df3r], ignore_index=True, sort=False)
    df4 = df4[~df4['Ticker'].isin(redo_arr)]
    df4 = pd.concat([df4, df4r], ignore_index=True, sort=False)

    ticker_arr, ticker_range = full_ticker_arr, len(full_ticker_arr)   # restore globals
Company income statement data loaded successfully
Company balance sheet data loaded successfully
Company cashflow statement data loaded successfully
In [17]:
# Last check before the SQL import
m1 = df2['Date'].isna().any()                     # income rows missing dates?
m2 = df3['Date'].isna().any()                     # balance sheet rows missing dates?
m3 = df4['Date'].isna().any()                     # cashflow rows missing dates?
m4 = df5['Ticker'].nunique() == len(ticker_arr)   # recommendations cover every ticker?
if not (m1 or m2 or m3) and m4:
    print('Dataset is complete')
else:
    print('Dataset is not complete')
Dataset is complete
In [22]:
from sqlalchemy import create_engine
engine = create_engine('postgresql://postgres:headband@192.168.0.151:5432/mydb')
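Hard-coding credentials is fine for a home-lab database; in shared code the connection URL is better read from the environment. A sketch, with SP500_DB_URL as a hypothetical variable name (os is already imported above):

engine = create_engine(os.environ['SP500_DB_URL'])   # e.g. 'postgresql://user:pass@host:5432/mydb'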
In [23]:
# Load
def update_server(table_name,dataframe):  
    try:
        dataframe.to_sql(table_name,con=engine,index=False,if_exists='replace')
        print(f'Successfully imported {table_name} to server')
    except Exception as err:
        print('Got an error while updating server')
        print('Error: ', str(err))
In [24]:
dfs = [df1, df2, df3, df4, df5]
sql_tables = ["Info", "Income", "BalanceSheet", "CashFlow", "Recommendation"]

for name, frame in zip(sql_tables, dfs):
    update_server(f"SP500{name}", frame)
Successfully imported SP500Info to server
Successfully imported SP500Income to server
Successfully imported SP500BalanceSheet to server
Successfully imported SP500CashFlow to server
Successfully imported SP500Recommendation to server
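A quick read-back against the same engine confirms the tables landed (one row per constituent, so roughly 502 in SP500Info):

pd.read_sql_table('SP500Info', con=engine).shape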