Dask with TensorFlow

Hi everyone! Just a newbie here. I am trying to use dask and tensorflow to train my machine learning model. I was able to use dask to read parquet files and load them into my model in batches. Here is the code I wrote.

import numpy as np
import tensorflow as tf
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


train_path = "/some_train_path"
data = dd.read_parquet(train_path, engine="pyarrow", compression = "snappy", columns = ["X_jets", "y"],
                      split_row_groups = 32) 

def transform_partition(partition):
    # Flatten the nested lists: sample -> channel -> image row (125 values each)
    x_jets_flat = [val for sublist in partition.values for inner_list in sublist for val in inner_list]
    x_jets_flat = np.array(x_jets_flat, np.float32)  # Convert to a numpy array
    rows = x_jets_flat.shape[0] // 375  # 375 = 3 channels * 125 image rows per sample
    x_jets_reshaped = np.reshape(x_jets_flat, (rows, 3, 125, 125))  # (samples, channels, height, width)
    # x_jets_reshaped = x_jets_reshaped[:, 1:, :, :]  # Uncomment to keep only the last two channels; use [:, [0, 2], :, :] for the first and last
    x2_jets_reshaped = np.where(x_jets_reshaped < 1e-3, 0, x_jets_reshaped)  # Zero-suppression
    # Scale the HCal channel (last channel, axis 1) by 25, following the source code I received
    x2_jets_reshaped[:, -1, ...] = 25. * x2_jets_reshaped[:, -1, ...]
    x2_jets_reshaped = x2_jets_reshaped / 100.  # Rescale
    num_samples = x2_jets_reshaped.shape[0]  # Number of samples in this partition (32 in this case)
    x2_jets_reshaped_2d = np.reshape(x2_jets_reshaped, (num_samples, -1))  # Flatten each image, since the scaler expects 2D input
    x2_jets_standardized = StandardScaler().fit_transform(x2_jets_reshaped_2d)  # I also tried RobustScaler here
    x2_jets_standardized_reshaped = np.reshape(x2_jets_standardized, x2_jets_reshaped.shape)  # Back to the original image format

    return x2_jets_standardized_reshaped

def target_partition(partition): #This is pretty straightforward for y
    y = partition.values 
    y = np.array(y, np.float32)
    return y

Now, I apply the appropriate function to each partition of the dask series using .map_partitions(). I also use .to_delayed() here to get a sequence of lazily evaluated chunks, one Delayed object per partition. Nothing is loaded into memory until .compute() is called on a chunk.

X = data["X_jets"].map_partitions(transform_partition, meta = np.array([])).to_delayed()
y = data["y"].map_partitions(target_partition, meta = np.array([])).to_delayed()
# Split the dataset into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, train_size = 3, test_size = 1, 
                                                  random_state = 42, shuffle = True)

Afterwards, create a function that will generate batches of the dataset.

def dask_generator(X_iter, y_iter):
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        X = delayed_X.compute()
        y = delayed_y.compute()
        yield X, y

train_loader = dask_generator(X_train, y_train)
val_loader = dask_generator(X_val, y_val)

Finally, feed it to my machine learning model.

model.fit(train_loader, steps_per_epoch = len(X_train),  validation_data = val_loader, validation_steps = len(X_val),
          verbose=1, epochs=25, callbacks = callbacks)

The output that it returns is:

Epoch 1/25
3/3 [==============================] - 12s 4s/step - loss: 0.7539 - accuracy: 0.4792 - val_loss: 0.7354 - val_accuracy: 0.4375 - lr: 0.0010
Epoch 2/25
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 75 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 1 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Learning rate reduction is conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
WARNING:tensorflow:Early stopping conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
3/3 [==============================] - 1s 226ms/step - loss: 0.7539 - accuracy: 0.4792 - lr: 0.0010
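
The behaviour in epoch 2 can be reproduced without TensorFlow or dask. The following is a minimal sketch, assuming the warnings come from the fact that a Python generator object can only be consumed once. FakeDelayed is a hypothetical stand-in for a dask Delayed object (it only mimics .compute()) and is not part of dask.

```python
class FakeDelayed:
    """Hypothetical stand-in for a dask Delayed object; only mimics .compute()."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value


def dask_generator(X_iter, y_iter):
    # Same structure as the generator in the post: one pass over the partitions.
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        yield delayed_X.compute(), delayed_y.compute()


# Three fake partitions, mirroring train_size = 3.
X_parts = [FakeDelayed(f"X{i}") for i in range(3)]
y_parts = [FakeDelayed(f"y{i}") for i in range(3)]

loader = dask_generator(X_parts, y_parts)
first_pass = list(loader)   # "epoch 1": consumes all three batches
second_pass = list(loader)  # "epoch 2": the same generator object is already exhausted

print(len(first_pass), len(second_pass))  # 3 0
```

The generator has nothing left to yield on the second pass, which is consistent with the "ran out of data" warning appearing at the start of epoch 2.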

So, training stops right after the first epoch. I am only using a fraction of the whole dataset to test whether everything works. What I want is for all the data fed in epoch 1 to be fed again in epoch 2. However, tensorflow reports that the input ran out of data, so it seems the dask generator cannot produce the dataset again for each epoch. I also cannot use len(X_train)//batch_size for steps_per_epoch, because that defeats the purpose of feeding the whole dataset into each epoch. It would run, but only until epoch 3. Is there a fix for this?
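
One possible fix is sketched below, under the assumption that Keras only needs the generator to keep yielding batches for steps_per_epoch * epochs steps, as the warning text suggests. The idea is to wrap the loop over partitions in while True, so that every epoch starts again from the first partition. The names repeating_generator and FakeDelayed are hypothetical: FakeDelayed stands in for a dask Delayed object so that the sketch runs without dask.

```python
from itertools import islice


class FakeDelayed:
    """Hypothetical stand-in for a dask Delayed object; only mimics .compute()."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value


def repeating_generator(X_parts, y_parts):
    # X_parts and y_parts must be re-iterable sequences (for example lists),
    # not generators, so that zip() can walk over them again on each pass.
    while True:
        for delayed_X, delayed_y in zip(X_parts, y_parts):
            yield delayed_X.compute(), delayed_y.compute()


X_parts = [FakeDelayed(f"X{i}") for i in range(3)]
y_parts = [FakeDelayed(f"y{i}") for i in range(3)]

steps_per_epoch = len(X_parts)
epochs = 2

loader = repeating_generator(X_parts, y_parts)
# Pull exactly the number of batches that two epochs would request.
batches = list(islice(loader, steps_per_epoch * epochs))

print([x for x, _ in batches])  # ['X0', 'X1', 'X2', 'X0', 'X1', 'X2']
```

With the real data, the same idea would be train_loader = repeating_generator(X_train, y_train) and val_loader = repeating_generator(X_val, y_val), keeping steps_per_epoch = len(X_train) and validation_steps = len(X_val) so that Keras knows where each epoch ends. Note that every partition is recomputed in every epoch, so the parquet read and the preprocessing run again each time.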

Hi everyone. Just a newbie here. I am trying to use dask to read a parquet file and generate batches of data, then use tensorflow to train my machine learning model. Here is what I have done so far.

import numpy as np
import tensorflow as tf
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split

# Specify the path of the files for training
train_path = "/some_path"
# Read the file
data = dd.read_parquet(train_path, engine="pyarrow", compression = "snappy", columns = ["X_jets", "y"],
                      split_row_groups = 32) 

Here, data is a dask dataframe that has about 10000 partitions, with each partition containing 32 rows (samples) of data. Now, I proceed to data preprocessing.

def transform_partition(partition):
    ...
    ...
    return X

def target_partition(partition): #This is pretty straightforward for y
    ...  
    ... 
    return y

Now, I apply the appropriate function to each partition of the dask series using .map_partitions(). I also use .to_delayed() here to get a sequence of lazily evaluated chunks, one Delayed object per partition. Nothing is loaded into memory until .compute() is called on a chunk.

X = data["X_jets"].map_partitions(transform_partition, meta = np.array([])).to_delayed()
y = data["y"].map_partitions(target_partition, meta = np.array([])).to_delayed()

# Split the dataset into training and validation sets using dask-ml train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, train_size = 3, test_size = 1, 
                                                  random_state = 42, shuffle = True)

# A generic data loader that yields one partition (32 samples) at a time to the model
def dask_generator(X_iter, y_iter):
    for delayed_X, delayed_y in zip(X_iter, y_iter):
        X = delayed_X.compute()
        y = delayed_y.compute()
        yield X, y

train_loader = dask_generator(X_train, y_train) 
val_loader = dask_generator(X_val, y_val)

Now, when I feed this to my model, training stops after the first epoch with the warnings below.

model.fit(train_loader, steps_per_epoch=len(X_train), validation_data=val_loader,
          validation_steps=len(X_val), verbose=1, epochs=25, callbacks=callbacks)

Epoch 1/25
3/3 [==============================] - 12s 4s/step - loss: 0.7539 - accuracy: 0.4792 - val_loss: 0.7354 - val_accuracy: 0.4375 - lr: 0.0010
Epoch 2/25
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 75 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches (in this case, 1 batches). You may need to use the repeat() function when building your dataset.
WARNING:tensorflow:Learning rate reduction is conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
WARNING:tensorflow:Early stopping conditioned on metric `val_loss` which is not available. Available metrics are: loss,accuracy,lr
3/3 [==============================] - 1s 226ms/step - loss: 0.7539 - accuracy: 0.4792 - lr: 0.0010

I am only using a fraction of the whole dataset to test whether everything works. What I want is for all the data fed in epoch 1 to be fed again in epoch 2. However, tensorflow reports that the input ran out of data, so it seems the dask generator cannot produce the dataset again for each epoch. I also cannot use len(X_train)//batch_size for steps_per_epoch, because that defeats the purpose of feeding the whole dataset into each epoch. It would run, but only until epoch 3. Is there a fix for this?
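
The batch counts in the warning can be checked with a little arithmetic. This is a rough sketch, assuming that one partition is one batch and that the reduced setting mentioned above corresponds to steps_per_epoch = 1 (a guess, since the exact value is not stated). The helper epochs_covered is hypothetical.

```python
def epochs_covered(batches_available, steps_per_epoch):
    """Number of full epochs that a single-pass generator can supply."""
    return batches_available // steps_per_epoch


train_partitions = 3  # len(X_train) after the split with train_size = 3
epochs = 25           # as shown in the training log

# Total number of batches Keras expects the generator to provide.
required_batches = train_partitions * epochs
print(required_batches)  # 75, the number quoted in the first warning

# A single pass over the partitions provides only 3 batches.
print(epochs_covered(train_partitions, steps_per_epoch=3))  # 1
print(epochs_covered(train_partitions, steps_per_epoch=1))  # 3 (assumed reduced setting)
```

Under these assumptions, a one-pass generator covers exactly one epoch at steps_per_epoch = 3, and three epochs at steps_per_epoch = 1, which would match training running only until epoch 3.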