본문 바로가기
Unity/UniRx & UniTask

UniRx의 작동 원리 (3) Observable을 다루는 방법

by Pretty Garbage 2022. 11. 27.

★Observable을 생성하는 방법

Observable을 만드는 방법은 여러가지가 있습니다.

  1. 스트림 소스가 되는 오브젝트 (ex Subject<T> 등)을 사용한다.
  2. 팩토리 메소드를 사용한다.
  3. UniRx가 제공하는 Observable을 그대로 이용한다.
  4. 데이터 구조에서 변환하기

각각 만드는 방법에 대해서는 아래에 기술합니다.

 

 

Observable에서 사용 가능한 메시지

■ OnNext 메시지

Observable에서 메시지를 전달하는 경우 기본으로 OnNext 메시지를 사용합니다.

IObservable<T>의 T(type)이 의미가 없을 경우 UniRx.Unit 유형을 사용 합니다.

 

//남은 시간
        [SerializeField] private float countTimeSeconds = 30f;
        
        //Async subject = OnComplete가 발행될 때까지 기다렸다가 마지막 OnNext만 통지해준다.
        private readonly AsyncSubject<Unit> _onTimeUpAsyncSubject = new();
        private IDisposable _disposable;
        
        //제한 시간이 끝나면 이벤트를 통지하는 Observable
        public IObservable<Unit> OnTimeUpAsyncSubject => _onTimeUpAsyncSubject;
        
        private void Start()
        {
            //제한 시간이 끝나면 통지한다.
            _disposable = Observable
                .Timer(TimeSpan.FromSeconds(countTimeSeconds))
                .Subscribe(_ =>
                {
                    //타이머가 끝나면 Unit형의 메시지를 발행한다.
                    _onTimeUpAsyncSubject.OnNext(Unit.Default);
                    _onTimeUpAsyncSubject.OnCompleted();
                });
        }

        private void OnDestroy()
        {
            //Observable이 아직 Dispose가 안되었다면, 
            _disposable?.Dispose();
            
            //AsyncSubject도 Dispose해준다.
            _onTimeUpAsyncSubject.Dispose();
        }

 

●위에서 .OnSubscribe()에서 _=> 와 같은 람다식이 보이는데 

Observable에서 관찰하고 OnNext에서 보내는 값이 필요 없다는 의미입니다. 

이해하기 쉽게 함수를 1개 만들었는데 매개변수가 없다는 의미와 비슷하겠네요.

 

■OnError 

이 메시지는 Observable에서 처리하는 동안 예외가 발생했음을 알립니다.

이 메시지가 발행되면 Observable은 작동을 중지합니다.

 

■OnComplete

이 메시지는 Observable에서 더 이상의 메시지를 발행하지 않습니다.

즉 이 Observable에서 임무를 완수하고 종료됨을 의미합니다. Observable의 종류에 따라서

OnComplete를 발행하지 않고 OnNext를 무한히 발행하기도 합니다.

 

Observable을  파기하는 방법

스트림을 만들어 메시지를 발행하기 시작한 Observable은 더 이상 사용하지 않을 때에는

스트림을 파기하고, 버려야합니다. Observable의 파기를 하지 않는다면, 다른 내용을 간섭하거나

리소스 낭비를 일으키게 됩니다.

 

Observable을 파기하는 방법은 다음과 같습니다.

  1. OnComplete / OnError 를 발행합니다.
  2. Subscribe()에서 addTo를 이용해 Dispose를 등록해줍니다.
  3. 스트림 소스의 Dispose를 실행합니다.

 

■OnComplete / OnError를 발행하는 방법

 

private void OnComplete()
        {
            //데이터 소스 작성
            var subject = new Subject<int>();
            
            //Observable의 구독
            subject.Subscribe(
                x => Debug.Log("OnNext:" + x),
                ex => Debug.Log("OnError:" + ex),
                () => Debug.Log("OnComplete")
            );
            
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);
            
            //메시지 발행이 끝났으면 완료 메시지를 보내주고
            subject.OnCompleted();
            
            //해지 해준다.
            subject.Dispose();
        }

        private void OnError()
        {
            //데이터 소스 작성
            var subject = new Subject<string>();
            
            //Observable의 구독
            subject
                .Select(s => int.Parse(s))
                .Subscribe(
                x => Debug.Log("OnNext:" + x),
                ex => Debug.Log("OnError:" + ex),
                () => Debug.Log("OnComplete")
            );
            
            subject.OnNext("1");
            subject.OnNext("2");
            subject.OnNext("three"); //여기서 int로 파싱이 안되기 때문에 에러가 발생한다.
            subject.OnNext("4");
            
            //하지만 Subject는 아직 살아 있기 때문에 다시 구독하면 재활용이 가능
            subject
                .Subscribe(
                    Debug.Log,
                    Debug.Log,
                    () => Debug.Log("OnComplete")
                );


            //해지 해준다.
            subject.Dispose();
        }

■Subscribe()의 Dispose()를 발행하기

 

IObservable<T>의 Subscribe()의 반환값은 IDisposable 을 반환하게 되어있습니다.

IDisposable 인터페이스는 오브젝트의 파기 (리소스 해제)에 사용하는 인터페이스로

System 네임스페이스 정의되어있으며, UniRx와 관계없이 C# 개발에 사용되는 인터페이스입니다.

위에서 Subscribe()의 반환값은 IDisposable이라고 했습니다.

그 반환값에 Dispose()를 해주면 해지 됩니다.

 

var subject = new Subject<int>();

            IDisposable disposableA = subject
                .Subscribe(x => Debug.Log(x.ToString()));
            var disposableB = subject
                .Subscribe(x => Debug.Log(x.ToString()));
            
            disposableA.Dispose();
            disposableB.Dispose();

■ 스트림 소스의 Dispose()를 실행하기

 

스트림 소스라는 것은 Subject<T> 나 ReactiveProperty<T> 같은 Observable을 상속받는 객체를 의미합니다.

스트림 소스의 Dispose()를 호출할 경우, 그 후 생성된 Observable을 모두 파기 가능합니다.

Observable의 누수를 방지하기 위해서라도 Dispose를 해야합니다.

 

var subject2 = new Subject<string>();
subject2.Dispose();
var reactiveProperty = new ReactiveProperty<int>();
reactiveProperty.Dispose();

 

 

위와 같이 해지 시키는 방법에 대해서 알아 보았는데 각각의 차이가 있습니다.

 

◆OnComplete(OnError) 메시지를 발행하는 방법

  • 메시지를 받는 Observable쪽이 해제된다.
  • OnComplete나 OnError메시지를 발행한 Subject는 재사용 할 수 없습니다.

◆Subscribe().Dispose()

  • 여러 구독을 하는 경우 일정 구독 처리만 중단할 수 있다.
  • 원래 Subject가 유효하면 재구독가능
  • IObserver<T>.OnComplete()는 실행되지 않는다. (OnComplete 처리를 해준게 없으니까)

◆스트림소스의 Dispose()

  • 그 소스로부터 생성되어진 Observable이 모두 해제된다.
  • 역시 IObservable<T>.OnComplete()가 실행되지 않는다.
  • await 처리를 한 경우에도 중단된다.

 

※간단하게 AddTo(gameObject)를 사용해도 됩니다!

생명주기 즉 Dispose의 시점을 게임오브젝트의 라이프사이클에 맞춰서 해지됩니다.

GameObject의 경우에는 OnDestroy() 타이밍

CompositeDisposable의 Dispose() 타이밍이라고 하는데 이건 잘모르겠습니다. 어떤 경우인지