Source code for w4h.read

"""The Read module contains functions primarily for the input of data through the reading of data files,
as well as support functions to carry out this task
"""

import datetime
import importlib
import inspect
import json
import os
import pathlib

import geopandas as gpd
import pandas as pd
import numpy as np

from w4h import logger_function, verbose_print
import w4h

repoDir = pathlib.Path(os.getcwd())
RESOURCE_DIR = pathlib.Path(str(importlib.resources.files('w4h'))).joinpath('resources')


# Read and concatenate control points into main database/dataframe
def add_control_points(df_without_control, df_control=None, xcol='LONGITUDE', ycol='LATITUDE', zcol='ELEV_FT',
                       controlpoints_crs='EPSG:4269', output_crs='EPSG:5070', description_col='FORMATION',
                       interp_col='INTERPRETATION', target_col='TARGET', verbose=False, log=False, **kwargs):
    """Function to add control points, primarily to aid in interpolation.

    This may be useful when conditions are known but do not exist in the input well database.

    Parameters
    ----------
    df_without_control : pandas.DataFrame
        Dataframe with current working data
    df_control : str, pathlib.PurePath, pandas.DataFrame, or None
        Control points, either as a dataframe or as a path readable by pandas.read_csv().
        If None, df_without_control is returned unchanged.
    xcol : str, optional
        The column in df_control containing the x coordinates for each control point, by default 'LONGITUDE'
    ycol : str, optional
        The column in df_control containing the y coordinates for each control point, by default 'LATITUDE'
    zcol : str, optional
        The column in df_control containing the z coordinates for each control point, by default 'ELEV_FT'
    controlpoints_crs : str, optional
        The coordinate reference system of the control points, by default 'EPSG:4269'
    output_crs : str, optional
        The output coordinate system, by default 'EPSG:5070'
    description_col : str, optional
        The column in df_control with the description (if this is used), by default 'FORMATION'
    interp_col : str, optional
        The column in df_control with the interpretation (if this is used), by default 'INTERPRETATION'
    target_col : str, optional
        The column in df_control with the target code (if this is used), by default 'TARGET'
    verbose : bool, optional
        Whether to print information to terminal, by default False
    log : bool, optional
        Whether to log information in log file, by default False
    **kwargs
        Keyword arguments of pandas.concat() or pandas.read_csv() that will be passed to those functions,
        except for the positional inputs (objs / filepath_or_buffer), which are always supplied here.

    Returns
    -------
    pandas.DataFrame
        DataFrame (GeoDataFrame when df_without_control is one) with the original data
        and the control points concatenated together
    """
    if verbose:
        verbose_print(add_control_points, locals(), exclude_params=['df_without_control', 'df_control'])

    if df_control is None:
        return df_without_control

    # GeoDataFrame subclasses DataFrame, so a single isinstance check covers both
    if not isinstance(df_control, pd.DataFrame):
        read_csv_params = inspect.signature(pd.read_csv).parameters
        read_csv_kwargs = {k: v for k, v in kwargs.items()
                           if k in read_csv_params and k != 'filepath_or_buffer'}
        df_control = pd.read_csv(df_control, **read_csv_kwargs)

    # Drop optional columns the working data does not have.
    # A non-inplace drop is used so the caller's dataframe is not modified.
    optional_cols = dict.fromkeys((target_col, interp_col, description_col))
    unused_cols = [col for col in optional_cols
                   if col not in df_without_control.columns and col in df_control.columns]
    if unused_cols:
        df_control = df_control.drop(columns=unused_cols)

    # If our main df is already a geodataframe, make df_control one too;
    # otherwise concatenate the plain dataframe as it is.
    if isinstance(df_without_control, gpd.GeoDataFrame):
        from w4h import coords2geometry
        gdf_control = coords2geometry(df_no_geometry=df_control, xcol=xcol, ycol=ycol, zcol=zcol,
                                      input_coords_crs=controlpoints_crs, log=log)
    else:
        gdf_control = df_control

    # Get kwargs passed to pd.concat, and set defaults for ignore_index and join
    concat_params = inspect.signature(pd.concat).parameters
    concat_kwargs = {k: v for k, v in kwargs.items() if k in concat_params and k != 'objs'}
    concat_kwargs.setdefault('ignore_index', True)
    concat_kwargs.setdefault('join', 'outer')

    gdf = pd.concat([df_without_control, gdf_control], **concat_kwargs)

    # Reprojection only makes sense (and only exists) for geodataframes
    if isinstance(gdf, gpd.GeoDataFrame) and controlpoints_crs != output_crs:
        gdf = gdf.to_crs(output_crs)

    return gdf
