Hi, @zoy! You’re definitely the person I was hoping would answer this question!
I have made some good progress; however, I still have some problems that I think are worth sharing.
I have done a lot of things to make the graph of my code portable; most of them, if not all, consist of cleaning my definitions of tf.py_function calls and Python objects (such as parameters passed as Python int and float). Here is my preprocessing_fn function for audio processing:
import tensorflow as tf
from tensorflow.python.framework import ops
from util import transformed_name
from tensorflow_transform import common
from tensorflow_transform import common_types
from typing import Dict, Optional, Any, Union, Tuple
import os
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import tensor_util
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops.signal import shape_ops
from sys import exit
# Constants of the mel mapping used by _hertz_to_mel / _mel_to_hertz:
#   mel = _MEL_HIGH_FREQUENCY_Q * ln(1 + hertz / _MEL_BREAK_FREQUENCY_HERTZ)
_MEL_BREAK_FREQUENCY_HERTZ = 700.0
_MEL_HIGH_FREQUENCY_Q = 1127.0
@tf.function
def decode_and_pad_audio(audio, sample):
    """Decode raw float32 audio bytes and force the last axis to `sample` values.

    Args:
        audio: String tensor of raw bytes, decoded as float32 with
            `tf.io.decode_raw`. The decoded tensor is indexed as rank 2 below
            (presumably [1, num_samples] — TODO confirm against the caller).
        sample: Scalar int32 tensor, the target length of axis 1.

    Returns:
        A float32 tensor whose axis 1 has exactly `sample` entries: padded
        with zeros on the right when the decoded audio is shorter (or equal),
        truncated otherwise.
    """
    audio_tensor = tf.io.decode_raw(audio, tf.float32)
    num_samples = tf.shape(audio_tensor)[1]

    def pad_audio():
        # tf.pad keeps whatever leading dimension the decoded audio has,
        # instead of assuming exactly one row when building the zero block.
        return tf.pad(audio_tensor, [[0, 0], [0, sample - num_samples]])

    def slice_audio():
        return audio_tensor[:, :sample]

    # Both branches are traced; only the selected one runs, so the negative
    # padding amount in the "too long" case is never executed.
    return tf.cond(
        tf.math.greater_equal(sample, num_samples), pad_audio, slice_audio)
@tf.function
def normalize_tensorflow(
        S: common_types.TensorType,
        min_level_db: float,
        name: Optional[str] = None) -> tf.Tensor:
    """Rescale `S` by `min_level_db` and clip the result to [0, 1].

    Args:
        S: Input tensor (the caller passes a dB-scaled spectrogram).
        min_level_db: Level mapped to 0; a value of 0 dB maps to 1.
        name: (Optional) A name for this operation.

    Returns:
        `(S - min_level_db) / -min_level_db`, clipped to the range [0, 1].
    """
    with tf.compat.v1.name_scope(name, 'normalize'):
        rescaled = (S - min_level_db) / -min_level_db
        return tf.clip_by_value(
            rescaled, clip_value_min=0, clip_value_max=1)
@tf.function
def tf_log10(x: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor],
             name: Optional[str] = None) -> tf.Tensor:
    """Compute the base-10 logarithm of `x` element-wise.

    Args:
        x: Input tensor.
        name: (Optional) A name for this operation.

    Returns:
        `ln(x) / ln(10)`, in the dtype produced by `tf.math.log(x)`.
    """
    with tf.compat.v1.name_scope(name, 'tf_log10'):
        natural_log = tf.math.log(x)
        # Build ln(10) in the same dtype so the division needs no casting.
        ln_ten = tf.math.log(tf.constant(10, dtype=natural_log.dtype))
        return natural_log / ln_ten
@tf.function
def amp_to_db_tensorflow(x: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor],
                         name: Optional[str] = None) -> tf.Tensor:
    """Convert amplitudes to decibels: `20 * log10(clip(|x|))`.

    Args:
        x: Input tensor of amplitudes.
        name: (Optional) A name for this operation.

    Returns:
        The dB value of `|x|`, with magnitudes floored at 1e-5 so the
        logarithm never sees zero.
    """
    with tf.compat.v1.name_scope(name, 'amp_to_db_tensorflow'):
        # NOTE(review): 1e100 is above the float32 range; for float32 input
        # the upper bound presumably acts as +inf — confirm.
        magnitude = tf.clip_by_value(
            tf.abs(x), clip_value_min=1e-5, clip_value_max=1e100)
        return 20 * tf_log10(magnitude)
