220424 데이터 파이프라인 스터디 13일차 (Zeppelin notebook을 사용한 분석 및 Spark로 가공한 데이터를 Amazon RDS(MySQL)에 저장)

Apache Zeppelin

이번 포스팅에서는 저번 시간에 이어서 Zeppelin을 사용해서 데이터를 처리실습을 한 내용을 정리해볼 것이다.
Zeppelin은 Interpreter로, 다양한 Interpreter를 제공해준다고 했는데, Spark도 그 중에 하나로 포함이 되어있다. 그래서 이전 시간에 실습했던 것처럼 s3 bucket에 있는 csv format의 파일을 spark.read에 여러 option을 붙여서 DataFrame 객체로써 저장을 할 수 있었다.

이 Apache Zeppelin에서 DataFrame을 조작해보면서 느낀점은 이전에 해봤던 Pandas의 DataFrame 조작과 매우 비슷한 부분이 많다는 것이다. Spark로 만든 DataFrame 변수로는 View Table을 정의할 수 있는데, 이 임시로 정의한 View Table을 활용해서 SQL query를 실행시킬 수도 있다.

그리고 DataFrame format의 데이터를 JSON format의 데이터로 S3 bucket에 저장을 할 수 있으며, S3에 저장된 JSON format의 데이터를 spark.read.json()으로 다시 DataFrame 형식으로 불러올 수 있다.

이정도의 내용을 저번 시간에 간단하게 실습을 해보았다.

Read More

220423 데이터 파이프라인 스터디 12일차 (Zeppelin notebook을 사용한 분석)

Apache Zeppelin

이번 포스팅에서는 Zeppelin을 사용해서 데이터를 처리실습을 한 내용을 정리해볼 것이다.

Zeppelin 메뉴 구성

Zeppelin 페이지를 보면, Job 메뉴를 통해서는 실행되고 있는 좌표들을 모니터링할 수 있다. 그리고 Interpreters 메뉴를 통해서는 다른 Interpreter를 추가할 수도 있으며, Angular, File, Flink, Kotlin, R, Python, Spark 등에 대한 Interpreter를 default로 지원해주고 있음을 확인할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val csvDF = spark.read
// 첫 번째 row가 header인 경우,
.option("header", "true")
// delimiter가 comma로 구분된 경우,
.option("delimiter", ",")
// schema를 구성을 할 때 알지 못하는 데이터의 경우에는 문자열로 정의한다.
// 전체 데이터를 읽고 숫자 데이터만 있는 경우에는 Integer로, 문자만 있는 경우, String만 정의한다.
// 단, 파일이 큰 경우에는 전체를 다 읽어야 되기 때문에 이 부분을 고려해서 작업을 진행해야된다.
.option("inferSchema","true")
.csv("s3://[s3-name]/[...]/[filename].csv")

csvDF.show // 파일 확인하기

csvDF.printSchema // DataFrame Schema 출력/확인

Read More

220423 데이터 파이프라인 스터디 12일차 (종합복습 및 세미 프로젝트 준비)

데이터 파이프라인(Data Pipeline)

오늘은 데이터파이프라인 1일차부터 11일차까지 학습했던 내용들은 종합적으로 복습하면서 앞으로 개인적으로 진행하게 될 세미 프로젝트는 어떤 방향으로 진행을 하게 될 것인지에 대해서 구상을 해보았다. (참고로 개인프로젝트를 채워넣을 개인 포트폴리오 웹 사이트를 아주 심플하게 만들었다. 뭔가 좋은 동기부여가 될 것 같아 하나 하나씩 채워가기 위해서 만들었는데, 확실히 좋은 동기부여가 되는 것 같긴하다)

아직은 총 11차 중에 2일분 밖에 복습을 진행하지 못했다. 하지만, 2일차까지 내용을 천천히 되새기면서 정리를 해보니 정말 주옥같은 내용들이 많았고, 내가 새로운 개념들을 배워가다가 잊고 있었던 부분들도 있었다. 앞으로 남은 복습분량들도 충분히 시간을 투자해서 진행할 계획이다.

Read More

220423 [비공개]데이터 파이프라인 스터디 12일차 (종합복습 및 세미 프로젝트 준비)

데이터 파이프라인(Data Pipeline)

