ООП-код Python для определения колен в данных

Контекст (необязательно)

В моей работе мы заинтересованы в выявлении конкретных событий, которые могут произойти во время деградации физической системы в течение ее срока службы. В частности, они называются:

  • Колено начало (начало фазы нелинейной деградации)
  • Точка колена (точка, после которой происходит ускоренная деградация)
  • Конец срока службы (когда он достигает 80% от первоначального значения).

с примером, показанным здесь:
capacity_fade_example

Доступны дополнительные описания контекста и методов здесь и здесь.

Код

Это мой первый «настоящий» проект ООП, помимо простых иллюстративных примеров, обычно встречающихся в учебных пособиях, поэтому я очень приветствую любые отзывы, конструктивную критику, советы и т. Д., Которые помогут мне улучшить и учиться. Насколько я тестировал, код в настоящее время работает правильно.

Поскольку это то, что будет часто использоваться в разных проектах, я намеревался собрать весь необходимый код в класс. Это предотвращает несоответствия из-за изменений, вносимых в результате копирования и вставки кода. Можно создать экземпляр и выполнить всю необходимую логику, используя только 2 или 3 вызова методов. Я попытался упростить его расширение (например, добавление новых методов идентификации или включение новых источников данных).

Вот код для создания экземпляра класса и вызова его методов

kf = KneeFinder(cycles, capacities, truncate=True, mode="knee")
kf.set_params_using_dict(params_dict, src="https://codereview.stackexchange.com/questions/257734/severson", data_type="capacity")
kf.find_onset_and_point()
kf.find_eol()

Вот код класса

import numpy as np
from sklearn.isotonic import IsotonicRegression
from scipy.optimize import curve_fit
from scipy.signal import medfilt
import warnings
import matplotlib.pyplot as plt

