220804 Apache Airflow

Apache Airflow


이번 포스팅에서는 Apache Airflow의 기본 개념과 사용에 대해서 정리해보려고 한다.
이전에 AWS의 EventBridge라는 서비스를 사용해서 셍성한 Lambda 함수를 일정 주기의 시간동안 정기적으로 실행되도록 스케줄링해서 실습한 적이 있는데, Task를 좀 더 복잡한 구조로 스케줄링하기 위해서는 Apache Airflow를 활용하는 것이 좀 더 효율적이라고 오프라인 수업에서 배워서, 한 번 Apache Airflow를 사용해서 데이터 파이프라인을 구축해보고자 학습을 하게 되었다. 그리고 지원하고자 하는 기업에서 Apache Airflow를 업무에서 도입을 하여 사용하고 있기 때문에 좀 더 잘 이해하고 사용해보려고 한다.

Apache Airflow를 사용하는 이유

데이터 파이프라인 구축에 있어, 데이터를 추출하는 Extract, 데이터를 적재하는 Load, 데이터를 변환하는 Transform 과정을 거친다. 데이터 추출은 API를 통해서 하기도 하며, Load는 Snowflake와 같은 data warehousing, data lake와 같은 Single platform을 제공하는 SaaS를 활용하기도 한다. 그리고 Transform은 Dbt와 같은 분석을 위한 데이터 웨어하우스에 적재된 데이터를 간단한 SELECT 문 작성을 통해 변환을 할 수도 있다.

만약 Extract, Load, Transform 단계에서 예기치 못한 에러가 발생한다면, 그리고 데이터 파이프라인이 한 개가 아닌 100개 이상이라면 어떨까?

관리하기가 많이 어려워진다. 이러한 이유로 인해 Airflow를 사용해서 종합적인 파이프라인의 관리가 필요하다.

DAG(Directed Acyclic Graph)

DAG는 방향성 비순환 그래프로, Airfow의 주요 컨셉이며, 복수 개의 Task를 모아서 어떻게 실행이 되어야 하는지에 대한 종속성과 관계에 따라 구조화 시키는 것을 말한다.

Operator

Operator는 Task이며, 실행이 되면 Task instance가 생성이 되며, Operator의 예로는 Action operator, Transfer operator, Sensor operator가 있다.

Action operator의 예로는 Python operator, Bash operator가 있는데, Python operator는 Python function을 실행시키며, Bash operator는 Bash command를 실행시키는 역할을 한다. 그리고 Transfer operator의 예로는 MySQL의 데이터를 RedShift로 이전하는 작업이 있으며, Sensor operator의 예로는 특정 이벤트가 발생하면 다음 Step으로 넘어가도록 하는 작업이 있다.

Apache Airflow의 요소

Apache Airflow에는 Webserver, Meta store, Scheduler, Executor, Queue, Worker가 있다.

Executor는 직접적으로 Task를 실행하지 않으며, K8S 클러스터는 K8S Executor를 사용하고, Celery 클러스터는 Celery Executor를 사용한다. 여기서 Celery는 multiple machine에서의 multiple tasks를 실행하기 위한 Python 프레임워크이다.

Queue는 주어진 Task를 보장된 순서로 실행시키기 위해 존재한다.

Worker는 Task가 효과적으로 실행될 수 있도록 하는 역할을 하며, Worker가 없다면, sub processes 혹은 K8S를 사용하는 경우, Path가 주어진다.

Apache Airflow의 Architecture

Apache Airflow의 Architecture로는 단일 노드로 구성된 One Node Architecture, 그리고 복수 개의 노드들로 구성된 Multi Node Architecture가 있다.

Read More

220728 Burrow - Kafka consumer Lag 모니터링

Burrow


이번 포스팅에서는 Kafka의 Consumer Lag를 모니터링할 때 필수적으로 사용되는 Burrow에 대해서 정리해보려고 한다.

