# Source code for w4h.core

"""The Core module contains core functions of the package used in other modules or as primary functions in the package. 
This includes the main run() function that allows rapid data analysis, a function to retrieve sample data,
and functions that are used throughout the package for logging and printing verbose outputs."""

import datetime
import importlib
import importlib.resources
import inspect
import json
import logging
import os
import pathlib
import subprocess
import zipfile

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import rioxarray as rxr
import xarray as xr
from shapely import wkt
from shapely.geometry import Point

import w4h

# Path of the active log file; stays None until logger_function() assigns it
log_filename = None  # initialize so variable exists but is None

# Directory of the resources bundled with the installed w4h package
RESOURCE_DIR = pathlib.Path(str(importlib.resources.files('w4h'))).joinpath('resources')
# 'sample_data' subfolder of the resources directory, used by get_resources()
SAMPLE_DATA_DIR = RESOURCE_DIR.joinpath('sample_data')

# Main function to run model all at once
def _kwargs_for(func, kw_params):
    """Return the entries of kw_params whose keys are parameters of func."""
    accepted = inspect.signature(func).parameters
    return {k: v for k, v in kw_params.items() if k in accepted}


def run(well_data, surf_elev_grid, bedrock_elev_grid, model_grid=None, metadata=None, keep_all_cols=True, layers=9, description_col='FORMATION', top_col='TOP', bottom_col='BOTTOM', depth_type='depth', study_area=None, xcol='LONGITUDE', ycol='LATITUDE', zcol='SURFACE_ELEV', well_id_col='API_NUMBER', lith_dict=None, lith_dict_start=None, lith_dict_wildcard=None, use_tokens=False, target_dict=None, target_name='', include_elevation_grids=True, include_elevation_coordinates=True, export_dir=None, verbose=False, log=False, **kw_params):
    """Function to run entire process with one line of code.

    NOTE: verbose and log are boolean parameters used for most of the functions.
    verbose=True prints information to terminal, log=True logs to a file.

    Parameters
    ----------
    well_data : str, pathlib.Path, pandas.DataFrame or geopandas.GeoDataFrame
        Filepath to file or directory containing well data, or the well data
        itself as a dataframe. If a dataframe is used, metadata must be given.
    surf_elev_grid : str or pathlib.Path object
        Surface elevation grid; passed to read_grid() as grid_path with
        grid_type='surface'.
    bedrock_elev_grid : str or pathlib.Path object
        Bedrock elevation grid; passed to read_grid() as grid_path with
        grid_type='bedrock'.
    model_grid : xarray.DataArray, str or pathlib.Path object, or None, default=None
        If an xarray.DataArray, it is used directly. Any other non-None value
        is passed to read_grid() with grid_type='model'. If None, a copy of
        the bedrock grid with all values set to 1 is used.
    metadata : str, pathlib.Path, pandas.DataFrame, geopandas.GeoDataFrame or None, default=None
        Filepath to file or directory containing well metadata, such as
        location and elevation, or the metadata as a dataframe. If None and
        well_data is a directory, file_setup() is used to look for the
        metadata in that directory.
    keep_all_cols : bool, default=True
        Whether to keep all columns of the input dataframes/files.
        NOTE(review): this value is not referenced in the body of run().
    layers : int, default = 9
        The number of layers in the model grid
    description_col : str, default = 'FORMATION'
        Name of column containing geologic descriptions of the well interval.
        This column should be in well_data.
    top_col : str, default = 'TOP'
        Name of column containing depth/elevation at top of well interval.
        This column should be in well_data.
    bottom_col : str, default = 'BOTTOM'
        Name of column containing depth/elevation at bottom of well interval.
        This column should be in well_data.
    depth_type : str, default = 'depth'
        Whether values top_col or bottom_col refer to depth or elevation.
    study_area : str or pathlib.Path object, or geopandas.GeoDataFrame, or None
        Passed to read_study_area(). If None, no study area is read and the
        well data are not clipped.
    xcol : str, default = 'LONGITUDE'
        Name of column containing x coordinates. This column should be in
        metadata unless metadata is not read, then it should be in well_data.
    ycol : str, default = 'LATITUDE'
        Name of column containing y coordinates. This column should be in
        metadata unless metadata is not read, then it should be in well_data.
    zcol : str, default = 'SURFACE_ELEV'
        Name of column containing z coordinates. This column should be in
        metadata unless metadata is not read, then it should be in well_data.
    well_id_col : str, default = 'API_NUMBER'
        Name of column identifying each well; used to sort the classified
        records and passed to layer_target_thick().
    lith_dict : str or pathlib.Path object, or pandas.DataFrame
        Passed to get_search_terms() as spec_path (exact match terms).
    lith_dict_start : str or pathlib.Path object, or pandas.DataFrame
        Passed to get_search_terms() as start_path. The "starts with"
        classification step only runs if this is not None.
    lith_dict_wildcard : str or pathlib.Path object, or pandas.DataFrame
        Passed to get_search_terms() as wildcard_path. The wildcard
        classification step only runs if this is not None.
    use_tokens : bool, default = False
        Whether to tokenize the data for classification. Token-based
        classification is not implemented yet; when True the dictionary-based
        classification steps are skipped.
    target_dict : str or pathlib.Path object, or pandas.DataFrame
        Passed to read_lithologies() as lith_file.
    target_name : str, default = ''
        Name of target of interest, to be used on exported files
    include_elevation_grids : bool, default = True
        Whether to include the elevation grids (surface, bedrock, and derived
        total "drift" thickness and layer thickness) in the output.
    include_elevation_coordinates : bool, default = True
        Whether to include the elevation coordinates for each grid point at
        all layers in the output. If True, these are saved as unindexed
        coordinates. They are derived from the elevation grids, so this only
        has an effect when include_elevation_grids is also True.
    export_dir : str or pathlib.Path object, default = None
        Directory to export output files. None or False is reported as
        "Data will not be exported" when verbose. Any other falsy value
        (e.g. '') creates a time-stamped 'Output__<time>' directory next to
        well_data (or in the working directory if well_data is a dataframe).
    verbose : bool, default = False
        Whether to print updates/results
    log : bool, default = False
        Whether to send parameters and outputs to log file.
    **kw_params
        Keyword parameters used by any of the functions throughout the
        process. Each is forwarded to every constituent function whose
        signature has a parameter of that name.

    Returns
    -------
    tuple
        (resdf, layers_data), where resdf is the output of
        layer_target_thick() and layers_data the output of layer_interp()
        (with optional elevation grids/coordinates added). If return_all=True
        is given in kw_params, (resdf, layers_data, res_list) is returned.

    Raises
    ------
    IOError
        If well_data is a filepath that does not exist, or if an export
        directory has to be derived but well_data is not a filepath/dataframe.
    TypeError
        If well_data or metadata is not a filepath, pathlib.Path or dataframe
        in a combination this function supports.
    """
    if verbose:
        verbose_print(run, locals())

    frame_types = (pd.DataFrame, gpd.GeoDataFrame)
    path_types = (pathlib.PurePath, str)

    # Get data (files or otherwise)
    file_setup_kwargs = _kwargs_for(w4h.file_setup, kw_params)

    # Check how well_data and metadata were defined
    if isinstance(well_data, path_types):
        # Convert well_data to pathlib.Path if not already
        if isinstance(well_data, str):
            well_data = pathlib.Path(well_data)

        if metadata is None:
            if well_data.is_dir():
                # If the two files are supposed to be in the same directory (or just want well_data found)
                well_dataPath, metadataPath = w4h.file_setup(well_data=well_data, verbose=verbose, log=log, **file_setup_kwargs)
            elif well_data.exists():
                # If well_data is a file, and metadata is not used
                well_dataPath, _ = w4h.file_setup(well_data=well_data, verbose=verbose, log=log, **file_setup_kwargs)
                metadataPath = None
            else:
                # Need for well_data to exist at the very least
                raise IOError('well_data file does not exist:{}'.format(well_data))
        elif isinstance(metadata, path_types):
            # Metadata has specifically been specified by a filepath
            if isinstance(metadata, str):
                metadata = pathlib.Path(metadata)
            # verbose/log are forwarded here as in every other branch, so that
            # file_setup() can set up the log file
            well_dataPath, metadataPath = w4h.file_setup(well_data=well_data, metadata=metadata, verbose=verbose, log=log, **file_setup_kwargs)
        elif isinstance(metadata, frame_types):
            well_dataPath, _ = w4h.file_setup(well_data=well_data, verbose=verbose, log=log, **file_setup_kwargs)
            metadataPath = metadata
        else:
            raise TypeError('metadata must be a string filepath, a pathlib.Path object, or pandas.DataFrame')
    elif isinstance(well_data, frame_types):
        if isinstance(metadata, frame_types):
            well_dataPath = well_data
            metadataPath = metadata
        elif isinstance(metadata, path_types):
            _, metadataPath = w4h.file_setup(well_data=metadata, metadata=metadata, verbose=verbose, log=log, **file_setup_kwargs)
            well_dataPath = well_data
        else:
            raise TypeError('metadata must be a string filepath, a pathlib.Path object, or pandas.DataFrame')
    else:
        raise TypeError('well_data must be a string filepath, a pathlib.Path object, or pandas.DataFrame')

    if not export_dir:
        if export_dir is False or export_dir is None:
            if verbose:
                print("\tData will not be exported")
        else:
            # Falsy but neither None nor False (e.g. ''): derive a
            # time-stamped output directory next to the input data
            nowTime = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
            outDir = 'Output_' + '_' + nowTime
            if isinstance(well_dataPath, frame_types):
                export_dir = pathlib.Path(outDir)
            elif isinstance(well_dataPath, pathlib.PurePath):
                if well_dataPath.is_dir():
                    export_dir = well_dataPath.joinpath(outDir)
                else:
                    export_dir = well_dataPath.parent.joinpath(outDir)
            else:
                raise IOError('export_dir should be explicitly defined if well_data is not a filepath')

            if not export_dir.exists():
                try:
                    export_dir.mkdir()
                except OSError:
                    # Best effort: processing continues without the directory
                    print('Export Directory not created')

    # Get pandas dataframes from input.
    # Functions to read data into dataframes. Also excludes extraneous columns,
    # and drops header data with no location information
    read_raw_txt_kwargs = _kwargs_for(w4h.read_raw_csv, kw_params)
    well_data_IN, metadata_IN = w4h.read_raw_csv(data_filepath=well_dataPath, metadata_filepath=metadataPath, verbose=verbose, log=log, **read_raw_txt_kwargs)

    if metadata_IN is None:
        well_data_xyz = well_data_IN
    else:
        merge_metadata_kwargs = _kwargs_for(w4h.merge_metadata, kw_params)
        well_data_xyz = w4h.merge_metadata(data_df=well_data_IN, header_df=metadata_IN, data_cols=None, header_cols=None, auto_pick_cols=False, drop_duplicate_cols=True, log=False, **merge_metadata_kwargs)

    # Convert well_data_xyz to have geometry
    coords2geometry_kwargs = _kwargs_for(w4h.coords2geometry, kw_params)
    well_data_xyz = w4h.coords2geometry(df_no_geometry=well_data_xyz, xcol=xcol, ycol=ycol, zcol=zcol, verbose=verbose, log=log, **coords2geometry_kwargs)

    # Get Study area
    read_study_area_kwargs = _kwargs_for(w4h.read_study_area, kw_params)
    if study_area is None:
        studyAreaIN = None
    else:
        studyAreaIN = w4h.read_study_area(study_area=study_area, log=log, **read_study_area_kwargs)
        clip_gdf2study_area_kwargs = _kwargs_for(w4h.clip_gdf2study_area, kw_params)
        well_data_xyz = w4h.clip_gdf2study_area(study_area=studyAreaIN, gdf=well_data_xyz, verbose=verbose, log=log, **clip_gdf2study_area_kwargs)

    # Get surfaces and grid(s)
    read_grid_kwargs = _kwargs_for(w4h.read_grid, kw_params)
    surfaceElevGridIN = w4h.read_grid(grid_path=surf_elev_grid, grid_type='surface', study_area=studyAreaIN, verbose=verbose, log=log, **read_grid_kwargs)
    bedrockElevGridIN = w4h.read_grid(grid_path=bedrock_elev_grid, grid_type='bedrock', study_area=studyAreaIN, verbose=verbose, log=log, **read_grid_kwargs)

    if isinstance(model_grid, xr.DataArray):
        modelGrid = model_grid
        if verbose:
            print("Input model_grid specified is already xarray DataArray. Will use this directly.")
    elif model_grid is not None:
        modelGrid = w4h.read_grid(grid_path=model_grid, grid_type='model', study_area=studyAreaIN, verbose=verbose, log=log, **read_grid_kwargs)
    else:
        # If model grid is not defined, it is set equal to the bedrock grid,
        # except that all data values are set to 1
        modelGrid = bedrockElevGridIN.copy()
        modelGrid.values[:] = 1

    # Add control points (only if any control-point parameter was supplied)
    add_control_points_kwargs = _kwargs_for(w4h.add_control_points, kw_params)
    if add_control_points_kwargs:
        well_data_xyz = w4h.add_control_points(df_without_control=well_data_xyz, xcol=xcol, ycol=ycol, zcol=zcol, top_col=top_col, bottom_col=bottom_col, description_col=description_col, verbose=verbose, log=log, **add_control_points_kwargs)

    # Analyze Surface(s) and grid(s)
    bedrockGrid, surfaceGrid = w4h.align_rasters(grids_unaligned=[bedrockElevGridIN, surfaceElevGridIN], model_grid=modelGrid, no_data_val_grid=0, log=log)
    driftThickGrid, layerThickGrid = w4h.get_drift_thick(surface_elev=surfaceGrid, bedrock_elev=bedrockGrid, layers=layers, plot=verbose, log=log)

    well_data_xyz = w4h.sample_raster_points(raster=bedrockGrid, points_df=well_data_xyz, xcol=xcol, ycol=ycol, new_col='BEDROCK_ELEV', verbose=verbose, log=log)
    well_data_xyz = w4h.sample_raster_points(raster=surfaceGrid, points_df=well_data_xyz, xcol=xcol, ycol=ycol, new_col='SURFACE_ELEV', verbose=verbose, log=log)
    well_data_xyz['BEDROCK_DEPTH'] = well_data_xyz['SURFACE_ELEV'] - well_data_xyz['BEDROCK_ELEV']
    well_data_xyz['LAYER_THICK'] = well_data_xyz['BEDROCK_DEPTH'] / layers

    well_data_xyz = w4h.get_layer_depths(df_with_depths=well_data_xyz, layers=layers, log=log)

    # Clean up data
    well_data_xyz = w4h.remove_nonlocated(df_with_locations=well_data_xyz, log=log, verbose=verbose)
    well_data_xyz = w4h.remove_no_topo(df_with_topo=well_data_xyz, zcol=zcol, verbose=verbose, log=log)

    # Drop records with no depth information
    remove_no_depth_kwargs = _kwargs_for(w4h.remove_no_depth, kw_params)
    well_data_xyz = w4h.remove_no_depth(well_data_xyz, verbose=verbose, top_col=top_col, bottom_col=bottom_col, log=log, **remove_no_depth_kwargs)

    # Drop records with bad depth information (i.e., top depth > bottom depth)
    # (Also calculates thickness of each record)
    remove_bad_depth_kwargs = _kwargs_for(w4h.remove_bad_depth, kw_params)
    well_data_xyz = w4h.remove_bad_depth(well_data_xyz, verbose=verbose, top_col=top_col, bottom_col=bottom_col, depth_type=depth_type, log=log, **remove_bad_depth_kwargs)

    remove_no_formation_kwargs = _kwargs_for(w4h.remove_no_description, kw_params)
    well_data_xyz = w4h.remove_no_description(well_data_xyz, description_col=description_col, verbose=verbose, log=log, **remove_no_formation_kwargs)

    # CLASSIFICATION
    # Read dictionary definitions and classify
    if use_tokens:
        # This will eventually parse out well data descriptions
        pass
    else:
        get_search_terms_kwargs = _kwargs_for(w4h.get_search_terms, kw_params)
        specTermsPATH, startTermsPATH, wildcardTermsPATH, = w4h.get_search_terms(spec_path=lith_dict, start_path=lith_dict_start, wildcard_path=lith_dict_wildcard, use_tokens=use_tokens, log=log, **get_search_terms_kwargs)

        read_dictionary_terms_kwargs = _kwargs_for(w4h.read_dictionary_terms, kw_params)
        # class_flag is specific to an individual dict terms file, so don't want to use for all
        read_dictionary_terms_kwargs.pop('class_flag', None)
        specTerms = w4h.read_dictionary_terms(dict_file=specTermsPATH, log=log, **read_dictionary_terms_kwargs)
        startTerms = w4h.read_dictionary_terms(dict_file=startTermsPATH, log=log, **read_dictionary_terms_kwargs)
        wildcardTerms = w4h.read_dictionary_terms(dict_file=wildcardTermsPATH, log=log, **read_dictionary_terms_kwargs)

        # Clean up dictionary terms
        specTerms = specTerms.drop_duplicates(subset='DESCRIPTION')
        specTerms = specTerms.reset_index(drop=True)
        specTerms['CLASS_FLAG'] = 1

        startTerms = startTerms.drop_duplicates(subset='DESCRIPTION')
        startTerms = startTerms.reset_index(drop=True)

        wildcardTerms = wildcardTerms.drop_duplicates(subset='DESCRIPTION')
        wildcardTerms = wildcardTerms.reset_index(drop=True)

        if verbose:
            noSpecTerms = specTerms.shape[0]
            noStartTerms = startTerms.shape[0]
            noWildcardTerms = wildcardTerms.shape[0]
            print('\tSearch terms to be used:')
            print(f'\t\t {noSpecTerms} exact match term/definition pairs')
            print(f'\t\t {noStartTerms} starting match term/definition pairs')
            print(f'\t\t {noWildcardTerms} wildcard match term/definition pairs')

        # CLASSIFICATIONS
        # Exact match classifications
        well_data_xyz = w4h.specific_define(well_data_xyz, terms_df=specTerms, description_col=description_col, verbose=verbose, log=log)

        # .startswith classifications
        if lith_dict_start is not None:
            classifedDF, searchDF = w4h.split_defined(well_data_xyz, verbose=verbose, log=log)
            searchDF = w4h.start_define(df=searchDF, terms_df=startTerms, description_col=description_col, verbose=verbose, log=log)
            well_data_xyz = w4h.remerge_data(classifieddf=classifedDF, searchdf=searchDF)  # UPDATE: Needed? ***

        # wildcard/any substring match classifications
        if lith_dict_wildcard is not None:
            classifedDF, searchDF = w4h.split_defined(well_data_xyz, verbose=verbose, log=log)
            searchDF = w4h.wildcard_define(df=searchDF, terms_df=wildcardTerms, description_col=description_col, verbose=verbose, log=log)
            well_data_xyz = w4h.remerge_data(classifieddf=classifedDF, searchdf=searchDF)  # UPDATE: Needed? ***

    # Depth classification
    classifedDF, searchDF = w4h.split_defined(well_data_xyz, verbose=verbose, log=log)
    searchDF = w4h.depth_define(df=searchDF, thresh=550, verbose=verbose, log=log)
    well_data_xyz = w4h.remerge_data(classifieddf=classifedDF, searchdf=searchDF)  # UPDATE: Needed? ***

    # Fill unclassified data
    well_data_xyz = w4h.fill_unclassified(well_data_xyz, classification_col='CLASS_FLAG')

    # Add target interpretations
    read_lithologies_kwargs = _kwargs_for(w4h.read_lithologies, kw_params)
    targetInterpDF = w4h.read_lithologies(lith_file=target_dict, log=log, **read_lithologies_kwargs)
    well_data_xyz = w4h.merge_lithologies(well_data_df=well_data_xyz, targinterps_df=targetInterpDF, target_col='TARGET', target_class='bool')

    # Sort dataframe to prepare for next steps
    well_data_xyz = well_data_xyz.sort_values(by=[well_id_col, top_col])
    well_data_xyz = well_data_xyz.reset_index(drop=True)
    # UPDATE: Option to remove nans?
    well_data_xyz = well_data_xyz[well_data_xyz["LITHOLOGY"].notnull()]

    layer_target_thick_kwargs = _kwargs_for(w4h.layer_target_thick, kw_params)
    resdf = w4h.layer_target_thick(gdf=well_data_xyz, well_id_col=well_id_col, layers=layers, export_dir=export_dir, depth_top_col=top_col, depth_bot_col=bottom_col, log=log, **layer_target_thick_kwargs)

    # With return_all=True, layer_target_thick() output is unpacked as a pair
    returnALL = layer_target_thick_kwargs.get('return_all') is True
    if returnALL:
        res_list, resdf = resdf

    layer_interp_kwargs = _kwargs_for(w4h.layer_interp, kw_params)
    layers_data = w4h.layer_interp(points=resdf, model_grid=modelGrid, layers=layers, verbose=verbose, log=log, **layer_interp_kwargs)

    if include_elevation_grids:
        # Add surface, bedrock, and derived grids
        layers_data['Surface_Elevation'] = surfaceGrid
        layers_data['Bedrock_Elevation'] = bedrockGrid
        layers_data['Bedrock_Depth'] = driftThickGrid
        layers_data['Layer_Thickness'] = layerThickGrid

        if include_elevation_coordinates:
            # Add each layer's elevation as an unindexed coordinate; this
            # relies on the grids that were just added to layers_data
            layerElevs = np.array([
                (layers_data['Surface_Elevation'] - (layers_data['Layer_Thickness']*i)).values
                for i in range(1, layers+1)
            ])
            layers_data = layers_data.assign_coords(layer_elevs=(['Layer', "y", "x"], layerElevs))

    # Ensure spatial dimensions are set to improve export
    # THIS MAY BE REPEAT OF LAST LINES OF layer_interp()
    layers_data.rio.set_spatial_dims(x_dim='x', y_dim='y', inplace=True)
    layers_data.rio.write_grid_mapping(inplace=True)
    layers_crs = pyproj.CRS.from_wkt(layers_data.spatial_ref.crs_wkt)
    layers_data.rio.write_crs(layers_crs, inplace=True)

    w4h.export_grids(grid_data=layers_data, out_path=export_dir, file_id=target_name, filetype='tif', variable_sep=True, date_stamp=True, verbose=verbose, log=log)

    if returnALL:
        print('Returning all data [results dataframe, gridded layers, results list]')
        return resdf, layers_data, res_list
    return resdf, layers_data
