Help creating an effective and accurate CNN model

Hello all,
I am working on my science fair project, which involves using various remote sensing methods to predict the probability of a landslide occurring in a given area. However, my attempts so far have failed because the MAE is too high (~30 on a 0-100 scale). Can someone guide me through the process of creating this CNN? My input data is a GeoTIFF raster, and I expect an output value from 0 to 100 representing the predicted chance of a landslide occurring in that area. My code is given below:

import os
import numpy as np
import rasterio
import random
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
import tensorflow as tf
from tensorflow.keras import layers, models
import optuna

# Ask TensorFlow for its oneDNN (Intel) optimized kernels.
# NOTE(review): TensorFlow is already imported above this line; the flag is
# presumably read while TensorFlow loads, so setting it here may have no
# effect. Confirm, and move it above `import tensorflow` if so.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'

# Restrict TensorFlow to the first GPU it reports (if any) and enable memory
# growth on it. Any failure during configuration is reported, not raised.
physical_devices = tf.config.list_physical_devices('GPU')
if not physical_devices:
    print("No GPU found, using CPU.")
else:
    try:
        tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
    except Exception as e:
        print(f"Error configuring GPU: {e}")
    else:
        print("Intel GPU is available and configured.")

def load_geotiff(file_path):
    """Read band 1 of the raster stored at ``file_path``.

    Returns whatever ``src.read(1)`` yields for the opened dataset. If opening
    or reading raises any ``Exception``, an error message is printed and
    ``None`` is returned so callers can skip the file.
    """
    try:
        with rasterio.open(file_path) as src:
            first_band = src.read(1)
    except Exception as err:
        print(f"Error loading file {file_path}: {err}")
        return None
    return first_band

def get_min_dimensions(subdirs, root_dir):
    """Find the smallest raster height and width across the sample folders.

    For each name in ``subdirs`` the file
    ``<root_dir>/<name>/normalized_topography.tif`` is loaded with
    ``load_geotiff``. Folders without that file, and files that fail to load,
    are skipped.

    Returns a ``(min_height, min_width)`` tuple. Both values remain
    ``float('inf')`` when no raster could be loaded.
    """
    smallest_rows = float('inf')
    smallest_cols = float('inf')
    for name in subdirs:
        raster_path = os.path.join(root_dir, name, "normalized_topography.tif")
        if not os.path.exists(raster_path):
            continue
        raster = load_geotiff(raster_path)
        if raster is None:
            continue
        rows, cols = raster.shape[0], raster.shape[1]
        if rows < smallest_rows:
            smallest_rows = rows
        if cols < smallest_cols:
            smallest_cols = cols
    return smallest_rows, smallest_cols

def crop_center(image, target_height, target_width):
    """Return the centred ``target_height`` x ``target_width`` window of ``image``.

    ``image`` must be two-dimensional (its shape is unpacked into exactly two
    values). When the margin cannot be split evenly, the extra row or column
    is left on the bottom/right side because the offsets use floor division.
    """
    rows, cols = image.shape
    top = (rows - target_height) // 2
    left = (cols - target_width) // 2
    row_window = slice(top, top + target_height)
    col_window = slice(left, left + target_width)
    return image[row_window, col_window]

def load_and_crop_data(subdirs, root_dir, target_height, target_width):
    """Load every usable sample as a centre-cropped raster plus its label.

    A sample lives in ``<root_dir>/<subdir>/`` and is used only when both
    ``normalized_topography.tif`` and ``susc.txt`` exist and the raster loads
    successfully through ``load_geotiff``. The raster is cropped with
    ``crop_center`` to ``target_height`` x ``target_width``; the label is the
    content of ``susc.txt`` parsed as a float.

    Returns:
        ``(X, y)`` as NumPy arrays built from the collected rasters and
        labels, in the order of ``subdirs``. Both are empty when no sample
        qualifies.

    Raises:
        ValueError: if a ``susc.txt`` file does not contain a parseable float.
    """
    X, y = [], []
    for subdir in subdirs:
        sample_dir = os.path.join(root_dir, subdir)
        tiff_path = os.path.join(sample_dir, "normalized_topography.tif")
        susc_path = os.path.join(sample_dir, "susc.txt")
        if not (os.path.exists(tiff_path) and os.path.exists(susc_path)):
            continue
        img = load_geotiff(tiff_path)
        if img is None:
            continue
        # Read the label inside a context manager so the file handle is closed
        # immediately instead of leaking one open file per sample.
        with open(susc_path) as label_file:
            label = float(label_file.read().strip())
        X.append(crop_center(img, target_height, target_width))
        y.append(label)
    return np.array(X), np.array(y)

