TFX Issue: RuntimeError: Failed to apply: CreateSavedModel[tf_v2_only]

Hello all,
I am Trying to Train a Text Regression Model Using TFX Pipeline I have Done Multiple Experiments wrt to preprocessing_fn but unfortunately Pipeline Breaks after Transform Component.

Can anyone help me out.

def _build_keras_model() -> tf.keras.Model:
    inputs = [
        tf.keras.layers.Input(shape=(max_len,),dtype=tf.string,name=x) for x in _FEATURE_KEYS,]
    
    print(inputs)
    
    other_layers = [tf.keras.layers.Embedding(input_dim=max_features,output_dim=64),
                     tf.keras.layers.Dense(units=64,activation='relu',name="FirstDenseLayer"),
                     tf.keras.layers.Dense(1, activation='relu',name="OutputLayer")                     
                    ]
    
    inputs.extend(other_layers)
    model_ex_3 = tf.keras.Sequential(inputs)
    
#     model_ex_3 = tf.keras.Sequential([
#         tf.keras.layers.Input(shape=(max_len,),dtype=tf.string,name=x) for x in _FEATURE_KEYS,]
#         tf.keras.layers.Embedding(input_dim=max_features,output_dim=64),
#         tf.keras.layers.Dense(units=64,activation='relu',name="FirstDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="FirstDropOutLayer"),
#     #         tf.keras.layers.Dense(units=hp_units,activation='relu',name="FirstSubDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="FirstSubDropOutLayer"),
#         tf.keras.layers.Dense(units=32,activation='relu',name="SecondDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="SecondDropOutLayer"),  
#     #         tf.keras.layers.Dense(units=hp_units,activation='relu',name="ThirdDenseLayer"),
#     #         tf.keras.layers.Dropout(rate=0.2,name="ThirdropOutLayer"),
#         tf.keras.layers.Dense(1, activation='relu',name="OutputLayer")
#     ],
#         name ="Text-Regression-Third")



    ##Defining Tuner Learning Rate
    #     hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model_ex_3.compile(loss = tf.keras.losses.mean_absolute_error,

                       optimizer = tf.keras.optimizers.Adam(),

                       metrics=['mae']
                      )
    model_ex_3.summary()
    print(model_ex_3.summary())
    return model_ex_3

from typing import List
import tensorflow_model_analysis as tfma

from tfx.components import SchemaGen

from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
_LABEL_KEY = ‘Total Hours’
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

def _create_pipeline(pipeline_name: str,
pipeline_root: str,
data_root: str,
module_file: str,
eval_accuracy_threshold: float,
serving_model_dir: str,
metadata_path: str) → tfx.dsl.Pipeline:

“”“Creates a three component penguin pipeline with TFX.”“”

Brings data into the pipeline.

components =[]

example_gen = tfx.components.CsvExampleGen(input_base=data_root)    
components.append(example_gen)

    
statistics_gen = tfx.components.StatisticsGen(
                                examples=example_gen.outputs['examples'])
components.append(statistics_gen)


schema_gen = tfx.components.SchemaGen(
                                statistics=statistics_gen.outputs['statistics'])
components.append(schema_gen)



transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    preprocessing_fn='preprocessing.preprocessing_fn',

splits_config=transform_pb2.SplitConfig(

splits=[

transform_pb2.SplitConfig.Split(name=‘train’, hash_buckets=4),

transform_pb2.SplitConfig.Split(name=‘eval’, hash_buckets=1)

]

)

)

components.append(transform)


print(transform.outputs['transformed_examples'])

Uses user-provided Python function that trains a model.

trainer = tfx.components.Trainer(
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=5),
    run_fn='time_log_trainer.run_fn')

components.append(trainer)

  # Get the latest blessed model for model validation.