def gui():
    """Launch the w4h graphical interface with Streamlit.

    Runs ``streamlit run`` on ``w4h_gui.py``, the script located in the same
    directory as this module, and blocks until the Streamlit process ends.
    A KeyboardInterrupt (Ctrl+C) while the app is running is swallowed so
    that stopping the app does not show a traceback. The Streamlit process's
    stderr is discarded.

    Raises
    ------
    ImportError
        If the streamlit package is not installed.
    """
    # Imported here so streamlit is only required when the GUI is requested;
    # fails fast with ImportError if it is missing
    import streamlit  # noqa: F401

    streamlitPath = pathlib.Path(__file__).with_name("w4h_gui.py")
    cmd = ['streamlit', 'run', streamlitPath.as_posix()]

    # The previous implementation staged the package modules in a temporary
    # directory and ran streamlit on the last staged module, not the GUI script
    try:
        subprocess.run(cmd, stderr=subprocess.DEVNULL)
    except KeyboardInterrupt:
        pass
# Function to update docstring for run function, used in __init__ file
def _run_docstring():
    """Build the extended docstring text for w4h.run().

    For every constituent function of the run() workflow, the signature is
    inspected and each parameter is listed with its default value.
    Parameters without a default are shown with a placeholder instead.

    Returns
    -------
    run_docstring : str
        Introductory text followed by one section per function listing its
        parameters and defaults.
    """
    nl = '\n\t'
    functionList = [w4h.file_setup, w4h.read_raw_csv, w4h.define_dtypes, w4h.merge_metadata, w4h.coords2geometry,
                    w4h.read_study_area, w4h.clip_gdf2study_area, w4h.read_grid, w4h.add_control_points,
                    w4h.remove_nonlocated, w4h.remove_no_topo, w4h.remove_no_depth, w4h.remove_bad_depth, w4h.remove_no_description,
                    w4h.get_search_terms, w4h.read_dictionary_terms, w4h.specific_define,
                    w4h.split_defined, w4h.start_define, w4h.wildcard_define, w4h.remerge_data, w4h.fill_unclassified,
                    w4h.read_lithologies, w4h.merge_lithologies,
                    w4h.align_rasters, w4h.get_drift_thick, w4h.sample_raster_points, w4h.get_layer_depths, w4h.layer_target_thick,
                    w4h.layer_interp, w4h.export_grids]

    # Parameter-name fragments that mark a value produced by an earlier step
    prevOutputList = ['df', 'filepath', 'study_area']

    funcStrList = []
    for func in functionList:
        funcString = f"\n\n**{func.__name__}**"
        for name, param in inspect.signature(func).parameters.items():
            default = param.default
            is_placeholder = False
            if 'kwargs' in name:
                default = {}
            elif default is inspect.Parameter.empty:
                # No default in the signature: describe it with a placeholder
                is_placeholder = True
                if func.__name__ == 'read_study_area' and name == 'study_area':
                    default = "None <but defaults to w4h.resources()['study_area']>"
                elif any(o in name for o in prevOutputList):
                    default = '<output of previous function>'
                else:
                    default = '<no default>'

            funcString += f"\n\n\t{name}".ljust(25)
            # Only genuine string defaults are quoted; placeholders are not
            if isinstance(default, str) and not is_placeholder:
                funcString += f"| default = '{default}'"
            else:
                funcString += f"| default = {default}"
        funcStrList.append(funcString)

    run_docstring = f"""
    w4h.run() is a function that runs the intended workflow of the wells4hydrogeology (w4h) package.
    This means that it runs several constituent functions. The workflow that this follows is provided in the package wiki.
    It accepts the parameters of the constituent functions. To see a list of these functions and parameters, use `help(w4h.run)`.

    The following functions used in w4h.run() are listed below, along with their parameters and default values for those parameters.
    See the documentation for each of the individual functions for more information on a specific parameter:

    {nl.join(funcStrList)}
    """
    return run_docstring

