Игрушечная нейронная сеть дает плохие результаты

Я новичок в Python и Tensorflow.

Я сделал игрушечную нейронную сеть, чтобы изучить и то, и другое.

Цель NN — идентифицировать модуль c или разделение между линиями, определяемыми случайными точками, и ширину каждой линии, как показано на этом изображении, которое показывает одну выборку, взятую из тех, которые использовались для обучения сети:

Скриншот

Поскольку я генерирую образцы, у меня их неограниченное количество. Но сеть дает очень плохие результаты.

Вот график c_predicted / c для каждого c. Хорошим результатом было бы получение c_predicted / c≈1 для всех значений c

РЕДАКТИРОВАТЬ: исправлена ​​ошибка при построении этой диаграммы и обновлена ​​диаграмма

Скриншот

У меня были еще худшие ошибки, если я нормализовал данные и использовал слои пакетной нормализации.

Tensorflow быстро заполняет память даже в небольших сетях, поэтому для экономии памяти я повторно использовал один слой с именем convI.

Ожидается, что слой «convI» улучшит вывод слоя, ответ на который «почти» есть. По этой причине у меня есть 3 выходных слоя, все из которых должны выводить один и тот же результат, но за ними следует слой convI, а после convI вывод вычисляется снова, ожидая, что это будет улучшенное предположение по сравнению с предыдущим выводом.

Скриншот

Я читал и пытался следовать лучшим практикам tenorflow, но я делаю что-то ужасно неправильно и не знаю что.

Вот код. Я использовал операторы «if True:» для отступа блоков кода, чтобы сделать его более читабельным.

#Ejemplo tomado de
#https://keras.io/examples/timeseries/timeseries_classification_from_scratch/

import tensorflow as tf
tf.keras.backend.clear_session() #Releases rsources lockedby older crashed processes
#this code is needed to avoid crashes due to a bug in tensorflow
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

print("-" * 50)#prints a line
print('Compute dtype: %s' % policy.compute_dtype)
print('Variable dtype: %s' % policy.variable_dtype)
print("-" * 50)

import numpy as np
import inspect
    
import matplotlib.pyplot as plt
class Dataset(object):
    numberOfSamples = 10000
    sampleShape = [4000,1]
    cMin = 0.1
    cMax = 10
    εMin = 0
    εMax = 0.4
    fractionOfTrainVsTest = 0.9
    rangeOfSamples = range(numberOfSamples)
    indexofSplit=numberOfSamples*fractionOfTrainVsTest

    #c&ε used for training
    c=None
    ε=None

    x=None
    x_normalized=None#x as x_test for NN
    y_normalized=None#y as y_test for NN
    StdDevX_i=None
    StdDevX_global=None#Standard deviation (of the standard deviation) of the entire samples used in training.

    def __init__(self):
        pass

    def CreateDataset(self,
                      numberOfSamples=10000,
                      sampleShape=[4000,1],
                      cMin=0.1,cMax=10,
                      εMin=0,εMax=0.4,
                      fractionOfTrainVsTest=0.9):   
        #copies all parameters to the class instance
        parameters = inspect.getfullargspec(self.CreateDataset)
        [setattr(self, "self." + var, value)  for var , value in zip(list(parameters[0][1:]) ,list(parameters[3]))]
        [print(name,"==",value) for name,value in vars(self).items()]

        self.rangeOfSamples = range(self.numberOfSamples)
    
        print("Generating samples")
        if True:
            
            #c[i] is the module (separation between "lines" to be found by the NN
            self.c = [np.random.uniform(self.cMin,self.cMax) for n in self.rangeOfSamples]
        

            #ε[i] is noise amplitude (error) as fraction of c
            self.ε = [np.random.uniform(self.εMin,self.εMax) for n in self.rangeOfSamples]
        
            #Number of lines of each sample
            NLines = [np.random.randint(5,50) for n in self.rangeOfSamples]

            #x for x_train (is an y coordinate)
            #x=c*±(pick one line from NLines)+ε
            #x[i].shape==sampleShape
            self.x = [self.c[r] * (np.diff(np.sort(np.random.randint(-NLines[r],NLines[r],size=self.sampleShape)),append=0) + self.ε[r] * np.random.uniform(-1 , 1,self.sampleShape))
                 for r in self.rangeOfSamples] 
            #sort_x = False#ToDo: explore if sorting enhances performance of the NN, as base data or extra feature.
            #if sort_x:
            #    x = [np.sort(x_i,axis=0) for x_i in x]

            self.StdDevX_i = [np.std(self.x[r]) for r in self.rangeOfSamples]
            self.StdDevX_global = np.std(self.StdDevX_i)
            self.x_normalized = [self.x[r] / self.StdDevX_i[r] for r in self.rangeOfSamples]# / self.StdDevX_global#supossed to be already averaged to 0
            self.c_normalized = [self.c[r] / self.StdDevX_i[r] for r in self.rangeOfSamples]# / self.StdDevX_global
            self.ε_normalized = [self.ε[r] * self.c_normalized[r] for r in self.rangeOfSamples]

            #import Plotear as pt
            #pt.PlotearXY(Y=self.StdDevX_i/self.StdDevX_global,scatter=True)

            ##ExtraFeatures
            #Δ_sorted_self.x_normalized = [np.diff(np.sort(sample,axis=0),axis=0) for sample in self.x_normalized]
        
            #ToDo: reduce to unique [values, counts = np.unique(words, return_counts=True)]
    
            ##Export to excel and open
            #if True:
            #    import os
            #    myPath=os.path.join(os.environ['temp'], 'Dataset.xlsx')
            #    df.to_excel(myPath,sheet_name="Python data",engine="xlsxwriter")
            #    os.startfile(myPath)
        
    
        #import Plotear as pt
        #i = np.random.randint(len(x))
        #pt.PlotearXY(Y=x[i],Title="c=" + str(c[i]) + "; ε=" + str(ε[i]) + "; N° lines=" + str(NLines[i]),scatter=True)
        #pt.PlotearXY(Y=self.x_normalized[0],Title="c=" + str(c[0]) + "; ε=" + str(ε[0]) + "; N° lines=" + str(NLines[0]),scatter=True)
    
        if True:#Format data for Keras
            self.indexofSplit = int(self.fractionOfTrainVsTest * len(self.x))
    
            self.x_normalized = np.asarray(self.x_normalized)
            self.y_normalized = np.asarray([ [self.c_normalized[r], self.ε_normalized[r]]  for r in self.rangeOfSamples])#Esta normalizado entre 0 y 1
        
            x_train = self.x_normalized[:self.indexofSplit].astype('float32') 
            x_test = self.x_normalized[self.indexofSplit:].astype('float32') 
    
            y_train = self.y_normalized[:self.indexofSplit].astype('float32') 
            y_test = self.y_normalized[self.indexofSplit:].astype('float32') 
        print("Datos generados")
        return x_train, y_train, x_test, y_test


    def Predict(self, model, NotNormalized_X):
        stdevX=np.std(NotNormalized_X,axis=1,keepdims=True)
        NormalizedX=np.divide(NotNormalized_X,stdevX)#/self.StdDevX_global#chequear == self.x_normalized
        Prediction=model.predict(NormalizedX)[-1]*np.squeeze(stdevX,axis=2)#*self.StdDevX_global# 
        return Prediction

     
