Я новичок в Python и Tensorflow.
Я сделал игрушечную нейронную сеть, чтобы изучить и то, и другое.
Цель нейронной сети — определить модуль c (расстояние между линиями, вдоль которых лежат случайные точки) и ширину каждой линии, как показано на изображении ниже; на нём приведён один образец из тех, что использовались для обучения сети:
Поскольку я генерирую образцы, у меня их неограниченное количество. Но сеть дает очень плохие результаты.
Вот график c_predicted / c для каждого c. Хорошим результатом было бы получение c_predicted / c≈1 для всех значений c
ПРАВКА: исправлена ошибка при построении этой диаграммы, диаграмма обновлена.
Результаты были ещё хуже, когда я нормализовал данные и использовал слои пакетной нормализации (BatchNormalization).
Tensorflow быстро заполняет память даже в небольших сетях, поэтому для экономии памяти я повторно использовал один слой с именем convI.
Предполагается, что слой «convI» уточняет выход предыдущего слоя, в котором ответ «почти» уже есть. Поэтому у сети три выходных слоя, и все они должны выдавать один и тот же результат: после каждого выхода снова применяется слой convI, и выход вычисляется заново в расчёте на то, что новая оценка окажется точнее предыдущей.
Я читал о лучших практиках TensorFlow и пытался им следовать, но делаю что-то в корне неправильно и не понимаю, что именно.
Вот код. Я использовал операторы «if True:» для отступа блоков кода, чтобы сделать его более читабельным.
#Example taken from
#https://keras.io/examples/timeseries/timeseries_classification_from_scratch/
import tensorflow as tf
# Reset the Keras backend session before building anything new.
# (Original author's intent: release resources still locked by older,
# crashed processes.)
tf.keras.backend.clear_session()

# Enable memory growth on every visible GPU; the author added this as a
# workaround for crashes attributed to a TensorFlow bug.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import mixed_precision
# Install the 'mixed_float16' policy globally so that layers created later
# pick it up, then report the dtypes the policy exposes.
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

