구글 PS2팀 강희정
1. Vertex AI Pipelines 소개
Vertex AI Pipelines는 머신러닝 워크플로를 조정하여 머신러닝 시스템을 자동화하고, 모니터링 및 관리할 수 있습니다. Vertex AI Pipelines를 사용하면 머신러닝 시스템에 DevOps를 결합한 MLOps 를 구축할 수 있습니다.
서버리스 방식으로 제공되어 인프라나 클러스터 유지 보수에대한 관리 요소 없이 파이프라인 구축에 집중할 수 있습니다.
2. Vertex AI Pipelines 작성
Vertex Pipelines는 TFX( TensorFlow Extended ) 및 KFP( Kubeflow Pipelines ) 두 가지 오픈소스 Python SDK를 지원합니다. 이번 예제에서는 KFP SDK를 사용하며 Vertex AI 서비스에 대한 액세스를 지원하는 Google Cloud pipeline component 사용을 포함합니다.
2-1. 필요한 라이브러리를 불러오고 기본 파라미터를 정의합니다.
from typing import NamedTuple
import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output,
component)
from google_cloud_pipeline_components.v1.automl.training_job import \
AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import \
tabular_dataset_create as TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import \
endpoint_create as EndpointCreateOp
from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import \
model_deploy as ModelDeployOp
PROJECT_ID = "YOUR-PROJECT-ID"
REGION = "us-central1"
BUCKET_URI = f"gs://{PROJECT_ID}-unique"
# set path for storing the pipeline artifacts
PIPELINE_NAME = "automl-tabular-beans-training"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/beans"
2-2. 파이프라인에서 실행할 모델 평가 구성요소를 작성합니다.
@component(
base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
project: str,
location: str,
thresholds_dict_str: str,
model: Input[Artifact],
metrics: Output[Metrics],
metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]): # Return parameter.
import json
import logging
from google.cloud import aiplatform
aiplatform.init(project=project)
# Fetch model eval info
def get_eval_info(model):
response = model.list_model_evaluations()
metrics_list = []
metrics_string_list = []
for evaluation in response:
evaluation = evaluation.to_dict()
print("model_evaluation")
print(" name:", evaluation["name"])
print(" metrics_schema_uri:", evaluation["metricsSchemaUri"])
metrics = evaluation["metrics"]
for metric in metrics.keys():
logging.info("metric: %s, value: %s", metric, metrics[metric])
metrics_str = json.dumps(metrics)
metrics_list.append(metrics)
metrics_string_list.append(metrics_str)
return (
evaluation["name"],
metrics_list,
metrics_string_list,
)
# Use the given metrics threshold(s) to determine whether the model is
# accurate enough to deploy.
def classification_thresholds_check(metrics_dict, thresholds_dict):
for k, v in thresholds_dict.items():
logging.info("k {}, v {}".format(k, v))
if k in ["auRoc", "auPrc"]: # higher is better
if metrics_dict[k] < v: # if under threshold, don't deploy
logging.info("{} < {}; returning False".format(metrics_dict[k], v))
return False
logging.info("threshold checks passed.")
return True
def log_metrics(metrics_list, metricsc):
test_confusion_matrix = metrics_list[0]["confusionMatrix"]
logging.info("rows: %s", test_confusion_matrix["rows"])
# log the ROC curve
fpr = []
tpr = []
thresholds = []
for item in metrics_list[0]["confidenceMetrics"]:
fpr.append(item.get("falsePositiveRate", 0.0))
tpr.append(item.get("recall", 0.0))
thresholds.append(item.get("confidenceThreshold", 0.0))
print(f"fpr: {fpr}")
print(f"tpr: {tpr}")
print(f"thresholds: {thresholds}")
metricsc.log_roc_curve(fpr, tpr, thresholds)
# log the confusion matrix
annotations = []
for item in test_confusion_matrix["annotationSpecs"]:
annotations.append(item["displayName"])
logging.info("confusion matrix annotations: %s", annotations)
metricsc.log_confusion_matrix(
annotations,
test_confusion_matrix["rows"],
)
# log textual metrics info as well
for metric in metrics_list[0].keys():
if metric != "confidenceMetrics":
val_string = json.dumps(metrics_list[0][metric])
metrics.log_metric(metric, val_string)
logging.getLogger().setLevel(logging.INFO)
# extract the model resource name from the input Model Artifact
model_resource_path = model.metadata["resourceName"]
logging.info("model path: %s", model_resource_path)
# Get the trained model resource
model = aiplatform.Model(model_resource_path)
# Get model evaluation metrics from the the trained model
eval_name, metrics_list, metrics_str_list = get_eval_info(model)
logging.info("got evaluation name: %s", eval_name)
logging.info("got metrics list: %s", metrics_list)
log_metrics(metrics_list, metricsc)
thresholds_dict = json.loads(thresholds_dict_str)
deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
if deploy:
dep_decision = "true"
else:
dep_decision = "false"
logging.info("deployment decision is %s", dep_decision)
return (dep_decision,)
2-3. 다음과 같은 Google Cloud Pipeline Component가 포함된 파이프라인을 정의합니다.
- TabularDatasetCreateOp : BigQuery를 데이터소스로 한 tabular 형식의 데이터세트 정의
- AutoMLTabularTrainingJobRunOp : tabular 형식의 AutoML 학습 후, Vertex AI Model Registry에 등록
- ModelDeployOp : Model Registry에 등록된 학습 모델을 Online Prediction(endpoint)에 배포
@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
pipeline_root=PIPELINE_ROOT)
def pipeline(
bq_source: str = "bq://aju-dev-demos.beans.beans1",
display_name: str = DISPLAY_NAME,
project: str = PROJECT_ID,
gcp_region: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
thresholds_dict_str: str = '{"auRoc": 0.95}',
):
dataset_create_op = TabularDatasetCreateOp(
project=project, display_name=display_name, bq_source=bq_source
)
training_op =AutoMLTabularTrainingJobRunOp(
project=project,
display_name=display_name,
optimization_prediction_type="classification",
optimization_objective="minimize-log-loss",
budget_milli_node_hours=1000,
column_transformations=[
{"numeric": {"column_name": "Area"}},
{"numeric": {"column_name": "Perimeter"}},
{"numeric": {"column_name": "MajorAxisLength"}},
... other columns ...
{"categorical": {"column_name": "Class"}},
],
dataset=dataset_create_op.outputs["dataset"],
target_column="Class",
)
model_eval_task = classif_model_eval_metrics(
project,
gcp_region,
api_endpoint,
thresholds_dict_str,
training_op.outputs["model"],
)
with dsl.Condition(
model_eval_task.outputs["dep_decision"] == "true",
name="deploy_decision",
):
deploy_op =ModelDeployOp( # noqa: F841
model=training_op.outputs["model"],
project=project,
machine_type="n1-standard-4",
)
2-4. Google Cloud 파이프라인 작업을 실행합니다.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
from google.cloud import bigquery
bq_source = "aju-dev-demos.beans.beans1"
client = bigquery.Client()
bq_region = client.get_table(bq_source).location.lower()
try:
assert bq_region in REGION
print(f"Region validated: {REGION}")
except AssertionError:
print(
"Please make sure the region of BigQuery (source) and that of the pipeline are the same."
)
# Configure the pipeline
job = aiplatform.PipelineJob(
display_name=PIPELINE_DISPLAY_NAME,
template_path="tabular_classification_pipeline.yaml",
pipeline_root=PIPELINE_ROOT,
parameter_values={
"project": PROJECT_ID,
"gcp_region": REGION,
"bq_source": f"bq://{bq_source}",
"thresholds_dict_str": '{"auRoc": 0.95}',
"DATASET_DISPLAY_NAME": DATASET_DISPLAY_NAME,
"TRAINING_DISPLAY_NAME": TRAINING_DISPLAY_NAME,
"MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
"ENDPOINT_DISPLAY_NAME": ENDPOINT_DISPLAY_NAME,
"MACHINE_TYPE": MACHINE_TYPE,
},
enable_caching=False,
)
# Run the job
job.submit()
2-5. Vertex AI Pipelines 에서 파이프라인 작업을 확인합니다.

