TFX Transform layer returning an empty dictionary

I have created a data pipeline. I have run all the components until my Trainer, in which I have an error in the function serve_tf_examples_fn. More precisely, when transforming the raw data using the transform graph, it returns an empty dictionary. I have already lost two days without finding the error, but I’m suspecting that my preprocessing_fn function is not set up properly preventing it from creating the graph - I’m not sure. Anyway, I’ll share my preprocessing_fn:

import tensorflow as tf
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
import kapre
from util import transformed_name

def get_melspectrogram(audio, sample_rate, n_mels, n_fft, hop_length, fmin, fmax, sample):
    """Decode raw audio and run it through a kapre mel-spectrogram layer.

    This function is invoked through `tf.py_function` by `preprocessing_fn`.
    The `.numpy()` calls below read concrete values out of the parameter
    tensors, so it presumably only works on eager tensors -- TODO confirm.

    Args:
        audio: Tensor whose elements are decoded by `decode_and_pad_audio`.
        sample_rate, n_mels, n_fft, hop_length, sample: Tensors whose first
            element is cast to int32 and used as a plain value.
        fmin, fmax: Tensors whose first element is used as-is.

    Returns:
        The kapre layer output with its last axis squeezed, reshaped to
        `[-1, 1060, 128]`.
    """
    # Decode every element of `audio` to a fixed length of 320000 samples.
    # NOTE(review): the length is hard-coded here and does not use `sample`.
    audio_tensors = tf.map_fn(lambda x: decode_and_pad_audio(x,  320000), audio, dtype=tf.float32)
    
    # Convert tensors to their respective values (first element of each).
    sample_rate = tf.cast(sample_rate, tf.int32).numpy()[0]
    n_mels = tf.cast(n_mels, tf.int32).numpy()[0]
    n_fft = tf.cast(n_fft, tf.int32).numpy()[0]
    hop_length = tf.cast(hop_length, tf.int32).numpy()[0]
    sample = tf.cast(sample, tf.int32).numpy()[0]
    fmin = fmin.numpy()[0]
    fmax = fmax.numpy()[0]


    # Build a small Keras model that wraps the kapre mel-spectrogram layer.
    input_layer = Input(shape=(sample, 1), name='input')
    melspec_layer = kapre.composed.get_melspectrogram_layer(input_shape=(sample, 1),
                                                            n_mels=n_mels,
                                                            sample_rate=sample_rate,
                                                            n_fft=n_fft,
                                                            hop_length=hop_length,
                                                            mel_f_min=fmin,
                                                            mel_f_max=fmax)(input_layer)

    model = Model(inputs=input_layer, outputs=melspec_layer)
    # Drop the singleton axis 1 that `decode_and_pad_audio` added.
    audio_tensors = tf.squeeze(audio_tensors, axis=1)
    mels = model(audio_tensors)
    # Drop the trailing singleton axis of the layer output.
    mels = tf.squeeze(mels, axis=-1)

    # NOTE(review): 1060 x 128 is hard-coded; presumably it must agree with
    # `sample`, `hop_length` and `n_mels` -- confirm.
    return tf.reshape(mels, [-1, 1060, 128])


def decode_and_pad_audio(audio, sample):
    """Decode raw float32 bytes and force axis 1 to exactly `sample` entries.

    The decoded tensor gets a leading and a trailing singleton axis. Axis 1
    is then zero-padded at the end when it is not longer than `sample`, and
    truncated to its first `sample` entries otherwise.

    Args:
        audio: String tensor holding raw float32 bytes.
        sample: Target length along axis 1.

    Returns:
        The padded or truncated float32 tensor.
    """
    waveform = tf.io.decode_raw(audio, tf.float32)
    # Wrap the decoded values with a leading and a trailing singleton axis.
    waveform = tf.expand_dims(tf.expand_dims(waveform, 0), -1)
    current_length = tf.shape(waveform)[1]

    def _extend_with_zeros():
        filler = tf.zeros((1, sample - current_length, 1), dtype=waveform.dtype)
        return tf.concat([waveform, filler], axis=1)

    def _truncate():
        return waveform[:, :sample, :]

    needs_padding = tf.math.greater_equal(sample, current_length)
    return tf.cond(needs_padding, true_fn=_extend_with_zeros, false_fn=_truncate)

def preprocessing_fn(inputs):
    """Transform raw features into serialized mel-spectrograms and labels.

    Args:
        inputs: Dict of raw feature tensors. The keys read here are 'audio',
            'label', 'sample_rate', 'fmax', 'n_mels', 'hop_length', 'n_fft',
            'fmin' and 'sample'.

    Returns:
        Dict with `transformed_name('audio')` and `transformed_name('label')`
        mapped to string tensors produced by `tf.io.serialize_tensor`.
    """
    def process_example(example):
        # `example` is one slice of the tuple handed to tf.map_fn below,
        # unpacked in the same order.
        audio, label, sample_rate, fmax, n_mels, hop_length, n_fft, fmin, sample = example
        
        # Encode the label: 'human' -> 1, anything else -> 0, then one-hot.
        binary_label = tf.where(label == 'human', 1, 0)
        label_one_hot = tf.one_hot(binary_label, depth=2)
        label_one_hot = tf.reshape(label_one_hot, [-1, 2])
        # Serialize the labels
        label_serialized = tf.io.serialize_tensor(label_one_hot)
        
        # Use tf.py_function to call get_melspectrogram with NumPy operations
        # NOTE(review): the Python body wrapped by tf.py_function is not
        # stored in the exported graph; presumably this is why the saved
        # transform graph yields an empty dictionary at serving time --
        # confirm.
        mels = tf.py_function(get_melspectrogram,
                              [audio, sample_rate, n_mels, n_fft, hop_length, fmin, fmax, sample],
                              tf.float32)
        
        # Serialize the audios
        mels_serialized = tf.io.serialize_tensor(mels)
        
        return mels_serialized, label_serialized

    # Apply process_example to each input example
    audio_xf, label_xf = tf.map_fn(process_example, (inputs['audio'], inputs['label'], inputs['sample_rate'], inputs['fmax'],
                                       inputs['n_mels'], inputs['hop_length'], inputs['n_fft'], inputs['fmin'],
                                       inputs['sample']), dtype=(tf.string, tf.string))

    return {transformed_name('audio'): audio_xf, transformed_name('label'): label_xf}

