Source code for w4h.classify

"""The Classify module contains functions for defining geological intervals into a preset subset of geologic interpretations.
"""

import datetime
import inspect
import re
import string

import dask
import pandas as pd
import numpy as np

from w4h import logger_function, verbose_print
#The following flags are used to mark the classification method:
#- 0: Not classified
#- 1: Specific Search Term Match
#- 2: wPermits bedrock top pick
#- 3: Intervals >550' below ground surface
#- 4: Wildcard match (startTerm) - no context
#- 5: Wildcard match (any substring) - more liberal
#- Top of well?


#Load Data
#Preprocess Text
#Tokenization
#Lemmatization
#POS Tagging
#Named Entity Recognition (NER)
#Text Complexity Analysis
#Text Classification
#Evaluate and Refine
#Save Results

try:
    nlp = spacy.load("en_core_web_sm")
except Exception:
    pass


# Not active, in progress
def _analyze_complexity():
    return


# Not active, in progress
def _classify_description_type():
    return


# Define well intervals by depth
[docs] def depth_define(df, top_col='TOP', thresh=550.0, parallel_processing=False, verbose=False, log=False): """Function to define all intervals lower than thresh as bedrock Parameters ---------- df : pandas.DataFrame Dataframe to classify top_col : str, default = 'TOP' Name of column that contains the depth information, likely of the top of the well interval, by default 'TOP' thresh : float, default = 550.0 Depth (in units used in df['top_col']) below which all intervals will be classified as bedrock, by default 550.0. verbose : bool, default = False Whether to print results, by default False log : bool, default = True Whether to log results to log file Returns ------- df : pandas.DataFrame Dataframe containing intervals classified as bedrock due to depth """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) if verbose: verbose_print(depth_define, locals(), exclude_params=['df']) df = df.copy() df['CLASS_FLAG'] = df['CLASS_FLAG'].mask(df[top_col] > thresh, 3) # Add a Classification Flag of 3 (bedrock b/c it's deepter than 550') to all records where the top of the interval is >550' df['BEDROCK_FLAG'] = df['BEDROCK_FLAG'].mask(df[top_col] > thresh, True) if verbose: total = df.shape[0] if parallel_processing: print("numRecsClass") numRecsClass = int(df[df['CLASS_FLAG']==3]['CLASS_FLAG'].sum().compute()) if total.compute() > 0: print("Computing percRecsClass") percRecsClass = round((numRecsClass / total.compute())*100,2) print("Computing recsRemaining") recsRemainig = df['CLASS_FLAG'].isna().sum().compute() else: percRecsClass = 0 recsRemainig = 0 else: numRecsClass = int(df[df['CLASS_FLAG']==3]['CLASS_FLAG'].sum()) if total > 0: percRecsClass = round((numRecsClass / total)*100,2) recsRemainig = df['CLASS_FLAG'].isna().sum() else: percRecsClass = 0 recsRemainig = 0 print('\tClassified bedrock well records using depth threshold at depth of {}'.format(thresh)) print("\t\t{} records classified using bedrock threshold depth ({}% of unclassified data)".format(numRecsClass, percRecsClass)) print(f'\t\t{recsRemainig} records remain unclassified ({100-percRecsClass}% of unclassified data).') return df
# Output data that still needs to be defined
[docs] def export_undefined(df, outdir): """Function to export terms that still need to be defined. Parameters ---------- df : pandas.DataFrame Dataframe containing at least some unclassified data outdir : str or pathlib.Path Directory to save file. Filename will be generated automatically based on today's date. Returns ------- stillNeededDF : pandas.DataFrame Dataframe containing only unclassified terms, and the number of times they occur """ import pathlib if isinstance(outdir, pathlib.PurePath): if not outdir.is_dir() or not outdir.exists(): print('Please specify a valid directory for export. Filename is generated automatically.') return outdir = outdir.as_posix() else: outdir.replace('\\','/') outdir.replace('\\'[-1], '/') #Get directory path correct if outdir[-1] != '/': outdir = outdir+'/' todayDate = datetime.date.today() todayDateStr = str(todayDate) searchDF = df[df['CLASS_FLAG'].isna()] stillNeededDF=searchDF['FORMATION'].value_counts() stillNeededDF.to_csv(outdir+'Undefined_'+todayDateStr+'.csv') return stillNeededDF
# Fill in unclassified rows' flags with 0
[docs] def fill_unclassified(df, classification_col='CLASS_FLAG'): """Fills unclassified rows in 'CLASS_FLAG' column with np.nan Parameters ---------- df : pandas.DataFrame Dataframe on which to perform operation Returns ------- df : pandas.DataFrame Dataframe on which operation has been performed """ df[classification_col] = df[classification_col].fillna(0) return df
# not active, in progress def _evaluate_classification(): return # Function to get unique wells
[docs] def get_unique_wells(df, wellid_col='API_NUMBER', verbose=False, log=False): """Gets unique wells as a dataframe based on a given column name. Parameters ---------- df : pandas.DataFrame Dataframe containing all wells and/or well intervals of interest wellid_col : str, default="API_NUMBER" Name of column in df containing a unique identifier for each well, by default 'API_NUMBER'. .unique() will be run on this column to get the unique values. log : bool, default = False Whether to log results to log file Returns ------- wellsDF DataFrame containing only the unique well IDs """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) if verbose: verbose_print(get_unique_wells, locals(), exclude_params=['df']) #Get Unique well APIs uniqueWells = df[wellid_col].unique() wellsDF = pd.DataFrame(uniqueWells) if verbose: print('Number of unique wells: '+str(wellsDF.shape[0])) wellsDF.columns = ['UNIQUE_ID'] return wellsDF
# Not active, in progress def _lemmatize(): return # Merge lithologies to main df based on classifications
[docs] def merge_lithologies(well_data_df, targinterps_df, interp_col='INTERPRETATION', target_col='TARGET', target_class='bool'): """Function to merge lithologies and target booleans based on classifications Parameters ---------- well_data_df : pandas.DataFrame Dataframe containing classified well data targinterps_df : pandas.DataFrame Dataframe containing lithologies and their target interpretations, depending on what the target is for this analysis (often, coarse materials=1, fine=0) target_col : str, default = 'TARGET' Name of column in targinterps_df containing the target interpretations target_class, default = 'bool' Whether the input column is using boolean values as its target indicator Returns ------- df_targ : pandas.DataFrame Dataframe containing merged lithologies/targets """ #by default, use the boolean input if target_class=='bool': targinterps_df[target_col] = targinterps_df[target_col].where(targinterps_df[target_col] == '1', other='0').astype(int) targinterps_df[target_col] = targinterps_df[target_col].fillna(value=0) else: targinterps_df[target_col] = targinterps_df[target_col].replace('DoNotUse', value=-1) targinterps_df[target_col] = targinterps_df[target_col].fillna(value=-2) targinterps_df[target_col].astype(np.int8) df_targ = well_data_df.merge(right=targinterps_df.set_index(interp_col), right_on=interp_col, left_on="LITHOLOGY", how='left') #df_targ = pd.merge(well_data_df, targinterps_df.set_index(interp_col), right_on=interp_col, left_on='LITHOLOGY', how='left') return df_targ
# Not active, in progress def _named_entity_recognition(): return # Not active, in progress def _preprocess_nlp(df, description_col="FORMATION", nlp_model_size='small', remove_puncuation=True, **kwargs): nlpSmallList = ['small', 's'] nlpMedList = ['medium', 'med', 'md', 'm'] nlpLargeList = ['large', 'lg', 'l'] nlpTransList = ['transformer', 'trans', 'tr', 't'] replace_str = "" if remove_puncuation: replace_str = r"[^\w\s]" df[f"PREPROCESSED_{description_col}"] = df[description_col].str.lower().str.replace(replace_str, '') return df # Merge data back together
[docs] def remerge_data(classifieddf, searchdf, parallel_processing=False): """Function to merge newly-classified (or not) and previously classified data Parameters ---------- classifieddf : pandas.DataFrame Dataframe that had already been classified previously searchdf : pandas.DataFrame Dataframe with new classifications Returns ------- remergeDF : pandas.DataFrame Dataframe containing all the data, merged back together """ if parallel_processing: remergeDF = dask.dataframe.concat([classifieddf,searchdf], join='inner').reset_index() else: remergeDF = pd.concat([classifieddf,searchdf], join='inner').sort_index() return remergeDF
# Define records with full search term
[docs] def specific_define(df, terms_df, description_col='FORMATION', terms_col='DESCRIPTION', parallel_processing=False, verbose=False, log=False): """Function to classify terms that have been specifically defined in the terms_df. Parameters ---------- df : pandas.DataFrame Input dataframe with unclassified well descriptions. terms_df : pandas.DataFrame Dataframe containing the classifications description_col : str, default='FORMATION' Column name in df containing the well descriptions, by default 'FORMATION'. terms_col : str, default='DESCRIPTION' Column name in terms_df containing the classified descriptions, by default 'DESCRIPTION'. verbose : bool, default=False Whether to print up results, by default False. Returns ------- df_Interps : pandas.DataFrame Dataframe containing the well descriptions and their matched classifications. """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) if verbose: verbose_print(specific_define, locals(), exclude_params=['df', 'terms_df']) if description_col != terms_col: terms_df = terms_df.rename(columns={terms_col:description_col}) terms_col = description_col df[description_col] = df[description_col].astype(str) terms_df[terms_col] = terms_df[terms_col].astype(str) df[description_col] = df[description_col].str.casefold() terms_df[terms_col] = terms_df[terms_col].str.casefold() #df['FORMATION'] = df['FORMATION'].str.strip(['.,:?\t\s']) #terms_df['FORMATION'] = terms_df['FORMATION'].str.strip(['.,:?\t\s']) terms_df = terms_df.drop_duplicates(subset=terms_col, keep='last') terms_df = terms_df.reset_index(drop=True) terms_df['CLASS_FLAG'] = 1 # Preset column to equal 1 df_Interps = df.merge(right=terms_df.set_index(terms_col), left_on=description_col, right_on=terms_col, how='inner') #df_Interps = pd.merge(left=df, right=terms_df.set_index(terms_col), on=description_col, how='left') df_Interps = df_Interps.rename(columns={description_col:'FORMATION'}) df_Interps['BEDROCK_FLAG'] = df_Interps['LITHOLOGY'] == 'BEDROCK' if verbose: totRecords = df_Interps.shape[0] if parallel_processing: numRecsClass = int(df_Interps['CLASS_FLAG'].eq(1).sum().compute()) recsRemainig = int(df_Interps['CLASS_FLAG'].isna().sum().compute()) percRecsClass= round(( numRecsClass / totRecords.compute())*100, 2) else: numRecsClass = int(df_Interps[df_Interps['CLASS_FLAG']==1]['CLASS_FLAG'].sum()) recsRemainig = df_Interps['CLASS_FLAG'].isna().sum() percRecsClass= round(( numRecsClass / totRecords)*100, 2) print('\tClassified well records using exact matches') print("\t\t{} records classified using exact matches ({}% of unclassified data)".format(numRecsClass, percRecsClass)) print('\t\t{} records remain unclassified ({}% of unclassified data).'.format(recsRemainig, 100-percRecsClass)) return df_Interps
# Split dataframe into records that have been defined v those that have not
[docs] def split_defined(df, classification_col='CLASS_FLAG', verbose=False, log=False): """Function to split dataframe with well descriptions into two dataframes based on whether a row has been classified. Parameters ---------- df : pandas.DataFrame Dataframe containing all the well descriptions classification_col : str, default = 'CLASS_FLAG' Name of column containing the classification flag, by default 'CLASS_FLAG' verbose : bool, default = False Whether to print results, by default False log : bool, default = False Whether to log results to log file Returns ------- Two-item tuple of pandas.Dataframe tuple[0] is dataframe containing classified data, tuple[1] is dataframe containing unclassified data. """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) searchDF = df[df[classification_col].isna()] # Unclassified data classifedDF = df[df[classification_col].notnull()] # Already-classifed data return classifedDF, searchDF
#Quickly sort dataframe
[docs] def sort_dataframe(df, sort_cols=['API_NUMBER', 'TOP'], remove_nans=True): """Function to sort dataframe by one or more columns. Parameters ---------- df : pandas.DataFrame Dataframe to be sorted sort_cols : str or list of str, default = ['API_NUMBER','TOP'] Name(s) of columns by which to sort dataframe, by default ['API_NUMBER','TOP'] remove_nans : bool, default = True Whether or not to remove nans in the process, by default True Returns ------- df_sorted : pandas.DataFrame Sorted dataframe """ #Sort columns for better processing later df_sorted = df.sort_values(sort_cols) df_sorted.reset_index(inplace=True, drop=True) if remove_nans: df_sorted = df_sorted[pd.notna(df_sorted["LITHOLOGY"])] return df_sorted
# Classify downhole data by the initial substring
[docs] def start_define(df, terms_df, description_col='FORMATION', terms_col='DESCRIPTION', parallel_processing=False, verbose=False, log=False): """Function to classify descriptions according to starting substring. Parameters ---------- df : pandas.DataFrame Dataframe containing all the well descriptions terms_df : pandas.DataFrame Dataframe containing all the startswith substrings to use for searching description_col : str, default = 'FORMATION' Name of column in df containing descriptions, by default 'FORMATION' terms_col : str, default = 'FORMATION' Name of column in terms_df containing startswith substring to match with description_col, by default 'FORMATION' verbose : bool, default = False Whether to print out results, by default False log : bool, default = True Whether to log results to log file Returns ------- df : pandas.DataFrame Dataframe containing the original data and new classifications """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) if verbose: verbose_print(start_define, locals(), exclude_params=['df', 'terms_df']) #if verbose: # #Estimate when it will end, based on test run # estTime = df.shape[0]/3054409 * 6 #It took about 6 minutes to classify data with entire dataframe. This estimates the fraction of that it will take # nowTime = datetime.datetime.now() # endTime = nowTime+datetime.timedelta(minutes=estTime) # print("Start Term process should be done by {:d}:{:02d}".format(endTime.hour, endTime.minute)) #First, for each startterm, find all results in df that start with, add classification flag, and add interpretation. for i,s in enumerate(terms_df[terms_col]): df['CLASS_FLAG'].where(~df[description_col].str.startswith(s,na=False),4,inplace=True) df['LITHOLOGY'].where(~df[description_col].str.startswith(s,na=False),terms_df.loc[i,'LITHOLOGY'],inplace=True) df['BEDROCK_FLAG'].loc[df["LITHOLOGY"] == 'BEDROCK'] if verbose: if parallel_processing: numRecsClass = int(df[df['CLASS_FLAG']==4]['CLASS_FLAG'].sum().compute()) percRecsClass= round((numRecsClass/df.shape[0].compute())*100,2) recsRemainig = df['CLASS_FLAG'].isna().sum().compute() else: numRecsClass = int(df[df['CLASS_FLAG']==4]['CLASS_FLAG'].sum()) percRecsClass= round((numRecsClass/df.shape[0])*100,2) recsRemainig = df['CLASS_FLAG'].isna().sum() print('\tClassified well records using initial substring matches') print("\t\t{} records classified using initial substring matches ({}% of unclassified data)".format(numRecsClass, percRecsClass)) print('\t\t{} records remain unclassified ({}% of unclassified data).'.format(recsRemainig, 100-percRecsClass)) return df
# Not active, in progress def _tag_parts_of_speech(): return # Not active, in progress def _tokenize(): return # Classify downhole data by any substring
[docs] def wildcard_define(df, terms_df, description_col='FORMATION', terms_col='DESCRIPTION', verbose=False, log=False): """Function to classify descriptions according to any substring. Parameters ---------- df : pandas.DataFrame Dataframe containing all the well descriptions terms_df : pandas.DataFrame Dataframe containing all the startswith substrings to use for searching description_col : str, default = 'FORMATION' Name of column in df containing descriptions, by default 'FORMATION' terms_col : str, default = 'FORMATION' Name of column in terms_df containing startswith substring to match with description_col, by default 'FORMATION' verbose : bool, default = False Whether to print out results, by default False log : bool, default = True Whether to log results to log file Returns ------- df : pandas.DataFrame Dataframe containing the original data and new classifications """ logger_function(log, locals(), inspect.currentframe().f_code.co_name) if verbose: verbose_print(wildcard_define, locals(), exclude_params=['df', 'terms_df']) #if verbose: # #Estimate when it will end, based on test run # estTime = df.shape[0]/3054409 * 6 #It took about 6 minutes to classify data with entire dataframe. This estimates the fraction of that it will take # nowTime = datetime.datetime.now() # endTime = nowTime+datetime.timedelta(minutes=estTime) # print("Wildcard Term process should be done by (?) {:d}:{:02d}".format(endTime.hour, endTime.minute)) #First, for each startterm, find all results in df that start with, add classification flag, and add interpretation. for i,s in enumerate(terms_df[terms_col]): df['CLASS_FLAG'].where(~df[description_col].str.contains(s, case=False, regex=False, na=False), 5, inplace=True) df['LITHOLOGY'].where(~df[description_col].str.contains(s, case=False, regex=False, na=False),terms_df.loc[i,'LITHOLOGY'],inplace=True) df['BEDROCK_FLAG'].loc[df["LITHOLOGY"] == 'BEDROCK'] if verbose: totRecs = df.shape[0] numRecsClass = int(df[df['CLASS_FLAG']==5]['CLASS_FLAG'].sum()) percRecsClass= round((numRecsClass / totRecs)*100, 2) recsRemainig = df['CLASS_FLAG'].isna().sum() print('\tClassified well records using any substring (wildcard) match') print("\t\t{} records classified using any substring match ({}% of unclassified data)".format(numRecsClass, percRecsClass)) print(f'\t\t{recsRemainig} records remain unclassified ({100-percRecsClass}% of unclassified data).') return df