Hello all,
I am working on my science fair project, which involves using various remote sensing methods to predict the probability of a landslide occurring in a given area. My current attempts have not worked well: the test MAE is around 30 on a 0-100 scale. Can someone guide me through building this CNN? The input is a single-band GeoTIFF raster and the expected output is a value from 0 to 100 for the chance of a landslide occurring in that area.
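To make the input/output concrete, here is a minimal sketch of what one training sample looks like in my setup (folder "1" is just one example):

import numpy as np
import rasterio

# One sample: a single-band GeoTIFF plus a scalar susceptibility target
with rasterio.open("Data/Datasets/1/normalized_topography.tif") as src:
    elevation = src.read(1)                  # 2-D array of shape (height, width)

model_input = elevation[..., np.newaxis]     # CNN input shape: (height, width, 1)
# The target is a single value in [0, 100] stored in susc.txt in the same folder

My full code is given below.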
import os
import numpy as np
import rasterio
import random
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
import tensorflow as tf
from tensorflow.keras import layers, models
import optuna
# Enable Intel optimizations
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'
# Configure TensorFlow to use Intel GPU
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("Intel GPU is available and configured.")
    except Exception as e:
        print(f"Error configuring GPU: {e}")
else:
    print("No GPU found, using CPU.")
def load_geotiff(file_path):
    try:
        with rasterio.open(file_path) as src:
            return src.read(1)
    except Exception as e:
        print(f"Error loading file {file_path}: {e}")
        return None
def get_min_dimensions(subdirs, root_dir):
    # Find the smallest raster height/width so every sample can be centre-cropped to a common shape
    min_height, min_width = float('inf'), float('inf')
    for subdir in subdirs:
        tiff_path = os.path.join(root_dir, subdir, "normalized_topography.tif")
        if os.path.exists(tiff_path):
            img = load_geotiff(tiff_path)
            if img is not None:
                min_height = min(min_height, img.shape[0])
                min_width = min(min_width, img.shape[1])
    return min_height, min_width
def crop_center(image, target_height, target_width):
    height, width = image.shape
    start_y = (height - target_height) // 2
    start_x = (width - target_width) // 2
    return image[start_y:start_y + target_height, start_x:start_x + target_width]
def load_and_crop_data(subdirs, root_dir, target_height, target_width):
    X, y = [], []
    for subdir in subdirs:
        tiff_path = os.path.join(root_dir, subdir, "normalized_topography.tif")
        susc_path = os.path.join(root_dir, subdir, "susc.txt")
        if os.path.exists(tiff_path) and os.path.exists(susc_path):
            img = load_geotiff(tiff_path)
            if img is not None:
                cropped_img = crop_center(img, target_height, target_width)
                X.append(cropped_img)
                with open(susc_path) as f:
                    y.append(float(f.read().strip()))
    return np.array(X), np.array(y)
root_dir = "Data/Datasets"
all_subdirs = [str(i) for i in range(1, 501)] # Use all data
random.shuffle(all_subdirs)
min_height, min_width = get_min_dimensions(all_subdirs, root_dir)
print(f"Minimum dimensions: height={min_height}, width={min_width}")
X, y = load_and_crop_data(all_subdirs, root_dir, min_height, min_width)
if X.shape[0] == 0:
    print("Error: No data loaded. Exiting.")
    exit()
X = X.reshape(X.shape[0], X.shape[1], X.shape[2], 1)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Impute missing pixel values with the per-position training-set mean (fit on the training data only)
imputer = SimpleImputer(strategy='mean')
X_train_imputed = imputer.fit_transform(X_train.reshape(-1, X_train.shape[1] * X_train.shape[2]))
X_val_imputed = imputer.transform(X_val.reshape(-1, X_val.shape[1] * X_val.shape[2]))
X_test_imputed = imputer.transform(X_test.reshape(-1, X_test.shape[1] * X_test.shape[2]))
X_train_imputed = X_train_imputed.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
X_val_imputed = X_val_imputed.reshape(X_val.shape[0], X_val.shape[1], X_val.shape[2], 1)
X_test_imputed = X_test_imputed.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)
# Use a separate imputer for the targets so the image imputer's fit is not overwritten
y_imputer = SimpleImputer(strategy='mean')
y_train_imputed = y_imputer.fit_transform(y_train.reshape(-1, 1)).ravel()
y_val_imputed = y_imputer.transform(y_val.reshape(-1, 1)).ravel()
y_test_imputed = y_imputer.transform(y_test.reshape(-1, 1)).ravel()
y_train_scaled = y_train_imputed
y_val_scaled = y_val_imputed
y_test_scaled = y_test_imputed
class DisplayPredictions(tf.keras.callbacks.Callback):
    # Print a few test-set predictions after every epoch to monitor progress
    def on_epoch_end(self, epoch, logs=None):
        predictions = self.model.predict(X_test_imputed)
        for i in range(min(5, len(y_test_scaled))):
            actual = y_test_scaled[i]
            predicted = predictions[i][0]
            diff = abs(actual - predicted)
            print(f"Epoch {epoch+1} - Actual: {actual:.4f}, Predicted: {predicted:.4f}, Difference: {diff:.4f}")
def create_model(trial):
    model = models.Sequential()
    model.add(layers.Input(shape=X_train_imputed.shape[1:]))
    for i in range(trial.suggest_int('n_conv_layers', 2, 4)):  # Reduced number of layers (between 2 and 4)
        filters = trial.suggest_int(f'conv{i+1}_filters', 32, 128)  # Reduced filter count
        model.add(layers.Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(trial.suggest_float(f'dropout{i+1}', 0.1, 0.3)))  # Lower dropout
    model.add(layers.Flatten())
    model.add(layers.Dense(trial.suggest_int('dense_units', 64, 256), activation='relu'))
    model.add(layers.Dropout(trial.suggest_float('dense_dropout', 0.1, 0.3)))
    model.add(layers.Dense(1))  # single regression output
    # suggest_loguniform is deprecated; suggest_float with log=True is the current equivalent
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
    return model
def objective(trial):
    model = create_model(trial)
    batch_size = trial.suggest_categorical('batch_size', [32, 64])  # Reduced batch size options
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)  # Reduced patience
    early_stopping = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)  # Reduced patience
    history = model.fit(
        X_train_imputed, y_train_scaled,
        batch_size=batch_size,
        epochs=50,  # Reduced epochs for faster training
        validation_data=(X_val_imputed, y_val_scaled),
        verbose=0,
        callbacks=[DisplayPredictions(), lr_scheduler, early_stopping]
    )
    # Report the best validation MAE rather than the last epoch's, since early stopping restores the best weights
    return min(history.history['val_mae'])
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10) # Reduced trials to speed up optimization
best_model = create_model(study.best_trial)
best_model.fit(
    X_train_imputed, y_train_scaled,
    epochs=50,  # Reduced epochs for faster training
    batch_size=study.best_params['batch_size'],
    validation_data=(X_val_imputed, y_val_scaled),
    verbose=1,
    callbacks=[
        DisplayPredictions(),
        tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
        tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
    ]
)
cnn_pred = best_model.predict(X_test_imputed)
mae_metric = tf.keras.metrics.MeanAbsoluteError()
cnn_mae = mae_metric(y_test_scaled, cnn_pred).numpy()
mse_metric = tf.keras.metrics.MeanSquaredError()
cnn_mse = mse_metric(y_test_scaled, cnn_pred).numpy()
print("\nCNN Results:")
print(f"Mean Absolute Error: {cnn_mae:.4f}")
print(f"Mean Squared Error: {cnn_mse:.4f}")
print("\nBest CNN Parameters:")
print(study.best_params)
The susc.txt file mentioned in the code contains the susceptibility value for each area as a single float (a short sketch of how each folder is laid out is below). Can someone help me? I need to finish this in 10 days.
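Roughly, each dataset folder is organized as follows; the value 42.5 is only an illustrative example, not real data:

# Each folder 1 ... 500 under Data/Datasets contains:
#   normalized_topography.tif  - the single-band GeoTIFF fed to the CNN
#   susc.txt                   - one float on a 0-100 scale, e.g. "42.5"
with open("Data/Datasets/1/susc.txt") as f:
    susceptibility = float(f.read().strip())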
Thanks,
Arnav
If anyone needs more information, just check my GitHub repository. Everything you need should be in the Remote Sensing folder.