3. Artifact Registry에 위에서 생성한 파이프라인 YAML 파일을 업로드합니다.
Artifact Registry를 이용하여 파이프라인 파일을 관리하면 재사용과 추적이 용이합니다.
※ Artifact Registry에 `demo-kfp-repo` 이름의 KFP 저장소가 생성되어 있어야 합니다.
from kfp.registry import RegistryClient
client = RegistryClient(host=f"https://us-central1-kfp.pkg.dev/{PROJECT_ID}/demo-kfp-repo")
templateName, versionName = client.upload_pipeline(
file_name="tabular_classification_pipeline.yaml",
tags=["latest"],)
[Artifact Registry 저장소에 업로드 된 파일 확인]

4. Vertex AI Pipelines Schedules 이용한 실행 자동화
Vertex AI Pipelines에서는 파이프라인 YAML 파일을 이용하여 스케줄을 등록할 수 있는 기능을 제공하고 있습니다. 학습 일정이 일정한 스케줄에 의해 이루어지는 경우, 간단히 등록하여 자동 학습을 실행할 수 있습니다.
Google Cloud console에서 3번 단계에서 Artifact Registry에 업로드한 파이프라인을 자동 실행하도록 스케줄링을 만들어 보겠습니다.
다음 순서에 따라 파이프라인 일정을 만듭니다.
4-1. Google Cloud 콘솔의 Vertex AI 섹션에서 파이프라인 페이지의 일정 탭으로 이동합니다.