class KneeFinder():
    '''
    Write docstring - Find out what it should contain for the class
    '''
    
    def __init__(self, cycles, y, truncate=False, mode="knee"):
        self.cycles = cycles
        self.y = y
        self.truncate = truncate # whether or not to truncate using sigmoid fit
        self.mode = mode # knee or elbow
        self.point = None
        self.onset = None
        self.eol_reached = False
        self.eol_cycle = None
        
        # Set the fitting parameters to their default values
        self.reset_all_parameters()
        # Get a continuous array of integer cycle numbers
        self.x_cont = self.process_cycles_array(self.cycles)
    
    
    # Methods
    # Set up the models - should these 4 be staticmethods?
    def _asym_sigmoidal(self, x, a, b, c, d, m):
        ''' Formula for asymmetric sigmoidal function '''
        # Ignore the warnings about overflow in power
        warnings.filterwarnings(
            action='ignore',
            message="overflow encountered in power",
            module=r'.*new_knee_finder')
        return d + ((a - d) / (1 + (x / c) ** b) ** m)
    
    
    def _bw_func(self, x, b0, b1, b2, cp):
        ''' Formula for the single Bacon-Watts model'''
        return b0 + b1*(x-cp) + b2*(x-cp)*np.tanh((x-cp)/1e-8)
    
    
    def _bw2_func(self, x, b0, b1, b2, b3, cp, co):
        ''' Formula for the double Bacon-Watts model'''
        return b0 + b1*(x-cp) + b2*(x-cp)*np.tanh((x-cp)/1e-8) + b3*(x-co)*np.tanh((x-co)/1e-8)

    
    def _exponential(self, x, a, b, c, d, theta):
        ''' Formula for the line plus exponential model'''
        # Ignore the warnings about overflow in power
        warnings.filterwarnings(
            action='ignore',
            message="overflow encountered in multiply",
            module=r'.*new_knee_finder')
        return d*np.exp(a*x - b) + c + theta*x
    
    
    def find_onset_and_point(self):
        '''
        Write docstring
        '''
        # Get the monotonic fit to the experimental data
        self.mon_data = self.fit_monotonic(x_cont=self.x_cont,
                                           x_data=self.cycles,
                                           y_data=self.y)
        
        # Fit asym_sigmoid to the monotonic data if self.truncate==True       
        if self.truncate:
            # Try to fit the sigmoid, but this may fail with a RuntimeError
            # telling us that the optimal parameters were not found.
            try:
                self.sig_fit = self.fit_sigmoid(x_data=self.x_cont,
                                                 y_data=self.mon_data)
                
                self.indices = self.get_truncated_indices(data=self.sig_fit)
                
            except RuntimeError as err:
                error_msg = str(err)
                print(error_msg)
                # Check for the specific error message about fitting
                if 'Optimal parameters not found' in error_msg:
                    print("Can't fit sigmoid for truncation. Skipping")
                # Since we can't fit the asym_sigmoid for truncation,
                # set self.indices as if self.truncate==False
                self.indices = np.arange(len(self.mon_data))
        else:
            self.indices = np.arange(len(self.mon_data))
        
        # Fit line_exponential to the (normal/truncated) monotonic
        self.exp_fit = self.fit_line_exp(x_data=self.x_cont,
                                         y_data=self.mon_data,
                                         indices=self.indices)
        
        # Fit BW and double BW to exp_fit
        self.point, self.bw_fit = self.fit_bacon_watts(x_cont=self.x_cont,
                                                       indices=self.indices,
                                                       y_data=self.exp_fit)
        
        self.onset, self.bw2_fit = self.fit_double_bacon_watts(x_cont=self.x_cont,
                                                               indices=self.indices,
                                                               y_data=self.exp_fit)
        
        
        # Find the indices of the values in x_cont that are closest to the
        # cycle numbers for onset and point, as computed using BW and double BW
        onset_idx = np.argmin(np.abs(self.x_cont - int(self.onset)))
        point_idx = np.argmin(np.abs(self.x_cont - int(self.point)))
        # Get the y value (capacity/IR) corresponding to onset and point,
        # using mon_data
        self.onset_y = self.mon_data[onset_idx]
        self.point_y = self.mon_data[point_idx]
        
        # Include a return statement so you can use line-profiler to assess running time
        return
    
    
    def find_eol(self):
        '''
        Determine whether or not end of life (EOL) is reached,
        and if so, at which cycle.
        
        Use the monotonic fit to check for EOL. This is because:
            1. The values are within the range of experimental values. This
                is not the case if you were to extend the line_exp fit past 
                the truncation point. It decreases very rapidly, so you are
                almost sure to find an EOL, even though it doesn't occur in
                the actual experimental data.
            2. Putting (1) aside, if you find an EOL using the truncated or
                un-truncated line_exp fit, there is often a mismatch between
                the cycle number at which line_exp reaches 80% and the cycle
                number at which the monotonic fit reaches 80%. The monotonic
                fit, being essentially linear interpolation between points,
                much more closely fits the experimental data, so the EOL cycle
                number identified using monotonic fit is more reliable.
            3. If you use the truncated line_exp fit, you are potentially
                ignoring a large percentage of the data (past the truncation).
                You could then easily miss EOL occurrences from past the cycle
                at which the line_exp fit is truncated. Note, simply extending
                the line_exp fit past the truncation point is covered in (1).
        '''
        # Begin by assuming EOL is not reached
        self.eol_reached = False
        
        # Get the monotonic fit
        self.mon_data = self.fit_monotonic(x_cont=self.x_cont,
                                           x_data=self.cycles,
                                           y_data=self.y)
        
        # Use the monotonic fit to check for EOL, because it is guaranteed
        # to be there for the whole curve and it's essentially linear interp
        # meaning that it will be closer to the data than line_exp or sig fits
        
        # Find the initial (experimental) value and compute the EOL value
        init_val = self.y[0]
        if self.mode.lower() == 'knee':
            self.eol_val = 0.8 * init_val
            self.post_eol_indices = np.where(self.mon_data < self.eol_val)[0]
        
        elif self.mode.lower() == 'elbow':
            self.eol_val = 2.0 * init_val
            self.post_eol_indices = np.where(self.mon_data > self.eol_val)[0]
    
        # If it finds indices past the EOL index, set eol_reached to true and
        # find the first cycle number in x_cont after the EOL value is reached
        if len(self.post_eol_indices) > 0:
            self.eol_reached = True
            self.eol_idx = self.post_eol_indices[0]
            self.eol_cycle = self.x_cont[self.eol_idx]
    
        return
    
    
    # Define a method to show the results on a plot
    def plot_results(self, line_exp=False, mon=False, data_style="-"):
        '''
        Write docstring
        '''
        fig, ax = plt.subplots()
        ax.plot(self.cycles, self.y, data_style, label="Experimental")
        if mon:
            ax.plot(self.x_cont, self.mon_data, label="Monotonic", color="green")
        if line_exp:
            ax.plot(self.x_cont[self.indices], self.exp_fit, label="Line_exp fit", color="purple")
        ax.axvline(self.onset, label=f'{self.mode.capitalize()} onset', color="orange")
        ax.axvline(self.point, label=f'{self.mode.capitalize()} point', color="red")
        ax.axhline(self.onset_y, color="orange")
        ax.axhline(self.point_y, color="red")
        if self.eol_cycle != None:
            ax.axvline(self.eol_cycle, label="End of life", color="black")
        ax.grid(alpha=0.4)
        ax.legend()
        plt.show()
        
    
    # Define the helper methods
    def process_cycles_array(self, cycles):
        '''
        Write docstring        
        '''
        
        # Identify the first and last cycle numbers in the cycles array.
        # In the case of non-integer cycle values, round up to the next
        # integer for the first cycle and remove any fractional part of
        # the last cycle number. Avoids issues with NaNs in the monotonic fit.
        first_cycle = int(np.ceil(cycles[0]))
        last_cycle = int(cycles[-1])
        
        # Create an array of integers from first to last cycle numbers.
        x_cont = np.arange(first_cycle, last_cycle+1)
        
        return x_cont
    
    
    def fit_monotonic(self, x_cont, x_data, y_data):
        '''
        x_cont (type: array)
            Array of continuous integer values for which the fitted
            IsotonicRegression result should be used to generate the monotonic
            fit.
            
        x_data (type: array)
            Experimental x data e.g. cycle number
        
        y_data (type: array)
            Experimental y data e.g. capacity or internal resistance
        
        '''
        # Create an IsotonicRegression instance
        if self.mode.lower() == 'knee':
            ir = IsotonicRegression(increasing=False)
        elif self.mode.lower() == 'elbow':
            ir = IsotonicRegression(increasing=True)

        # Fit the monotonic curve to the experimental data
        ir.fit(x_data, y_data)
        
        # Get a value for every cycle based on the fit
        mon_data = ir.predict(x_cont)
        
        return mon_data
    
    
    def fit_sigmoid(self, x_data, y_data):
        '''
        Write docstring
        '''
        sig_popt, _ = curve_fit(self._asym_sigmoidal,
                               x_data,
                               y_data,
                               p0=self.sig_p0,
                               bounds=self.sig_bounds)
        
        sig_fit = self._asym_sigmoidal(x_data, *sig_popt)

        return sig_fit
    
    
    def compute_second_derivative(self, data):
        '''
        Write docstring
        '''
        
        # Using np.gradient gives us a result of the same shape
        dy_dx = np.gradient(data)
        d2y_dx2 = np.gradient(dy_dx)
        
        # Set a threshold, below which, the value is set to zero
        d2y_dx2[np.where(np.abs(d2y_dx2) < 1e-10)] = 0.0
        
        # Replace the last 2 values with the value at index [-3] to avoid
        # the sharp change that happens at the end of the d2y_dx2 array
        d2y_dx2[-2] = d2y_dx2[-3]
        d2y_dx2[-1] = d2y_dx2[-3]
        
        return d2y_dx2
       
    
    def get_truncated_indices(self, data):
        '''
        Write docstring
        '''
        # Compute the second derivative of data,
        # which will be the asymmetric sigmoid fit
        self.d2 = self.compute_second_derivative(data)
        # Filter d2
        self.d2 = medfilt(self.d2, 5)
        # Get an array of the sign of d2 to find changes
        self.d2_sign = np.sign(self.d2)
        
        # Use mode to determine which new sign value to look for to find changes
        if self.mode == 'knee':
            new_sign_val = 1
        elif self.mode == 'elbow':
            new_sign_val == -1
            
        # Find out if there are any places where the sign changes
        change_indices = np.where(self.d2_sign == new_sign_val)[0]
        # If there are no sign changes, return the full array of indices
        if len(change_indices) == 0:
            indices = np.arange(0, len(data))
            return indices
        else:
            # Find the first index at which the sign changes from -ve to +ve
            self.first_change_idx = change_indices[0]
            
            # Look ahead a few cycles to make sure it is not a local erroneous fluctuation
            # Note this is incomplete, because there's nothing to handle AssertionErrors
            lookahead = np.min((20, len(self.d2_sign) - self.first_change_idx))
            assert(np.all(self.d2_sign[self.first_change_idx + lookahead] == new_sign_val))
            indices = np.arange(0, self.first_change_idx)

        return indices
    
    
    def fit_line_exp(self, x_data, y_data, indices):
        '''
        Write docstring
        '''
        # Use the indices array to select subsets of the other input arrays
        # if truncation has been deemed necessary. If no truncation is needed,
        # the indices array will contain the indices for every cycle
        x_data = x_data[indices]
        y_data = y_data[indices]
        
        # Fit the line plus exponential model to the monotonic data
        # Get the optimal parameters for _exponential
        exp_popt, _ = curve_fit(self._exponential,
                                x_data,
                                y_data,
                                p0=self.line_exp_p0,
                                bounds=self.line_exp_bounds)
        # Apply the optimal parameters to the continuous cycle array
        exp_fit = self._exponential(x_data, *exp_popt)
        
        return exp_fit
    
    
    def fit_bacon_watts(self, x_cont, indices, y_data):
        '''
        Write docstring.
        
        Pass the line_exponential fit to this method, to use the
        Bacon-Watts method to compute the cycle number at which
        the knee point occurs.
        '''
        
        # Use the indices array to select subsets of the other input arrays
        x_cont = x_cont[indices]
        y_data = y_data[indices]
        
        # Set some p0 and bounds values based on the cycle numbers
        # of the data being considered by the KneeFinder instance
        self.bw_p0[3] = x_cont[-1]/1.5
        # Set the upper bound to be the final cycle
        self.bw_bounds[1][3] = x_cont[-1]

        # Fit the Bacon Watts model to the line_exponential input
        popt_bw, _ = curve_fit(self._bw_func,
                               x_cont,
                               y_data,
                               p0=self.bw_p0,
                               bounds=self.bw_bounds)
        
        # Apply the optimal parameters to the continuous cycle array
        # to get the Bacon-Watts fit
        bw_fit = self._bw_func(x_cont, *popt_bw)
        
        return popt_bw[3], bw_fit


    def fit_double_bacon_watts(self, x_cont, indices, y_data):
        '''
        Write docstring
        '''
        # Use the indices array to select subsets of the other input arrays
        x_cont = x_cont[indices]
        y_data = y_data[indices]
        
        # Set some p0 and bounds values based on the cycle numbers
        # of the data being considered by the KneeFinder instance.
        # These p0 values give the onset and point a starting location
        # somewhere in the second half of the curve.
        self.bw2_p0[4] = x_cont[-1]/2.0
        self.bw2_p0[5] = x_cont[-1]/1.5
        # Set the upper bound to be the final cycle
        self.bw2_bounds[1][4] = x_cont[-1]
        self.bw2_bounds[1][5] = x_cont[-1]
        

        # Apply the optimal parameters to the continuous cycle array
        # to get the double Bacon-Watts fit
        popt_bw2, _ = curve_fit(self._bw2_func,
                               x_cont,
                               y_data,
                               p0=self.bw2_p0,
                               bounds=self.bw2_bounds)
        
        bw2_fit = self._bw2_func(x_cont, *popt_bw2)
        
        return popt_bw2[4], bw2_fit
        
    
    # Methods for setting and resetting parameters. All of these methods are very similar
    def reset_all_parameters(self):
        try:
            # Set the parameters to their default values
            self.line_exp_p0 = [1, 1, 1, 1, 1]
            self.line_exp_bounds = [0, 0, 0, 0, 0], [1, 1, 1, 1, 1]
            self.sig_p0 = [1, 1, 1, 1, 1]
            self.sig_bounds = [0, 0, 0, 0, 0], [1, 1, 1, 1, 1]
            self.bw_p0 = [1, 1, 1, 1]
            self.bw_bounds = ([-np.inf, -np.inf, -np.inf, 0],[np.inf, np.inf, np.inf, np.inf])
            self.bw2_p0 = [1, 1, 1, 1, 1, 1]
            self.bw2_bounds = ([-np.inf, -np.inf, -np.inf, -np.inf, 0, 0], [np.inf, np.inf, np.inf, np.inf, np.inf, np.inf])
        
        except Exception as e:
            print(f"Error: {e}")
    
    
    def set_params_using_dict(self, params_dict, src, data_type):
        try:
            # Assign the parameter values for the fitting functions,
            # for this particular instance.
            self.set_sigmoid_params(params_dict[src][data_type]['sig']['p0'])
            self.set_sigmoid_bounds(params_dict[src][data_type]['sig']['bounds'])
            self.set_line_exp_params(params_dict[src][data_type]['line_exp']['p0'])
            self.set_line_exp_bounds(params_dict[src][data_type]['line_exp']['bounds'])
        
        except Exception as e:
            print(f"Error: {e}")
            
    
    def set_line_exp_params(self, p0=None):
        # Input validation - check for the correct lengths.
        # Could also check that they obey the relevant bounds,
        # but could maybe leave that to common sense or leave the
        # curve_fit method to raise the exception.
        try:
            assert(len(p0)==5)
            self.line_exp_p0 = p0
            
        except AssertionError:
            print("Argument p0 must have length of 5 for line exponential.")
            print(p0)
            return None
        
        except Exception as e:
            print(f"Error: {e}")
        
        
    def set_line_exp_bounds(self, bounds=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(bounds)==2)
            assert(len(bounds[0])==5)
            assert(len(bounds[1])==5)
            self.line_exp_bounds = bounds
            
        except AssertionError:
            print("bounds must have length of 5 for line exponential.")
            print(bounds)
            return None
        
        except Exception as e:
            print(f"Error: {e}")
            
    
    def reset_line_exp_params(self):
        try:
            self.line_exp_p0 = [1, 1, 1, 1, 1]
            self.line_exp_bounds = [0, 0, 0, 0, 0], [1, 1, 1, 1, 1]
            print("Parameters for line_exp reset")
        
        except Exception as e:
            print(f"Error: {e}")
        
        
    def set_sigmoid_params(self, p0=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(p0)==5)
            self.sig_p0 = p0
            
        except AssertionError:
            print("Argument p0 must have length of 5 for asym_sigmoid.")
            print(p0)
            return None
        
        except Exception as e:
            print(f"Error: {e}")
        
            
    def set_sigmoid_bounds(self, bounds=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(bounds)==2)
            assert(len(bounds[0])==5)
            assert(len(bounds[1])==5)
            self.sig_bounds = bounds
            
        except AssertionError:
            print("bounds must have length of 5 for asym_sigmoid.")
            print(bounds)
            return None
        
        except Exception as e:
            print(f"Error: {e}")
    
    
    def reset_sigmoid_params(self):
        try:
            self.sig_p0 = [1, 1, 1, 1, 1]
            self.sig_bounds = [0, 0, 0, 0, 0], [1, 1, 1, 1, 1]
            print("Parameters for asym_sigmoid reset")
        
        except Exception as e:
            print(e)
        
        
    def set_bw_params(self, p0=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(p0)==4)
            self.bw_p0 = p0
            
        except AssertionError:
            print("Argument p0 must have length of 4 for Bacon Watts.")
            print(p0)
            
        except Exception as e:
            print(f"Error: {e}")
            
            
    def set_bw_bounds(self, bounds=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(bounds)==2)
            assert(len(bounds[0])==4)
            assert(len(bounds[1])==4)
            self.bw_bounds = bounds
            
        except AssertionError:
            print("bounds must have length of 4 for Bacon Watts.")
            print(bounds)
            return None
        
        except Exception as e:
            print(f"Error: {e}")
    
    
    def reset_bw_params(self):
        try:
            # Reset the Bacon-Watts initial parameters and bounds to
            # some default values
            self.bw_p0 = [1, 1, 1, 1]
            self.bw_bounds = ([-np.inf, -np.inf, -np.inf, 0],
                              [np.inf, np.inf, np.inf, np.inf])
            print("Parameters for Bacon Watts reset")
        
        except Exception as e:
            print(f"Error: {e}")
            
        
    def set_bw2_params(self, p0=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(p0)==6)
            self.bw2_p0 = p0
            
        except AssertionError:
            print("Arguments p0 and bounds must have length of 6 for double Bacon Watts.")
            print(p0)
            
        except Exception as e:
            print(f"Error: {e}")
    
    
    def set_bw2_bounds(self, bounds=None):
        # Input validation - check for the correct lengths
        try:
            assert(len(bounds)==2)
            assert(len(bounds[0])==6)
            assert(len(bounds[1])==6)
            self.bw2_bounds = bounds
        
        except AssertionError:
            print("bounds must have length of 6 for double Bacon Watts.")
            print(bounds)
            
        except Exception as e:
            print(f"Error: {e}")
        
    
    def reset_bw2_params(self):
        try:
            # Reset the double Bacon-Watts initial parameters and bounds to
            # some default values
            self.bw2_p0 = [1, 1, 1, 1, 1, 1]
            self.bw2_bounds = ([-np.inf, -np.inf, -np.inf, -np.inf, 0, 0],
                               [np.inf, np.inf, np.inf, np.inf, np.inf, np.inf])
            print("Parameters for double Bacon Watts reset")
            
        except Exception as e:
            print(f"Error: {e}")