root_dir = "Data/Datasets"
all_subdirs = [str(i) for i in range(1, 501)]  # Use all data
# Shuffle with a dedicated, seeded generator. An unseeded shuffle changes the
# sample order on every run, which defeats the fixed random_state=42 used for
# the train/validation/test splits below.
random.Random(42).shuffle(all_subdirs)

# Every raster is centre-cropped to the smallest height/width in the dataset
# so that all samples share one shape.
min_height, min_width = get_min_dimensions(all_subdirs, root_dir)
print(f"Minimum dimensions: height={min_height}, width={min_width}")

X, y = load_and_crop_data(all_subdirs, root_dir, min_height, min_width)

if X.shape[0] == 0:
    # SystemExit works without the interactive-only `exit()` helper and, given
    # a message, terminates with a non-zero status so the failure is visible.
    raise SystemExit("Error: No data loaded. Exiting.")

# Append a single channel axis: (samples, height, width) -> (samples, height, width, 1).
X = X.reshape(X.shape[0], X.shape[1], X.shape[2], 1)

# 70 % train, then the remaining 30 % is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Flatten each raster into one row so every pixel position is a feature; the
# per-pixel means are learned from the training split only.
# NOTE(review): by default SimpleImputer drops a feature that is missing in
# every fitted row, which would break the reshape back to image shape;
# confirm no pixel is NaN across the whole training split.
# NOTE(review): only NaN is treated as missing here; if the rasters mark
# nodata with a sentinel value it passes through unchanged. Confirm.
n_pixels = X_train.shape[1] * X_train.shape[2]
imputer = SimpleImputer(strategy='mean')

X_train_imputed = imputer.fit_transform(X_train.reshape(-1, n_pixels))
X_val_imputed = imputer.transform(X_val.reshape(-1, n_pixels))
X_test_imputed = imputer.transform(X_test.reshape(-1, n_pixels))

# Restore the (samples, height, width, 1) layout expected by the CNN.
X_train_imputed = X_train_imputed.reshape(X_train.shape)
X_val_imputed = X_val_imputed.reshape(X_val.shape)
X_test_imputed = X_test_imputed.reshape(X_test.shape)

# Labels get their own imputer so the pixel imputer above keeps its fitted
# statistics instead of being refit on the targets.
target_imputer = SimpleImputer(strategy='mean')
y_train_imputed = target_imputer.fit_transform(y_train.reshape(-1, 1)).ravel()
y_val_imputed = target_imputer.transform(y_val.reshape(-1, 1)).ravel()
y_test_imputed = target_imputer.transform(y_test.reshape(-1, 1)).ravel()

# No scaling is applied to the targets; these aliases exist because the
# training and evaluation code refers to the *_scaled names.
y_train_scaled = y_train_imputed
y_val_scaled = y_val_imputed
y_test_scaled = y_test_imputed

class DisplayPredictions(tf.keras.callbacks.Callback):
    """Keras callback that prints a few test-set predictions after each epoch.

    Reads the module-level ``X_test_imputed`` and ``y_test_scaled`` arrays and
    prints the actual value, the predicted value and their absolute difference
    for at most the first five test samples.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Print actual vs. predicted values for the first test samples.

        Args:
            epoch: Zero-based epoch index supplied by Keras (printed as
                ``epoch + 1``).
            logs: Metric dictionary supplied by Keras; unused.
        """
        n_shown = min(5, len(y_test_scaled))
        if n_shown == 0:
            return
        # Only the displayed rows are needed, so run inference on that slice
        # instead of the whole test set, and silence predict()'s progress bar
        # so it does not interleave with the printed lines.
        predictions = self.model.predict(X_test_imputed[:n_shown], verbose=0)
        for actual, row in zip(y_test_scaled[:n_shown], predictions):
            predicted = row[0]
            diff = abs(actual - predicted)
            print(f"Epoch {epoch+1} - Actual: {actual:.4f}, Predicted: {predicted:.4f}, Difference: {diff:.4f}")

