Hi everyone! Just a newbie here. I am trying to use dask and tensorflow to train my machine learning model. I was able to use dask to read parquet files and load batches of them into my model. Here is the code I wrote.
import numpy as np
import tensorflow as tf
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
train_path = "/some_train_path"
data = dd.read_parquet(train_path, engine="pyarrow", compression="snappy",
                       columns=["X_jets", "y"], split_row_groups=32)
def transform_partition(partition):
    # Flatten the nested lists in the partition into one long list of pixel rows
    x_jets_flat = [val for sublist in partition.values for inner_list in sublist for val in inner_list]
    x_jets_flat = np.array(x_jets_flat, np.float32)  # Convert to a numpy array
    rows = np.int32(x_jets_flat.shape[0] / 375)  # 375 = 3 channels * 125 rows per sample
    x_jets_reshaped = np.reshape(x_jets_flat, (rows, 3, 125, 125))  # Reshape to (samples, channels, height, width)
    # x_jets_reshaped = x_jets_reshaped[:, 1:, :, :]  # uncomment to keep only the last two channels; use [:, [0, 2], :, :] for the 1st and last
    x2_jets_reshaped = np.where(x_jets_reshaped < 1e-3, 0, x_jets_reshaped)  # Zero-suppression
    x2_jets_reshaped[-1, ...] = 25. * x2_jets_reshaped[-1, ...]  # Scale HCal, as in the source code I received
    x2_jets_reshaped = x2_jets_reshaped / 100.  # Standardize
    num_samples = x2_jets_reshaped.shape[0]  # Number of samples in this partition (32 in this case)
    x2_jets_reshaped_2d = np.reshape(x2_jets_reshaped, (num_samples, -1))  # Flatten each image to 1-D since the scaler expects 2-D input
    x2_jets_standardized = StandardScaler().fit_transform(x2_jets_reshaped_2d)  # I also tried RobustScaler here
    x2_jets_standardized_reshaped = np.reshape(x2_jets_standardized, x2_jets_reshaped.shape)  # Back to the original image shape
    return x2_jets_standardized_reshaped
def target_partition(partition):  # This is pretty straightforward for y
    y = partition.values
    y = np.array(y, np.float32)
    return y
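(For reference, a quick in-memory test of the transform on a single partition would look roughly like this; get_partition(0) and the printed shape are just for illustration.)
sample = data["X_jets"].get_partition(0).compute()  # pull the first partition into memory as a pandas Series
arr = transform_partition(sample)
print(arr.shape)  # expect something like (32, 3, 125, 125)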
Now, apply the appropriate function to each partition of the dask series using .map_partitions(). I also used .to_delayed() here to create lazily loaded chunks, rather than bringing everything into memory with .compute().
X = data["X_jets"].map_partitions(transform_partition, meta = np.array([])).to_delayed()
y = data["y"].map_partitions(target_partition, meta = np.array([])).to_delayed()
# Split the dataset into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, train_size=3, test_size=1,
                                                  random_state=42, shuffle=True)
Afterwards, create a function that will generate batches of the dataset.
def dask_generator(X_iter, y_iter):
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        X = delayed_X.compute()
        y = delayed_y.compute()
        yield X, y
train_loader = dask_generator(X_train, y_train)
val_loader = dask_generator(X_val, y_val)
Finally, feed it to my machine learning model.
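(model and callbacks are defined earlier in my notebook and are not really important for this question, so here is just a rough stand-in, not my real architecture, in case you want to reproduce the problem.)
# Rough stand-in for my actual model and callbacks, just so the snippet runs end to end
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(3, 125, 125)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
              loss="binary_crossentropy",
              metrics=["accuracy"])
callbacks = [
    tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss"),
    tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5),
]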
model.fit(train_loader, steps_per_epoch=len(X_train), validation_data=val_loader, validation_steps=len(X_val),
          verbose=1, epochs=25, callbacks=callbacks)
The output it returns is
Epoch 1/25
3/3 [==============================] - 12s 4s/step - loss: 0.7539 - accuracy: 0.4792 - val_loss: 0.7354 - val_accuracy: 0.4375 - lr: 0.0010
Epoch 2/25
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 75 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 1 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Learning rate reduction is conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
WARNING:tensorflow:Early stopping conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
3/3 [==============================] - 1s 226ms/step - loss: 0.7539 - accuracy: 0.4792 - lr: 0.0010
So the training immediately stops after the first epoch. I am only using a fraction of the whole dataset here to check that everything works. What I want is for the same data that was fed in epoch 1 to be fed again in epoch 2. However, TensorFlow seems to recognize that the dask generator cannot repeatedly produce the dataset for every epoch. I also cannot use steps_per_epoch = len(X_train)//batch_size here, because that defeats the purpose of feeding the whole dataset in every epoch; that is, it would run, but only until epoch 3. Is there a fix for this?
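To illustrate what I think is happening (hypothetical snippet, using the 3 training partitions from above): the generator is fully consumed during epoch 1, so there is nothing left when epoch 2 starts.
loader = dask_generator(X_train, y_train)
for _ in range(len(X_train)):  # epoch 1 reads all 3 delayed partitions
    next(loader)
next(loader)  # raises StopIteration -> TensorFlow's "ran out of data" warning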