パラレルストリーミング処理
パラレルストリーミング処理とは
パラレルストリーミング処理(Parallel Stream Processing、以降はPSPと記載)は、メモリ消費を抑えながら大容量データを高速に処理する機構です。
PSPは、以下のような特徴を持っています。
-
メモリ消費の抑制
入力データをすべてメモリに保持せず、1000件ずつ「読み取り」→「変換」→「書き込み」の処理を行います。
そのため、大量のメモリを必要とすることなく大容量のデータを処理できます。
-
超大容量データ処理
入力データをメモリに保持せず処理するため、理論上、データ容量の制限はありません。
-
パフォーマンスの向上
「読み取り」→「変換」→「書き込み」を順番に処理する場合、CPU資源を有効に使えていません。
PSPを利用することで、マルチスレッドによって読み取り、変換、書き込みの各処理が分散されるため、1つの処理がI/O待ちになっている状態でも、ほかのスレッドで変換などの処理を並行で行えるようになります。
パラレルストリーミング処理のアーキテクチャ
PSPは以下のアーキテクチャで動作します。読み取り処理、変換処理、書き込み処理がブロックごとに別々のスレッドで同時に処理されます。

結果データを生成する(上図では、読み取り処理および変換処理)コンポーネントは、内部的に結果データを格納するブロック(たとえば1000行単位)を2つ保持するようになっています。
結果データを生成するコンポーネントは、データの書き込みが可能な状態(データが消費されている状態)のブロックを検出すると、データの書き込み処理を行います。データの書き込みが可能なブロックを検出できない状態(データが消費されていない状態)では、処理を待機します。
結果データを使用するコンポーネント(上図では、変換処理および書き込み処理)は、入力元のコンポーネントの結果データに、データの読み取りが可能な状態(データの生成が完了した状態)のブロックを検出するとデータを読み取って処理を開始します。データの読み取りが可能なブロックを検出できない状態(データの生成が未完了な状態)では処理を待機します。
処理の流れ
「読み取り」→「変換」→「書き込み」の簡単なサンプルで、処理の流れを処理ステップごとに説明します。
各ステップのデータa、データb、データcは、読み取り用データの1ブロックを表します。
ステップ1: 処理前
-
データa: まだファイルから読み取られていません。
-
データb: まだファイルから読み取られていません。
-
データc: まだファイルから読み取られていません。

ステップ2: 「データa」の読み取り
-
データa: 読み取り処理でファイルから読み取られ、読み取りコンポーネントのブロックに書き込まれます。
-
データb: まだファイルから読み取られていません。
-
データc: まだファイルから読み取られていません。

ステップ3: 「データb」の読み取りと「データa」の変換
-
データa: 変換処理で変換され、変換コンポーネントのブロックに書き込まれます。
-
データb: 読み取り処理でファイルから読み取られ、読み取りコンポーネントのブロックに書き込まれます。
-
データc: まだファイルから読み取られていません。

すべての処理が並行で行われます。
ステップ4: 「データc」の読み取りと「データb」の変換および「データa」の書き込み
-
データa: 書き込み処理で実際にデータがDBに書き込まれます。また、変換処理が完了しているので、読み取り処理のブロックから削除されます。
-
データb: 変換処理で変換され、変換コンポーネントのブロックに書き込まれます。
-
データc: 読み取りコンポーネントのブロックが空いたので、読み取り処理でファイルから読み取られ、読み取りコンポーネントのブロックに書き込まれます。

すべての処理が並行で行われます。
ステップ5: 「データc」の変換と「データb」の書き込み
-
データa: 書き込み処理まで終了しています。
-
データb: 書き込み処理で実際にデータがDBに書き込まれます。また、変換処理が完了しているので、読み取り処理のブロックから削除されます。
-
データc: 変換処理で変換され、変換コンポーネントのブロックに書き込まれます。

すべての処理が並行で行われます。
ステップ6: 「データc」の書き込み
-
データa: 書き込み処理まで終了しています。
-
データb: 書き込み処理まで終了しています。
-
データc: 書き込み処理で実際にデータがDBに書き込まれます。また、変換処理が完了しているので、読み取り処理のブロックから削除されます。

すべての処理が並行で行われます。
このように、データをブロックごとに分割し、それぞれのブロックを並行して処理することで、超大容量データを高速に処理しています。
パラレルストリーミング処理の使用方法
スマートコンパイラにより、スクリプトの内容を自動判別してパラレルストリーミング処理を適用します。
そのため、基本的にはパラレルストリーミングを意識することなくスクリプトを作成できます。
詳細については、「スマートコンパイラ」を参照してください。
パラレルストリーミング処理とスレッド
PSPでは複数のスレッドが協調して処理を行います。
PSPではスクリプトのスレッドのほかに、結果データを生成する(入力元となる)コンポーネントの数だけスレッドが生成されます。結果データを生成しない書き込みコンポーネントに関しては、スクリプトと同じスレッドで動作します。
たとえば、「読み取り」→「変換」→「書き込み」のスクリプトでは、生成するスレッド数は「3」となります。
「読み取り」→「変換」→「変換」→「書き込み」のスクリプトでは、生成するスレッド数は「4」となります。
仕様制限
-
トランザクションが開始していた場合、PSPで実行するコンポーネントはそのトランザクションに属します。
-
トランザクションが開始していない場合、PSPで実行するコンポーネントは専用のトランザクションを開始します。
そのため、同じコネクションリソースを使用するコンポーネントが複数存在し、いずれかをPSPで実行する場合、別々のコネクションを使用します。
-
PSPでは、結果データを複数のコンポーネントの入力元に指定できません。
-
変数Mapperで、入力ドキュメントから変数にマッピングしている場合も入力元への指定にあたります。
-
-
エラーについて
-
PSPでは、読み取りや変換コンポーネントで発生したエラーは、変換コンポーネントを入力元に取るコンポーネントが存在しない場合には、エラーとして処理されません。
たとえば、PSPが適用されている読み取りコンポーネントでエラーが発生しても、スクリプトは成功し、エラーメッセージはログに出力されません。
-
-
PSPで処理を行う読み取りコンポーネントと書き込みコンポーネントの間で、処理対象となるファイルは操作できません。
-
PSPでは、一部のコンポーネント変数が使用できません。
= 参照 =詳細については、各オペレーションのヘルプを参照してください。
-
PSPでは、結果データを作成するコンポーネントが実行された際にスレッドが作成されます。
-
ブロックサイズは変更できません。
-
引数からのデータフローやMapper間のデータフローの場合には、Mapperでスキーマの設定を行うことができますが、テーブルモデル型以外のスキーマに変更すると PSP対応Mapperとして動作しません。
つまり、大容量のデータを扱う場合にはメモリを確保する必要があります。PSPを適用する場合には、Mapperのスキーマをテーブルモデル型以外のスキーマ(XML型)に変更しないようにしてください。
= 参照 =テーブルモデル型およびXML型については、「データモデル 」を参照してください。
注意事項
PSPデータフロー中に条件分岐コンポーネントを配置する場合
PSPの結果データが条件分岐コンポーネントにより使用されなかった場合、パフォーマンスが低下します。
PSPデータフローを無効化することで、パフォーマンス低下を防ぐことができます。
以下のスクリプトは、CSVファイルを一行ずつ読み取り、その行がある条件を満たす場合にCSVファイルに書き込む処理です。マッピングとCSVファイルへの書き込みの間でPSPデータフローが有効になっており、条件に合致しない場合にパフォーマンスが低下します。
