【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行
sink.goの実装
sink.goに、以下のオブジェクト(sink)を実装します。
-
EdgeStreamingで提供しているgopkg.in/sensorbee/sensorbee.v0/coreパッケージのSinkインターフェースを実装したオブジェクト
実装手順は以下のとおりです。
-
Tupleの設計
sinkオブジェクトは、以下の手順でTupleを設定します。
-
整数型のパラメータ”decimal”を受け取る
-
”decimal”の有効小数点桁数で、受け取った値を標準出力に出力
-
以下ようなJSONデータのスキーマのデータを持つTupleを受け取るように設計を実施
{ "type": "object", "required": ["payload"], "properties": { "payload": { "type": "object", "required": ["value", "formula"], "properties": { "value": { "type": "number" } "formula": { "type": "string" } } } } }
= 備考 =sinkオブジェクトは、後述する演算機能を実装したUDSFからのStreamingデータを受け取って出力することを想定しているため、出力する値(value要素)に加えて、演算式(formula要素)を受け取るように設計されています。
-
-
メソッドの定義
Sinkインターフェースは、以下の各メソッドに処理を定義します。
メソッド名
処理
Writeメソッド
Streamingデータの出力処理
Closeメソッド
終了処理
SampleのSink OperationではWriteメソッド に、有効小数点桁数指定出力処理を実装します。
-
パッケージのインポート
SampleのSink Operationのパッケージ名を”sample”とし、import文で以下のパッケージをインポートします。
-
fmt
-
math
-
gopkg.in/sensorbee/sensorbee.v0/bql
-
gopkg.in/sensorbee/sensorbee.v0/core
-
gopkg.in/sensorbee/sensorbee.v0/data
-
-
sink structの作成
次に、sinkオブジェクトのデータ構造を定義したsink structを作成します。
sink structは、有効小数点桁数を表すint型の変数decimalをメンバに持ちます。
type sink struct { decimal int }
= 備考 =struct名に制限はありません。本マニュアルでは便宜上、「sink」としています。
-
次に、Sinkインターフェースを実装するように、以下のメソッドを、sink structに追加します。
-
Write(ctx *core.Context, tuple *core.Tuple) error
-
Close(ctx *core.Context) error
出力処理の実装は後回しにし、以下のような”nil”を返す空のメソッドを追加します。
func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error { return nil } func (s *sink) Close(ctx *core.Context) error { return nil }
-
-
次に、sink オブジェクトの生成関数、CreateSink関数を実装します。
func CreateSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) { decimal, err := getDecimal(params) if err != nil { return nil, err } return &sink{ decimal: decimal, }, nil } func getDecimal(params data.Map) (int, error) { node, ok := params["decimal"] if !ok { return 0, fmt.Errorf("decimal is required") } decimal, err := data.AsInt(node) if err != nil { return 0, fmt.Errorf("decimal must be a int:%s", err) } return int(decimal), nil }
CreateSink関数は、EdgeStreaming RuntimeがBQLのCreate Sink文を解釈した際に、コールされる関数で、引数 “params”(data.Map型(中身はGo言語のマップ型))で、BQLに指定したパラメータを受け取ります。
getDecimal関数は、マップ型から、要素”decimal(有効小数点桁数)”を取得しています。CreateSink関数では、getDecimal関数をコールして、”decimal”を取得し、その”decimal”を持つsink structを生成しています。
-
それでは、実際に、指定した有効小数点桁数で、標準出力に出力するように、Writeメソッドを以下のように実装します。
func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error { // payload要素の取得 p, ok := tuple.Data["payload"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: payload") } payload, err := data.AsMap(p) if err != nil { return err } // value要素の取得 v, ok := payload["value"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: value") } value, err := data.AsFloat(v) if err != nil { return err } // formula要素の取得 f, ok := payload["formula"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: formula") } formula, err := data.AsString(f) if err != nil { return err } // 有効小数点桁数丸め処理 shift := math.Pow(10, float64(s.decimal)) value = math.Floor(value*shift) / shift // ログに出力 ctx.Log().Infof("formula: %s", formula) ctx.Log().Infof("value: %f", value) return nil }
-
次にCloseメソッドを実装しますが、今回はCloseメソッドでは何もする必要はありません。
func (s *sink) Close(ctx *core.Context) error { return nil }
次に、実処理を行うWriteメソッド、Closeメソッドを実装します。
これらのメソッドの一般的に、以下のようなコードで表現されます(赤太字の箇所を実装します)。
func (s *source) Write(ctx *core.Context, tuple *core.Tuple) error { <receive tuple> <write tuple> return nil } func (s *source) Close(ctx *core.Context) error { <close writer> return nil }
Writeメソッドはsinkオブジェクトがtupleを受け取る毎に、コールされます。
これでsinkの実装が完了です。
【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行