@tf.function
def stft_tensorflow(signals: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor],
                    win_length: int,
                    hop_length: int,
                    n_fft: int,
                    name: Optional[str] = None) -> tf.Tensor:
    """Compute the short-time Fourier transform of `signals`.

    Args:
        signals: Input signal tensor forwarded to `tf.signal.stft`.
        win_length: Frame (window) length, in samples.
        hop_length: Step between successive frames, in samples.
        n_fft: FFT size.
        name: (Optional) A name for this operation.

    Returns:
        The result of `tf.signal.stft` using a Hann window with end padding
        enabled.
    """
    with tf.compat.v1.name_scope(name, 'stft_tensorflow'):
        spectrum = tf.signal.stft(
            signals,
            frame_length=win_length,
            frame_step=hop_length,
            fft_length=n_fft,
            window_fn=tf.signal.hann_window,
            pad_end=True,
        )
        return spectrum
@tf.function
def _mel_to_hertz(mel_values, name=None):
    """Convert mel-scale values to frequencies in Hertz.

    Args:
        mel_values: Tensor (or tensor-convertible value) on the mel scale.
        name: (Optional) A name for this operation.

    Returns:
        `_MEL_BREAK_FREQUENCY_HERTZ * (exp(mel / _MEL_HIGH_FREQUENCY_Q) - 1)`.
    """
    with ops.name_scope(name, 'mel_to_hertz', [mel_values]):
        mel_tensor = ops.convert_to_tensor(mel_values)
        exponent = mel_tensor / _MEL_HIGH_FREQUENCY_Q
        return _MEL_BREAK_FREQUENCY_HERTZ * (math_ops.exp(exponent) - 1.0)
@tf.function
def _hertz_to_mel(frequencies_hertz, name=None):
    """Convert frequencies in Hertz to the mel scale.

    Args:
        frequencies_hertz: Tensor (or tensor-convertible value) in Hertz.
        name: (Optional) A name for this operation.

    Returns:
        `_MEL_HIGH_FREQUENCY_Q * ln(1 + hertz / _MEL_BREAK_FREQUENCY_HERTZ)`.
    """
    with ops.name_scope(name, 'hertz_to_mel', [frequencies_hertz]):
        hertz_tensor = ops.convert_to_tensor(frequencies_hertz)
        ratio = hertz_tensor / _MEL_BREAK_FREQUENCY_HERTZ
        return _MEL_HIGH_FREQUENCY_Q * math_ops.log(1.0 + ratio)
