I am working with TFX and Airflow to build pipelines for image classification and segmentation, replicating the repo by Jan Marcel Kezmann (Link). When I run classification_dag.py in dags/classification_pipeline and segmentation_dag.py in dags/segmentation_pipeline, I get a runtime error. The error message from segmentation_dag is below.
[2023-01-23, 10:55:37 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
return_value = self.execute_callable()
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_component.py", line 76, in _airflow_component_launcher
launcher.launch()
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
copy.deepcopy(execution_decision.exec_properties))
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 74, in _run_executor
copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 586, in Do
TransformProcessor().Transform(label_inputs, label_outputs, status_file)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1145, in Transform
make_beam_pipeline_fn)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1526, in _RunBeamImpl
output_path=dataset.materialize_output_path))
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 597, in __exit__
self.result = self.run()
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 574, in run
return self.runner.run_pipeline(self, self._options)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
options)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 443, in run_stages
runner_execution_context, bundle_context_manager, bundle_input)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 776, in _execute_bundle
bundle_manager))
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1000, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
[2023-01-23, 10:55:37 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=segmentation_dag, task_id=Transform, execution_date=20230123T052420, start_date=20230123T052509, end_date=20230123T052537
[2023-01-23, 10:55:37 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 43 for task Transform ('NoneType' object has no attribute 'partition'; 42847)
[2023-01-23, 10:55:37 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2023-01-23, 10:55:37 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
The Airflow DAG is here: Segmentation Dag
I have tried several versions of Airflow and TFX. I believe the error is caused by incompatible versions, because the traceback runs from Airflow's PythonOperator into the executor of the TFX Transform component. Before executing the DAGs, I compiled the individual modules and found no errors in them.
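For reference, a compile check like the one described can be sketched as below. Note that compiling only detects syntax errors; it cannot surface a runtime failure such as the AttributeError raised inside Beam during the Transform step. The helper name `check_syntax` is hypothetical, and the file paths are taken from the directory names mentioned above and may need adjusting.

```python
from pathlib import Path
from typing import Dict, Iterable


def check_syntax(paths: Iterable[str]) -> Dict[str, str]:
    """Compile each file without executing it; return {path: error} for failures.

    This only catches syntax errors. Runtime errors (like the AttributeError
    raised in the Beam runner during Transform) are not detected this way.
    """
    failures: Dict[str, str] = {}
    for path in paths:
        source = Path(path).read_text(encoding="utf-8")
        try:
            # Builtin compile: parses and compiles, writes no .pyc file.
            compile(source, path, "exec")
        except SyntaxError as exc:
            failures[path] = f"line {exc.lineno}: {exc.msg}"
    return failures


if __name__ == "__main__":
    # Hypothetical paths based on the repo layout described in the question.
    dag_files = [
        "dags/classification_pipeline/classification_dag.py",
        "dags/segmentation_pipeline/segmentation_dag.py",
    ]
    present = [p for p in dag_files if Path(p).exists()]
    for path, error in check_syntax(present).items():
        print(f"{path}: {error}")
    print(f"checked {len(present)} file(s)")
```

A file that passes this check can still fail once Airflow actually runs the pipeline, which is what happens in the log above.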
Versions: TFX 1.12.0, TensorFlow 2.11.0, Airflow 2.3.0
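Because every frame after the TFX Transform executor in the traceback is in apache_beam, the installed Beam version (and tensorflow-transform, which the TFX Transform component builds on) may matter as much as the three versions listed. A minimal sketch for printing them from inside the same environment follows; `installed_versions` is a hypothetical helper, and on Python 3.7 (the version in the traceback paths) it assumes the `importlib-metadata` backport is installed.

```python
from typing import Dict, Iterable, Optional

try:  # Python 3.8+ ships importlib.metadata in the standard library
    from importlib.metadata import PackageNotFoundError, version
except ImportError:  # Python 3.7: requires `pip install importlib-metadata`
    from importlib_metadata import PackageNotFoundError, version


def installed_versions(dists: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: Dict[str, Optional[str]] = {}
    for name in dists:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    packages = [
        "tfx",
        "tensorflow",
        "apache-airflow",
        "apache-beam",           # the failing frames are in apache_beam
        "tensorflow-transform",  # library behind the TFX Transform component
    ]
    for name, ver in installed_versions(packages).items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this inside the `ic_pipeline` environment would give the exact set of versions to compare against the TFX release's dependency pins.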