読者です 読者をやめる 読者になる 読者になる

がりらぼ

WindowsRuntimeの応援ブログ

C#の主要インターフェース解説:IObservable<T>、IObserver<T>

IObservable<T>、IObserver<T>

この2つのインターフェースはクラスのオブジェクトどうしの監視と通知を実現するインターフェースです。

LINQのような文法で非同期処理を書くことができるReactive ExtensionsはIObservable<T>とIObserver<T>によって実現され、RXを理解する上でも非常に重要なインターフェースであるのですがその分理解が難しく、監視する人(IObserver)監視される人(IObservable)、さらには監視者の監視を解除する人(IDiposable)の3つの概念を理解するのは非常に難しいです。

そこで本ブログでは、労働者と監視者の関係を用いて、簡単なサンプルコードと一緒に解説していきたいと思います。

登場人物紹介

まず、この解説物語には3人の登場人物が登場し、それぞれ以下のインターフェースを実装します。

  • 労働者「MyObservableくん」 :IObservable<T>
  • 監視人「MyObserverくん」 :IObserver<T>
  • 監視削除人「MyObserverDisposerくん」 :IDisposable

f:id:garicchi:20140911235912p:plain

この3人はそれぞれ以下の3つのクラスで実装されます。

労働者「MyObservableくん」

//労働者(監視される人)
public class MyObservable : IObservable<int>
{
    public MyObserver myObserver;    //自分を監視する人