@tf.function
def linear_to_mel_weight_matrix(num_mel_bins=20,
                                num_spectrogram_bins=129,
                                sample_rate=8000,
                                lower_edge_hertz=125.0,
                                upper_edge_hertz=3800.0,
                                dtype=dtypes.float32,
                                name=None):
    """Build a matrix that re-weights linear spectrogram bins into mel bands.

    Each mel band is a triangular filter, linear in the mel domain, spanning
    (lower_edge, center, upper_edge); the DC bin always gets zero weight.

    Args:
        num_mel_bins: Number of mel bands (columns of the result).
        num_spectrogram_bins: Number of linear bins (rows of the result).
        sample_rate: Sample rate of the signal; a Tensor with a statically
            known value is replaced by that value before being cast.
        lower_edge_hertz: Lower bound of the lowest mel band, in Hertz.
        upper_edge_hertz: Upper bound of the highest mel band, in Hertz.
        dtype: Floating dtype of the result.
        name: (Optional) A name for this operation.

    Returns:
        A tensor of triangular filter weights, one row per spectrogram bin
        and one column per mel band.
    """
    with ops.name_scope(name, 'linear_to_mel_weight_matrix') as name:
        # Swap a Tensor sample rate for its static value whenever one is known.
        if isinstance(sample_rate, ops.Tensor):
            static_rate = tensor_util.constant_value(sample_rate)
            if static_rate is not None:
                sample_rate = static_rate

        # No explicit validation of num_spectrogram_bins: it is handed to
        # linspace, which performs its own checks.
        sample_rate = math_ops.cast(sample_rate, dtype, name='sample_rate')
        lower_edge_hertz = ops.convert_to_tensor(
            lower_edge_hertz, dtype, name='lower_edge_hertz')
        upper_edge_hertz = ops.convert_to_tensor(
            upper_edge_hertz, dtype, name='upper_edge_hertz')
        zero = ops.convert_to_tensor(0.0, dtype)

        # HTK excludes the spectrogram DC bin.
        bands_to_zero = 1
        nyquist_hertz = sample_rate / 2.0
        all_bin_frequencies = math_ops.linspace(
            zero, nyquist_hertz, num_spectrogram_bins)
        linear_frequencies = all_bin_frequencies[bands_to_zero:]
        spectrogram_bins_mel = array_ops.expand_dims(
            _hertz_to_mel(linear_frequencies), 1)

        # num_mel_bins + 2 evenly spaced mel points yield num_mel_bins
        # overlapping (lower, center, upper) triples: the center of one band
        # is the edge of its neighbours.
        mel_edge_points = math_ops.linspace(
            _hertz_to_mel(lower_edge_hertz),
            _hertz_to_mel(upper_edge_hertz),
            num_mel_bins + 2)
        band_edges_mel = shape_ops.frame(
            mel_edge_points, frame_length=3, frame_step=1)

        # One column per edge kind, each reshaped to [1, num_mel_bins] so it
        # broadcasts against the [bins, 1] spectrogram frequencies.
        edge_columns = array_ops.split(band_edges_mel, 3, axis=1)
        lower_edge_mel, center_mel, upper_edge_mel = [
            array_ops.reshape(column, [1, num_mel_bins])
            for column in edge_columns
        ]

        # Rising and falling sides of every triangle (linear in mel, not Hz).
        lower_slopes = (spectrogram_bins_mel - lower_edge_mel) / (
            center_mel - lower_edge_mel)
        upper_slopes = (upper_edge_mel - spectrogram_bins_mel) / (
            upper_edge_mel - center_mel)

        # The triangle is the lower of the two sides, floored at zero.
        triangle = math_ops.minimum(lower_slopes, upper_slopes)
        mel_weights_matrix = math_ops.maximum(zero, triangle)

        # Put back the rows for the DC bin(s) sliced off above.
        return array_ops.pad(
            mel_weights_matrix, [[bands_to_zero, 0], [0, 0]], name=name)
@tf.function
def mel_spectrogram(
        tensor: Union[ops.Tensor, tf.SparseTensor, tf.RaggedTensor],
        win_length: int,
        hop_length: int,
        n_fft: int,
        ref_level_db: float,
        min_level_db: float,
        num_mel_bins: int,
        sample_rate: int,
        mel_lower_edge_hertz: float,
        mel_upper_edge_hertz: float,
        name: Optional[str] = None) -> tf.Tensor:
    """Compute a normalized mel spectrogram of an audio tensor.

    Pipeline: STFT -> magnitude in dB minus `ref_level_db` -> normalization
    to [0, 1] -> projection onto mel bands.

    Args:
        tensor: Audio samples forwarded to `stft_tensorflow`.
        win_length: STFT window length.
        hop_length: STFT hop length.
        n_fft: STFT FFT size.
        ref_level_db: Reference level subtracted from the dB spectrogram.
        min_level_db: Minimum level used by `normalize_tensorflow`.
        num_mel_bins: Number of mel bands in the output.
        sample_rate: Sample rate of the audio.
        mel_lower_edge_hertz: Lower frequency bound of the mel filter bank.
        mel_upper_edge_hertz: Upper frequency bound of the mel filter bank.
        name: (Optional) A name for this operation.

    Returns:
        The normalized spectrogram contracted with the mel weight matrix
        over its last axis.
    """
    with tf.compat.v1.name_scope(name, 'mel_spectrogram'):
        # Spectrogram in normalized dB.
        stft_frames = stft_tensorflow(tensor, win_length, hop_length, n_fft)
        level_db = amp_to_db_tensorflow(tf.abs(stft_frames)) - ref_level_db
        normalized = normalize_tensorflow(level_db, min_level_db)

        # Filter bank sized from the spectrogram's actual last dimension.
        mel_weights = linear_to_mel_weight_matrix(
            num_mel_bins=num_mel_bins,
            num_spectrogram_bins=tf.shape(normalized)[-1],
            sample_rate=sample_rate,
            lower_edge_hertz=mel_lower_edge_hertz,
            upper_edge_hertz=mel_upper_edge_hertz,
            dtype=tf.float32,
        )

        # Contract the frequency axis with the filter bank.
        return tf.tensordot(normalized, mel_weights, 1)
