Skip to content

Commit

Permalink
Merge pull request #136 from complacentsee/backend
Browse files Browse the repository at this point in the history
Backend migration update
  • Loading branch information
coderReview authored Aug 23, 2023
2 parents 2cd3ae6 + 970006e commit ca430e0
Show file tree
Hide file tree
Showing 15 changed files with 3,602 additions and 350 deletions.
3,038 changes: 3,036 additions & 2 deletions dist/module.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/module.js.map

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions dist/plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
"name": "OSIsoft-PI",
"type": "datasource",
"id": "gridprotectionalliance-osisoftpi-datasource",
"backend": true,
"executable": "gpx_osipiwebapi",
"metrics": true,
"annotations": true,
"alerting": false,
"alerting": true,
"streaming": true,
"info": {
"description": "Datasource plugin for OSIsoft PI Web API",
"author": {
Expand Down Expand Up @@ -37,7 +40,7 @@
{"name": "Annotations Editor", "path": "img/annotations.png"}
],
"version": "4.2.0",
"updated": "2023-05-31"
"updated": "2023-08-16"
},
"dependencies": {
"grafanaDependency": ">=8.4.0",
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ var (
_ backend.CheckHealthHandler = (*Datasource)(nil)
_ instancemgmt.InstanceDisposer = (*Datasource)(nil)
_ backend.CallResourceHandler = (*Datasource)(nil)

// _ backend.StreamHandler = (*Datasource)(nil)
_ backend.StreamHandler = (*Datasource)(nil)
)

// NewDatasource creates a new PIWebAPI datasource instance.
Expand All @@ -53,7 +52,7 @@ func NewPIWebAPIDatasource(settings backend.DataSourceInstanceSettings) (instanc
return nil, fmt.Errorf("httpclient new: %w", err)
}

webIDCache := make(map[string]WebIDCacheEntry)
webIDCache := newWebIDCache()

// Create a new scheduler that will be used to clean the webIDCache every 5 minutes.
scheduler := gocron.NewScheduler(time.UTC)
Expand All @@ -67,6 +66,7 @@ func NewPIWebAPIDatasource(settings backend.DataSourceInstanceSettings) (instanc
scheduler: scheduler,
websocketConnectionsMutex: &sync.Mutex{},
sendersByWebIDMutex: &sync.Mutex{},
channelConstruct: make(map[string]StreamChannelConstruct),
websocketConnections: make(map[string]*websocket.Conn),
sendersByWebID: make(map[string]map[*backend.StreamSender]bool),
streamChannels: make(map[string]chan []byte),
Expand Down
30 changes: 16 additions & 14 deletions pkg/plugin/datasource_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

type Datasource struct {
settings backend.DataSourceInstanceSettings
queryMux *datasource.QueryTypeMux
StreamHandler backend.StreamHandler
httpClient *http.Client
webIDCache map[string]WebIDCacheEntry
// channelConstruct map[string]StreamChannelConstruct
settings backend.DataSourceInstanceSettings
queryMux *datasource.QueryTypeMux
StreamHandler backend.StreamHandler
httpClient *http.Client
webIDCache WebIDCache
channelConstruct map[string]StreamChannelConstruct
scheduler *gocron.Scheduler
websocketConnectionsMutex *sync.Mutex
sendersByWebIDMutex *sync.Mutex
Expand All @@ -27,12 +27,14 @@ type Datasource struct {
}

type PIWebAPIDataSourceJsonData struct {
URL *string `json:"url,omitempty"`
Access *string `json:"access,omitempty"`
PIServer *string `json:"piserver,omitempty"`
AFServer *string `json:"afserver,omitempty"`
AFDatabase *string `json:"afdatabase,omitempty"`
PIPoint *bool `json:"pipoint,omitempty"`
NewFormat *bool `json:"newFormat,omitempty"`
UseUnit *bool `json:"useUnit,omitempty"`
URL *string `json:"url,omitempty"`
Access *string `json:"access,omitempty"`
PIServer *string `json:"piserver,omitempty"`
AFServer *string `json:"afserver,omitempty"`
AFDatabase *string `json:"afdatabase,omitempty"`
PIPoint *bool `json:"pipoint,omitempty"`
NewFormat *bool `json:"newFormat,omitempty"`
UseUnit *bool `json:"useUnit,omitempty"`
UseExperimental *bool `json:"useExperimental,omitempty"`
UseStreaming *bool `json:"useStreaming,omitempty"`
}
130 changes: 125 additions & 5 deletions pkg/plugin/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (d *Datasource) apiBatchRequest(ctx context.Context, BatchSubRequests inter

// convertSliceToPointers converts a slice of values to a slice of
// pointers to those values. This is used to create point values that are nullable.
// TODO: handle bad value processing here
func convertSliceToPointers(slice interface{}, badValues []int) interface{} {
s := reflect.ValueOf(slice)
t := reflect.TypeOf(slice).Elem()
Expand Down Expand Up @@ -220,7 +221,7 @@ func convertSliceToPointers(slice interface{}, badValues []int) interface{} {
}
}

func handleTimestampValue(val reflect.Value) (reflect.Value, error) {
func parseTimestampValue(val reflect.Value) (reflect.Value, error) {
if val.Kind() != reflect.String {
return reflect.Value{}, fmt.Errorf("timestamp value must be a string")
}
Expand All @@ -239,11 +240,16 @@ func handleTimestampValue(val reflect.Value) (reflect.Value, error) {

// TODO: Code cleanup: handle this directly using slices of pointers.
// TODO: Missing functionality: Add support for replacing bad data.
func convertItemsToDataFrame(frameName string, items []PiBatchContentItem, d Datasource, webID string, summaryQuery bool, includeMetaData bool) (*data.Frame, error) {
frame := data.NewFrame(frameName)
func convertProcessedQueryToDataFrame(processedQuery PiProcessedQuery, d *Datasource, SummaryType string) (*data.Frame, error) {
webID := processedQuery.WebID
includeMetaData := processedQuery.UseUnit
items := *processedQuery.Response.getItems(SummaryType)
SliceType := d.getTypeForWebID(webID)
digitalState := d.getDigitalStateForWebID(webID)

frameName := getDataLabel(d.isUsingNewFormat(), &processedQuery, d.getPointTypeForWebID(webID), SummaryType)
frame := data.NewFrame(frameName)

var timestamps []time.Time
badValues := make([]int, 0)
var values any
Expand All @@ -258,7 +264,7 @@ func convertItemsToDataFrame(frameName string, items []PiBatchContentItem, d Dat
// we need to convert it to a time.Time
if SliceType == reflect.TypeOf([]time.Time{}) {
var err error
val, err = handleTimestampValue(val)
val, err = parseTimestampValue(val)
if err != nil {
continue
}
Expand Down Expand Up @@ -321,7 +327,121 @@ func convertItemsToDataFrame(frameName string, items []PiBatchContentItem, d Dat
values = reflect.Append(reflect.ValueOf(values), val).Interface()
}

backend.Logger.Info("Converting slice to pointers")
// Convert the slice of values to a slice of pointers to the values
// This is so that we can nullify the values that are "bad"
// "Bad" values are values such as system type values that cannot be represented
// in the slice type, or values that are not "good"
valuepointers := convertSliceToPointers(values, badValues)

timeField := data.NewField("time", nil, timestamps)
valueField := data.NewField(frameName, nil, valuepointers)

fieldConfig := &data.FieldConfig{}

if includeMetaData {
fieldConfig.Unit = d.getUnitsForWebID(webID)
fieldConfig.Description = d.getDescriptionForWebID(webID)
}

valueField.SetConfig(fieldConfig)

frame.Fields = append(frame.Fields,
timeField,
valueField,
)

if digitalState {
frame.Fields = append(frame.Fields,
data.NewField(frameName+".Value", nil, digitalStateValues),
)
}
// create a metadata struct for the frame so we can set it later.
frame.Meta = &data.FrameMeta{}
return frame, nil
}

// TODO: FIXME: Remove this function once replaced
func convertItemsToDataFrame(frameName string, items []PiBatchContentItem, d *Datasource, webID string, includeMetaData bool) (*data.Frame, error) {
frame := data.NewFrame(frameName)
SliceType := d.getTypeForWebID(webID)
digitalState := d.getDigitalStateForWebID(webID)

var timestamps []time.Time
badValues := make([]int, 0)
var values any
values = reflect.MakeSlice(reflect.SliceOf(SliceType.Elem()), 0, 0).Interface()
digitalStateValues := make([]int32, 0)

for i, item := range items {
var val reflect.Value
val = reflect.ValueOf(item.Value)

//handle value being a timestamp, the PIWab API returns a timestamp as a string
// we need to convert it to a time.Time
if SliceType == reflect.TypeOf([]time.Time{}) {
var err error
val, err = parseTimestampValue(val)
if err != nil {
continue
}
}

// if the value is valid, get the underlying value
// we need to complete both checks to prevent a panic on a null value
if val.IsValid() && val.Kind() == reflect.Ptr {
val = val.Elem()
}

if !val.IsValid() {
timestamps = append(timestamps, item.Timestamp)
badValues = append(badValues, i)
zeroVal := reflect.Zero(SliceType.Elem())
valuesValue := reflect.ValueOf(values)
values = reflect.Append(valuesValue, zeroVal).Interface()
continue
}

// if the value isn't good, or is not the same type as the slice,
// add it to the list of bad values and nullify later
//TODO we should make this pattern match the query options
if val.Type().Kind() != SliceType.Elem().Kind() || digitalState || !item.isGood() {

timestamps = append(timestamps, item.Timestamp)
if digitalState {
var pds PointDigitalState
if b, err := json.Marshal(item.Value); err == nil {
if err := json.Unmarshal(b, &pds); err != nil {
backend.Logger.Error("Error unmarshalling digital state", err)
badValues = append(badValues, i)
zeroVal := reflect.Zero(SliceType.Elem())
valuesValue := reflect.ValueOf(values)
values = reflect.Append(valuesValue, zeroVal).Interface()
continue
}
pdsValue := reflect.ValueOf(pds.Name)
values = reflect.Append(reflect.ValueOf(values), pdsValue).Interface()
digitalStateValues = append(digitalStateValues, int32(pds.Value))
continue
} else {
backend.Logger.Error("Error unmarshalling digital state", err)
badValues = append(badValues, i)
zeroVal := reflect.Zero(SliceType.Elem())
valuesValue := reflect.ValueOf(values)
values = reflect.Append(valuesValue, zeroVal).Interface()
continue
}
}

badValues = append(badValues, i)
zeroVal := reflect.Zero(SliceType.Elem())
valuesValue := reflect.ValueOf(values)
values = reflect.Append(valuesValue, zeroVal).Interface()
continue
}

timestamps = append(timestamps, item.Timestamp)
values = reflect.Append(reflect.ValueOf(values), val).Interface()
}

// Convert the slice of values to a slice of pointers to the values
// This is so that we can nullify the values that are "bad"
Expand Down
Loading

0 comments on commit ca430e0

Please sign in to comment.