TFX custom component pipeline error


I am trying to implement a TFX pipeline with custom components. As a start I am creating a custom ExampleGen component which takes images and masks as inputs and creates TFRecords as output. I have run this component in the interactive context to verify the output. But which I hook it up to the kubeflow pipeline, I get a failure with the following error

→ 290 input_artifact_spec.task_output_artifact.producer_task = producer_id
291 input_artifact_spec.task_output_artifact.output_artifact_key = output_key
292 task_spec.inputs.artifacts[name].CopyFrom(input_artifact_spec)

TypeError: None has type NoneType, but expected one of: bytes, unicode

Here the producer_id and the outpu_key is not set. Do I need to set them with some value when I configure the component?

Here is how I am setting up the component

import tfx
from tfx import v1

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
) → tfx.v1.dsl.Pipeline:
“”“Creates a three component penguin pipeline with TFX.”""

Brings data into the pipeline.

#example_gen = tfx.components.CsvExampleGen(input_base=data_root)

#test_context = InteractiveContext()

data_root = os.path.join(DATA_ROOT, ‘buildings/’, ‘images’)
examples = data_root
input_artifact = tfx.types.standard_artifacts.Examples()
input_artifact.uri = data_root
input_artifact.split_names = artifact_utils.encode_split_names([‘train’, ‘val’])

input_channel = tfx.types.channel_utils.as_channel(artifacts=[input_artifact])

ingest_images = CustomIngestionComponent(
input=input_channel, name=‘ImageIngestionComponent’)

Uses user-provided Python function that trains a model.

trainer = tfx.v1.components.Trainer(

Pushes the model to a filesystem destination.

pusher = tfx.v1.components.Pusher(

Following three components will be included in the pipeline.

components = [

return tfx.v1.dsl.Pipeline(


runner = tfx.v1.orchestration.experimental.KubeflowV2DagRunner(

Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.

outpipe = _create_pipeline(
module_file=os.path.join(MODULE_ROOT, _trainer_module_file),

_ = outpipe)

Hi Subhasish,

It looks like you’re trying to create a custom ExampleGen which reads data from files, which we refer to as a FileBasedExampleGen. See Custom File-Based ExampleGen

There is also an example which reads Avro files: Avro Example

I hope that helps!

1 Like

Hi Robert,

Thanks for your response. I have looked into the FileBased option and the custom ExampleGen is working in the interactive context. I have verified both the input and output. When I integrate it to the VertexAI pipeline I get the above error.


The problem seems to be in the way that you’re ingesting data. Custom ExampleGens work differently than other custom components. The BaseExampleGenExecutor defines an abstract method called GetInputSourceToExamplePTransform, which you should implement in order to ingest your data, and which should return a Beam PTransform. For example, see in the Avro executor how it implements GetInputSourceToExamplePTransform and defines _AvroToExample, which returns a PCollection.

It might be a bit confusing at first, since _AvroToExample is declared to return a PCollection, and GetInputSourceToExamplePTransform returns a PTransform, but notice that GetInputSourceToExamplePTransform returns _AvroToExample itself, and not the result of _AvroToExample. _AvroToExample is itself a PTransform, which produces a PCollection.

The producer_id issue is because your channel/artifact is not generated by a component.
Using tfx.types.channel_utils.as_channel to create channel won’t work for a pipeline, If you want to create a channel from external source, use Importer (e.g., in penguin_pipeline_local, we use importer to importer an external schema)

HI Robert,

Thanks for the pointer. I will go over it.

Hi Jiayi,

Thanks for your input. Can you point me to the code you are talking about?

This is the Importer example

For custom examplegen, you can follow Robert’s comment above