#Define the datatypes for a dataframe
def define_dtypes(undefined_df, datatypes=None, verbose=False, log=False):
    """Function to define datatypes of a dataframe, especially with file-indicated dtypes

    Parameters
    ----------
    undefined_df : pd.DataFrame
        Pandas dataframe with columns whose datatypes need to be (re)defined
    datatypes : dict, str, pathlib.PurePath() object, or None, default = None
        Dictionary containing datatypes, to be used in pandas.DataFrame.astype() function,
        or a path to a file readable by read_dict(). Entries for columns that are not
        present in the dataframe are ignored. If None, datatypes are left as they are.
    verbose : bool, default = False
        Whether to print information to the terminal.
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    dfout : pandas.DataFrame
        Copy of the input dataframe with redefined columns (None if undefined_df is None)
    """
    if verbose:
        verbose_print(define_dtypes, locals(), exclude_params=['undefined_df'])

    if undefined_df is None:
        return None

    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    dfout = undefined_df.copy()

    # Nothing to apply: this is not an error, just report it
    if datatypes is None:
        if verbose:
            print('datatypes is None, not updating datatypes')
        return dfout

    if isinstance(datatypes, (pathlib.PurePath, str)):
        dtype_path = pathlib.Path(datatypes)
        if not dtype_path.exists():
            if verbose:
                print("\tERROR: datatypes file '{}' does not exist, using inferred datatypes.".format(dtype_path),)
            return dfout
        if dtype_path.is_dir():
            if verbose:
                print('ERROR: datatypes must be either dict or filepath (path to directories not allowed)')
            return dfout
        datatypes = read_dict(file=dtype_path)
    elif not isinstance(datatypes, dict):
        if verbose:
            print('ERROR: datatypes must be either dict or a filepath, not {}'.format(type(datatypes)))
        return dfout

    # Only cast columns that actually exist, for both dict and file input
    applicable = {col: dtype for col, dtype in datatypes.items() if col in dfout.columns}
    return dfout.astype(applicable)
#Function to setup files of interest
def file_setup(well_data, metadata=None, data_filename='*ISGS_DOWNHOLE_DATA*.txt',
               metadata_filename='*ISGS_HEADER*.txt', log_dir=None, verbose=False, log=False):
    """Function to setup files, assuming data and metadata are in separate files.

    There should be one "key"/identifying column consistent across all files to join/merge them later.
    This function may not be useful if files are organized differently than this structure.
    If that is the case, it is recommended to use the get_most_recent() function for each
    individual file if needed. It may also be of use to simply skip this function altogether
    and directly define each filepath in a manner that can be used by pandas.read_csv()

    Parameters
    ----------
    well_data : str or pathlib.Path object
        Path to the data file, or to a directory to be searched for the most recent
        file matching data_filename.
    metadata : str or pathlib.Path object, optional
        Path to the metadata file, or to a directory to be searched for the most recent
        file matching metadata_filename. If None, the well_data path is used.
    data_filename : str, optional
        Pattern used by pathlib.glob() to get the most recent data file, by default '*ISGS_DOWNHOLE_DATA*.txt'
    metadata_filename : str, optional
        Pattern used by pathlib.glob() to get the most recent metadata file, by default '*ISGS_HEADER*.txt'
    log_dir : str or pathlib.PurePath() or None, default=None
        Directory to place log file in. This is not read directly in this function.
    verbose : bool, default = False
        Whether to print name of files to terminal, by default False
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    tuple
        Tuple with paths to (well_data, metadata). The metadata path equals the data path
        when no separate metadata file was given or found.
    """
    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(file_setup, locals(), exclude_params=['well_data'])

    data_path = pathlib.Path(well_data)
    metadata_given = metadata is not None
    metadata_path = pathlib.Path(metadata) if metadata_given else data_path

    # If input path is a directory, find most recent version of the file. If file, just use the file
    if data_path.is_dir():
        downholeDataPATH = pathlib.Path(get_most_recent(data_path, data_filename, verbose=verbose))
    else:
        downholeDataPATH = data_path

    if metadata_path.is_dir():
        header_file = get_most_recent(metadata_path, metadata_filename, verbose=verbose)
        # get_most_recent() signals "no match" with an empty pathlib.Path();
        # the empty-list check is kept for backward compatibility
        if header_file == [] or pathlib.Path(header_file) == pathlib.Path():
            headerDataPATH = downholeDataPATH
        else:
            headerDataPATH = pathlib.Path(header_file)
    elif metadata_given:
        headerDataPATH = metadata_path
    else:
        headerDataPATH = downholeDataPATH

    if verbose:
        print('\tUsing the following file(s):')
        print('\t ', downholeDataPATH)
        if headerDataPATH != downholeDataPATH:
            print('\t', headerDataPATH)

    return downholeDataPATH, headerDataPATH