@tf.function
def convert_labels(label: tf.Tensor, name: Optional[str] = None) -> tf.Tensor:
    """
    Converts the input 'label' tensor into a binary label and one-hot encodes it.
    Args:
      label: A string `Tensor` of labels where 'human' corresponds to 1 and
        any other value to 0.
      name: (Optional) A name for this operation.
    Returns:
      A `Tensor` of shape [-1, 2] holding the depth-2 one-hot encoding of the
      binary label.
    """
    with tf.compat.v1.name_scope(name, 'convert_labels'):
        # tf.equal always performs an element-wise comparison; the `==`
        # operator on tensors only does so when tensor equality is enabled
        # and otherwise compares object identity.
        is_human = tf.equal(label, 'human')
        binary_label = tf.where(is_human, 1, 0)
        label_one_hot = tf.one_hot(binary_label, depth=2)
        return tf.reshape(label_one_hot, [-1, 2])
def preprocessing_fn(inputs):
    """Turn a batch of raw audio features into serialized mel spectrograms.

    Args:
        inputs: Dict of batched feature tensors. Keys read here: 'sample',
            'n_fft', 'hop_length', 'n_mels', 'fmin', 'fmax', 'sample_rate',
            'audio' and 'label'.

    Returns:
        Dict with the transformed 'audio' and 'label' keys, each holding a
        string tensor of shape [1] with the serialized result.
    """
    def first_as_scalar(key, dtype=None):
        # Only the first row's value is read; it is applied to the whole batch.
        feature = inputs[key]
        if dtype is not None:
            feature = tf.cast(feature, dtype)
        return tf.reshape(feature[0], [])

    sample = first_as_scalar('sample', tf.int32)
    # 'win_length' is assumed to be equivalent to 'n_fft' from the inputs.
    win_length = first_as_scalar('n_fft', tf.int32)
    # Cast like win_length: both come from the same feature and both are
    # integer size arguments of the STFT.
    n_fft = first_as_scalar('n_fft', tf.int32)
    hop_length = first_as_scalar('hop_length', tf.int32)
    ref_level_db = tf.constant(50.0)  # Not specified in the inputs
    min_level_db = tf.constant(-100.0)  # Not specified in the inputs

    # Mel scaling.
    num_mel_bins = first_as_scalar('n_mels', tf.int32)
    # 'mel_lower_edge_hertz' is assumed to be equivalent to 'fmin'.
    mel_lower_edge_hertz = first_as_scalar('fmin')
    mel_upper_edge_hertz = first_as_scalar('fmax')
    sample_rate = first_as_scalar('sample_rate', tf.int32)

    # Decode every row and pad/truncate it to `sample` values.
    audio_reshaped = tf.map_fn(
        lambda x: decode_and_pad_audio(x, sample),
        inputs['audio'],
        fn_output_signature=tf.TensorSpec(shape=[None, None], dtype=tf.float32))
    mels = mel_spectrogram(
        audio_reshaped, win_length, hop_length, n_fft, ref_level_db,
        min_level_db, num_mel_bins, sample_rate, mel_lower_edge_hertz,
        mel_upper_edge_hertz)

    # NOTE(review): each serialize_tensor call packs the whole batch into a
    # single string, so both outputs have length 1 whatever the batch size
    # is — confirm this is what the consumer expects.
    mels_serialized = tf.expand_dims(tf.io.serialize_tensor(mels), axis=0)

    # Encode and serialize the label.
    one_hot = convert_labels(inputs['label'])
    label_serialized = tf.expand_dims(tf.io.serialize_tensor(one_hot), axis=0)

    return {
        transformed_name('audio'): mels_serialized,
        transformed_name('label'): label_serialized,
    }
