酢ろぐ!

カレーが嫌いなスマートフォンアプリプログラマのブログ。

Reactive Extensionsを使って複数個のHttpWebRequestのレスポンスをまとめる

都道府県マスタ、ジャンルマスタ、商品マスタなどのデータが必ず揃っていなければいけない仕様のアプリ場合、最初のアプリ起動時にデータをダウンロードさせるというケースは少なくないと思います。

全てのデータが揃っていないとアプリ成り立たない場合は、全てのダウンロードの完了を待つ必要がありますよね。フラグで管理して待つというのもアリなんでしょうけど、Windows PhoneではせっかくReactive Extensions(Rx)が使えるのでイベントを合成してまとめて処理出来るようにしてしまいましょう。

HttpWebRequestをRxでラッピングして使う方法に関しては、「HttpWebRequest/HttpWebResponseを使ってWebページを取得する」や「HttpWebRequestを使ってPOSTメソッドでリクエストする」をお読みください。

日本語でRxとHttpWebRequestについて書かれている一番良い記事はのいえさん(@neuecc)の「neue cc - Reactive Extensions用のWebRequest拡張メソッド」だと思います。

ここでは、Observable.Zipメソッド、Observable.ForkJoinメソッド、Observable.Mergeメソッドを使って処理をまとめてみましょう。

ForkJoin:複数個のレスポンスが返ってくるのを待つ

Observable.ForkJoinメソッドは、並列同時実行を行います。

例えば、AとBとCのリクエストがあります。

ABCの3つのレスポンスが返ってくるのを待った後、1つの流れとなります。

では、実際のソースコードを見てみましょう。ForkJoinメソッドの次のSelectメソッドはstring[]型で配列を受け取って、string.Joinメソッドでひとつにしています。

// 通信を行ったのち、アクセスしたURLを返す意味無しメソッド
private IObservable<string> GetRequestObservable(string url) {
    var req = HttpWebRequest.CreateHttp(url);
    return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
     .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd())
     .Select(text => url);
}

// Observable.ForkJoinは並列処理の実行を行う
private void button2_Click(object sender, RoutedEventArgs e) {
    var ioA = GetRequestObservable("http://www.kantei.go.jp/");
    var ioB = GetRequestObservable("http://www.gov-online.go.jp/");
    var ioC = GetRequestObservable("http://www.ndl.go.jp/");

    Observable.ForkJoin(ioA, ioB, ioC)
        .Select(urls => string.Join("\n", urls))
        .ObserveOnDispatcher()
        .Subscribe(msg => MessageBox.Show(msg));
}

このコードを実行するとどうなるでしょうか。

メッセージボックスには、"http://www.kantei.go.jp/"と"http://www.gov-online.go.jp/"と"http://www.ndl.go.jp/"が同時に表示されます。

Zip:2個のレスポンスが返ってくるのを待つ

Observable.Zipメソッドは、2つのイベントの結合を行います。

例えば、AとBの2つのリクエストがあります。

これら2つのレスポンスが返ってくるまで待ちます。AとBの処理が完了した時点で、Zipメソッドの第三引数に指定したFunctionが実行され、1つの流れとなります。

では、実際のソースコードを見てみましょう。

// 通信を行ったのち、アクセスしたURLを返す意味無しメソッド
private IObservable<string> GetRequestObservable(string url) {
    var req = HttpWebRequest.CreateHttp(url);
    return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
     .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd())
     .Select(text => url);
}

// Observable.Zipはイベントの結合を行う
private void button1_Click(object sender, RoutedEventArgs e) {
    var ioA = GetRequestObservable("http://www.kantei.go.jp/");
    var ioB = GetRequestObservable("http://www.gov-online.go.jp/");

    Observable.Zip(ioA, ioB, (left, right) => {
            return string.Format("{0}\n{1}", left, right);
        })
        .ObserveOnDispatcher()
        .Subscribe(msg => MessageBox.Show(msg));
}

このコードを実行するとどうなるでしょうか。

メッセージボックスには、"http://www.kantei.go.jp/"と"http://www.gov-online.go.jp/"が同時に表示されます。

Observable.ZipメソッドとObservable.ForkJoinメソッドは、両方とも処理の完了を待ってIOを合成します。違いは何かというと、異なる型の処理の完了を待ちたいのか、同じ型の処理の完了を待ちたいかでしょうか。

先ほど紹介したZipメソッドは、AとBが両方ともIObservableだった為、例としてあまりふさわしくないのですが、Aの戻り値がIObservable、Bの戻り値がIObservableだったとしても第三引数に指定するselectorで合成することができます。

Merge:レスポンスが返ってきたら1つの流れにまとめる

Observable.Mergeメソッドは、ForkjoinやZipと違い、イベントの完了を待ちません。流れを1つの流れにまとめるだけです。

例えば、AとBの2つのリクエストがあります。

AとBのうち、レスポンスが返ってきた順でひとつの流れに合流します。例えるならば川が合流してひとつの大きな川になるイメージです。

private IObservable

このコードを実行するとどうなるでしょうか。

AかBの先に通信処理が終わった方のURLが、まずメッセージボックスで表示されて、もうひとつのURLがメッセージボックスで表示されます。