I suspect an issue is likely the use of tf.py_function. Transform expects the tft.apply_pyfunc API to be used instead -

Though this API doesn’t currently support TF2, meaning force_tf_compat_v1 must be set to True -

pyfunc is also likely incompatible with serving environments, so this API would likely be better to avoid if possible.

You can try putting this logic in a copy of this codelab to hopefully make it easier to iterate on the preprocessing_fn logic and make sure it works on a bit of dummy data -

1 Like

Hi, @zoy! You’re definitely the person I wished to answer this question! :slight_smile: I have done some good advances, however, I still have some troubles that I think deserve to be shared.

I have done a lot of work to make my code exportable as a portable graph; most of it, if not all, consisted of removing tf.py_function calls and plain Python objects (for example, parameters passed as Python int and float values) from my definitions. Here is my preprocessing_fn function for audio processing:

import tensorflow as tf
from tensorflow.python.framework import ops
from util import transformed_name
from tensorflow_transform import common
from tensorflow_transform import common_types
from typing import Dict, Optional, Any, Union, Tuple
import os
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import tensor_util
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops.signal import shape_ops
from sys import exit

_MEL_BREAK_FREQUENCY_HERTZ = 700.0
_MEL_HIGH_FREQUENCY_Q = 1127.0


@tf.function
def decode_and_pad_audio(audio, sample):
    """Decode raw float32 bytes and force axis 1 to exactly `sample` entries.

    Axis 1 of the decoded tensor is zero-padded at the end when it is not
    longer than `sample`, and truncated to its first `sample` entries
    otherwise.

    Args:
        audio: String tensor holding raw float32 bytes. The padding branch
            builds a `(1, padding)` block of zeros, so the decoded tensor is
            expected to have a leading axis of size 1.
        sample: Target length along axis 1.

    Returns:
        The padded or truncated float32 tensor.
    """
    waveform = tf.io.decode_raw(audio, tf.float32)
    current_length = tf.shape(waveform)[1]

    def _extend_with_zeros():
        filler = tf.zeros((1, sample - current_length), dtype=waveform.dtype)
        return tf.concat([waveform, filler], axis=1)

    def _truncate():
        return waveform[:, :sample]

    return tf.cond(
        tf.math.greater_equal(sample, current_length),
        true_fn=_extend_with_zeros,
        false_fn=_truncate,
    )


@tf.function
def normalize_tensorflow(
    S: common_types.TensorType,
    min_level_db: float,
    name: Optional[str] = None) -> tf.Tensor:
    """Rescale `S` relative to `min_level_db` and clamp the result to [0, 1].

    Args:
        S: Tensor to normalize.
        min_level_db: Level that maps to 0 after rescaling.
        name: (Optional) A name for this operation.

    Returns:
        `(S - min_level_db) / -min_level_db`, clipped to the range [0, 1].
    """
    with tf.compat.v1.name_scope(name, 'normalize'):
        rescaled = (S - min_level_db) / -min_level_db
        return tf.clip_by_value(rescaled, clip_value_min=0, clip_value_max=1)




@tf.function
def tf_log10(x: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor], 
              name: Optional[str] = None) -> tf.Tensor:
    """Compute the base-10 logarithm of `x` as ln(x) / ln(10).

    Args:
        x: Input tensor.
        name: (Optional) A name for this operation.

    Returns:
        Tensor holding the element-wise base-10 logarithm of `x`.
    """
    with tf.compat.v1.name_scope(name, 'tf_log10'):
        natural_log = tf.math.log(x)
        # The constant uses the dtype of ln(x) so the division is well typed.
        ln_of_ten = tf.math.log(tf.constant(10, dtype=natural_log.dtype))
        return natural_log / ln_of_ten



@tf.function
def amp_to_db_tensorflow(x: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor], 
                          name: Optional[str] = None) -> tf.Tensor:
    """Convert amplitudes to decibels: 20 * log10(clip(|x|, 1e-5, 1e100)).

    Args:
        x: Input tensor of amplitudes.
        name: (Optional) A name for this operation.

    Returns:
        Tensor of decibel values.
    """
    with tf.compat.v1.name_scope(name, 'amp_to_db_tensorflow'):
        magnitude = tf.abs(x)
        # The lower clip bound keeps the logarithm away from zero input.
        clipped = tf.clip_by_value(magnitude, 1e-5, 1e100)
        return 20 * tf_log10(clipped)


    
@tf.function
def stft_tensorflow(signals: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor], 
                    win_length: int, 
                    hop_length: int, 
                    n_fft: int,
                    name: Optional[str] = None) -> tf.Tensor:
    """Compute the short-time Fourier transform of `signals`.

    Thin wrapper around `tf.signal.stft` using a Hann window and end padding.

    Args:
        signals: Input signal tensor.
        win_length: Window (frame) length passed as `frame_length`.
        hop_length: Step between frames passed as `frame_step`.
        n_fft: FFT size passed as `fft_length`.
        name: (Optional) A name for this operation.

    Returns:
        The tensor returned by `tf.signal.stft`.
    """
    with tf.compat.v1.name_scope(name, 'stft_tensorflow'):
        stft_options = dict(
            frame_length=win_length,
            frame_step=hop_length,
            fft_length=n_fft,
            window_fn=tf.signal.hann_window,
            pad_end=True,
        )
        return tf.signal.stft(signals, **stft_options)


