Implementation of source.go
Implement the following object (source) in source.go:
- An object implementing the Source interface of the gopkg.in/sensorbee/sensorbee.v0/core package provided on EdgeStreaming
The implementation procedure is as follows:
- Generate a tuple
  The source object generates a tuple with the following procedure:
  - Receives the "interval" parameter (integer type)
  - Generates pseudo-random numbers at regular intervals according to "interval"
  - Generates a tuple that holds data conforming to a JSON schema such as the following:
      {
        "type": "object",
        "required": ["payload"],
        "properties": {
          "payload": {
            "type": "object",
            "required": ["value"],
            "properties": {
              "value": { "type": "number" }
            }
          }
        }
      }
    An example of the JSON data of an output tuple is shown below.
      {"payload": {"value": 3.5423242}}
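As a rough illustration of the tuple layout described above, the following standalone Go sketch builds the same nested structure with the standard library only. It does not use the SensorBee data package; the names tuplePayload and encodeTuple are hypothetical and exist only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tuplePayload is a hypothetical stand-in that mirrors the JSON schema above:
// an object with a required "payload" object holding a required numeric "value".
type tuplePayload struct {
	Payload struct {
		Value float64 `json:"value"`
	} `json:"payload"`
}

// encodeTuple returns the JSON encoding of a tuple that carries val.
func encodeTuple(val float64) (string, error) {
	var t tuplePayload
	t.Payload.Value = val
	b, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeTuple(3.5423242)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // {"payload":{"value":3.5423242}}
}
```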
- Define methods
  Define the processing for each of the following methods of the Source interface:

  Method name              Process
  GenerateStream method    Generation processing of streaming data
  Stop method              Close processing

  In the GenerateStream method of the sample Source Operation, write the code that generates pseudo-random numbers at the specified regular intervals; this is the function of the Operation.
- Import the packages
  Use "sample" as the package name for the sample Source Operation and import the following packages with the import statement:
  - math/rand
  - time
  - gopkg.in/sensorbee/sensorbee.v0/bql
  - gopkg.in/sensorbee/sensorbee.v0/core
  - gopkg.in/sensorbee/sensorbee.v0/data
- Create the source struct
  Create the source struct that defines the data structure of the source object.
  The source struct holds two members: a time.Duration variable "interval" and an empty-struct channel variable "term" for termination notifications, as shown below.
      type source struct {
          interval time.Duration
          term     chan struct{}
      }
  The source struct holds the following members:

  Type                                  Variable
  time.Duration type                    interval
  Empty channel type (chan struct{})    term
- Next, to implement the Source interface, add the following methods to the source object:
  - GenerateStream(ctx *Context, w Writer) error
  - Stop(ctx *Context) error
- Defer the implementation of the pseudo-random number generation processing and add empty methods that return "nil", such as the following:
      func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
          return nil
      }

      func (s *source) Stop(ctx *core.Context) error {
          return nil
      }
- Next, implement the CreateSource function, which creates the source object, together with its getInterval helper function.
      func CreateSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
          interval, err := getInterval(params)
          if err != nil {
              return nil, err
          }
          return &source{
              interval: interval,
              term:     make(chan struct{}),
          }, nil
      }

      func getInterval(params data.Map) (time.Duration, error) {
          interval := 1 * time.Second
          if v, ok := params["interval"]; ok {
              i, err := data.ToDuration(v)
              if err != nil {
                  return interval, err
              }
              interval = i
          }
          return interval, nil
      }
  The CreateSource function is called when EdgeStreaming Runtime interprets the Create Source statement of BQL. It receives the parameters specified in the BQL as the argument "params", which is of type data.Map (a map type in the Go programming language).
  The getInterval function retrieves the "interval" element (the generation interval) from the map data. As the code shows, it falls back to a default of 1 second when "interval" is not specified.
  The CreateSource function calls the getInterval function to retrieve the "interval" and creates a source struct holding that "interval" and an empty channel for termination notifications.
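The default-and-override lookup performed by getInterval can be tried without the SensorBee packages. The sketch below is only an approximation: toDuration is a hypothetical stand-in for data.ToDuration, and it assumes that numbers are interpreted as seconds and strings are parsed with time.ParseDuration; the conversion rules of the real function should be checked against the SensorBee documentation.

```go
package main

import (
	"fmt"
	"time"
)

// toDuration is a hypothetical stand-in for data.ToDuration.
// Assumption: numbers mean seconds, strings use time.ParseDuration syntax.
func toDuration(v interface{}) (time.Duration, error) {
	switch x := v.(type) {
	case int:
		return time.Duration(x) * time.Second, nil
	case float64:
		return time.Duration(x * float64(time.Second)), nil
	case string:
		return time.ParseDuration(x)
	default:
		return 0, fmt.Errorf("cannot convert %T to a duration", v)
	}
}

// getInterval returns the "interval" parameter, or 1 second when it is absent.
func getInterval(params map[string]interface{}) (time.Duration, error) {
	interval := 1 * time.Second
	if v, ok := params["interval"]; ok {
		d, err := toDuration(v)
		if err != nil {
			return interval, err
		}
		interval = d
	}
	return interval, nil
}

func main() {
	for _, p := range []map[string]interface{}{
		{},                    // no parameter: default applies
		{"interval": 5},       // integer: treated as seconds here
		{"interval": "250ms"}, // string: parsed as a Go duration
		{"interval": true},    // unsupported type: error
	} {
		d, err := getInterval(p)
		fmt.Println(d, err)
	}
}
```

Returning the default together with the error, as the original getInterval does, leaves the decision to the caller; CreateSource simply discards the value and propagates the error.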
- Next, implement the GenerateStream method and Stop method that perform the actual processing. These methods generally take the following form (implement the actual processing in the placeholders shown in angle brackets):
      func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
          <initialization>
          defer func() {
              <clean up>
          }()

          for <check stop> {
              t := <create a new tuple>
              if err := w.Write(ctx, t); err != nil {
                  return err
              }
          }
          return nil
      }

      func (s *source) Stop(ctx *core.Context) error {
          <turn on a stop flag>
          <wait until GenerateStream stops>
          return nil
      }
  As the code above indicates, the GenerateStream method is called once when the stream starts and keeps generating tuples until the method returns.
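To make the placeholders in the template more concrete, here is one possible way to fill them in using only the standard library. This is a sketch, not the SensorBee implementation: generator, writer, countWriter, and errStopped are hypothetical stand-ins for the source, core.Writer, a downstream consumer, and core.ErrSourceStopped.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped is a hypothetical stand-in for core.ErrSourceStopped.
var errStopped = errors.New("source stopped")

// writer is a hypothetical stand-in for core.Writer.
type writer interface {
	Write(v int) error
}

// countWriter counts writes and fails once failAt writes have succeeded
// (failAt == 0 means it never fails).
type countWriter struct{ n, failAt int }

func (c *countWriter) Write(v int) error {
	if c.failAt > 0 && c.n == c.failAt {
		return errors.New("write failed")
	}
	c.n++
	return nil
}

type generator struct {
	term chan struct{} // closed to request a stop
	done chan struct{} // closed when generate has returned
}

func newGenerator() *generator {
	return &generator{term: make(chan struct{}), done: make(chan struct{})}
}

func (g *generator) generate(w writer) error {
	defer close(g.done) // <clean up>: report that the loop has exited
	for i := 0; ; i++ {
		select {
		case <-g.term: // <check stop>
			return errStopped
		default:
		}
		if err := w.Write(i); err != nil { // <create a new tuple> and write it
			return err
		}
	}
}

func (g *generator) stop() {
	close(g.term) // <turn on a stop flag>
	<-g.done      // <wait until GenerateStream stops>
}

// runAndStop runs generate in a goroutine, stops it, and returns its error.
func runAndStop() error {
	g := newGenerator()
	errc := make(chan error, 1)
	go func() { errc <- g.generate(&countWriter{}) }()
	g.stop()
	return <-errc
}

func main() {
	fmt.Println(runAndStop()) // source stopped

	w := &countWriter{failAt: 3}
	fmt.Println(newGenerator().generate(w), w.n) // write failed 3
}
```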
- Implement the GenerateStream method as follows to generate pseudo-random numbers at regular intervals.
      func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
          // Set a seed for the pseudo-random numbers in the initialization process
          rand.Seed(time.Now().UnixNano())

          next := time.Now()
          for {
              // Generate a tuple
              val := rand.Float64()
              m := data.Map{"value": data.Float(val)}
              t := core.NewTuple(data.Map{"payload": m})
              if s.interval > 0 {
                  t.Timestamp = next
              }

              ctx.Log().Debug("generation:", val) // Output to the log
              if err := w.Write(ctx, t); err != nil { // Output the tuple
                  return err
              }

              // Wait for the specified time to elapse or for the termination notification
              if s.interval > 0 {
                  now := time.Now()
                  next = next.Add(s.interval)
                  if next.Before(now) {
                      next = now.Add(s.interval)
                  }

                  select {
                  case <-s.term:
                      return core.ErrSourceStopped
                  case <-time.After(next.Sub(now)):
                  }
              }
          }
          return nil
      }
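The scheduling arithmetic inside the loop (advance the deadline by one interval, and fall back to "now plus one interval" if the loop has fallen behind) can be isolated and checked with fixed timestamps. In the sketch below, nextTick and waitFor are hypothetical helper names introduced for illustration; only the arithmetic is taken from the method above.

```go
package main

import (
	"fmt"
	"time"
)

// nextTick mirrors the scheduling arithmetic of the GenerateStream loop:
// it advances next by interval and, if that moment has already passed,
// reschedules relative to now. It returns the new deadline and the wait time.
func nextTick(next, now time.Time, interval time.Duration) (time.Time, time.Duration) {
	next = next.Add(interval)
	if next.Before(now) {
		next = now.Add(interval)
	}
	return next, next.Sub(now)
}

// waitFor reports how long the loop would sleep when elapsed has passed
// since the previous deadline, assuming a 1-second interval.
func waitFor(elapsed time.Duration) time.Duration {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	_, wait := nextTick(base, base.Add(elapsed), time.Second)
	return wait
}

func main() {
	// On schedule: 200ms were spent on work, so 800ms remain.
	fmt.Println(waitFor(200 * time.Millisecond)) // 800ms

	// Behind schedule: the next deadline is already in the past,
	// so the loop waits one full interval from now instead of catching up.
	fmt.Println(waitFor(3500 * time.Millisecond)) // 1s
}
```

Scheduling against a running deadline rather than sleeping for a fixed interval keeps the tuple timestamps evenly spaced as long as each iteration finishes within one interval.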
- Next, implement the Stop method as follows:
      func (s *source) Stop(ctx *core.Context) error {
          s.term <- struct{}{}
          return nil
      }
This completes the implementation of source.
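Because term is an unbuffered channel, the send in Stop completes only when GenerateStream receives it in its select statement. The following stdlib-only sketch reproduces that handshake; ticker, run, stop, and stopDuringWait are hypothetical names, and errStopped stands in for core.ErrSourceStopped.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStopped is a hypothetical stand-in for core.ErrSourceStopped.
var errStopped = errors.New("source stopped")

type ticker struct {
	interval time.Duration
	term     chan struct{}
}

// run calls emit, then waits for either the interval or a termination signal.
func (t *ticker) run(emit func()) error {
	for {
		emit()
		select {
		case <-t.term:
			return errStopped
		case <-time.After(t.interval):
		}
	}
}

// stop blocks until run receives the termination signal.
func (t *ticker) stop() {
	t.term <- struct{}{}
}

// stopDuringWait starts run with a long interval, stops it while it is
// waiting, and returns the number of emitted ticks and the error from run.
func stopDuringWait() (int, error) {
	t := &ticker{interval: time.Hour, term: make(chan struct{})}
	ticks := 0
	errc := make(chan error, 1)
	go func() { errc <- t.run(func() { ticks++ }) }()
	t.stop() // returns as soon as run reaches its select
	err := <-errc
	return ticks, err
}

func main() {
	ticks, err := stopDuringWait()
	fmt.Println(ticks, err) // 1 source stopped
}
```

The same property means that a send-based stop has nothing to hand the signal to if the loop has already returned (for example after a write error) or never reaches the select (as when interval is 0 or less in the method above); in those cases the send would block. Closing the channel instead of sending on it is one common Go idiom for avoiding this, although whether it is appropriate here depends on how the runtime calls Stop.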