오늘은 데이터파이프라인 1일차부터 11일차까지 학습했던 내용들은 종합적으로 복습하면서 앞으로 개인적으로 진행하게 될 세미 프로젝트는 어떤 방향으로 진행을 하게 될 것인지에 대해서 구상을 해보았다. (참고로 개인프로젝트를 채워넣을 개인 포트폴리오 웹 사이트를 아주 심플하게 만들었다. 뭔가 좋은 동기부여가 될 것 같아 하나 하나씩 채워가기 위해서 만들었는데, 확실히 좋은 동기부여가 되는 것 같긴하다)

아직은 총 11차 중에 2일분 밖에 복습을 진행하지 못했다. 하지만, 2일차까지 내용을 천천히 되새기면서 정리를 해보니 정말 주옥같은 내용들이 많았고, 내가 새로운 개념들을 배워가다가 잊고 있었던 부분들도 있었다. 앞으로 남은 복습분량들도 충분히 시간을 투자해서 진행할 계획이다.

복습노트-1

우선 총 11일 동안 진행했던 데이터 파이프라인 학습은 나름 만족스러운 진행상태이다. 총 학습 표의 44.7% 달성을 했고, 그 사이 사이에 블로깅이며, 다른 공부와 병행을 하고, 개인적으로 복습시간을 갖았던 것을 고려하면, 나름 괜찮은 성과이다.

복습노트-2

세미 프로젝트의 방향

이번 포트폴리오에 넣을 프로젝트의 내용은 각 프로젝트별로 테마 주제를 정하여 진행할 예정이다.

예를들어, 첫 번째, 아래와같이 서비스 운영적 측면에서 cost를 고려하여 두 개의 Architecture를 구상하여 비교할 수 있다.

서비스 운영/cost측면을 고려한 두 Architecture 구상 및 비교

Cloud service를 기반으로 Data Pipeline을 구축하게 되면, 사용되는 서비스에 따 가격부담이 달라질 수 있다.

Architecture1) Amazon API Gateway => Amazon Kinesis Stream => Amazon Kinesis Firehose => S3

Architecture1의 구성으로 데이터 수집 파이프라인을 구성하게 되면, 가장 이상적으로 좋지만, Amazon API Gateway에서 가격 단가가 부담이 될 수 있다.

Architecture2) Amazon Pinpoint => Amazon Kinesis stream => Amazon Kinesis Firehose => S3

CSP(Cloud Service Provider)가 바뀌어도 유연하게 해당 클라우드 환경에 맞게 파이프라인을 구축할 수 있도록 Architecture 구성해보기

결국에 실무에서 어느정도 파이프라인이 구축이 되어있는 상태에서 업무를 할 수도 있겠고, 처음부터 새로운 파이프라인을 구축하는 업무를 할 수도 있겠다. 이 모든 경우의 수에서 중요한 것은 어느 환경이 되었건 내가 전체적인 데이터 파이프라인의 구조를 알고 있고, 같이 업무하는 사람과 제대로 소통을 할 수 있는가를 보여줄 수 있어야한다.

가장 기본이 중요하다. 포트폴리오를 통해서 대단한 것을 보여주는 것 보다는 얼마나 기본에 충실했고, 이 프로젝트를 통해서 어떤 것들을 겪었고, 얻었는지, 그리고 추가적으로 어떤 것들을 학습해서 내 것으로 만들었는지에 대해서 구체적으로 기술해준다면 잠재적으로 내가 어떤 잠재성을 가진 데이터 엔지니어로 성장할 수 있는지 보여줄 수 있을 것이라고 생각한다.

“서울 따릉이 일일 데이터 웨어하우스 및 BI 대시보드 구축”

“블로그 유입자 분석 백엔드 시스템 구축”

  1. 먼저, 데이터를 뽑아낼 수 있는 소스가 필요하다. 직접 어플리케이션을 만들 수도 있고, 오픈 API 등을 이용해 타 서비스의 데이터를 받아올 수 있죠. 이역할을 하는 코드를 짜야합니다.

  2. 데이터를 어떤 형태로 만들지 고민 해야 합니다. 데이터가 로그라면, 어떤 식의 로그가 우리에게 최적일지 고민해봐야 합니다. 코드에 비해 데이터의 모형, 즉 원본 데이터 모델은 추후에 바꾸기 쉽지 않습니다. 데이터 모델링에 대해 고민해보게 됩니다.

  3. 코드를 돌리고, 데이터를 쌓을 수 있는 컴퓨팅 리소스가 필요합니다. 일반적으로 클라우드를 사용하게 되죠. 클라우드 서비스가 무엇이 있는지 공부하고, 사용법을 조금 익히게 됩니다.

  4. 더 구체적으로 들어가, 데이터를 어떻게 뽑아내고 옮기고 쌓을 지 고민해야 합니다. 데이터를 주기적으로 뽑아내야 한다면, 에어플로우와 같은 프레임 워크를 고려하게 됩니다. 또한 어떤 데이터 베이스를 사용할지 결정하고, 데이터를 append 로 쌓을 건지, truncate 하고 insert 할 것인지 결정하게 됩니다.