@tf.function
def _mel_to_hertz(mel_values, name=None):
    """Convert mel-scale values to frequencies in Hertz.

    Args:
        mel_values: Tensor (or value convertible to one) on the mel scale.
        name: (Optional) A name for this operation.

    Returns:
        `_MEL_BREAK_FREQUENCY_HERTZ * (exp(mel / _MEL_HIGH_FREQUENCY_Q) - 1)`.
    """
    with ops.name_scope(name, 'mel_to_hertz', [mel_values]):
        mels = ops.convert_to_tensor(mel_values)
        exponential = math_ops.exp(mels / _MEL_HIGH_FREQUENCY_Q)
        return _MEL_BREAK_FREQUENCY_HERTZ * (exponential - 1.0)
@tf.function
def _hertz_to_mel(frequencies_hertz, name=None):
    """Convert frequencies in Hertz to mel-scale values.

    Args:
        frequencies_hertz: Tensor (or value convertible to one) in Hertz.
        name: (Optional) A name for this operation.

    Returns:
        `_MEL_HIGH_FREQUENCY_Q * log(1 + hertz / _MEL_BREAK_FREQUENCY_HERTZ)`.
    """
    with ops.name_scope(name, 'hertz_to_mel', [frequencies_hertz]):
        hertz = ops.convert_to_tensor(frequencies_hertz)
        scaled = hertz / _MEL_BREAK_FREQUENCY_HERTZ
        return _MEL_HIGH_FREQUENCY_Q * math_ops.log(1.0 + scaled)
    

@tf.function
def linear_to_mel_weight_matrix(num_mel_bins=20,
                                num_spectrogram_bins=129,
                                sample_rate=8000,
                                lower_edge_hertz=125.0,
                                upper_edge_hertz=3800.0,
                                dtype=dtypes.float32,
                                name=None):
  """Build a matrix that maps linear-frequency bins onto mel bands.

  Each mel band is the pointwise minimum of a rising and a falling line
  segment (linear in the mel domain), floored at zero. The DC bin is
  excluded while the weights are computed and re-added as a zero row.

  Args:
    num_mel_bins: Number of mel bands (columns of the result).
    num_spectrogram_bins: Number of linear-frequency bins (rows of the
      result).
    sample_rate: Sampling rate; half of it is used as the highest linear
      frequency.
    lower_edge_hertz: Lower edge of the lowest mel band, in Hertz.
    upper_edge_hertz: Upper edge of the highest mel band, in Hertz.
    dtype: dtype used for all intermediate values and for the result.
    name: (Optional) A name for this operation.

  Returns:
    A tensor of shape `[num_spectrogram_bins, num_mel_bins]`.
  """
  
  
  with ops.name_scope(name, 'linear_to_mel_weight_matrix') as name:
    # Convert Tensor `sample_rate` to float, if possible.
    if isinstance(sample_rate, ops.Tensor):
      maybe_const_val = tensor_util.constant_value(sample_rate)
      if maybe_const_val is not None:
        sample_rate = maybe_const_val

    # Note: As num_spectrogram_bins is passed to `math_ops.linspace`
    # and the validation is already done in linspace (both in shape function
    # and in kernel), there is no need to validate num_spectrogram_bins here.

    # NOTE(review): an earlier comment here stated that this function can be
    # constant folded because it has no Tensor inputs. `mel_spectrogram` in
    # this file passes `num_spectrogram_bins=tf.shape(S)[-1]`, so that
    # presumably no longer holds -- confirm.
    sample_rate = math_ops.cast(
        sample_rate, dtype, name='sample_rate')
    lower_edge_hertz = ops.convert_to_tensor(
        lower_edge_hertz, dtype, name='lower_edge_hertz')
    upper_edge_hertz = ops.convert_to_tensor(
        upper_edge_hertz, dtype, name='upper_edge_hertz')
    zero = ops.convert_to_tensor(0.0, dtype)

    # HTK excludes the spectrogram DC bin.
    bands_to_zero = 1
    nyquist_hertz = sample_rate / 2.0
    # Linear bin frequencies from 0 to Nyquist, with the DC bin sliced off,
    # converted to mels and shaped as a column vector.
    linear_frequencies = math_ops.linspace(
        zero, nyquist_hertz, num_spectrogram_bins)[bands_to_zero:]
    spectrogram_bins_mel = array_ops.expand_dims(
        _hertz_to_mel(linear_frequencies), 1)

    # Compute num_mel_bins triples of (lower_edge, center, upper_edge). The
    # center of each band is the lower and upper edge of the adjacent bands.
    # Accordingly, we divide [lower_edge_hertz, upper_edge_hertz] into
    # num_mel_bins + 2 pieces.
    band_edges_mel = shape_ops.frame(
        math_ops.linspace(_hertz_to_mel(lower_edge_hertz),
                          _hertz_to_mel(upper_edge_hertz),
                          num_mel_bins + 2), frame_length=3, frame_step=1)

    # Split the triples up and reshape them into [1, num_mel_bins] tensors.
    lower_edge_mel, center_mel, upper_edge_mel = tuple(array_ops.reshape(
        t, [1, num_mel_bins]) for t in array_ops.split(
            band_edges_mel, 3, axis=1))

    # Calculate lower and upper slopes for every spectrogram bin.
    # Line segments are linear in the mel domain, not Hertz.
    lower_slopes = (spectrogram_bins_mel - lower_edge_mel) / (
        center_mel - lower_edge_mel)
    upper_slopes = (upper_edge_mel - spectrogram_bins_mel) / (
        upper_edge_mel - center_mel)

    # Intersect the line segments with each other and zero.
    mel_weights_matrix = math_ops.maximum(
        zero, math_ops.minimum(lower_slopes, upper_slopes))

    # Re-add the zeroed lower bins we sliced out above.
    return array_ops.pad(
        mel_weights_matrix, [[bands_to_zero, 0], [0, 0]], name=name)


