Good evening. I need help with a method I wrote to retrieve my model's predictions and reverse the standardization that was applied to them. Starting from the first date of my test data, I want to obtain predictions up to 2026. I'm using both one-step and multi-step models; for example, from 24 input time steps I want to predict the next 24 hours.
I have some doubts about the way I've written the code, so I would appreciate some help.
def predict_until_2026(self, model, scaler, plot_col='water_level',
                       freq='D', end_date='2026-12-31'):
    """Return de-standardized test-set predictions for one column, with dates.

    The test data is cut into chronologically ordered windows whose label
    spans do not overlap, so every time step is predicted exactly once and
    the flattened predictions form a continuous series.  (The ``test``
    dataset cannot be used for this: it is built with ``shuffle=True``, so
    its batches are not in time order.)

    Note: this only produces predictions for the period covered by the test
    set.  It does not roll the model forward past the last available row;
    ``end_date`` merely truncates the result.

    Args:
        model: Callable taking an array of shape (batch, input_width,
            features) and returning (batch, label_width, outputs).
        scaler: Fitted scaler exposing ``scale_`` and ``mean_`` (e.g.
            scikit-learn ``StandardScaler``).  Assumed to have been fitted
            either on all columns of ``train_df`` (same order) or on the
            label columns only -- TODO confirm against the fitting code.
        plot_col: Name of the column to extract and de-standardize.
        freq: pandas frequency string for the spacing of the timestamps.
        end_date: Last date (inclusive) to keep in the result.

    Returns:
        DataFrame with columns 'Date' and 'Predicted Water Level'.

    Raises:
        KeyError: If ``plot_col`` is not a known column / label column.
        ValueError: If the test set is shorter than one window, the model
            output does not have the expected shape, or the scaler size
            matches neither the feature columns nor the label columns.
    """
    # Position of the column in the full feature table ...
    feature_index = self.column_indices[plot_col]
    # ... and its position on the model's output axis.  These differ as
    # soon as `label_columns` selects a subset of the features.
    if self.label_columns is None:
        output_index = feature_index
    elif plot_col in self.label_columns_indices:
        output_index = self.label_columns_indices[plot_col]
    else:
        raise KeyError(
            f"'{plot_col}' is not one of the label columns "
            f"{self.label_columns}.")

    # Pick the scaler entry that belongs to `plot_col`, depending on which
    # set of columns the scaler was fitted on.
    n_scaled = len(scaler.scale_)
    if n_scaled == len(self.column_indices):
        scaler_index = feature_index
    elif (self.label_columns is not None
          and n_scaled == len(self.label_columns)):
        scaler_index = output_index
    else:
        raise ValueError(
            f"Scaler was fitted on {n_scaled} feature(s), which matches "
            "neither the feature columns nor the label columns.")

    # Build ordered windows straight from the test data.
    test_values = np.asarray(self.test_df, dtype=np.float32)
    n_windows = len(test_values) - self.total_window_size + 1
    if n_windows < 1:
        raise ValueError("The test set is shorter than one full window.")
    # Stepping by `label_width` makes consecutive label spans adjacent.
    window_starts = np.arange(0, n_windows, self.label_width)

    batch_size = 1024  # bound memory use, same batch size as the datasets
    scaled_chunks = []
    for first in range(0, len(window_starts), batch_size):
        batch_starts = window_starts[first:first + batch_size]
        inputs = np.stack(
            [test_values[s:s + self.input_width] for s in batch_starts])
        predictions = np.asarray(model(inputs))
        if predictions.ndim != 3:
            raise ValueError(
                "Predictions must be in the form (batch, time, feature).")
        if predictions.shape[1] != self.label_width:
            raise ValueError(
                f"Model returned {predictions.shape[1]} time steps per "
                f"window, expected label_width={self.label_width}.")
        scaled_chunks.append(predictions[:, :, output_index].reshape(-1))
    scaled_predicted_values = np.concatenate(scaled_chunks)

    # Reverse standardization for the `plot_col` column.
    predicted_values = (scaled_predicted_values * scaler.scale_[scaler_index]
                        + scaler.mean_[scaler_index])

    # The first prediction belongs to the first *label* row of the first
    # test window, i.e. `label_start` rows after the start of the test set.
    # Assumes the test set is the last 10% of `water_level_df_copy`.
    test_start = int(len(water_level_df_copy) * 0.90)
    start_date = water_level_df_copy['Date'].iloc[test_start + self.label_start]
    last_date = pd.Timestamp(end_date)

    timestamps = pd.date_range(start=start_date,
                               periods=len(predicted_values), freq=freq)
    # Drop anything past the requested end date.
    timestamps = timestamps[timestamps <= last_date]

    results_df = pd.DataFrame({
        'Date': timestamps,
        'Predicted Water Level': predicted_values[:len(timestamps)]
    })
    return results_df
