220808 Apache Airflow

Apache Airflow


Configuration 살펴보기

더 많은 Airflow Worker가 필요하다면, 추가 machine에서 celery worker를 명령한다.

Flower

Airflow workers를 dashboard를 통해서 모니터링하기 위한 툴이다. Celery Executor를 사용하면, Flower에 접속해서 Celery Executor의 Administrator와 Airflow worker를 대시보드를 통해 모니터링 할 수 있다.

localhost:5555/dashboard

아래의 Worker dashboard를 살펴보면, Max concurrency 값이 16인 것으로 보아, 최대 16개의 Task를 동시에 실행하는 것이 가능하다. 이는 사용하는 PC의 리소스에 따라 줄일 수도 늘릴 수도 있다.

다음으로 Queues 메뉴는 유용하게 사용될 수 있는데, 특정 Task를 특정 worker로 라우팅되도록 할 수도 있다.
예를들어 높은 리소스를 소비하는 Task가 존재하고, 현재 하나의 높은 리소스 Worker를 가진다고 가정하면, queue를 생성해서 queue를 Worker에 붙이고, 높은 리소스 소비 Task를 생성한 queue로 보내서 해당 Worker만 해당 작업을 실행할 수 있도록 queue를 지정해서 사용할 수 있다.

Read more

220807 Apache Airflow

Apache Airflow


Backfilling

DAG를 처음 실행하게 되면, scheduler는 자동으로 non-triggered DagRuns을 시작 날짜(start_date)와 현재(now) 사이 시점에서 실행하게 된다.
catch up mechanism은 자동으로 non-triggered DagRun을 마지막으로 실행된 날짜와 현재 시간 사이에서 실행할 수 있도록 허용한다.

예를들어, 만약에 DAG를 2일동안 중지시키고나서 DAG를 다시 시작했다면, 이 기간 동안 트리거되지 않은 DAG 실행에 해당하는 일부 DAG 실행이 발생합니다.
Backfilling mechanism은 historical DagRuns를 실행하도록 하는데, 예를들어 start date 이전의 기간에 DagRun을 실행할 수 있다.
방법은 Airflow DAG Backfill 명령을 실행하는 명령을 사용하면 된다.

(예를들어 01/03(start_date)부터 01/07(now)까지 DAG RUN을 실행했고, start_date 이전인 01/01부터 01/02 기간동안 DAG RUN을 실행하고자 한다면, Backfilling mechanism을 위한 명령을 사용하면 된다)

1
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:

이렇게 catchup=False로 설정값을 바꿔주면, non-triggered DAG RUN이 실행되게 된다. 이 mechanism은 과거에 non-triggered DAG RUN을 자동으로 재실행할때 사용된다.

Executor

이전 포스팅에서도 다뤘던 내용이지만, Executor는 이름 자체는 Task를 실행할 것 같지만, Task를 실행하지 않는다. 단지 tasks를 시스템에서 어떻게 실행할 것인가에 대해 정의한다.

Executor에는 다양한 종류가 있는데, local executors와 remote executors가 있다. local executor는 여러 개의 task를 single machine에서 실행을 하고, sequential executor는 single machine에서 한 번에 하나의 task를 실행할때 사용된다.

remote executor에는 Celery executor가 있는데, tasks를 multiple machine, 그리고 salary cluster에서 실행한다. K8s executor는 multiple machine에서 K8s cluster의 multiple pods에서 multiple tasks를 실행한다.

Executor의 변경은 Airflow의 환경설정 파일에서 executor parameter를 변경함으로써 적용할 수 있다. (사용되는 executor에 따라 변경해야 되는 별도의 환경설정 요소가 있다)

Read more

220805 Apache Airflow

Apache Airflow


Apache Airflow UI 구성

Apache Airflow UI에서 DAGs 리스트를 보면, 현재 Apache Airflow에 포함되어있는 DAG의 목록이 출력된다. 리스트 중 하나의 아이템을 살펴보면, DAG의 이름의 좌측에 해당 DAG를 Pause/Unpause 할 수 있는 토글 버튼이 있고, 이름의 아래에는 data pipeline이나 팀 또는 기능별로 묶어서 관리할 수 있도록 태그가 표시되어있다. (단, tag에 따라 permission을 부여할 수 없다)

그 외에 Owner에 대한 정보와 Runs 칼럼에서는 queued, success, running, failed 상태별로 현재 DAG이 실행 상태를 확인할 수 있다. Last Run 및 Next Run 항목을 통해서는 DAG가 언제 마지막으로 실행이 되었고, 그 다음 실행은 언제 되는지에 대해 확인할 수 있다.

Recent Tasks에서는 총 15개의 상태 정보로 나뉘어 활성화된 DAG의 Task들의 실행 상태에 대해서 확인을 할 수 있다. (none, removed, scheduled, queued, running, success, shutdown, ...)

Actions에서는 강제로 Trigger 시키거나 DAG를 삭제할 수 있는데, DAG를 삭제한다는 의미가 DAG 자체를 삭제하는 것이 아닌, Meta store에서 DAG RUN Instance를 삭제한다는 것을 의미한다.

Apache Airflow DAG item 상세보기

DAG의 이름을 클릭하면, Grid를 통해 실행한 DAG들의 상태 정보에 대해서 모니터링 할 수 있으며, Graph View를 통해서는 DAG의 각 Tasks가 어떤 Tasks를 dependencies로 가지고 있는지에 대해서 구조적으로 확인을 할 수 있다.