이번 포트폴리오에 추가 할 사이드 프로젝트로 Kafka를 활용해서 Kafka cluster를 구성하고, Kafka-client 라이브러리를 활용하여 Python으로 Producer 및 Consumer를 구성하였다. Consumer에서는 ELK 스택의 docker 컨테이너를 연결하여, Producer로부터 유입된 로그 데이터를 Logstash를 통해 Elasticsearch에 저자을 하고, 저장된 로그 데이터를 Kibana를 통해서 시각화하였다.

Kafka consumer는 kafka-client 라이브러리를 활용해서 Java, Python등의 언어로 개발을 할 수 있는데, 생성한 KafkaConsumer 객체를 통해 현재 lag 정보를 가져올 수 있다.

만약 lag을 실시간으로 모니터링하고 싶은 경우에는 Elasticsearch나 influxdb와 같은 곳으로 Consumer lag metric 정보를 보낸 뒤에 Grafana 대시모드를 통해서 실시간으로 시각화하여 확인 할 수 있다.

하지만 Consumer 단위에서 lag을 모니터링하는 것은 아주 위험하고, 운영요소가 많이 들어간다는 문제가 있다. 그 이유는 Consumer logic 단에서 lag을 수집하는 것은 Consumer 상태에 dependency가 걸리기 때문이다.
만약에 Consumer가 비정상적으로 종료되게 된다면, 더 이상 Consumer는 lag 정보를 보낼 수 없기 때문에 더 이상 lag을 측정할 수 없는 문제가 발생한다.

그리고 차후에 추가적으로 consumer가 개발될 때 마다 해당 consumer에 lag 정보를 특정 저장소에 저장할 수 있도록 로직을 개발해야되기 때문에 공수가 많이 든다.

만약에 consumer lag을 수집할 수 없는 consumer라면, lag을 모니터링 할 수 없기 때문에 까다로워진다.

이러한 이유로 인해 linkedIn에서는 Apache kafka와 함께 Kafka consumer lag을 효과적으로 모니터링 할 수 있도록 Burrow를 개발하였다.

Burrow는 오픈 소스 프로젝트로, Go 언어로 개발이 되었으며, 현재 깃허브에 올라가 있다. 이 Burrow는 Kafka와는 독립적인 consumer의 lag을 모니터링하기 위한 애플리케이션이다.

Burrow의 특징

Multi-Kafka cluster를 지원한다.

Kafka를 사용하는 기업에서는 보통 2개 이상의 Kafka cluster를 구성해서 사용하는데, 여러 개의 Kafka 클러스터를 구성하더라도 한 개의 Burrow만으로 모든 카프카 클러스터에 붙은 consumer의 lag을 모두 모니터링 할 수 있다.

Read More

220721 SQL JOINS

SQL JOINS


이번 포스팅에서는 SQL의 JOIN에 대해서 종합적으로 정리를 해보려고 한다.

INNER JOIN

department에 속하는 employee name을 출력

1
2
3
4
5
-- INNER JOIN에서 INNER 생략 가능 
-- JOIN을 할때에는 ON 절에 작성하는 JOIN 조건의 column이름은 달라도 관계없다. Column의 값이 중요하다.
SELECT e.emp_name, d.dept_name
FROM employee e
JOIN department d ON e.dept_id = d.dept_id

OUTER JOIN

LEFT JOIN

모든 employee 이름과 department 이름 출력

1
2
3
4
-- LEFT JOIN = INNER JOIN + ANY additional records from the LEFT TABLE.
SELECT e.emp_name, d.dept_name
FROM employee e
LEFT JOIN department d ON e.dept_id = d.dept_id;

RIGHT JOIN

1
2
3
4
-- RIGHT JOIN = INNER JOIN + ANY additional records from the RIGHT TABLE.
SELECT e.emp_name, d.dept_name
FROM employee e
RIGHT JOIN department d ON e.dept_id = d.dept_id;

Read More

220721 ANSI SQL & NON ANSI SQL

ANSI JOIN AND NON-ANSI JOIN


이번 포스팅에서는 ANSI SQL과 NON ANSI SQL의 각기 다른 방식으로 JOIN 쿼리를 작성했을때의 차이점에 대해서 간단하게 포스팅하려고 한다.

ANSI & NON-ANSI SQL