# Gets the current date for use within the code
def get_current_date():
    """Get the current date to help with finding the most recent file.

    Parameters
    ----------
    None

    Returns
    -------
    todayDate : datetime.date
        Today's date
    dateSuffix : str
        Today's date as a string prefixed with an underscore, to use for naming output files
    """
    today = datetime.date.today()
    suffix = f'_{today}'
    return today, suffix
#Function to get most recent file
def get_most_recent(dir=RESOURCE_DIR, glob_pattern='*', verbose=False):
    """Function to find the most recently modified file matching a pattern, searching recursively.

    Parameters
    ----------
    dir : str or pathlib.Path object, optional
        Directory in which to find the most recent file, by default RESOURCE_DIR
    glob_pattern : str, optional
        String used by the pathlib.Path.rglob() method for searching, by default '*'
    verbose : bool, optional
        Whether to print information to the terminal, by default False

    Returns
    -------
    pathlib.Path object
        Path of the entry with the latest modification time among those matching glob_pattern.
        An empty pathlib.Path() is returned if nothing matches.
    """
    if verbose:
        verbose_print(get_most_recent, locals())

    # Materialize the matches once so the selection cannot drift between two separate globs
    candidates = list(pathlib.Path(dir).rglob(glob_pattern))

    if not candidates:
        # If no files found that match pattern, return an empty pathlib.Path()
        if verbose:
            print('No file found in {} matching {} pattern'.format(dir, glob_pattern))
        return pathlib.Path()

    # Latest modification time wins; on ties the first match is kept
    mostRecentFile = max(candidates, key=os.path.getmtime)

    if verbose:
        print('Most Recent version of file fitting {} pattern is: {}'.format(glob_pattern, mostRecentFile.name))

    return mostRecentFile
