Google Cloud Platform 내에서 일련의 작업들에 대해 workflow를 생성하고, 스케쥴링, 모니터링할수 있는 관리형 서비스(Composer)를 소개합니다.
Composer는 Java와 Python의 프로그래밍 언어를 통해 구현이 가능하고, Web Interface를 제공하기 때문에 손쉽게 접근이 가능합니다. Google Cloud Platform에서 Kubernetes 환경하에서 관리형 서비스로 제공됩니다.
오픈소스인 Apache Airflow(https://airflow.apache.org/)를 기반으로 GCP내의 SDK가 기본적으로 내장되어 있어 별도로 설치 하지 않아도 GCP내의 Components에 Access가능한 장점을 가지고 있습니다.
뉴스레터 가입
클라우드 관련 최신 소식을 업데이트 받으실 수 있습니다.
1. 구성요소
- DAGs(Directed Acyclic Graph) : Task들의 집합이고, Workflow를 어떻게 수행할지를 정의합니다.
- Operators : 실제로 수행하는 주체가 되며, 다양한 Operator를 사용할 수 있습니다.
(예) BashOperator, PythonOperator, BigQueryOperator 등 - Tasks : Operator가 생성되는 단위입니다.
- Task Instances : Task가 수행되는 특정 단위 입니다. (Running, Success, Failed, Skipped 등)
2. Composer 생성
Composer는 GCP 콘솔에서 간단하게 웹 기반으로 생성할 수 있습니다.
아래와 같이 GCP 콘솔로 들어가서 Composer 항목에서 CREATE 버튼을 누르면 생성 할 수 있습니다.
Composer는 Kubernetes 기반 아래에서 동작하기 때문에 생성 시 Kubernetes 엔진에 관련된 옵션들을 입력해야 합니다. 기본설정 이외에 보다 자세한 옵션(디스크 크기, 노드 개수 등)은 아래 와 같습니다.
설정 | 내용 |
---|---|
Node count | Kubernetes Engine node의 개수이며, Default는 3개입니다. |
Location | 필수항목으로 Kubernetes를 위한 Compute Engine이 어떤 region에 생성될지를 정의합니다. |
Zone | Compute Engine이 어떤 Zone에 생성될지를 결정합니다. 만약 설정하지 않으면 랜덤하게 위치됩니다. |
Machine Type | Kubernetes 클러스터를 위해 사용되는 Compute Engine의 Machine Type입니다. 기본설정은 n1-standard-1 입니다. |
Disk Size | Kubernetes node의 디스크 크기이며, 기본값은 100GB이며 최소값은 20GB입니다. |
OAuth Scopes | Google API를 Access할 수 있도록 해주는 옵션이며 기본값은 https://www.googleapis.com/auth/cloud-platform 입니다. |
Network ID | VPC network ID를 지정할 수 있으며, 기본값은 default network입니다. |
Subnetwork ID | Compute Engine Machine들이 통신하는 subnetwork ID입니다. |
Service account | Node에 사용되는 Service account이며, 기본값은 Compute Engine service account가 사용됩니다. |
Tags | Node에 적용되는 Instance tags의 리스트를 정의할 수 있으며, 방화벽 혹은 valid source를 확인하는데 사용됩니다. |
https://cloud.google.com/composer/docs/how-to/managing/creating#creating_a_new_environment
생성이 완료되면 아래와 같이 Composer에 대한 접근방법, 설정, 환경변수들에 대해 확인할 수 있습니다.
3. Composer 수행 방법
1) Python으로 수행할 Workflow를 작성합니다.
2) 작성된 코드를 Google Cloud Storage에 업로드 합니다. 이 저장소는 DAG이 저장되는 폴더로 Composer가 수행하게 됩니다.
3) Composer가 자동적으로 DAG을 감지하여 정의된 스케줄에 따라 수행하게 됩니다. 이 동작은 보통 3~4분 정도가 소요됩니다.
4. 테스트 시나리오
1) BigQuery에서 Table을 생성합니다.
2) BigQuery에서 데이터를 조회한 다음 1번에서 생성한 Table에 데이터를 삽입합니다.
샘플 코드는 아래와 같습니다.
default_dag_args = {
‘start_date’: datetime.datetime(2018, 5, 28),
’email’: [‘test@email.com’],
……
‘retry_delay’: datetime.timedelta(minutes=5),
‘project_id’: ‘sample-project’
}
with models.DAG(
‘BigQuery_Batch’,
schedule_interval = ’50 4 * * *’,
default_args=default_dag_args) as dag:
# Create BigQuery Table
make_bq_dataset = bash_operator.BashOperator(
task_id=’make_bq_dataset’,
bash_command=’bq ls {} | grep {} || bq mk {} {}’.format(
bq_past_dataset_name, yesterday_ds_nodash, bq_destination_table,
bq_page_access_log_schema))
……
make_bq_dataset >> bq_query
위 샘플 코드는 BashOperator를 이용한 것으로, bq 명령어로 table을 생성하도록 Composer내에서 구현한 것입니다. 수행결과는 Composer 웹에서 확인할 수 있습니다.
5. Email Notification 설정
Workflow를 수행한 다음 수행결과를 이메일로 받고자 할 경우, 아래와 같이 환경변수에 등록을 해주면 수행결과를 이메일로 받을 수 있습니다.
1) SendGrid Email API를 enable합니다.
2) Composer 생성 시 아래 변수를 설정합니다.
구글 클라우드 플랫폼(GCP)에 대해 더 알고 싶으세요?
베스핀글로벌의 GCP 전문 엔지니어가 답해드립니다.