Error when adding Lambda layer to my keras model

I am doing image classification and trying to augment my images based on the fastai augmentation scheme. This is my code for it:

import tensorflow as tf
import keras
from keras import Sequential
from keras.layers import Lambda
from keras.callbacks import EarlyStopping

# HEIGHT, WIDTH, SEED, the datasets, and the distribution strategy are
# defined earlier in my notebook; OneCycleLR comes from elsewhere too.

@tf.function
def _distorted_bounding_box_crop(
  image, bbox, min_object_covered=0.1,
  aspect_ratio_range=(0.75, 1.33),
  area_range=(0.05, 0.1), max_attempts=100
):
  shape = tf.shape(image)
  random_distorted_bbox = tf.image.sample_distorted_bounding_box(
      shape,
      bounding_boxes = bbox,
      min_object_covered = min_object_covered,
      aspect_ratio_range = aspect_ratio_range,
      area_range = area_range,
      max_attempts = max_attempts
  )
  offset,size,_ = random_distorted_bbox
  offset_y,offset_x,_ = tf.unstack(offset)
  target_y,target_x,_ = tf.unstack(size)
  image = tf.image.crop_to_bounding_box(
      image,offset_y,offset_x,target_y,target_x
  )
  return image

def random_apply(func,image,prob):
  return tf.cond(
      tf.math.less(
          tf.random.uniform([],0,1,tf.float32),tf.cast(prob,tf.float32)
      ),
      lambda:func(image),lambda:image
  )

@tf.function
def random_crop_resize(x, area_range=(0.08, 1.0)):
    bbox = tf.constant([0., 0., 1., 1.], dtype=tf.float32, shape=[1, 1, 4])
    # aspect_ratio = width / height (TF convention)
    x = _distorted_bounding_box_crop(
        x, bbox,
        area_range=area_range
    )
    x = tf.image.resize(x,[HEIGHT,WIDTH])
    x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
    return x

@tf.function
def val_crop(x, label):
    x = tf.image.resize(x, [HEIGHT, WIDTH])
    x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
    return x,label

def random_lighting(x,s=0.2):
    # x = tf.image.random_brightness(x,max_delta = s)
    # x = tf.image.random_contrast(x,
    #                               lower = 1 - s,
    #                               upper = 1 + s
    #                               )

    # x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
    bright = keras.layers.RandomBrightness(s, (0,255), seed = SEED)
    contrast = keras.layers.RandomContrast(s, (0,255), seed = SEED)
    x = bright(x)
    x = contrast(x)
    return x

def affine_tfms(image, max_zoom = 1.1, max_rotate = 10.0, max_warp = 0.2):
    # def _get_zoom_factor():
    #     return tf.random.uniform(shape=[], minval=1.0, maxval=max_zoom)
        
    def zooming(x):
        zoom = keras.layers.RandomZoom(.1,.1)
        x = zoom(x)
        # zoom_dim = tf.cast(tf.cast(tf.shape(x)[:2],tf.float32) * zoom_factor, tf.int32)
        # x = tf.image.resize(x, zoom_dim)
        # x = tf.image.resize_with_crop_or_pad(x, HEIGHT, WIDTH)
        # x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
        return x

    def rotation(x):
        rot = keras.layers.RandomRotation(factor = 0.1, fill_mode="reflect")
        x = rot(x)
        # x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
        return x

    def warping(x):
        warp = keras.layers.RandomPerspective(factor = 0.5, scale = max_warp, interpolation = "nearest")
        x = warp(x)
        return x

    image = random_apply(rotation, image, prob = 0.75)
    image = random_apply(zooming, image, prob = 0.75)
    image = warping(image)
    # image = tf.cast(tf.clip_by_value(image,0,255), tf.uint8)
    return image

def random_flip(x):
    flip = keras.layers.RandomFlip("horizontal", seed = SEED)
    x = flip(x)
    # x = tf.cast(tf.clip_by_value(x,0,255), tf.uint8)
    return x

def batch_tfms(image):
    image = random_apply(random_lighting, image, prob = 0.75)
    image = random_apply(random_flip, image, prob = 0.5)
    image = affine_tfms(image)
    return tf.cast(tf.clip_by_value(image,0,255), tf.uint8)



custom_aug = Sequential([
    Lambda(lambda x: batch_tfms(tf.expand_dims(x,axis=0)))
])

When I reach the point of training, this is the error I get:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[68], line 30
     27 base.trainable = False
     29 inputs = keras.Input(shape = [HEIGHT, WIDTH, 3])
---> 30 x = custom_aug(inputs)
     31 x = keras.applications.resnet.preprocess_input(x)
     32 x = base(x, training = False)

File /usr/local/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py:122, in filter_traceback.<locals>.error_handler(*args, **kwargs)
    119     filtered_tb = _process_traceback_frames(e.__traceback__)
    120     # To get the full stack trace, call:
    121     # `keras.config.disable_traceback_filtering()`
--> 122     raise e.with_traceback(filtered_tb) from None
    123 finally:
    124     del filtered_tb

File /usr/local/lib/python3.10/site-packages/keras/src/layers/input_spec.py:245, in assert_input_compatibility(input_spec, inputs, layer_name)
    243 if spec_dim is not None and dim is not None:
    244     if spec_dim != dim:
--> 245         raise ValueError(
    246             f'Input {input_index} of layer "{layer_name}" is '
    247             "incompatible with the layer: "
    248             f"expected shape={spec.shape}, "
    249             f"found shape={shape}"
    250         )

ValueError: Exception encountered when calling Sequential.call().

Input 0 of layer "functional_3" is incompatible with the layer: expected shape=(None, 224, 3), found shape=(None, 224, 224, 3)

Arguments received by Sequential.call():
  • args=('<KerasTensor shape=(None, 224, 224, 3), dtype=float32, sparse=False, ragged=False, name=keras_tensor_903>',)
  • kwargs={'mask': 'None'}
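
Reading the message, my guess is that custom_aug had already been built with an input spec of (None, 224, 3), i.e. at some point it was called on a rank-3 (unbatched) image and Keras treated the height axis as the batch axis. Here is a standalone sketch of that mechanism (RandomFlip is just a stand-in; this is my assumption about the cause, not a confirmed diagnosis):

import tensorflow as tf
import keras

seq = keras.Sequential([keras.layers.RandomFlip("horizontal")])
# First call on an unbatched image: Keras records an input spec in which
# the leading 224 is taken to be the batch dimension
_ = seq(tf.zeros([224, 224, 3]))
# A later call on a properly batched tensor then fails the spec check
# with a shape-incompatibility ValueError much like the one above:
# _ = seq(tf.zeros([8, 224, 224, 3]))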

This is my model definition for reference:

EPOCHS = 8

es = EarlyStopping(
    monitor = "val_sparse_categorical_accuracy",
    patience = 2,
    verbose = 1,
    restore_best_weights = True,
    start_from_epoch = 2,
    min_delta = 0.005
)
# Create schedule with PyTorch-compatible parameters
lr_manager = OneCycleLR(
    NUM_TRAINING_IMAGES.item(),
    EPOCHS,
    GLOBAL_BATCH_SIZE,
    5e-4,
    maximum_momentum=None,
    minimum_momentum=None
)

with strategy.scope():
    base = keras.applications.ResNet50(
        include_top = False,
        weights = "imagenet",
        input_shape = [HEIGHT, WIDTH, 3]
    )
    base.trainable = False
    
    inputs = keras.Input(shape = [HEIGHT, WIDTH, 3])
    x = custom_aug(inputs)
    x = keras.applications.resnet.preprocess_input(x)
    x = base(x, training = False)
    x = keras.layers.GlobalAveragePooling2D()(x)
    outputs = keras.layers.Dense(5)(x)
    final_model  = keras.Model(inputs, outputs)
    
    final_model.compile(
        optimizer = "adam",
        loss  = "sparse_categorical_crossentropy",
        metrics = ["sparse_categorical_accuracy"]
    )

hist = final_model.fit(train_ds, validation_data = val_ds, 
                       epochs = EPOCHS)

Has anyone experienced this before, and how do I solve it?

I found a workaround that works perfectly now. For anybody who may want to do something similar, here you go:

# Custom Keras layer to apply an augmentation with a certain probability
class ProbabilisticAugmentation(keras.layers.Layer):
    """
    Wraps another augmentation layer and applies it with a given probability.
    
    Args:
        layer (tf.keras.layers.Layer): The augmentation layer to apply.
        p (float): The probability of applying the layer. Must be between 0 and 1.
    """
    def __init__(self, layer, p=0.5, **kwargs):
        super().__init__(**kwargs)
        self.layer = layer
        self.p = p

    def call(self, inputs, training=None):
        if not training:
            # During inference or validation, do not apply augmentation
            return inputs

        # Apply the augmentation probabilistically
        return tf.cond(
            tf.random.uniform([]) < self.p,
            lambda: self.layer(inputs, training=training),
            lambda: inputs
        )
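
Before wiring it into a full pipeline, you can sanity-check the wrapper in isolation. This is purely illustrative, with a flip layer standing in for the real augmentations:

aug = ProbabilisticAugmentation(
    keras.layers.RandomFlip(mode="horizontal"), p=0.5
)
images = tf.random.uniform([4, 224, 224, 3])
augmented = aug(images, training=True)     # flipped ~50% of the time
passthrough = aug(images, training=False)  # identity outside training
print(augmented.shape, passthrough.shape)  # (4, 224, 224, 3) both times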

# RandomRotation takes a fraction of a full turn (0.0278 ~ 10 degrees),
# and a negative zoom factor means zooming in
max_rotate = 0.0278
max_zoom = -0.1
max_jitter = 0.2

augmentation_pipeline = tf.keras.Sequential([
    # Apply brightness jitter 75% of the time
    ProbabilisticAugmentation(
        keras.layers.RandomBrightness(factor = max_jitter), 
        p=0.75
    ),
    ProbabilisticAugmentation(
        keras.layers.RandomContrast(factor = max_jitter), 
        p=0.75
    ),
    ProbabilisticAugmentation(
        keras.layers.RandomZoom(height_factor = (max_zoom, 0.0)), 
        p=0.75
    ),
    # Apply rotation 75% of the time
    ProbabilisticAugmentation(
        keras.layers.RandomRotation(factor = max_rotate), 
        p=0.75
    ),
    ProbabilisticAugmentation(
        keras.layers.RandomFlip(mode = "horizontal"), 
        p=0.5
    )
], name="augmentation_pipeline")

From here you can just call the pipeline on your images when building your input pipeline, as in the sketch below.
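
For instance, with a tf.data pipeline it can look like this (assuming train_ds yields (image, label) batches, which matches my setup but may differ from yours):

def augment(image, label):
    # training=True so the probabilistic wrappers actually fire
    return augmentation_pipeline(image, training=True), label

train_ds = train_ds.map(augment, num_parallel_calls=tf.data.AUTOTUNE)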
