【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行
udsf.goの実装
udsf.goに、以下のオブジェクト(udsf)を実装します。
-
EdgeStreamingで提供している、gopkg.in/sensorbee/sensorbee.v0/coreパッケージのUDSFインターフェースを実装したオブジェクト
実装手順は以下のとおりです。
-
四則演算の流れ
udsfオブジェクトは、以下のデータを受け取ります。
-
文字列型のパラメータ:stream_name(入力Stream名)
-
operator(演算子)
-
浮動小数点型パラメータ: initial_value(初期値)
udsfオブジェクトは、受け取ったTupleのvalue要素の値を、現在の値(スタート時はinitial_value(初期値))に指定した演算子で、演算し続けます。
-
-
スキーマ―の定義
udsfオブジェクトのTupleのJSONスキーマについて、以下の設定になるように設計します。
スキーマ―
説明
TupleのJSONスキーマ(入力用)
udsfオブジェクトが入力用として受け取るスキーマです。
source.goが出力するスキーマ―を設定します。
TupleのJSONスキーマ(出力用)
udsfオブジェクトが出力するスキーマ―です。
sink.goが受け取るスキーマーを設定します。
-
メソッドの定義
UDSFインターフェースでは、以下の各メソッドに処理を定義します。
メソッド名
処理
Processメソッド
Streamingデータの変換処理
Terminateメソッド
終了処理
SampleのUDSF OperationではProcessメソッド に、四則演算処理を実装します。
-
パッケージのインポート
SampleのUDSF Operationのパッケージ名を”sample”とし、import文で以下のパッケージをインポートします。
-
fmt
-
gopkg.in/sensorbee/sensorbee.v0/bql/udf
-
gopkg.in/sensorbee/sensorbee.v0/core
-
gopkg.in/sensorbee/sensorbee.v0/data
-
-
udsf structの作成
次に、udsfオブジェクトのデータ構造を定義したudsf structを作成します。
udsf structは、現在の値を保持するfloat64型の変数”cur”と、四則演算子を保持するoperator型の変数”ope”をメンバに持ちます。
type operator byte const ( none = ' ' plus = '+' minus = '-' times = '*' divided = '/' ) type udsf struct { cur float64 ope operator }
-
次に、UDSFインターフェースを実装するように、以下のメソッドを、sink structに追加します。
-
Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error
-
Terminate(ctx *core.Context) error
変換処理の実装は後回しにし、以下のような”nil”を返す空のメソッドを追加します。
func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error { return nil } func (u *udsf) Terminate(ctx *core.Context) error { return nil }
-
-
次に、udsfオブジェクトの生成関数、CreateUDSF関数を実装します。
CreateUDSF関数は、EdgeStreaming RuntimeがBQLのSelect文を解釈した際に、コールされる関数で、引数 “params”(data.Map型(中身はGo言語のマップ型))で、BQLに指定したパラメータを受け取ります。
マップ型の引数”params”から、必要な値を取得する処理は、source,sinkと同じなので説明を省略しますが、UDSFの生成関数に限り、以下の処理を必ず実装する必要があります。
-
“stream_name”要素を取得する(getStreamName関数)
-
上記で取得した”stream_name”を引数にして、decl.Inputをコールする
func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) { inputStream, err := getStreamName(params) if err != nil { return nil, err } operator, err := getOperator(params) if err != nil { return nil, err } initialValue, err := getInitialValue(params) if err != nil { return nil, err } if err := decl.Input(inputStream, nil); err != nil { return nil, err } return &udsf{ ope: operator, cur: initialValue, }, nil } func getStreamName(params data.Map) (string, error) { node, ok := params["stream_name"] if !ok { return "", fmt.Errorf("stream_name is required") } streamName, err := data.AsString(node) if err != nil { return "", fmt.Errorf("stream_name must be a string:%s", err) } return streamName, nil } func getOperator(params data.Map) (operator, error) { node, ok := params["operator"] if !ok { return none, fmt.Errorf("operator is required") } operatorStr, err := data.AsString(node) if err != nil { return none, fmt.Errorf("operator must be a string:%s", err) } switch operatorStr { case "plus": return plus, nil case "minus": return minus, nil case "times": return times, nil case "divided": return divided, nil default: return none, fmt.Errorf("invalid oparator") } } func getInitialValue(params data.Map) (float64, error) { initialValue := 0.0 node, ok := params["initial_value"] if !ok { return initialValue, nil } initialValue, err := data.AsFloat(node) if err != nil { return initialValue, fmt.Errorf("initial_value is invalid") } return initialValue, nil }
-
-
次に、実処理を行うProcessメソッド、Terminateメソッドを実装します。これらのメソッドの一般的に、以下のようなコードで表現されます(赤太字の箇所を実装します)。
ProcessメソッドはUDSFオブジェクトがtupleを受け取る毎に、コールされます。
func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error { <receive tuple> <convert data> <write tuple> return nil } func (u *udsf) Terminate(ctx *core.Context) error { <close writer> return nil }
-
それでは、実際に、指定した四則演算子で、データの変換するように、Processメソッドを以下のように実装します。
func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error { // payload要素の取得 p, ok := tuple.Data["payload"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: payload") } payload, err := data.AsMap(p) if err != nil { return err } // value要素の取得 v, ok := payload["value"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: value") } value, err := data.AsFloat(v) if err != nil { return err } // 四則演算と演算式の生成 var formula string newVal := u.cur switch u.ope { case plus: newVal += value case minus: newVal -= value case times: newVal *= value case divided: newVal /= value } formula = fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value) ctx.Log().Debug("calculate: " + formula) // Tupleのデータの生成 m := data.Map{ "value": data.Float(value), "formula": data.String(formula), } if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil { // 変換後のデータの出力 return err } return nil }
-
次にTerminateメソッドを実装します。 今回はTerminateメソッドでは何もする必要はありません。
func (u *udsf) Terminate(ctx *core.Context) error { return nil }
これでUDSFの実装が完了です。
【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行