표준 ANSI 방식의 JOIN 쿼리에서는 JOIN 키워드와 ON 절을 사용하여 두 테이블을 합치며, 필터 조건은 WHERE 절에 작성을 해준다.

아래의 쿼리는 department name이 HR 부서인 employee의 이름과 부서 정보를 출력해주는 쿼리이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
-- ANSI
-- JOIN 키워드를 사용해서 ON clause에서 조인되는 조건을 명시했다면 표준 ANSI 방식
SELECT e.emp_name, d.dept_name
FROM employee e
INNER JOIN department d on d.dept_id = e.dept_id
WHERE d.dept_name = 'HR';

-- INNER JOIN에서는 ON 절에 AND 필터 조건을 붙여서 작성해도 결과는 같다.
SELECT e.emp_name, d.dept_name
FROM employee e
INNER JOIN department d ON d.dept_id = e.dept_id AND d.dept_name = 'HR';

-- 하지만, OUTER JOIN에서는 위와같이 ON 절에 필터조건을 거는 경우 문제가 될 수 있다.
1
2
3
4
5
6
-- NON-ANSI
-- JOIN 키워드 대신 ,(comma)를 사용해서 조인 조건을 명시했다면 NON-ANSI 방식
SELECT e.emp_name, d.dept_name
FROM employee e
,department d WHERE d.dept_id = e.dept_id
AND d.dept_name = 'HR';

아래의 쿼리는 모든 employee의 이름과 부서 정보를 출력해주는 쿼리이다.
모든 employee의 정보를 출력해주기 위해서는 department 테이블에 명기된 부서 정보와 matching되지 않은 employee의 정보도 출력해야 되기 때문에 OUTER JOIN을 해서 모든 employee의 정보를 출력해줘야 한다.

Read More

220703 데이터 파이프라인 구축 오프라인 수업 / 6주차

Review


이번 포스팅에서는 여섯 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

이번 여섯 번째 수업을 마지막으로 데이터 파이프라인 구축과 관련한 데이터 엔지니어링 수업이 마무리되었다. 이번 수업을 통해 정말 많은 것들을 배울 수 있었다. 특히 이전에는 클라우드 플랫폼을 활용해서 데이터 파이프라인을 구축하는 것이 전부라고 생각했었지만, 이번 수업을 듣고나서 관리 및 운영의 관점에서 클라우드 플랫폼에서 제공하는 서비스들의 근간이 되는 오픈 소스 프로젝트의 세부 동작원리에 대해서 이해하는 것이 더 중요하다는 것을 배웠다. 그리고 클라우드 플랫폼을 활용해서 데이터 파이프라인을 처음 구축했을때 그 다음 스탭으로 어떤 식으로 공부를 이어나갈지 감을 잡지 못했었는데, 이번 총 6번의 수업동안 (6주간 진행)앞으로 어떻게 더 공부를 해야되는지, 그리고 새로운 기술스택이 나왔을때 어떤식으로 학습을 이어나가야 되는지에 대해서 알게 되었다.
이번 마지막 수업을 마무리하며 강사님이 어느 데이터 파이프라인 구축에 있어, 어느 파이프라인 구성이 정답이고 그런 건 없다고 하셨다. 그리고 수업을 들으면서 느낀 것은 정말 하나의 파이프라인을 구성할때에도 많은 것들을 고려해야하며, 파이프라인의 각 구성 요소들의 특징들을 제대로 이해하고 있어야 비로소 효율적인 파이프라인을 구축할 수 있다는 것을 배웠다. 아무튼 이번 수업을 통해 좀 더 데이터 엔지니어의 업무 중 하나인 데이터 파이프라인 구축에 대해서 좀 더 심도있게 배울 수 있었던 것 같다.

매주 한 번 강남역에 가서 세 시간씩 하루도 빠지지 않고 수업에 참여하고, 배운 내용을 블로그에 하나도 빠뜨리지 않고 기록하였다. 이런 나에게 칭찬을 하며, 마지막 수업시간에 배운 내용을 정리해보려고 한다.

ElasticSearch