# Define the search term filepaths
def get_search_terms(spec_path=str(repoDir)+'/resources/', spec_glob_pattern='*SearchTerms-Specific*',
                     start_path=None, start_glob_pattern='*SearchTerms-Start*',
                     wildcard_path=None, wildcard_glob_pattern='*SearchTerms-Wildcard',
                     use_tokens=False, verbose=False, log=False):
    """Locate the dictionary (search term) files for downhole data

    Each of the three locations is handled the same way: None is passed through,
    a directory is searched with get_most_recent() using the matching glob pattern,
    and anything else is taken to be the file itself.

    Parameters
    ----------
    spec_path : str or pathlib.Path, optional
        File or directory for the specific search terms, by default str(repoDir)+'/resources/'
    spec_glob_pattern : str, optional
        Pattern handed to get_most_recent() when spec_path is a directory, by default '*SearchTerms-Specific*'
    start_path : str or None, optional
        File or directory for the start search terms, by default None
    start_glob_pattern : str, optional
        Pattern handed to get_most_recent() when start_path is a directory, by default '*SearchTerms-Start*'
    wildcard_path : str or pathlib.Path, default = None
        File or directory for the wildcard search terms, by default None
    wildcard_glob_pattern : str, default = '*SearchTerms-Wildcard'
        Pattern handed to get_most_recent() when wildcard_path is a directory.
        Note that the default has no trailing '*'.
    use_tokens : bool, default = False
        Accepted but not used in this function.
    verbose : bool, default = False
        Whether to print the function inputs to the terminal.
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    (specTermsPath, startTermsPath, wilcardTermsPath) : tuple
        Tuple with the resolved path (or None) for the specific, start, and wildcard search terms
    """
    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(get_search_terms, locals())

    # (location, pattern) pairs in return order: exact match, startswith, wildcard
    requests = (
        (spec_path, spec_glob_pattern),
        (start_path, start_glob_pattern),
        (wildcard_path, wildcard_glob_pattern),
    )

    resolved = []
    for location, pattern in requests:
        if location is None:
            resolved.append(None)
            continue
        location = pathlib.Path(location)
        if location.is_dir():
            resolved.append(get_most_recent(location, pattern))
        else:
            resolved.append(location)

    specTermsPath, startTermsPath, wilcardTermsPath = resolved
    return specTermsPath, startTermsPath, wilcardTermsPath
#Read dictionary file into dictionary variable
def read_dict(file, keytype='np'):
    """Function to read a text file with a dictionary in it into a python dictionary

    Parameters
    ----------
    file : str or pathlib.Path object
        Filepath to the file of interest containing the dictionary text (JSON syntax)
    keytype : str, optional
        String indicating the datatypes used in the text, currently only 'np' is implemented, by default 'np'.
        With 'np', each value is treated as a type name and replaced by the numpy attribute
        of that name, falling back to the builtin type of that name (e.g. 'str').
        Files with a '.json' suffix are returned as parsed, without this translation.

    Returns
    -------
    dict
        Dictionary translated from text file.

    Raises
    ------
    AttributeError
        If keytype is 'np' and a value names neither a numpy attribute nor a builtin type.
    """
    import builtins

    if pathlib.Path(file).suffix == '.json':
        with open(file, 'r') as f:
            return json.load(f)

    with open(file, 'r') as f:
        parsed = json.load(f)

    if keytype == 'np':
        for key, type_name in parsed.items():
            resolved = getattr(np, type_name, None)
            if resolved is None:
                # Newer numpy no longer exposes aliases such as np.str or np.int;
                # those names always meant the builtin types
                fallback = getattr(builtins, type_name, None)
                if isinstance(fallback, type):
                    resolved = fallback
            if resolved is None:
                raise AttributeError("'{}' is neither a numpy attribute nor a builtin type".format(type_name))
            parsed[key] = resolved

    return parsed
