Official | HULFT IoT EdgeStreaming Plugin SDK Getting Started Third Edition: July 1, 2021

Implementation of udsf.go

 

Implement the following object (udsf) in udsf.go:

  • Object implementing the UDSF interface of the gopkg.in/sensorbee/sensorbee.v0/bql/udf package provided on EdgeStreaming

 

The implementation procedure is as follows:

 

  1. Flow of arithmetic operations

    The udsf object receives the following data:

    • String type parameter: stream_name (name of input stream)

    • String type parameter: operator (arithmetic operator: "plus", "minus", "times", or "divided")

    • Floating point type parameter: initial_value (initial value; 0.0 if omitted)

     

    For each received tuple, the udsf object applies the specified operator to the current value and the "value" element of the tuple. The current value starts at initial_value, and each result becomes the new current value.
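    As a rough illustration of this flow, the following standalone Go sketch applies one operator to a running current value. It uses only the standard library. The names apply and runAll are hypothetical and are not part of the SDK, and the sketch assumes that each result becomes the current value for the next tuple.

```go
package main

import "fmt"

// apply combines the current value with an incoming value using the named
// operator. The operator names mirror the ones accepted by this sample.
func apply(cur float64, op string, value float64) (float64, error) {
	switch op {
	case "plus":
		return cur + value, nil
	case "minus":
		return cur - value, nil
	case "times":
		return cur * value, nil
	case "divided":
		return cur / value, nil
	default:
		return cur, fmt.Errorf("invalid operator: %q", op)
	}
}

// runAll (hypothetical helper) feeds a sequence of values through apply,
// carrying the result forward as the next current value.
func runAll(initial float64, op string, values []float64) ([]float64, error) {
	cur := initial
	results := make([]float64, 0, len(values))
	for _, v := range values {
		next, err := apply(cur, op, v)
		if err != nil {
			return nil, err
		}
		cur = next
		results = append(results, cur)
	}
	return results, nil
}

func main() {
	results, err := runAll(10, "plus", []float64{1, 2, 3})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(results) // prints [11 13 16]
}
```

    With initial_value 10 and the "plus" operator, the values 1, 2, and 3 produce 11, 13, and 16 in turn.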

     

  2. Define a schema

    Design a JSON schema for the tuple of the udsf object with the following settings:

     

    Schema                                   Description
    JSON schema of the tuple (for input)     The udsf object receives this schema for input. Set the schema output by source.go.
    JSON schema of the tuple (for output)    The udsf object outputs this schema. Set the schema received by sink.go.
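    To make the two tuple shapes concrete, the following standalone Go sketch models them with the standard encoding/json package. The field names "payload", "value", and "formula" are the ones that the Process method of this sample reads and writes. The struct and function names are hypothetical, and the authoritative schemas remain the ones defined for source.go and sink.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inputTuple models the tuple that the udsf object receives (assumed shape).
type inputTuple struct {
	Payload struct {
		Value float64 `json:"value"`
	} `json:"payload"`
}

// outputTuple models the tuple that the udsf object writes (assumed shape).
type outputTuple struct {
	Payload struct {
		Value   float64 `json:"value"`
		Formula string  `json:"formula"`
	} `json:"payload"`
}

// decodeInput parses a JSON document into the assumed input shape.
func decodeInput(raw string) (inputTuple, error) {
	var t inputTuple
	err := json.Unmarshal([]byte(raw), &t)
	return t, err
}

// encodeOutput renders a result and its formula in the assumed output shape.
func encodeOutput(value float64, formula string) (string, error) {
	var t outputTuple
	t.Payload.Value = value
	t.Payload.Formula = formula
	b, err := json.Marshal(t)
	return string(b), err
}

func main() {
	in, err := decodeInput(`{"payload":{"value":2.5}}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(in.Payload.Value) // prints 2.5

	out, err := encodeOutput(3, "1.000000 + 2.000000")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out) // prints {"payload":{"value":3,"formula":"1.000000 + 2.000000"}}
}
```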

     

  3. Define a method

    Define a process for each of the following methods of the UDSF interface:

     

    Method name         Process
    Process method      Conversion processing of streaming data
    Terminate method    Close processing

     

    The arithmetic operation processing is implemented in the Process method of the sample UDSF Operation.

     

  4. Import the packages

    Use "sample" as the package name for the UDSF Operation and import the following packages with the import statement:

    • fmt

    • gopkg.in/sensorbee/sensorbee.v0/bql/udf

    • gopkg.in/sensorbee/sensorbee.v0/core

    • gopkg.in/sensorbee/sensorbee.v0/data

     

  5. Create the udsf struct

    Next, create the udsf struct that defines the data structure of the udsf object.

    The udsf struct has two members: "cur", a float64 type variable that holds the current value, and "ope", an operator type variable that holds the arithmetic operator.

    type operator byte
     
    const (
       none    = ' ' 
       plus    = '+' 
       minus   = '-' 
       times   = '*' 
       divided = '/' 
    ) 
     
    type udsf struct {
       cur float64
       ope operator
    }
    

     

  6. Next, to implement the UDSF interface, add the following methods to the udsf struct:

    • Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error

    • Terminate(ctx *core.Context) error

     

    For now, defer the implementation of the conversion processing and add empty methods that return nil, as follows:

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
       return nil
    }
     
    func (u *udsf) Terminate(ctx *core.Context) error {
       return nil
    }
    

     

  7. Next, implement the creation functions and the CreateUDSF function of the udsf object.

     

    EdgeStreaming Runtime calls the CreateUDSF function when it interprets the Select statement of BQL. The function receives the parameters specified in the BQL as the argument "params", which is of the data.Map type (a map type in the Go programming language).

     

    The process to retrieve the necessary values from the map type argument "params" is the same as for source and sink, so this explanation is omitted. However, the following processes, which are specific to the UDSF creation function, must be implemented:

    • Retrieve the "stream_name" element (getStreamName function)

    • Set the retrieved "stream_name" as an argument and call decl.Input

    func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) {
       inputStream, err := getStreamName(params)
       if err != nil {
          return nil, err
       }
     
       operator, err := getOperator(params)
       if err != nil {
          return nil, err
       }
     
       initialValue, err := getInitialValue(params)
       if err != nil {
          return nil, err
       }
     
       if err := decl.Input(inputStream, nil); err != nil {
          return nil, err
       }
     
       return &udsf{
          ope: operator,
          cur: initialValue,
       }, nil
    }
     
    func getStreamName(params data.Map) (string, error) {
       node, ok := params["stream_name"]
       if !ok {
          return "", fmt.Errorf("stream_name is required")
       }
       streamName, err := data.AsString(node)
       if err != nil {
          return "", fmt.Errorf("stream_name must be a string:%s", err)
       }
       return streamName, nil
    }
     
    func getOperator(params data.Map) (operator, error) {
       node, ok := params["operator"]
       if !ok {
          return none, fmt.Errorf("operator is required")
       }
       operatorStr, err := data.AsString(node)
       if err != nil {
          return none, fmt.Errorf("operator must be a string:%s", err)
       }
     
       switch operatorStr {
       case "plus":
          return plus, nil
       case "minus":
          return minus, nil
       case "times":
          return times, nil
       case "divided":
          return divided, nil
       default:
          return none, fmt.Errorf("invalid operator")
       }
    }
     
    func getInitialValue(params data.Map) (float64, error) {
       initialValue := 0.0
       node, ok := params["initial_value"]
       if !ok {
          return initialValue, nil
       }
       initialValue, err := data.AsFloat(node)
       if err != nil {
          return initialValue, fmt.Errorf("initial_value is invalid")
       }
       return initialValue, nil
    }
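    The retrieval functions above follow one pattern: a required parameter causes an error when it is missing, and an optional parameter falls back to a default. The following standalone sketch isolates that pattern. It is only an approximation: map[string]interface{} stands in for data.Map, Go type assertions stand in for data.AsString and data.AsFloat without reproducing their exact behavior, and requiredString and optionalFloat are hypothetical names.

```go
package main

import "fmt"

// params is a stand-in for data.Map in this sketch.
type params map[string]interface{}

// requiredString returns the string stored under key, or an error when the
// key is missing or holds another type.
func requiredString(p params, key string) (string, error) {
	v, ok := p[key]
	if !ok {
		return "", fmt.Errorf("%s is required", key)
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("%s must be a string", key)
	}
	return s, nil
}

// optionalFloat returns the float stored under key, the default when the key
// is missing, or an error when the key holds another type.
func optionalFloat(p params, key string, def float64) (float64, error) {
	v, ok := p[key]
	if !ok {
		return def, nil
	}
	f, ok := v.(float64)
	if !ok {
		return def, fmt.Errorf("%s is invalid", key)
	}
	return f, nil
}

func main() {
	p := params{"stream_name": "in", "operator": "plus"}

	name, err := requiredString(p, "stream_name")
	fmt.Println(name, err) // prints "in <nil>"

	initial, err := optionalFloat(p, "initial_value", 0.0)
	fmt.Println(initial, err) // prints "0 <nil>"
}
```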
    

     

  8. Next, implement the Process method and the Terminate method, which perform the actual processing. The Process method is called each time the udsf object receives a tuple.

    The methods generally take the following form (implement the actual processes at the placeholders in angle brackets):

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
        <receive tuple> 
        <convert data> 
        <write tuple> 
        return nil
    }
     
    func (u *udsf) Terminate(ctx *core.Context) error {
        <close writer> 
        return nil
    }
    

     

  9. Implement the Process method as follows to convert data with the specified arithmetic operator:

    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error { 
     
       // Retrieve the "payload" element 
       p, ok := tuple.Data["payload"]
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: payload")
       }
       payload, err := data.AsMap(p)
       if err != nil {
          return err
       } 
     
       // Retrieve the "value" element 
       v, ok := payload["value"] 
       if !ok {
          return fmt.Errorf("the tuple doesn't have the required field: value")
       }
       value, err := data.AsFloat(v)
       if err != nil {
          return err
       } 
     
       // Apply the operator to the current value and build the formula string
       newVal := u.cur
       switch u.ope {
       case plus:
          newVal += value
       case minus:
          newVal -= value
       case times:
          newVal *= value
       case divided:
          newVal /= value
       }
       formula := fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value)
       ctx.Log().Debug("calculate: " + formula)
     
       // Keep the result as the current value for the next tuple
       u.cur = newVal
     
       // Generate a tuple that carries the result and the formula
       m := data.Map{
          "value":   data.Float(newVal),
          "formula": data.String(formula),
       }
     
       if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil { // Output the converted data 
          return err 
       }
       return nil
    }
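    To show what the "formula" element looks like, the following standalone sketch reproduces the fmt.Sprintf call from the Process method with fixed inputs. The function names are hypothetical. The compactFormula variant with the %g verb is not part of the sample and is included only as a possible alternative when shorter output is preferred.

```go
package main

import "fmt"

// formula builds the string the same way as the Process method: the %f verb
// prints each number with six digits after the decimal point.
func formula(cur float64, ope byte, value float64) string {
	return fmt.Sprintf("%f %s %f", cur, string(rune(ope)), value)
}

// compactFormula is a hypothetical variant that uses %g, which drops
// trailing zeros.
func compactFormula(cur float64, ope byte, value float64) string {
	return fmt.Sprintf("%g %s %g", cur, string(rune(ope)), value)
}

func main() {
	fmt.Println(formula(10, '+', 2.5))        // prints 10.000000 + 2.500000
	fmt.Println(compactFormula(10, '+', 2.5)) // prints 10 + 2.5
}
```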
     
    

     

  10. Next, implement the Terminate method. This sample has no resources to close, so the method only returns nil.

    func (u *udsf) Terminate(ctx *core.Context) error {
       return nil
    }
    

     

This completes the implementation of UDSF.

 

 