ES를 NoSQL DB와 같은 저장소로써 사용을 하면서 저장된 데이터를 검색하는 용도로 사용된다. 그리고 주로 모니터링이나 로깅과 같은 용도로 많이 사용된다. 메트릭 같은 것을 JSON에 같이 담아서 Kibana를 통해서 그래프로 그려주면, 프로메테우스와 같은 TS DB와 같은 성능은 내지 못하지만, 대시보드로 충분히 그려서 활용할 수 있다. 데이터가 적은 경우에는 앞에서 설명한 것과 같이 ElasticSearch에 저장된 데이터를 Kibana를 활용해서 시각화를 해줄 수 있지만, 이러한 메트릭 값들이 많아지면, TS(Time Series) 전용으로 담아두는 TS DB를 생성해서 관리한다.
이처럼 ES는 검색엔진이지만, 다양한 용도로 사용이 되고 있다.

Read More

220626 데이터 파이프라인 구축 오프라인 수업 / 5주차

Review


이번 포스팅에서는 다섯 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

우선 실습에 앞서 간단하게 Flink에 대해서 정리를 해보자. Flink는 분산 데이터 처리 프레임워크 (Processing unbounded data)로, Kafka와 같은 MQ에 Flink를 붙여서 처리를 할 수 있다. 이외에도 MQ에 Kinesis Firehose를 붙이고, Lambda를 붙여서 custom한 형태의 데이터로 추출을 할 수도 있고, 데이터를 암호화하거나 특정 format(Parquet, ORC)으로 변환을 해서 추출을 할 수도 있다.
또 Logstash를 붙여서 데이터를 간단하게 처리해서 넘겨줄 수 있다. (whitelist / filter plugin을 통해 처리) 커스텀하게 데이터를 모아주거나 분석을 하는 경우, 예를들어 GPS 신호를 계속 보내서 사용자들이 이 데이터를 1분동안 aggregation해서 어느 지역에 사람이 많은지 분석하는 작업은 logstash로 분석을 하는 것이 불가능하다. 그리고 각 각의 logstash로 나눠서 분산처리를 하는 것은 가능하지만, 각 각의 logstash가 서로 데이터를 공유하지는 못하기 때문에(클러스터 내의 노드로써 존재하지 않기 때문에) 복잡한 스트림 처리나 프로세싱, 분석이 필요할때 Flink를 사용한다.

그리고 Flink, Storm을 개발할때에는 DAG를 많이 사용한다. Kafka에서 받아온 실시간 데이터를 키별로 분류를 하는 작업에서도 사용이 될 수 있는데, 게임 데이터를 분석한다고 가정했을때 각 캐릭터의 직업에 따라 분류를 해서 각 직업별로 행동 패턴을 분석하고자 할때 각 각의 Dataflow를 머릿속으로 구조화시킨 다음에 붙여주는 작업을 해야한다.
source operator(Kafka) -> keyBy operator -> aggregation operator -> HDFS sink Flink와 같은 데이터 플로우 프로그래밍이 필요한 어플리케이션은 operator와 같은 요소를 하나 하나 개발을 한 다음에 연결을 해주는 방식으로 개발을 하게 된다.

DB ETL작업을 할때 DB에서 주기적으로 데이터를 select해서 Flink내부에서 처리를 하고자 할 때에도 source operator를 사용한다. (Flink 외부에서 데이터를 긁어오는 부분을 source operator)

중간에서 데이터를 변환해주는 부분을 Transformation Operator라고 한다. 그리고 처리 결과를 밖으로 빼내는 부분을 Sink Operator라고 한다.

  • 실제 flink로 어플리케이션을 개발하게 되면, 수십개의 TM가 생성이 된다.
    개발을 할때 각 각의 operator의 갯수를 parallism을 통해 정의할 수 있다.
    Kafka(3) -> Map(10) -> Sink(3) => 16개의 operator가 TM의 각 각의 노드에 분산이 되어 처리된다. 이와같은 특징으로, Fluentd와 logstash와 같은 agent 기반의 데이터 파이프라인과 비교되어 사용된다.

    Read More

220623 Terraform study

Terraform


