udsf.goの実装

 

udsf.goに、以下のオブジェクト(udsf)を実装します。

  • EdgeStreamingで提供している、gopkg.in/sensorbee/sensorbee.v0/coreパッケージのUDSFインターフェースを実装したオブジェクト

 

実装手順は以下のとおりです。

 

  1. 四則演算の流れ

    udsfオブジェクトは、以下のデータを受け取ります。

    • 文字列型のパラメータ:stream_name(入力Stream名)

    • operator(演算子)

    • 浮動小数点型パラメータ: initial_value(初期値)

     

    udsfオブジェクトは、受け取ったTupleのvalue要素の値を、現在の値(スタート時はinitial_value(初期値))に指定した演算子で、演算し続けます。

     

  2. スキーマ―の定義

    udsfオブジェクトのTupleのJSONスキーマについて、以下の設定になるように設計します。

     

    スキーマ―

    説明

    TupleのJSONスキーマ(入力用)

    udsfオブジェクトが入力用として受け取るスキーマです。

    source.goが出力するスキーマ―を設定します。

    TupleのJSONスキーマ(出力用)

    udsfオブジェクトが出力するスキーマ―です。

    sink.goが受け取るスキーマーを設定します。

     

  3. メソッドの定義

    UDSFインターフェースでは、以下の各メソッドに処理を定義します。

     

    メソッド名

    処理

    Processメソッド

    Streamingデータの変換処理

    Terminateメソッド

    終了処理

     

    SampleのUDSF OperationではProcessメソッド に、四則演算処理を実装します。

     

  4. パッケージのインポート

    SampleのUDSF Operationのパッケージ名を”sample”とし、import文で以下のパッケージをインポートします。

    • fmt

    • gopkg.in/sensorbee/sensorbee.v0/bql/udf

    • gopkg.in/sensorbee/sensorbee.v0/core

    • gopkg.in/sensorbee/sensorbee.v0/data

     

  5. udsf structの作成

    次に、udsfオブジェクトのデータ構造を定義したudsf structを作成します。

    udsf structは、現在の値を保持するfloat64型の変数”cur”と、四則演算子を保持するoperator型の変数”ope”をメンバに持ちます。

    type operator byte
     
    const (
       none    = ' ' 
       plus    = '+' 
       minus   = '-' 
       times   = '*' 
       divided = '/' 
    ) 
     
    type udsf struct {
       cur float64
       ope operator
    }
    

     

  6. 次に、UDSFインターフェースを実装するように、以下のメソッドを、sink structに追加します。

    • Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error

    • Terminate(ctx *core.Context) error

     

    変換処理の実装は後回しにし、以下のような”nil”を返す空のメソッドを追加します。

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
       return nil
    }
     
    func (u *udsf) Terminate(ctx *core.Context) error {
       return nil
    }
    

     

  7. 次に、udsfオブジェクトの生成関数、CreateUDSF関数を実装します。

     

    CreateUDSF関数は、EdgeStreaming RuntimeがBQLのSelect文を解釈した際に、コールされる関数で、引数 “params”(data.Map型(中身はGo言語のマップ型))で、BQLに指定したパラメータを受け取ります。

     

    マップ型の引数”params”から、必要な値を取得する処理は、source,sinkと同じなので説明を省略しますが、UDSFの生成関数に限り、以下の処理を必ず実装する必要があります。

    • “stream_name”要素を取得する(getStreamName関数)

    • 上記で取得した”stream_name”を引数にして、decl.Inputをコールする

    func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) {
       inputStream, err := getStreamName(params)
       if err != nil {
          return nil, err
       }
     
       operator, err := getOperator(params)
       if err != nil {
          return nil, err
       }
     
       initialValue, err := getInitialValue(params)
       if err != nil {
          return nil, err
       }
     
       if err := decl.Input(inputStream, nil); err != nil {
          return nil, err
       }
     
       return &udsf{
          ope: operator,
          cur: initialValue,
       }, nil
    }
     
    func getStreamName(params data.Map) (string, error) {
       node, ok := params["stream_name"]
       if !ok {
          return "", fmt.Errorf("stream_name is required")
       }
       streamName, err := data.AsString(node)
       if err != nil {
          return "", fmt.Errorf("stream_name must be a string:%s", err)
       }
       return streamName, nil
    }
     
    func getOperator(params data.Map) (operator, error) {
       node, ok := params["operator"]
       if !ok {
          return none, fmt.Errorf("operator is required")
       }
       operatorStr, err := data.AsString(node)
       if err != nil {
          return none, fmt.Errorf("operator must be a string:%s", err)
       }
     
       switch operatorStr {
       case "plus":
          return plus, nil
       case "minus":
          return minus, nil
       case "times":
          return times, nil
       case "divided":
          return divided, nil
       default:
          return none, fmt.Errorf("invalid oparator")
       }
    }
     
    func getInitialValue(params data.Map) (float64, error) {
       initialValue := 0.0
       node, ok := params["initial_value"]
       if !ok {
          return initialValue, nil
       }
       initialValue, err := data.AsFloat(node)
       if err != nil {
          return initialValue, fmt.Errorf("initial_value is invalid")
       }
       return initialValue, nil
    }
    

     

  8. 次に、実処理を行うProcessメソッド、Terminateメソッドを実装します。これらのメソッドの一般的に、以下のようなコードで表現されます(赤太字の箇所を実装します)。

    ProcessメソッドはUDSFオブジェクトがtupleを受け取る毎に、コールされます。

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
        <receive tuple> 
        <convert data> 
        <write tuple> 
        return nil
    }
     
    func (u *udsf) Terminate(ctx *core.Context) error {
        <close writer> 
        return nil
    }
    

     

  9. それでは、実際に、指定した四則演算子で、データの変換するように、Processメソッドを以下のように実装します。

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
     
       // payload要素の取得
       p, ok := tuple.Data["payload"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: payload")
       }
       payload, err := data.AsMap(p)
       if err != nil {
          return err
       }
     
       // value要素の取得 
       v, ok := payload["value"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: value")
       }
       value, err := data.AsFloat(v)
       if err != nil {
          return err
       }
     
       // 四則演算と演算式の生成 
       var formula string
       newVal := u.cur
       switch u.ope {
       case plus:
          newVal += value
       case minus:
          newVal -= value
       case times:
          newVal *= value
       case divided:
          newVal /= value
       }
       formula = fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value)
       ctx.Log().Debug("calculate: " + formula)
     
       // Tupleのデータの生成 
       m := data.Map{
          "value":   data.Float(value),
          "formula": data.String(formula),
       }
     
       if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil { // 変換後のデータの出力 
          return err
       }
       return nil
    }
     
    

     

  10. 次にTerminateメソッドを実装します。 今回はTerminateメソッドでは何もする必要はありません。

    func (u *udsf) Terminate(ctx *core.Context) error {
       return nil
    }
    

     

これでUDSFの実装が完了です。