Tensorflow and Keras multithreading problem with h5imagegenerator


I am trying to fit a model. I have around 10^7 64x64x1 uint8 images stored in hdf5 file. I am trying to multithread my way out of waiting for hours to finish one epoch, but I’ve encountered some problems that limit what optimizer I can or cannot use. This relates to me having to hack around to get fitting to work, but losing the optimizer state in the process.

My system is Windows 10, I run stuff on 1080Ti nVidia GPU, I use python 3.9.9, and TF 2.7.0. I use IDLE as my coding platform. I try to keep things mostly vanilla.

Firstly, I have my model that I build. For this I have a function, that creates a sequential model, and this is correct and bugfree, but there is no point in going to details of the model, as it is changing all the time. Some convolutional layers and some fully connected with all but last having activation of relu, and last softmax. This function is buildNNetModelBW()

I start the code with model creation:

model = buildNNetModelBW()
#opt = Adam(learning_rate=0.01)
opt = SGD(learning_rate=0.004, momentum=0.8, nesterov=True)
model.compile(loss = categorical_crossentropy,
              optimizer = opt,
              metrics =['categorical_crossentropy', 'accuracy'])

I’ve tried Adam and SGD, Adam doesn’t work, because I cannot fit more than one epoch, and SGD loses all momentum, but works normally as if all momentum options were disabled.

I have some helper code, this generates the training and validation generators:

def getGenerators(p=0.05):
    print("Create augmentor")
    augmentor = Compose(getAugmentor(p))
    print("Create training generator")
    traingen = train_generator = HDF5ImageGenerator(
    print("Create validation generator")
    validgen = train_generator = HDF5ImageGenerator(
    return (traingen, validgen)

I use h5imagegenerator and albumentations to augment the images and also convert them to 32x32x1 float in range 0…1.

I have the following code that has been the only way to make this model fitting work that I’ve been able to come up with:

for i in range(0,1000):
        traingen, validgen = getGenerators(p=0.1) #I create two generators every loop, otherwise I get stuck
            x=traingen, #I use generators here, everything is "set" to show all values
            batch_size=64,#64 to make things just a tad faster
            shuffle=False, #Data generated is in random order, and shuffling takes way too long
            steps_per_epoch=int(len(traingen)), #Number of batches
            max_queue_size=16, #Does fit in memory, leaves ~10GB of free ram
            workers=15, #I have 16 cores, so 1 for other OS tasks is okay
            use_multiprocessing=True #To make it fast
        time.sleep(1) #These sleeps are an attempt to give the running threads time to exit
        model.save(r'./model') #Save model
        time.sleep(1) #More time to let everything cool down
        traingen = None #If I don't set these to none, the training will hang regardless of new generators
        validgen = None
        model = None #Maybe should've gone with del, but setting None seems to do the trick too
        time.sleep(1) #Give more time for things to settle, not sure if necessary at all
        backend.clear_session() #I was hoping this would kill processes, but it just keeps things clean
        time.sleep(1) #At this point I am really not sure if sleeping does anything
        model = models.load_model(r'./model') #Load the model, and repopulate the model variable.
        #The idea of all of this is to detatch the model from anything that might be left from the old
        #fitting round, so that it wouldn't hang

The thing is, I can only run the first epoch, or the code hangs. I cannot “just clear the session”, because the next epoch will hang. The only way I’ve been able to get more than one epoch is to reload the model and start each round from scratch. But this makes training slower, and some optimizers useless, as the optimizer state is not reinstated, but created new every iteration. I’ve tried storing the optimizer and then replacing it, but this doesn’t work either.

I’ve also found that this method I use leaves some threads hanging around, and when they terminate, sometimes they leave error messages:

Exception in thread Thread-662:
Traceback (most recent call last):
  File "C:\Program Files\Python39\lib\threading.py", line 973, in _bootstrap_inner
  File "C:\Program Files\Python39\lib\threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Program Files\Python39\lib\site-packages\keras\utils\data_utils.py", line 744, in _run
    with closing(self.executor_fn(_SHARED_SEQUENCES)) as executor:
  File "C:\Program Files\Python39\lib\site-packages\keras\utils\data_utils.py", line 721, in pool_fn
    pool = get_pool_class(True)(
  File "C:\Program Files\Python39\lib\multiprocessing\context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "C:\Program Files\Python39\lib\multiprocessing\pool.py", line 212, in __init__
  File "C:\Program Files\Python39\lib\multiprocessing\pool.py", line 303, in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,
  File "C:\Program Files\Python39\lib\multiprocessing\pool.py", line 326, in _repopulate_pool_static
  File "C:\Program Files\Python39\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python39\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python39\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python39\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
RuntimeError: dictionary changed size during iteration

But it is only ever one thread per epoch. Also if I kill the console window, some python processes will be left hanging indefinitely, until I close them manually.

Is there a smarter way for doing multiple epoch fitting, or at least reinstating optimizer state at reset?