Implementation of sink.go
Implement the following object (sink) in sink.go:
-
Object implementing the Sink interface of the gopkg.in/sensorbee/sensorbee.v0/core package provided on EdgeStreaming
The implementation procedure is as follows:
-
Design a tuple
The sink object sets a tuple with the following procedure:
-
Receives the integer type parameter "decimal"
-
Rounds the received value down to the valid decimal place ("decimal") and outputs the value in standard output
-
Implements a design to receive the tuples that hold data in a JSON data schema such as below.
{ "type": "object", "required": ["payload"], "properties": { "payload": { "type": "object", "required": ["value", "formula"], "properties": { "value": { "type": "number" } "formula": { "type": "string" } } } } }
= Remarks =The sink object is designed to receive streaming data from a UDSF with the operation function described below implemented and to output that data. Therefore, in addition to the output value ("value" element), it is designed to receive the operation formula ("formula" element).
-
-
Define methods
Define a process for each of the following methods of the Sink interface:
Method name
Process
Write method
Output processing of streaming data
Close method
Close processing
In the Write method of the sample Sink Operation, the output process of values rounded down to the specified valid decimal place is implemented.
-
Import the packages
Use "sample" as the package name for the sample Sink Operation and import the following packages with the import statement:
-
fmt
-
math
-
gopkg.in/sensorbee/sensorbee.v0/bql
-
gopkg.in/sensorbee/sensorbee.v0/core
-
gopkg.in/sensorbee/sensorbee.v0/data
-
-
Create the sink struct
Next, create the sink struct that defines the data structure of the sink object.
sink struct holds as a member an int type variable "decimal" that indicates the valid decimal place.
type sink struct { decimal int }
= Remarks =There are no limitations for the struct name. "sink" is used in this manual for the sake of convenience.
-
Next, to implement the Sink interface, add the following methods to the sink struct:
-
Write(ctx *core.Context, tuple *core.Tuple) error
-
Close(ctx *core.Context) error
Defer the implementation of the output processing and add an empty method that returns "nil" such as the following:
func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error { return nil } func (s *sink) Close(ctx *core.Context) error { return nil }
-
-
Next, implement the creation function and the CreateSink function of the sink object.
func CreateSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) { decimal, err := getDecimal(params) if err != nil { return nil, err } return &sink{ decimal: decimal, }, nil } func getDecimal(params data.Map) (int, error) { node, ok := params["decimal"] if !ok { return 0, fmt.Errorf("decimal is required") } decimal, err := data.AsInt(node) if err != nil { return 0, fmt.Errorf("decimal must be a int:%s", err) } return int(decimal), nil }
The CreateSink function is a function that is called when EdgeStreaming Runtime interprets the Create Sink statement of BQL and receives the parameter specified in the BQL as the argument "params" (data.Map type (contents are map type in Go programming language)).
The getDecimal function retrieves the "decimal" element (valid decimal place) from map type data. The CreateSink function calls the getDecimal function to retrieve the "decimal" and generates a sink struct with that "decimal".
-
Implement the Write method as follows to output in standard output rounded down to the specified valid decimal place:
func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error { // Retrieve the "payload" element p, ok := tuple.Data["payload"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: payload") } payload, err := data.AsMap(p) if err != nil { return err } // Retrieve the "value" element v, ok := payload["value"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: value") } value, err := data.AsFloat(v) if err != nil { return err } // Retrieve the "formula" element f, ok := payload["formula"] if !ok { return fmt.Errorf("the tuple doesn't have the required field: formula") } formula, err := data.AsString(f) if err != nil { return err } // Round down to the valid decimal place shift := math.Pow(10, float64(s.decimal)) value = math.Floor(value*shift) / shift // Output to the log ctx.Log().Infof("formula: %s", formula) ctx.Log().Infof("value: %f", value) return nil }
-
Next, implement the Close method. In this case, there is no need to do anything with the Close method.
func (s *sink) Close(ctx *core.Context) error { return nil }
Next, implement the Write method and Close method that perform the actual processing.
These methods are generally expressed using the following code (actual processes should be implemented in the passages in red and bold text):
func (s *source) Write(ctx *core.Context, tuple *core.Tuple) error { <receive tuple> <write tuple> return nil } func (s *source) Close(ctx *core.Context) error { <close writer> return nil }
The Write method is called each time the sink object receives a tuple.
This completes the implementation of sink.