【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行

sink.goの実装

sink.goに、以下のオブジェクト(sink)を実装します。

  • EdgeStreamingで提供しているgopkg.in/sensorbee/sensorbee.v0/coreパッケージのSinkインターフェースを実装したオブジェクト

 

実装手順は以下のとおりです。

 

  1. Tupleの設計

    sinkオブジェクトは、以下の手順でTupleを設定します。

    1. 整数型のパラメータ”decimal”を受け取る

    2. ”decimal”の有効小数点桁数で、受け取った値を標準出力に出力

    3. 以下ようなJSONデータのスキーマのデータを持つTupleを受け取るように設計を実施

      {
        "type": "object",
        "required": ["payload"],
        "properties": {
          "payload": {
            "type": "object",
            "required": ["value", "formula"],
            "properties": {
              "value": {
                "type": "number"
              }
              "formula": {
                "type": "string"
              }
            }
          }
        }
      }
      

      = 備考 =

      sinkオブジェクトは、後述する演算機能を実装したUDSFからのStreamingデータを受け取って出力することを想定しているため、出力する値(value要素)に加えて、演算式(formula要素)を受け取るように設計されています。

     

  2. メソッドの定義

    Sinkインターフェースは、以下の各メソッドに処理を定義します。

     

    メソッド名

    処理

    Writeメソッド

    Streamingデータの出力処理

    Closeメソッド

    終了処理

    SampleのSink OperationではWriteメソッド に、有効小数点桁数指定出力処理を実装します。

     

  3. パッケージのインポート

    SampleのSink Operationのパッケージ名を”sample”とし、import文で以下のパッケージをインポートします。

    • fmt

    • math

    • gopkg.in/sensorbee/sensorbee.v0/bql

    • gopkg.in/sensorbee/sensorbee.v0/core

    • gopkg.in/sensorbee/sensorbee.v0/data

     

  4. sink structの作成

    次に、sinkオブジェクトのデータ構造を定義したsink structを作成します。

    sink structは、有効小数点桁数を表すint型の変数decimalをメンバに持ちます。

    type sink struct {
       decimal int
    }
    

    = 備考 =

    struct名に制限はありません。本マニュアルでは便宜上、「sink」としています。

     

  5. 次に、Sinkインターフェースを実装するように、以下のメソッドを、sink structに追加します。

    • Write(ctx *core.Context, tuple *core.Tuple) error

    • Close(ctx *core.Context) error

     

     

    出力処理の実装は後回しにし、以下のような”nil”を返す空のメソッドを追加します。

    func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error {
          return nil
    }
     
    func (s *sink) Close(ctx *core.Context) error {
       return nil
    }
    

     

  6. 次に、sink オブジェクトの生成関数、CreateSink関数を実装します。

    func CreateSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
       decimal, err := getDecimal(params)
       if err != nil {
          return nil, err
       }
     
       return &sink{
          decimal: decimal,
       }, nil
    }
     
    func getDecimal(params data.Map) (int, error) {
       node, ok := params["decimal"]
       if !ok {
          return 0, fmt.Errorf("decimal is required")
       }
       decimal, err := data.AsInt(node)
       if err != nil {
          return 0, fmt.Errorf("decimal must be a int:%s", err)
       }
       return int(decimal), nil
    }
    

    CreateSink関数は、EdgeStreaming RuntimeがBQLのCreate Sink文を解釈した際に、コールされる関数で、引数 “params”(data.Map型(中身はGo言語のマップ型))で、BQLに指定したパラメータを受け取ります。

     

    getDecimal関数は、マップ型から、要素”decimal(有効小数点桁数)”を取得しています。CreateSink関数では、getDecimal関数をコールして、”decimal”を取得し、その”decimal”を持つsink structを生成しています。

     

  7. 次に、実処理を行うWriteメソッド、Closeメソッドを実装します。

    これらのメソッドの一般的に、以下のようなコードで表現されます(赤太字の箇所を実装します)。

    func (s *source) Write(ctx *core.Context, tuple *core.Tuple) error {
        <receive tuple> 
        <write tuple> 
        return nil
    }
     
    func (s *source) Close(ctx *core.Context) error {
        <close writer> 
        return nil
    }
    

    Writeメソッドはsinkオブジェクトがtupleを受け取る毎に、コールされます。

     

  8. それでは、実際に、指定した有効小数点桁数で、標準出力に出力するように、Writeメソッドを以下のように実装します。

    func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error {
     
       // payload要素の取得
       p, ok := tuple.Data["payload"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: payload")
       }
       payload, err := data.AsMap(p)
       if err != nil {
          return err
       }
     
       // value要素の取得
       v, ok := payload["value"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: value")
       }
       value, err := data.AsFloat(v)
       if err != nil {
          return err
       }
     
       // formula要素の取得
       f, ok := payload["formula"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: formula")
       }
       formula, err := data.AsString(f)
       if err != nil {
          return err
       }
     
       // 有効小数点桁数丸め処理
       shift := math.Pow(10, float64(s.decimal))
       value = math.Floor(value*shift) / shift
     
       // ログに出力
       ctx.Log().Infof("formula: %s", formula)
       ctx.Log().Infof("value: %f", value)
     
       return nil
    }
    

     

  9. 次にCloseメソッドを実装しますが、今回はCloseメソッドでは何もする必要はありません。

    func (s *sink) Close(ctx *core.Context) error {
       return nil
    }
    

 

これでsinkの実装が完了です。

 

 

 

【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行