@tf.function
def mel_spectrogram(
    tensor: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor], 
    win_length: int,
    hop_length: int,
    n_fft: int,
    ref_level_db: float,
    min_level_db: float,
    num_mel_bins: int,
    sample_rate: int,
    mel_lower_edge_hertz: float,
    mel_upper_edge_hertz: float,
    name: Optional[str] = None) -> tf.Tensor:
    """Compute a normalized, mel-scaled spectrogram of `tensor`.

    Steps: STFT -> magnitude in dB minus `ref_level_db` -> normalization to
    [0, 1] -> projection onto mel bands with `linear_to_mel_weight_matrix`.

    Args:
        tensor: Input audio tensor.
        win_length: STFT window length.
        hop_length: STFT hop length.
        n_fft: STFT FFT size.
        ref_level_db: Reference level subtracted from the dB spectrogram.
        min_level_db: Minimum level used by the normalization.
        num_mel_bins: Number of mel bands.
        sample_rate: Sampling rate of the audio.
        mel_lower_edge_hertz: Lower edge of the lowest mel band.
        mel_upper_edge_hertz: Upper edge of the highest mel band.
        name: (Optional) A name for this operation.

    Returns:
        The normalized spectrogram contracted with the mel weight matrix
        over its last axis.
    """
    with tf.compat.v1.name_scope(name, 'mel_spectrogram'):
        # Spectrogram in decibels, shifted by the reference level.
        stft_frames = stft_tensorflow(tensor, win_length, hop_length, n_fft)
        level_db = amp_to_db_tensorflow(tf.abs(stft_frames)) - ref_level_db
        normalized = normalize_tensorflow(level_db, min_level_db)

        # The number of linear bins is read from the spectrogram itself.
        mel_weights = linear_to_mel_weight_matrix(
            num_mel_bins=num_mel_bins,
            num_spectrogram_bins=tf.shape(normalized)[-1],
            sample_rate=sample_rate,
            lower_edge_hertz=mel_lower_edge_hertz,
            upper_edge_hertz=mel_upper_edge_hertz,
            dtype=tf.float32,
        )

        # Contract the frequency axis with the mel weight matrix.
        return tf.tensordot(normalized, mel_weights, 1)




@tf.function
def convert_labels(label: tf.Tensor, name: Optional[str] = None) -> tf.Tensor:
    """
    One-hot encodes a binary view of the 'label' tensor.

    Entries equal to 'human' become class 1 and every other entry becomes
    class 0; the classes are then one-hot encoded with depth 2.

    Args:
        label: A `Tensor` of labels.
        name: (Optional) A name for this operation.

    Returns:
        A `Tensor` reshaped to `[-1, 2]` holding the one-hot encoding.
    """
    with tf.compat.v1.name_scope(name, 'convert_labels'):
        is_human = label == 'human'
        class_index = tf.where(is_human, 1, 0)
        encoded = tf.one_hot(class_index, depth=2)
        return tf.reshape(encoded, [-1, 2])
    

def preprocessing_fn(inputs):
    """Compute mel-spectrograms and one-hot labels, each serialized once.

    The spectrogram parameters are read from the first row (`[0]`) of the
    corresponding input columns and reshaped to scalars, so they are shared
    by every example handed to this call.

    Args:
        inputs: Dict of raw feature tensors. The keys read here are 'sample',
            'n_fft', 'hop_length', 'n_mels', 'fmin', 'fmax', 'sample_rate',
            'audio' and 'label'.

    Returns:
        Dict with `transformed_name('audio')` and `transformed_name('label')`,
        each a string tensor of shape `[1]` containing the serialized tensor
        computed from all the examples in `inputs`.
    """
    
    sample =  tf.reshape(tf.cast(inputs['sample'], tf.int32)[0], [])
    win_length = tf.reshape(tf.cast(inputs['n_fft'], tf.int32)[0], []) # I've assumed 'win_length' is equivalent to 'n_fft' from inputs
    # NOTE(review): unlike `win_length`, `n_fft` is not cast to int32 here;
    # confirm the dtype of inputs['n_fft'] is what tf.signal.stft expects.
    n_fft = tf.reshape(inputs['n_fft'][0], [])
    hop_length = tf.reshape(tf.cast(inputs['hop_length'], tf.int32)[0], [])
    ref_level_db = tf.constant(50.0)  # Not specified in the inputs
    min_level_db = tf.constant(-100.0)  # Not specified in the inputs
    # mel scaling
    num_mel_bins  = tf.reshape(tf.cast(inputs['n_mels'], tf.int32)[0], [])
    mel_lower_edge_hertz = tf.reshape(inputs['fmin'][0], [])  # I've assumed 'mel_lower_edge_hertz' is equivalent to 'fmin' from inputs
    mel_upper_edge_hertz = tf.reshape(inputs['fmax'][0], [])
    # inversion
    # power = tf.constant(1.5, tf.float32) # Not specified in the inputs
    # griffin_lim_iters = tf.constant(50, tf.int32)  # Not specified in the inputs
    # pad = tf.constant(True)  # Not specified in the inputs
    sample_rate = tf.reshape(tf.cast(inputs['sample_rate'], tf.int32)[0], [])
    
    # Decode each audio entry and pad/truncate it to `sample` values.
    audio = inputs['audio']
    audio_reshaped = tf.map_fn(lambda x: decode_and_pad_audio(x, sample), audio, fn_output_signature=tf.TensorSpec(shape=[None, None], dtype=tf.float32))
    mels = mel_spectrogram(audio_reshaped, win_length, hop_length, n_fft, ref_level_db, min_level_db, num_mel_bins, sample_rate, mel_lower_edge_hertz, mel_upper_edge_hertz)
    # Serialize all spectrograms into one string and add a leading axis of
    # size 1.
    # NOTE(review): the leading axis is therefore 1 regardless of how many
    # examples were passed in -- confirm this is what Transform expects.
    mels_serialized = tf.expand_dims(tf.io.serialize_tensor(mels), axis=0)
    
    # Encode the label
    label = inputs['label']
    one_hot = convert_labels(label)
    # Serialize the labels
    label_serialized = tf.expand_dims(tf.io.serialize_tensor(one_hot), axis=0)
    # Alternative that delegates serialization to preprocess_single_example
    # (currently disabled):
    #outputs = preprocess_single_example(mels, one_hot)
    
    #return outputs

    return {transformed_name('audio'): mels_serialized, transformed_name('label'): label_serialized}

