210620 RxJS (작성중...)

RxJS
본 블로그 포스팅은 RxJS와 관련하여 네이버의 손창욱님이 올려주신 유튜브 영상을 보고 정리해보았다.

RxJS를 처음 접하는 나와 같은 사람도 정말 쉽게 이해할 수 있도록 설명을 해주셔서 너무 유익했다.

모든 어플리케이션은 궁극적으로 상태머신들의 집합

Input(입력값)이 들어오면 logic을 통해 state가 업데이트되고, 업데이트된 state는 logic을 통해 Output(결과값)이 나온다.

RxJS

RxJS는 비동기 처리를 원할하게 할 수 있도록 도와주는 데이터 스트림으로 정의할 수 있다.

RxJS를 학습하기 위해서는 여러 새로운 용어들이 등장한다.(개별적으로 정리해서 공부)

동기/비동기 방식의 사용

동기 방식은 흐름이 그대로 가기 때문에 흐름을 쉽게 파악할 수 있지만, 비동기 처리는 굉장히 느린 처리나 사용자로부터 빠르게 interaction을 받아와야 하는 경우 그리고 높은 퍼포먼스가 요구되는 상황에서 사용이 된다.

RxJS의 역할

RxJS이외에도 Callback, Promise, Generator, Async/Await과 같은 표준을 사용하여 비동기 처리를 할 수 있는데 왜 굳이 RxJS를 사용해야 될까?

Callback함수를 사용해서 비동기 처리를 하는 경우, 에러에 대한 처리가 어렵고, 콜백 헬등의 분제가 발생한다.
Promise를 사용하는 것이 좀 더 나은 방법이긴 하지만, 한 번에 하나의 데이터를 처리하기 때문에 연속성을 갖는 데이터를 처리할 수 없다는 점과 서버로 보낸 요청을 취소할 수 없다는 단점이 있기 때문에 Observable을 사용하여 위의 단점을 보안해서 비동기 처리를 할 수 있다.

그리고 RxJS는비동기 처리 영역, 데이터 전파, 데이터 처리를 담당하며, 일관된 방식으로 안전하게 데이터 흐름을 처리하는 라이브러리이다.
여기서 일관된 방식이란 입력된 값의 타입에 따라 각 각 따로 처리를 하지 않고 하나의 방식으로 처리를 할 수 있다는 말이다.

  • [참고] 입력값에 따른 데이터 처리

    개발자가 처리하는 입력값에는 어떤 것들이 있을까?
    (1) 배열 데이터, 함수 반환값 : 동기 데이터를 처리하는 방식
    (2) 키보드/마우스 입력, 원격지 데이터, DB 데이터 : 비동기 데이터를 처리하는 방식

    위와 같이 RxJS이외의 Callback, Promise, Generator, Async/Await과 같은 표준을 사용하여 비동기 처리를 하게 되면, 데이터 처리 방식이 제각각이기 때문에 각기 다른 방식으로 처리를 해줘야 한다.

    하지만 RxJS는 동기/비동기와 관계없이 데이터를 시간축을 기준으로 연속적인 데이터 스트림으로 처리한다.

사용자 입력에 따른 동기처리와 비동기처리

위와같이 모든 데이터를 시간의 축을 기준으로 배열의 컬랙션이 오는 형태로 생각하면, 시간축 관점에서 결국 동기와 비동기는 같다는 결론이 나온다.

→ RxJS는 하나의 방식으로 처리한다. (인터페이스의 단일화)

RxJS Observable

rxjs는 이러한 입력 데이터들을 각 각 다른 형태로 처리하지 않고, 하나의 시간 축에 따른 배열 컬랙션으로 생각하고 모두 일관된 하나의 방식으로 처리하게 된다. (모두 Observable로 처리)

이 결과 개발자는 좀 더 로직에 집중을 해서 작업을 할 수 있다.

의존관계가 있는 상태머신

의존관계가 있는 상태머신에 변경된 상태 정보를 다른 상태머신에 어떻게 전달해야되는지에 대한 고민이 있을 수 있다.