# Function for logging (experimental)
def logger_function(logtocommence, parameters, func_name):
    """Function to log other functions, to be called from within other functions

    A new log file is started when this is called for 'file_setup' (or
    'new_logfile'); calls for any other function append to the log file
    already in use, or to a new file in the working directory if none has
    been set up yet. The caller's parameters dictionary is not modified.

    Parameters
    ----------
    logtocommence : bool
        Whether to perform logging steps
    parameters : dict
        Dictionary containing parameters and their values, from function.
        Logging only happens if its 'log' entry is truthy. For 'file_setup',
        the optional 'log_dir' entry gives the directory of the log file;
        otherwise the 'well_data' entry is used (its parent directory if it
        is a file).
    func_name : str
        Name of function within which this is called
    """
    global log_filename

    if not logtocommence:
        # Don't log if log=False
        return

    # Work on a copy: the 'log'/'log_dir' entries are removed before the
    # parameters are written out, and the caller's dictionary must stay intact
    parameters = dict(parameters)

    # log parameter should be false by default on all; defaults to None if unset
    log_file = parameters.pop('log', None)
    if not log_file:
        return

    # Get current time and setup format for log messages
    curr_time = datetime.datetime.now()
    FORMAT = '%(asctime)s %(message)s'
    timestamp = curr_time.strftime('%Y-%m-%d_%H-%M-%S')

    # Check if we are starting a new logfile (only does this during run of
    # file_setup() or (currently non-existent) new_logfile() functions)
    if func_name in ('file_setup', 'new_logfile'):
        # Get the log_dir variable set as a file_setup() parameter, or default to None if not specified
        out_dir = parameters.pop('log_dir', None)
        if out_dir is None:
            # If output directory not specified, default to the input directory
            out_dir = parameters['well_data']
        out_dir = pathlib.Path(out_dir)
        if out_dir.is_file():
            # well_data may be a single file; the log then goes next to it
            out_dir = out_dir.parent

        # The timestamp won't change, so it represents the start of logging
        log_filename = out_dir.joinpath(f"log_{timestamp}.txt")
        if parameters.get('verbose'):
            print('Logging data to', log_filename)

        # force=True so a new log file is really started even when logging
        # has already been configured earlier in this session
        logging.basicConfig(filename=log_filename, level=logging.INFO, format=FORMAT, filemode='w', force=True)
        logging.info(f"{func_name} CALLED WITH PARAMETERS:\n\t {parameters}")
    else:
        # Run this for functions that aren't setting up logging file
        if not log_filename:
            # If log file has not already been set up, set it up
            log_filename = f"log_{timestamp}.txt"
        # Get the log stream and log this function's call with parameters
        logging.basicConfig(filename=log_filename, level=logging.INFO, format=FORMAT)
        logging.info(f"{func_name} CALLED WITH PARAMETERS: \n\t{parameters}")
    return