Here is the full code of the class that the method belongs to:
class WindowGenerator():
    """Slice time-series frames into (inputs, labels) windows for a model.

    A window spans ``input_width + shift`` consecutive rows.  The first
    ``input_width`` rows are the inputs; the last ``label_width`` rows are
    the labels.
    """

    def __init__(self, input_width, label_width, shift,
                 train_df=train_df, val_df=val_df, test_df=test_df,
                 label_columns=None):
        """Store the data splits and derive the window index bookkeeping.

        Args:
            input_width: Number of time steps fed to the model.
            label_width: Number of time steps the model has to predict.
            shift: Offset between the end of the inputs and the end of the
                labels.
            train_df, val_df, test_df: Data splits; default to the
                module-level frames of the same name.
            label_columns: Names of the columns to predict, or None to keep
                every column as a label.
        """
        # Store the raw data.
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df

        # Determine the label column indices.
        self.label_columns = label_columns
        if label_columns is not None:
            # Position of each label column on the label/prediction axis.
            self.label_columns_indices = {name: i for i, name in
                                          enumerate(label_columns)}
        # Position of each column in the full feature table.
        self.column_indices = {name: i for i, name in
                               enumerate(train_df.columns)}

        # Define window parameters.
        self.input_width = input_width
        self.label_width = label_width
        self.shift = shift

        self.total_window_size = input_width + shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

    def __repr__(self):
        """Return a multi-line summary of the window layout."""
        return '\n'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
            f'Label column name(s): {self.label_columns}'])

    # Split
    # Given a list of consecutive inputs, the split_window method will convert them
    # into a window of inputs and a window of labels.
    def split_window(self, features):
        """Split a batch of full windows into ``(inputs, labels)``.

        Args:
            features: Batch of windows indexed as (batch, time, feature).

        Returns:
            Tuple ``(inputs, labels)``; labels keep only ``label_columns``
            when those are set.
        """
        inputs = features[:, self.input_slice, :]
        labels = features[:, self.labels_slice, :]
        if self.label_columns is not None:
            labels = tf.stack(
                [labels[:, :, self.column_indices[name]] for name in self.label_columns],
                axis=-1)

        # Slicing doesn't preserve static shape information, so set the shapes
        # manually to make `tf.data.Datasets` easier to inspect.
        inputs.set_shape([None, self.input_width, None])
        labels.set_shape([None, self.label_width, None])

        return inputs, labels

    # Plot
    def plot(self, model=None, plot_col='water_level', max_subplots=3):
        """Plot inputs, labels and (optionally) predictions for example windows.

        Args:
            model: Optional callable applied to the example inputs; its
                output is drawn as 'Predictions'.
            plot_col: Name of the column to draw.
            max_subplots: Maximum number of example windows to draw.
        """
        inputs, labels = self.example
        plt.figure(figsize=(12, 8))
        plot_col_index = self.column_indices[plot_col]
        max_n = min(max_subplots, len(inputs))

        # Both the label index and the predictions are the same for every
        # subplot, so compute them once instead of once per iteration.
        if self.label_columns:
            label_col_index = self.label_columns_indices.get(plot_col, None)
        else:
            label_col_index = plot_col_index

        predictions = None
        if model is not None and label_col_index is not None and max_n > 0:
            predictions = model(inputs)

        for n in range(max_n):
            plt.subplot(max_n, 1, n + 1)
            plt.ylabel(f'{plot_col} [normalized]')
            plt.plot(self.input_indices, inputs[n, :, plot_col_index],
                     label='Inputs', marker='.', zorder=-10)

            # `plot_col` is not a label column: only the inputs can be drawn.
            if label_col_index is None:
                continue

            plt.scatter(self.label_indices, labels[n, :, label_col_index],
                        edgecolors='k', label='Labels', c='#2ca02c', s=64)
            if predictions is not None:
                plt.scatter(self.label_indices, predictions[n, :, label_col_index],
                            marker='X', edgecolors='k', label='Predictions',
                            c='#ff7f0e', s=64)

            if n == 0:
                plt.legend()

        plt.xlabel('Time [h]')

    # Create tf.Data.Dataset
    def make_dataset(self, data):
        """Turn a frame/array into a dataset of ``(inputs, labels)`` batches.

        The windows are shuffled, so batches drawn from the result are not
        in chronological order.
        """
        data = np.array(data, dtype=np.float32)
        ds = tf.keras.utils.timeseries_dataset_from_array(
            data=data,
            targets=None,
            sequence_length=self.total_window_size,
            sequence_stride=1,
            shuffle=True,
            batch_size=1024,)

        ds = ds.map(self.split_window)

        return ds

    @property
    def train(self):
        """Windowed dataset built from ``train_df``."""
        return self.make_dataset(self.train_df)

    @property
    def val(self):
        """Windowed dataset built from ``val_df``."""
        return self.make_dataset(self.val_df)

    @property
    def test(self):
        """Windowed dataset built from ``test_df``."""
        return self.make_dataset(self.test_df)

    @property
    def example(self):
        """Get and cache an example batch of `inputs, labels` for plotting."""
        result = getattr(self, '_example', None)
        if result is None:
            # If no example batch is found, get one from the `.train` dataset
            result = next(iter(self.train))
            # Cache it for next time
            self._example = result
        return result

    def predict_until_2026(self, model, scaler, plot_col='water_level',
                           freq='D', end_date='2026-12-31'):
        """Return de-standardized test-set predictions for one column, with dates.

        The test data is cut into chronologically ordered windows whose label
        spans do not overlap, so every time step is predicted exactly once
        and the flattened predictions form a continuous series.  (The
        ``test`` dataset cannot be used for this: it is built with
        ``shuffle=True``, so its batches are not in time order.)

        Note: this only produces predictions for the period covered by the
        test set.  It does not roll the model forward past the last
        available row; ``end_date`` merely truncates the result.

        Args:
            model: Callable taking an array of shape (batch, input_width,
                features) and returning (batch, label_width, outputs).
            scaler: Fitted scaler exposing ``scale_`` and ``mean_`` (e.g.
                scikit-learn ``StandardScaler``).  Assumed to have been
                fitted either on all columns of ``train_df`` (same order) or
                on the label columns only -- TODO confirm against the
                fitting code.
            plot_col: Name of the column to extract and de-standardize.
            freq: pandas frequency string for the spacing of the timestamps.
            end_date: Last date (inclusive) to keep in the result.

        Returns:
            DataFrame with columns 'Date' and 'Predicted Water Level'.

        Raises:
            KeyError: If ``plot_col`` is not a known column / label column.
            ValueError: If the test set is shorter than one window, the
                model output does not have the expected shape, or the scaler
                size matches neither the feature columns nor the label
                columns.
        """
        # Position of the column in the full feature table ...
        feature_index = self.column_indices[plot_col]
        # ... and its position on the model's output axis.  These differ as
        # soon as `label_columns` selects a subset of the features.
        if self.label_columns is None:
            output_index = feature_index
        elif plot_col in self.label_columns_indices:
            output_index = self.label_columns_indices[plot_col]
        else:
            raise KeyError(
                f"'{plot_col}' is not one of the label columns "
                f"{self.label_columns}.")

        # Pick the scaler entry that belongs to `plot_col`, depending on
        # which set of columns the scaler was fitted on.
        n_scaled = len(scaler.scale_)
        if n_scaled == len(self.column_indices):
            scaler_index = feature_index
        elif (self.label_columns is not None
              and n_scaled == len(self.label_columns)):
            scaler_index = output_index
        else:
            raise ValueError(
                f"Scaler was fitted on {n_scaled} feature(s), which matches "
                "neither the feature columns nor the label columns.")

        # Build ordered windows straight from the test data.
        test_values = np.asarray(self.test_df, dtype=np.float32)
        n_windows = len(test_values) - self.total_window_size + 1
        if n_windows < 1:
            raise ValueError("The test set is shorter than one full window.")
        # Stepping by `label_width` makes consecutive label spans adjacent.
        window_starts = np.arange(0, n_windows, self.label_width)

        batch_size = 1024  # bound memory use, same batch size as the datasets
        scaled_chunks = []
        for first in range(0, len(window_starts), batch_size):
            batch_starts = window_starts[first:first + batch_size]
            inputs = np.stack(
                [test_values[s:s + self.input_width] for s in batch_starts])
            predictions = np.asarray(model(inputs))
            if predictions.ndim != 3:
                raise ValueError(
                    "Predictions must be in the form (batch, time, feature).")
            if predictions.shape[1] != self.label_width:
                raise ValueError(
                    f"Model returned {predictions.shape[1]} time steps per "
                    f"window, expected label_width={self.label_width}.")
            scaled_chunks.append(predictions[:, :, output_index].reshape(-1))
        scaled_predicted_values = np.concatenate(scaled_chunks)

        # Reverse standardization for the `plot_col` column.
        predicted_values = (scaled_predicted_values * scaler.scale_[scaler_index]
                            + scaler.mean_[scaler_index])

        # The first prediction belongs to the first *label* row of the first
        # test window, i.e. `label_start` rows after the start of the test
        # set.  Assumes the test set is the last 10% of `water_level_df_copy`.
        test_start = int(len(water_level_df_copy) * 0.90)
        start_date = water_level_df_copy['Date'].iloc[test_start + self.label_start]
        last_date = pd.Timestamp(end_date)

        timestamps = pd.date_range(start=start_date,
                                   periods=len(predicted_values), freq=freq)
        # Drop anything past the requested end date.
        timestamps = timestamps[timestamps <= last_date]

        results_df = pd.DataFrame({
            'Date': timestamps,
            'Predicted Water Level': predicted_values[:len(timestamps)]
        })
        return results_df