#Read files into pandas dataframes
def read_dictionary_terms(dict_file=None, id_col='ID', search_col='DESCRIPTION', definition_col='LITHOLOGY',
                          class_flag_col='CLASS_FLAG', dictionary_type=None, class_flag=6,
                          rem_extra_cols=True, verbose=False, log=False):
    """Function to read dictionary terms from file into pandas dataframe

    Parameters
    ----------
    dict_file : str or pathlib.Path object, or list of these
        File or list of files to be read. If None, the 'LithologyDict_Exact' entry
        of w4h.get_resources() is used.
    id_col : str, default = 'ID'
        Name of column used as the index while reading, if present in the file
    search_col : str, default = 'DESCRIPTION'
        Name of column containing search terms (geologic formations)
    definition_col : str, default = 'LITHOLOGY'
        Name of column containing interpretations of search terms (lithologies)
    class_flag_col : str, default = 'CLASS_FLAG'
        Name of column that receives the classification flag
    dictionary_type : str or None, {None, 'exact', 'start', 'wildcard',}
        Indicator of which kind of dictionary terms to be read in, by default None.
            - If None, uses name of file to try to determine ('spec', 'start' or 'wildcard' in the path).
              If it cannot, it will default to using the classification flag from class_flag
            - If 'exact', will be used to search for exact matches to geologic descriptions
            - If 'start', will be used as with the .startswith() string method to find inexact matches
            - If 'wildcard', will be used to find any matching substring for inexact geologic matches
    class_flag : int, default = 6
        Classification flag to be used if dictionary_type is None and cannot be otherwise determined
    rem_extra_cols : bool, default = True
        Whether to keep only the DESCRIPTION, LITHOLOGY and CLASS_FLAG columns
    verbose : bool, default = False
        Whether to print the function inputs to the terminal.
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    dict_terms : pandas.DataFrame or list of pandas.DataFrame
        Dataframe (or list of dataframes when several files are read) formatted for the
        classification steps of this package. An empty dataframe is returned if a single
        dict_file is given and does not exist.
    """
    if dict_file is None:
        dict_file = w4h.get_resources()['LithologyDict_Exact']

    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(read_dictionary_terms, locals())

    # Read files into pandas dataframes
    if isinstance(dict_file, (tuple, list)):
        # Entries may be plain strings, so normalize before using Path methods
        dict_paths = [pathlib.Path(f) for f in dict_file]
        dict_terms = []
        for path in dict_paths:
            if path.exists():
                dict_terms.append(pd.read_csv(path, low_memory=False))
            else:
                # Placeholder for a missing file; the class flag column is added below
                dict_terms.append(pd.DataFrame(columns=['ID', 'DESCRIPTION', 'LITHOLOGY']))
    else:
        single_path = pathlib.Path(dict_file)
        if not single_path.is_file():
            print(f'ERROR: dict_file ({single_path}) does not exist.')
            # Create empty dataframe to return, with the same column names as a normal result
            return pd.DataFrame(columns=['ID', 'DESCRIPTION', 'LITHOLOGY', 'CLASS_FLAG'])
        dict_paths = [single_path]
        dict_terms = [pd.read_csv(single_path, low_memory=False)]

    for frame in dict_terms:
        if id_col in frame.columns:
            frame.set_index(id_col, drop=True, inplace=True)

    # (accepted dictionary_type names, hint looked for in the file path, flag)
    # 1: exact classification match, 2: (not defined...ML?), 3: bedrock classification for obvious bedrock,
    # 4: start term, 5: wildcard/substring, 6: Undefined
    flag_rules = (
        (('searchterm', 'search', 'exact'), 'spec', 1),
        (('startterm', 'start', 'startswith'), 'start', 4),
        (('wildcard', 'substring'), 'wildcard', 5),
    )
    kind = '' if dictionary_type is None else dictionary_type.lower()

    renames = {search_col: 'DESCRIPTION', definition_col: 'LITHOLOGY', class_flag_col: 'CLASS_FLAG'}
    # Keyed by the post-rename names so the casts apply whatever the input column names were
    final_dtypes = {'DESCRIPTION': str, 'LITHOLOGY': str, 'CLASS_FLAG': np.uint8}

    # Iterating, to allow reading of multiple dict files at once (also works with just one at a time)
    for i, (d, path) in enumerate(zip(dict_terms, dict_paths)):
        source_name = str(path).lower()
        flag = class_flag  # Custom classification flag, defined as argument
        for names, hint, value in flag_rules:
            if kind in names or (kind == '' and hint in source_name):
                flag = value
                break
        d[class_flag_col] = flag

        # Rename columns so it is consistent through rest of code
        d.rename(columns=renames, inplace=True)

        for col, dtype in final_dtypes.items():
            if col in d.columns:
                d[col] = d[col].astype(dtype)

        # Delete duplicate definitions
        d.drop_duplicates(subset='DESCRIPTION', inplace=True)
        d.reset_index(inplace=True, drop=True)

        # Whether to remove extra columns that aren't needed from dataframe.
        # Stored back into the list; rebinding the loop variable alone would be lost.
        if rem_extra_cols:
            dict_terms[i] = d[['DESCRIPTION', 'LITHOLOGY', 'CLASS_FLAG']].copy()

    # If only one file designated, convert it so it's no longer a list, but just a dataframe
    if len(dict_terms) == 1:
        dict_terms = dict_terms[0]

    return dict_terms
