【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行

source.goの実装

source.goに、以下のオブジェクト(source)を実装します。

  • EdgeStreamingで提供している、gopkg.in/sensorbee/sensorbee.v0/coreパッケージのSourceインターフェースを実装したオブジェクト

 

実装手順は以下のとおりです。

 

  1. Tupleの生成

    sourceオブジェクトは、以下の手順でTupleを生成します。

    1. 整数型のパラメータ”interval”を受け取る

    2. ”interval”の時間間隔で疑似乱数を生成

    3. 以下のようなJSONデータのスキーマのデータを持つTupleを生成

      {
        "type": "object",
        "required": ["payload"],
        "properties": {
          "payload": {
            "type": "object",
            "required": ["value"],
            "properties": {
              "value": {
                "type": "number"
              }
            }
          }
        }
      }
      

      出力されるTupleのJSONデータの例を以下に示します。

      {"payload": {"value": 3.5423242}}

       

     

  2. メソッドの定義

    Sourceインターフェースでは、以下の各メソッドに処理を定義します。

     

    メソッド名

    処理

    GenerateStreamメソッド

    Streamingデータの生成処理

    Stopメソッド

    終了処理

     

    SampleのSource OperationではGenerateStreamメソッド に、本機能である疑似乱数を指定した時間間隔で生成する部分を記述します。

     

     

  3. パッケージのインポート

    SampleのSource Operationのパッケージ名を”sample”とし、import文で以下のパッケージをインポートします。

    • math/rand

    • time

    • gopkg.in/sensorbee/sensorbee.v0/bql

    • gopkg.in/sensorbee/sensorbee.v0/core

    • gopkg.in/sensorbee/sensorbee.v0/data

     

  4. source structの作成

    sourceオブジェクトのデータ構造を定義したsource struct を作成します。

     

    source structは、以下のように、time.Duration型の変数intervalと、終了通知用の空のチャネル型の変数termをメンバに持ちます。

    type source struct {
       interval time.Duration
       term     chan struct{}
    }
    

    source structは以下のメンバを持ちます。

     

    変数

    time.Duration型

    Streamingデータの生成処理

    空のチャネル型

    終了処理

     

     

     

  5. 次に、Sourceインターフェースを実装するように、以下のメソッドを、sourceオブジェクトに追加します。

    • GenerateStream(ctx *Context, w Writer) error

    • Stop(ctx *Context) error

     

  6. 疑似乱数の生成処理の実装は後回しにし、以下のような”nil”を返す空のメソッドを追加します。

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
       return nil
    }
    
    func (s *source) Stop(ctx *core.Context) error {
       return nil
    }
    

     

  7. 次に、sourceオブジェクトの生成関数、CreateSource関数を実装します。

    func CreateSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
       interval, err := getInterval(params)
       if err != nil {
          return nil, err
       }
     
       return &source{
          interval: interval,
          term:     make(chan struct{}),
       }, nil
    }
     
    func getInterval(params data.Map) (time.Duration, error) {
       interval := 1 * time.Second 
       if v, ok := params["interval"]; ok {
          i, err := data.ToDuration(v)
          if err != nil {
             return interval, err
          }
          interval = i
       }
       return interval, nil
    }
    

    CreateSource関数は、EdgeStreaming RuntimeがBQLのCreate Source文を解釈した際に、コールされる関数で、引数 “params”(data.Map型(中身はGo言語のマップ型))で、BQLに指定したパラメータを受け取ります。

     

    getInterval関数は、マップ型から、要素”interval(生成間隔)”を取得しています。

    CreateSource関数では、getInterval関数をコールして、”interval”を取得し、その”interval”と終了通知用の空のチャネルを持つsource structを生成しています。

     

  8. 次に、実処理を行うGenerateStreamメソッド、Stopメソッドを実装します。これらのメソッドの一般的に、以下のようなコードで表現されます(赤太字の箇所を実装します)。

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
        <initialization> 
            defer func() {
            <clean up> 
        }()
    
         for <check stop> {
            t := <create a new tuple> 
            if err := w.Write(ctx, t); err != nil {
                return err        }
        }
        return nil
    }
     
    func (s *source) Stop(ctx *core.Context) error {
        <turn on a stop flag> 
        <wait until GenerateStream stops> 
        return nil
    }
    

     

    上記コードが示すように、GenerateStreamメソッドはStreamの開始時に一度コールされ、メソッドが戻り値を返すまで、Tupleを生成し続けます。

     

  9.  

  10. それでは、実際に、一定時間間隔で、疑似乱数を生成するように、GenerateStreamメソッドを以下のように実装します。

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
       rand.Seed(time.Now().UnixNano()) // 初期化処理で、疑似乱数のシードを設定
       
       next := time.Now()
       for {
          val := rand.Float64()
     
          // Tupleの生成処理 
          m := data.Map{"value": data.Float(val)}
          t := core.NewTuple(data.Map{"payload": m})
     
          if s.interval > 0 {
             t.Timestamp = next
          }
          ctx.Log().Debug("generation:", val) // ログの出力処理 
          if err := w.Write(ctx, t); err != nil { // Tupleの出力 
             return err
          }
     
          // 指定した時間間隔の経過と終了通知を待つ 
          if s.interval > 0 {
             now := time.Now()
             next = next.Add(s.interval)
             if next.Before(now) {
                next = now.Add(s.interval)
             }
             select {
             case <-s.term:
                return core.ErrSourceStopped
             case <-time.After(next.Sub(now)):
             }
          }
       }
       return nil
    }
    

     

  11. 次に、Stopメソッドを以下のように実装します。

    func (s *source) Stop(ctx *core.Context) error {
       s.term <- struct{}{}
       return nil
    }
    

 

これでsourceの実装が完了です。

 

 

 

【公式】HULFT IoT EdgeStreaming Plugin SDK 開発ガイド_2021年7月1日_第3版発行