220808 Apache Airflow

Apache Airflow


Configuration 살펴보기

더 많은 Airflow Worker가 필요하다면, 추가 machine에서 celery worker를 명령한다.

Flower

Airflow workers를 dashboard를 통해서 모니터링하기 위한 툴이다. Celery Executor를 사용하면, Flower에 접속해서 Celery Executor의 Administrator와 Airflow worker를 대시보드를 통해 모니터링 할 수 있다.

localhost:5555/dashboard

아래의 Worker dashboard를 살펴보면, Max concurrency 값이 16인 것으로 보아, 최대 16개의 Task를 동시에 실행하는 것이 가능하다. 이는 사용하는 PC의 리소스에 따라 줄일 수도 늘릴 수도 있다.

다음으로 Queues 메뉴는 유용하게 사용될 수 있는데, 특정 Task를 특정 worker로 라우팅되도록 할 수도 있다.
예를들어 높은 리소스를 소비하는 Task가 존재하고, 현재 하나의 높은 리소스 Worker를 가진다고 가정하면, queue를 생성해서 queue를 Worker에 붙이고, 높은 리소스 소비 Task를 생성한 queue로 보내서 해당 Worker만 해당 작업을 실행할 수 있도록 queue를 지정해서 사용할 수 있다.

Read more

220807 Apache Airflow

Apache Airflow


Backfilling

DAG를 처음 실행하게 되면, scheduler는 자동으로 non-triggered DagRuns을 시작 날짜(start_date)와 현재(now) 사이 시점에서 실행하게 된다.
catch up mechanism은 자동으로 non-triggered DagRun을 마지막으로 실행된 날짜와 현재 시간 사이에서 실행할 수 있도록 허용한다.

예를들어, 만약에 DAG를 2일동안 중지시키고나서 DAG를 다시 시작했다면, 이 기간 동안 트리거되지 않은 DAG 실행에 해당하는 일부 DAG 실행이 발생합니다.
Backfilling mechanism은 historical DagRuns를 실행하도록 하는데, 예를들어 start date 이전의 기간에 DagRun을 실행할 수 있다.
방법은 Airflow DAG Backfill 명령을 실행하는 명령을 사용하면 된다.

(예를들어 01/03(start_date)부터 01/07(now)까지 DAG RUN을 실행했고, start_date 이전인 01/01부터 01/02 기간동안 DAG RUN을 실행하고자 한다면, Backfilling mechanism을 위한 명령을 사용하면 된다)

