都道府県マスタ、ジャンルマスタ、商品マスタなどのデータが必ず揃っていなければいけない仕様のアプリ場合、最初のアプリ起動時にデータをダウンロードさせるというケースは少なくないと思います。
全てのデータが揃っていないとアプリ成り立たない場合は、全てのダウンロードの完了を待つ必要がありますよね。フラグで管理して待つというのもアリなんでしょうけど、Windows PhoneではせっかくReactive Extensions(Rx)が使えるのでイベントを合成してまとめて処理出来るようにしてしまいましょう。
HttpWebRequestをRxでラッピングして使う方法に関しては、HttpWebRequest/HttpWebResponseを使ってWebページを取得するやHttpWebRequestを使ってPOSTメソッドでリクエストするをお読みください。
日本語でRxとHttpWebRequestについて書かれている一番良いものは @neuecc のReactive Extensions用のWebRequest拡張メソッドです。
ここでは、Observable.Zipメソッド、Observable.ForkJoinメソッド、Observable.Mergeメソッドを使って処理をまとめてみましょう。
Zip:2個のレスポンスが返ってくるのを待つ
Observable.Zipメソッドは、並列処理の結合を行います。
例えば、AとBの2つのリクエストがあります。

これら2つのレスポンスが返ってくるまで待ちます。AとBの処理が完了した時点で、Zipメソッドの第三引数に指定したFunctionが実行され、1つの流れとなります。
では、実際のソースコードを見てみましょう。
// 通信を行ったのち、アクセスしたURLを返す意味無しメソッド private IObservable<string> GetRequestObservable(string url) { var req = HttpWebRequest.CreateHttp(url); return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)() .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd()) .Select(text => url); } // Observable.Zipはイベントの結合を行う private void button1_Click(object sender, RoutedEventArgs e) { var ioA = GetRequestObservable("http://www.kantei.go.jp/"); var ioB = GetRequestObservable("http://www.gov-online.go.jp/"); Observable.Zip(ioA, ioB, (left, right) => { return string.Format("{0}\n{1}", left, right); }) .ObserveOnDispatcher() .Subscribe(msg => MessageBox.Show(msg)); }
このコードを実行するとどうなるでしょうか。
メッセージボックスには、”http://www.kantei.go.jp/”と”http://www.gov-online.go.jp/”が同時に表示されます。
Forkjoin:複数個のレスポンスが返ってくるのを待つ
Observable.Forkjoinメソッドは、並列同時実行を行います。
例えば、AとBとCのリクエストがあります。

ABCの3つのレスポンスが返ってくるのを待った後、1つの流れとなります。
では、実際のソースコードを見てみましょう。ForkJoinメソッドの次のSelectメソッドはstring[]型で配列を受け取って、string.Joinメソッドでひとつにしています。
// 通信を行ったのち、アクセスしたURLを返す意味無しメソッド private IObservable<string> GetRequestObservable(string url) { var req = HttpWebRequest.CreateHttp(url); return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)() .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd()) .Select(text => url); } // Observable.ForkJoinは並列処理の実行を行う private void button2_Click(object sender, RoutedEventArgs e) { var ioA = GetRequestObservable("http://www.kantei.go.jp/"); var ioB = GetRequestObservable("http://www.gov-online.go.jp/"); var ioC = GetRequestObservable("http://www.ndl.go.jp/"); Observable.ForkJoin(ioA, ioB, ioC) .Select(urls => string.Join("\n", urls)) .ObserveOnDispatcher() .Subscribe(msg => MessageBox.Show(msg)); }
このコードを実行するとどうなるでしょうか。
メッセージボックスには、”http://www.kantei.go.jp/”と”http://www.gov-online.go.jp/”と”http://www.ndl.go.jp/”が同時に表示されます。
Observable.ZipメソッドとObservable.ForkJoinメソッドは、両方とも処理の完了を待ってIO
先ほど紹介したZipメソッドは、AとBが両方ともIObservable
Merge:レスポンスが返ってきたら1つの流れにまとめる
Observable.Mergeメソッドは、ForkjoinやZipと違い、イベントの完了を待ちません。流れを1つの流れにまとめるだけです。
例えば、AとBの2つのリクエストがあります。

AとBのうち、レスポンスが返ってきた順でひとつの流れに合流します。例えるならば川が合流してひとつの大きな川になるイメージです。
// 通信を行ったのち、アクセスしたURLを返す意味無しメソッド private IObservable<string> GetRequestObservable(string url) { var req = HttpWebRequest.CreateHttp(url); return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)() .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd()) .Select(text => url); } // Observable.Mergeは処理の流れを1本化する private void button3_Click(object sender, RoutedEventArgs e) { var ioA = GetRequestObservable("http://www.kantei.go.jp/"); var ioB = GetRequestObservable("http://www.gov-online.go.jp/"); Observable.Merge(ioA, ioB) .ObserveOnDispatcher() .Subscribe(msg => MessageBox.Show(msg)); }
このコードを実行するとどうなるでしょうか。
AかBの先に通信処理が終わった方のURLが、まずメッセージボックスで表示されて、もうひとつのURLがメッセージボックスで表示されます。