def create_model(trial):
    """Build and compile a CNN regressor from an Optuna trial's suggestions.

    The trial supplies the number of convolution blocks (2-4), the filter
    count (32-128) and dropout rate (0.1-0.3) of each block, the width of the
    dense layer (64-256), its dropout rate (0.1-0.3) and the Adam learning
    rate (1e-5 to 1e-2, sampled on a log scale). The input shape is taken
    from the module-level ``X_train_imputed``.

    Args:
        trial: Object exposing Optuna's ``suggest_int`` / ``suggest_float``
            methods; the script also passes ``study.best_trial`` here.

    Returns:
        A compiled ``Sequential`` model with a single linear output unit,
        MSE loss and MAE as the reported metric.
    """
    model = models.Sequential()
    model.add(layers.Input(shape=X_train_imputed.shape[1:]))

    # Each block: 3x3 convolution -> batch norm -> 2x2 max pooling -> dropout.
    for i in range(trial.suggest_int('n_conv_layers', 2, 4)):
        filters = trial.suggest_int(f'conv{i+1}_filters', 32, 128)
        model.add(layers.Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(trial.suggest_float(f'dropout{i+1}', 0.1, 0.3)))

    model.add(layers.Flatten())
    model.add(layers.Dense(trial.suggest_int('dense_units', 64, 256), activation='relu'))
    model.add(layers.Dropout(trial.suggest_float('dense_dropout', 0.1, 0.3)))
    model.add(layers.Dense(1))  # linear output for regression

    # suggest_loguniform is deprecated in Optuna; suggest_float(log=True)
    # samples the same log-scaled range under the same parameter name.
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
    return model

def objective(trial):
    """Optuna objective: train one candidate model and score it on validation data.

    Builds a model with ``create_model``, trains it for up to 50 epochs on the
    module-level training arrays with a trial-chosen batch size (32 or 64),
    learning-rate reduction on plateau and early stopping.

    Args:
        trial: The Optuna trial supplying hyper-parameter suggestions.

    Returns:
        The validation mean absolute error of the model as it stands after
        training (lower is better).
    """
    model = create_model(trial)
    batch_size = trial.suggest_categorical('batch_size', [32, 64])

    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)
    early_stopping = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

    model.fit(
        X_train_imputed, y_train_scaled,
        batch_size=batch_size,
        epochs=50,
        validation_data=(X_val_imputed, y_val_scaled),
        verbose=0,
        callbacks=[DisplayPredictions(), lr_scheduler, early_stopping]
    )

    # With restore_best_weights=True the model may have been rolled back to an
    # earlier epoch, so the last entry of the training history need not match
    # the model being scored. Evaluate the model itself instead.
    # evaluate() yields [loss, mae] because create_model compiles the model
    # with loss='mse' and metrics=['mae'].
    _, val_mae = model.evaluate(X_val_imputed, y_val_scaled, verbose=0)
    return float(val_mae)

# Hyper-parameter search: minimise the value returned by `objective`.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)  # Reduced trials to speed up optimization

# Rebuild a fresh model from the best trial and train it with the same
# callbacks and epoch budget used during the search.
best_model = create_model(study.best_trial)
final_callbacks = [
    DisplayPredictions(),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
    tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
]
best_model.fit(
    X_train_imputed,
    y_train_scaled,
    epochs=50,  # Reduced epochs for faster training
    batch_size=study.best_params['batch_size'],
    validation_data=(X_val_imputed, y_val_scaled),
    verbose=1,
    callbacks=final_callbacks,
)

# Score the final model on the held-out test split.
cnn_pred = best_model.predict(X_test_imputed)

mae_metric = tf.keras.metrics.MeanAbsoluteError()
mse_metric = tf.keras.metrics.MeanSquaredError()
cnn_mae = mae_metric(y_test_scaled, cnn_pred).numpy()
cnn_mse = mse_metric(y_test_scaled, cnn_pred).numpy()

# One print call, one item per line: same output as separate print statements.
print(
    "\nCNN Results:",
    f"Mean Absolute Error: {cnn_mae:.4f}",
    f"Mean Squared Error: {cnn_mse:.4f}",
    "\nBest CNN Parameters:",
    study.best_params,
    sep="\n",
)


The susc.txt file mentioned in the code contains the susceptibility value for that area as a single float. Can someone help me? I need to finish this in 10 days.

Thanks,
Arnav

If anyone needs more information, just go to my GitHub repository.
Everything you need should be in the Remote Sensing folder.

1 Like