#Function to read lithology file into pandas dataframe
def read_lithologies(lith_file=None, interp_col='LITHOLOGY', target_col='CODE', use_cols=None, verbose=False, log=False):
    """Function to read lithology file into pandas dataframe

    Parameters
    ----------
    lith_file : str or pathlib.Path object, default = None
        Filename of lithology file. If None, the 'LithInterps_FineCoarse' entry
        of w4h.get_resources() is used.
    interp_col : str, default = 'LITHOLOGY'
        Column used to match interpretations; renamed to 'INTERPRETATION' in the output
    target_col : str, default = 'CODE'
        Column to be used as target code; renamed to 'TARGET' in the output
    use_cols : list, default = None
        Which columns to use when reading in dataframe. If None, defaults to [interp_col, target_col].
    verbose : bool, default = False
        Whether to print the function inputs to the terminal.
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    pandas.DataFrame
        Pandas dataframe with lithology information
    """
    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(read_lithologies, locals())

    if lith_file is None:
        # Fall back to the lithology table bundled with the package resources
        import w4h
        lith_file = w4h.get_resources()['LithInterps_FineCoarse']

    if not isinstance(lith_file, pathlib.PurePath):
        lith_file = pathlib.Path(lith_file)

    columns_to_read = [interp_col, target_col] if use_cols is None else use_cols
    standard_names = {interp_col: 'INTERPRETATION', target_col: 'TARGET'}

    return pd.read_csv(lith_file, usecols=columns_to_read).rename(columns=standard_names)
#Read raw data by text
def _load_raw_table(source, use_cols, auto_cols, encoding, read_csv_kwargs):
    """Return source as a dataframe (reading it with pandas.read_csv if needed), limited to use_cols."""
    # GeoDataFrame subclasses DataFrame, so this covers both
    if isinstance(source, pd.DataFrame):
        table = source
    else:
        table = pd.read_csv(source, sep=',', header='infer', encoding=encoding, **read_csv_kwargs)

    if use_cols is None:
        return table
    if isinstance(use_cols, str) and use_cols == 'auto':
        use_cols = auto_cols
    return table[use_cols]


