When using the Sequential API (wrapped in a functional model), we can simply call the same embedding model multiple times, as shown in the example below.
from tensorflow.keras.layers import Dense, Input, concatenate
from tensorflow.keras.models import Sequential, Model

# Shared embedding network; the same instance is reused for all three inputs.
embedding_model = Sequential([
    Dense(128, activation='elu', input_shape=(Xa_train.shape[1],)),
    Dense(64, activation='elu'),
    Dense(emb_size, activation='linear')
])
input_anchor = Input(shape=(Xa_train.shape[1],))
input_positive = Input(shape=(Xa_train.shape[1],))
input_negative = Input(shape=(Xa_train.shape[1],))
embedding_anchor = embedding_model(input_anchor)
embedding_positive = embedding_model(input_positive)
embedding_negative = embedding_model(input_negative)
output = concatenate([embedding_anchor, embedding_positive, embedding_negative], axis=1)
tnn = Model([input_anchor, input_positive, input_negative], output)
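For reference, a minimal way to compile and train this functional version (this part is not shown above; the wrapper name triplet_loss_concat and the positive/negative array names Xp_train and Xn_train are assumptions, not my exact code) would be:

import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

# The model outputs the three embeddings concatenated along axis 1,
# so the loss splits them back apart; y_true is ignored.
def triplet_loss_concat(y_true, y_pred):
    anchor = y_pred[:, :emb_size]
    positive = y_pred[:, emb_size:2 * emb_size]
    negative = y_pred[:, 2 * emb_size:]
    positive_dist = tf.reduce_mean(tf.square(anchor - positive), axis=1)
    negative_dist = tf.reduce_mean(tf.square(anchor - negative), axis=1)
    return tf.maximum(0., positive_dist - negative_dist)

tnn.compile(optimizer=Adam(), loss=triplet_loss_concat)
tnn.fit([Xa_train, Xp_train, Xn_train],
        np.zeros((Xa_train.shape[0], 3 * emb_size)),
        epochs=number_epoch, batch_size=batch_size_value, shuffle=True)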
Here, I tried to build the same model via model subclassing and share the weights.
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class TNN_model(tf.keras.Model):
    def __init__(self, input_dim, latent_dim=2, name="autoencoder", **kwargs):
        super(TNN_model, self).__init__(name=name, **kwargs)
        self.model_layer1 = Dense(128, activation="elu", input_shape=(input_dim,))
        self.model_layer2 = Dense(64, activation="elu")
        self.model_output = Dense(latent_dim, activation="linear")

    def call(self, data):
        x = self.model_layer1(data)
        x = self.model_layer2(x)
        x_out = self.model_output(x)
        return x_out
class ModelTrain(tf.keras.Model):
    def __init__(self, model):
        super(ModelTrain, self).__init__()
        # The single shared embedding model; it is called three times per step.
        self.model = model

    def compile(self, optimizer, loss):
        super(ModelTrain, self).compile()
        self.optimizer = optimizer
        self.loss = loss

    def train_step(self, data):
        x, y = data
        Xa_train_x, Xp_train_x, Xn_train_x = x[0], x[1], x[2]
        with tf.GradientTape() as tape:
            model_out_anchor = self.model(Xa_train_x)
            model_out_positive = self.model(Xp_train_x)
            model_out_negative = self.model(Xn_train_x)
            output = [model_out_anchor, model_out_positive, model_out_negative]
            loss, loss_positive, loss_negative = self.loss(output)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return {'total_loss': loss, 'loss_positive': loss_positive, 'loss_negative': loss_negative}
def triplet_loss(output_encoder):
    anchor, positive, negative = output_encoder[0], output_encoder[1], output_encoder[2]
    # Mean squared distance per sample; a very basic triplet loss with no margin.
    positive_dist = tf.reduce_mean(tf.square(anchor - positive), axis=1)
    negative_dist = tf.reduce_mean(tf.square(anchor - negative), axis=1)
    loss = tf.maximum(0., positive_dist - negative_dist)
    return loss, positive_dist, negative_dist
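As a quick sanity check of what this loss computes (toy values, just to illustrate the formula):

anchor   = tf.constant([[0.0, 0.0]])
positive = tf.constant([[0.1, 0.0]])   # close to the anchor
negative = tf.constant([[1.0, 1.0]])   # far from the anchor
loss, d_pos, d_neg = triplet_loss([anchor, positive, negative])
# d_pos = mean(0.01, 0.0) = 0.005, d_neg = mean(1.0, 1.0) = 1.0
# loss  = max(0, 0.005 - 1.0) = 0.0 -> this triplet is already satisfied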
au_TNN = TNN_model(Xa_train_final.shape[1], emb_size)
training = ModelTrain(au_TNN)
training.compile(optimizer=Adam(), loss=triplet_loss)

# train_step ignores y, so the targets passed to fit() are only dummies.
output_dummy = np.zeros((Xa_train_final.shape[0], emb_size))
history = training.fit([Xa_train_final, Xp_train_final, Xn_train_final],
                       [output_dummy, output_dummy, output_dummy],
                       epochs=number_epoch, batch_size=batch_size_value,
                       shuffle=True, verbose=2)
I used the same loss function (a very basic triplet loss) and the same dataset for both examples (Sequential and subclassing); however, while the Sequential model's training loss went down steadily, the subclassed model's training loss was very unsteady. It seems that, instead of sharing the layers (the model), it created three separate models. I could not find any example or documentation covering this case.
Is sharing a model in this way correct for Keras model subclassing?
with tf.GradientTape() as tape:
    model_out_anchor = self.model(Xa_train_x)
    model_out_positive = self.model(Xp_train_x)
    model_out_negative = self.model(Xn_train_x)
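One way to check whether the layers are really shared would be to count the trainable variables once the model has been built; a single shared TNN_model should have exactly six (three kernels and three biases):

# Build the layers once with a dummy batch, then inspect the variables.
_ = au_TNN(tf.zeros((1, Xa_train_final.shape[1])))
print(len(au_TNN.trainable_variables))              # expect 6: 3 kernels + 3 biases
print([v.shape for v in au_TNN.trainable_variables])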
I am using TensorFlow 2.3.0.