Hello all,
I am working on my science fair project, which uses various remote sensing methods to predict the probability of a landslide occurring in a given area. However, my attempts so far have failed because the model's MAE is too high (~30 on a 0-100 scale). Can someone guide me through the process of building this CNN? My input data is a GeoTIFF raster, and I expect an output value from 0 to 100 predicting the chance of a landslide occurring in that area. My code is given below:
import os
import numpy as np
import rasterio
import random
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
import tensorflow as tf
from tensorflow.keras import layers, models
import optuna
# Enable Intel oneDNN optimizations.
# NOTE(review): TensorFlow has already been imported above; confirm that the
# variable is still honoured when set at this point.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'

# Configure TensorFlow to use the first visible GPU, falling back to the CPU.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
        # Allocate GPU memory on demand instead of reserving it all up front.
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("Intel GPU is available and configured.")
    except Exception as e:
        # Best effort: a configuration failure must not abort the script.
        print(f"Error configuring GPU: {e}")
else:
    print("No GPU found, using CPU.")
def load_geotiff(file_path):
    """Read the first band of a GeoTIFF raster.

    Args:
        file_path: Path of the raster file to open with rasterio.

    Returns:
        The array returned by ``src.read(1)``, or ``None`` when the file
        could not be opened or read (the error is printed, not raised).
    """
    try:
        with rasterio.open(file_path) as src:
            return src.read(1)
    except Exception as e:
        # Best effort: callers skip samples whose raster cannot be loaded.
        print(f"Error loading file {file_path}: {e}")
        return None
def get_min_dimensions(subdirs, root_dir):
    """Find the smallest raster height and width across all sample folders.

    Each sample is expected at ``<root_dir>/<subdir>/normalized_topography.tif``.
    Missing or unreadable rasters are skipped.

    Args:
        subdirs: Iterable of sample sub-directory names.
        root_dir: Directory that contains the sample sub-directories.

    Returns:
        A ``(min_height, min_width)`` tuple. Both values stay ``float('inf')``
        when no raster could be loaded.
    """
    min_height, min_width = float('inf'), float('inf')
    for subdir in subdirs:
        tiff_path = os.path.join(root_dir, subdir, "normalized_topography.tif")
        if not os.path.exists(tiff_path):
            continue
        img = load_geotiff(tiff_path)
        if img is None:
            continue
        min_height = min(min_height, img.shape[0])
        min_width = min(min_width, img.shape[1])
    return min_height, min_width
def crop_center(image, target_height, target_width):
    """Return the centered ``target_height`` x ``target_width`` window of an image.

    Args:
        image: Array whose first two axes are (height, width); any trailing
            axes are kept unchanged.
        target_height: Number of rows in the crop.
        target_width: Number of columns in the crop.

    Returns:
        A view of ``image`` cropped around its center.

    Raises:
        ValueError: If the requested window is larger than the image. Without
            this check the negative start offset would silently produce a
            crop of the wrong size.
    """
    height, width = image.shape[:2]
    if target_height > height or target_width > width:
        raise ValueError(
            f"Crop size ({target_height}, {target_width}) exceeds image size "
            f"({height}, {width})"
        )
    start_y = (height - target_height) // 2
    start_x = (width - target_width) // 2
    return image[start_y:start_y + target_height, start_x:start_x + target_width]