이번 포스팅에서는 실제 프로젝트에서 AWS Topology를 Terraform 코드로 작성(IaC)해보면서 전체적인 프로젝트의 공통적인 구조를 어떻게 작성을 할 것인가에 대해 방향을 잡아가면서 정리한 내용을 작성해보려고 한다.

데이터 파이프라인 구축에 있어, AWS network topology 구성

기본 AWS 네트워크 토폴로지 구성

우선 AWS의 서비스를 활용해서 데이터 파이프라인을 구축함에 있어, 전체적인 AWS network topology의 구성은 중요하다. 실습을 했을때는 default VPC, Subnet에 각 각의 AWS 서비스를 활용해서 network topology를 구성을 했지만, 실무에서는 default vpc와 subnet을 사용하지 않는 것을 권장한다고 한다.
실제 업무에서는 Production에서 각 각의 서비스별로나 네트워크 보안 정책에 따라서 VPC를 나눠서 관리하고, 세팅하는 게 일반적이라고 한다. (DE 현직자 오프라인 수업을 통해 배웠다.)
AWS 서비스 중에서 완전 관리형 서비스가 아닌 서비스들은 VPC를 타지만, 완전 관리형 서비스의 경우에는 VPC가 아닌 별도의 AWS Zone(AWS 전용 네트워크 라인)을 탄다고 한다.
그래서 이번 프로젝트 진행에 있어, 각 프로젝트별로 하나의 AWS Network Topology가 나오기 때문에 위의 구성과 같이 전체적으로 구성을 해보려고 한다.

220619 데이터 파이프라인 구축 오프라인 수업 / 4주차

Review


이번 포스팅에서는 네 번째 데이터 파이프라인 구축 오프라인 수업시간에서 배운 내용을 정리하려고 한다.

Kafka 실습환경 구축 및 Single consumer 실습

Kafka 실습환경 구축 및 Single consumer 실습

이전에 Kafka를 실습했을때는 생성한 EC2 인스턴스에 apache kafka 압축파일을 다운받고, 압축을 풀고, zookeeper와 kafka server를 시작하고, Topic을 partitions와 replication-factor 옵션의 값을 1로 생성하고 bootstrap-server 옵션은 localhost의 9092번 포트로해서 설정하였다. (kafka server: 9092, zookeeper: 2181) 이때 실습을 했을때는 세부 옵션에서 partitions이 뭔지 replication-factor가 뭔지 구체적으로 알지 못했었는데, 이제는 ISR 그룹이 뭔지 partition과 replication-factor이 뭔지 이해를 한 상태이기 때문에 좀 더 체계적으로 실습을 할 수 있는 것 같다.

그리고 이번 실습에서는 zookeeper와 kafka server를 하나의 터미널에서 명령어로 일일이 실행시키지 않고, zookeeper와 kafka host server, producer, consumer를 각각의 container로 구동을 시키기 때문에 구조적으로 좀 더 확장성 있는 것 같다. (docker image를 이용해서 컨테이너 서비스로 구동시키기 때문에 좀 더 빠르게 환경 구성을 할 수 있는 것 같다. 이래서 컨테이너로 서버를 띄우고 관리하는 것 같다.)

Read More

220618 데이터 파이프라인 구축 오프라인 수업 / 4주차 오프라인 수업 전 준비

Preparation


이번 포스팅에서는 내일 있을 데이터 파이프라인 구축 관련 수업에서 실습할 내용들에 대해 미리 실습을 해보고, 개념적인 부분을 좀 다져보려고 한다.
내일 수업에서는 Apache Kafka를 실습하는데, Kafka cluster의 각 Broker와 Zookeeper, Producer와 Consumer를 전부 Docker container로 띄우고 관리한다. 이미 Docker에 대해 공부했고, Kubernetes에 대해서 추가적으로 공부를 하는 중이라 이번 수업을 통해 Kafka의 각 구성요소들을 Docker container에 띄워서 관리하면 많은 도움이 될 것 같다.
그리고 여지까지 했던 실습들과 비교했을때 상대적으로 구조가 복잡하기 때문에 미리 Kafka의 개념적인 내용을 머릿속에 그리면서 실습하게 될 전체적인 구조에 대해서 파악하고 가야 할 필요성을 느꼈다.

Read More