def expend_as(tensor, rep):
return layers.Lambda(lambda x, repnum: K.repeat_elements(x, repnum, axis=3),
arguments={‘repnum’: rep})(tensor)
def double_conv_layer(x, filter_size, size, dropout, batch_norm=False):
axis = 3
conv = layers.Conv2D(size, (filter_size, filter_size), padding='same')(x)
if batch_norm is True:
conv = layers.BatchNormalization(axis=axis)(conv)
conv = layers.Activation('relu')(conv)
conv = layers.Conv2D(size, (filter_size, filter_size), padding='same')(conv)
if batch_norm is True:
conv = layers.BatchNormalization(axis=axis)(conv)
conv = layers.Activation('relu')(conv)
if dropout > 0:
conv = layers.Dropout(dropout)(conv)
shortcut = layers.Conv2D(size, kernel_size=(1, 1), padding='same')(x)
if batch_norm is True:
shortcut = layers.BatchNormalization(axis=axis)(shortcut)
res_path = layers.add([shortcut, conv])
return res_path
def gating_signal(input, out_size, batch_norm=False):
“”"
resize the down layer feature map into the same dimension as the up layer feature map
using 1x1 conv
:param input: down-dim feature map
:param out_size:output channel number
:return: the gating feature map with the same dimension of the up layer feature map
“”"
x = layers.Conv2D(out_size, (1, 1), padding=‘same’)(input)
if batch_norm:
x = layers.BatchNormalization()(x)
x = layers.Activation(‘relu’)(x)
return x
def attention_block(x, gating, inter_shape):
shape_x = K.int_shape(x)
shape_g = K.int_shape(gating)
theta_x = layers.Conv2D(inter_shape, (2, 2), strides=(2, 2), padding='same')(x) # 16
shape_theta_x = K.int_shape(theta_x)
phi_g = layers.Conv2D(inter_shape, (1, 1), padding='same')(gating)
upsample_g = layers.Conv2DTranspose(inter_shape, (3, 3),
strides=(shape_theta_x[1] // shape_g[1], shape_theta_x[2] // shape_g[2]),
padding='same')(phi_g) # 16
concat_xg = layers.add([upsample_g, theta_x])
act_xg = layers.Activation('relu')(concat_xg)
psi = layers.Conv2D(1, (1, 1), padding='same')(act_xg)
sigmoid_xg = layers.Activation('sigmoid')(psi)
shape_sigmoid = K.int_shape(sigmoid_xg)
upsample_psi = layers.UpSampling2D(size=(shape_x[1] // shape_sigmoid[1], shape_x[2] // shape_sigmoid[2]))(sigmoid_xg) # 32
upsample_psi = expend_as(upsample_psi, shape_x[3])
y = layers.multiply([upsample_psi, x])
result = layers.Conv2D(shape_x[3], (1, 1), padding='same')(y)
result_bn = layers.BatchNormalization()(result)
return result_bn
def Attention_ResUNet(input_shape, NUM_CLASSES=1, dropout_rate=0.0, batch_norm=True):
FILTER_NUM = 64 # number of basic filters for the first layer
FILTER_SIZE = 3 # size of the convolutional filter
UP_SAMP_SIZE = 2
# input data
# dimension of the image depth
inputs = layers.Input((512, 512, 3), dtype=tf.float32)
axis = 3
# Downsampling layers
# DownRes 1, double residual convolution + pooling
conv_512 = double_conv_layer(inputs, 3, 64, dropout_rate, batch_norm)
pool_256 = layers.MaxPooling2D(pool_size=(2,2))(conv_512)
# DownRes 2
conv_256 = double_conv_layer(pool_256, 3, 2*64, dropout_rate, batch_norm)
pool_128 = layers.MaxPooling2D(pool_size=(2,2))(conv_256)
# DownRes 3
conv_128 = double_conv_layer(pool_128, 3, 4*64, dropout_rate, batch_norm)
pool_64 = layers.MaxPooling2D(pool_size=(2,2))(conv_128)
# DownRes 4
conv_64 = double_conv_layer(pool_64, 3, 8*64, dropout_rate, batch_norm)
pool_32 = layers.MaxPooling2D(pool_size=(2,2))(conv_64)
# DownRes 5, convolution only
conv_32 = double_conv_layer(pool_32, 3, 16*64, dropout_rate, batch_norm)
# Upsampling layers
# UpRes 6, attention gated concatenation + upsampling + double residual convolution
gating_64 = gating_signal(conv_32, 8*64, batch_norm)
att_64 = attention_block(conv_64, gating_64, 8*64)
up_64 = layers.UpSampling2D(size=(2, 2), data_format="channels_last")(conv_32)
up_64 = layers.concatenate([up_64, att_64], axis=axis)
up_conv_64 = double_conv_layer(up_64, 3, 8*64, dropout_rate, batch_norm)
# UpRes 7
gating_128 = gating_signal(up_conv_64, 4*64, batch_norm)
att_128 = attention_block(conv_128, gating_128, 4*64)
up_128 = layers.UpSampling2D(size=(2, 2), data_format="channels_last")(up_conv_64)
up_128 = layers.concatenate([up_128, att_128], axis=axis)
up_conv_128 = double_conv_layer(up_128, 3, 4*64, dropout_rate, batch_norm)
# UpRes 8
gating_256 = gating_signal(up_conv_128, 2*64, batch_norm)
att_256 = attention_block(conv_256, gating_256, 2*64)
up_256 = layers.UpSampling2D(size=(2, 2), data_format="channels_last")(up_conv_128)
up_256 = layers.concatenate([up_256, att_256], axis=axis)
up_conv_256 = double_conv_layer(up_256, 3, 2*64, dropout_rate, batch_norm)
# UpRes 9
gating_512 = gating_signal(up_conv_128, 64, batch_norm)
att_512 = attention_block(conv_512, gating_512, 64)
up_512 = layers.UpSampling2D(size=(2, 2), data_format="channels_last")(up_conv_256)
up_512 = layers.concatenate([up_512, att_512], axis=axis)
up_conv_512 = double_conv_layer(up_512, 3, 64, dropout_rate, batch_norm)
# 1*1 convolutional layers
# valid padding
# batch normalization
# sigmoid nonlinear activation
conv_final = layers.Conv2D(NUM_CLASSES, kernel_size=(1,1))(up_conv_512)
conv_final = layers.BatchNormalization(axis=axis)(conv_final)
conv_final = layers.Activation('sigmoid')(conv_final)
# Model integration
model = models.Model(inputs, conv_final, name="AttentionResUNet")
return model
input_shape=(512,512,3)
model=Attention_ResUNet( input_shape, NUM_CLASSES=1,dropout_rate=0.0, batch_norm=True)
model.summary()
The code for training:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
os.environ[“TF_CPP_MIN_LOG_LEVEL”] = “2” #set to 1 for warnings and errors
import numpy as np
import cv2
import keras
import keras.utils
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
H = 512
W = 512
from focal_loss import BinaryFocalLoss #for tough to classify segement class
def create_dir(path):
“”" Create a directory. “”"
if not os.path.exists(path):
os.makedirs(path)
def shuffling(x, y):
x, y = shuffle(x, y, random_state=42)
return x, y
def load_data(path):
x = sorted(glob(os.path.join(path, “image”, “.png")))
y = sorted(glob(os.path.join(path, “mask”, ".png”)))
return x, y
def read_image(path):
path = path.decode()
x = cv2.imread(path, cv2.IMREAD_COLOR)
x = x/255.0
x = x.astype(np.float32)
return x
def read_mask(path):
path = path.decode()
x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
x = x/255.0
x = x > 0.5
x = x.astype(np.float32)
x = np.expand_dims(x, axis=-1)
return x
def tf_parse(x, y):
def _parse(x, y):
x = read_image(x)
y = read_mask(y)
return x, y
x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
x.set_shape([H, W, 3])
y.set_shape([H, W, 1])
return x, y
def tf_dataset(x, y, batch=8):
dataset = tf.data.Dataset.from_tensor_slices((x, y))
dataset = dataset.map(tf_parse)
dataset = dataset.batch(batch)
dataset = dataset.prefetch(10)
return dataset
if name == “main”:
“”" Seeding “”"
np.random.seed(42)
tf.random.set_seed(42)
""" Directory for storing files """
create_dir("files")
""" Hyperparameters """
batch_size = 2
lr = 0.002
num_epochs = 60
model_path = os.path.join("files", "model.h5")
csv_path = os.path.join("files", "data.csv")
""" Dataset """
train_path = os.path.join("/content/drive/MyDrive/Data_brain/train/")
valid_path = os.path.join("/content/drive/MyDrive/Data_brain/test/")
train_x, train_y = load_data(train_path)
train_x, train_y = shuffling(train_x, train_y)
valid_x, valid_y = load_data(valid_path)
print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)
""" Model """
model = Attention_ResUNet(input_shape)
metrics = [jacard_coef, Recall(), Precision()]
model.compile(loss=BinaryFocalLoss(gamma=2), optimizer=Adam(lr), metrics=metrics)
callbacks = [
ModelCheckpoint(model_path, verbose=1, save_best_only=True),
#ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-7, verbose=1),
CSVLogger(csv_path),
TensorBoard(),
#EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=False),
]
model.fit(
train_dataset,
epochs=num_epochs,
validation_data=valid_dataset,
callbacks=callbacks,
shuffle=False)
I am getting the following error while training the model(the results are absurd):
Train: 1280 - 1280
Valid: 32 - 32
/usr/local/lib/python3.7/dist-packages/keras/utils/generic_utils.py:497: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.
category=CustomMaskWarning)
Epoch 1/60
640/640 [==============================] - 279s 414ms/step - loss: 0.0857 - jacard_coef: 0.0047 - recall: 0.0555 - precision: 0.0049 - val_loss: 0.0365 - val_jacard_coef: 0.0044 - val_recall: 0.0000e+00 - val_precision: 0.0000e+00
Epoch 00001: val_loss improved from inf to 0.03647, saving model to files/model.h5
Epoch 2/60
640/640 [==============================] - 263s 411ms/step - loss: 0.0235 - jacard_coef: 0.0045 - recall: 0.0000e+00 - precision: 0.0000e+00 - val_loss: 0.0159 - val_jacard_coef: 0.0043 - val_recall: 0.0000e+00 - val_precision: 0.0000e+00
Epoch 00002: val_loss improved from 0.03647 to 0.01592, saving model to files/model.h5
Epoch 3/60
39/640 [>…] - ETA: 4:05 - loss: 0.0159 - jacard_coef: 0.0045 - recall: 0.0000e+00 - precision: 0.0000e+00