일단 이정도의 프로젝트로 3~4개 프로젝트를 진도와 같이 병행하면서 진행할 예정이다.
지금 이 상황에서 진행할 수 있는 프로젝트 테마/주제로는 아래 두 가지가 있을 것 같다.

CSP(Cloud Service Provider)가 바뀌어도 유연하게 해당 클라우드 환경에 맞게 파이프라인을 구축할 수 있도록 Architecture 구성해보기 - (Lambda Architecture)

서비스 운영/cost측면을 고려한 두 Architecture 구상 및 비교

Architecture1) Amazon API Gateway => Amazon Kinesis Stream => Amazon Kinesis Firehose => S3

Architecture1의 구성으로 데이터 수집 파이프라인을 구성하게 되면, 가장 이상적으로 좋지만, Amazon API Gateway에서 가격 단가가 부담이 될 수 있다.

Architecture2) Amazon Pinpoint => Amazon Kinesis stream => Amazon Kinesis Firehose => S3

AWS Pinpoint reference : https://docs.aws.amazon.com/ko_kr/pinpoint/latest/developerguide/integrate-events.html

HDP2.6.5 환경에서 MapReduce Python script/Pig Latin/Spark로 작성하고 실행해보기

우선 국내 빅데이터 제공 sample 대용량 데이터를 구한다. 흥미로운 데이터를 구해서
같은 처리를 MapReduce Python script, Pig Latin script, Spark로 작성을 하고 그 차이에 대해서 스스로 분석을 해본다.
Pig와 MapReduce/TEZ와의 각각의 궁합에 대해서 분석적으로 접근하면서 어떠한 차이가 있는지에 대해서도 분석할 수 있는 방법이 있는지 (HORTONWORKS SANDBOX) 찾아보는 것이 좋을 것 같다.

일단 주제선정이 확실해지면, 30만원 credit 신청을 하고 받으면 큰 프로젝트로 2개 정도 더 하기

(2022/05/07 $300 Credit 신청이 거절되었다)

220423 AWS Practitioner 자격증 취득 준비

AWS Practitioner


이번 포스팅에서는 요즘 재미를 붙여가면서 공부하고 있는 AWS 공부와 관련된 이야기 좀 하려고 한다. AWS 공부를 하게 된 계기는 데이터 파이프라인 구축과 관련된 실습을 AWS 가상 클라우드 환경에서 하고 있기 때문이다.
이번 실습을 하면서 이전에는 단순히 S3 Bucket에 리소스를 올려서 사이드 프로젝트에서 사용하고, EC2 가상 서버 인스턴스를 올려서 단기적으로 사용하던 것을 넘어서서 이번에는 각 서비스에 대해서 제대로 이해하고 사용하기 시작했다.
그 이유는 지금 공부하면서 실습하고 있는 내용이 데이터 파이프라인 구축에 관한 내용인데, 데이터 수집, 전처리, 분석 및 시각화, 저장해주는 각 각의 일련의 과정에서 사용되는 AWS의 서비스들은 많으며, 다양한 구성도가 나올 수 있고, 상황에 맞게 잘 구성해서 사용해야되기 때문이다.
심지어 운영적 측면도 고려하여, cost도 생각해서 pipeline에서 사용할 AWS의 서비스를 구성 및 설계해야한다.

생각보다 한 서비스를 활용한다는 것은 정말 복잡하다. 데이터 파이프라인 실습 중간 중간에 공부를 하면서 AWS와 관련된 여러 서비스들이 등장을 하는데, 뭔가 학습에 있어 로드맵이 필요하다는 생각이 들어서 찾아보던 중, AWS 관련 자격증 중에서 AWS Practitioner라는 자격증이 있다는 것을 알게 되었다.

