(Seattle, WA) Distressed Property Dashboard Project

Python: Data Extraction

  • Data Sources: King County Recorder's Office records search, King County Assessor eReal Property parcel pages
  • Dataset: Custom (scraped)
  • Export Option: PostgreSQL database (via SQLAlchemy)
  • Data Transformations: concatenate the weekly record frames into one dataset, drop unnamed columns (see the sketch below)
  • Future changes: add a historical trend column
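In practice the transformation bullet maps onto two pandas operations in the code below: the weekly result frames are concatenated into one dataframe, and the recorder site's unnamed filler columns are dropped. A minimal sketch of the idea (the values and the 'Unnamed: 1' column here are illustrative, not the real scraped schema):

import pandas as pd

# Sketch only: stitch weekly result frames together, then drop unnamed filler columns
week1 = pd.DataFrame({'Rec. #': [20220901000001], 'Unnamed: 1': [None]})
week2 = pd.DataFrame({'Rec. #': [20220908000001], 'Unnamed: 1': [None]})
combined = pd.concat([week1, week2], ignore_index=True, sort=False)
combined = combined.drop(columns=[c for c in combined.columns if c.startswith('Unnamed')])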
In [53]:
# Imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

import pandas as pd
import numpy as np
import time
import math
from re import findall, sub
from datetime import date, timedelta, datetime
from geopy.geocoders import Nominatim
from bs4 import BeautifulSoup
import requests
import os
from tqdm import notebook
import hashlib
In [54]:
# Set current directory
os.chdir('/home/jovyan/work')

# Setup chromedriver
options = webdriver.ChromeOptions()
options.add_argument('headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')

# Set constants
records_url = "https://recordsearch.kingcounty.gov/LandmarkWeb/search/index?theme=.blue&section=searchCriteriaDocuments&quickSearchSelection="
pid_url = 'https://blue.kingcounty.com/Assessor/eRealProperty/Detail.aspx?ParcelNbr='
In [55]:
# Get driver
def driver_open(url=records_url):
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()),options=options)
    driver.get(url)
    driver.set_window_size(1200,850)
    return driver

# Get records for a given date range
def get_records_from_dates(beg, end, n_iter, driver):
    # On the first pass, open the Documents search tab and set the document type
    if n_iter == 0:
        li_doctype = driver.find_element(By.ID, 'searchCriteriaDocuments-tab')
        li_doctype.click()
        time.sleep(3)

        text_area = driver.find_element(By.ID, 'documentType-DocumentType')
        text_area.send_keys("NTS")  # NTS = Notice of Trustee's Sale

    b_date = driver.find_element(By.ID, 'beginDate-DocumentType')
    b_date.clear()
    b_date.send_keys(beg)

    e_date = driver.find_element(By.ID, 'endDate-DocumentType')
    e_date.clear()
    e_date.send_keys(end)

    submit = driver.find_element(By.ID, 'submit-DocumentType')
    submit.click()
    time.sleep(3)
    if n_iter == 0:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        driver.save_screenshot('tmp/screenshot.png')

    # The results grid is the second-to-last table on the page
    df = pd.read_html(driver.page_source, skiprows=0)[-2]
    df = df.drop(columns=['#', 'Unnamed: 1', 'Unnamed: 2',
                          'Unnamed: 4', 'Book', 'Book Type',
                          'DocLinks.1', 'Unnamed: 15', 'Page'])
    return df
  
# Fetch and combine records for the past n_weeks
def fetch_records(n_weeks):
    date_format = "%m/%d/%Y"
    x = datetime.today()
    df = pd.DataFrame()
    dates = []

    for i in range(n_weeks):
        x = x - timedelta(days=1)
        end = x - timedelta(weeks=1)
        dates.append(x.strftime(date_format)) 
        dates.append(end.strftime(date_format)) 
        x = end
    dates_reversed = dates[::-1]
    print('Looking up records for the week(s) of',', '.join(dates_reversed[::2]))
    driver = driver_open()
    n_dates = int(len(dates)/2)
    
    print('Fetching records now')
    for i in notebook.tqdm(range(n_dates), total=n_dates):
        partial = get_records_from_dates(dates[i*2+1], dates[i*2], i, driver)
        df = pd.concat([df,partial], ignore_index=True, sort=False)

    driver.quit()
    return df

# Get Address and Appraised Value
def add_details_cols(pids):
    address_list = []
    appraised_list = []

    print('Adding parcel info')
    for pid in notebook.tqdm(pids, total=len(pids)):
        url = pid_url + pid
        page = requests.get(url)
        soup = BeautifulSoup(page.content, 'html.parser')
        try:
            address_cell = soup.find('table',id='cphContent_DetailsViewParcel').find_all('td')[5]
            address_list.append(address_cell.text)
            appraise_cell = soup.find('table',id='cphContent_GridViewTaxRoll').find_all('td')[7]
            appraised_list.append(appraise_cell.text)
        except Exception:  # parcel page missing or laid out differently
            address_list.append(None)
            appraised_list.append(None)
    
        time.sleep(1)
    return pd.Series(address_list), pd.Series(appraised_list)

# Get Location
def add_location_cols(addresses):
    lat = []
    lon = []
    hood = []
    metro = []
    geolocator = Nominatim(user_agent="KingCounty")

    print('Adding location info')
    try:
        for address in notebook.tqdm(addresses, total=len(addresses)):
            try:
                location = geolocator.geocode(str(address) + ', WA')
            except Exception:
                location = None
            # Need at least four comma-separated parts to pull out neighborhood and metro
            if location and location.raw and (len(location.raw['display_name'].split(', ')) > 3):
                lat.append(location.latitude)
                lon.append(location.longitude)
                parts = location.raw['display_name'].split(', ')
                hood.append(parts[2])
                metro.append(parts[3])
            else:
                lat.append(None)
                lon.append(None)
                hood.append(None)
                metro.append(None)

            # Respect Nominatim's one-request-per-second usage policy
            time.sleep(1.5)
    except ValueError as e:
        print('An error occurred: {}\n{}'.format(str(e), location))

    return pd.Series(lat), pd.Series(lon) , pd.Series(hood), pd.Series(metro)

# Add PID, Address, Appraised Value, and location columns to the dataframe
def transform(df):
    # Anonymize the Grantor name with an MD5 hash
    df['Grantor'] = df['Grantor'].apply(lambda x: hashlib.md5(x.encode()).hexdigest())
    df['PID'] = df['Legal'].apply(lambda x: str(x.strip().split(" ")[1]) if x.find('PID') != -1 else '')
    df.drop(columns=['Legal'], inplace=True)
    df['Address'], df['Appraised Value'] = add_details_cols(list(df['PID']))
    df['Latitude'], df['Longitude'], df['Neighborhood'], df['Metro'] = add_location_cols(df['Address'])
    
    return df


# check if local file is older than 1 week
def isWeek_old(filename):
    epoch = os.path.getmtime(f'data/{filename}.csv')
    org = date.fromtimestamp(epoch)
    is_old = org + timedelta(weeks=1) < date.today()
    return is_old

# Load data: fetch, transform, de-duplicate, and save to CSV
def load(filename, n_weeks=1):
    try:
        df = fetch_records(n_weeks)
        df = transform(df)
        if df.empty:
            raise Exception("Dataframe appears to be empty, please try again later...")
        
        org_rows = df.shape[0]
        df.drop_duplicates(subset=['Address'], inplace=True)
        df.dropna(inplace=True)
        print(f'{org_rows - df.shape[0]} rows were deleted')

        df.to_csv(f'data/{filename}.csv')
        print(f'Saved to file: data/{filename}.csv')

        return df
    except Exception as err:
        print(err)

# Cache dataset to local disk or reload and save if older than 1 week
def refresh_data(filename):
    # init data fetch first time
    if f'{filename}.csv' not in os.listdir('data'):
        df = load(filename, n_weeks=12)
    
    # refresh data if old
    elif isWeek_old(filename):
        date_format = "%m/%d/%Y"
        df = pd.read_csv(f'data/{filename}.csv',index_col='Unnamed: 0')
        last_update = df['Record Date'].max()
        start = datetime.strptime(last_update,date_format)
        x = datetime.today()
        diff = x - start
        n_weeks = math.ceil(diff.days/7)
        df = load(filename, n_weeks=n_weeks)
        
    else:
        df = pd.read_csv(f'data/{filename}.csv',index_col='Unnamed: 0')
        
    return df
In [56]:
df = refresh_data('seattle-distressed-properties')
Looking up records for the week(s) of 06/20/2022, 06/28/2022, 07/06/2022, 07/14/2022, 07/22/2022, 07/30/2022, 08/07/2022, 08/15/2022, 08/23/2022, 08/31/2022, 09/08/2022, 09/16/2022
Fetching records now
  0%|          | 0/12 [00:00<?, ?it/s]
Adding parcel info
  0%|          | 0/170 [00:00<?, ?it/s]
Adding location info
  0%|          | 0/170 [00:00<?, ?it/s]
36 rows were deleted
Saved to file: data/seattle-distressed-properties.csv
In [62]:
df.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 134 entries, 0 to 169
Data columns (total 14 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Status           134 non-null    object 
 1   Grantor          134 non-null    object 
 2   Grantee          134 non-null    object 
 3   Record Date      134 non-null    object 
 4   Doc Type         134 non-null    object 
 5   Rec. #           134 non-null    int64  
 6   DocLinks         134 non-null    object 
 7   PID              134 non-null    object 
 8   Address          134 non-null    object 
 9   Appraised Value  134 non-null    object 
 10  Latitude         134 non-null    float64
 11  Longitude        134 non-null    float64
 12  Neighborhood     134 non-null    object 
 13  Metro            134 non-null    object 
dtypes: float64(2), int64(1), object(11)
memory usage: 15.7+ KB
In [67]:
df['Address'].isna().unique()
Out[67]:
array([False])
In [59]:
from sqlalchemy import create_engine

# Connect to the local Postgres instance that backs the dashboard
engine = create_engine('postgresql://postgres:headband@192.168.0.151:5432/mydb')
In [60]:
"""
Note to self: inlcude a unique index to use for postgres database insert functionality

CREATE UNIQUE INDEX CONCURRENTLY seattle_rec_num 
ON SeattleDistressedProp ("Rec. #");

ALTER TABLE SeattleDistressedProp 
ADD CONSTRAINT unique_rec_num 
UNIQUE USING INDEX seattle_rec_num;
"""

def update_server(table_name):
    table_exists = (
    'SELECT EXISTS (SELECT FROM pg_tables '
    'WHERE schemaname = \'public\' AND '
    f'tablename  = \'{table_name}\');'
    )
    
    row_count = (
    'SELECT count(*) '
    f'FROM "{table_name}";'
    )
  
    try:
        if engine.execute(table_exists).first()[0]:
            org_rows = engine.execute(row_count).first()[0]
            # Build the insert by hand so ON CONFLICT can skip records already loaded;
            # relies on the unique index on "Rec. #" from the note above
            cols = ",".join(['"' + i + '"' for i in list(df.columns)])
            query = (
            f'INSERT INTO "{table_name}"({cols}) '
            f'VALUES {",".join([str(i) for i in list(df.to_records(index=False))])} '
            f'ON CONFLICT ("Rec. #") DO NOTHING;'
            )
            engine.execute(query)
            new_rows = engine.execute(row_count).first()[0]
            diff_rows = new_rows - org_rows
            print(diff_rows,' rows added')
        else:
            df.to_sql(table_name,con=engine,index=False,if_exists='replace')
        print('Successfully updated server')
    except Exception as err:
        print('Got an error while updating server')
        print('Error: ', str(err))
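The ON CONFLICT clause above only kicks in once the unique index from the note exists. If you would rather run that one-time setup from the notebook instead of psql, a sketch against the same SQLAlchemy 1.x engine could look like this (index and constraint names come from the note; CONCURRENTLY requires an autocommitting connection):

# One-time setup for the upsert: create the unique index/constraint that
# ON CONFLICT ("Rec. #") relies on. CONCURRENTLY can't run inside a transaction,
# so switch the connection to autocommit first.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    conn.execute('CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS seattle_rec_num '
                 'ON "SeattleDistressedProp" ("Rec. #");')
    conn.execute('ALTER TABLE "SeattleDistressedProp" '
                 'ADD CONSTRAINT unique_rec_num '
                 'UNIQUE USING INDEX seattle_rec_num;')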
In [61]:
update_server("SeattleDistressedProp")
Successfully updated server