    //監視される仕事を実行
    public void Execute()
    {

        Console.WriteLine("労働者「作業1をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(1);

        Console.WriteLine("労働者「作業2をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(2);

        Console.WriteLine("労働者「作業3をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(3);

        Console.WriteLine("労働者「すべての作業おわりました」");
        //監視人に完了通知
        if (myObserver != null)
            myObserver.OnCompleted();

    }

    //監視人を割り当てて監視開始
    public IDisposable Subscribe(IObserver<int> observer)
    {
        myObserver = (MyObserver)observer;
        return new MyObserverDisposer(this);
    }
}

監視人「MyObserverくん」

//監視人(監視する人)
public class MyObserver : IObserver<int>
{
    //監視対象が処理を完了したとき
    public void OnCompleted()
    {
        Console.WriteLine(" 監視人「すべての作業終了を確認した」");
    }

    //監視対象がエラーを出した時
    public void OnError(Exception error)
    {
        Console.WriteLine(" 監視人「エラー発生を確認した」 {0}", error);
    }

    //監視対象から通知が来た時
    public void OnNext(int value)
    {
        Console.WriteLine(" 監視人「作業{0}を確認した」", value);
    }
}

監視削除人「MyObserverDisposer」くん

//監視人の監視を解除する人
public class MyObserverDisposer : IDisposable
{
    public MyObservable observable;
    public MyObserverDisposer(MyObservable observable)
    {
        this.observable = observable;
    }
    public void Dispose()
    {
        //監視人をはずす
        observable.myObserver = null;
        Console.WriteLine("  削除人「監視人を削除したぞ」");
    }
}

そして作業全体はMainメソッドに書いています。

class Program
{
    static void Main(string[] args)
    {
        MyObserver observer = new MyObserver();
        MyObservable observable = new MyObservable();

        //監視人を割り当てる
        IDisposable disporser = observable.Subscribe(observer);

        Console.WriteLine("------------作業開始-----------");
        //監視人がいるのでObserverが反応する
        observable.Execute();

        //監視からはずす
        disporser.Dispose();

        Console.WriteLine("------------作業開始-----------");
        //監視対象が実行しても監視していないからObserverは反応しない
        observable.Execute();

    }
}

これらのソースコードと平行しながら解説していきます。

さらに、3人にはそれぞれ役割が存在します。

労働者「MyObservableくん」は3つの作業をする必要があります。

監視人「MyObserverくん」はMyObservableくんを監視します。

監視削除人「MyObserverDisposerくん」はMyObserverくんの監視からMyObservableくんの監視を解除することができます。

f:id:garicchi:20140912000931p:plain

ではこれらの3人がどのように連携して作業をすすめるのかを解説していきます。

監視開始

まず、作業を開始するまえに、MyObservableくんがちゃんと作業を報告できるように監視をつけましょう。

f:id:garicchi:20140912001620p:plain

これは、Mainメソッドで実行されるSubscribeメソッドを利用します。

f:id:garicchi:20140912001946p:plain

では実際にMyObservableクラスのSubscribeメソッドを見てみましょう。

ここではMyObservableのメンバーに自分の監視人であるMyObserverを代入しています。

f:id:garicchi:20140912002207p:plain

このように、監視をつけるには

監視人(MyObserver)を監視対象(MyObservable)のSubscribeメソッドで渡して、MyObservableのメンバーに置く必要があります。

さらに次の行を見ると、監視削除人MyObserverDisposerを作成しています。

MyObserverDisposerはどうなっているかというと、自身のメンバにMyObservableを保持しています。

f:id:garicchi:20140912002638p:plain

つまり、監視開始するには、監視削除人も一緒に作成されます。

f:id:garicchi:20140912002757p:plain

このように、監視を開始するためにしなければいけないことは、

  • IObservableのSubscribeメソッドでIObserverを渡しメンバとして保持する
  • IObservableのSubscribeメソッドで監視削除人を作成し、returnする

の2つをしなければいけません。

作成された削除管理人「MyOnserverDisposer」はMainメソッドに返されます。

f:id:garicchi:20140912001946p:plain

以上の操作をすることで監視を開始することができます。

f:id:garicchi:20140912002757p:plain

作業を開始する

では労働者に監視をつけることができたので労働者に作業をさせてみましょう。

労働者「MyObservableくん」の作業は3つありますが、各作業が終わるたびに、監視人にOnNext()という形で作業が終わったことを知らせます。

f:id:garicchi:20140912003620p:plain

このように作業の進捗状況を監視することができるのがIObserverインターフェースの特徴です。

ではどのようなコードでこれが実現されているかを見ます。

Mainメソッドを見ると、MyObservableがExecuteメソッドを実行して作業を開始しています。

f:id:garicchi:20140912004014p:plain

1つの作業完了を報告する

ではMyObservableのExecuteメソッドを見てみます。

作業が終わるごとにMyObserverのOnNext()メソッドを呼んでいます。

このように、IObservableからIObserverのOnNextメソッドを呼ぶことによって、監視人に作業状況を報告しています。

これで監視人も監視ができるわけです。

f:id:garicchi:20140912004119p:plain

ではMyObserverのOnNextメソッドを見てみましょう。

f:id:garicchi:20140912004345p:plain

正しく監視対象から通知がくれば、このメソッドが実行されるはずです。

ここまでのコンソール出力を見てみます。

f:id:garicchi:20140912004526p:plain

ちゃんと、作業したあとに監視人にOnNextで通知を送っていることが確認できました。

ではExecuteメソッドでは3つの作業をおこなうのでこのまま作業を進めます。

作業2も同様にOnNextメソッドで通知を行います。

f:id:garicchi:20140912004730p:plain

f:id:garicchi:20140912004837p:plain

すべての作業完了を通知する

では作業3を終わらせて全ての作業を終わらせました。

まずはOnNextメソッドで作業3の終了報告が行われます。

f:id:garicchi:20140912005001p:plain

そのあと、すべての作業が終了したらOnCompletedメソッドを実行して、全ての作業が終了したことを報告する義務がIObservableにはあります。

f:id:garicchi:20140912005052p:plain

MyObserverクラスのOnCompletedメソッドを見てみましょう。

全ての作業終了通知を受け取っています。

f:id:garicchi:20140912005203p:plain

コンソール出力を見てみると、OnCompletedメソッドが呼ばれていることがわかります。

f:id:garicchi:20140912005328p:plain

ここまでの流れがIObserverとIObservableによる通知、監視の流れです。

実際にはIObserverにはOnCompleted、OnError、OnNextの3つのメソッド

f:id:garicchi:20140912005912p:plain

IObservableにはSubscribeメソッドの実装が必須とされています。

f:id:garicchi:20140912005918p:plain

実際に実装してみると一番簡単にしてもこんな感じになるのがIObserverとIObservableです。

IObservableが、IObserverのOnNextの呼び方によって進捗報告も変わるし、進捗報告されたときのIObserverの挙動も買えることができます。

つまりIObserverとIObservableインターフェースとは、処理の進捗報告、監視の実装を保証するインターフェースであるとも言えます。

監視を外してみる

監視をすることができるということは監視を外すこともできるということです。

Subscribe時に発生したMyOvserverDisposerくんはDisposeメソッドによってMyObserverくんからMyObservableくんへの監視を解除することができます。

f:id:garicchi:20140912010348p:plain

Mainメソッドを見てみると、MyObservableが第1回目の作業を終了したあと、Disposeメソッドを呼んでいます。

f:id:garicchi:20140912010609p:plain

ではMyOvserverDisposerのDisposeメソッドを見てみましょう。

f:id:garicchi:20140912010806p:plain

MyObserverDisposerはSubscribe時にコンストラクタ引数でMyObservableをメンバーとして保持しています。 つまりMyObservable(監視される人)のMyObserver(監視者)をnullにしています。

MyObservableのExecuteメソッドを見てみると、MyObserver(監視者)がnullの場合通知を発生させていません。

f:id:garicchi:20140912011038p:plain

つまり、この場合MyOvserverDisposerがDisposeメソッドを実行すると、MyObservableくんがいくら作業しても監視者に通知はされないということです。

これは監視を解除したと同義になります。

ではMainメソッドではもう一度MyObservableくんが作業をしていますがMyObserverくんには通知されないことをコンソール出力で確認しましょう。

f:id:garicchi:20140912011305p:plain

第1回目の作業では監視人が通知を確認していますが監視人の監視を削除した第2回目の作業では監視人が監視をしていないことが確認できました。

f:id:garicchi:20140912011618p:plain

このようにIObserverとIObservableには監視人の監視を登録、削除もサポートしており、通知の監視に柔軟性を持たせています。

作業中に例外が発生した場合

もしかしたら労働者「MyObservableくん」が作業中にたいへんなことになるかもしれません。

そのような例外も、MyObservableくんはきちんと監視指定なければいけません。

f:id:garicchi:20140912011824p:plain

今回は実装していませんがそのような場合、IObserverのOnErrorメソッドに例外情報をのせてIObserverに通知することで作業中にエラーが起きたことを報告することができます。

まとめ

IObserverとIObservableインターフェースとは

IObserver(監視者)とIObservable(監視される人)の間で作業の進捗監視や報告を行うためのインターフェースです。

今回は単一の作業者に単一の監視者で実装しましたが、複数のIObservableが同一のIObserverを持つことによって複数の作業者を単一の監視者が監視、監視解除を柔軟におこなうことができます。

f:id:garicchi:20140912012513p:plain

これを応用して、非同期処理の進捗報告を監視してLINQのようなメソッドチェーンで非同期処理を実行できるReactiveExtensionsは非常に強力なテクノロジーですが、根本はこのIObserverとIObservableです。

ソースコード

using IObservableSample;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IObservableSample
{
    //監視人(監視する人)
    public class MyObserver : IObserver<int>
    {
        //監視対象が処理を完了したとき
        public void OnCompleted()
        {
            Console.WriteLine(" 監視人「すべての作業終了を確認した」");
        }

        //監視対象がエラーを出した時
        public void OnError(Exception error)
        {
            Console.WriteLine(" 監視人「エラー発生を確認した」 {0}", error);
        }

        //監視対象から通知が来た時
        public void OnNext(int value)
        {
            Console.WriteLine(" 監視人「作業{0}を確認した」", value);
        }
    }

    //労働者(監視される人)
    public class MyObservable : IObservable<int>
    {
        public MyObserver myObserver;    //自分を監視する人

        //監視される仕事を実行
        public void Execute()
        {

            Console.WriteLine("労働者「作業1をしました」");
            //監視人に通知
            if (myObserver != null)
                myObserver.OnNext(1);

            Console.WriteLine("労働者「作業2をしました」");
            //監視人に通知
            if (myObserver != null)
                myObserver.OnNext(2);

            Console.WriteLine("労働者「作業3をしました」");
            //監視人に通知
            if (myObserver != null)
                myObserver.OnNext(3);

            Console.WriteLine("労働者「すべての作業おわりました」");
            //監視人に完了通知
            if (myObserver != null)
                myObserver.OnCompleted();

        }

        //監視人を割り当てて監視開始
        public IDisposable Subscribe(IObserver<int> observer)
        {
            myObserver = (MyObserver)observer;
            return new MyObserverDisposer(this);
        }
    }

    //監視人の監視を解除する人
    public class MyObserverDisposer : IDisposable
    {
        public MyObservable observable;
        public MyObserverDisposer(MyObservable observable)
        {
            this.observable = observable;
        }
        public void Dispose()
        {
            //監視人をはずす
            observable.myObserver = null;
            Console.WriteLine("  削除人「監視人を削除したぞ」");
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            MyObserver observer = new MyObserver();
            MyObservable observable = new MyObservable();

            //監視人を割り当てる
            IDisposable disporser = observable.Subscribe(observer);

            Console.WriteLine("------------作業開始-----------");
            //監視人がいるのでObserverが反応する
            observable.Execute();

            //監視からはずす
            disporser.Dispose();

            Console.WriteLine("------------作業開始-----------");
            //監視対象が実行しても監視していないからObserverは反応しない
            observable.Execute();

        }
    }
}