가장 기초가 되는 자격증이라는 점이 가장 마음에 들었고, 지금 시점에 배우는 서비스들 중에 아닌 서비스들도 있지만, 대부분의 서비스들이 AWS 서비스들 중에 가장 기본이 되는 서비스들이 많기 때문에 병행해서 학습을 한다면 좀 더 좋을 거라고 생각을 했는데, 2주간 같이 병행해서 학습을 해보니, 역시 내 예상대로 정말 많은 도움이 되었다.

AWS Practitioner Online Course Final Test

오늘부로 AWS 공식 사이트에서 제공해주는 AWS Cloud Practitioner Essentials 코스 완료했다. 최종 테스트에서 87% 정답률을 달성했는데, 틀린문제는 다시 풀어서 다 맞췄다.

생각보다 수업내용과 AWS 공식 사이트에서 제공해주는 무료 강좌의 커리큘럼에서 순차적으로 등장하는 AWS의 서비스들이 겹치는 순간이 좀 많이 있어서 복습의 효과까지 얻게 되었던 것 같다.

이제 남은 시험날까지 13일정도 남았는데, 그 날까지 기출문제를 계속 풀어보면서 AWS에 있는 서비스 및 기술들에 대해서 다시 복습하고, AWS 공식 사이트에서 알려준 아래의 중요도 순으로 시간을 배분해서 공부를 해봐야겠다.

영역3(기술) > 영역1(클라우드 개념) > 영역2(보안 및 규정준수) > 영역4(결제 및 요금)

220421/220422 데이터 파이프라인 스터디 10/11일차 (AWS EMR - 실습)

데이터 파이프라인(Data Pipeline)

이번 포스팅에서는 이전 시간에 이론적인 부분을 학습한 EMR을 직접 실행해보고 모니터링해보면서 학습한 부분에 대해 포스팅해보려고 한다.

EMR 서비스

처음 EMR 서비스 대시보드에는 아무것도 보이지 않지만, 기본적으로 보이는 클러스터 history 정보의 경우에는 default로 두 달 정도 보여주며, optional하게 노출 기간의 변경이 가능하다.

저렴하게 EMR 서비스 이용

  • 고급 옵션으로 이동
    • EMR은 관리형 클러스터이기 때문에 여러가지 소프트웨어를 옵션으로 설치할 수 있다.
      (emr-6.x : Docker/Kubernetes를 같이 실행할 수 있는 환경을 지원)
  • 소프트웨어 선택
    • Hadoop, Ganglia, Hive, Zeppelin, Spark, Pig 선택
    • cf. Zippelin : 웹 환경에서 Spark에 여러가지 command를 통해서 job을 던진다.
    • cf. Livy : 어플리케이션 간에 Spark에 job을 던질때 사용되는 것이 Livy이다.
    • AWS Glue catalog setting
      • Glue는 S3에 저장된 정보의 meta정보를 테이블 형태로 관리하는 것을 말한다.
  • 클러스터 생성
  • 하드웨어 구성
    • Networking
      • VPC : 하나의 건물로 이해 / subnet은 각 층으로 이해 (스팟을 제공하는 subnet이 따로 있는 경우가 있기 때문에 확인 필요)
    • Cluster Nodes and Instance
      • Instance type : spark 자체는 메모리 기반 데이터를 분석하는 툴이다. 따라서 메로리에 좀 더 최적화된 인스턴스 타입을 선택한다. (r-> ram -> memory 기반의 인스턴스, c -> computing, g -> gpu, i -> disk 최적화(데이터 분석시에도 고려되면 좋음))
Amazon EMR Cluster Nodes and Instances setting

Read More

220422 Hadoop과 친해지기 열 번째 이야기(Pig로 Hadoop 프로그래밍)

Hadoop & Pig

이번 포스팅에서는 구조상으로 보면, MapReduce의 상위에 위치해있는 Pig라는 친구를 사용해서 실습을 해보도록 하겠다. 실습은 Ambari를 사용해서 해보도록 한다.

Ambari에서 관리자 계정 접근 활성화

1
2
3
4
$su root
# switch to root account
$ambari-admin-password-reset
# type the password for admin account

Pig ?

Ambari 웹 브라우저의 화면 우측 상단에 격자 무늬 아이콘을 클릭하면, Pig View 메뉴를 확인할 수 있는데, 이곳에 Pig Scripts 추가하고 실행해볼 수 있다.