Notice that I also had to rewrite some tf.functions so that they no longer rely on Python objects - as is the case for the linear_to_mel_weight_matrix method and its helper functions.

Now I have the following problem, which I think is relevant to the community: TFT Transform processes the raw data in batches and, for that reason, outputs it in batches. This creates an extra dimension that does not match my model input - something for which I may find a workaround. For example, I’ve tried the following one:


@tf.function
def serialize_tensor(tensor):
    """Serialize `tensor` into a string tensor via `tf.io.serialize_tensor`."""
    serialized = tf.io.serialize_tensor(tensor)
    return serialized

@tf.function
def preprocess_single_example(mels, one_hot):
    """Serialize the spectrograms and the labels into string tensors.

    A leading axis of size 1 is added to each input, and `tf.map_fn` then
    serializes along that axis, so each output holds one serialized string.

    Args:
        mels: Tensor of mel-spectrograms.
        one_hot: Tensor of one-hot encoded labels.

    Returns:
        Dict with `transformed_name('audio')` and `transformed_name('label')`
        mapped to the serialized string tensors.
    """
    mels = tf.expand_dims(mels, axis=0)
    one_hot = tf.expand_dims(one_hot, axis=0)

    # `fn_output_signature` replaces the deprecated `dtype` argument of
    # tf.map_fn and matches the other tf.map_fn calls in this file.
    mels_serialized = tf.map_fn(
        serialize_tensor, mels, fn_output_signature=tf.string)
    one_hot_serialized = tf.map_fn(
        serialize_tensor, one_hot, fn_output_signature=tf.string)

    return {transformed_name('audio'): mels_serialized, transformed_name('label'): one_hot_serialized}

def preprocessing_fn(inputs):
    """Compute mel-spectrograms and one-hot labels, then serialize them.

    The spectrogram parameters are read from the first row (`[0]`) of the
    corresponding input columns and reshaped to scalars, so they are shared
    by every example handed to this call. Serialization is delegated to
    `preprocess_single_example`.

    Args:
        inputs: Dict of raw feature tensors. The keys read here are 'sample',
            'n_fft', 'hop_length', 'n_mels', 'fmin', 'fmax', 'sample_rate',
            'audio' and 'label'.

    Returns:
        The dict returned by `preprocess_single_example(mels, one_hot)`.
    """
    
    sample =  tf.reshape(tf.cast(inputs['sample'], tf.int32)[0], [])
    win_length = tf.reshape(tf.cast(inputs['n_fft'], tf.int32)[0], []) # I've assumed 'win_length' is equivalent to 'n_fft' from inputs
    # NOTE(review): unlike `win_length`, `n_fft` is not cast to int32 here;
    # confirm the dtype of inputs['n_fft'] is what tf.signal.stft expects.
    n_fft = tf.reshape(inputs['n_fft'][0], [])
    hop_length = tf.reshape(tf.cast(inputs['hop_length'], tf.int32)[0], [])
    ref_level_db = tf.constant(50.0)  # Not specified in the inputs
    min_level_db = tf.constant(-100.0)  # Not specified in the inputs
    # mel scaling
    num_mel_bins  = tf.reshape(tf.cast(inputs['n_mels'], tf.int32)[0], [])
    mel_lower_edge_hertz = tf.reshape(inputs['fmin'][0], [])  # I've assumed 'mel_lower_edge_hertz' is equivalent to 'fmin' from inputs
    mel_upper_edge_hertz = tf.reshape(inputs['fmax'][0], [])
    # inversion
    sample_rate = tf.reshape(tf.cast(inputs['sample_rate'], tf.int32)[0], [])
    
    # Decode each audio entry and pad/truncate it to `sample` values.
    audio = inputs['audio']
    audio_reshaped = tf.map_fn(lambda x: decode_and_pad_audio(x, sample), audio, fn_output_signature=tf.TensorSpec(shape=[None, None], dtype=tf.float32))
    mels = mel_spectrogram(audio_reshaped, win_length, hop_length, n_fft, ref_level_db, min_level_db, num_mel_bins, sample_rate, mel_lower_edge_hertz, mel_upper_edge_hertz)
    
    
    # Encode the label
    label = inputs['label']
    one_hot = convert_labels(label)

    # Serialize spectrograms and labels together in the helper.
    outputs = preprocess_single_example(mels, one_hot)
    
    return outputs

I added outputs = preprocess_single_example(mels, one_hot) as an attempt to unbatch the outputs - however, for some reason that I haven’t found yet, it makes my graph non-portable again and brings back the former bug - the empty dictionary.

So here comes my next question: what is the best practice for handling the batching behavior of TFT Transform? I would rather not handle it in a first model layer, because that would require reshaping the input accordingly in production.

Thank you very much!

My understanding is that transform is now outputting the right data, the remaining issue is now just with the shape of it.
This will depend on how the data is read on the Trainer side. The Transform output itself is not batched.
Could you share how the data is being read/transformed on the trainer side?
Could you also mention if you’re using TF or Keras for training?

(I suggest putting preprocess_single_example aside for now)

1 Like

Hi @zoy, thanks for your comments:

I have apparently just fixed the errors. Here is how the data is being transformed on the trainer side:

