# Assumed imports for this section; some or all may already be loaded earlier
# in the notebook.
import hashlib
import math
import os
import time
from datetime import date, datetime, timedelta

import pandas as pd
import requests
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from tqdm import notebook
from webdriver_manager.chrome import ChromeDriverManager

# Open a Chrome driver on the records search page
def driver_open(url=records_url):
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    driver.get(url)
    driver.set_window_size(1200, 850)
    return driver
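
# driver_open() relies on `records_url` and a Chrome `options` object defined
# earlier in the project; the real values are not shown here. If `options` is
# not already configured, a minimal headless setup could look like this
# (an assumption, not the original configuration):
#
#     options = webdriver.ChromeOptions()
#     options.add_argument('--headless=new')  # run Chrome without a visible window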
# Get records for a given date range
def get_records_from_dates(beg, end, n_iter, driver):
    # On the first pass only, open the document-type tab and enter the "NTS"
    # document type
    if n_iter == 0:
        li_doctype = driver.find_element('id', 'searchCriteriaDocuments-tab')
        li_doctype.click()
        time.sleep(3)
        text_area = driver.find_element('id', 'documentType-DocumentType')
        text_area.send_keys("NTS")
    # Fill in the begin/end dates and submit the search
    b_date = driver.find_element('id', 'beginDate-DocumentType')
    b_date.clear()
    b_date.send_keys(beg)
    e_date = driver.find_element('id', 'endDate-DocumentType')
    e_date.clear()
    e_date.send_keys(end)
    submit = driver.find_element('id', 'submit-DocumentType')
    submit.click()
    time.sleep(3)
    # Capture a screenshot of the first results page for inspection
    if n_iter == 0:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        driver.save_screenshot('tmp/screenshot.png')
    # The results grid is the second-to-last table on the page
    df = pd.read_html(driver.page_source)[-2]
    df = df.drop(columns=['#', 'Unnamed: 1', 'Unnamed: 2',
                          'Unnamed: 4', 'Book', 'Book Type',
                          'DocLinks.1', 'Unnamed: 15', 'Page'])
    return df
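
# Example usage (hypothetical dates): scrape one week of results with an
# already-open driver. `beg` is the older date and `end` the newer one, in
# MM/DD/YYYY format, matching how fetch_records() below calls this function.
# driver = driver_open()
# week_df = get_records_from_dates('01/01/2024', '01/08/2024', 0, driver)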
# Get the complete dataframe, one week at a time
def fetch_records(n_weeks):
    date_format = "%m/%d/%Y"
    x = datetime.today()
    df = pd.DataFrame()
    dates = []
    # Build a list of (newer, older) date pairs, one pair per week
    for i in range(n_weeks):
        x = x - timedelta(days=1)
        end = x - timedelta(weeks=1)
        dates.append(x.strftime(date_format))
        dates.append(end.strftime(date_format))
        x = end
    dates_reversed = dates[::-1]
    print('Looking up records for the week(s) of', ', '.join(dates_reversed[::2]))
    driver = driver_open()
    n_dates = len(dates) // 2
    print('Fetching records now')
    for i in notebook.tqdm(range(n_dates), total=n_dates):
        partial = get_records_from_dates(dates[i*2 + 1], dates[i*2], i, driver)
        df = pd.concat([df, partial], ignore_index=True, sort=False)
    driver.quit()
    return df
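
# Example (hypothetical): pull the last four weeks of filings into one dataframe.
# Uncomment to run; this opens a browser session and scrapes the records site.
# raw_df = fetch_records(n_weeks=4)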
# Get Address and Appraised Value for each parcel ID
def add_details_cols(pids):
    address_list = []
    appraised_list = []
    print('Adding parcel info')
    for pid in notebook.tqdm(pids, total=len(pids)):
        # pid_url (defined elsewhere) is the base URL for the parcel detail pages
        url = pid_url + pid
        page = requests.get(url)
        soup = BeautifulSoup(page.content, 'html.parser')
        try:
            # Pull the site address and appraised value cells from the parcel page;
            # append both only if both lookups succeed, so the lists stay aligned
            address_cell = soup.find('table', id='cphContent_DetailsViewParcel').find_all('td')[5]
            appraise_cell = soup.find('table', id='cphContent_GridViewTaxRoll').find_all('td')[7]
            address_list.append(address_cell.text)
            appraised_list.append(appraise_cell.text)
        except (AttributeError, IndexError):
            address_list.append(None)
            appraised_list.append(None)
        # Be polite to the parcel viewer between requests
        time.sleep(1)
    return pd.Series(address_list), pd.Series(appraised_list)
# Get Location info for each address via Nominatim
def add_location_cols(addresses):
    lat = []
    lon = []
    hood = []
    metro = []
    geolocator = Nominatim(user_agent="KingCounty")
    print('Adding location info')
    try:
        for address in notebook.tqdm(addresses, total=len(addresses)):
            try:
                location = geolocator.geocode(str(address) + ', WA')
            except Exception:
                location = None
            # Only keep results whose display name has enough components to
            # include a neighborhood (index 2) and metro area (index 3)
            if location and location.raw and (len(location.raw['display_name'].split(', ')) > 3):
                lat.append(location.latitude)
                lon.append(location.longitude)
                parts = location.raw['display_name'].split(', ')
                hood.append(parts[2])
                metro.append(parts[3])
            else:
                lat.append(None)
                lon.append(None)
                hood.append(None)
                metro.append(None)
            # Nominatim's usage policy allows at most one request per second
            time.sleep(1.5)
    except ValueError as e:
        print('An error occurred: {}\n{}'.format(str(e), location))
    return pd.Series(lat), pd.Series(lon), pd.Series(hood), pd.Series(metro)
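
# Example (hypothetical address): geocoding a single street address yields the
# latitude/longitude plus the third and fourth components of the display name,
# which this scraper treats as neighborhood and metro area.
# lats, lons, hoods, metros = add_location_cols(pd.Series(['401 5th Ave, Seattle']))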
# Add PID, Address, Appraised Value, and location columns to the dataframe
def transform(df):
    # Hash the Grantor name so it is not stored in plain text
    df['Grantor'] = df['Grantor'].apply(lambda x: hashlib.md5(x.encode()).hexdigest())
    # Extract the parcel ID (PID) from the legal description, when present
    df['PID'] = df['Legal'].apply(lambda x: str(x.strip().split(" ")[1]) if x.find('PID') != -1 else '')
    df.drop(columns=['Legal'], inplace=True)
    df['Address'], df['Appraised Value'] = add_details_cols(list(df['PID']))
    df['Latitude'], df['Longitude'], df['Neighborhood'], df['Metro'] = add_location_cols(df['Address'])
    return df
# Check whether the local file is more than one week old
def isWeek_old(filename):
    epoch = os.path.getmtime(f'data/{filename}.csv')
    org = date.fromtimestamp(epoch)
    is_old = org + timedelta(weeks=1) < date.today()
    return is_old
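
# Note: isWeek_old() assumes the cached CSV already exists under data/;
# refresh_data() below only calls it after checking that the file is present.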
# Load Data: fetch, transform, de-duplicate, and save to CSV
def load(filename, n_weeks=1):
    try:
        df = fetch_records(n_weeks)
        df = transform(df)
        if df.empty:
            raise Exception("Dataframe appears to be empty, please try again later...")
        org_rows = df.shape[0]
        df.drop_duplicates(subset=['Address'], inplace=True)
        df.dropna(inplace=True)
        print(f'{org_rows - df.shape[0]} rows were deleted')
        df.to_csv(f'data/{filename}.csv')
        print(f'Saved to file: data/{filename}.csv')
        return df
    except Exception as err:
        print(err)
# Cache the dataset to local disk, or reload and save if it is older than 1 week
def refresh_data(filename):
    # Initial data fetch the first time through (12 weeks of records)
    if f'{filename}.csv' not in os.listdir('data'):
        df = load(filename, n_weeks=12)
    # Refresh the data if the cached file is old
    elif isWeek_old(filename):
        date_format = "%m/%d/%Y"
        df = pd.read_csv(f'data/{filename}.csv', index_col='Unnamed: 0')
        # Parse the record dates so the most recent one is found by date,
        # not by string comparison, then fetch only the weeks since then
        start = pd.to_datetime(df['Record Date'], format=date_format).max()
        diff = datetime.today() - start
        n_weeks = math.ceil(diff.days / 7)
        df = load(filename, n_weeks=n_weeks)
    # Otherwise just read the cached copy
    else:
        df = pd.read_csv(f'data/{filename}.csv', index_col='Unnamed: 0')
    return df
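
# A minimal usage sketch, assuming a data/ directory exists; the filename
# below is hypothetical. Build or refresh the local cache and show a few rows.
if __name__ == '__main__':
    records = refresh_data('king_county_records')
    print(records.head())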