4-2. 예약된 실행 만들기를 클릭하여 파이프라인 실행 만들기 창을 엽니다.
4-3. 3번에서 업로드한 Artifact Registry에서 파이프라인을 가져오기 위해 기존 파이프라인에서 선택을 클릭합니다.

- 파이프라인 YAML 파일이 포함된 저장소를 선택합니다.
- 파이프라인 구성요소와 버전을 선택합니다.
4-4. 파이프라인 실행을 식별하기 위한 실행 이름을 지정하고, 실행할 리전을 선택합니다.

4-5. 다음과 같이 실행 일정을 지정합니다.

- 반복을 선택합니다.
- 시작 시간에서 일정이 활성화되는 시간을 지정합니다.
- 즉시 : 일정을 만들고 즉시 첫 번째 실행이 수행되도록 예약
- 활성화 날짜 : 특정 시간과 날짜에 첫 번째 실행 예약
- 빈도 필드에서 unix-cron을 기반으로 하는 크론 일정 표현식을 사용하여 파이프라인 실행을 예약하고 실행할 빈도를 지정합니다.실행 예약 시간을 실행할 시간대를 선택합니다.
- 실행 예약 시간을 실행할 시간대를 선택합니다.
- 종료에서 일정이 종료되는 시간을 지정합니다.
- 만료되지 않음 : 무기한 파이프라인 실행
- 사용 : 특정 날짜와 시간에 일정 종료
4-6. (선택사항) 커스텀 서비스 계정, 고객 관리 암호화 키(CMEK) 또는 피어링된 VPC 네트워크를 지정하는 경우 입력합니다.
4-7. 계속을 클릭하고 파이프라인의 런타임 구성을 지정합니다.

4-8. 제출을 클릭하여 파이프라인 실행을 만듭니다.

4-9. 설정한 스케줄에 맞게 파이프라인이 실행되는지 확인합니다.

출처
- https://cloud.google.com/vertex-ai/docs/pipelines/introduction
- https://cloud.google.com/blog/topics/developers-practitioners/use-vertex-pipelines-build-automl-classification-end-end-workflow
- https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/automl_tabular_classification_beans.ipynb
- https://cloud.google.com/vertex-ai/docs/pipelines/create-pipeline-template
- https://cloud.google.com/vertex-ai/docs/pipelines/schedule-pipeline-run