def input_fn(file_pattern, tf_transform_output, batch_size=8) -> tf.data.Dataset:
    """Build an endlessly repeating dataset of (features, label) pairs.

    Args:
        file_pattern: Pattern of GZIP-compressed TFRecord files to read.
        tf_transform_output: Object providing `transformed_feature_spec()`,
            used to parse each serialized example.
        batch_size: Accepted for interface compatibility; not used in the
            body, so the returned dataset is not batched.

    Returns:
        A `tf.data.Dataset` yielding `(features, label)` where `label` is the
        deserialized 'label_xf' feature reshaped to `[-1, 2]`.
    """

    def _split_features_and_label(serialized_example):
        features = tf.io.parse_single_example(
            serialized_example,
            tf_transform_output.transformed_feature_spec().copy())
        # The label was stored with tf.io.serialize_tensor; restore it.
        serialized_label = features.pop('label_xf')
        label = tf.io.parse_tensor(serialized_label, out_type=tf.float32)
        return features, tf.reshape(label, [-1, 2])

    def _open_gzip_tfrecord(filepath):
        return tf.data.TFRecordDataset(filepath, compression_type="GZIP")

    # One dataset of file paths, flattened into one dataset of records.
    matching_files = tf.data.Dataset.list_files(file_pattern)
    records = matching_files.flat_map(_open_gzip_tfrecord)
    examples = records.map(_split_features_and_label)
    return examples.repeat()


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Return a tf.function that parses, transforms and scores tf.Examples.

    Args:
        model: Callable model; the transform layer is stored on it as
            `tft_layer`.
        tf_transform_output: Object providing `transform_features_layer()`
            and `raw_feature_spec()`.

    Returns:
        A function mapping serialized examples to `{'outputs': predictions}`.
    """
    # Stored on the model; the serving function reads it from there.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        # The label is dropped from the raw spec before parsing.
        raw_spec = tf_transform_output.raw_feature_spec()
        del raw_spec['label']
        raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)
        predictions = model(model.tft_layer(raw_features))
        return {'outputs': predictions}

    return serve_tf_examples_fn

Notice that in the input_fn method I’m using flat_map in order to unbatch the audios and labels. I still have some doubts about whether this will cause a mismatch in production when the saved transform graph is used along with the model…

And here is my custom model layer to handle this input:

class ParseAndDeserializeLayer(tf.keras.layers.Layer):
    """Keras layer that turns the serialized 'audio_xf' feature into floats."""

    def __init__(self, feature_spec, **kwargs):
        """Store `feature_spec`; remaining kwargs go to the base layer."""
        super().__init__(**kwargs)
        self.feature_spec = feature_spec

    def call(self, parsed_features):
        """Deserialize 'audio_xf' and reshape it to `[-1, 1067, 128]`."""
        serialized_audio = parsed_features['audio_xf']
        decoded = tf.io.parse_tensor(serialized_audio, out_type=tf.float32)
        return tf.reshape(decoded, [-1, 1067, 128])

    def get_config(self):
        """Return the base config extended with `feature_spec`."""
        base_config = super().get_config()
        return {**base_config, "feature_spec": self.feature_spec}

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)



class TrainedModel(tf.keras.Model):
    """Model that deserializes its input and feeds it to a `Transformer`."""

    def __init__(self, feature_spec, time_steps, num_layers, units, d_model, num_heads, dropout, output_size, projection, input_name, name="transformer"):
        """Create the deserialization layer and the transformer sub-model."""
        super().__init__(name=name)
        self.parse_and_deserialize = ParseAndDeserializeLayer(feature_spec)
        self.transformer = Transformer(
            time_steps, num_layers, units, d_model, num_heads,
            dropout, output_size, projection, input_name)

    def call(self, inputs, training=None):
        """Deserialize `inputs`, then run the transformer on the result."""
        deserialized = self.parse_and_deserialize(inputs)
        return self.transformer(deserialized, training)

    def get_config(self):
        """Return the base config plus every constructor argument."""
        config = super().get_config()
        config["feature_spec"] = self.parse_and_deserialize.feature_spec
        # The remaining arguments are read back from the transformer.
        transformer_fields = (
            "time_steps", "num_layers", "units", "d_model", "num_heads",
            "dropout", "output_size", "projection", "input_name",
        )
        for field_name in transformer_fields:
            config[field_name] = getattr(self.transformer, field_name)
        return config

Apart from all the debug log warnings, things are apparently working:


WARNING:absl:`transformed_examples` is deprecated. Please use `examples` instead.
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying CustomExampleGen.py -> build/lib
copying EDA.py -> build/lib
copying audio_to_tfrecord.py -> build/lib
copying util.py -> build/lib
copying model.py -> build/lib
copying preprocessing.py -> build/lib
copying trainer_test.py -> build/lib
copying module.py -> build/lib
copying data_ingestion.py -> build/lib
copying processing_test.py -> build/lib
copying data_augmentation.py -> build/lib
/home/marlon/anaconda3/envs/tfx/lib/python3.9/site-packages/setuptools/command/install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
  warnings.warn(
installing to /tmp/tmpz841rwjw
running install
running install_lib
copying build/lib/CustomExampleGen.py -> /tmp/tmpz841rwjw
copying build/lib/EDA.py -> /tmp/tmpz841rwjw
copying build/lib/audio_to_tfrecord.py -> /tmp/tmpz841rwjw
copying build/lib/util.py -> /tmp/tmpz841rwjw
copying build/lib/model.py -> /tmp/tmpz841rwjw
copying build/lib/preprocessing.py -> /tmp/tmpz841rwjw
copying build/lib/trainer_test.py -> /tmp/tmpz841rwjw
copying build/lib/module.py -> /tmp/tmpz841rwjw
copying build/lib/data_ingestion.py -> /tmp/tmpz841rwjw
copying build/lib/processing_test.py -> /tmp/tmpz841rwjw
copying build/lib/data_augmentation.py -> /tmp/tmpz841rwjw
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpz841rwjw/tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3-py3.9.egg-info
running install_scripts
creating /tmp/tmpz841rwjw/tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3.dist-info/WHEEL
creating '/tmp/tmpffhwjzlm/tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3-py3-none-any.whl' and adding '/tmp/tmpz841rwjw' to it
adding 'CustomExampleGen.py'
adding 'EDA.py'
adding 'audio_to_tfrecord.py'
adding 'data_augmentation.py'
adding 'data_ingestion.py'
adding 'model.py'
adding 'module.py'
adding 'preprocessing.py'
adding 'processing_test.py'
adding 'trainer_test.py'
adding 'util.py'
adding 'tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3.dist-info/METADATA'
adding 'tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3.dist-info/WHEEL'
adding 'tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3.dist-info/top_level.txt'
adding 'tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3.dist-info/RECORD'
removing /tmp/tmpz841rwjw
WARNING:absl:Examples artifact does not have payload_format custom property. Falling back to FORMAT_TF_EXAMPLE
WARNING:absl:Examples artifact does not have payload_format custom property. Falling back to FORMAT_TF_EXAMPLE
WARNING:absl:Examples artifact does not have payload_format custom property. Falling back to FORMAT_TF_EXAMPLE
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Processing /tmp/tfx-interactive-2023-05-22T11_16_03.447216-mnmc1o0b/_wheels/tfx_user_code_Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3-py3-none-any.whl
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+edc5f739ca05aaec677badd2194960dc369be8e7b6af13c6c52af78c53d498a3
linear
2023-05-22 12:04:20.461947: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
499/500 [============================>.] - ETA: 0s - loss: 3.9785e-04 - accuracy: 1.00002023-05-22 12:04:42.397316: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
2023-05-22 12:04:42.397530: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]
500/500 [==============================] - 23s 36ms/step - loss: 3.9705e-04 - accuracy: 1.0000 - val_loss: 0.0000e+00 - val_accuracy: 1.0000
Model: "transformer"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 parse_and_deserialize_layer  multiple                 0         
 _6 (ParseAndDeserializeLaye                                     
 r)                                                              
                                                                 
 transformer (Transformer)   multiple                  949378    
                                                                 
=================================================================
Total params: 949,378
Trainable params: 949,378
Non-trainable params: 0
_________________________________________________________________
2023-05-22 12:04:43.658505: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'num_mel_bins' with dtype int32
	 [[{{node win_length}}]]
2023-05-22 12:04:43.688654: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'win_length' with dtype int32
	 [[{{node win_length}}]]
INFO:tensorflow:Saver not created because there are no variables in the graph to restore
INFO:tensorflow:Saver not created because there are no variables in the graph to restore
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
2023-05-22 12:04:44.163765: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'examples' with dtype string and shape [?]
	 [[{{node examples}}]]
2023-05-22 12:04:44.180534: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor '146351' with dtype float and shape [1,1067,128]
	...
2023-05-22 12:04:46.446998: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor '152427' with dtype float and shape [1,1067,128]
	 [[{{node 152427}}]]
INFO:tensorflow:Unsupported signature for serialization: ((TensorSpec(shape=(136576, 2), dtype=tf.float32, name='gradient'), <tensorflow.python.framework.func_graph.UnknownArgument object at 0x7fb735ff8c10>, 140424858760368), {}).
INFO:tensorflow:Unsupported signature for serialization: ((TensorSpec(shape=(136576, 2), dtype=tf.float32, name='gradient'), <tensorflow.python.framework.func_graph.UnknownArgument object at 0x7fb735ff8c10>, 140424858760368), {}).
INFO:tensorflow:Unsupported signature for serialization: ((TensorSpec(shape=(2,), dtype=tf.float32, name='gradient'), <tensorflow.python.framework.func_graph.UnknownArgument object at 0x7fb73cec3d60>, 140424983542352), {}).
INFO:tensorflow:Unsupported signature for serialization: ((TensorSpec(shape=(2,), dtype=tf.float32, name='gradient'), <tensorflow.python.framework.func_graph.UnknownArgument object at 0x7fb73cec3d60>, 140424983542352), {}).
2023-05-22 12:04:46.911716: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'inputs_audio' with dtype string and shape [?,1]
	 [[{{node inputs_audio}}]]
2023-05-22 12:04:46.911775: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'inputs_hop_length' with dtype int64 and shape [?,1]
	 ...
2023-05-22 12:04:49.071517: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'inputs' with dtype float and shape [?,?,128]
	 [[{{node inputs}}]]
WARNING:absl:Found untraced functions such as dense_81_layer_call_fn, dense_81_layer_call_and_return_conditional_losses, dense_82_layer_call_fn, dense_82_layer_call_and_return_conditional_losses, dense_83_layer_call_fn while saving (showing 5 of 16). These functions will not be directly callable after loading.
2023-05-22 12:04:49.258691: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'serving_model_dir_default_examples' with dtype string and shape [?]
	 [[{{node serving_model_dir_default_examples}}]]
INFO:tensorflow:Assets written to: /tmp/tfx-interactive-2023-05-22T11_16_03.447216-mnmc1o0b/Trainer/model/12/Format-Serving/assets
INFO:tensorflow:Assets written to: /tmp/tfx-interactive-2023-05-22T11_16_03.447216-mnmc1o0b/Trainer/model/12/Format-Serving/assets
WARNING:absl:<model.MultiHeadAttention object at 0x7fba703138e0> has the same name 'MultiHeadAttention' as a built-in Keras object. Consider renaming <class 'model.MultiHeadAttention'> to avoid naming conflicts when loading with `tf.keras.models.load_model`. If renaming is not possible, pass the object in the `custom_objects` parameter of the load function.
WARNING:absl:<model.MultiHeadAttention object at 0x7fb74827d340> has the same name 'MultiHeadAttention' as a built-in Keras object. Consider renaming <class 'model.MultiHeadAttention'> to avoid naming conflicts when loading with `tf.keras.models.load_model`. If renaming is not possible, pass the object in the `custom_objects` parameter of the load function.
Out[8]: 
ExecutionResult(
    component_id: Trainer
    execution_id: 12
    outputs:
        model: OutputChannel(artifact_type=Model, producer_component_id=Trainer, output_key=model, additional_properties={}, additional_custom_properties={}, _input_trigger=None
        model_run: OutputChannel(artifact_type=ModelRun, producer_component_id=Trainer, output_key=model_run, additional_properties={}, additional_custom_properties={}, _input_trigger=None)

Any comments are more than welcomed…

Thanks,

I am facing a similar issue where my model export fails since some feature keys are missing in the transformed_features output after the transformed_features = model.tft_layer_inference(raw_features).

Here is my preprocessing function for the recency features (the feature set that goes missing in transformed_features). It gets called inside the preprocessing_fn and is shown here along with other utility functions:

def _preprocessing_recency_features_fn(
    inputs,
    features_list,
    clip_left_idx=0,
    clip_right_idx=-1,
):
      outputs = {}
      for feature in features_list:
          bs_value_tag = list(
              filter(lambda string: len(findall(r"bs_\d+", string)) > 0, feature.tags)
          )[0]
          bs_value = int(bs_value_tag.split("_")[1])
          log_1_p = _recency_features(
              inputs[feature.name], inputs[features.REQUEST_TS]
          )
          outputs[features.transformed_name(feature.name)] = _quantiles_clip(
              log_1_p, bs_value, clip_left_idx, clip_right_idx
          )
      return outputs

def _quantiles_clip(
    inputs, num_buckets, clip_left_idx=0, clip_right_idx=-1, zero_left_clip=False
):
    """Clip `inputs` to bounds taken from its `tft.quantiles` boundaries.

    Args:
      inputs: Tensor to analyze and clip.
      num_buckets: Bucket count passed to `tft.quantiles`; the analyzer's
        `epsilon` is set to `1.0 / num_buckets`.
      clip_left_idx: Index into the first row of the quantile result used as
        the lower bound. Ignored when `zero_left_clip` is true.
      clip_right_idx: Index into the first row of the quantile result used as
        the upper bound.
      zero_left_clip: When true, the lower bound is the constant 0.

    Returns:
      The result of `tf.clip_by_value(inputs, lower, upper)`.
    """
    boundaries = tft.quantiles(inputs, num_buckets, epsilon=1.0 / num_buckets)
    lower = 0 if zero_left_clip else boundaries[0][clip_left_idx]
    upper = boundaries[0][clip_right_idx]
    return tf.clip_by_value(inputs, lower, upper)

def _log1p_abs_diff(x, y):
    """Return `log1p(|x - y|)`, with the absolute difference cast to float32."""
    distance = tf.math.abs(x - y)
    as_float = tf.cast(distance, tf.float32)
    return tf.math.log1p(as_float)


def _recency_features(joined, now, dim: int = 1):
    """Compute `log1p(|now - joined|)` after filling in missing values.

    Args:
      joined: Feature tensor; after `_fill_in_missing` it is reshaped to
        `(-1, dim)`.
      now: Reference tensor; passed through `_fill_in_missing` only.
      dim: Size of the second axis that `joined` is reshaped to.

    Returns:
      The float32 tensor produced by `_log1p_abs_diff`.
    """
    # NOTE(review): when `now` is a SparseTensor, `_fill_in_missing` squeezes it
    # to rank 1 while `joined` is reshaped to rank 2, so the subtraction would
    # broadcast -- confirm the shape `now` has at the call site.
    now_dense = _fill_in_missing(now)
    joined_dense = tf.reshape(_fill_in_missing(joined), (-1, dim))
    return _log1p_abs_diff(now_dense, joined_dense)

def _fill_in_missing(
    x, non_string_default: Union[int, float] = 0, string_default="", dense_feature_dim=1
):
    """Densify a `SparseTensor`, filling absent entries with a default value.

    Any `x` that is not a `tf.sparse.SparseTensor` is returned unchanged.

    Args:
      x: The value to densify. A `SparseTensor` is rebuilt with dense shape
        `[x.dense_shape[0], dense_feature_dim]` before conversion.
      non_string_default: Fill value used when `x.dtype` is not `tf.string`.
      string_default: Fill value used when `x.dtype` is `tf.string`.
      dense_feature_dim: Size of the second axis of the dense result.

    Returns:
      `x` itself when it is not sparse. Otherwise the dense tensor, with
      axis 1 squeezed away when `dense_feature_dim == 1`.
    """
    if not isinstance(x, tf.sparse.SparseTensor):
        return x

    if x.dtype == tf.string:
        fill_value = string_default
    else:
        fill_value = non_string_default

    # Re-wrap the sparse data so the second axis has exactly the requested width.
    target_shape = [x.dense_shape[0], dense_feature_dim]
    resized = tf.SparseTensor(x.indices, x.values, target_shape)
    dense = tf.sparse.to_dense(resized, fill_value)

    return tf.squeeze(dense, axis=1) if dense_feature_dim == 1 else dense

Using the above methods, I get a KeyError for one of the recency features: KeyError: 'user_created_on_xf'
As expected, it works fine during transformation and training but fails during serving because the feature goes missing. I tried adding `with tf.compat.v1.name_scope(None, "recency"):` at the start of my function definition, but that gives another error: TypeError: Python object could not be represented through the generic tracing type. Consider implementing the Tracing Protocol for it: Feature(name='created_on', tags={'bs_1000', 'recency_features'})
It would be a great help if someone could help resolve this error. I’ve already spent a couple of days trying to fix it, with no success so far.