여지까지 Hadoop에 대해서 학습하면서 Hadoop의 가장 핵심기술인 MapReduce에 대해서 중점적으로 배웠다. MapReduce는 Hadoop의 시작을 함께 한 기술이긴 하지만 오래된 데이터 처리 기술이고, 어렵다. 이러한 어려운 데이터 처리 기술을 Pig는 좀 더 쉽게 할 수 있도록 도와준다.

Read More

220419 데이터 파이프라인 - 7

데이터 파이프라인(Data Pipeline)

이번 포스팅에서는 Amazon EMR에 대해서 알아본다. 이전 포스팅까지는 Kafka, Kinesis와 같은 message queue를 활용해서 분석할 데이터를 수집하는 것에 대해서 직접 파이프라인을 구성해보고 실습해보았다. 이번 포스팅에서는 Apache Hadoop과 Apache Spark와 같은 빅데이터 프레임워크 실행을 간소화하는 관리형 클러스터 플랫폼인 EMR(Elastic MapReduce)에 대해서 실습해본다.
분석할 대상이 되는 데이터가 요구하는 컴퓨팅 파워에 따라 클러스터를 쉽게 확장하고 축소할 수 있는 장점을 Amazon EMR은 가지고 있다. 그래서 이름에도 Elastic이 붙어있다.
Amazon EMR에서 파이프라인의 핵심 축이 되는 Spark의 활용법에 대해서도 집중적으로 정리를 해보자.

DataPipeline flow

데이터 파이프라인(Data Pipeline) 전체흐름

streaming이나 batch에 의해서 발생한 파일 데이터는 Bronze 데이터로써 저장이된다.
이 Bronze 데이터는 한 번 가공해서 테이블 형태로 저장을 하는데 이 데이터 형태는 DW(Data Warehouse)형태라고 하며, Sliver 데이터라고도 한다.
이 Sliver 데이터를 다른 사람이 분석하거나 시각화가 가능한 상태의 데이터로 가공하게 되면, 이 데이터를 DM(Data Mart)형태 혹은 Gold 데이터라고 칭한다.

데이터 파이프라인(Data Pipeline) 전체흐름

DataPipeline Sequence

(STEP 1) S3 bucket에 있는 데이터는 웹/앱/Batch에 의해 발생한 데이터이며, Bronze 데이터라고 한다.
상품/고객의 Transaction, 고객이 상품을 보았는지에 대한 데이터 등이 이에 해당한다.

(STEP 2) 앞서 STEP1에 있는 데이터가 한 번 가공을 해서 Silver 형태로 저장을 하게 되는데, 이때 사용되는 것이 바로 Amazon EMR 안의 Spark이다.

(STEP 3) 상품/고객 같은 경우에도 마스터성 데이터가 있다. 그 데이터를 S3에 넣고, 이벤트성 데이터는 보통 ID가 들어가있는데, ID를 푸는 작업을 한다. 그 이후에 Gold 타입의 데이터로 변화한다.

EMR 구조

Hadoop 구조를 가지고 있는 관리형 클러스터 플랫폼이다. 기본적으로 EC2 기반으로 구성을 하며, EC2에서 클러스터를 구성하기 위한 여러 서비스 요소들을 설치한다.

Amazon EMR 구조

Master Node

  • 클러스터를 관리
  • 노드간에 데이터 및 작업의 분배를 조정
  • 작업 상태를 추적하고 클러스터의 상태를 모니터링

Core Node(최근에는 Core Node없이 Master Node 자체만으로도 Jupyter notebook으로 구성할 수 있다)

  • Data Node로, 클러스터의 HDFS에 데이터를 저장하는 노드이며, 하나 이상의 Core node가 있어야 한다.

Task Node

  • Core Node와 비슷한 역할을 한다.
  • Task Node는 HDFS가 없으며, 오직 compute resources만을 가진다. 따라서 Scale In/Out하기에 유용하다. (Optional)

HDFS로 S3를 사용하면 좋은점

  • Computing Node와 Data Node를 서로 독립된 객체로써 운영함으로써 좋은 Architecture를 구성할 수 있다.
  • 기존 HDFS의 경우에는 용량이 부족한 경우에는 용량에 대한 확인을 해서 확장을 해줘야했는데, S3의 경우에는 확장에 대한 신경을 쓰지 않아도 된다.
  • 높은 내구성을 가진다.
  • 유연하게 클러스터 내(S3)에서 필요에 따라 노드를 추가하거나 삭제할 수 있다.
  • 클러스터를 종료 후에 다시 클러스터를 구성해도 기존의 데이터를 다시 읽을 수 있다.
  • 여러개의 클러스터 서비스에서 데이터를 읽을 수 있다.(운영 EMR과 분석용 EMR에서 모두 사용 가능)