def read_raw_csv(data_filepath, metadata_filepath, data_cols=None, metadata_cols=None, xcol='LONGITUDE',
                 ycol='LATITUDE', well_key='API_NUMBER', encoding='latin-1', verbose=False, log=False,
                 **read_csv_kwargs):
    """Easy function to read raw .txt files output from (for example), an Access database

    Parameters
    ----------
    data_filepath : str, pathlib.Path, or pandas.DataFrame
        Filename of the file containing data, including the extension.
        A dataframe is used as it is instead of being read from file.
    metadata_filepath : str, pathlib.Path, pandas.DataFrame, or None
        Filename of the file containing metadata, including the extension.
        A dataframe is used as it is; None means all information is in the data input.
    data_cols : list, 'auto', or None, default = None
        Names of columns from the data input to keep. If None, all columns are kept. If 'auto',
        ["API_NUMBER","TABLE_NAME","FORMATION","THICKNESS","TOP","BOTTOM"]
    metadata_cols : list, 'auto', or None, default = None
        Names of columns from the metadata input to keep. If None, all columns are kept. If 'auto',
        ['API_NUMBER',"TOTAL_DEPTH","SECTION","TWP","TDIR","RNG","RDIR","MERIDIAN","QUARTERS","ELEVATION",
        "ELEVREF","COUNTY_CODE","LATITUDE","LONGITUDE","ELEVSOURCE"]
    xcol : str, default = 'LONGITUDE'
        Name of column indicating the x-location of the well, by default 'LONGITUDE'
    ycol : str, default = 'LATITUDE'
        Name of the column indicating the y-location of the well, by default 'LATITUDE'
    well_key : str, default = 'API_NUMBER'
        Name of the column with the key/identifier that will be used to merge data later, by default 'API_NUMBER'
    encoding : str, default = 'latin-1'
        Encoding of the data in the input files, by default 'latin-1'
    verbose : bool, default = False
        Whether to print the number of rows in the input columns, by default False
    log : bool, default = False
        Whether to log inputs and outputs to log file.
    **read_csv_kwargs
        **kwargs that get passed to pd.read_csv()

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame/None)
        Tuple with two pandas dataframes: (well_data, metadata)
        metadata is None if only well_data is used
    """
    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(read_raw_csv, locals())

    auto_data_cols = ["API_NUMBER","TABLE_NAME","FORMATION","THICKNESS","TOP","BOTTOM"]
    auto_metadata_cols = ['API_NUMBER',"TOTAL_DEPTH","SECTION","TWP","TDIR","RNG","RDIR","MERIDIAN","QUARTERS","ELEVATION","ELEVREF","COUNTY_CODE","LATITUDE","LONGITUDE","ELEVSOURCE"]

    # Use input directly if it is already a dataframe, otherwise read it in as dataframe
    downholeDataIN = _load_raw_table(data_filepath, data_cols, auto_data_cols, encoding, read_csv_kwargs)

    if metadata_filepath is None:
        headerDataIN = None
    else:
        headerDataIN = _load_raw_table(metadata_filepath, metadata_cols, auto_metadata_cols, encoding, read_csv_kwargs)

    # Drop data with no well key
    downholeDataIN = downholeDataIN.dropna(subset=[well_key])

    if headerDataIN is not None:
        # Drop metadata with no well key or with missing location information
        headerDataIN = headerDataIN.dropna(subset=[well_key, ycol, xcol])
        # Reset index so index goes from 0 in numerical/integer order
        headerDataIN.reset_index(inplace=True, drop=True)
    else:
        # ***UPDATE: Need to make sure these columns exist in this case***
        # Without separate metadata the location columns must be in the data itself
        downholeDataIN = downholeDataIN.dropna(subset=[ycol, xcol])
        downholeDataIN.reset_index(inplace=True, drop=True)

    # Print outputs to terminal, if designated
    if verbose:
        print('\tData file has ' + str(downholeDataIN.shape[0])+' valid well records.')
        if headerDataIN is not None:
            print("Metadata file has "+str(headerDataIN.shape[0])+" unique wells with valid location information.")

    return downholeDataIN, headerDataIN
#Read file with xyz data
def read_xyz(xyzpath, datatypes=None, verbose=False, log=False):
    """Function to read file containing xyz data (elevation/location)

    Parameters
    ----------
    xyzpath : str or pathlib.Path
        Filepath of the xyz file, including extension
    datatypes : dict, default = None
        Dictionary containing the datatypes for the columns in the xyz file, passed to
        pandas.read_csv() as dtype. If None,
        {'ID':np.uint32,'API_NUMBER':np.uint64,'LATITUDE':np.float64,'LONGITUDE':np.float64,'ELEV_FT':np.float64}
    verbose : bool, default = False
        Whether to print the number of xyz records to the terminal, by default False
    log : bool, default = False
        Whether to log inputs and outputs to log file.

    Returns
    -------
    pandas.DataFrame
        Pandas dataframe containing the elevation and location data, indexed by the 'ID' column
    """
    logger_function(log, locals(), inspect.currentframe().f_code.co_name)
    if verbose:
        verbose_print(read_xyz, locals())

    # Use the caller's datatypes when given; fall back to the defaults only when they are not
    if datatypes is None:
        xyzDTypes = {'ID':np.uint32,'API_NUMBER':np.uint64,'LATITUDE':np.float64,'LONGITUDE':np.float64,'ELEV_FT':np.float64}
    else:
        xyzDTypes = datatypes

    xyzDataIN = pd.read_csv(xyzpath, sep=',', header='infer', dtype=xyzDTypes, index_col='ID')

    if verbose:
        print('XYZ file has ' + str(xyzDataIN.shape[0])+' records with elevation and location.')

    return xyzDataIN