Hi everyone! I'm a newbie here. I am trying to use dask and tensorflow to train my machine learning model. I was able to use dask to read parquet files and load them into my model in batches. Here is the code I wrote.
import numpy as np
import tensorflow as tf
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

train_path = "/some_train_path"
data = dd.read_parquet(train_path, engine="pyarrow", compression="snappy", columns=["X_jets", "y"], split_row_groups=32)

def transform_partition(partition):
    x_jets_flat = [val for sublist in partition.values for inner_list in sublist for val in inner_list]
    x_jets_flat = np.array(x_jets_flat, np.float32)  # Convert to a numpy array
    rows = np.int32(x_jets_flat.shape[0] / 375)  # 375 because it needs to be divided by 3*125
    x_jets_reshaped = np.reshape(x_jets_flat, (rows, 3, 125, 125))  # Reshape
    # x2_jets_reshaped = x2_jets_reshaped[:, 1:, :, :]  # comment out if want last two channels. modify to [0,2] if want 1st and last
    x2_jets_reshaped = np.where(x_jets_reshaped < 1e-3, 0, x_jets_reshaped)  # Zero-suppression
    # This is for HCal as what I received from the source code.
    # Note: with a leading batch axis, [-1, ...] selects the last sample; [:, -1, ...] would select the last channel.
    x2_jets_reshaped[-1, ...] = 25. * x2_jets_reshaped[-1, ...]
    x2_jets_reshaped = x2_jets_reshaped / 100.  # Standardize
    num_samples = x2_jets_reshaped.shape[0]  # Just take the number of samples in the batch (32 in this case)
    x2_jets_reshaped_2d = np.reshape(x2_jets_reshaped, (num_samples, -1))  # Flatten each image, as the scaler needs 2D input
    x2_jets_standardized = StandardScaler().fit_transform(x2_jets_reshaped_2d)  # Here, I also tried RobustScaler
    x2_jets_standardized_reshaped = np.reshape(x2_jets_standardized, x2_jets_reshaped.shape)  # Back to original image format
    return x2_jets_standardized_reshaped

def target_partition(partition):
    # This is pretty straightforward for y
    y = partition.values
    y = np.array(y, np.float32)
    return y
Now, apply the appropriate function to each partition of the dask series using .map_partitions(). I also call .to_delayed() here to get a list of lazily loaded chunks, one per partition. Nothing is loaded into memory until .compute() is called on a chunk.
X = data["X_jets"].map_partitions(transform_partition, meta=np.array([])).to_delayed()
y = data["y"].map_partitions(target_partition, meta=np.array([])).to_delayed()

# Split the dataset into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, train_size=3, test_size=1, random_state=42, shuffle=True)
Afterwards, create a function that will generate batches of the dataset.
def dask_generator(X_iter, y_iter):
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        X = delayed_X.compute()
        y = delayed_y.compute()
        yield X, y

train_loader = dask_generator(X_train, y_train)
val_loader = dask_generator(X_val, y_val)
Finally, feed it to my machine learning model.
model.fit(
    train_loader,
    steps_per_epoch=len(X_train),
    validation_data=val_loader,
    validation_steps=len(X_val),
    verbose=1,
    epochs=25,
    callbacks=callbacks,
)
The error that it returns is
Epoch 1/25
3/3 [==============================] - 12s 4s/step - loss: 0.7539 - accuracy: 0.4792 - val_loss: 0.7354 - val_accuracy: 0.4375 - lr: 0.0010
Epoch 2/25
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 75 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 1 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Learning rate reduction is conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
WARNING:tensorflow:Early stopping conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
3/3 [==============================] - 1s 226ms/step - loss: 0.7539 - accuracy: 0.4792 - lr: 0.0010
So training effectively stops after the first epoch. I am only using a fraction of the whole dataset to test that everything works. What I want is for all of the data fed in epoch 1 to be fed again in epoch 2 and in every later epoch. However, it seems that the generator cannot produce the dataset again once it has been consumed, so tensorflow runs out of data. I also cannot set steps_per_epoch to len(X_train)//batch_size, because that defeats the purpose of feeding the whole dataset into each epoch: it would run, but only until epoch 3. Is there a fix for this?
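To make the symptom concrete, here is a minimal sketch of what I think is going on, without dask or tensorflow. FakeDelayed is a hypothetical stand-in that only mimics the .compute() method of a dask delayed object, and repeating_generator is just one possible variant of dask_generator, assuming X_iter and y_iter are plain lists that can be iterated more than once. A generator like mine yields each partition once and is then exhausted, while a variant that restarts the pass in a `while True` loop can supply the `steps_per_epoch * epochs` batches that the warning asks for.

```python
from itertools import islice


class FakeDelayed:
    """Hypothetical stand-in for a dask delayed object; only .compute() is mimicked."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value


def one_shot_generator(X_iter, y_iter):
    # Same structure as dask_generator: one pass over the partitions, then exhausted.
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        yield delayed_X.compute(), delayed_y.compute()


def repeating_generator(X_iter, y_iter):
    # Restart the pass over the partitions forever, so every epoch can see all of them.
    # The consumer decides when to stop (Keras stops after steps_per_epoch batches).
    while True:
        for delayed_X, delayed_y in zip(X_iter, y_iter):
            yield delayed_X.compute(), delayed_y.compute()


# Three training partitions, as in the 3/3 progress bar of the log.
X_parts = [FakeDelayed(f"X{i}") for i in range(3)]
y_parts = [FakeDelayed(f"y{i}") for i in range(3)]

steps_per_epoch, epochs = len(X_parts), 25
needed = steps_per_epoch * epochs  # batches requested over the whole training run

# Try to draw `needed` batches from each generator.
one_shot_batches = list(islice(one_shot_generator(X_parts, y_parts), needed))
repeating_batches = list(islice(repeating_generator(X_parts, y_parts), needed))

print(needed, len(one_shot_batches), len(repeating_batches))
```

If this reading is right, passing a repeating generator for both the training and validation data would keep steps_per_epoch = len(X_train) meaningful, at the cost of calling .compute() on every partition again in each epoch.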