1
with DAG('my_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:

이렇게 catchup=False로 설정값을 바꿔주면, non-triggered DAG RUN이 실행되게 된다. 이 mechanism은 과거에 non-triggered DAG RUN을 자동으로 재실행할때 사용된다.

Executor

이전 포스팅에서도 다뤘던 내용이지만, Executor는 이름 자체는 Task를 실행할 것 같지만, Task를 실행하지 않는다. 단지 tasks를 시스템에서 어떻게 실행할 것인가에 대해 정의한다.

Executor에는 다양한 종류가 있는데, local executors와 remote executors가 있다. local executor는 여러 개의 task를 single machine에서 실행을 하고, sequential executor는 single machine에서 한 번에 하나의 task를 실행할때 사용된다.

remote executor에는 Celery executor가 있는데, tasks를 multiple machine, 그리고 salary cluster에서 실행한다. K8s executor는 multiple machine에서 K8s cluster의 multiple pods에서 multiple tasks를 실행한다.

Executor의 변경은 Airflow의 환경설정 파일에서 executor parameter를 변경함으로써 적용할 수 있다. (사용되는 executor에 따라 변경해야 되는 별도의 환경설정 요소가 있다)

Read more

220805 Apache Airflow

Apache Airflow


Apache Airflow UI 구성

Apache Airflow UI에서 DAGs 리스트를 보면, 현재 Apache Airflow에 포함되어있는 DAG의 목록이 출력된다. 리스트 중 하나의 아이템을 살펴보면, DAG의 이름의 좌측에 해당 DAG를 Pause/Unpause 할 수 있는 토글 버튼이 있고, 이름의 아래에는 data pipeline이나 팀 또는 기능별로 묶어서 관리할 수 있도록 태그가 표시되어있다. (단, tag에 따라 permission을 부여할 수 없다)

그 외에 Owner에 대한 정보와 Runs 칼럼에서는 queued, success, running, failed 상태별로 현재 DAG이 실행 상태를 확인할 수 있다. Last Run 및 Next Run 항목을 통해서는 DAG가 언제 마지막으로 실행이 되었고, 그 다음 실행은 언제 되는지에 대해 확인할 수 있다.

Recent Tasks에서는 총 15개의 상태 정보로 나뉘어 활성화된 DAG의 Task들의 실행 상태에 대해서 확인을 할 수 있다. (none, removed, scheduled, queued, running, success, shutdown, ...)

Actions에서는 강제로 Trigger 시키거나 DAG를 삭제할 수 있는데, DAG를 삭제한다는 의미가 DAG 자체를 삭제하는 것이 아닌, Meta store에서 DAG RUN Instance를 삭제한다는 것을 의미한다.

Apache Airflow DAG item 상세보기

DAG의 이름을 클릭하면, Grid를 통해 실행한 DAG들의 상태 정보에 대해서 모니터링 할 수 있으며, Graph View를 통해서는 DAG의 각 Tasks가 어떤 Tasks를 dependencies로 가지고 있는지에 대해서 구조적으로 확인을 할 수 있다.

Landing Times에서는 DAG에서의 모든 작업들이 scheduled 상태에서 completion으로 완료되는데 얼마나 걸렸는지에 대한 정보를 시각화된 그래프로 확인할 수 있다. 만약 시간이 오래걸린다면, 걸리는 시간을 줄일 수 있도록 별도의 대응이 필요하다.

Calendar view에서는 각 각의 네모칸이 특정 DAG의 상태 다이어그램 정보를 종합해서 보여준다. 특정 DAG가 특정 날짜에 문제가 생긴다면, 빨간색으로 칸이 표시되며, 성공적으로 DAG가 시행이 되었다면 해당 일자에 초록색으로 포시된다. 점으로 표시된 칸은 얼마나 많은 다이어그램이 해당 날에 계획되어있는지에 대한 정보를 제공한다. 따라서 이러한 모니터링 정보를 기반으로 어떤 날에 데이터 파이프라인을 수정해서 문제를 해결해야되는지 알 수 있다.

Gantt view에서는 DAG의 특정 Task가 완료되는데에 얼마나 시간이 걸렸는지와 pipeline에서 bottleneck이 발생했는지에 대한 전반적인 overview를 제공한다. 상대적으로 긴 직사각형은 그만큼 Task를 실행하는데 시간이 걸렸다는 의미이고, 오래걸린 작업에 대해서는 어떻게 하면 작업이 완료되는데 걸리는 시간을 단축시킬 수 있는지에 대한 고민이 필요하다.

직사각형이 overlapping되었다는 것은 복수 개의 Task를 동시에 실행할 수 있다는 것을 의미한다. (DAG parallelism)

Code view에서는 데이터 파이프라인의 코드에 접근할 수 있는데, 이를 통해서 적용한 수정사항이 DAG에 제대로 적용이 되었는지 확인을 할 수 있다.

Read more

220804 Apache Airflow

Apache Airflow


이번 포스팅에서는 Apache Airflow의 기본 개념과 사용에 대해서 정리해보려고 한다.
이전에 AWS의 EventBridge라는 서비스를 사용해서 셍성한 Lambda 함수를 일정 주기의 시간동안 정기적으로 실행되도록 스케줄링해서 실습한 적이 있는데, Task를 좀 더 복잡한 구조로 스케줄링하기 위해서는 Apache Airflow를 활용하는 것이 좀 더 효율적이라고 오프라인 수업에서 배워서, 한 번 Apache Airflow를 사용해서 데이터 파이프라인을 구축해보고자 학습을 하게 되었다. 그리고 지원하고자 하는 기업에서 Apache Airflow를 업무에서 도입을 하여 사용하고 있기 때문에 좀 더 잘 이해하고 사용해보려고 한다.

Apache Airflow를 사용하는 이유

데이터 파이프라인 구축에 있어, 데이터를 추출하는 Extract, 데이터를 적재하는 Load, 데이터를 변환하는 Transform 과정을 거친다. 데이터 추출은 API를 통해서 하기도 하며, Load는 Snowflake와 같은 data warehousing, data lake와 같은 Single platform을 제공하는 SaaS를 활용하기도 한다. 그리고 Transform은 Dbt와 같은 분석을 위한 데이터 웨어하우스에 적재된 데이터를 간단한 SELECT 문 작성을 통해 변환을 할 수도 있다.

만약 Extract, Load, Transform 단계에서 예기치 못한 에러가 발생한다면, 그리고 데이터 파이프라인이 한 개가 아닌 100개 이상이라면 어떨까?

관리하기가 많이 어려워진다. 이러한 이유로 인해 Airflow를 사용해서 종합적인 파이프라인의 관리가 필요하다.

DAG(Directed Acyclic Graph)

DAG는 방향성 비순환 그래프로, Airfow의 주요 컨셉이며, 복수 개의 Task를 모아서 어떻게 실행이 되어야 하는지에 대한 종속성과 관계에 따라 구조화 시키는 것을 말한다.

Operator

Operator는 Task이며, 실행이 되면 Task instance가 생성이 되며, Operator의 예로는 Action operator, Transfer operator, Sensor operator가 있다.

Action operator의 예로는 Python operator, Bash operator가 있는데, Python operator는 Python function을 실행시키며, Bash operator는 Bash command를 실행시키는 역할을 한다. 그리고 Transfer operator의 예로는 MySQL의 데이터를 RedShift로 이전하는 작업이 있으며, Sensor operator의 예로는 특정 이벤트가 발생하면 다음 Step으로 넘어가도록 하는 작업이 있다.

Apache Airflow의 요소

Apache Airflow에는 Webserver, Meta store, Scheduler, Executor, Queue, Worker가 있다.

Executor는 직접적으로 Task를 실행하지 않으며, K8S 클러스터는 K8S Executor를 사용하고, Celery 클러스터는 Celery Executor를 사용한다. 여기서 Celery는 multiple machine에서의 multiple tasks를 실행하기 위한 Python 프레임워크이다.

Queue는 주어진 Task를 보장된 순서로 실행시키기 위해 존재한다.

Worker는 Task가 효과적으로 실행될 수 있도록 하는 역할을 하며, Worker가 없다면, sub processes 혹은 K8S를 사용하는 경우, Path가 주어진다.

Apache Airflow의 Architecture

Apache Airflow의 Architecture로는 단일 노드로 구성된 One Node Architecture, 그리고 복수 개의 노드들로 구성된 Multi Node Architecture가 있다.

Read more

220728 Burrow - Kafka consumer Lag 모니터링

Burrow


이번 포스팅에서는 Kafka의 Consumer Lag를 모니터링할 때 필수적으로 사용되는 Burrow에 대해서 정리해보려고 한다.

이번 포트폴리오에 추가 할 사이드 프로젝트로 Kafka를 활용해서 Kafka cluster를 구성하고, Kafka-client 라이브러리를 활용하여 Python으로 Producer 및 Consumer를 구성하였다. Consumer에서는 ELK 스택의 docker 컨테이너를 연결하여, Producer로부터 유입된 로그 데이터를 Logstash를 통해 Elasticsearch에 저자을 하고, 저장된 로그 데이터를 Kibana를 통해서 시각화하였다.

Kafka consumer는 kafka-client 라이브러리를 활용해서 Java, Python등의 언어로 개발을 할 수 있는데, 생성한 KafkaConsumer 객체를 통해 현재 lag 정보를 가져올 수 있다.

만약 lag을 실시간으로 모니터링하고 싶은 경우에는 Elasticsearch나 influxdb와 같은 곳으로 Consumer lag metric 정보를 보낸 뒤에 Grafana 대시모드를 통해서 실시간으로 시각화하여 확인 할 수 있다.

하지만 Consumer 단위에서 lag을 모니터링하는 것은 아주 위험하고, 운영요소가 많이 들어간다는 문제가 있다. 그 이유는 Consumer logic 단에서 lag을 수집하는 것은 Consumer 상태에 dependency가 걸리기 때문이다.
만약에 Consumer가 비정상적으로 종료되게 된다면, 더 이상 Consumer는 lag 정보를 보낼 수 없기 때문에 더 이상 lag을 측정할 수 없는 문제가 발생한다.

그리고 차후에 추가적으로 consumer가 개발될 때 마다 해당 consumer에 lag 정보를 특정 저장소에 저장할 수 있도록 로직을 개발해야되기 때문에 공수가 많이 든다.

만약에 consumer lag을 수집할 수 없는 consumer라면, lag을 모니터링 할 수 없기 때문에 까다로워진다.

이러한 이유로 인해 linkedIn에서는 Apache kafka와 함께 Kafka consumer lag을 효과적으로 모니터링 할 수 있도록 Burrow를 개발하였다.

Burrow는 오픈 소스 프로젝트로, Go 언어로 개발이 되었으며, 현재 깃허브에 올라가 있다. 이 Burrow는 Kafka와는 독립적인 consumer의 lag을 모니터링하기 위한 애플리케이션이다.

Burrow의 특징

Multi-Kafka cluster를 지원한다.

Kafka를 사용하는 기업에서는 보통 2개 이상의 Kafka cluster를 구성해서 사용하는데, 여러 개의 Kafka 클러스터를 구성하더라도 한 개의 Burrow만으로 모든 카프카 클러스터에 붙은 consumer의 lag을 모두 모니터링 할 수 있다.

Read more

220703 데이터 파이프라인 구축 오프라인 수업 / 6주차

Review


이번 포스팅에서는 여섯 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

이번 여섯 번째 수업을 마지막으로 데이터 파이프라인 구축과 관련한 데이터 엔지니어링 수업이 마무리되었다. 이번 수업을 통해 정말 많은 것들을 배울 수 있었다. 특히 이전에는 클라우드 플랫폼을 활용해서 데이터 파이프라인을 구축하는 것이 전부라고 생각했었지만, 이번 수업을 듣고나서 관리 및 운영의 관점에서 클라우드 플랫폼에서 제공하는 서비스들의 근간이 되는 오픈 소스 프로젝트의 세부 동작원리에 대해서 이해하는 것이 더 중요하다는 것을 배웠다. 그리고 클라우드 플랫폼을 활용해서 데이터 파이프라인을 처음 구축했을때 그 다음 스탭으로 어떤 식으로 공부를 이어나갈지 감을 잡지 못했었는데, 이번 총 6번의 수업동안 (6주간 진행)앞으로 어떻게 더 공부를 해야되는지, 그리고 새로운 기술스택이 나왔을때 어떤식으로 학습을 이어나가야 되는지에 대해서 알게 되었다.
이번 마지막 수업을 마무리하며 강사님이 어느 데이터 파이프라인 구축에 있어, 어느 파이프라인 구성이 정답이고 그런 건 없다고 하셨다. 그리고 수업을 들으면서 느낀 것은 정말 하나의 파이프라인을 구성할때에도 많은 것들을 고려해야하며, 파이프라인의 각 구성 요소들의 특징들을 제대로 이해하고 있어야 비로소 효율적인 파이프라인을 구축할 수 있다는 것을 배웠다. 아무튼 이번 수업을 통해 좀 더 데이터 엔지니어의 업무 중 하나인 데이터 파이프라인 구축에 대해서 좀 더 심도있게 배울 수 있었던 것 같다.

매주 한 번 강남역에 가서 세 시간씩 하루도 빠지지 않고 수업에 참여하고, 배운 내용을 블로그에 하나도 빠뜨리지 않고 기록하였다. 이런 나에게 칭찬을 하며, 마지막 수업시간에 배운 내용을 정리해보려고 한다.

ElasticSearch

ES를 NoSQL DB와 같은 저장소로써 사용을 하면서 저장된 데이터를 검색하는 용도로 사용된다. 그리고 주로 모니터링이나 로깅과 같은 용도로 많이 사용된다. 메트릭 같은 것을 JSON에 같이 담아서 Kibana를 통해서 그래프로 그려주면, 프로메테우스와 같은 TS DB와 같은 성능은 내지 못하지만, 대시보드로 충분히 그려서 활용할 수 있다. 데이터가 적은 경우에는 앞에서 설명한 것과 같이 ElasticSearch에 저장된 데이터를 Kibana를 활용해서 시각화를 해줄 수 있지만, 이러한 메트릭 값들이 많아지면, TS(Time Series) 전용으로 담아두는 TS DB를 생성해서 관리한다.
이처럼 ES는 검색엔진이지만, 다양한 용도로 사용이 되고 있다.

Read more

220626 데이터 파이프라인 구축 오프라인 수업 / 5주차

Review


이번 포스팅에서는 다섯 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

우선 실습에 앞서 간단하게 Flink에 대해서 정리를 해보자. Flink는 분산 데이터 처리 프레임워크 (Processing unbounded data)로, Kafka와 같은 MQ에 Flink를 붙여서 처리를 할 수 있다. 이외에도 MQ에 Kinesis Firehose를 붙이고, Lambda를 붙여서 custom한 형태의 데이터로 추출을 할 수도 있고, 데이터를 암호화하거나 특정 format(Parquet, ORC)으로 변환을 해서 추출을 할 수도 있다.
또 Logstash를 붙여서 데이터를 간단하게 처리해서 넘겨줄 수 있다. (whitelist / filter plugin을 통해 처리) 커스텀하게 데이터를 모아주거나 분석을 하는 경우, 예를들어 GPS 신호를 계속 보내서 사용자들이 이 데이터를 1분동안 aggregation해서 어느 지역에 사람이 많은지 분석하는 작업은 logstash로 분석을 하는 것이 불가능하다. 그리고 각 각의 logstash로 나눠서 분산처리를 하는 것은 가능하지만, 각 각의 logstash가 서로 데이터를 공유하지는 못하기 때문에(클러스터 내의 노드로써 존재하지 않기 때문에) 복잡한 스트림 처리나 프로세싱, 분석이 필요할때 Flink를 사용한다.

그리고 Flink, Storm을 개발할때에는 DAG를 많이 사용한다. Kafka에서 받아온 실시간 데이터를 키별로 분류를 하는 작업에서도 사용이 될 수 있는데, 게임 데이터를 분석한다고 가정했을때 각 캐릭터의 직업에 따라 분류를 해서 각 직업별로 행동 패턴을 분석하고자 할때 각 각의 Dataflow를 머릿속으로 구조화시킨 다음에 붙여주는 작업을 해야한다.
source operator(Kafka) -> keyBy operator -> aggregation operator -> HDFS sink Flink와 같은 데이터 플로우 프로그래밍이 필요한 어플리케이션은 operator와 같은 요소를 하나 하나 개발을 한 다음에 연결을 해주는 방식으로 개발을 하게 된다.

DB ETL작업을 할때 DB에서 주기적으로 데이터를 select해서 Flink내부에서 처리를 하고자 할 때에도 source operator를 사용한다. (Flink 외부에서 데이터를 긁어오는 부분을 source operator)

중간에서 데이터를 변환해주는 부분을 Transformation Operator라고 한다. 그리고 처리 결과를 밖으로 빼내는 부분을 Sink Operator라고 한다.

  • 실제 flink로 어플리케이션을 개발하게 되면, 수십개의 TM가 생성이 된다.
    개발을 할때 각 각의 operator의 갯수를 parallism을 통해 정의할 수 있다.
    Kafka(3) -> Map(10) -> Sink(3) => 16개의 operator가 TM의 각 각의 노드에 분산이 되어 처리된다. 이와같은 특징으로, Fluentd와 logstash와 같은 agent 기반의 데이터 파이프라인과 비교되어 사용된다.

Read more

220619 데이터 파이프라인 구축 오프라인 수업 / 4주차

Review


이번 포스팅에서는 네 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

Kafka 실습환경 구축 및 Single consumer 실습

Kafka 실습환경 구축 및 Single consumer 실습

이전에 Kafka를 실습했을때는 생성한 EC2 인스턴스에 apache kafka 압축파일을 다운받고, 압축을 풀고, zookeeper와 kafka server를 시작하고, Topic을 partitions와 replication-factor 옵션의 값을 1로 생성하고 bootstrap-server 옵션은 localhost의 9092번 포트로해서 설정하였다. (kafka server: 9092, zookeeper: 2181) 이때 실습을 했을때는 세부 옵션에서 partitions이 뭔지 replication-factor가 뭔지 구체적으로 알지 못했었는데, 이제는 ISR 그룹이 뭔지 partition과 replication-factor이 뭔지 이해를 한 상태이기 때문에 좀 더 체계적으로 실습을 할 수 있는 것 같다.

그리고 이번 실습에서는 zookeeper와 kafka server를 하나의 터미널에서 명령어로 일일이 실행시키지 않고, zookeeper와 kafka host server, producer, consumer를 각각의 container로 구동을 시키기 때문에 구조적으로 좀 더 확장성 있는 것 같다. (docker image를 이용해서 컨테이너 서비스로 구동시키기 때문에 좀 더 빠르게 환경 구성을 할 수 있는 것 같다. 이래서 컨테이너로 서버를 띄우고 관리하는 것 같다.)

Read more

220618 데이터 파이프라인 구축 오프라인 수업 / 4주차 오프라인 수업 전 준비

Preparation


이번 포스팅에서는 내일 있을 데이터 파이프라인 구축 관련 수업에서 실습할 내용들에 대해 미리 실습을 해보고, 개념적인 부분을 좀 다져보려고 한다.
내일 수업에서는 Apache Kafka를 실습하는데, Kafka cluster의 각 Broker와 Zookeeper, Producer와 Consumer를 전부 Docker container로 띄우고 관리한다. 이미 Docker에 대해 공부했고, Kubernetes에 대해서 추가적으로 공부를 하는 중이라 이번 수업을 통해 Kafka의 각 구성요소들을 Docker container에 띄워서 관리하면 많은 도움이 될 것 같다.
그리고 여지까지 했던 실습들과 비교했을때 상대적으로 구조가 복잡하기 때문에 미리 Kafka의 개념적인 내용을 머릿속에 그리면서 실습하게 될 전체적인 구조에 대해서 파악하고 가야 할 필요성을 느꼈다.

Read more

220612 데이터 파이프라인 구축 오프라인 수업 / 3주차

Review


이번 포스팅에서는 세 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다. 참고로 첫 번째와 두 번째 수업때도 너무 유익한 내용들이 많았는데, 이번 시간이 정말 너무 유익하고 좋았다.
아마도 이전에 인터넷 강의로 수강을 했을 때 아쉬웠던 부분이 많았는데, 이번에 개별적으로 현직자 분께 오프라인으로 직접 수업을 들으니, 궁금했던 부분이 많이 해소되기도 했고, 강사님이 수업에 필요한 여러 자료나 실제 회사에서 업무했을 때 필요한 부분에 대해 설명을 잘 해주셔서 그런 것 같다.

Read more