def load_and_crop_data(subdirs, root_dir, target_height, target_width):
    """Load every sample's raster and susceptibility label.

    A sample lives in ``<root_dir>/<subdir>/`` and consists of
    ``normalized_topography.tif`` (the input raster) and ``susc.txt`` (the
    susceptibility value stored as a single float). Samples with a missing
    file, an unreadable raster, or an unparsable label are skipped so that
    the returned images and labels always stay aligned.

    Args:
        subdirs: Iterable of sample sub-directory names.
        root_dir: Directory that contains the sample sub-directories.
        target_height: Height every raster is center-cropped to.
        target_width: Width every raster is center-cropped to.

    Returns:
        ``(X, y)`` NumPy arrays: the stacked cropped rasters and their
        susceptibility values. Both are empty when nothing could be loaded.
    """
    X, y = [], []
    for subdir in subdirs:
        tiff_path = os.path.join(root_dir, subdir, "normalized_topography.tif")
        susc_path = os.path.join(root_dir, subdir, "susc.txt")
        if not (os.path.exists(tiff_path) and os.path.exists(susc_path)):
            continue

        img = load_geotiff(tiff_path)
        if img is None:
            continue

        try:
            with open(susc_path, encoding="utf-8") as susc_file:
                susceptibility = float(susc_file.read().strip())
        except (OSError, ValueError) as e:
            print(f"Error loading file {susc_path}: {e}")
            continue

        # Append only after both parts loaded, keeping X and y the same length.
        X.append(crop_center(img, target_height, target_width))
        y.append(susceptibility)
    return np.array(X), np.array(y)
# --- Data loading -----------------------------------------------------------
root_dir = "Data/Datasets"
all_subdirs = [str(i) for i in range(1, 501)]  # Use all data

# Rasters differ in size, so every sample is cropped to the smallest one.
min_height, min_width = get_min_dimensions(all_subdirs, root_dir)
print(f"Minimum dimensions: height={min_height}, width={min_width}")

X, y = load_and_crop_data(all_subdirs, root_dir, min_height, min_width)
if X.shape[0] == 0:
    print("Error: No data loaded. Exiting.")
    # Actually stop here: the reshape below cannot work on an empty array.
    raise SystemExit(1)

# Add the trailing channel axis expected by Conv2D: (samples, height, width, 1).
X = X.reshape(X.shape[0], X.shape[1], X.shape[2], 1)

# --- 70 / 15 / 15 train / validation / test split -----------------------------
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# --- Fill missing pixels with the per-pixel mean of the training set ---------
# SimpleImputer works on 2-D input, so images are flattened and restored after.
# NOTE(review): by default SimpleImputer drops columns that are entirely NaN in
# the training data, which would break the reshape below - confirm the rasters
# have no such pixels.
imputer = SimpleImputer(strategy='mean')
X_train_imputed = imputer.fit_transform(X_train.reshape(-1, X_train.shape[1] * X_train.shape[2]))
X_val_imputed = imputer.transform(X_val.reshape(-1, X_val.shape[1] * X_val.shape[2]))
X_test_imputed = imputer.transform(X_test.reshape(-1, X_test.shape[1] * X_test.shape[2]))

X_train_imputed = X_train_imputed.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
X_val_imputed = X_val_imputed.reshape(X_val.shape[0], X_val.shape[1], X_val.shape[2], 1)
X_test_imputed = X_test_imputed.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)

# --- Fill missing labels with the training-set mean ---------------------------
# A dedicated imputer keeps the fitted pixel imputer above intact.
target_imputer = SimpleImputer(strategy='mean')
y_train_imputed = target_imputer.fit_transform(y_train.reshape(-1, 1)).ravel()
y_val_imputed = target_imputer.transform(y_val.reshape(-1, 1)).ravel()
y_test_imputed = target_imputer.transform(y_test.reshape(-1, 1)).ravel()

# Labels are used on their original scale; no scaling is applied.
y_train_scaled = y_train_imputed
y_val_scaled = y_val_imputed
y_test_scaled = y_test_imputed
class DisplayPredictions(tf.keras.callbacks.Callback):
    """Print actual vs. predicted values for up to five test samples per epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Predict the first few test samples and print each absolute error.

        Args:
            epoch: Zero-based epoch index supplied by Keras.
            logs: Metric dictionary supplied by Keras (unused).
        """
        sample_count = min(5, len(y_test_scaled))
        if sample_count == 0:
            return
        # Only the displayed samples are predicted; running the whole test
        # set every epoch just to print five values is wasted work.
        predictions = self.model.predict(X_test_imputed[:sample_count], verbose=0)
        for actual, predicted_row in zip(y_test_scaled[:sample_count], predictions):
            predicted = predicted_row[0]
            diff = abs(actual - predicted)
            print(f"Epoch {epoch+1} - Actual: {actual:.4f}, Predicted: {predicted:.4f}, Difference: {diff:.4f}")
