본문 바로가기
Unity/UniRx & UniTask

UniRx의 작동 원리 (2) - Observable

by Pretty Garbage 2022. 11. 20.

Observer 객체를 묶어서 관리하기 위한 Subject

 

Subject 객체는 IObservable<T>를 구현하고 있는 객체입니다. 

Subject는 Observer를 등록할 수 있으며 이벤트가 발생할 때 마다 각 Observer에게 메시지를 발행할 수 있습니다.

 

Observer패턴이 구현이 된다면 반드시 이 Subject 객체에 통지해야 합니다.

 

하지만 ! 이 Subject 객체를 스스로 구현하는 것은 어렵습니다. (UniRx를 이용하지 않을 경우)

 

그래서 UniRx가 이 Subject 객체의 구현을 제공해 줍니다.

 

 

 

Subject의 UniRx 구현 [ UniRx.Subject<T>]

Subject<T>의 사용

Subject는 주로 '이벤트 메시지를 발행할 때에 사용하는 오브젝트'로서 이용합니다. 이벤트 중심의 FP(Functional Programming)을 만들고 싶다면 이 Subject를 이용하면 좋습니다.

Subject<T>의 특징

Subject<T>는 IObserver<T>와 IObservable<T>의 2개를 동시에 구현하고 있습니다.

즉, Subject<T>는 이벤트 메시지의 입력과 출력을 동시에 실시할 수 있습니다! 

이 말이 무엇이냐 하면 이 Subject 객체가 다른 Subject의 IObserver가 될 수도 있다는 이야기입니다.

 

또한 Subject<T>는 임의의 IObserver를 여러 개 등록하고 유지할 수 있다는 성질을 가지고 있습니다.

 

https://qiita.com/yutorisan/items/844eaeab392abf03ce80 포스트 이미지 참고

일본어가 안되어도 괜찮습니다! 대충 이미지로서 Subject는 IObserver로서 역할도 하지만 IObservable로서 다른 여러개의 IObserver들에게 이벤트 통지가 가능합니다.

 

UniRx에서는 ISubject<T> 인터페이스라는 것을 정의하고, 이 인터페이스를 상속받아 구현함으로서

Subject 오브젝트로서 기능을 합니다.  ISubject<T>의 정의 아래와 같습니다.

 

using System;
using System.Collections.Generic;
using System.Text;

namespace UniRx
{
    public interface ISubject<TSource, TResult> : IObserver<TSource>, IObservable<TResult>
    {
    }

    public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
    {
    }
}
//UniRx GitHub에서 긁어옴!

만약 Subject<T>를 추상적으로 취급할 필요한 상황이 있다면 ISubject<T>를 이용하면 됩니다.

 

예시)

public class PrintLogObserver<T> : IObserver<T>
    {
        //수신할 때 메시지를 로그로 출력하는 Observer
        public void OnCompleted()
        {
            Debug.Log("OnCompleted!");
        }

        public void OnError(Exception error)
        {
            Debug.Log(error);
        }

        public void OnNext(T value)
        {
            Debug.Log(value.ToString());
        }
    }

IObserver를 상속받아 각 해당 이벤트에 내부 구현을 해준다.

 

public class CountDownEventProvider : MonoBehaviour
    {
        //카운트할 시간
        [SerializeField] private int countSeconds = 10;
        //Subject 인스턴스
        private Subject<int> _subject;
        //Subject의 IObservable 인터페이스부분만 공개 한다.
        public IObservable<int> CountDownObservable => _subject;

        private void Awake()
        {
            //Subject 생성
            _subject = new Subject<int>();
            
            //카운트다운할 코루틴을 실행
            StartCoroutine(CountCoroutine());
        }

        //카운트 다운을 실행하고 메시지를 발행
        private IEnumerator CountCoroutine()
        {
            var current = countSeconds;

            while (current > 0)
            {
                _subject.OnNext(current);
                current--;
                yield return new WaitForSeconds(1.0f);
            }
            
            //마지막에 0이 되면 OnCompleted 메시지를 발행한다.
            _subject.OnNext(0);
            _subject.OnCompleted();
        }

        private void OnDestroy()
        {
            //스트림 관찰 해제는 이렇게 해도 되고
            _subject.Dispose();
            //선언부에서 아래와 같이 라이프사이클을 gameObject와 맞춰도 된다.
            //_subject.AddTo(gameObject);
        }
    }