Amazon S3 Bucket with EMR

Storage 부분과 Compute 부분을 분리함으로써 복수 작업자간의 종속성 분리

  • Storage 부분과 Amazon EMR(computing part)을 활용해서 Storage의 데이터에 Spark을 활용해서 동시에 접근을 해서 처리하는 형태로 구성을 할 수 있다.
    (이전에는 큰 서버에 여러 분석가들이 접속을 해서 작업을 했었다. 그래서 큰 작업을 하는 작업을 하는 사람이 있는 경우에는 부하가 걸려서 다른 작업자에게 영향을 주었다. 하지만 위와같이 Storage와 Compute영역을 분리를 시키게 되면, 각 각의 작업자간의 종속성을 분리시킬 수 있다)

Amazon EMR(Elastic MapReduce)내의 다양한 서비스들

Amazon EMR 내에는 데이터 전처리 및 분석을 위한 여러 서비스들이 존재한다. 대표적으로 아래의 서비스들이 존재하며, 시간이 될때 찾아서 각 서비스들의 특징에 대해서 살펴보도록 하자.

  • Presto
  • Ganglia
  • Hue
  • oozie
  • mahout
  • TensorFlow

EMR의 Spark에 대한 모니터링

Zeppelin을 이용한 데이터 처리

220419 데이터 파이프라인 스터디 9일차 (AWS EMR - 이론)

데이터 파이프라인(Data Pipeline)

이번 포스팅에서는 Amazon EMR에 대해서 알아본다. 이전 포스팅까지는 Kafka, Kinesis와 같은 message queue를 활용해서 분석할 데이터를 수집하는 것에 대해서 직접 파이프라인을 구성해보고 실습해보았다. 이번 포스팅에서는 Apache Hadoop과 Apache Spark와 같은 빅데이터 프레임워크 실행을 간소화하는 관리형 클러스터 플랫폼인 EMR(Elastic MapReduce)에 대해서 실습해본다.
분석할 대상이 되는 데이터가 요구하는 컴퓨팅 파워에 따라 클러스터를 쉽게 확장하고 축소할 수 있는 장점을 Amazon EMR은 가지고 있다. 그래서 이름에도 Elastic이 붙어있다.
Amazon EMR에서 파이프라인의 핵심 축이 되는 Spark의 활용법에 대해서도 집중적으로 학습을 하고 정리를 해보려고 한다.

Read More

220419 Hadoop과 친해지기 열 번째 이야기(MapReduce Challenge)

Hadoop MapReduce

이번 포스팅에서는 MapReduce 실습 도전과제에 대해 직접 해본 내용을 정리해보려고 한다.

이전 포스팅에서는 영화 정보 데이터가 주어졌을때, 각 평점을 기준으로 몇 편의 영화가 분포되어있는지 확인하는 MapReduce 코드를 작성해보았다.

이번 Self-Challenge 과제는 총 2개이며,
첫 번째 챌린지 과제는 영화별 인기순위를 분류하기 위한 MapReduce 작성하는 것이다.
두 번째 챌린지 과제는 평가 횟수를 기준으로 영화를 정렬하는 MapReduce를 작성하는 것이다.

영화별 인기순위 분류하기 위한 MapReduce 작성

영화 인기순위는 영화 평가 횟수를 기준으로 보았을 때, 시청 횟수의 대용물로 평가가 될 수 있기 때문에 movieID를 기준으로 mapper를 작성해주고, reducer에서 해당 영화 평가 횟수를 종합해주면 된다는 결론이 나온다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from mrjob.job import MRJob
from mrjob.step import MRStep

class NumOfEvaluateBreakdown(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_num_of_evaluate,
reducer=self.reducer_count_num_of_evaluate)
]
def mapper_get_num_of_evaluate(self, _, line):
(userID, movieID, rating, timestamp) = line.split('\t')
yield movieID, 1

def reducer_count_num_of_evaluate(self, key, values):
yield key, sum(values)

if __name__ == '__main__':
NumOfEvaluateBreakdown.run()

Read More