Official | HULFT IoT EdgeStreaming Plugin SDK Getting Started Third Edition: July 1, 2021
Implementation of udsf.go
Implement the following object (udsf) in udsf.go:
- Object implementing the UDSF interface of the gopkg.in/sensorbee/sensorbee.v0/core package provided on EdgeStreaming
The implementation procedure is as follows:
- Flow of arithmetic operations
The udsf object receives the following data:
  - String type parameter: stream_name (name of the input stream)
  - String type parameter: operator (arithmetic operator)
  - Floating point type parameter: initial_value (default value)
The udsf object keeps a current value, which starts at initial_value (the default value). Each time it receives a tuple, it applies the specified operator to the current value and the "value" element of the tuple, and the result becomes the new current value.
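The flow above can be sketched in plain Go without the SDK. This is a minimal illustration only: the apply and run functions are hypothetical helpers that do not exist in the sample, and the operator names ("plus", "minus", "times", "divided") are taken from the parameter handling shown later in this section.

```go
package main

import "fmt"

// apply returns the result of "cur <operator> value".
// It is a hypothetical helper used only for this illustration.
func apply(cur float64, operator string, value float64) (float64, error) {
	switch operator {
	case "plus":
		return cur + value, nil
	case "minus":
		return cur - value, nil
	case "times":
		return cur * value, nil
	case "divided":
		return cur / value, nil
	}
	return cur, fmt.Errorf("invalid operator: %s", operator)
}

// run starts from initialValue and applies the operator to each received
// value in turn, carrying the result forward as the new current value.
func run(initialValue float64, operator string, values []float64) (float64, error) {
	cur := initialValue
	for _, v := range values {
		next, err := apply(cur, operator, v)
		if err != nil {
			return cur, err
		}
		cur = next
	}
	return cur, nil
}

func main() {
	// initial_value = 10, operator = "plus", received values 1, 2, 3
	result, err := run(10, "plus", []float64{1, 2, 3})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(result)
}
```

With initial_value 10 and the operator "plus", the values 1, 2, and 3 move the current value to 11, 13, and then 16.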
- Define a schema
Design a JSON schema for the tuple of the udsf object with the following settings:
Schema | Description
JSON schema of the tuple (for input) | The udsf object receives this schema for input. Set the schema output by source.go.
JSON schema of the tuple (for output) | The udsf object outputs this schema. Set the schema received by sink.go.
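As a rough illustration of the two tuple shapes, the following sketch models the payloads as Go structs. It assumes the field names used by the Process method later in this section ("value" for input; "value" and "formula" for output). The struct names, the decodeInput helper, and the use of encoding/json are assumptions made for this illustration and are not part of the SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inputPayload is a hypothetical model of the payload the udsf object receives.
type inputPayload struct {
	Value float64 `json:"value"`
}

// outputPayload is a hypothetical model of the payload the udsf object outputs.
type outputPayload struct {
	Value   float64 `json:"value"`
	Formula string  `json:"formula"`
}

// decodeInput parses a JSON document into an inputPayload.
func decodeInput(raw string) (inputPayload, error) {
	var in inputPayload
	err := json.Unmarshal([]byte(raw), &in)
	return in, err
}

func main() {
	in, err := decodeInput(`{"value": 2.5}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	// Example output for a current value of 0 and the operator "plus".
	out := outputPayload{Value: 0 + in.Value, Formula: "0.000000 + 2.500000"}
	encoded, err := json.Marshal(out)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(encoded))
}
```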
- Define a method
Define a process for each of the following methods of the UDSF interface:
Method name | Processing performed
Process method | Conversion processing of streaming data
Terminate method | Close processing
The arithmetic operation processing is implemented in the Process method of the sample UDSF Operation.
- Import the packages
Use "sample" as the package name for the UDSF Operation and import the following packages with the import statement:
  - fmt
  - gopkg.in/sensorbee/sensorbee.v0/bql/udf
  - gopkg.in/sensorbee/sensorbee.v0/core
  - gopkg.in/sensorbee/sensorbee.v0/data
- Create the udsf struct
Next, create the udsf struct that defines the data structure of the udsf object.
The udsf struct has two members: the float64 variable "cur", which holds the current value, and the operator variable "ope", which holds the arithmetic operator.
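    // operator represents an arithmetic operator as its symbol character.
    type operator byte

    const (
        none    operator = ' '
        plus    operator = '+'
        minus   operator = '-'
        times   operator = '*'
        divided operator = '/'
    )

    // udsf holds the state of the UDSF Operation.
    type udsf struct {
        cur float64  // current value
        ope operator // arithmetic operator
    }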
- Next, to implement the UDSF interface, add the following methods to the udsf struct:
  - Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error
  - Terminate(ctx *core.Context) error
Defer the implementation of the conversion processing for now and add empty methods that return nil, as follows:
    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
        return nil
    }

    func (u *udsf) Terminate(ctx *core.Context) error {
        return nil
    }
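One way to confirm early that the empty methods satisfy the interface is a compile-time assertion. The sketch below shows the pattern with stand-in types so that it runs without the SDK: Context, Tuple, Writer, and UDSF here are simplified placeholders for the SensorBee types, not the real definitions. In the actual udsf.go, the equivalent line would presumably be written against the udf.UDSF type returned by CreateUDSF.

```go
package main

import "fmt"

// Stand-in types for this illustration only; the real ones come from the
// SensorBee packages imported by udsf.go.
type Context struct{}

type Tuple struct {
	Data map[string]interface{}
}

type Writer interface {
	Write(ctx *Context, t *Tuple) error
}

// UDSF mirrors the two methods described in this section.
type UDSF interface {
	Process(ctx *Context, t *Tuple, w Writer) error
	Terminate(ctx *Context) error
}

type udsf struct{}

func (u *udsf) Process(ctx *Context, t *Tuple, w Writer) error { return nil }

func (u *udsf) Terminate(ctx *Context) error { return nil }

// Compile-time check: the build fails here if *udsf stops satisfying UDSF.
var _ UDSF = (*udsf)(nil)

func main() {
	var u UDSF = &udsf{}
	fmt.Println(u.Process(&Context{}, &Tuple{}, nil), u.Terminate(&Context{}))
}
```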
- Next, implement CreateUDSF, the creation function of the udsf object.
The CreateUDSF function is called when EdgeStreaming Runtime interprets the SELECT statement of BQL. It receives the parameters specified in the BQL as the argument "params", which is of type data.Map (a map type in the Go programming language).
Retrieving the necessary values from "params" works the same way as for source and sink, so the explanation is omitted here. However, the following processes, which are specific to the UDSF creation function, must be implemented:
  - Retrieve the "stream_name" element (getStreamName function)
  - Set the retrieved "stream_name" as an argument and call decl.Input
    // CreateUDSF builds the udsf object from the parameters specified in BQL.
    func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) {
        inputStream, err := getStreamName(params)
        if err != nil {
            return nil, err
        }

        // Named "ope" so that the variable does not shadow the operator type.
        ope, err := getOperator(params)
        if err != nil {
            return nil, err
        }

        initialValue, err := getInitialValue(params)
        if err != nil {
            return nil, err
        }

        // Declare the input stream of this UDSF.
        if err := decl.Input(inputStream, nil); err != nil {
            return nil, err
        }

        return &udsf{
            ope: ope,
            cur: initialValue,
        }, nil
    }

    // getStreamName retrieves the required "stream_name" parameter.
    func getStreamName(params data.Map) (string, error) {
        node, ok := params["stream_name"]
        if !ok {
            return "", fmt.Errorf("stream_name is required")
        }
        streamName, err := data.AsString(node)
        if err != nil {
            return "", fmt.Errorf("stream_name must be a string: %v", err)
        }
        return streamName, nil
    }

    // getOperator retrieves the required "operator" parameter and converts
    // its name to the corresponding operator constant.
    func getOperator(params data.Map) (operator, error) {
        node, ok := params["operator"]
        if !ok {
            return none, fmt.Errorf("operator is required")
        }
        operatorStr, err := data.AsString(node)
        if err != nil {
            return none, fmt.Errorf("operator must be a string: %v", err)
        }
        switch operatorStr {
        case "plus":
            return plus, nil
        case "minus":
            return minus, nil
        case "times":
            return times, nil
        case "divided":
            return divided, nil
        default:
            return none, fmt.Errorf("invalid operator")
        }
    }

    // getInitialValue retrieves the optional "initial_value" parameter.
    // It returns 0.0 when the parameter is not specified.
    func getInitialValue(params data.Map) (float64, error) {
        initialValue := 0.0
        node, ok := params["initial_value"]
        if !ok {
            return initialValue, nil
        }
        initialValue, err := data.AsFloat(node)
        if err != nil {
            return initialValue, fmt.Errorf("initial_value is invalid")
        }
        return initialValue, nil
    }
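The listing above uses two lookup patterns: a required parameter, where a missing key is an error, and an optional parameter, where a missing key falls back to a default. The sketch below isolates both patterns with a plain Go map so that it runs without the SDK. The params type and the requiredString and optionalFloat helpers are hypothetical stand-ins for data.Map and the get... functions, and the type assertions stand in for data.AsString and data.AsFloat.

```go
package main

import "fmt"

// params is a stand-in for data.Map in this illustration.
type params map[string]interface{}

// requiredString returns the string stored under key.
// A missing key is treated as an error.
func requiredString(p params, key string) (string, error) {
	node, ok := p[key]
	if !ok {
		return "", fmt.Errorf("%s is required", key)
	}
	s, ok := node.(string)
	if !ok {
		return "", fmt.Errorf("%s must be a string", key)
	}
	return s, nil
}

// optionalFloat returns the float64 stored under key.
// A missing key is not an error; the default value is returned instead.
func optionalFloat(p params, key string, defaultValue float64) (float64, error) {
	node, ok := p[key]
	if !ok {
		return defaultValue, nil
	}
	f, ok := node.(float64)
	if !ok {
		return defaultValue, fmt.Errorf("%s is invalid", key)
	}
	return f, nil
}

func main() {
	p := params{"stream_name": "in", "operator": "plus"}

	name, err := requiredString(p, "stream_name")
	fmt.Println(name, err)

	// "initial_value" is not specified, so the default is used.
	initial, err := optionalFloat(p, "initial_value", 0.0)
	fmt.Println(initial, err)
}
```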
- Next, implement the Process method and the Terminate method, which perform the actual processing. These methods generally take the following form (implement the actual processes at the placeholders shown in angle brackets):
The Process method is called each time the UDSF object receives a tuple.
    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
        // <receive tuple>
        // <convert data>
        // <write tuple>
        return nil
    }

    func (u *udsf) Terminate(ctx *core.Context) error {
        // <close writer>
        return nil
    }
- Implement the Process method as follows to convert data with the specified arithmetic operator:
    func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
        // Retrieve the "payload" element
        p, ok := tuple.Data["payload"]
        if !ok {
            return fmt.Errorf("the tuple doesn't have the required field: payload")
        }
        payload, err := data.AsMap(p)
        if err != nil {
            return err
        }

        // Retrieve the "value" element
        v, ok := payload["value"]
        if !ok {
            return fmt.Errorf("the tuple doesn't have the required field: value")
        }
        value, err := data.AsFloat(v)
        if err != nil {
            return err
        }

        // Perform the arithmetic operation and generate the operation formula
        newVal := u.cur
        switch u.ope {
        case plus:
            newVal += value
        case minus:
            newVal -= value
        case times:
            newVal *= value
        case divided:
            newVal /= value
        }
        formula := fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value)
        ctx.Log().Debug("calculate: " + formula)

        // Carry the result forward as the current value for the next tuple
        u.cur = newVal

        // Generate a tuple that holds the result and the formula
        m := data.Map{
            "value":   data.Float(newVal),
            "formula": data.String(formula),
        }

        // Output the converted data
        if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil {
            return err
        }
        return nil
    }
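One case the listing above does not address is a "value" of 0 with the divided operator. In Go, dividing a float64 variable by zero does not panic; the result is an infinity or NaN, which would then be carried forward as the current value. If that is not wanted, a guard such as the following could be considered. The divide function is a hypothetical helper, not part of the sample, and whether to return an error or skip the tuple is a design decision left to the implementer.

```go
package main

import (
	"fmt"
	"math"
)

// divide returns cur / value, or an error when the result is not a finite
// number (for example, when value is 0). Hypothetical helper for illustration.
func divide(cur, value float64) (float64, error) {
	result := cur / value
	if math.IsInf(result, 0) || math.IsNaN(result) {
		// Keep the previous current value so that the state stays usable.
		return cur, fmt.Errorf("cannot divide %f by %f", cur, value)
	}
	return result, nil
}

func main() {
	fmt.Println(divide(10, 4))
	fmt.Println(divide(10, 0))
}
```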
- Next, implement the Terminate method. In this sample, the Terminate method does not need to do anything.
    func (u *udsf) Terminate(ctx *core.Context) error {
        return nil
    }
This completes the implementation of UDSF.