# Reusable function for consistently-formatted verbose printing output
def verbose_print(func, local_variables, exclude_params=()):
    """Print a consistently-formatted summary of a function call.

    Prints the function name, the current time and the value of every entry
    of local_variables that is a parameter of func.

    Parameters
    ----------
    func : callable
        Function whose call is being reported.
    local_variables : dict
        Names and values to report (typically locals() of the function).
        Entries that are not parameters of func are ignored.
    exclude_params : sequence of str, optional
        Parameters whose value is printed as '<input object>' instead of the
        value itself.

    Returns
    -------
    print_list : list of str
        The lines that were printed.
    """
    # Look the signature up once rather than for every variable
    func_params = inspect.signature(func).parameters

    print_list = ['\n']
    sTime = datetime.datetime.now()
    print_list.append(f"{func.__name__}")
    print_list.append(f"\tStarted at {sTime}.")
    print_list.append("\tParameters:")
    for k, v in local_variables.items():
        if k not in func_params:
            continue
        if 'kwargs' in k:
            # Keyword dictionaries are expanded one entry per line
            print_list.append(f"\t\t{k}")
            print_list.extend(f"\t\t\t{kk}={vv}" for kk, vv in v.items())
        elif k in exclude_params:
            print_list.append(f"\t\t{k}=<input object>")
        else:
            print_list.append(f"\t\t{k}={v}")

    for line in print_list:
        print(line)
    return print_list