separator = "-" * 50  # horizontal rule framing the report
print(separator)
print('Compute dtype: %s' % policy.compute_dtype)
print('Variable dtype: %s' % policy.variable_dtype)
print(separator)
import numpy as np
import inspect
import matplotlib.pyplot as plt
class Dataset(object):
    """Generator of synthetic samples for the "line spacing" regression task.

    Every sample is an array of shape ``sampleShape`` whose values are
    ``c * (integer + ε * uniform(-1, 1))``: points scattered around
    equally spaced levels.  ``c`` is the spacing (the "module") and ``ε``
    the noise amplitude expressed as a fraction of ``c``.  The targets
    handed to the network are ``[c, ε * c]`` divided by the sample's own
    standard deviation, matching the per-sample scaling of the inputs.

    The class-level values below are defaults; ``CreateDataset`` overrides
    them on the instance with the arguments it receives.
    """

    numberOfSamples = 10000
    sampleShape = [4000, 1]
    cMin = 0.1
    cMax = 10
    εMin = 0
    εMax = 0.4
    fractionOfTrainVsTest = 0.9
    rangeOfSamples = range(numberOfSamples)
    # Kept as an int so it can be used directly as a slice bound.
    indexofSplit = int(numberOfSamples * fractionOfTrainVsTest)
    # c and ε used for training (one value per sample).
    c = None
    ε = None
    x = None  # raw, un-normalized samples
    x_normalized = None  # network inputs: x divided by its own std-dev
    y_normalized = None  # network targets: [c_normalized, ε_normalized]
    c_normalized = None
    ε_normalized = None
    StdDevX_i = None  # standard deviation of each individual sample
    StdDevX_global = None  # std-dev of the per-sample std-devs (StdDevX_i)

    def __init__(self):
        pass

    def CreateDataset(self,
                      numberOfSamples=10000,
                      sampleShape=(4000, 1),
                      cMin=0.1, cMax=10,
                      εMin=0, εMax=0.4,
                      fractionOfTrainVsTest=0.9):
        """Generate a fresh random data set and split it into train/test.

        Args:
            numberOfSamples: how many samples to generate (must be >= 1).
            sampleShape: shape of one sample.  ``Predict`` squeezes axis 2
                of a batch, so samples are expected to be
                ``(length, 1)``.
            cMin, cMax: bounds of the uniform draw for the spacing ``c``.
            εMin, εMax: bounds of the uniform draw for the noise fraction.
            fractionOfTrainVsTest: fraction of the samples (taken from the
                front) that goes to the training split.

        Returns:
            ``(x_train, y_train, x_test, y_test)`` as float32 arrays.

        Raises:
            ValueError: if ``numberOfSamples`` is smaller than 1.
        """
        if numberOfSamples < 1:
            raise ValueError("numberOfSamples must be at least 1")

        # Copy every argument onto the instance.  The previous version set
        # attributes literally named "self.<name>" to the signature
        # defaults, so caller-supplied values never took effect.
        settings = {
            "numberOfSamples": numberOfSamples,
            "sampleShape": list(sampleShape),
            "cMin": cMin,
            "cMax": cMax,
            "εMin": εMin,
            "εMax": εMax,
            "fractionOfTrainVsTest": fractionOfTrainVsTest,
        }
        for name, value in settings.items():
            setattr(self, name, value)
            print(name, "==", value)
        self.rangeOfSamples = range(self.numberOfSamples)
        samples = self.rangeOfSamples

        print("Generating samples")
        # c[i] is the module (separation between "lines") to be found by
        # the network.
        self.c = [np.random.uniform(self.cMin, self.cMax) for _ in samples]
        # ε[i] is the noise amplitude (error) as a fraction of c.
        self.ε = [np.random.uniform(self.εMin, self.εMax) for _ in samples]
        # Number of lines of each sample.
        NLines = [np.random.randint(5, 50) for _ in samples]
        # x = c * (±one of NLines integer levels + ε * noise);
        # x[i].shape == sampleShape.
        # NOTE(review): np.sort and np.diff both work on the last axis.
        # With the default (length, 1) shape that axis has a single
        # element, so the sort is a no-op and diff(..., append=0) just
        # negates the integers.  Confirm whether axis=0 was intended.
        self.x = [
            self.c[r] * (
                np.diff(
                    np.sort(np.random.randint(-NLines[r], NLines[r],
                                              size=self.sampleShape)),
                    append=0)
                + self.ε[r] * np.random.uniform(-1, 1, self.sampleShape)
            )
            for r in samples
        ]
        # TODO: explore whether sorting each sample helps the network,
        # either as base data or as an extra feature.

        # Scale every sample (and its targets) by the sample's own
        # standard deviation; the data is supposed to be centred on 0.
        self.StdDevX_i = [np.std(sample) for sample in self.x]
        self.StdDevX_global = np.std(self.StdDevX_i)
        self.x_normalized = [sample / std
                             for sample, std in zip(self.x, self.StdDevX_i)]
        self.c_normalized = [c / std
                             for c, std in zip(self.c, self.StdDevX_i)]
        self.ε_normalized = [ε * c_norm
                             for ε, c_norm in zip(self.ε, self.c_normalized)]

        # Format the data for Keras.
        self.indexofSplit = int(self.fractionOfTrainVsTest * len(self.x))
        self.x_normalized = np.asarray(self.x_normalized)
        self.y_normalized = np.asarray(
            [[c_norm, ε_norm]
             for c_norm, ε_norm in zip(self.c_normalized, self.ε_normalized)])
        split = self.indexofSplit
        x_train = self.x_normalized[:split].astype('float32')
        x_test = self.x_normalized[split:].astype('float32')
        y_train = self.y_normalized[:split].astype('float32')
        y_test = self.y_normalized[split:].astype('float32')
        print("Datos generados")
        return x_train, y_train, x_test, y_test

    def Predict(self, model, NotNormalized_X):
        """Run ``model`` on raw samples and undo the per-sample scaling.

        Args:
            model: object whose ``predict`` returns a sequence of outputs;
                only the last one is used.
            NotNormalized_X: batch of raw samples (array or list of
                arrays) of shape ``(batch, length, 1)``.

        Returns:
            The last model output multiplied by each sample's standard
            deviation, i.e. predictions in the units of the raw data.
        """
        raw = np.asarray(NotNormalized_X)
        # Same normalization as CreateDataset: divide by the sample's std.
        stdevX = np.std(raw, axis=1, keepdims=True)
        NormalizedX = np.divide(raw, stdevX)
        # stdevX is (batch, 1, 1); dropping axis 2 lets it broadcast
        # against the (batch, n_outputs) prediction.
        Prediction = model.predict(NormalizedX)[-1] * np.squeeze(stdevX, axis=2)
        return Prediction
