if __name__ == '__main__': NumOfEvaluateBreakdown.run()
여기서 확인해야 될 중요한 부분은 Streaming은 모든 input과 output에 대해서 문자열로 다룬다는 것이다. 따라서 위의 출력 값에서 볼 수 있듯이, movieID를 numeric값이 아닌 string 값으로 정렬이 됨을 볼 수 있다. 따라서 다음 챌린지에서 평가 횟수를 기준으로 정렬할 때에는 zfill() method를 사용해서 일정 자리수로 맞춰주고, 나머지 자리수의 경우에는 0으로 padding값을 넣어 맞춰줘야 문자열로도 numeric으로 정렬한 것과 같은 동일한 결과값을 얻을 수 있다.
from mrjob.job import MRJob from mrjob.step import MRStep
classNumOfEvaluateBreakdown(MRJob): defsteps(self): return [ MRStep(mapper=self.mapper_get_num_of_evaluate, reducer=self.reducer_count_num_of_evaluate), # Shuffle and Sort 작업이 한 번 더 일어나며, sort_by MRStep( reducer=self.sort_by_rating_count ) ] defmapper_get_num_of_evaluate(self, _, line): (userID, movieID, rating, timestamp) = line.split('\t') yield movieID, 1
# 평가된 횟수를 총 5자리로 숫자로 만들어서 그 값을 기준으로 movieID를 정렬한다. defreducer_count_num_of_evaluate(self, key, values): yieldstr(sum(values)).zfill(5), key
# 현재 복수 개의 동일한 movieID가 movies 값 배열에 들어가 있기 때문에 # movies 배열을 순회하면서, 매핑되는 count 값을 출력해준다. defsort_by_rating_count(self, count, movies): for movie in movies: yield movie, count
if __name__ == '__main__': NumOfEvaluateBreakdown.run()
[Multi-stage jobs]
복수 개의 map/reduce 작업을 chaining 하는 방법은 아래와 같이 MRStep으로 연속해서 ,(comma)로 구분해서 이어주면 된다.
1 2 3 4 5 6 7
defsteps(self): return [ MRStep(mapper=self.mapper_get_ratings, reducer=self.reducer_count_ratings), # MapReduce에 의해 Shuffle and Sort가 이뤄진다. MRStep(reducer=self.reducer_sorted_output) ]