When trying to run a custom component on the Vertex AI platform, I am getting the above error even though the component is defined. Here is a code example:
import apache_beam as beam
from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor

class ImageExampleGenExecutor(BaseExampleGenExecutor):

  def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
    """Returns PTransform for image to TF examples."""
    return ImageToExample
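ImageToExample is defined in the same file; it follows the standard TFX custom ExampleGen pattern and looks roughly like the sketch below (simplified; _image_to_example stands in for the helper that converts one image file into a tf.train.Example):

import os
from typing import Any, Dict

import apache_beam as beam
import tensorflow as tf

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.Pipeline)
@beam.typehints.with_output_types(tf.train.Example)
def ImageToExample(pipeline: beam.Pipeline,
                   exec_properties: Dict[str, Any],
                   split_pattern: str) -> beam.pvalue.PCollection:
  """Reads image files matching split_pattern and emits tf.train.Examples."""
  image_pattern = os.path.join(exec_properties['input_base'], split_pattern)
  return (pipeline
          | 'MatchFiles' >> beam.io.fileio.MatchFiles(image_pattern)
          | 'ReadMatches' >> beam.io.fileio.ReadMatches()
          # _image_to_example turns one ReadableFile into a tf.train.Example.
          | 'ToExample' >> beam.Map(_image_to_example))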
#--------------------------------------------------------------------------------------------------------------------------------------------------
import os
import tfx
from tfx import v1
from tfx.proto import example_gen_pb2
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.dsl.components.base import executor_spec
#import custom_examplegen_trainer
#from custom_examplegen_trainer import ImageExampleGenExecutor
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.v1.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  #example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  output = example_gen_pb2.Output(
      split_config=example_gen_pb2.SplitConfig(splits=[
          example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
          example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
      ]))
  input_config = example_gen_pb2.Input(splits=[
      example_gen_pb2.Input.Split(name='images', pattern='*/*.jpg'),
  ])
  data_root = os.path.join('content/', 'PetImages')
  #input_artifact = tfx.types.standard_artifacts.Examples()
  #input_artifact.uri = data_root
  #input_channel = tfx.types.channel_utils.as_channel(artifacts=[input_artifact])
  example_gen = FileBasedExampleGen(
      input_base=data_root,
      input_config=input_config,
      output_config=output,
      custom_executor_spec=executor_spec.ExecutorClassSpec(ImageExampleGenExecutor))
  #print(example_gen)
  # Uses user-provided Python function that trains a model.
  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      #trainer,
      #pusher,
  ]

  return tfx.v1.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)
#--------------------------------------------------------------------------------------------------------------------------------------------
import os
import tfx
from tfx import v1
#import custom_examplegen_trainer
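# For reference, the configuration constants used below are defined earlier
# in the notebook roughly like this (the values shown here are placeholders):
# GOOGLE_CLOUD_PROJECT = 'my-gcp-project'
# GOOGLE_CLOUD_REGION = 'us-central1'
# GCS_BUCKET_NAME = 'my-bucket'
# PIPELINE_NAME = 'image-examplegen-pipeline'
# PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# DATA_ROOT = 'gs://{}/data'.format(GCS_BUCKET_NAME)
# MODULE_ROOT = 'gs://{}/modules'.format(GCS_BUCKET_NAME)
# SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)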
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
runner = tfx.v1.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.v1.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
outpipe = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
    serving_model_dir=SERVING_MODEL_DIR)
_ = runner.run(outpipe)
#----------------------------------------------------------------------------------------------------------------------------------------------
from kfp.v2.google import client
pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)
_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)
I am not sure if I am missing any configuration here.
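For example, does the custom executor class need to be available inside the container image that Vertex AI runs, say by building an image that includes custom_examplegen_trainer and passing it via default_image? Something like this (the image URI below is hypothetical):

runner = tfx.v1.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.v1.orchestration.experimental.KubeflowV2DagRunnerConfig(
        # Hypothetical image with custom_examplegen_trainer installed.
        default_image='gcr.io/my-project/my-tfx-image'),
    output_filename=PIPELINE_DEFINITION_FILE)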
Thanks
Subhasish