Hello all,
I am Trying to Train a Text Regression Model Using TFX Pipeline I have Done Multiple Experiments wrt to preprocessing_fn but unfortunately Pipeline Breaks after Transform Component.
Can anyone help me out.
def _build_keras_model() -> tf.keras.Model:
inputs = [
tf.keras.layers.Input(shape=(max_len,),dtype=tf.string,name=x) for x in _FEATURE_KEYS,]
print(inputs)
other_layers = [tf.keras.layers.Embedding(input_dim=max_features,output_dim=64),
tf.keras.layers.Dense(units=64,activation='relu',name="FirstDenseLayer"),
tf.keras.layers.Dense(1, activation='relu',name="OutputLayer")
]
inputs.extend(other_layers)
model_ex_3 = tf.keras.Sequential(inputs)
# model_ex_3 = tf.keras.Sequential([
# tf.keras.layers.Input(shape=(max_len,),dtype=tf.string,name=x) for x in _FEATURE_KEYS,]
# tf.keras.layers.Embedding(input_dim=max_features,output_dim=64),
# tf.keras.layers.Dense(units=64,activation='relu',name="FirstDenseLayer"),
# # tf.keras.layers.Dropout(rate=0.2,name="FirstDropOutLayer"),
# # tf.keras.layers.Dense(units=hp_units,activation='relu',name="FirstSubDenseLayer"),
# # tf.keras.layers.Dropout(rate=0.2,name="FirstSubDropOutLayer"),
# tf.keras.layers.Dense(units=32,activation='relu',name="SecondDenseLayer"),
# # tf.keras.layers.Dropout(rate=0.2,name="SecondDropOutLayer"),
# # tf.keras.layers.Dense(units=hp_units,activation='relu',name="ThirdDenseLayer"),
# # tf.keras.layers.Dropout(rate=0.2,name="ThirdropOutLayer"),
# tf.keras.layers.Dense(1, activation='relu',name="OutputLayer")
# ],
# name ="Text-Regression-Third")
##Defining Tuner Learning Rate
# hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
model_ex_3.compile(loss = tf.keras.losses.mean_absolute_error,
optimizer = tf.keras.optimizers.Adam(),
metrics=['mae']
)
model_ex_3.summary()
print(model_ex_3.summary())
return model_ex_3
from typing import List
import tensorflow_model_analysis as tfma
from tfx.components import SchemaGen
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
_LABEL_KEY = ‘Total Hours’
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
def _create_pipeline(pipeline_name: str,
pipeline_root: str,
data_root: str,
module_file: str,
eval_accuracy_threshold: float,
serving_model_dir: str,
metadata_path: str) → tfx.dsl.Pipeline:
“”“Creates a three component penguin pipeline with TFX.”“”
Brings data into the pipeline.
components =[]
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
components.append(example_gen)
statistics_gen = tfx.components.StatisticsGen(
examples=example_gen.outputs['examples'])
components.append(statistics_gen)
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'])
components.append(schema_gen)
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
preprocessing_fn='preprocessing.preprocessing_fn',
splits_config=transform_pb2.SplitConfig(
splits=[
transform_pb2.SplitConfig.Split(name=‘train’, hash_buckets=4),
transform_pb2.SplitConfig.Split(name=‘eval’, hash_buckets=1)
]
)
)
components.append(transform)
print(transform.outputs['transformed_examples'])
Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
transformed_examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5),
run_fn='time_log_trainer.run_fn')
components.append(trainer)
# Get the latest blessed model for model validation.
model_resolver = tfx.dsl.Resolver(
strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
model_blessing=tfx.dsl.Channel(
type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
'latest_blessed_model_resolver')
# perform quality validation of a candidate model (compared to a baseline).
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(
signature_name='serving_default',
label_key=_LABEL_KEY,
# Use transformed label key if Transform is used.
# label_key=features.transformed_name(features.LABEL_KEY),
preprocessing_function_names=['preprocessing.preprocessing_fn'])
],
slicing_specs=[tfma.SlicingSpec()],
metrics_specs=[
tfma.MetricsSpec(metrics=[
tfma.MetricConfig(
class_name='MeanAbsoluteError',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': eval_accuracy_threshold}),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-10})))
])
])
evaluator = tfx.components.Evaluator( # pylint: disable=unused-variable
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
from tfx.types import Channel
model_blessing_channel = Channel(…)
Pushes the model to a filesystem destination.
components.append(evaluator)
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
components.append(pusher)
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components)
tfx.orchestration.LocalDagRunner().run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
eval_accuracy_threshold=0,
module_file=_trainer_module_file,
serving_model_dir=SERVING_MODEL_DIR,
metadata_path=METADATA_PATH))
ERROR:
INFO:dill:D2: <dict object at 0x7fcb1bb40240>
T4: <class 'threading.Condition'>
INFO:dill:T4: <class 'threading.Condition'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7fcb182a1d00>
INFO:dill:D2: <dict object at 0x7fcb182a1d00>
Lo: <unlocked _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:Lo: <unlocked _thread.lock object at 0x7fcb1a1f6bd0>
F2: <function _create_lock at 0x7fcbac181040>
INFO:dill:F2: <function _create_lock at 0x7fcbac181040>
# F2
INFO:dill:# F2
# Lo
INFO:dill:# Lo
B3: <built-in method acquire of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:B3: <built-in method acquire of _thread.lock object at 0x7fcb1a1f6bd0>
# B3
INFO:dill:# B3
B3: <built-in method release of _thread.lock object at 0x7fcb1a1f6bd0>
INFO:dill:B3: <built-in method release of _thread.lock object at 0x7fcb1a1f6bd0>
# B3
INFO:dill:# B3
T4: <class 'collections.deque'>
INFO:dill:T4: <class 'collections.deque'>
# T4
INFO:dill:# T4
# D2
INFO:dill:# D2
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb1be88a00>
INFO:dill:D2: <dict object at 0x7fcb1be88a00>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb187a58c0>
INFO:dill:D2: <dict object at 0x7fcb187a58c0>
# D2
INFO:dill:# D2
D2: <dict object at 0x7fcb1bbb4800>
INFO:dill:D2: <dict object at 0x7fcb1bbb4800>
# D2
INFO:dill:# D2
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 372 failed.
INFO:absl:Cleaning up stateless execution info.
PLEASE HELP ME OUT.