Hi everyone. I am doing a task of time series prediction using Encoder-Decoder. This is my model:
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 9 23:59:31 2024
@author: ASUS
"""
from library import *
class LuongAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(LuongAttention, self).__init__()
self.W1 = tf.keras.layers.Dense(units)
self.W2 = tf.keras.layers.Dense(units)
self.V = tf.keras.layers.Dense(1)
def call(self, query, values):
#print('\n******* Luong Attention STARTS******')
#print('query (decoder hidden state): (batch_size, hidden size) ', query.shape)
#print('values (encoder all hidden state): (batch_size, max_len, hidden size) ', values.shape)
# query hidden state shape == (batch_size, hidden size)
# query_with_time_axis shape == (batch_size, 1, hidden size)
# values shape == (batch_size, max_len, hidden size)
# we are doing this to broadcast addition along the time axis to calculate the score
query_with_time_axis = tf.expand_dims(query, 1)
#print('query_with_time_axis:(batch_size, 1, hidden size) ', query_with_time_axis.shape)
values_transposed = tf.transpose(values, perm=[0, 2, 1])
#print('values_transposed:(batch_size, hidden size, max_len) ', values_transposed.shape)
# score shape == (batch_size, max_length, 1)
# we get 1 at the last axis because we are applying score to self.V
# the shape of the tensor before applying self.V is (batch_size, max_length, units)
#BAHDANAU ADDITIVE:
score = self.V(tf.nn.tanh(
self.W1(query_with_time_axis) + self.W2(values)))
#LUONGH Dot-product
#score = tf.transpose(tf.matmul(query_with_time_axis, values_transposed) , perm=[0, 2, 1])
#print('score: (batch_size, max_length, 1) ',score.shape)
# attention_weights shape == (batch_size, max_length, 1)
attention_weights = tf.nn.softmax(score, axis=1)
#print('attention_weights: (batch_size, max_length, 1) ',attention_weights.shape)
# context_vector shape after sum == (batch_size, hidden_size)
context_vector = attention_weights * values
#print('context_vector before reduce_sum: (batch_size, max_length, hidden_size) ',context_vector.shape)
context_vector = tf.reduce_sum(context_vector, axis=1)
#print('context_vector after reduce_sum: (batch_size, hidden_size) ',context_vector.shape)
#print('\n******* Luong Attention ENDS******')
return context_vector, attention_weights
#%%
class Encoder(tf.keras.layers.Layer):
def __init__(self,
lstm_units,
dropout_rate,
l2_penalty,
num_features,
regularization):
super().__init__()
self.lstm_units = lstm_units
self.dropout_rate = dropout_rate
self.l2_penalty = l2_penalty
self.dropout = Dropout(dropout_rate)
if regularization:
self.lstm1 = Bidirectional(LSTM(lstm_units[0],
return_sequences = True,
return_state = True,
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty)))
self.lstm2 = Bidirectional(LSTM(lstm_units[0],
return_sequences = True,
return_state = True,
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty)))
self.dense = Dense(num_features,
activation = 'relu',
kernel_initializer = tf.keras.initializers.HeNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty))
else:
self.lstm1 = Bidirectional(LSTM(lstm_units[0],
return_sequences = True,
return_state = True,
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal()))
self.lstm2 = Bidirectional(LSTM(lstm_units[0],
return_sequences = True,
return_state = True,
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal()))
self.dense = Dense(num_features,
activation = 'relu',
kernel_initializer = tf.keras.initializers.HeNormal())
def call(self,
encoder_inputs,
training = None):
output_lstm1, forward_state_h_lstm1, forward_state_c_lstm1, backward_state_h_lstm1, backward_state_c_lstm1 = self.lstm1(encoder_inputs,
#training = training
)
# state_h_lstm1 = tf.concat([forward_state_h_lstm1, backward_state_h_lstm1], axis = -1)
# state_c_lstm1 = tf.concat([forward_state_c_lstm1, backward_state_c_lstm1], axis = -1)
# states_lstm1 = [state_h_lstm1, state_c_lstm1]
output_lstm2, forward_state_h_lstm2, forward_state_c_lstm2, backward_state_h_lstm2, backward_state_c_lstm2 = self.lstm2(output_lstm1,
initial_state = [forward_state_h_lstm1,
forward_state_c_lstm1,
backward_state_h_lstm1,
backward_state_c_lstm1],
#training = training
)
state_h_lstm2 = tf.concat([forward_state_h_lstm2, backward_state_h_lstm2], axis = -1)
state_c_lstm2 = tf.concat([forward_state_c_lstm2, backward_state_c_lstm2], axis = -1)
state_lstm2 = [state_h_lstm2, state_c_lstm2]
# encoder_outputs: (batch_size, time step, num features), state_lstm2: a list of (batch size, lstm_units[0]*2) (due to the Bidirectional)
return output_lstm2, state_lstm2
class DecoderBase(tf.keras.layers.Layer):
def __init__(self,
out_step,
dropout_rate,
#dist_size,
#probabilistic,
l2_penalty,
regularization):
super().__init__()
self.out_step = out_step
self.dropout_rate = dropout_rate
#self.dist_size = dist_size
#self.probabilistic = probabilistic
self.base_dropout = Dropout(dropout_rate)
if regularization:
self.base_dense = Dense(1,
activation = 'relu',
kernel_initializer = tf.keras.initializers.HeNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty))
else:
self.base_dense = Dense(1,
activation = 'relu',
kernel_initializer = tf.keras.initializers.HeNormal())
# Have to run one step at a time unlike with the encoder since sometimes is not teacher forcing
def run_single_recurrent_step(self,
inputs, # (batch size, num of features) (num of features here can be 1 + 14 or 1)
states, # list of 2 tensors: (batch size, num of features) (num of features here can be lstm_units[0]*2)
input_sequence_data, # (batch size, time step, num of features) (num of features here can be lstm_units[0]*2)
training):
raise NotImplementedError()
def call(self,
decoder_inputs,
initial_inputs,
initial_states,
input_sequence_data,
teacher_force_prob = None,
training = None):
# decoder_inputs is the labels of (batch size, time step, 1)
# initial_inputs is the final output of the encoder outputs (batch size, num features)
# initial_states is the output states of the encoder: a list of 2 tensors: (batch size, lstm_units[0]*2)
# input_sequence_data is the encoder outputs (batch size, time step, lstm_units[0]*2)
predictions = []
input_data = self.base_dropout(initial_inputs)
# convert from (batch size, lstm_units[0]*2) to (batch size, 1) to ensure the congruency of the input to the LSTM cell
input_data = self.base_dense(input_data)
states = initial_states
for t in range(self.out_step):
# inputs dimension: (batch size, 1 + 14)
inputs = input_data
# inputs dimension: (batch size, 1)
#inputs = input_data
outputs, states_output = self.run_single_recurrent_step(inputs, states, input_sequence_data, training)
predictions.append(outputs)
teacher_force = random.random() < teacher_force_prob if teacher_force_prob is not None else False
if teacher_force:
input_data = decoder_inputs[:, t, :] # the input for t+1 is the label at t
else:
input_data = outputs # the input for t+1 is the output at t
states = states_output
outputs_predictions = tf.stack(predictions)
outputs_predictions = tf.transpose(outputs_predictions, [1, 0, 2])
return outputs_predictions # (batch size, time step, num of features), in this case, num of features = 1
class DecoderVanilla(DecoderBase):
def __init__(self,
lstm_units,
out_step,
dropout_rate,
#dist_size,
#probabilistic,
l2_penalty,
regularization):
super().__init__(out_step,
dropout_rate,
#dist_size,
#probabilistic,
l2_penalty,
regularization)
self.lstm_units = lstm_units
self.dropout = Dropout(dropout_rate)
if regularization:
self.lstm_cell = LSTMCell(lstm_units[1],
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty))
self.dense = Dense(1,
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty)) # no need activation for the output
else:
self.lstm_cell = LSTMCell(lstm_units[1],
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal())
self.dense = Dense(1) # no need activation for the output
def run_single_recurrent_step(self,
inputs,
states,
input_sequence_data,
training):
return_outputs, return_states = self.lstm_cell(inputs, states = states,
#training = training
)
return_outputs = self.dense(tf.concat([return_outputs, inputs], axis = -1))
return return_outputs, return_states
class DecoderWithAttention(DecoderBase):
def __init__(self,
lstm_units,
out_step,
dropout_rate,
#dist_size,
#probabilistic,
l2_penalty,
regularization):
super().__init__(out_step,
dropout_rate,
#dist_size,
#probabilistic,
l2_penalty,
regularization)
self.lstm_units = lstm_units
self.dropout = Dropout(dropout_rate)
self.attention = LuongAttention(32)
if regularization:
self.lstm_cell = LSTMCell(lstm_units[1],
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal(),
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty))
self.dense = Dense(1,
kernel_regularizer = tf.keras.regularizers.l2(l2_penalty)) # no need activation for the output
else:
self.lstm_cell = LSTMCell(lstm_units[1],
dropout = dropout_rate,
recurrent_dropout = dropout_rate,
kernel_initializer = tf.keras.initializers.GlorotNormal())
self.dense = Dense(1) # no need activation for the output
def run_single_recurrent_step(self,
inputs,
states,
input_sequence_data,
training):
query = states[0]
values = input_sequence_data
# apply attention
context_vector, attention_weights = self.attention(query, values)
inputs_concat = tf.concat([context_vector, inputs], axis = -1)
return_outputs, return_states = self.lstm_cell(inputs_concat, states = states,
#training = training
)
return_outputs = self.dense(tf.concat([return_outputs, inputs_concat, context_vector], axis = -1))
return return_outputs, return_states
def seq2seq(encoder_input_shape,
decoder_input_shape,
out_step,
num_features,
type_decoder,
lstm_units,
dropout_rate,
l2_penalty,
teacher_force_prob,
regularization,
training):
encoder = Encoder(lstm_units,
dropout_rate,
l2_penalty,
num_features,
regularization)
if type_decoder == 'Vanilla':
decoder = DecoderVanilla(lstm_units,
out_step,
dropout_rate,
l2_penalty,
regularization)
elif type_decoder == 'WithAttention':
decoder = DecoderWithAttention(lstm_units,
out_step,
dropout_rate,
l2_penalty,
regularization)
encoder_inputs = Input(encoder_input_shape)
decoder_inputs = Input(decoder_input_shape)
encoder_outputs, encoder_states = encoder(encoder_inputs, training)
decoder_outputs = decoder(decoder_inputs, encoder_outputs[:,-1,:], encoder_states, encoder_outputs, teacher_force_prob, training)
# Buil model
model = Model(inputs = [encoder_inputs, decoder_inputs], outputs = decoder_outputs, name = 'Seq2Seq')
return model
This is the code to create the model ‘Vanilla’.
lstm_units = [32,
2*32]
n_features = 14
dropout_rate = 0.3
#dropout_rate = 0.3
l2_penalty = 0.01
#l2_penalty = 0.001
batch_size = 512
n_epochs = 200
out_step = 12
regularization = True
training = True
encoder_input_shape = (12, 14)
decoder_input_shape = (12, 1)
model_vanilla = seq2seq(encoder_input_shape,
decoder_input_shape,
out_step,
n_features,
'Vanilla',
lstm_units,
dropout_rate,
l2_penalty,
teacher_force_prob = None,
regularization = True,
training = True)
This is an example of the input:
x = tf.ones(shape = (800, 12, 14))
y = tf.ones(shape = (800, 12, 1))
out = model_vanilla((x, y)) # this model takes 2 inputs
At first, the model runs well. The output shape is (800, 12, 1). However, when I change the batch size:
x = tf.ones(shape = (900, 12, 14))
y = tf.ones(shape = (900, 12, 1))
out = model_vanilla((x, y)) # this model takes 2 inputs
This error occurs:
InvalidArgumentError: Exception encountered when calling layer "lstm_cell_13" " f"(type LSTMCell).
{{function_node __wrapped__Mul_device_/job:localhost/replica:0/task:0/device:GPU:0}} required broadcastable shapes [Op:Mul]
Call arguments received by layer "lstm_cell_13" " f"(type LSTMCell):
• inputs=tf.Tensor(shape=(900, 1), dtype=float32)
• states=['tf.Tensor(shape=(900, 64), dtype=float32)', 'tf.Tensor(shape=(900, 64), dtype=float32)']
• training=True
2024-04-18 00:44:42.125422: W tensorflow/core/framework/op_kernel.cc:1768] INVALID_ARGUMENT: required broadcastable shapes
The error comes from:
outputs, states_output = self.run_single_recurrent_step(inputs, states, input_sequence_data, training)
return_outputs, return_states = self.lstm_cell(inputs, states = states,
So in conclusion, when I change the batch size of the input, the model does not run anymore. It raises the error: INVALID_ARGUMENT: required broadcastable shapes. Can anymore figure out the reason and a solution? Thanks a lots .