TFX Issue: RuntimeError: Failed to apply: CreateSavedModel[tf_v2_only]

Hello all,
I am trying to train a text regression model using a TFX pipeline. I have done multiple experiments with the preprocessing_fn, but unfortunately the pipeline breaks after the Transform component.

Can anyone help me out?

def _build_keras_model() -> tf.keras.Model:
    # _FEATURE_KEYS, max_len and max_features are defined at module level.
    inputs = [
        tf.keras.layers.Input(shape=(max_len,), dtype=tf.string, name=x)
        for x in _FEATURE_KEYS
    ]

    print(inputs)

    other_layers = [
        tf.keras.layers.Embedding(input_dim=max_features, output_dim=64),
        tf.keras.layers.Dense(units=64, activation='relu', name="FirstDenseLayer"),
        tf.keras.layers.Dense(1, activation='relu', name="OutputLayer"),
    ]

    inputs.extend(other_layers)
    model_ex_3 = tf.keras.Sequential(inputs)
    
#     model_ex_3 = tf.keras.Sequential([
#         tf.keras.layers.Input(shape=(max_len,),dtype=tf.string,name=x) for x in _FEATURE_KEYS,]
#         tf.keras.layers.Embedding(input_dim=max_features,output_dim=64),
#         tf.keras.layers.Dense(units=64,activation='relu',name="FirstDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="FirstDropOutLayer"),
#     #         tf.keras.layers.Dense(units=hp_units,activation='relu',name="FirstSubDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="FirstSubDropOutLayer"),
#         tf.keras.layers.Dense(units=32,activation='relu',name="SecondDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="SecondDropOutLayer"),  
#     #         tf.keras.layers.Dense(units=hp_units,activation='relu',name="ThirdDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="ThirdropOutLayer"),
#         tf.keras.layers.Dense(1, activation='relu',name="OutputLayer")
#     ],
#         name ="Text-Regression-Third")



    ## Defining tuner learning rate
    #     hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model_ex_3.compile(loss=tf.keras.losses.mean_absolute_error,
                       optimizer=tf.keras.optimizers.Adam(),
                       metrics=['mae'])
    model_ex_3.summary()  # summary() already prints; wrapping it in print() just shows None
    return model_ex_3

from typing import List

from absl import logging
import tensorflow as tf
from tensorflow import keras
import tensorflow_model_analysis as tfma
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx.proto import transform_pb2
from tfx_bsl.public import tfxio

_LABEL_KEY = 'Total Hours'

def _create_pipeline(pipeline_name: str,
                     pipeline_root: str,
                     data_root: str,
                     module_file: str,
                     eval_accuracy_threshold: float,
                     serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
    """Creates a text-regression pipeline with TFX."""

    components = []

    # Brings data into the pipeline.
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)
    components.append(example_gen)

    
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples'])
    components.append(statistics_gen)

    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics'])
    components.append(schema_gen)



    # Performs transformations and feature engineering on the dataset.
    transform = tfx.components.Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        preprocessing_fn='preprocessing.preprocessing_fn',
        splits_config=transform_pb2.SplitConfig(splits=[
            transform_pb2.SplitConfig.Split(name='train', hash_buckets=4),
            transform_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
        ]))
    components.append(transform)

    print(transform.outputs['transformed_examples'])

    # Uses a user-provided Python function that trains the model.
    trainer = tfx.components.Trainer(
        transformed_examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        run_fn='time_log_trainer.run_fn')
    components.append(trainer)

    # Get the latest blessed model for model validation.
    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(
            type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
                'latest_blessed_model_resolver')
    components.append(model_resolver)

    # Performs quality validation of a candidate model (compared to a baseline).
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(
                signature_name='serving_default',
                label_key=_LABEL_KEY,
                # Use the transformed label key if Transform is used.
                # label_key=features.transformed_name(features.LABEL_KEY),
                preprocessing_function_names=['preprocessing.preprocessing_fn'])
        ],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(
                    class_name='MeanAbsoluteError',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={'value': eval_accuracy_threshold}),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={'value': -1e-10})))
            ])
        ])

    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        # The change threshold will be ignored if there is no baseline (first run).
        eval_config=eval_config)
    components.append(evaluator)

    # Pushes the model to a filesystem destination.
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)))
    components.append(pusher)

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=tfx.orchestration.metadata
        .sqlite_metadata_connection_config(metadata_path),
        components=components)

tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=_trainer_module_file,
        eval_accuracy_threshold=0,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH))
ERROR:

INFO:dill:D2: <dict object at 0x7fcb1bb40240>
INFO:dill:T4: <class 'threading.Condition'>
INFO:dill:# T4
INFO:dill:D2: <dict object at 0x7fcb182a1d00>
INFO:dill:Lo: <unlocked _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:F2: <function _create_lock at 0x7fcbac181040>
INFO:dill:# F2
INFO:dill:# Lo
INFO:dill:B3: <built-in method acquire of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:# B3
INFO:dill:B3: <built-in method release of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:# B3
INFO:dill:T4: <class 'collections.deque'>
INFO:dill:# T4
INFO:dill:# D2
INFO:dill:# D2
INFO:dill:D2: <dict object at 0x7fcb1be88a00>
INFO:dill:# D2
INFO:dill:D2: <dict object at 0x7fcb187a58c0>
INFO:dill:# D2
INFO:dill:D2: <dict object at 0x7fcb1bbb4800>
INFO:dill:# D2
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 372 failed.
INFO:absl:Cleaning up stateless execution info.


Please help me out.

@HIMANSHU_SHAKYAWAR,

Welcome to the TensorFlow Forum!

Could you please provide the preprocessing_fn for the Transform component so we can debug the issue?

Thank you!

Hey Chunduriv,
Thanks for the quick reply.
I have made various modifications to my model as well as to the preprocessing_fn.
Please check the preprocessing_fn below:

%%writefile {_preprocessing_module_file}
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences


@tf.function
def preprocessing_fn(inputs):
    """
    Preprocessing function to transform text features into embeddings using TensorFlow Transform.

    Args:
        inputs: Dictionary of input tensors keyed by feature name.

    Returns:
        Dictionary of transformed tensors keyed by feature name.
    """
    # Log the incoming feature tensors for debugging.
    for feature_name, tensor in inputs.items():
        print(f'{feature_name}: {tensor}')

    # Define feature keys.
    _TEXT_FEATURE_KEYS = ['Description']
    _LABEL_KEY = 'Total Hours'

    # Convert text features to padded integer sequences.
    transformed_features = {}
    for key in _TEXT_FEATURE_KEYS:
        # Define vocabulary size and maximum sequence length.
        vocab_size = 1000
        max_seq_length = 20

        # Tokenize the text.
        tokenizer = Tokenizer(num_words=vocab_size)
        tokenizer.fit_on_texts(inputs[key])
        sequences = tokenizer.texts_to_sequences(inputs[key])

        # Pad the sequences.
        padded_sequences = pad_sequences(sequences, maxlen=max_seq_length)

        transformed_features[key] = padded_sequences

    transformed_features[_LABEL_KEY] = inputs[_LABEL_KEY]

    return transformed_features

I have made various modifications in the code above.

I have trained a model for the same use case using plain TensorFlow, which achieves an MAE of around 80, but when I try to replicate it for production using TFX, it starts throwing various errors.


Hey guys,
Any suggestions or solutions for this issue?

Instead of starting with the LocalDagRunner() I’d suggest starting with the InteractiveContext() in a notebook, as we do for the Interactive Keras tutorial. You can then step through each component in the pipeline separately, and verify that the outputs are what you expect.

Looking at the error message that you posted I can see that it comes from dill, but it’s not clear to me which component is throwing the error. Once you have everything working in the InteractiveContext it’s easy to port it to use the LocalDagRunner or a different runner.
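A minimal sketch of that workflow, assuming a notebook environment. The component arguments mirror the pipeline you posted; DATA_ROOT and 'preprocessing.preprocessing_fn' are placeholders taken from your code:

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

# Run the components one at a time and inspect each output before moving on.
example_gen = tfx.components.CsvExampleGen(input_base=DATA_ROOT)
context.run(example_gen)

statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'])
context.run(schema_gen)

transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    preprocessing_fn='preprocessing.preprocessing_fn')
context.run(transform)  # this step should reproduce the CreateSavedModel failure

One thing to check while you step through: a preprocessing_fn must be built entirely from TensorFlow / tf.Transform ops, because Transform serializes it into a SavedModel (that is what the CreateSavedModel step in your error is doing). tf.keras.preprocessing.text.Tokenizer is an eager, Python-level utility that cannot be traced into that graph, which would also explain the dill pickling noise in your log. As a hedged sketch, not a drop-in fix, the usual TFT-native alternative is tft.compute_and_apply_vocabulary (assuming a recent TFT release with RaggedTensor support; the vocabulary size and sequence length below are illustrative):

import tensorflow as tf
import tensorflow_transform as tft

_TEXT_FEATURE_KEYS = ['Description']  # from your preprocessing_fn
_LABEL_KEY = 'Total Hours'            # from your preprocessing_fn

def preprocessing_fn(inputs):
    outputs = {}
    for key in _TEXT_FEATURE_KEYS:
        # Split each description into word tokens (a RaggedTensor).
        tokens = tf.strings.split(tf.reshape(inputs[key], [-1]))
        # Map tokens to integer ids with a vocabulary computed over the
        # full dataset, then pad/truncate to a fixed length.
        token_ids = tft.compute_and_apply_vocabulary(tokens, top_k=1000)
        outputs[key] = token_ids.to_tensor(default_value=0, shape=[None, 20])
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]
    return outputs

Note there is no @tf.function decorator here: Transform traces the function itself, and everything inside it has to be graph-serializable.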