# Get filepaths for package resources in dictionary format
def get_resources(resource_type='filepaths', scope='local', verbose=False):
    """Function to get filepaths (or data objects) for resources included with package

    Parameters
    ----------
    resource_type : str, {'filepaths', 'data'}
        If filepaths, will return dictionary with filepaths to sample data.
        If data (or 'objects', 'do', 'data objects', 'dataobjects'), the
        files are read and the dictionary holds the resulting data objects.
    scope : str, {'local', 'statewide'}
        If 'statewide' (or 'state', 'regional', 'region', 's', 'r'), uses the
        files in the 'statewide_sample_data' folder. Any other value uses the
        local sample files.
    verbose : bool, optional
        Passed to w4h.get_most_recent() for each lookup, by default False

    Returns
    -------
    resources_dict : dict
        Dictionary containing key, value pairs with filepaths to (or data
        objects read from) resources that may be of interest.
    """
    def _latest(folder, pattern):
        # Single place for the repeated "most recent file matching" lookup
        return w4h.get_most_recent(dir=folder, glob_pattern=pattern, verbose=verbose)

    dictionary_dir = SAMPLE_DATA_DIR.joinpath('DictionaryTerms')
    interp_dir = SAMPLE_DATA_DIR.joinpath('LithologyInterpretations')
    statewide_dir = SAMPLE_DATA_DIR.joinpath('statewide_sample_data')

    # Resources shared by every scope: (key, folder, glob pattern)
    shared_lookups = (
        ('LithologyDict_Exact', dictionary_dir, '*DICTIONARY_SearchTerms*'),
        ('LithologyDict_Start', dictionary_dir, '*SearchTerms-Start*'),
        ('LithologyDict_Wildcard', dictionary_dir, '*SearchTerms-Wildcard*'),
        ('LithInterps_FineCoarse', interp_dir, '*FineCoarse*'),
        ('LithInterps_Clay', interp_dir, '*Clay*'),
        ('LithInterps_Silt', interp_dir, '*Silt*'),
        ('LithInterps_Sand', interp_dir, '*Sand*'),
        ('LithInterps_Gravel', interp_dir, '*Gravel*'),
        ('well_data_dtypes', SAMPLE_DATA_DIR, '*downholeDataTypes*'),
        ('metadata_dtypes', SAMPLE_DATA_DIR, '*headerDataTypes*'),
        ('ISWS_CRS', SAMPLE_DATA_DIR, 'isws_crs.json'),
        ('xyz_dtypes', SAMPLE_DATA_DIR, 'xyzDataTypes.json'),
        ('model_grid', SAMPLE_DATA_DIR, 'grid_625_raster.tif'),
    )
    resources_dict = {}
    for key, folder, pattern in shared_lookups:
        resources_dict[key] = _latest(folder, pattern)

    # Scope-specific sample data
    is_statewide = scope.lower() in ['statewide', 'state', 'regional', 'region', 's', 'r']
    if is_statewide:
        resources_dict['well_data'] = statewide_dir.joinpath("IL_Statewide_WellData_XYz_2023-07-20_cleaned.zip")
        scoped_lookups = (
            ('surf_elev', statewide_dir, '*IL_Statewide_Surface_Elev_ft_625ft_Lambert_GridAlign*'),
            ('bedrock_elev', statewide_dir, '*IL_Statewide_Bedrock_Elev_2023_ft_625ft_Lambert_GridAlign*'),
            ('study_area', statewide_dir, '*IL_Statewide_boundary*'),
        )
    else:
        scoped_lookups = (
            ('study_area', SAMPLE_DATA_DIR, '*sample_studyArea*'),
            ('surf_elev', SAMPLE_DATA_DIR, '*sample_surface_bedrock_lidarresampled100ft*'),
            ('bedrock_elev', SAMPLE_DATA_DIR, '*LocalSample_Bedrock_elev_EStLGrimleyPhillips*'),
            ('well_data', SAMPLE_DATA_DIR, 'sample_well_data*'),
        )
    for key, folder, pattern in scoped_lookups:
        resources_dict[key] = _latest(folder, pattern)

    # Filepaths were requested: nothing needs to be read
    if resource_type.lower() not in ['data', 'objects', 'do', 'data objects', 'dataobjects']:
        return resources_dict

    # Get data objects: replace each filepath by the object read from it
    exact_dtypes = {"ID": int, "DESCRIPTION": str, "LITHOLOGY": str,
                    "COLOR": str, "CONSISTENCY": str, "MOD1": str, "MOD2": str,
                    "INTERPRETED": str, "COMPLETED": str, "ORIGIN_INDIANA": str}
    resources_dict['LithologyDict_Exact'] = pd.read_csv(resources_dict['LithologyDict_Exact'],
                                                        dtype=exact_dtypes, index_col='ID')

    plain_csv_keys = ('LithologyDict_Start', 'LithologyDict_Wildcard',
                      'LithInterps_FineCoarse', 'LithInterps_Clay', 'LithInterps_Silt',
                      'LithInterps_Sand', 'LithInterps_Gravel')
    for key in plain_csv_keys:
        resources_dict[key] = pd.read_csv(resources_dict[key])

    for key in ('well_data_dtypes', 'metadata_dtypes', 'ISWS_CRS', 'xyz_dtypes'):
        with open(resources_dict[key], 'r', encoding='utf-8') as f:
            resources_dict[key] = json.load(f)

    if is_statewide:
        sacrs = resources_dict['ISWS_CRS']
        # The statewide well data are zipped; members with 'HEADER' in their
        # name are read but not kept, any other member becomes the well data
        with zipfile.ZipFile(resources_dict['well_data'].as_posix(), 'r') as archive:
            for member_name in archive.namelist():
                with archive.open(member_name) as member:
                    if 'HEADER' in member_name:
                        _header_df = pd.read_csv(member)
                    else:
                        resources_dict['well_data'] = pd.read_csv(member)

        well_df = resources_dict['well_data']
        geometry = list(map(Point, zip(well_df['LONGITUDE'], well_df['LATITUDE'])))
        resources_dict['well_data'] = gpd.GeoDataFrame(well_df, geometry=geometry, crs='EPSG:5070')
    else:
        sacrs = 'EPSG:5070'
        # The local sample stores its geometry as WKT text in a 'geometry' column
        well_df = pd.read_csv(resources_dict['well_data'])
        well_df['geometry'] = well_df['geometry'].apply(wkt.loads)
        resources_dict['well_data'] = gpd.GeoDataFrame(well_df, geometry='geometry')

    resources_dict['study_area'] = gpd.read_file(resources_dict['study_area'], geometry='geometry', crs=sacrs)

    for key in ('model_grid', 'surf_elev', 'bedrock_elev'):
        resources_dict[key] = rxr.open_rasterio(resources_dict[key])

    return resources_dict