def create_model(trial):
    """Build and compile a CNN regressor from Optuna-suggested hyperparameters.

    The network is a stack of Conv2D -> MaxPooling2D -> Dropout stages,
    followed by a dense layer and a single linear output unit that predicts
    the susceptibility value. The input shape is inferred by Keras on the
    first call to ``fit``.

    Args:
        trial: Optuna trial (or frozen best trial) providing the
            ``suggest_*`` methods for the hyperparameters.

    Returns:
        A compiled ``Sequential`` model using MSE loss and reporting MAE.
    """
    model = models.Sequential()

    # Convolutional feature extractor (between 2 and 4 stages).
    for i in range(trial.suggest_int('n_conv_layers', 2, 4)):
        filters = trial.suggest_int(f'conv{i+1}_filters', 32, 128)
        model.add(layers.Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(trial.suggest_float(f'dropout{i+1}', 0.1, 0.3)))

    # Regression head: collapse the feature maps to a vector before the dense
    # layers, and finish with one unit so the model emits one value per sample.
    model.add(layers.Flatten())
    model.add(layers.Dense(trial.suggest_int('dense_units', 64, 256), activation='relu'))
    model.add(layers.Dropout(trial.suggest_float('dense_dropout', 0.1, 0.3)))
    model.add(layers.Dense(1))

    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
    return model
def objective(trial):
    """Optuna objective: train one candidate model and score it by validation MAE.

    Args:
        trial: Optuna trial used to sample the architecture, learning rate
            and batch size.

    Returns:
        The lowest validation MAE reached during training (lower is better).
    """
    # Release graph state left over from the previous trial so memory does not
    # keep growing over the course of the study.
    tf.keras.backend.clear_session()

    model = create_model(trial)
    batch_size = trial.suggest_categorical('batch_size', [32, 64])
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)
    early_stopping = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
    history = model.fit(
        X_train_imputed, y_train_scaled,
        epochs=50,
        batch_size=batch_size,  # the sampled value must actually be used
        validation_data=(X_val_imputed, y_val_scaled),
        callbacks=[DisplayPredictions(), lr_scheduler, early_stopping],
    )
    # Score by the best epoch: the last epoch is up to `patience` epochs past
    # the optimum when early stopping triggers.
    return min(history.history['val_mae'])
# --- Hyperparameter search ----------------------------------------------------
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

# --- Retrain a fresh model with the best hyperparameters ----------------------
best_model = create_model(study.best_trial)
best_model.fit(
    X_train_imputed, y_train_scaled,
    epochs=50,
    batch_size=study.best_params['batch_size'],
    validation_data=(X_val_imputed, y_val_scaled),
    callbacks=[
        tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
        tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
    ],
)

# --- Evaluate on the held-out test set ----------------------------------------
# Flatten (N, 1) predictions to (N,) so they line up element-wise with the labels.
cnn_pred = best_model.predict(X_test_imputed).ravel()
mae_metric = tf.keras.metrics.MeanAbsoluteError()
cnn_mae = mae_metric(y_test_scaled, cnn_pred).numpy()
mse_metric = tf.keras.metrics.MeanSquaredError()
cnn_mse = mse_metric(y_test_scaled, cnn_pred).numpy()

print("\nCNN Results:")
print(f"Mean Absolute Error: {cnn_mae:.4f}")
print(f"Mean Squared Error: {cnn_mse:.4f}")
print("\nBest CNN Parameters:")
for param_name, param_value in study.best_params.items():
    print(f"{param_name}: {param_value}")
The `susc.txt` file mentioned in the code contains the susceptibility value as a single float. Can someone help me? I need to finish this in 10 days.
If anyone needs more information, please see my GitHub repository.
Everything you need should be in the Remote Sensing folder.