카운트다운 이벤트가 실질적으로 이루어지는 이벤트 제공 클래스를 구현합니다. 주 로직은 Subject<T>에 해당 시간동안 값을 발행하는 것입니다.

 

자! 이제 이벤트 제공자도 IObserver도 구현했으니 연결해봅시다.

 

public class ObserverEventComponent : MonoBehaviour
    {
        [SerializeField] private CountDownEventProvider countDownEventProvider;
        //Observer의 인스턴스
        private PrintLogObserver<int> _printLogObserver;
        private IDisposable _disposable;

        private void Start()
        {
            _printLogObserver = new PrintLogObserver<int>();

            //Subject의 Subscribe를 불러낼 때 observer를 등록한다.
            _disposable = countDownEventProvider
                .CountDownObservable
                .Subscribe(_printLogObserver);
        }

        private void OnDestroy()
        {
            //GameObject 파괴되면 이벤트 중단
            _disposable?.Dispose();
        }
    }

 

이와 같이 Subject<T>를 이용함으로서 임의의 타이밍에 자유롭게 이벤트를 발생할 수 있습니다. (위는 카운트 다운에서 OnNext(current) 부분에서 1초마다 한번씩 발생하고 있다.)

 

Observer를 등록하려면 Subject<T>의 IObservable<T>를 사용하여 등록합니다. 

IObservable<T>.Subscribe()에 IObserver<T>를 구현한 오브젝트를 건네주고 이벤트를 등록하면 구독이 시작됩니다.

 

부가설명으로 [SerializeField]를 통해 CountDownEventProvier의 인스턴스에 접근하여 CountDownObservable 즉 Subject에 접근하여 구독하는 것을 Start()에서 구현해주고 있습니다. 그리고 구독되어져서 들어오는 인터페이스 종류에 따라 OnNext, OnError, OnComplete로 분기하여 실행이 되겠습니다.

 

더 간단한 Subscribe()의 구현

IObservable<>.Subscribe()메서드는 인수에 IObserver<>를 제공해야 했습니다. 따라서 IObserver<T>를 구현한 객체를 준비하지 않으면 메시지를 구독할 수 없습니다. 하지만 매번 IObserver객체를 준비시키고 제공하기에는 수고가 많이 듭니다.

따라서, UniRx에서는 각 메시지의 처리에 대한 함수를 대리자로 등록할 수 있습니다.

예시로 위에서 구현한 ObserveEventComponent를 바꿔보겠습니다.

public class ObserverEventComponent : MonoBehaviour
    {
        [SerializeField] private CountDownEventProvider countDownEventProvider;
        //Observer의 인스턴스
        private PrintLogObserver<int> _printLogObserver;
        private IDisposable _disposable;

        private void Start()
        {
            //Subject의 Subscribe를 불러낼 때 observer를 등록한다.
            _disposable = countDownEventProvider
                .CountDownObservable
                .Subscribe(
                	x => Debug.Log(x), //OnNext
                    ex => Debug.LogError(ex), //OnError
                    () => Debug.Log("Completed") //OnCompleted
                );
        }

        private void OnDestroy()
        {
            //GameObject 파괴되면 이벤트 중단
            _disposable?.Dispose();
        }
    }

위와 같이 구현한다면 PrintLogObserver<int>같은 Subject<T>클래스를 따로 구현해줄 필요는 없다.

 

정리

Observer 패턴

  • 이벤트 통지에 사용되는 디자인 패턴.
  • Observer와 Subject의 2개의 오브젝트가 등장한다.