В качестве минимального рабочего примера класс сохраняется в файле с именем new_knee_finder.py. Данные, используемые в примере, взяты из здесь. Словарь параметров обычно хранится и поддерживается где-то еще, но он явно создается здесь для целей MWE.

# A MWE for Code Review
from new_knee_finder import KneeFinder
import numpy as np

# Create some data
cycles = np.arange(0, 700, 50)
capacities = np.array([3.27795, 3.19774, 3.1372, 3.10605, 3.07494, 3.0869, 3.00876, 3.00872, 2.98472, 2.93986, 2.92171, 2.89218, 2.66845, 2.154])

# Make a dict for the initial curve fitting parameters
params_dict_diao = {"https://codereview.stackexchange.com/questions/257734/diao": {'capacity': {'sig': {'p0': None, 'bounds': None},
                                          'line_exp': {'p0': None, 'bounds': None}}}}
                             
params_dict_diao["https://codereview.stackexchange.com/questions/257734/diao"]['capacity']['sig']['p0'] = np.array([4.619, 4.671, 1530.0, 0.870, 1701.0])
params_dict_diao["https://codereview.stackexchange.com/questions/257734/diao"]['capacity']['sig']['bounds'] = [0, 0, 1e-8, 0, -1],[10, 50, 5000, 10, 5000]
params_dict_diao["https://codereview.stackexchange.com/questions/257734/diao"]['capacity']['line_exp']['p0'] = np.array([1.89e-3, 0.77, 1.88, -0.11, 0])
params_dict_diao["https://codereview.stackexchange.com/questions/257734/diao"]['capacity']['line_exp']['bounds'] = [0, 0, 0, -np.inf, -np.inf],[np.inf, np.inf, np.inf, 0, 0]

kf = KneeFinder(cycles, capacities, truncate=True, mode="knee")
kf.set_params_using_dict(params_dict_diao, src="https://codereview.stackexchange.com/questions/257734/diao", data_type="capacity")
kf.find_onset_and_point()
kf.find_eol()
kf.plot_results(mon=True, line_exp=False, data_style="-o")

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *