구글 클라우드 인사이트 Vertex AI Pipelines 활용하여 파이프라인 자동화 구글 인사이트 by Miyeon. Jo 2023년 11월 08일 2023년 11월 08일 548 구글 PS2팀 강희정목차Vertex AI Pipelines 소개Vertex AI Pipelines 작성Artifact Registry에 Vertex AI Pipelines 업로드Vertex AI Pipelines Schedules 이용한 실행 자동화1. Vertex AI Pipelines 소개Vertex AI Pipelines는 머신러닝 워크플로를 조정하여 머신러닝 시스템을 자동화하고, 모니터링 및 관리할 수 있습니다. Vertex AI Pipelines를 사용하면 머신러닝 시스템에 DevOps를 결합한 MLOps 를 구축할 수 있습니다. 서버리스 방식으로 제공되어 인프라나 클러스터 유지 보수에대한 관리 요소 없이 파이프라인 구축에 집중할 수 있습니다.2. Vertex AI Pipelines 작성Vertex Pipelines는 TFX( TensorFlow Extended ) 및 KFP( Kubeflow Pipelines ) 두 가지 오픈소스 Python SDK를 지원합니다. 이번 예제에서는 KFP SDK를 사용하며 Vertex AI 서비스에 대한 액세스를 지원하는 Google Cloud pipeline component 사용을 포함합니다.2-1. 필요한 라이브러리를 불러오고 기본 파라미터를 정의합니다.from typing import NamedTuple import kfp from google.cloud import aiplatform from kfp import compiler, dsl from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component) from google_cloud_pipeline_components.v1.automl.training_job import \ AutoMLTabularTrainingJobRunOp from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import \ tabular_dataset_create as TabularDatasetCreateOp from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import \ endpoint_create as EndpointCreateOp from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import \ model_deploy as ModelDeployOp PROJECT_ID = "YOUR-PROJECT-ID" REGION = "us-central1" BUCKET_URI = f"gs://{PROJECT_ID}-unique" # set path for storing the pipeline artifacts PIPELINE_NAME = "automl-tabular-beans-training" PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/beans" 2-2. 파이프라인에서 실행할 모델 평가 구성요소를 작성합니다.@component( base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest", packages_to_install=["google-cloud-aiplatform"], ) def classification_model_eval_metrics( project: str, location: str, thresholds_dict_str: str, model: Input[Artifact], metrics: Output[Metrics], metricsc: Output[ClassificationMetrics], ) -> NamedTuple("Outputs", [("dep_decision", str)]): # Return parameter. import json import logging from google.cloud import aiplatform aiplatform.init(project=project) # Fetch model eval info def get_eval_info(model): response = model.list_model_evaluations() metrics_list = [] metrics_string_list = [] for evaluation in response: evaluation = evaluation.to_dict() print("model_evaluation") print(" name:", evaluation["name"]) print(" metrics_schema_uri:", evaluation["metricsSchemaUri"]) metrics = evaluation["metrics"] for metric in metrics.keys(): logging.info("metric: %s, value: %s", metric, metrics[metric]) metrics_str = json.dumps(metrics) metrics_list.append(metrics) metrics_string_list.append(metrics_str) return ( evaluation["name"], metrics_list, metrics_string_list, ) # Use the given metrics threshold(s) to determine whether the model is # accurate enough to deploy. def classification_thresholds_check(metrics_dict, thresholds_dict): for k, v in thresholds_dict.items(): logging.info("k {}, v {}".format(k, v)) if k in ["auRoc", "auPrc"]: # higher is better if metrics_dict[k] < v: # if under threshold, don't deploy logging.info("{} < {}; returning False".format(metrics_dict[k], v)) return False logging.info("threshold checks passed.") return True def log_metrics(metrics_list, metricsc): test_confusion_matrix = metrics_list[0]["confusionMatrix"] logging.info("rows: %s", test_confusion_matrix["rows"]) # log the ROC curve fpr = [] tpr = [] thresholds = [] for item in metrics_list[0]["confidenceMetrics"]: fpr.append(item.get("falsePositiveRate", 0.0)) tpr.append(item.get("recall", 0.0)) thresholds.append(item.get("confidenceThreshold", 0.0)) print(f"fpr: {fpr}") print(f"tpr: {tpr}") print(f"thresholds: {thresholds}") metricsc.log_roc_curve(fpr, tpr, thresholds) # log the confusion matrix annotations = [] for item in test_confusion_matrix["annotationSpecs"]: annotations.append(item["displayName"]) logging.info("confusion matrix annotations: %s", annotations) metricsc.log_confusion_matrix( annotations, test_confusion_matrix["rows"], ) # log textual metrics info as well for metric in metrics_list[0].keys(): if metric != "confidenceMetrics": val_string = json.dumps(metrics_list[0][metric]) metrics.log_metric(metric, val_string) logging.getLogger().setLevel(logging.INFO) # extract the model resource name from the input Model Artifact model_resource_path = model.metadata["resourceName"] logging.info("model path: %s", model_resource_path) # Get the trained model resource model = aiplatform.Model(model_resource_path) # Get model evaluation metrics from the the trained model eval_name, metrics_list, metrics_str_list = get_eval_info(model) logging.info("got evaluation name: %s", eval_name) logging.info("got metrics list: %s", metrics_list) log_metrics(metrics_list, metricsc) thresholds_dict = json.loads(thresholds_dict_str) deploy = classification_thresholds_check(metrics_list[0], thresholds_dict) if deploy: dep_decision = "true" else: dep_decision = "false" logging.info("deployment decision is %s", dep_decision) return (dep_decision,)2-3. 다음과 같은 Google Cloud Pipeline Component가 포함된 파이프라인을 정의합니다.TabularDatasetCreateOp : BigQuery를 데이터소스로 한 tabular 형식의 데이터세트 정의AutoMLTabularTrainingJobRunOp : tabular 형식의 AutoML 학습 후, Vertex AI Model Registry에 등록ModelDeployOp : Model Registry에 등록된 학습 모델을 Online Prediction(endpoint)에 배포@kfp.dsl.pipeline(name="automl-tab-beans-training-v2", pipeline_root=PIPELINE_ROOT) def pipeline( bq_source: str = "bq://aju-dev-demos.beans.beans1", display_name: str = DISPLAY_NAME, project: str = PROJECT_ID, gcp_region: str = "us-central1", api_endpoint: str = "us-central1-aiplatform.googleapis.com", thresholds_dict_str: str = '{"auRoc": 0.95}', ): dataset_create_op = TabularDatasetCreateOp( project=project, display_name=display_name, bq_source=bq_source ) training_op =AutoMLTabularTrainingJobRunOp( project=project, display_name=display_name, optimization_prediction_type="classification", optimization_objective="minimize-log-loss", budget_milli_node_hours=1000, column_transformations=[ {"numeric": {"column_name": "Area"}}, {"numeric": {"column_name": "Perimeter"}}, {"numeric": {"column_name": "MajorAxisLength"}}, ... other columns ... {"categorical": {"column_name": "Class"}}, ], dataset=dataset_create_op.outputs["dataset"], target_column="Class", ) model_eval_task = classif_model_eval_metrics( project, gcp_region, api_endpoint, thresholds_dict_str, training_op.outputs["model"], ) with dsl.Condition( model_eval_task.outputs["dep_decision"] == "true", name="deploy_decision", ): deploy_op =ModelDeployOp( # noqa: F841 model=training_op.outputs["model"], project=project, machine_type="n1-standard-4", )2-4. Google Cloud 파이프라인 작업을 실행합니다.aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI) from google.cloud import bigquery bq_source = "aju-dev-demos.beans.beans1" client = bigquery.Client() bq_region = client.get_table(bq_source).location.lower() try: assert bq_region in REGION print(f"Region validated: {REGION}") except AssertionError: print( "Please make sure the region of BigQuery (source) and that of the pipeline are the same." ) # Configure the pipeline job = aiplatform.PipelineJob( display_name=PIPELINE_DISPLAY_NAME, template_path="tabular_classification_pipeline.yaml", pipeline_root=PIPELINE_ROOT, parameter_values={ "project": PROJECT_ID, "gcp_region": REGION, "bq_source": f"bq://{bq_source}", "thresholds_dict_str": '{"auRoc": 0.95}', "DATASET_DISPLAY_NAME": DATASET_DISPLAY_NAME, "TRAINING_DISPLAY_NAME": TRAINING_DISPLAY_NAME, "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME, "ENDPOINT_DISPLAY_NAME": ENDPOINT_DISPLAY_NAME, "MACHINE_TYPE": MACHINE_TYPE, }, enable_caching=False, ) # Run the job job.submit()2-5. Vertex AI Pipelines 에서 파이프라인 작업을 확인합니다.3. Artifact Registry에 위에서 생성한 파이프라인 YAML 파일을 업로드합니다.Artifact Registry를 이용하여 파이프라인 파일을 관리하면 재사용과 추적이 용이합니다.※ Artifact Registry에 `demo-kfp-repo` 이름의 KFP 저장소가 생성되어 있어야 합니다.from kfp.registry import RegistryClient client = RegistryClient(host=f"https://us-central1-kfp.pkg.dev/{PROJECT_ID}/demo-kfp-repo") templateName, versionName = client.upload_pipeline( file_name="tabular_classification_pipeline.yaml", tags=["latest"],) [Artifact Registry 저장소에 업로드 된 파일 확인]4. Vertex AI Pipelines Schedules 이용한 실행 자동화Vertex AI Pipelines에서는 파이프라인 YAML 파일을 이용하여 스케줄을 등록할 수 있는 기능을 제공하고 있습니다. 학습 일정이 일정한 스케줄에 의해 이루어지는 경우, 간단히 등록하여 자동 학습을 실행할 수 있습니다. Google Cloud console에서 3번 단계에서 Artifact Registry에 업로드한 파이프라인을 자동 실행하도록 스케줄링을 만들어 보겠습니다.다음 순서에 따라 파이프라인 일정을 만듭니다.4-1. Google Cloud 콘솔의 Vertex AI 섹션에서 파이프라인 페이지의 일정 탭으로 이동합니다.4-2. 예약된 실행 만들기를 클릭하여 파이프라인 실행 만들기 창을 엽니다.4-3. 3번에서 업로드한 Artifact Registry에서 파이프라인을 가져오기 위해 기존 파이프라인에서 선택을 클릭합니다.파이프라인 YAML 파일이 포함된 저장소를 선택합니다.파이프라인 구성요소와 버전을 선택합니다.4-4. 파이프라인 실행을 식별하기 위한 실행 이름을 지정하고, 실행할 리전을 선택합니다.4-5. 다음과 같이 실행 일정을 지정합니다.반복을 선택합니다.시작 시간에서 일정이 활성화되는 시간을 지정합니다.즉시 : 일정을 만들고 즉시 첫 번째 실행이 수행되도록 예약활성화 날짜 : 특정 시간과 날짜에 첫 번째 실행 예약빈도 필드에서 unix-cron을 기반으로 하는 크론 일정 표현식을 사용하여 파이프라인 실행을 예약하고 실행할 빈도를 지정합니다.실행 예약 시간을 실행할 시간대를 선택합니다.실행 예약 시간을 실행할 시간대를 선택합니다.종료에서 일정이 종료되는 시간을 지정합니다.만료되지 않음 : 무기한 파이프라인 실행사용 : 특정 날짜와 시간에 일정 종료4-6. (선택사항) 커스텀 서비스 계정, 고객 관리 암호화 키(CMEK) 또는 피어링된 VPC 네트워크를 지정하는 경우 입력합니다.4-7. 계속을 클릭하고 파이프라인의 런타임 구성을 지정합니다.4-8. 제출을 클릭하여 파이프라인 실행을 만듭니다.4-9. 설정한 스케줄에 맞게 파이프라인이 실행되는지 확인합니다. 출처https://cloud.google.com/vertex-ai/docs/pipelines/introductionhttps://cloud.google.com/blog/topics/developers-practitioners/use-vertex-pipelines-build-automl-classification-end-end-workflowhttps://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/automl_tabular_classification_beans.ipynbhttps://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-templatehttps://cloud.google.com/vertex-ai/docs/pipelines/schedule-pipeline-run