Note that I even had to rewrite some tf.functions so that they do not rely on Python objects - as is the case with the linear_to_mel_weight_matrix
function and its helper functions.
Now I have the following problem, which I think is relevant to the community: TFT Transform processes the raw data in batches and, for that reason, outputs it in batches. This creates an extra dimension that does not match my model input - something for which I may need to find a workaround. For example, I’ve tried the following one:
@tf.function
def serialize_tensor(tensor):
    """Return `tensor` serialized with `tf.io.serialize_tensor`."""
    serialized = tf.io.serialize_tensor(tensor)
    return serialized
@tf.function
def preprocess_single_example(mels, one_hot):
    """Serialize every example of a batch into its own string.

    Args:
        mels: Batched feature tensor; axis 0 is treated as the batch axis.
        one_hot: Batched label tensor; axis 0 is treated as the batch axis.

    Returns:
        Dict with the transformed 'audio' and 'label' keys. Each value is a
        string tensor with one entry per example; every entry is the
        serialized slice of that example, without the batch axis.
    """
    # Map over axis 0 directly. Wrapping the inputs in an extra leading axis
    # first would make map_fn run once over the whole batch and serialize it
    # as a single string, which leaves the batch axis inside the payload.
    # `fn_output_signature` replaces the deprecated `dtype` argument.
    mels_serialized = tf.map_fn(
        serialize_tensor, mels, fn_output_signature=tf.string)
    one_hot_serialized = tf.map_fn(
        serialize_tensor, one_hot, fn_output_signature=tf.string)
    return {
        transformed_name('audio'): mels_serialized,
        transformed_name('label'): one_hot_serialized,
    }
def preprocessing_fn(inputs):
    """Turn a batch of raw audio features into serialized mel spectrograms.

    Args:
        inputs: Dict of batched feature tensors. Keys read here: 'sample',
            'n_fft', 'hop_length', 'n_mels', 'fmin', 'fmax', 'sample_rate',
            'audio' and 'label'.

    Returns:
        The dict produced by `preprocess_single_example` for the computed
        mel spectrograms and one-hot labels.
    """
    def first_as_scalar(key, dtype=None):
        # Only the first row's value is read; it is applied to the whole batch.
        feature = inputs[key]
        if dtype is not None:
            feature = tf.cast(feature, dtype)
        return tf.reshape(feature[0], [])

    sample = first_as_scalar('sample', tf.int32)
    # 'win_length' is assumed to be equivalent to 'n_fft' from the inputs.
    win_length = first_as_scalar('n_fft', tf.int32)
    # Cast like win_length: both come from the same feature and both are
    # integer size arguments of the STFT.
    n_fft = first_as_scalar('n_fft', tf.int32)
    hop_length = first_as_scalar('hop_length', tf.int32)
    ref_level_db = tf.constant(50.0)  # Not specified in the inputs
    min_level_db = tf.constant(-100.0)  # Not specified in the inputs

    # Mel scaling.
    num_mel_bins = first_as_scalar('n_mels', tf.int32)
    # 'mel_lower_edge_hertz' is assumed to be equivalent to 'fmin'.
    mel_lower_edge_hertz = first_as_scalar('fmin')
    mel_upper_edge_hertz = first_as_scalar('fmax')
    sample_rate = first_as_scalar('sample_rate', tf.int32)

    # Decode every row and pad/truncate it to `sample` values.
    audio_reshaped = tf.map_fn(
        lambda x: decode_and_pad_audio(x, sample),
        inputs['audio'],
        fn_output_signature=tf.TensorSpec(shape=[None, None], dtype=tf.float32))
    mels = mel_spectrogram(
        audio_reshaped, win_length, hop_length, n_fft, ref_level_db,
        min_level_db, num_mel_bins, sample_rate, mel_lower_edge_hertz,
        mel_upper_edge_hertz)

    # Encode the label, then serialize both outputs.
    one_hot = convert_labels(inputs['label'])
    return preprocess_single_example(mels, one_hot)
I added the call outputs = preprocess_single_example(mels, one_hot)
as an attempt to unbatch the outputs - however, for some reason that I haven’t figured out yet, it makes my graph non-portable again and brings back the earlier bug - an empty dictionary.
So here comes my next question: what is the best practice for handling the batching behavior of TFT Transform? I would rather not handle it in a first model layer, because that would require reshaping the input accordingly in production.
Thank you very much!