Landing Times에서는 DAG에서의 모든 작업들이 scheduled 상태에서 completion으로 완료되는데 얼마나 걸렸는지에 대한 정보를 시각화된 그래프로 확인할 수 있다. 만약 시간이 오래걸린다면, 걸리는 시간을 줄일 수 있도록 별도의 대응이 필요하다.

Calendar view에서는 각 각의 네모칸이 특정 DAG의 상태 다이어그램 정보를 종합해서 보여준다. 특정 DAG가 특정 날짜에 문제가 생긴다면, 빨간색으로 칸이 표시되며, 성공적으로 DAG가 시행이 되었다면 해당 일자에 초록색으로 포시된다. 점으로 표시된 칸은 얼마나 많은 다이어그램이 해당 날에 계획되어있는지에 대한 정보를 제공한다. 따라서 이러한 모니터링 정보를 기반으로 어떤 날에 데이터 파이프라인을 수정해서 문제를 해결해야되는지 알 수 있다.

Gantt view에서는 DAG의 특정 Task가 완료되는데에 얼마나 시간이 걸렸는지와 pipeline에서 bottleneck이 발생했는지에 대한 전반적인 overview를 제공한다. 상대적으로 긴 직사각형은 그만큼 Task를 실행하는데 시간이 걸렸다는 의미이고, 오래걸린 작업에 대해서는 어떻게 하면 작업이 완료되는데 걸리는 시간을 단축시킬 수 있는지에 대한 고민이 필요하다.

직사각형이 overlapping되었다는 것은 복수 개의 Task를 동시에 실행할 수 있다는 것을 의미한다. (DAG parallelism)

Code view에서는 데이터 파이프라인의 코드에 접근할 수 있는데, 이를 통해서 적용한 수정사항이 DAG에 제대로 적용이 되었는지 확인을 할 수 있다.

Read more

220804 Apache Airflow

Apache Airflow


이번 포스팅에서는 Apache Airflow의 기본 개념과 사용에 대해서 정리해보려고 한다.
이전에 AWS의 EventBridge라는 서비스를 사용해서 셍성한 Lambda 함수를 일정 주기의 시간동안 정기적으로 실행되도록 스케줄링해서 실습한 적이 있는데, Task를 좀 더 복잡한 구조로 스케줄링하기 위해서는 Apache Airflow를 활용하는 것이 좀 더 효율적이라고 오프라인 수업에서 배워서, 한 번 Apache Airflow를 사용해서 데이터 파이프라인을 구축해보고자 학습을 하게 되었다. 그리고 지원하고자 하는 기업에서 Apache Airflow를 업무에서 도입을 하여 사용하고 있기 때문에 좀 더 잘 이해하고 사용해보려고 한다.

Apache Airflow를 사용하는 이유

데이터 파이프라인 구축에 있어, 데이터를 추출하는 Extract, 데이터를 적재하는 Load, 데이터를 변환하는 Transform 과정을 거친다. 데이터 추출은 API를 통해서 하기도 하며, Load는 Snowflake와 같은 data warehousing, data lake와 같은 Single platform을 제공하는 SaaS를 활용하기도 한다. 그리고 Transform은 Dbt와 같은 분석을 위한 데이터 웨어하우스에 적재된 데이터를 간단한 SELECT 문 작성을 통해 변환을 할 수도 있다.

만약 Extract, Load, Transform 단계에서 예기치 못한 에러가 발생한다면, 그리고 데이터 파이프라인이 한 개가 아닌 100개 이상이라면 어떨까?

관리하기가 많이 어려워진다. 이러한 이유로 인해 Airflow를 사용해서 종합적인 파이프라인의 관리가 필요하다.

DAG(Directed Acyclic Graph)

DAG는 방향성 비순환 그래프로, Airfow의 주요 컨셉이며, 복수 개의 Task를 모아서 어떻게 실행이 되어야 하는지에 대한 종속성과 관계에 따라 구조화 시키는 것을 말한다.

Operator

Operator는 Task이며, 실행이 되면 Task instance가 생성이 되며, Operator의 예로는 Action operator, Transfer operator, Sensor operator가 있다.

Action operator의 예로는 Python operator, Bash operator가 있는데, Python operator는 Python function을 실행시키며, Bash operator는 Bash command를 실행시키는 역할을 한다. 그리고 Transfer operator의 예로는 MySQL의 데이터를 RedShift로 이전하는 작업이 있으며, Sensor operator의 예로는 특정 이벤트가 발생하면 다음 Step으로 넘어가도록 하는 작업이 있다.

Apache Airflow의 요소

Apache Airflow에는 Webserver, Meta store, Scheduler, Executor, Queue, Worker가 있다.

Executor는 직접적으로 Task를 실행하지 않으며, K8S 클러스터는 K8S Executor를 사용하고, Celery 클러스터는 Celery Executor를 사용한다. 여기서 Celery는 multiple machine에서의 multiple tasks를 실행하기 위한 Python 프레임워크이다.

Queue는 주어진 Task를 보장된 순서로 실행시키기 위해 존재한다.

Worker는 Task가 효과적으로 실행될 수 있도록 하는 역할을 하며, Worker가 없다면, sub processes 혹은 K8S를 사용하는 경우, Path가 주어진다.

Apache Airflow의 Architecture

Apache Airflow의 Architecture로는 단일 노드로 구성된 One Node Architecture, 그리고 복수 개의 노드들로 구성된 Multi Node Architecture가 있다.

Read more