# Only used for development purposes, check that parameters are unique
def __check_parameter_names(verbose=True):
    """Tabulate the parameter names of the workflow functions.

    Parameters
    ----------
    verbose : bool, default = True
        If True, prints the rows of every parameter name (other than
        'verbose' and 'log') that is used by more than one function.

    Returns
    -------
    paramDF : pandas.DataFrame
        One row per (function, parameter) pair, with columns 'Function' and
        'Parameter'.
    """
    # Check parameters are unique
    import inspect
    import w4h
    import pandas as pd

    function_list = [w4h.file_setup, w4h.read_raw_csv, w4h.define_dtypes, w4h.read_study_area, w4h.read_grid,
                     w4h.add_control_points, w4h.coords2geometry, w4h.clip_gdf2study_area,
                     w4h.remove_nonlocated, w4h.remove_no_topo, w4h.remove_no_depth, w4h.remove_bad_depth,
                     w4h.remove_no_description, w4h.get_search_terms, w4h.read_dictionary_terms,
                     w4h.specific_define, w4h.start_define, w4h.wildcard_define, w4h.depth_define,
                     w4h.fill_unclassified, w4h.read_lithologies, w4h.merge_lithologies,
                     w4h.align_rasters, w4h.get_drift_thick, w4h.sample_raster_points, w4h.get_layer_depths,
                     w4h.layer_target_thick, w4h.layer_interp, w4h.export_grids]

    paramDF = pd.DataFrame()
    for f in function_list:
        arg_names = inspect.getfullargspec(f).args
        rows = pd.DataFrame({'Function': [f.__name__] * len(arg_names),
                             'Parameter': arg_names})
        paramDF = pd.concat([paramDF, rows])

    if verbose:
        # Report every parameter name shared by several functions;
        # 'verbose' and 'log' are shared on purpose and therefore skipped
        for up in paramDF['Parameter'].drop_duplicates():
            if up in ('verbose', 'log'):
                continue
            matchDF = paramDF[paramDF['Parameter'] == up].copy()
            if matchDF.shape[0] > 1:
                print(matchDF)
    return paramDF