[Reactive Programming 위키피디아 참고]

데이터 흐름과 상태 변화 전파에 중점을 둔 프로그램 패러다임으로, 프로그래밍 언어에서 데이터 흐름을 쉽게 표현할 수 있어야 하며 기본 실행 모델이 변경 사항을 데이터 흐름을 통해 자동으로 전파한다는 것을 의미.

개발자가 변경된 데이터에 대한 처리를 하지 않아도 데이터 흐름을 통해 자동으로 빠르게 전파하게 된다는 의미이다.

Observer pattern

자 그럼 Observer 패턴에 대해서 살펴보자.

Observer pattern

Observer 패턴은 위와같이 상태변화가 발생하는 Subject(Observable)와 Subject의 상태 변화에 대한 대응을 하는 Observer로 구성이 되어있다.

Subject의 변경사항이 생기면 자동으로 Observer의 update를 호출한다.
이는 Reactive 프로그래밍의 Push-scenario(외부 환경이 어플리케이션에 데이터를 밀어넣는다)방식으로 동작하는 어플리케이션을 말한다.

필요한 데이터를 획득하기 위해서 어플리케이션이 외부 환경에 요청을 하여 데이터를 획득하는 것이 아닌, 어플리케이션은 외부 환경을 관찰하고 있다가 외부 환경에서 데이터 스트림을 방출하면 그것에 반응하여 데이터를 획득하는 것이다.

  • Observable

    외부환경에서 어플리케이션의 내부로 연속적으로 흐르는 데이터(데이터 스트림)를 생성하고 방출하는 객체를 말한다.

  • Observer

    Observable이 방출(omit)한 Notification(Observable이 방출할 수 있는 푸시 기반의 이벤트 또는 값)을 획득하여 사용하는 객체를 말한다.

    Observer는 데이터 소비자로 데이터를 생산하는 Observable을 구독해서 Observable의 상태를 관찰한다. 그리고 Observable이 방출한 notification은 Observer에게 자동으로 전파된다.

    예시)TV 방송국(Observable) / TV(Observer) / 영상정보 프레임(Observable notification)

  • 구현의 관점에서의 구독(Subscribe) 오퍼레이터

    Observable의 subscribe operator를 호출할 때 인자로 Observer를 전달하는 것을 말한다.

    예시
    posts.service.ts

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import { Injectable } from "@angular/core";
    import { Post } from "./post.model";
    import { Subject } from 'rxjs';

    @Injectable({providedIn: 'root'})
    export class PostsService {
    private posts: Post[] = [];
    private postsUpdated = new Subject<Post[]>();
    // service를 fetching하고 있는 컴포넌트들로부터 직접적으로 posts 데이터를 조작하는 것을 피하기 위해
    // 직접 Posts 정보를 가져오지 않고, 복사해서 가져오는 것이 좋다.

    // event-driven approach @Output emitter를 통해서 처리할 수도 있지만,
    // rxjs를 사용해서 처리할 수 있다. (Angular의 일부는 아니지만 core dependency)
    getPosts() {
    return [...this.posts];
    }

    getPostUpdateListener() {
    return this.postsUpdated.asObservable();
    }

    addPost(title: string, content: string) {
    const post: Post = { title, content };
    this.posts.push(post);
    this.postsUpdated.next([...this.posts]);
    }
    }

Observable 생성자를 new 연산자로 호출해서 Observable을 생성하고 인수로 subscriber 함수를 전달한다.
여기서 전달한 subscriber 함수는 Observable의 역할인 데이터 스트림을 생성하고 방출하는 처리를 정의한 함수이다.

subscriber 함수는 생성자의 인자로 전달되고 실행되지 않고, subscribe operator에 의해 Observable이 구독될 때 호출되는 callback함수이다.

