NoneType error when building multiple pipelines with TFX and Airflow

I am working with TFX and Airflow to build pipelines for image classification and segmentation, replicating the repo by Jan Marcel Kezmann (Link). While executing classification_dag.py in dags/classification_pipeline and segmentation_dag.py in dags/segmentation_pipeline, I get a runtime error. The error message is below.

[2023-01-23, 10:55:37 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_component.py", line 76, in _airflow_component_launcher
    launcher.launch()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
    copy.deepcopy(execution_decision.exec_properties))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 74, in _run_executor
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 586, in Do
    TransformProcessor().Transform(label_inputs, label_outputs, status_file)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1145, in Transform
    make_beam_pipeline_fn)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1526, in _RunBeamImpl
    output_path=dataset.materialize_output_path))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
    options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 443, in run_stages
    runner_execution_context, bundle_context_manager, bundle_input)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 776, in _execute_bundle
    bundle_manager))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1000, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
[2023-01-23, 10:55:37 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=segmentation_dag, task_id=Transform, execution_date=20230123T052420, start_date=20230123T052509, end_date=20230123T052537
[2023-01-23, 10:55:37 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 43 for task Transform ('NoneType' object has no attribute 'partition'; 42847)
[2023-01-23, 10:55:37 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2023-01-23, 10:55:37 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

The Airflow DAG is shown below (Segmentation DAG screenshot).
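For context, each DAG module follows the standard TFX Airflow entry-point pattern, roughly like the sketch below; the pipeline name, paths, and component list are placeholders for my setup rather than the repo's exact code.

    # Rough sketch of segmentation_dag.py (placeholder names and paths, not the repo's exact code)
    import datetime
    import os

    from tfx.orchestration import metadata, pipeline
    from tfx.orchestration.airflow.airflow_dag_runner import (
        AirflowDagRunner,
        AirflowPipelineConfig,
    )

    PIPELINE_NAME = 'segmentation_pipeline'  # placeholder
    PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
    METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

    airflow_config = AirflowPipelineConfig(
        airflow_dag_config={
            'schedule_interval': None,
            'start_date': datetime.datetime(2023, 1, 1),
        }
    )

    # The component list (ExampleGen, StatisticsGen, SchemaGen, Transform, Trainer, ...)
    # is built elsewhere in the repo; left empty here for brevity.
    components = []

    # AirflowDagRunner.run() returns the Airflow DAG object the scheduler picks up.
    DAG = AirflowDagRunner(airflow_config).run(
        pipeline.Pipeline(
            pipeline_name=PIPELINE_NAME,
            pipeline_root=PIPELINE_ROOT,
            components=components,
            metadata_connection_config=metadata.sqlite_metadata_connection_config(
                METADATA_PATH
            ),
        )
    )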

I tried multiple versions of Airflow and TFX. I believe this error is due to incompatible versions, since the error comes from the Transform component's executor and Airflow. Before running the DAG, I compiled the individual modules and found no errors in them.

Versions: TFX 1.12.0, TensorFlow 2.11.0, Airflow 2.3.0
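Since the traceback ends inside Beam's DirectRunner bundle partitioning (input.partition(self._num_workers)), one experiment that could in principle be tried is forcing the direct runner into single-worker, in-memory mode via beam_pipeline_args. This is only a sketch of standard Beam DirectRunner options, not a verified fix for this error.

    # Assumption / experiment only: standard Beam DirectRunner options that force
    # single-worker, in-memory execution. Whether this avoids the NoneType
    # partition error above is not verified.
    beam_pipeline_args = [
        '--runner=DirectRunner',
        '--direct_running_mode=in_memory',
        '--direct_num_workers=1',
    ]

    # These would be passed when constructing the TFX pipeline, e.g.:
    # pipeline.Pipeline(..., beam_pipeline_args=beam_pipeline_args)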

I’m not sure exactly where the problem would be, but here are some thoughts.

  1. We don’t support Conda, although people have made it work. It might be better to try virtualenv.
  2. Have you successfully gotten the basic Airflow workshop code to work? If not, I’d suggest starting there and then, once that’s working, layering on the image classification and segmentation pipelines.
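Regarding point 1, a minimal sketch of setting up an isolated environment with Python's built-in venv module (instead of conda) could look like the following; the environment name and the pinned versions simply mirror the question and are not a tested-compatible combination.

    # Minimal sketch: create a virtualenv and install the pinned packages.
    # Env name and version pins mirror the question; not a verified-compatible set.
    import subprocess
    import venv

    venv.EnvBuilder(with_pip=True).create('ic_pipeline_venv')

    subprocess.check_call([
        'ic_pipeline_venv/bin/pip', 'install',
        'tfx==1.12.0',
        'apache-airflow==2.3.0',
    ])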