myData = Dataset()
x_train, y_train, x_test, y_test = myData.CreateDataset()

if True:#Build a model

    def make_model_feedback_output(input_shape, output_shape):
    
        #Modern NVIDIA GPUs use a special hardware unit called Tensor Cores that can multiply float16 matrices very quickly. However, Tensor Cores requires certain dimensions of tensors to be a multiple of 8.
        #tf.keras.layers.Dense(units=64)
        #tf.keras.layers.Conv2d(filters=48, kernel_size=7, stride=3)
        #tf.keras.layers.LSTM(units=64)
        #tf.keras.Model.fit(epochs=2, batch_size=128)

        input_layer = keras.layers.Input(input_shape)
        #--------------------------------------------------------------------------------------
        conv0 = keras.layers.Conv1D(filters=64, kernel_size=16, padding="valid", activation="swish")(input_layer)
        conv1 = keras.layers.Conv1D(filters=64, kernel_size=16, padding="valid", activation="swish")(conv0)
        #conv1 = keras.layers.BatchNormalization()(conv1)
        #conv1 = keras.layers.LeakyReLU()(conv1)
        #--------------------------------------------------------------------------------------
    
        convI = keras.layers.Conv1D(filters=64, kernel_size=16, padding="valid", activation="swish")
        #--------------------------------------------------------------------------------------
    
        conv2 = convI(conv1)
        #conv2 = keras.layers.BatchNormalization()(conv2)
        #conv2 = keras.layers.LeakyReLU()(conv2)

        gap1 = keras.layers.GlobalAveragePooling1D()(conv2)

        #Output dtype needs to be dtype="float32" for mixed_precision
        output_layer1 = keras.layers.Dense(output_shape[0], activation='swish',dtype="float32", name="output_layer1")(gap1)
        #--------------------------------------------------------------------------------------
    
        conv3 = convI(conv2)
        #conv3 = keras.layers.BatchNormalization()(conv3)
        #conv3 = keras.layers.LeakyReLU()(conv3)        

        gap2 = keras.layers.GlobalAveragePooling1D()(conv3)

        #Output dtype tiene que ser dtype="float32" para que funcione mixed_precision
        output_layer2 = keras.layers.Dense(output_shape[0], activation='swish',dtype="float32", name="output_layer2")(gap2)
        #--------------------------------------------------------------------------------------

        conv4 = convI(conv3)
        #conv4 = keras.layers.BatchNormalization()(conv4)
        #conv4 = keras.layers.LeakyReLU()(conv4)        

        gap3 = keras.layers.GlobalAveragePooling1D()(conv4)

        #Output dtype tiene que ser dtype="float32" para que funcione mixed_precision
        output_layer3 = keras.layers.Dense(output_shape[0], activation='swish',dtype="float32", name="output_layer3")(gap3)
        #--------------------------------------------------------------------------------------

        return keras.models.Model(inputs=input_layer, outputs=[output_layer1, output_layer2, output_layer3])

    model = make_model_feedback_output(input_shape=x_train.shape[1:],output_shape=y_train.shape[1:])

