Facing issues running tensorflow_io library on dataflow in a tfx pipeline

I am currently facing an issue on Dataflow when using the TFX library. The TFX pipeline works fine locally, but it fails on Dataflow.

Overview: The issue arises while using the tensorflow_io library to do preprocessing in the TFX Transform component. It works fine when a single worker is used on Dataflow, but it throws the error below when multiple workers are used. Is there a correct way to load external libraries such as tensorflow_io while using Dataflow via Beam pipeline arguments? I have built custom Docker containers, using `tfx create` to build the pipeline and `tfx run` to run the pipeline.
In the Docker container I have specifically installed the tensorflow_io library.
library versions:

tensorflow-io==0.23.1
tensorflow-io-gcs-filesystem==0.23.1
tensorflow==2.7.0
tfx==1.6.1

Traceback:

FileNotFoundError: Op type not registered 'IO>AudioResample' in binary running on cmle-training-workerpool0-12345. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) tf.contrib.resampler should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed

Have you already tried creating and using a container image? Here’s how you can do that:

Using a Container Image for a Worker

TFX 0.26.0 and above has experimental support for using a custom container image for Dataflow workers.

In order to use this, you have to:

  • Build a Docker image which has both tfx and the user's custom code and dependencies pre-installed.
    • For users who (1) use tfx>=0.26 and (2) use Python 3.7 to develop their pipelines, the easiest way to do this is to extend the corresponding version of the official tensorflow/tfx image:
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.
ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
# COPY your code and dependencies in
  • Push the built image to a container image registry which is accessible by the project used by Dataflow.
    • Google Cloud users can consider using Cloud Build, which nicely automates the above steps.
  • Provide the following beam_pipeline_args:
beam_pipeline_args.extend([
   '--runner=DataflowRunner', 
   '--project={project-id}', 
   '--worker_harness_container_image={image-ref}', 
   '--experiments=use_runner_v2', 
])
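Putting the steps above together, here is a minimal Dockerfile sketch. It assumes the version pins from the question (tfx 1.6.1, tensorflow-io 0.23.1); adjust them to match your setup.

```dockerfile
# Hedged sketch: extend the official TFX image and pre-install the
# extra dependency so Dataflow workers have the IO> ops available.
ARG TFX_VERSION=1.6.1
FROM tensorflow/tfx:${TFX_VERSION}

# Pin the same versions used at pipeline-build time.
RUN pip install --no-cache-dir \
    tensorflow-io==0.23.1 \
    tensorflow-io-gcs-filesystem==0.23.1

# COPY your pipeline code and module files in as needed.
```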

See Apache Beam and TFX  |  TensorFlow

Yes, I have already tried running the pipeline with the "beam_pipeline_args" suggested above, but to no avail.

beam_pipeline_args = [
    "--project={gcp-project}",
    "--runner=DataflowRunner",
    "--job_name=dataflow-xx",
    "--temp_location={temp-location}",
    "--region={gcp-region}",
    "--experiments=use_runner_v2",
    "--sdk_container_image={custom-pipeline-image}",
    "--sdk_location=container",
]

I have also set the "default_image" parameter in `kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig` to the custom image.
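For reference, this is roughly how that wiring looks. This is a hedged sketch: `{custom-pipeline-image}` stays a placeholder from the post, `make_runner_config` is an illustrative helper, and the import guard only keeps the sketch importable where tfx is not installed.

```python
# Sketch of setting default_image on the Kubeflow V2 runner config,
# so pipeline steps (including Transform) run in the custom image.
try:
    from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner
    TFX_AVAILABLE = True
except ImportError:
    kubeflow_v2_dag_runner = None
    TFX_AVAILABLE = False

CUSTOM_IMAGE = '{custom-pipeline-image}'  # placeholder from the post

def make_runner_config():
    # default_image tells the runner which container image to use
    # for components when no per-component image is specified.
    return kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
        default_image=CUSTOM_IMAGE)
```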

Any further suggestions and help are much appreciated.

Thanks, I’ve asked some folks to look into it.


The issue is likely with the deserialization of that op from disk, not with the library itself. Are you using a Lambda layer by any chance? I faced a similar issue using TensorFlow Transform, and have made a pull request to update the getting-started notebook with the caveats. You can find my proposed update to the documentation here, point #3 under "Define a preprocessing function".

Before I figured out that the issue was with Lambda layers and deserialization, the workaround that worked for me was invoking the particular op (not even in an assignment); that correctly loaded it into the graph and allowed the transformations to succeed.
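That workaround can be sketched as follows. This is a hedged, illustrative example: the `'audio'` feature name and the resample rates are hypothetical, and the import guard only keeps the sketch runnable where tensorflow_io is not installed. The key point is that the Transform module file itself imports tensorflow_io at the top, so every Dataflow worker that loads the module registers the custom IO> ops before the saved graph is deserialized.

```python
# Importing tensorflow_io at module level registers its custom ops
# (e.g. IO>AudioResample) in the process that loads this module.
try:
    import tensorflow_io as tfio
    TFIO_AVAILABLE = True
except ImportError:
    tfio = None
    TFIO_AVAILABLE = False

def preprocessing_fn(inputs):
    """Hypothetical Transform preprocessing_fn for illustration."""
    audio = inputs['audio']  # 'audio' is an illustrative feature name
    # Only works on workers where tensorflow_io is importable.
    return {'audio_resampled': tfio.audio.resample(
        audio, rate_in=44100, rate_out=16000)}
```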

The other issue I experienced is that if the op had already been accessed in the same program execution, the "Op type not registered" error wouldn't come up; it only happened when I had reloaded preprocessing_fn from disk. In my case, the solution that worked was getting rid of Lambda layers and using tf.function directly. I have an implementation up on GitHub in case you're interested in seeing it. If you have a repo with this code, I'd love to work on solving this.
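The Lambda-layer-to-tf.function switch described above might look like this. This is a hedged sketch with illustrative names (`resample_audio` is hypothetical, as are the rates), and the import guard only keeps it runnable where TensorFlow is not installed. The idea is that a Keras Lambda layer can be deserialized before the op library is loaded, while a plain tf.function resolves the op when the function is traced.

```python
try:
    import tensorflow as tf
    TF_AVAILABLE = True
except ImportError:
    tf = None
    TF_AVAILABLE = False

# Problematic pattern (sketch, not run here):
#   resample_layer = tf.keras.layers.Lambda(
#       lambda x: tfio.audio.resample(x, 44100, 16000))

if TF_AVAILABLE:
    @tf.function
    def resample_audio(audio):
        # Importing inside the function ensures the IO> ops are
        # registered on whichever worker traces this function.
        import tensorflow_io as tfio
        return tfio.audio.resample(audio, rate_in=44100, rate_out=16000)
```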