読者です 読者をやめる 読者になる 読者になる

がりらぼ

WindowsRuntimeの応援ブログ

【WindowsRuntime+ReactiveExtensions入門】その1 RXとは

ReactiveExtensionsとは

ReactiveExtensionとは、IOvserverとIObservableインターフェースを実装するクラスを使った通信、監視パターンをより簡単に実装を行うことができる拡張メソッドやファクトリメソッドなどのことです。

f:id:garicchi:20140912053351p:plain

IObserverとIObservableインターフェース

IObserverとIObservableの解説の時にも解説しましたが、IObserver、IObservableインターフェースを実装した通信、監視パターンは、「監視されるもの(IObservable)」、「監視するもの(IObserver)」、「監視を解除するもの(IDisposable)」の3つの概念の理解が重要になります。

f:id:garicchi:20140911235912p:plain

監視者(IObserver)が監視されるもの(IObservable)の監視を開始するにはIObservableのSubscribeメソッドを実行します。

f:id:garicchi:20140912001620p:plain

監視されるもの(IObservable)が監視者(IObserver)に作業の進捗を通知するにはIObserverのOnNextメソッドを実行します。

f:id:garicchi:20140912003620p:plain

監視されるもの(IObservable)が監視者(IObserver)に作業のエラーを通知するにはIObserverのOnErrorメソッドを実行します。

f:id:garicchi:20140912011824p:plain

監視されるもの(IObservable)が監視者(IObserver)に作業完了を通知するにはIObserverのOnCompletedメソッドを実行します。

f:id:garicchi:20140912005052p:plain

監視者(IObserver)の監視されるもの(IObservable)に対する監視を解除するには監視解除するものがDisposeメソッドを実行します。

f:id:garicchi:20140912010348p:plain

RXなしで実装してみる

これをRXなしでIObserverとIObservableを1から実装するとこんな感じになります。

//監視人(監視する人)
public class MyObserver : IObserver<int>
{
    //監視対象が処理を完了したとき
    public void OnCompleted()
    {
        Console.WriteLine(" 監視人「すべての作業終了を確認した」");
    }

    //監視対象がエラーを出した時
    public void OnError(Exception error)
    {
        Console.WriteLine(" 監視人「エラー発生を確認した」 {0}", error);
    }

    //監視対象から通知が来た時
    public void OnNext(int value)
    {
        Console.WriteLine(" 監視人「作業{0}を確認した」", value);
    }
}

//労働者(監視される人)
public class MyObservable : IObservable<int>
{
    public MyObserver myObserver;    //自分を監視する人

    //監視される仕事を実行
    public void Execute()
    {

        Console.WriteLine("労働者「作業1をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(1);

        Console.WriteLine("労働者「作業2をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(2);

        Console.WriteLine("労働者「作業3をしました」");
        //監視人に通知
        if (myObserver != null)
            myObserver.OnNext(3);

        Console.WriteLine("労働者「すべての作業おわりました」");
        //監視人に完了通知
        if (myObserver != null)
            myObserver.OnCompleted();

    }

    //監視人を割り当てて監視開始
    public IDisposable Subscribe(IObserver<int> observer)
    {
        myObserver = (MyObserver)observer;
        return new MyObserverDisposer(this);
    }
}

//監視人の監視を解除する人
public class MyObserverDisposer : IDisposable
{
    public MyObservable observable;
    public MyObserverDisposer(MyObservable observable)
    {
        this.observable = observable;
    }
    public void Dispose()
    {
        //監視人をはずす
        observable.myObserver = null;
        Console.WriteLine("  削除人「監視人を削除したぞ」");
    }
}
class Program
{
    static void Main(string[] args)
    {
        MyObserver observer = new MyObserver();
        MyObservable observable = new MyObservable();

        //監視人を割り当てる
        IDisposable disporser = observable.Subscribe(observer);

        Console.WriteLine("------------作業開始-----------");
        //監視人がいるのでObserverが反応する
        observable.Execute();

        //監視からはずす
        disporser.Dispose();

        Console.WriteLine("------------作業開始-----------");
        //監視対象が実行しても監視していないからObserverは反応しない
        observable.Execute();

    }
}

Reactive Extensionsの登場

上記のプログラムでは、IObserver、IObservableインターフェースをつかった通信、監視パターンを実装するだけなのに非常にめんどくさいコードを書かなければいけません。

そこで、Reactive Extensionsというライブラリを用いることによって、非常に簡単にIObserver、IObservableパターンを実装することが可能になります。

Reactive ExtensionsはNugetからライブラリをインストールする必要があります。 Nugetで「Reactive Extensions」と検索しましょう。

f:id:garicchi:20140912053933p:plain

Reactive ExtensionsをインストールできたらIObservable、IObserver通信監視パターンを以下のコードで実現することができます。

//Observable(監視されるもの)
IObservable<int> observable=Observable.Range(1, 10);

//監視を削除するDisposer
IDisposable disposer=observable.Subscribe(q =>
{
    //監視者がOnNext通知を受けとったとき
    listBox.Items.Add("OnNext "+q);
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

//監視解除
disposer.Dispose();

WindowsRuntimeとReactiveExtensionsの記事なのでListBoxを使ってみました。 まずObservableクラスのファクトリメソッドによって、1行でIObservableな監視されるオブジェクトを生成可能になっています。

また、Subscribeメソッドの引数に個々の通知を受け取った時のActionを指定することによってIObserverを実装するクラスを作らなくてもよくなりました。

さらに、Subscribeメソッドの戻り値では、IDisposableを実装するCompositeDisposableクラスが提供されるのでDisposerも新規に作らなくてもよくなりました。

上記のコードを解説すると、

①Observable.Rangeメソッドによって1~10の値を連続的に通知するIObservableが生成される

②Subscribeメソッドに指定するActionで1~10の連続値が通知されたときにListBoxにitemを入れる。

③Subscribeで戻ってきたDisposerのDisposeメソッドを実行することで監視を解除する

という3つの処理をしています。

では実際に実行してみると、Subscribeメソッドを通ったとき、ListBoxにアイテムが追加されまくります。

f:id:garicchi:20140912054942p:plain

もっとわかりやすく書くと、 Observable.Rangeは1~10の数字を渡す通知を連続的に発行するものです。

それをSubscribeで監視することによって、通知が発行されたときにListBoxに追加します。

f:id:garicchi:20140912055646p:plain

このように、RXとは通知が主体となって「自分が監視しているものが通知したときに何をするか」を指定することによってプログラミングを行います。