包含完整代码的 IPython notebook 可以在以下链接中找到。
导入所需的包,加载数据集并定义两个辅助函数。第一个函数 prepare_dataset 将数据切分成若干窗口,为模型训练创建 (X, Y) 样本对:X 是过去时刻(例如 1 到 t-1)的风电功率值(wind power value),Y 是 t 时刻的未来值(future value)。第二个函数 train_evaluate 完成三件事:1)解码遗传算法的解,得到窗口大小和单元数;2)使用 GA 找到的窗口大小准备数据集,并将其划分为训练集和验证集;3)训练 LSTM 模型,在验证集上计算 RMSE,并将该值作为当前遗传算法解的适应度值返回。
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split as split
from keras.layers import LSTM, Input, Dense
from keras.models import Model
from deap import base, creator, tools, algorithms
from scipy.stats import bernoulli
from bitstring import BitArray
# Load the 'wp1' column of train.csv as an (n, 1) column vector.
data = np.array(pd.read_csv('train.csv')['wp1']).reshape(-1, 1)

# The first 17,257 points form the training/validation pool; the remaining
# points (about 1,500) are held out as the test set.
train_data, test_data = data[:17257], data[17257:]
def prepare_dataset(data, window_size):
    """Slice a univariate series into sliding-window samples.

    Sample ``i`` pairs the ``window_size`` consecutive values starting at
    position ``i`` (the input window) with the value immediately following
    that window (the target).

    Args:
        data: 2-D array-like of shape ``(n, 1)``; only column 0 is read.
        window_size: Number of past values in each input window.

    Returns:
        A tuple ``(X, Y)`` of float64 arrays where ``X`` has shape
        ``(n_samples, window_size, 1)`` and ``Y`` has shape
        ``(n_samples, 1)``, with
        ``n_samples = max(len(data) - window_size - 1, 0)``.
    """
    series = np.asarray(data, dtype=float)[:, 0]

    # Keep the sample count of the original loop,
    # ``range(len(data) - window_size - 1)``, so callers see the same shapes.
    n_samples = max(len(series) - window_size - 1, 0)

    # Row i of the index grid is [i, i + 1, ..., i + window_size - 1]. One
    # fancy-indexing gather replaces the per-iteration vstack/append, which
    # reallocated the whole array on every step (quadratic in len(data)).
    window_idx = np.arange(n_samples)[:, None] + np.arange(window_size)[None, :]

    X = series[window_idx].reshape(n_samples, window_size, 1)
    Y = series[window_size:window_size + n_samples].reshape(n_samples, 1)
    return X, Y
def train_evaluate(ga_individual_solution):
    """Fitness function for the GA: train an LSTM and return validation RMSE.

    The individual is a bit sequence. Its first 6 bits are decoded as an
    unsigned integer giving the window size; the remaining bits are decoded
    as an unsigned integer giving the number of LSTM units.

    Args:
        ga_individual_solution: Sequence of bits (0/1 values).

    Returns:
        A one-element tuple holding the fitness: the RMSE on the validation
        split, or ``100`` when the decoded window size or unit count is zero.
    """
    # Decode GA solution to integers for window_size and num_units.
    window_size_bits = BitArray(ga_individual_solution[0:6])
    num_units_bits = BitArray(ga_individual_solution[6:])
    window_size = window_size_bits.uint
    num_units = num_units_bits.uint
    print('\nWindow Size: ', window_size, ', Num of Units: ', num_units)

    # A zero window or zero units cannot be trained; return a fixed fitness
    # of 100 for such solutions instead of building a model.
    if window_size == 0 or num_units == 0:
        return 100,

    # Segment train_data with the decoded window size, then split it into
    # training and validation sets (80/20) with a fixed seed.
    X, Y = prepare_dataset(train_data, window_size)
    X_train, X_val, y_train, y_val = split(X, Y, test_size=0.20, random_state=1120)

    # Build the LSTM regressor. The input shape is declared once on Input;
    # repeating it on the LSTM layer is redundant in the functional API.
    inputs = Input(shape=(window_size, 1))
    x = LSTM(num_units)(inputs)
    predictions = Dense(1, activation='linear')(x)
    model = Model(inputs=inputs, outputs=predictions)

    # The model must be compiled (optimizer + loss) before fit() can be
    # called; the original code skipped this step.
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, epochs=5, batch_size=10, shuffle=True)
    y_pred = model.predict(X_val)

    # The validation RMSE is the fitness score handed back to the GA.
    rmse = np.sqrt(mean_squared_error(y_val, y_pred))
    print('Validation RMSE: ', rmse, '\n')

    return rmse,
population_size = 4
num_generations = 4
gene_length = 10

# RMSE is minimised, hence the weight of -1.0 (use 1.0 to maximise a score
# such as accuracy). The class name 'FitnessMax' is kept unchanged for
# compatibility even though this fitness is a minimising one.
creator.create('FitnessMax', base.Fitness, weights=(-1.0,))
creator.create('Individual', list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
# Each gene is an independent fair coin flip (0 or 1).
toolbox.register('binary', bernoulli.rvs, 0.5)
toolbox.register('individual', tools.initRepeat, creator.Individual, toolbox.binary,
                 n=gene_length)
toolbox.register('population', tools.initRepeat, list, toolbox.individual)

# Individuals are bit strings, so use bit-string operators. cxOrdered and
# mutShuffleIndexes are permutation operators: they expect individuals made
# of indices and only rearrange existing genes.
toolbox.register('mate', tools.cxTwoPoint)
toolbox.register('mutate', tools.mutFlipBit, indpb=0.6)
# Roulette selection draws in proportion to the raw fitness value, which
# would favour the individuals with the highest RMSE. Tournament selection
# compares fitness objects and therefore respects the minimising weight.
toolbox.register('select', tools.selTournament, tournsize=3)
toolbox.register('evaluate', train_evaluate)

population = toolbox.population(n=population_size)
r = algorithms.eaSimple(population, toolbox, cxpb=0.4, mutpb=0.1,
                        ngen=num_generations, verbose=False)
通过使用 tools.selBest(population, k = 1),可以很容易地获得遗传算法找到的 K 个最佳解决方案。之后,可以用最优配置在完整的训练集上训练模型,并在 holdout 测试集上进行测试。
# Print top N solutions - (1st only, for now)
best_individuals = tools.selBest(population, k=1)
best_window_size = None
best_num_units = None

# Decode each selected individual the same way as during the search:
# first 6 bits -> window size, remaining bits -> number of LSTM units.
for bi in best_individuals:
    window_size_bits = BitArray(bi[0:6])
    num_units_bits = BitArray(bi[6:])
    best_window_size = window_size_bits.uint
    best_num_units = num_units_bits.uint
    print('\nWindow Size: ', best_window_size, ', Num of Units: ', best_num_units)

# Train the model using the best configuration on the complete training set
# and make predictions on the test set.
X_train, y_train = prepare_dataset(train_data, best_window_size)
X_test, y_test = prepare_dataset(test_data, best_window_size)

# The input shape is declared once on Input; repeating it on the LSTM layer
# is redundant in the functional API.
inputs = Input(shape=(best_window_size, 1))
x = LSTM(best_num_units)(inputs)
predictions = Dense(1, activation='linear')(x)
model = Model(inputs=inputs, outputs=predictions)

# The model must be compiled (optimizer + loss) before fit() can be called;
# the original code skipped this step.
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, epochs=5, batch_size=10, shuffle=True)
y_pred = model.predict(X_test)

rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print('Test RMSE: ', rmse)