# Generate the synthetic data set with the default settings; the instance is
# kept because the plotting code further down reads the raw samples from it.
myData = Dataset()
(x_train, y_train, x_test, y_test) = myData.CreateDataset()
if True:  # Build a model
    def make_model_feedback_output(input_shape, output_shape):
        """Build the three-headed Conv1D regression model.

        Two convolutions feed a third convolution layer that is reused
        (same weights) three times in a row.  After each application the
        features are average-pooled and passed to a dense head, so the
        model returns three successive estimates of the same target.

        Args:
            input_shape: shape of one input sample (without batch axis).
            output_shape: shape of one target; its first entry is the
                number of units of every head.

        Returns:
            A ``keras.models.Model`` with outputs named ``output_layer1``,
            ``output_layer2`` and ``output_layer3``.
        """
        # Author's note: modern NVIDIA GPUs multiply float16 matrices on
        # Tensor Cores, which require certain tensor dimensions to be a
        # multiple of 8 - hence filter counts such as 64.
        def new_conv():
            return keras.layers.Conv1D(filters=64, kernel_size=16,
                                       padding="valid", activation="swish")

        input_layer = keras.layers.Input(input_shape)
        features = new_conv()(input_layer)
        features = new_conv()(features)

        # One layer object applied repeatedly, so all passes share weights.
        shared_conv = new_conv()

        heads = []
        for head_name in ("output_layer1", "output_layer2", "output_layer3"):
            features = shared_conv(features)
            pooled = keras.layers.GlobalAveragePooling1D()(features)
            # The output dtype must be float32 for mixed precision to work.
            dense = keras.layers.Dense(output_shape[0], activation='swish',
                                       dtype="float32", name=head_name)
            heads.append(dense(pooled))

        return keras.models.Model(inputs=input_layer, outputs=heads)

    model = make_model_feedback_output(input_shape=x_train.shape[1:],
                                       output_shape=y_train.shape[1:])
if True:  # Train the model
    import time

    Epochs = 100
    # NOTE(review): Patience equals Epochs, so ReduceLROnPlateau and
    # EarlyStopping can effectively never trigger within this run.
    Patience = 100
    batch_size = 128  # has to be a multiple of 8 to benefit from mixed_precision
    # Raw string: in a normal literal "\b" is the backspace escape, which
    # silently corrupted the file name.
    pathForSavingFile = r"z:\best_model.h5"
    callbacks = [
        # Keep only the weights with the best validation loss on disk.
        keras.callbacks.ModelCheckpoint(pathForSavingFile, save_best_only=True, monitor="val_loss"),
        keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=Patience, min_lr=0.0001),
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=Patience, verbose=1),
    ]
    # Every head is trained and monitored with the same mean squared error.
    model.compile(optimizer="adam",
                  loss={'output_layer1': 'MeanSquaredError',
                        'output_layer2': 'MeanSquaredError',
                        'output_layer3': 'MeanSquaredError'},
                  metrics={'output_layer1': 'MeanSquaredError',
                           'output_layer2': 'MeanSquaredError',
                           'output_layer3': 'MeanSquaredError'})
    print("Training start...")
    # summary() prints the table itself and returns None, so it must not be
    # wrapped in print().
    model.summary()
    print('\a')  # beep (BEL character)
    start_time = time.time()
    history = model.fit(x_train,
                        [y_train] * 3,  # the same target for each of the three heads
                        batch_size=batch_size,
                        epochs=Epochs,
                        callbacks=callbacks,
                        validation_split=0.1,
                        verbose=2)
    print("Training ended")
    print("--- %s minutes ---" % ((time.time() - start_time) / 60))
    print('\a')  # beep (BEL character)
if True:  # Evaluate model on test data
    # Reload the checkpoint written during training so the evaluation uses
    # the saved best weights rather than the final-epoch ones.
    model = keras.models.load_model(pathForSavingFile)
    Losses = model.evaluate(x_test, [y_test, y_test, y_test])
    # Losses[0] is the first reported value (the loss); Losses[-1] is the
    # last reported metric.  Every compiled loss/metric is a mean squared
    # error, so the value is an MSE, not an accuracy.  The name test_acc is
    # kept only so that code relying on it keeps working.
    test_loss, test_acc = Losses[0], Losses[-1]
    print("Test mean squared error (last metric)", test_acc)
    print("Test loss", test_loss)
if True:  # Plot the model's training and validation loss
    metric = "mean_squared_error"
    Loss_result = "val_mean_squared_error"

    # Curves recorded by fit() for the third (last) output head.
    train_curve = history.history['output_layer3_loss']
    val_curve = history.history["val_output_layer3_mean_squared_error"]
    # Each legend entry also shows the minimum reached by its curve.
    legend_entries = ["train; min=" + str(min(train_curve)),
                      "val; min=" + str(min(val_curve))]

    plt.figure()
    plt.plot(train_curve)
    plt.plot(val_curve)
    plt.title("model " + metric)
    plt.ylabel(metric, fontsize="large")
    plt.xlabel("epoch", fontsize="large")
    plt.legend(legend_entries, loc="best")
    plt.show()
    plt.close()
if True:  # Plot percentage of error of predicted c
    # Predict on every raw sample; column 0 is c, column 1 is ε.
    Prediction = myData.Predict(model, np.asarray(myData.x))
    PredictionC = Prediction[:, 0]
    Predictionε = Prediction[:, 1]
    # Ratio predicted / true c; 1 means a perfect prediction.
    PercentErrorIn_C = np.divide(PredictionC, np.asarray(myData.c))

    # Only the samples after the split index (the test part) are plotted,
    # against their true c.
    first_test = myData.indexofSplit
    YY = PercentErrorIn_C[first_test:]
    XX = np.asarray(myData.c[first_test:])

    fig = plt.figure()
    fig.tight_layout()
    fig.subplots_adjust(bottom=0.2)
    ax = plt.gca()
    ax.scatter(XX, YY, marker="o", s=0.5)
    plt.legend(["Ratio of error: c_estimated/c"], loc="best")
    plt.title("Error c_predicted/c")
    plt.show()
    plt.close()
if True:  # Plot an example
    # Pick one random sample from the test part of the data set.
    index_random = np.random.randint(myData.indexofSplit, len(myData.x_normalized))
    # One-element slice so Predict receives a batch containing exactly the
    # sample that is plotted below.  The previous slice
    # [index_random-1:index_random] predicted on the preceding sample.
    Prediction = myData.Predict(model, myData.x[index_random:index_random + 1])
    c_predicted, ε_predicted = Prediction[0, 0], Prediction[0, 1]
    c_true = myData.c[index_random]

    # Coordinates for plotting the predicted example.
    YY = myData.x[index_random]
    XX = np.arange(0, len(YY))
    # Horizontal line at the predicted c spanning the whole sample.
    cX = np.array([0, XX[-1]])
    cY = np.array([1, 1]) * c_predicted
    # Vertical bar of height ±ε_predicted around that line, drawn at mid-width.
    # NOTE(review): Predict rescales the ε output by the sample std-dev, so
    # this is presumably an absolute amplitude (ε * c) - confirm.
    εX = np.array([1, 1]) * XX[-1] / 2
    εY = cY + np.array([-1, 1]) * ε_predicted

    fig = plt.figure()
    fig.tight_layout()
    fig.subplots_adjust(bottom=0.2)
    ax = plt.gca()
    ax.scatter(XX, YY, marker="o", s=1)
    ax.plot(cX, cY, marker="|", markersize=10)
    ax.plot(εX, εY, marker="_", markersize=10)
    # The old title literal had mismatched quotes and rendered as
    # 'c=" + "<value>'.
    plt.title("c={0:.2f}; c predicted={1:.2f}; c_predicted/c={2:.2f}".format(
        c_true, c_predicted, c_predicted / c_true))
    plt.show()
    plt.close()