구글 PS1팀 박성진
1. 데이터 파이프라인
데이터 파이프라인은 일련의 데이터 처리 단계입니다. 데이터가 현재 데이터 플랫폼에 로드되지 않은 경우 파이프라인 시작 부분에서 수집됩니다. 그런 다음 각 스텝이 다음 스텝으로 입력되는 출력을 제공하는 일련의 스텝이 있습니다. 이 작업은 파이프라인이 완료될 때까지 계속됩니다. 경우에 따라 독립 단계를 병렬로 실행할 수 있습니다.
아래는 간단한 파이프라인의 그림입니다.
각 단계에서 실행되는 작업은 파일 생성 유무를 확인하는 센싱 작업일 수도 있고, 파일을 업로드 하는 작업일 수도 있으며, 파일을 라인별로 읽어서 변환하는 작업일 수도 있고, 여러 개의 파일을 하나로 싱크 하거나 데이터베이스에 적재하는 작업일 수도 있습니다.
또한 이런 작업들이 상호간의 의존성을 가질 수도 있으며, 동시에 진행이 될 수도 있습니다.
파이프라인은 이런 일련의 작업들을 정해진 시간에 규칙적으로 반복할 수도 있을 뿐더러, 동시에 여러 개의 파이프라인을 작동할 수도 있습니다.
이런 작업의 흐름을 관리하는 도구를 Workflow Management Platform(WfMP) 이라고 합니다.
WfMP를 구현한 소프트웨어는 아주 많지만 Apache Airflow는 유명한 오픈소스 중 하나이며, Google Cloud Platform에서는 Apache Airflow의 Managed Service인 Cloud Composer를 제공하고 있습니다.
따라서 Cloud Composer를 활용해 데이터 파이프라인을 구축하기 위해서는 Apache Airflow를 먼저 이해해야 합니다.
2. Apache Airflow 아키텍처
아래는 Apache Airflow의 아키텍처를 간단히 표현한 그림입니다.
Airflow는 간단한 Web UI를 제공해 사용자로 하여금 파이프라인 운영을 원활하게 하는데 도움을 주고 있지만, 아키텍처 구조상 SPoF가 발생할 수 있으므로 파이프라인 구축 시에 신중해야 합니다.
컴포넌트 별로 하나씩 살펴보겠습니다.
- User Interface
User Interface는 사용자의 웹 환경 혹은 터미널의 CLI가 될 수 있습니다. - Webserver
사용자 요청을 처리하기 위한 웹서버입니다. - Scheduler
DAG Directory에 존재하는 Python 파일을 파싱하여, DAG를 추출해 Task Graph를 그리고 정해진 시간에 DAG를 실행을 위한 일련의 절차를 진행합니다. - Executor
Executor는 Worker에게 Task를 전달하는 전달자 역할을 하며, 어떤 종류의 Executor를 사용하느냐에 따라 Task 전달하는 방법이 순차 혹은 병렬로 결정이 됩니다. - Workers
Workers는 실제 Task를 수행하는 주체로써 동시에 많은 Task를 실행할 수 있게 다수가 존재합니다. - Metadata Database
Scheduler가 파싱한 DAG와 Task 인스턴스 정보, Worker가 실행하면서 발생한 이력들이 airflow_db 라는 이름으로 존재합니다.
Airflow는 User Interface와 Worker를 제외하면 각 컴포넌트는 기본적으로 단일 구성입니다. Worker 컴포넌트만 동시성을 위해 분산 구조로 만들어져 있으며 그 외는 HA 구성이 되어있지 않습니다. 따라서 하나의 컴포넌트라도 장애가 발생하면 Airflow는 정상 작동하지 않습니다.
아래는 Airflow의 Task 인스턴스의 Lifecycle 입니다.
Airflow를 사용하는 데이터 엔지니어라면 Task 인스턴스 위 그림은 반드시 숙지해야할 항목입니다. 운영 환경에서 장애가 발생 했거나 성능 향상을 고려할 경우 어느 컴포넌트에 집중해야할지 빠르게 파악할 필요가 있기 때문입니다.
아래 그림은 Task 인스턴스 Lifecycle 구간 마다 사용되는 Airflow 환경 변수 입니다.
지금까지 알아본 Airflow 컴포넌트와 환경 변수들에 대한 이해가 충분하더라도 Airflow를 운영하기에는 아직 부족함이 있습니다.
하드웨어 이슈로 인해 Airflow의 마이그레이션을 해야하거나, Airflow 버전 업그레이드를 해야하는 경우, 단일 구성 컴포넌트가 장애가 발생하는 경우에는 인프라 담당자, 시스템 엔지니어, 데이터 엔지니어가 모두 하나의 마음으로 고난과 역경을 이겨내야 합니다.
3. Cloud Composer 아키텍처
데이터 엔지니어가 파이프라인에 집중하기 위해서는 인프라 업무와 시스템 업무에서 분리되는 것이 가장 좋습니다. 이런 환경을 제공해주는 것이 Google Cloud의 Cloud Composer 입니다.
아래는 Cloud Composer 아키텍처를 요약한 그림입니다.
실제 업무 환경에 따라 Cloud Composer 환경은 공개 IP, 비공개 IP, 도메인 제한 공유(DRS)의 세 가지 구성으로 만들 수 있습니다. 각 구성은 프로젝트 리소스의 아키텍처를 약간 다르며 이 문서에서는 도메인 제한 공유(DSR) 구성은 설명하지 않습니다.
아래는 Cloud Composer 공개 IP 아키텍처입니다.
공개 IP 구성에서 환경 리소스는 고객과 테넌트 프로젝트 간에 배포됩니다. 테넌트 프로젝트는 Airflow 데이터베이스를 실행하기 위한 Cloud SQL 인스턴스와 Google App Engine Flex VM을 호스팅하여 Airflow 웹 서버를 실행합니다.
아래 그림과 같이 Airflow 스케줄러, 작업자, 웹 서버는 Cloud SQL 프록시 프로세스를 사용하여 Airflow 데이터베이스에 연결합니다.
아래는 Cloud Composer 비공개 IP 아키텍처입니다.
비공개 IP 구성에서 Cloud Composer 리소스는 고객과 테넌트 프로젝트 간에 배포됩니다. 테넌트 프로젝트는 Airflow 데이터베이스를 실행하기 위한 Cloud SQL 인스턴스와 Google App Engine Flex VM을 호스팅하여 Airflow 웹 서버와 Cloud SQL 프록시 프로세스를 실행합니다.
Airflow 스케줄러와 작업자는 GKE 클러스터에서 실행되는 HAProxy 프로세스를 사용하여 테넌트 프로젝트에서 실행 중인 Cloud SQL 프록시 프로세스에 연결됩니다. HAProxy 프로세스는 아래 그림과 같이 테넌트 프로젝트에서 실행 중인 두 Cloud SQL 프록시 간의 프록시 인스턴스에 트래픽을 부하 분산합니다.
Airflow를 직접 운영하는 것보다 Cloud Composer를 운영하는 것이 운영 부담이 적은 건 사실이지만 테넌트 프로젝트가의 리스크에 대해서도 간과해서는 안됩니다. 테넌트 프로젝트에서 관리되는 리소스는 사용자가 인지하지 못하는 시점에 maintenance 작업이 발생할 수 있습니다. 특히 App Engine이 업데이트 되는 상황에는 이미 실행 중인 Task에서 Mysql Connection Lost Error가 발생할 수 있습니다. 이런 상황을 수동으로 처리하지 않고 자동으로 회피하기 위해서는 DAG의 retries argument 값을 반드시 설정해줘야 합니다.
4. 요약
실제 운영 업무에서 가장 많은 문제를 일으키는 컴포넌트는 Scheduler 입니다. 파이프라인의 개수가 적거나 가벼운 ETL 작업만 수행하는 경우에는 절대로 문제가 발생하지 않지만, 데이터가 커짐에 따라 동시 작업의 수가 증가하는 경우 성능의 한계는 반드시 직면하게 되어 있습니다. 데이터 엔지니어는 항상 최적화 된 파이프라인 구축을 염두해 두어야 하지만, 파이프라인 외적인 부분에서 빠르게 문제의 원인을 파악하기 위해서라도 Airflow와 Cloud Composer의 아키텍처는 숙지해 두는 것이 좋습니다.
출처 및 참고사이트
- https://hazelcast.com/glossary/data-pipeline
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#task-instances
- https://cloud.google.com/blog/products/data-analytics/scale-your-composer-environment-together-your-business
- https://cloud.google.com/architecture/automating-infrastructure-using-cloud-composer#defining_the_architecture
- https://cloud.google.com/composer/docs/concepts/architecture#public_ip
- https://cloud.google.com/composer/docs/concepts/architecture#private_ip