if True:#Train the model
    Epochs = 100
    Patience = 100
    batch_size = 128#Has to be multiple of 8 to take advantage of mixed_precision
    pathForSavingFile = "z:\best_model.h5"

    callbacks = [keras.callbacks.ModelCheckpoint(pathForSavingFile, save_best_only=True, monitor="val_loss"),
        keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=Patience, min_lr=0.0001),
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=Patience, verbose=1),]
    model.compile(optimizer="adam",
        loss={'output_layer1': 'MeanSquaredError', 
              'output_layer2': 'MeanSquaredError', 
              'output_layer3': 'MeanSquaredError'},
        metrics={'output_layer1': 'MeanSquaredError',
                 'output_layer2': 'MeanSquaredError', 
                 'output_layer3': 'MeanSquaredError'},)
    print("Training start...")
    print(model.summary())
    print('a')#beep

    import time
    start_time = time.time()
    history = model.fit(x_train,
        [y_train for i in range(3)],
        batch_size=batch_size,
        epochs=Epochs,
        callbacks=callbacks,
        validation_split=0.1,
        verbose=2,)
    print("Training ended")
    print("--- %s minutes ---" % ((time.time() - start_time) / 60))
    print('a')#beep





if True:#Evaluate model on test data

    model = keras.models.load_model(pathForSavingFile)

    Losses = model.evaluate(x_test, [y_test,y_test,y_test])
    test_loss, test_acc = (Losses[0] , Losses[-1])

    print("Test accuracy", test_acc)
    print("Test loss", test_loss)

if True:#Plot the model's training and validation loss

    metric = "mean_squared_error"
    Loss_result = "val_mean_squared_error"
    plt.figure()
    plt.plot(history.history['output_layer3_loss'])
    plt.plot(history.history["val_output_layer3_mean_squared_error"])
    plt.title("model " + metric)
    plt.ylabel(metric, fontsize="large")
    plt.xlabel("epoch", fontsize="large")
    plt.legend(["train; min=" + str(min(history.history['output_layer3_loss'])), "val; min=" + str(min(history.history["val_output_layer3_mean_squared_error"]))], loc="best")
    plt.show()
    plt.close()

if True:#Plot percentage of error of predicted c
    Prediction = myData.Predict(model,np.asarray(myData.x))
    PredictionC, Predictionε = [Prediction[:,i][:] for i in range(2)]
    PercentErrorIn_C = np.divide( PredictionC,np.asarray(myData.c))
#myData.y_normalized is guaranteed to be >0
    if True:#Coordinates for plotting a predicted example
        YY = PercentErrorIn_C[myData.indexofSplit:]
        XX = np.asarray(myData.c[myData.indexofSplit:] )
    fig = plt.figure()
    fig.tight_layout()
    fig.subplots_adjust(bottom=0.2)
    ax = plt.gca()
    ax.scatter(XX,YY,marker="o",s=0.5)
    plt.legend(["Ratio of error: c_estimated/c"], loc="best")
    plt.title("Error c_predicted/c") 
    #plt.draw()
    plt.show()
    plt.close()


if True:#Plot an example
    index_random = np.random.randint(myData.indexofSplit,len(myData.x_normalized))
    Prediction = myData.Predict(model,myData.x[index_random-1:index_random])
    #Prediction = (model.predict(myData.x_normalized)[index_random])[-1]#Only last output (output3)
    if True:#Coordinates for plotting a predicted example
        YY = myData.x[index_random] 
        XX = np.arange(0,len(YY))
    
        #c_predicted,ε_predicted = (-0.5+Prediction[0]) * MaxAbsX[index_random]*2, Prediction[1]
        #c_predicted,ε_predicted = (-0.5 + Prediction[0,0]) * MaxAbsX[index_random] * 2, Prediction[0,1]
        #c_predicted,ε_predicted = ( Prediction[0,0]) * MaxAbsX[index_random] , Prediction[0,1]
        c_predicted,ε_predicted = Prediction[0,0]  , Prediction[0,1]
        cX = np.array([0,XX[-1]])
        cY = np.array([1,1]) * c_predicted

        εX = np.array([1,1]) * XX[-1] / 2
        εY = cY + np.array([-1,1]) * ε_predicted #ε is noise amplitude as a fraction of c
    fig = plt.figure()
    fig.tight_layout()
    fig.subplots_adjust(bottom=0.2)
    ax = plt.gca()
    ax.scatter(XX,YY,marker="o",s=1)#.abs()
    ax.plot(cX,cY,marker="|",markersize=10)
    ax.plot(εX,εY,marker="_",markersize=10)
    plt.title('c=" + "{0:.2f}'.format(myData.c[index_random]) + "; c predicted=" + '{0:.2f}'.format(c_predicted) + "; c_predicted/c=" + '{0:.2f}'.format(c_predicted/myData.c[index_random])) 
    #plt.draw()
    plt.show()
    plt.close()
    pass

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *