読者です 読者をやめる 読者になる 読者になる

がりらぼ

WindowsRuntimeの応援ブログ

【WindowsRuntime+ReactiveExtensions入門】その2 いろいろなファクトリメソッド

WindowsRuntime C#

Observable

Reactive ExtensionsにはObservableというクラスがあり、このクラスが様々なIObseravbleを生成するファクトリメソッドを持っています。

Observable.Range()

Observable.Rangeメソッドは第1引数に指定した整数から、第2引数に指定した値までの連続的な通知を発行します。 つまりObservable.Range(1,50)だと50個の通知を連続的に発行するIObservableをつくることができます。

var disposer=Observable.Range(1, 50).Subscribe(q =>
{
    //監視者がOnNext通知を受けとったとき
    listBox.Items.Add("OnNext "+q);
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

//監視解除
disposer.Dispose();

f:id:garicchi:20140912060622p:plain

Observable.Return()

Observable.Returnは非常にシンプルで、引数に指定した値を「1度だけ通知」します。

var disposer=Observable.Return(3).Subscribe(q =>
{
    //監視者がOnNext通知を受けとったとき
    listBox.Items.Add("OnNext "+q);
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

//監視解除
disposer.Dispose();

f:id:garicchi:20140912060829p:plain

Observable.Create()

Observable.CreateはIObservableがOnNextで通知をする方法を自由に決定することができます。

最後にはreturnでOnCompletedのあとに実行したいActionを返します。

var disposer = Observable.Create<int>(observer =>
{
    observer.OnNext(3);
    observer.OnNext(5);
    observer.OnNext(2);
    observer.OnNext(1);
    observer.OnNext(9);
    observer.OnCompleted();

    return () => { listBox.Items.Add("通知終了"); };
}).Subscribe(q =>
{
    //監視者がOnNext通知を受けとったとき
    listBox.Items.Add("OnNext " + q);
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

//監視解除
disposer.Dispose();

f:id:garicchi:20140912061434p:plain

Observable.Timer()

Observable.Timerメソッドは「何秒後から何秒間隔で通知を発行」することができます。

//1秒後から2秒間隔で通知を発行する
var disposer = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(2))
    .Subscribe(async q =>
{
    //監視者がOnNext通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnNext " + q);
    });
    
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

これはDisposerをDisposeで監視解除することによってタイマーは止まります。

f:id:garicchi:20140912062333p:plain

Observable.Interval()

Observable.IntervalもTimerと同様ですが、Subscribe後すぐに引数の時間間隔で通知を発行します。

//1秒後から2秒間隔で通知を発行する
var disposer = Observable.Interval(TimeSpan.FromSeconds(1))    
    .Subscribe(async q =>
{
    //監視者がOnNext通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnNext " + q);
    });
    
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

Observable.FromEvent()

Observable.FromEventを使うことで特定のイベントに反応するIObsrvableを作成することが可能です。

//マウスクリックで通知を発行するObservable
var observable = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
    h => (s, args) => h(args),
    h => button2.Click += h, 
    h => button2.Click -= h);  

var disposer= observable.Subscribe(q =>
{
    Button button = q.OriginalSource as Button;
    //監視者がOnNext通知を受けとったとき
    listBox.Items.Add("OnNext " + button.Content);
    
}, ex =>
{
    //監視者がOnError通知を受けとったとき
    listBox.Items.Add("OnError");
}, () =>
{
    //監視者がOnCompleted通知を受け取ったとき
    listBox.Items.Add("OnCompleted");
});

f:id:garicchi:20140912140023p:plain

Observable.FromEventは登録したEventHandlerがイベントを発行したとき、OnNext通知をIObserverに送ることができます。

f:id:garicchi:20140912140608p:plain

Observable.Start

Subscribe直後すぐに非同期処理を開始し、処理が終わるとOnNextとOnCompleted通知を送ることができる IObservableです。

//マウスクリックで通知を発行するObservable
var observable = Observable.Start(() =>
{
    for (int i = 0; i < 10000; i++)
    {
        Debug.WriteLine(i);
    }
});  

var disposer= observable.Subscribe(async q =>
{
    //監視者がOnNext通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnNext " + q);
    });
    
    
}, async ex =>
{
    //監視者がOnError通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnError");
    });
}, async() =>
{
    //監視者がOnCompleted通知を受け取ったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnCompleted");
    });
});

バックグラウンドで回す処理としてforループ内で1000回Debug出力しています。

その後、OnNext通知が来ますがこれはUIスレッドではない、バックグラウンドスレッドからくるのでDispatcherスレッドでの処理にもどします。

Observable.ToAsync()

Observable.Startと同じように非同期処理を通知するIObservableを作成することができますが非同期処理はInvokeメソッドを実行するまで開始されません。

//マウスクリックで通知を発行するObservable
var observableFunc = Observable.ToAsync(() =>
{
    for (int i = 0; i < 10000; i++)
    {
        Debug.WriteLine(i);
    }
});

var observable = observableFunc.Invoke();   //Invokeしないと非同期処理は開始されない
var disposer= observable.Subscribe(async q =>
{
    //監視者がOnNext通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnNext " + q);
    });
    
    
}, async ex =>
{
    //監視者がOnError通知を受けとったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnError");
    });
}, async() =>
{
    //監視者がOnCompleted通知を受け取ったとき
    await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
    {
        listBox.Items.Add("OnCompleted");
    });
});

参考資料

ReactiveExtensions入門 - 大田一希

http://www.slideshare.net/okazuki0130/reactive-extensionsv01