observable은 subscribe되기 전까지 동작하지 않는다.

  • subscriber 함수의 구성

    subscriber 함수는 next, error, complete 메소드를 사용해서 notification을 방출한다.
    방출된 notification은 subscribe operator의 인자로 전달되서 Observable을 구독하고 있는 모든 Observer의 next, error, complete 메소드가 방출된 notification에 반응하여 동작한다.

예시
post-list.component.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import { Component, OnInit, Input, OnDestroy } from '@angular/core';
import { Post } from 'src/app/share/post.model';
import { PostsService } from 'src/app/share/posts.service';
import { Subscription } from 'rxjs';

@Component({
selector: 'app-post-list',
templateUrl: './post-list.component.html',
styleUrls: ['./post-list.component.css']
})
export class PostListComponent implements OnInit, OnDestroy {
// service로 대체
// @Input() posts: Post[];
posts: Post[];
private postsSub: Subscription;

constructor(public postsService: PostsService) {
// this.posts = [
// {title: 'First Post', content: 'This is the first post\'s content'},
// {title: 'Second Post', content: 'This is the second post\'s content'},
// {title: 'Third Post', content: 'This is the third post\'s content'},
// ];
this.posts = [];
}

ngOnInit(){
// initial tasks
this.posts = this.postsService.getPosts();
// subscribe에는 3개의 인수를 넣어줄 수 있다.
// (1 arg) 새로운 데이터가 방출되면, 실행되는 함수
// (2 arg) 에러가 발생하는 경우에 호출되는 함수
// (3 arg) Obserable이 완료되었을때 (더이상 기대되는 값이 존재하지 않을때)
this.postsSub = this.postsService.getPostUpdateListener()
// NEW DATA EMITTED, ERROR, COMPLETE
.subscribe((posts: Post[]) => {
this.posts = posts;
});
}

// 메모리 누수를 방지하기 위해서 unsubscribe
ngOnDestroy() {
this.postsSub.unsubscribe();
}
}
  • Observable과 Subject의 차이

    일반적인 Observable은 “passive”한 특성을 가지고 있다.(e.g.wraps callback, event, …) 반면에 Subject는 “active”한 특징(can be triggered from code)을 가지고 있다. Subject는 next()를 통해 방출되는 데이터 스트림을 Observer가 구독함으로써 새로운 데이터가 방출될때 자동으로 업데이트된 새로운 데이터 스트림을 감지할 수 있다.

  • Angular HTTP Client

    데이터베이스에서 데이터를 fetch하기 위해서 Angular에서 기본으로 제공해주는 HTTP Client module을 사용하면 간단하게 처리할 수 있다.

    1
    2
    import { HttpClientModule } from '@angular/common/http';
    <!-- 최상위 module의 import에 추가 -->
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import { Injectable } from "@angular/core";
    import { Post } from "./post.model";
    import { Subject } from 'rxjs';
    import {HttpClient} from '@angular/common/http';
    @Injectable({providedIn: 'root'})
    export class PostsService {
    private posts: Post[] = [];
    private postsUpdated = new Subject<Post[]>();
    // service를 fetching하고 있는 컴포넌트들로부터 직접적으로 posts 데이터를 조작하는 것을 피하기 위해
    // 직접 Posts 정보를 가져오지 않고, 복사해서 가져오는 것이 좋다.

    // event-driven approach @Output emitter를 통해서 처리할 수도 있지만,
    // rxjs를 사용해서 처리할 수 있다. (Angular의 일부는 아니지만 core dependency)
    constructor(private http: HttpClient){}
    getPosts() {
    // return [...this.posts];
    this.http.get<{message: string, posts: Post[]}>('http://localhost:3000/api/posts')
    // new data, error, complete
    .subscribe((postData) => {
    this.posts = postData.posts
    this.postsUpdated.next([...this.posts]);
    });
    }

    getPostUpdateListener() {
    return this.postsUpdated.asObservable();
    }

    addPost(title: string, content: string) {
    const post: Post = { id: null, title, content };
    this.posts.push(post);
    this.postsUpdated.next([...this.posts]);
    }
    }

[참고] 모던 자바스크립트 Deep Dive 13.16 Angular RxJS