model_resolver = tfx.dsl.Resolver(
  strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
  model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
  model_blessing=tfx.dsl.Channel(
      type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
          'latest_blessed_model_resolver')

  # perform quality validation of a candidate model (compared to a baseline).
eval_config = tfma.EvalConfig(
  model_specs=[
      tfma.ModelSpec(
          signature_name='serving_default',
          label_key=_LABEL_KEY,
          # Use transformed label key if Transform is used.
          # label_key=features.transformed_name(features.LABEL_KEY),
          preprocessing_function_names=['preprocessing.preprocessing_fn'])
  ],
  slicing_specs=[tfma.SlicingSpec()],
  metrics_specs=[
      tfma.MetricsSpec(metrics=[
          tfma.MetricConfig(
              class_name='MeanAbsoluteError',
              threshold=tfma.MetricThreshold(
                  value_threshold=tfma.GenericValueThreshold(
                      lower_bound={'value': eval_accuracy_threshold}),
                  change_threshold=tfma.GenericChangeThreshold(
                      direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                      absolute={'value': -1e-10})))
      ])
  ])

evaluator = tfx.components.Evaluator(  # pylint: disable=unused-variable
  examples=example_gen.outputs['examples'],
  model=trainer.outputs['model'],
  baseline_model=model_resolver.outputs['model'],
  # Change threshold will be ignored if there is no baseline (first run).
  eval_config=eval_config)

from tfx.types import Channel

model_blessing_channel = Channel(…)

Pushes the model to a filesystem destination.

components.append(evaluator)
pusher = tfx.components.Pusher(
  model=trainer.outputs['model'],
  model_blessing=evaluator.outputs['blessing'],
  push_destination=tfx.proto.PushDestination(
      filesystem=tfx.proto.PushDestination.Filesystem(
          base_directory=serving_model_dir)))

components.append(pusher)

return tfx.dsl.Pipeline(
  pipeline_name=pipeline_name,
  pipeline_root=pipeline_root,
  metadata_connection_config=tfx.orchestration.metadata
  .sqlite_metadata_connection_config(metadata_path),
  components=components)
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      eval_accuracy_threshold=0,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
ERROR:

INFO:dill:D2: <dict object at 0x7fcb1bb40240>
T4: <class 'threading.Condition'>
INFO:dill:T4: <class 'threading.Condition'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fcb182a1d00>
INFO:dill:D2: <dict object at 0x7fcb182a1d00>
Lo: <unlocked _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:Lo: <unlocked _thread.lock object at 0x7fcb1a1f6bd0>
F2: <function _create_lock at 0x7fcbac181040>
INFO:dill:F2: <function _create_lock at 0x7fcbac181040>
# F2
INFO:dill:# F2
# Lo
INFO:dill:# Lo
B3: <built-in method acquire of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:B3: <built-in method acquire of _thread.lock object at 0x7fcb1a1f6bd0>
# B3
INFO:dill:# B3
B3: <built-in method release of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:B3: <built-in method release of _thread.lock object at 0x7fcb1a1f6bd0>
# B3
INFO:dill:# B3
T4: <class 'collections.deque'>
INFO:dill:T4: <class 'collections.deque'>
# T4
INFO:dill:# T4
# D2
INFO:dill:# D2
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb1be88a00>
INFO:dill:D2: <dict object at 0x7fcb1be88a00>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb187a58c0>
INFO:dill:D2: <dict object at 0x7fcb187a58c0>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb1bbb4800>
INFO:dill:D2: <dict object at 0x7fcb1bbb4800>
# D2
INFO:dill:# D2
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 372 failed.
INFO:absl:Cleaning up stateless execution info.


PLEASE HELP ME OUT.

@HIMANSHU_SHAKYAWAR,

Welcome to the Tensorflow Forum!

Could you please provide the preprocessing_fn for transform component to debug the issue?

Thank you!

Hey Chunduriv,
Thanks For the Quick Reply.
i have Done Various Modification in my Model as well as in preprocessing_fun.
please check the Preprocessing_fn below:

%%writefile {_preprocessing_module_file}
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras.preprocessing.text import Tokenizer

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense


# import tensorflow_transform as transform
@tf.function
def preprocessing_fn(inputs):
    """
    Preprocessing function to transform text features into embeddings using TensorFlow Transform.

    Args:
        inputs: Dictionary of input tensors keyed by feature name.

    Returns:
        Dictionary of transformed tensors keyed by feature name.
    """
    #Initialization of Tokenizer
    
    for feature_name, tensor in inputs.items():
        print(f'{feature_name}: {tensor}')
    # Define feature keys
    

    _TEXT_FEATURE_KEYS = [
       'Description'
                    ] 
    _LABEL_KEY = 'Total Hours'
    max_len = 64
    max_vocab_size = 50000
    tokenizer = Tokenizer(num_words=max_vocab_size)
    # Convert text features to embeddings
    transformed_features = {}
    for key in _TEXT_FEATURE_KEYS:       
        # Define vocabulary size
        vocab_size = 1000

        # Define maximum sequence length
        max_seq_length = 20

        # Tokenize the text
        tokenizer = Tokenizer(num_words=vocab_size)
        tokenizer.fit_on_texts(inputs[key])
        sequences = tokenizer.texts_to_sequences(inputs[key])

        # Pad the sequences
        padded_sequences = pad_sequences(sequences, maxlen=max_seq_length)

        transformed_features[f'{key}'] = padded_sequences
    transformed_features[f'{_LABEL_KEY}'] = inputs[_LABEL_KEY]

    return transformed_features

in Above code DONE various Modification.

i have trained the model for the same use case using tensorflow, which is providing the MAE of around 80, but when i am trying to replicate for production using TFX , it start throwing various Errors.

1 Like