Hi
I am trying to implement a TFX pipeline with custom components. As a start, I am creating a custom ExampleGen component that takes images and masks as inputs and creates TFRecords as output. I have run this component in the interactive context to verify the output. But when I hook it up to the Kubeflow pipeline, it fails with the following error:
--> 290 input_artifact_spec.task_output_artifact.producer_task = producer_id
    291 input_artifact_spec.task_output_artifact.output_artifact_key = output_key
    292 task_spec.inputs.artifacts[name].CopyFrom(input_artifact_spec)

TypeError: None has type NoneType, but expected one of: bytes, unicode
Here the producer_id and the output_key are not set. Do I need to set them to some value when I configure the component?
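To narrow down which input triggers this, a small check like the one below might help. It is only a sketch: it assumes the channel objects expose `producer_component_id` and `output_key` attributes (which appear to be what the runner reads into `producer_id` and `output_key`), and the helper name `find_unbound_inputs` is hypothetical. The `SimpleNamespace` objects are stand-ins for real channels so the snippet runs without TFX installed.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def find_unbound_inputs(inputs: Dict[str, Any]) -> List[str]:
    """Return the names of inputs whose channel has no producer information."""
    unbound = []
    for name, channel in inputs.items():
        # Assumed attribute names; a missing attribute is treated like None.
        producer_id = getattr(channel, "producer_component_id", None)
        output_key = getattr(channel, "output_key", None)
        if producer_id is None or output_key is None:
            unbound.append(name)
    return unbound


# Stand-in for a channel built by hand from an artifact (no upstream component).
manual = SimpleNamespace(producer_component_id=None, output_key=None)
# Stand-in for a channel taken from another component's outputs.
produced = SimpleNamespace(
    producer_component_id="ImageIngestionComponent", output_key="examples")

print(find_unbound_inputs({"input": manual, "examples": produced}))
```

With the real pipeline, the same helper could presumably be called on the component's input dictionary (for example `find_unbound_inputs(ingest_images.inputs)`, assuming the custom component exposes `inputs` the way standard TFX components do) before handing the pipeline to the runner.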
Here is how I am setting up the component:
import os

import tfx
from tfx import v1
from tfx.types import artifact_utils


def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.v1.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  #example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  #test_context = InteractiveContext()
  data_root = os.path.join(DATA_ROOT, 'buildings/', 'images')
  #pdb.set_trace()
  examples = data_root
  input_artifact = tfx.types.standard_artifacts.Examples()
  input_artifact.uri = data_root
  input_artifact.split_names = artifact_utils.encode_split_names(['train', 'val'])
  input_channel = tfx.types.channel_utils.as_channel(artifacts=[input_artifact])
  ingest_images = CustomIngestionComponent(
      input=input_channel, name='ImageIngestionComponent')

  # Uses user-provided Python function that trains a model.
  trainer = tfx.v1.components.Trainer(
      module_file=module_file,
      examples=ingest_images.outputs['examples'],
      train_args=tfx.v1.proto.TrainArgs(num_steps=100),
      eval_args=tfx.v1.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.v1.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.v1.proto.PushDestination(
          filesystem=tfx.v1.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      ingest_images,
      #trainer,
      #pusher,
  ]

  return tfx.v1.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.v1.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.v1.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
outpipe = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
    serving_model_dir=SERVING_MODEL_DIR)
_ = runner.run(outpipe)