RuntimeError: __iter__() is only supported inside of tf.function or when eager execution is enabled

I am trying to run a pre-processing script in the latest TensorFlow container image:
docker_image = tensorflow/tensorflow:2.7.0-gpu

Below are the script and the error logs.

import argparse
import io
import os
import subprocess

import ray
import tensorflow.compat.v1 as tf
from PIL import Image
from psutil import cpu_count

from utils import *
from object_detection.utils import dataset_util, label_map_util

label_map = label_map_util.load_labelmap('./label_map.pbtxt')
label_map_dict = label_map_util.get_label_map_dict(label_map)
t2idict = {y: x for x, y in label_map_dict.items()}

def class_text_to_int(text):
    return t2idict[text]

def create_tf_example(filename, encoded_jpeg, annotations):
    """
    This function creates a tf.train.Example from the Waymo frame.
    args:
    - filename [str]: name of the image
    - encoded_jpeg [bytes]: jpeg encoded image
    - annotations [protobuf object]: bboxes and classes
    returns:
    - tf_example [tf.train.Example]: tf example in the object detection api format.
    """

    # TODO: Implement function to convert the data
    encoded_jpg_io = io.BytesIO(encoded_jpeg)
    image = Image.open(encoded_jpg_io)
    width, height = image.size

    image_format = b'jpeg'

    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []

    for index, row in enumerate(annotations):
        xmin = row.box.center_x - row.box.length / 2.0
        xmax = row.box.center_x + row.box.length / 2.0
        ymin = row.box.center_y - row.box.width / 2.0
        ymax = row.box.center_y + row.box.width / 2.0

        xmins.append(xmin / width)
        xmaxs.append(xmax / width)
        ymins.append(ymin / height)
        ymaxs.append(ymax / height)
        classes_text.append(class_text_to_int(row.type).encode('utf8'))
        classes.append(row.type)

    filename = filename.encode('utf8')
    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': int64_feature(height),
        'image/width': int64_feature(width),
        'image/filename': bytes_feature(filename),
        'image/source_id': bytes_feature(filename),
        'image/encoded': bytes_feature(encoded_jpeg),
        'image/format': bytes_feature(image_format),
        'image/object/bbox/xmin': float_list_feature(xmins),
        'image/object/bbox/xmax': float_list_feature(xmaxs),
        'image/object/bbox/ymin': float_list_feature(ymins),
        'image/object/bbox/ymax': float_list_feature(ymaxs),
        'image/object/class/text': bytes_list_feature(classes_text),
        'image/object/class/label': int64_list_feature(classes),
    }))
    return tf_example

def download_tfr(filepath, temp_dir):
    """
    download a single tf record
    args:
    - filepath [str]: path to the tf record file
    - temp_dir [str]: path to the directory where the raw data will be saved
    returns:
    - local_path [str]: path where the file is saved
    """
    # create data dir
    dest = os.path.join(temp_dir, 'raw')
    os.makedirs(dest, exist_ok=True)
    filename = os.path.basename(filepath)
    local_path = os.path.join(dest, filename)
    if os.path.exists(local_path):
        return local_path
    print("start downloading {}".format(local_path))
    # download the tf record file
    cmd = ['gsutil', 'cp', filepath, f'{dest}']
    logger.info(f'Downloading {filepath}')
    res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if res.returncode != 0:
        logger.error(f'Could not download file {filepath}')

    print("complete downloading {}".format(local_path))
    return local_path

def process_tfr(filepath, data_dir):
    """
    process a Waymo tf record into a tf api tf record
    args:
    - filepath [str]: path to the Waymo tf record file
    - data_dir [str]: path to the destination directory
    """
    # create processed data dir
    dest = os.path.join(data_dir, 'processed')
    os.makedirs(dest, exist_ok=True)
    file_name = os.path.basename(filepath)

    if os.path.exists(f'{dest}/{file_name}'):
        return

    logger.info(f'Processing {filepath}')
    writer = tf.python_io.TFRecordWriter(f'{dest}/{file_name}')
    dataset = tf.data.TFRecordDataset(filepath, compression_type='')
    for idx, data in enumerate(dataset):
        frame = open_dataset.Frame()
        frame.ParseFromString(bytearray(data.numpy()))
        encoded_jpeg, annotations = parse_frame(frame)
        filename = file_name.replace('.tfrecord', f'_{idx}.tfrecord')
        tf_example = create_tf_example(filename, encoded_jpeg, annotations)
        writer.write(tf_example.SerializeToString())
    writer.close()
    return

@ray.remote
def download_and_process(filename, temp_dir, data_dir):
    # need to re-import the logger because of multiprocessing
    dest = os.path.join(data_dir, 'processed')
    os.makedirs(dest, exist_ok=True)
    file_name = os.path.basename(filename)

    if os.path.exists(f'{dest}/{file_name}'):
        print("processed file {} exists, skip".format(file_name))
        return
    logger = get_module_logger(__name__)
    local_path = download_tfr(filename, temp_dir)
    process_tfr(local_path, data_dir)
    # remove the original tf record to save space
    if os.path.exists(local_path):
        logger.info(f'Deleting {local_path}')
        os.remove(local_path)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Download and process tf files')
    parser.add_argument('--data_dir', required=False, default="./data",
                        help='processed data directory')
    parser.add_argument('--temp_dir', required=False, default="./data/temp",
                        help='raw data directory')
    args = parser.parse_args()
    logger = get_module_logger(__name__)
    # open the filenames file
    with open('filenames.txt', 'r') as f:
        filenames = f.read().splitlines()
    logger.info(f'Download {len(filenames)} files. Be patient, this will take a long time.')

    data_dir = args.data_dir
    temp_dir = args.temp_dir

    download_and_process(filenames[0], temp_dir, data_dir)

    # init ray
    ray.init(num_cpus=cpu_count())

    workers = [download_and_process.remote(fn, temp_dir, data_dir) for fn in filenames[:100]]
    _ = ray.get(workers)
    print("Done with downloading")

Logs

Traceback (most recent call last):
  File "download_process.py", line 204, in <module>
    _ = ray.get(workers)
  File "/usr/local/lib/python3.8/dist-packages/ray/worker.py", line 1538, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::__main__.download_and_process() (pid=250, ip=172.17.0.2)
  File "python/ray/_raylet.pyx", line 479, in ray._raylet.execute_task
  File "download_process.py", line 176, in download_and_process
    process_tfr(local_path, data_dir)
  File "download_process.py", line 137, in process_tfr
    iterator = iter(dataset)
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 3445, in __iter__
    return iter(self._dataset)
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 413, in __iter__
    raise RuntimeError("__iter__() is only supported inside of tf.function "
RuntimeError: __iter__() is only supported inside of tf.function or when eager execution is enabled.

Hi @Karan_Singh

As you mentioned, you are running the TensorFlow 2.7 GPU container image, but in your code you are importing the TensorFlow 1.x compatibility module here:

import ray
import tensorflow.compat.v1 as tf  # <------------- replace this line with 'import tensorflow as tf'
from PIL import Image
from psutil import cpu_count

Could you please change the import as shown above so that the installed TensorFlow 2.x is used, and then try executing the code again in eager execution mode?
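
For example, after switching the import, a quick sanity check (a minimal sketch; the .tfrecord path below is a placeholder, not one of your files) could confirm that eager execution is back on and that the dataset can be iterated directly:

import tensorflow as tf

print(tf.__version__)          # should report 2.7.0 inside this container
print(tf.executing_eagerly())  # should be True once the compat.v1 import is removed

# With eager execution enabled, iterating a TFRecordDataset works as in your script:
dataset = tf.data.TFRecordDataset('example.tfrecord')  # placeholder path
for idx, data in enumerate(dataset.take(1)):
    print(idx, len(data.numpy()))

Note that once you import TensorFlow 2.x directly, TF1-only names in your script such as tf.python_io.TFRecordWriter will also need their 2.x equivalents (for example tf.io.TFRecordWriter).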

Let us know if the issue still persists. Thank you.