Observer 객체

  • Observer 패턴에서 이벤트 메시지를 수신하는 오브젝트
  • 이벤트 리스너/이벤트 핸들러에 해당된다.

Subject 객체

  • Observer패턴에서 이벤트 메시지를 발행하는 주체가 되는 객체
  • Subject객체에는 Observer객체를 등록할 수 있다. (중요!)
  • 이벤트 발행시에 등록된 모든 Observer에게 메시지를 전달할 수 있다.

IObserver 인터페이스

  • Observer를 구현하기 위한 인터페이스
  • "이벤트 메시지를 수신할 수 있다." 는 동작이 정의된다.

IObservable 인터페이스

  • Observer를 받아들이기 위한 인터페이스
  • 이벤트 메시지의 구독 상대를 등록하는 행동이 정의된다. 

ISubject 인터페이스

  • Subject인 것을 나타내는 인터페이스
  • 정의는 IObserver<T>와 IObservable<T>의 조합이다.

 

Observer패턴으로부터 Observable의 생각

Observer패턴의 사고방식을 확장하여 UniRx의 독자적인 사고방식으로 이야기를 펼쳐갑시다.

이를 위해서는 Operator와 Scheduler의 개념을 이해해야합니다.

 

■ Operator : IObservable<T>가 출력이고 IObserver<T>가 입력이라면. 두 가지 인터페이스를

구현하고 "입력된 메시지를 출력한다"라는 매커니즘을 생각해보면, 이 2개의 인터페이스 사이에 오퍼레이터라는 녀석들을 끼워 넣을 수 있습니다. 모든 메시지를 수신하는게 아니라 필터를 하거나 합성을하거나 무시할 수 있습니다.

오퍼레이터의 종류에 대해서는 나중에 다른 포스팅으로 소개해드리도록 하겠습니다만 UniRx로 검색하면 무수히 많은 자료들이 소개됩니다.

 

● Operator와 Subject의 동작 차이

Operator와 Subject<T>는 모두 IObserver<T>와 IObservable<T>를 구현합니다. 그 때문에 입 출력이 가능한 오브젝트로서 행동하지만, 결정적으로 다른 부분이 있습니다.

Subject<T> 내부에는 List<<IObserver<T>> 를 갖고 있어서 여러번 Subscribe되면 구독하고 있는 모든 Observer에게 공통으로

메시지를 보내지만 Operator는 그런 List를 갖고 있지 않기 때문에 무조건 IObservable<T>와 IObserver<T>를 연결하는 데에만 사용할 수 있습니다. 그래서 오퍼레이터를 여러개 체이닝해서 사용할 수 있지만 IObservable 즉 입력이 없으면 동작할 수 없습니다.

 

■ Scheduler

스케쥴러는 메시지의 처리 타이밍이나 실행 Thread를 제어하기 위한 도구입니다.

실행에 시간 계측이 필수인 팩토리 메소드나 Operator는 내부에서 이 Scheduler를 이용해 시간의 계측을 실시하고 있습니다.

예를 들자면 Observable. Timer 및 Observable. Interval과 같은 팩토리 메서드와 Delay 및 timeout과 같은 오퍼레이터가 적용됩니다. 또한 스케쥴러는 메시지의 실행 컨텍스트를 제어하는 기능도 가지고 있습니다. 스케쥴러를 사용해 처리의 실행 컨텍스트를 MainThread에서 돌리거나 다른 스레드풀에서 돌리게 하는 처리도 가능합니다. 역시 다른 포스팅에서 자세히 소개합니다.

 

Observable

IObserver<T>와 IObservable<T>를 결합하여 메시지가 흐르도록 처리가 된 체인을 만들 수 있습니다.

이러한 메시지를 처리하는 일련의 흐름을 Observable이라고 합니다.

다른 말로 하면, Observable은 Subject와 Operator를 체인시킴으로서 생성되는 메시지 처리의 덩어리라고 할 수 있습니다.

 

다음 포스팅은 Observable에 관하여 어떻게 동작하는지 작성해보고자 합니다!