Tensorflow records get corrupted randomly (TF 2.15)

Hi, I am performing a considerable number of random-search iterations to find good hyperparameters for a model. The data I am using is stored in TensorFlow records (TFRecord files).
My model trains fine for many epochs. There have been periods where I had no issue for a month or two. However, currently, after about 70 epochs of training, my tensorflow records get corrupted for no apparent reason.

I receive the error message:

2 root error(s) found.
(0) DATA_LOSS: corrupted record at 928889550
[[{{node IteratorGetNext}}]]
[[IteratorGetNext/_4]]
(1) DATA_LOSS: corrupted record at 928889550
[[{{node IteratorGetNext}}]]
0 successful operations.
0 derived errors ignored. [Op:__inference_train_function_406967]

If I replace the records with backups (or recreate them from the audio files), the error vanishes and everything works fine again.
Every time the records get corrupted the index of the allegedly corrupted record is different, there does not seem to be a pattern.
However, if I do not replace the record and just rerun my code, the index remains the same.

This is very strange. I run my code not only on my personal computer, but also on a compute cluster on many different nodes. There, the records appear to not break at all. So it may be related to my machine, although I use identical code (prototype on my computer, then move it to the compute cluster).

Information on this type of error is very scarce. So I hope that maybe someone here has an idea.

I cannot use tf.data.experimental.ignore_errors(), because then my dataset is considerably reduced, making the rest of the training not meaningful.

Here is how I create and read the records:

 def fetch(self):
        """Build the batched, prefetched tf.data pipeline for the current mode.

        Returns:
            A tf.data.Dataset of decoded examples (see self._decode):
            * "train" / "train_large": fully shuffled (when the dataset size is
              known), batched by self.batch_size.
            * "valid": batched by self.batch_size, no shuffling.
            * anything else (test): batch size 1, no shuffling.
        """
        dataset = tf.data.TFRecordDataset(self.tfr, compression_type=None).map(
            self._decode, num_parallel_calls=None)  # was num_parallel_calls=tf.data.experimental.AUTOTUNE

        if self.mode in ("train", "train_large"):
            # A mapped TFRecordDataset typically reports UNKNOWN cardinality
            # (a negative sentinel), which shuffle() rejects. Fall back to a
            # fixed buffer in that case instead of failing.
            buffer_size = int(dataset.cardinality())
            if buffer_size <= 0:
                buffer_size = 2000  # previously used hard-coded buffer size
            dataset = dataset.shuffle(buffer_size, reshuffle_each_iteration=True)
            dataset = dataset.batch(self.batch_size, drop_remainder=False)
        elif self.mode == "valid":
            dataset = dataset.batch(self.batch_size, drop_remainder=False)
        else:
            dataset = dataset.batch(1, drop_remainder=False)

        # All modes share the same prefetch tail.
        return dataset.prefetch(tf.data.experimental.AUTOTUNE)

    def _encode(self, mode):
        """Serialize WAV files from self.wav_dir into the TFRecord at self.tfr.

        For non-"test" modes, pairs each "*mixed*.wav" file with its
        "*clean*.wav" counterpart (derived from the mixed filename) and writes
        fixed-length stereo segments of duration self.duration seconds.
        For "test" mode, writes each whole mixed file as a single example with
        only the noisy channels.

        Args:
            mode: unused — kept for backward compatibility; all branching
                uses self.mode.
        """
        if self.mode == "train":
            print("\nSerializing training data...\n")

        if self.mode == "valid":
            print("\nSerializing validation data...\n")

        if self.mode == "test":
            print("\nSerializing testing data...\n")

        # The context manager guarantees the writer is flushed and closed even
        # if an exception interrupts serialization. The original bare
        # TFRecordWriter(...)/close() pair could leave a truncated — i.e.
        # corrupted-looking — record at the end of the file on any error.
        with tf.io.TFRecordWriter(self.tfr) as writer:
            if self.mode != "test":
                mix_filenames = glob.glob(os.path.join(self.wav_dir, "*mixed*.wav"))
                target_filenames = glob.glob(os.path.join(self.wav_dir, "*clean*.wav"))
                sys.stdout.flush()

                # zip() only bounds the iteration count; the clean path itself
                # is derived from the mixed path below, not from this list.
                for mix_filename, _ in tqdm(zip(mix_filenames,
                                                target_filenames), total=len(mix_filenames)):
                    mix, _ = librosa.load(mix_filename, self.sample_rate, mono=False)
                    target_filename = mix_filename[:-9] + 'clean.wav'
                    clean, _ = librosa.load(target_filename, self.sample_rate, mono=False)

                    def write(a, b):
                        # One fixed-length segment: both channels of the noisy
                        # mix and of the clean target.
                        example = tf.train.Example(
                            features=tf.train.Features(
                                feature={
                                    "noisy_left": self._float_list_feature(mix[0, a:b]),
                                    "noisy_right": self._float_list_feature(mix[1, a:b]),
                                    "clean_left": self._float_list_feature(clean[0, a:b]),
                                    "clean_right": self._float_list_feature(clean[1, a:b])}))

                        writer.write(example.SerializeToString())

                    now_length = mix.shape[-1]
                    target_length = int(self.duration * self.sample_rate)

                    if now_length < target_length:
                        continue

                    # Non-overlapping segments: stride equals segment length.
                    stride = target_length
                    # "+ 1" so a file whose length is an exact multiple of the
                    # segment length still yields its final segment; the
                    # original exclusive bound silently dropped it.
                    for i in range(0, now_length - target_length + 1, stride):
                        write(i, i + target_length)
            else:
                mix_filenames = glob.glob(os.path.join(self.wav_dir, "*mixed*.wav"))
                sys.stdout.flush()

                for mix_filename in tqdm(mix_filenames, total=len(mix_filenames)):
                    mix, _ = librosa.load(mix_filename, self.sample_rate, mono=False)

                    # Whole-file example: noisy channels only, no clean target.
                    example = tf.train.Example(
                        features=tf.train.Features(
                            feature={
                                "noisy_left": self._float_list_feature(mix[0, :]),
                                "noisy_right": self._float_list_feature(mix[1, :])}))

                    writer.write(example.SerializeToString())
        sys.stdout.flush()

    def _decode(self, serialized_example):
        """Parse one serialized tf.train.Example into dense float32 tensors.

        Returns:
            For non-"test" modes: ((noisy_left, noisy_right),
            (clean_left, clean_right)).
            For "test" mode: (noisy_left, noisy_right).
        """
        # Feature keys depend on the mode: test records carry no clean target.
        keys = ["noisy_left", "noisy_right"]
        if self.mode != "test":
            keys += ["clean_left", "clean_right"]

        # All features are variable-length float lists; parse once with a
        # mode-appropriate spec, then densify every parsed feature.
        spec = {key: tf.io.VarLenFeature(tf.float32) for key in keys}
        parsed = tf.io.parse_single_example(serialized_example, features=spec)
        dense = {key: tf.sparse.to_dense(parsed[key]) for key in keys}

        if self.mode != "test":
            return ((dense["noisy_left"], dense["noisy_right"]),
                    (dense["clean_left"], dense["clean_right"]))
        return dense["noisy_left"], dense["noisy_right"]

Looking forward to your ideas :slight_smile: