Implementation of source.go

Implement the following object (source) in source.go:

  • An object that implements the Source interface of the gopkg.in/sensorbee/sensorbee.v0/core package, which is provided on EdgeStreaming

 

The implementation procedure is as follows:

 

  1. Generate a tuple

    The source object generates a tuple as follows:

    1. Receives the integer parameter "interval"

    2. Generates pseudo-random numbers at regular intervals according to "interval"

    3. Generates a tuple holding data that conforms to a JSON schema such as the following

      {
        "type": "object",
        "required": ["payload"],
        "properties": {
          "payload": {
            "type": "object",
            "required": ["value"],
            "properties": {
              "value": {
                "type": "number"
              }
            }
          }
        }
      }
      

      An example of the JSON data of an output tuple is shown below.

      {"payload": {"value": 3.5423242}}
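
      To make the shape of this data concrete, the following is a minimal sketch that builds the same nested structure and encodes it as JSON. It uses plain Go maps and the standard encoding/json package as stand-ins; the function name payloadJSON is hypothetical and is not part of EdgeStreaming or SensorBee.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payloadJSON builds the nested structure required by the schema
// (an object with "payload", which holds a numeric "value") and
// encodes it as JSON. Plain Go maps stand in for tuple data here.
func payloadJSON(value float64) (string, error) {
	doc := map[string]interface{}{
		"payload": map[string]interface{}{"value": value},
	}
	b, err := json.Marshal(doc)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := payloadJSON(3.5423242)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(s) // {"payload":{"value":3.5423242}}
}
```

      In the actual source, the same nesting is expressed with the data.Map and data.Float types of the data package, as the later steps show.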

       

     

  2. Define methods

    Define a process for each of the following methods of the Source interface:

     

    Method name              Process
    GenerateStream method    Generation processing of streaming data
    Stop method              Close processing

     

    In the GenerateStream method of the sample Source Operation, write the code that generates pseudo-random numbers at the specified regular interval. This is the function that the Operation provides.

     

     

  3. Import the packages

    Use "sample" as the package name for the sample Source Operation and import the following packages with the import statement:

    • math/rand

    • time

    • gopkg.in/sensorbee/sensorbee.v0/bql

    • gopkg.in/sensorbee/sensorbee.v0/core

    • gopkg.in/sensorbee/sensorbee.v0/data

     

  4. Create the source struct

    Create the source struct that defines the data structure of the source object.

     

    The source struct holds two members, as shown below: a time.Duration variable "interval" and a channel of empty structs (chan struct{}) "term" that is used for termination notifications.

    type source struct {
       interval time.Duration
       term     chan struct{}
    }
    

    The source struct holds the following members:

    Type                                  Variable    Used for
    time.Duration type                    interval    Generation processing of streaming data
    Empty channel type (chan struct{})    term        Close processing

     

     

     

  5. Next, to implement the Source interface, add the following methods to the source object:

    • GenerateStream(ctx *Context, w Writer) error

    • Stop(ctx *Context) error
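
    One way to confirm that a type provides both methods is a compile-time assertion. The following is a minimal sketch: Context, Writer, and Source are simplified local stand-ins written for this example (they are assumptions, not the definitions in the core package), so that the snippet compiles on its own.

```go
package main

import "fmt"

// Context and Writer are empty stand-ins for core.Context and core.Writer.
type Context struct{}
type Writer interface{}

// Source mirrors the two methods listed above.
type Source interface {
	GenerateStream(ctx *Context, w Writer) error
	Stop(ctx *Context) error
}

type source struct{}

func (s *source) GenerateStream(ctx *Context, w Writer) error { return nil }
func (s *source) Stop(ctx *Context) error                     { return nil }

// Compile-time assertion: the build fails on this line if *source
// ever stops satisfying the Source interface.
var _ Source = (*source)(nil)

func main() {
	var s Source = &source{}
	fmt.Println(s.GenerateStream(&Context{}, nil), s.Stop(&Context{})) // <nil> <nil>
}
```

    In source.go itself, the equivalent line would presumably read var _ core.Source = (*source)(nil).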

     

  6. For now, leave the pseudo-random number generation unimplemented and add empty methods that return nil, as follows:

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
       return nil
    }
    
    func (s *source) Stop(ctx *core.Context) error {
       return nil
    }
    

     

  7. Next, implement CreateSource, the creation function of the source object, and its helper function getInterval.

    // CreateSource creates a source object from the parameters specified in BQL.
    func CreateSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
       interval, err := getInterval(params)
       if err != nil {
          return nil, err
       }

       return &source{
          interval: interval,
          term:     make(chan struct{}), // Channel for termination notifications
       }, nil
    }

    // getInterval returns the "interval" parameter, or 1 second if it is not specified.
    func getInterval(params data.Map) (time.Duration, error) {
       interval := 1 * time.Second // Default interval
       if v, ok := params["interval"]; ok {
          i, err := data.ToDuration(v)
          if err != nil {
             return interval, err
          }
          interval = i
       }
       return interval, nil
    }
    

    The CreateSource function is called when EdgeStreaming Runtime interprets a CREATE SOURCE statement of BQL. It receives the parameters specified in the BQL statement as the argument "params", which is of the data.Map type (a map type in the Go programming language).

    The getInterval function retrieves the "interval" element (the generation interval) from the map. If the element is not present, it returns the default of 1 second.

    The CreateSource function calls the getInterval function to retrieve the "interval" and creates a source struct that holds that "interval" and a newly created channel for termination notifications.
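
    The lookup-with-default pattern used by getInterval can be tried out without the framework. The following is a minimal sketch that uses a plain Go map in place of data.Map. The function name intervalFrom is hypothetical, and the conversion rules (integers are treated as seconds, strings are parsed with time.ParseDuration) are assumptions made for this example; they are not a statement of how data.ToDuration behaves.

```go
package main

import (
	"fmt"
	"time"
)

// intervalFrom returns the "interval" entry of params, or def when the
// entry is absent. Assumption for this sketch only: integers are seconds
// and strings use the time.ParseDuration syntax (for example "250ms").
func intervalFrom(params map[string]interface{}, def time.Duration) (time.Duration, error) {
	v, ok := params["interval"]
	if !ok {
		return def, nil
	}
	switch x := v.(type) {
	case int:
		return time.Duration(x) * time.Second, nil
	case string:
		return time.ParseDuration(x)
	default:
		return 0, fmt.Errorf("unsupported interval type %T", v)
	}
}

func main() {
	inputs := []map[string]interface{}{
		{},                    // absent: the default is used
		{"interval": 5},       // integer: 5 seconds
		{"interval": "250ms"}, // string: parsed as a duration
		{"interval": true},    // unsupported type: an error is returned
	}
	for _, p := range inputs {
		d, err := intervalFrom(p, time.Second)
		fmt.Println(d, err)
	}
}
```

    Returning an error for an unusable value, rather than silently falling back to the default, lets the CREATE SOURCE statement fail early with a clear cause.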

     

  8. Next, implement the GenerateStream method and Stop method that perform the actual processing. These methods generally take the following form (replace each placeholder in angle brackets with the actual processing):

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
        <initialization>
        defer func() {
            <clean up>
        }()

        for <check stop> {
            t := <create a new tuple>
            if err := w.Write(ctx, t); err != nil {
                return err
            }
        }
        return nil
    }

    func (s *source) Stop(ctx *core.Context) error {
        <turn on a stop flag>
        <wait until GenerateStream stops>
        return nil
    }
    

     

    As indicated in the code above, the GenerateStream method is called once when the stream starts and keeps generating tuples until it returns.
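
    The general form above can be exercised with the standard library alone. The following is a minimal sketch in which each placeholder is filled with one possible choice: an atomic stop flag for the stop check, and a channel that is closed when the loop ends for the wait. The generator type and the write callback are hypothetical stand-ins for the source object and core.Writer.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// generator is a hypothetical stand-in for a source object.
type generator struct {
	stopped int32         // stop flag, turned on by Stop
	done    chan struct{} // closed when GenerateStream returns
}

func newGenerator() *generator { return &generator{done: make(chan struct{})} }

// GenerateStream passes 0, 1, 2, ... to write until the stop flag is
// turned on or write returns an error.
func (g *generator) GenerateStream(write func(int) error) error {
	defer close(g.done) // clean up: signal that the loop has ended
	for i := 0; atomic.LoadInt32(&g.stopped) == 0; i++ {
		if err := write(i); err != nil {
			return err
		}
	}
	return nil
}

// Stop turns on the stop flag and waits until GenerateStream has returned.
func (g *generator) Stop() {
	atomic.StoreInt32(&g.stopped, 1)
	<-g.done
}

func main() {
	g := newGenerator()
	first := make(chan struct{})
	var count int64
	go g.GenerateStream(func(n int) error {
		if atomic.AddInt64(&count, 1) == 1 {
			close(first) // the first value has been written
		}
		return nil
	})
	<-first
	g.Stop()
	fmt.Println("wrote at least one value:", atomic.LoadInt64(&count) > 0)
}
```

    In this sketch Stop returns only after the loop has ended, so the caller can release resources safely afterwards. It assumes that GenerateStream is started at some point; otherwise Stop would wait indefinitely.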

     

  10. Implement the GenerateStream method as follows to generate pseudo-random numbers at regular intervals.

    func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
       // Initialization: create a pseudo-random number generator seeded with the current time
       rng := rand.New(rand.NewSource(time.Now().UnixNano()))

       next := time.Now()
       for {
          // Return promptly if a termination notification is pending,
          // even when no wait interval is set
          select {
          case <-s.term:
             return core.ErrSourceStopped
          default:
          }

          val := rng.Float64()

          // Generate a tuple
          m := data.Map{"value": data.Float(val)}
          t := core.NewTuple(data.Map{"payload": m})

          if s.interval > 0 {
             t.Timestamp = next // Use the scheduled time as the timestamp
          }
          ctx.Log().Debug("generation:", val) // Output to the log
          if err := w.Write(ctx, t); err != nil { // Output the tuple
             return err
          }

          // Wait for the specified time to elapse or for the termination notification
          if s.interval > 0 {
             now := time.Now()
             next = next.Add(s.interval)
             if next.Before(now) { // Behind schedule: restart the schedule from now
                next = now.Add(s.interval)
             }
             select {
             case <-s.term:
                return core.ErrSourceStopped
             case <-time.After(next.Sub(now)):
             }
          }
       }
    }
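
    The scheduling arithmetic in the wait section can be checked in isolation. The following is a minimal sketch that extracts it into a pure function; the names nextTick and waitAfter are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"time"
)

// nextTick returns the next scheduled generation time and how long to
// wait for it. prev is the previous scheduled time and now is the
// current time.
func nextTick(prev, now time.Time, interval time.Duration) (next time.Time, wait time.Duration) {
	next = prev.Add(interval)
	if next.Before(now) {
		// The schedule has fallen behind (for example after a slow write).
		// Restart it from now instead of generating a burst to catch up.
		next = now.Add(interval)
	}
	return next, next.Sub(now)
}

// waitAfter reports the wait when elapsed has passed since the
// previous scheduled time.
func waitAfter(elapsed, interval time.Duration) time.Duration {
	base := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	_, wait := nextTick(base, base.Add(elapsed), interval)
	return wait
}

func main() {
	// On schedule: 200ms of the 1s interval has elapsed, so 800ms remain.
	fmt.Println(waitAfter(200*time.Millisecond, time.Second)) // 800ms

	// Behind schedule: 3s has elapsed, so a full interval is waited.
	fmt.Println(waitAfter(3*time.Second, time.Second)) // 1s
}
```

    Subtracting the time already spent keeps the generation rate close to one tuple per interval even when writing a tuple takes a noticeable amount of time.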
    

     

  11. Next, implement the Stop method as follows:

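    func (s *source) Stop(ctx *core.Context) error {
       // Send a termination notification. The channel is unbuffered,
       // so this blocks until GenerateStream receives the notification.
       s.term <- struct{}{}
       return nil
    }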
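
    Because a send on an unbuffered channel blocks until it is received, the Stop method above waits indefinitely if GenerateStream has already returned, for example after a write error. One possible alternative, shown here as a minimal sketch and not as the method used by EdgeStreaming, is to close the channel exactly once. The stopper type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical helper that holds the termination channel.
type stopper struct {
	term chan struct{}
	once sync.Once
}

func newStopper() *stopper { return &stopper{term: make(chan struct{})} }

// Stop closes term exactly once. It never blocks, even when nothing
// is receiving from the channel.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.term) })
}

// Stopped reports whether Stop has been called.
func (s *stopper) Stopped() bool {
	select {
	case <-s.term:
		return true
	default:
		return false
	}
}

func main() {
	s := newStopper()
	fmt.Println(s.Stopped()) // false
	s.Stop()
	s.Stop() // safe: the second call does nothing
	fmt.Println(s.Stopped()) // true
}
```

    A receive from a closed channel succeeds immediately, so the case <-s.term branches in GenerateStream would work unchanged with this variant. Unlike the blocking send, it does not wait for GenerateStream to return; if that guarantee is needed, a separate completion signal has to be added.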
    

 

This completes the implementation of source.