Skip to content

Commit

Permalink
Merge pull request #1628 from orozery/objectstore_marshaller
Browse files Browse the repository at this point in the history
add objectstore marshaller
  • Loading branch information
safchain authored Jan 31, 2019
2 parents 723e8e4 + 94e888e commit ddc21c0
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 4 deletions.
10 changes: 9 additions & 1 deletion contrib/objectstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/contrib/objectstore/subscriber"
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/websocket"
Expand Down Expand Up @@ -39,6 +40,13 @@ func main() {
subscriberUsername := cfg.GetString("subscriber_username")
subscriberPassword := cfg.GetString("subscriber_password")
maxSecondsPerStream := cfg.GetInt("max_seconds_per_stream")
flowTransformerName := cfg.GetString("flow_transformer")

flowTransformer, err := flowtransformer.New(flowTransformerName)
if err != nil {
logging.GetLogger().Errorf("Failed to initialize flow transformer: %s", err.Error())
os.Exit(1)
}

authOpts := &shttp.AuthenticationOpts{
Username: subscriberUsername,
Expand All @@ -58,7 +66,7 @@ func main() {
}
structClient := wsClient.UpgradeToStructSpeaker()

s := subscriber.New(endpoint, region, bucket, accessKey, secretKey, objectPrefix, maxSecondsPerStream)
s := subscriber.New(endpoint, region, bucket, accessKey, secretKey, objectPrefix, maxSecondsPerStream, flowTransformer)

// subscribe to the flow updates
structClient.AddStructMessageHandler(s, []string{"flow"})
Expand Down
1 change: 1 addition & 0 deletions contrib/objectstore/skydive-objectstore.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
# subscriber_username:
# subscriber_password:
# max_seconds_per_stream: 86400
# flow_transformer: custom1
42 changes: 42 additions & 0 deletions contrib/objectstore/subscriber/flowtransformer/custom1/custom1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package custom1

import (
"github.com/skydive-project/skydive/flow"
)

// Flow represents a transformed flow
type Flow struct {
UUID string
LayersPath string
Network *flow.FlowLayer
Transport *flow.TransportLayer
LastUpdateMetric *flow.FlowMetric
Metric *flow.FlowMetric
Start int64
Last int64
FinishType flow.FlowFinishType
}

// FlowTransformer is a custom transformer for flows
type FlowTransformer struct {
}

// Transform transforms a flow before being stored
func (m *FlowTransformer) Transform(f *flow.Flow) interface{} {
return &Flow{
UUID: f.UUID,
LayersPath: f.LayersPath,
Network: f.Network,
Transport: f.Transport,
LastUpdateMetric: f.LastUpdateMetric,
Metric: f.Metric,
Start: f.Start,
Last: f.Last,
FinishType: f.FinishType,
}
}

// New returns a new Custom1Marshaller
func New() *FlowTransformer {
return &FlowTransformer{}
}
25 changes: 25 additions & 0 deletions contrib/objectstore/subscriber/flowtransformer/flowtransformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package flowtransformer

import (
"fmt"
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer/custom1"
"github.com/skydive-project/skydive/flow"
)

// FlowTransformer allows generic transformations of a flow
type FlowTransformer interface {
// Transform transforms a flow before being stored
Transform(flow *flow.Flow) interface{}
}

// New creates a new flow transformer based on a name string
func New(flowTransformerName string) (FlowTransformer, error) {
switch flowTransformerName {
case "custom1":
return custom1.New(), nil
case "":
return nil, nil
default:
return nil, fmt.Errorf("Marshaller '%s' is not supported", flowTransformerName)
}
}
23 changes: 20 additions & 3 deletions contrib/objectstore/subscriber/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go/aws"

"github.com/skydive-project/skydive/contrib/objectstore/subscriber/client"
"github.com/skydive-project/skydive/contrib/objectstore/subscriber/flowtransformer"
"github.com/skydive-project/skydive/flow"
"github.com/skydive-project/skydive/logging"
ws "github.com/skydive-project/skydive/websocket"
Expand All @@ -33,6 +34,7 @@ type Subscriber struct {
currentStream stream
maxStreamDuration time.Duration
objectStoreClient client.Client
flowTransformer flowtransformer.FlowTransformer
}

// OnStructMessage is triggered when WS server sends us a message.
Expand All @@ -58,7 +60,21 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
return nil
}

flowsString, err := json.Marshal(flows)
var jsonMarshalInput interface{}
if s.flowTransformer == nil {
jsonMarshalInput = flows
} else {
transformedFlows := make([]interface{}, 0, len(flows))
for _, f := range flows {
transformedFlow := s.flowTransformer.Transform(f)
if transformedFlow != nil {
transformedFlows = append(transformedFlows, transformedFlow)
}
}
jsonMarshalInput = transformedFlows
}

flowsBytes, err := json.Marshal(jsonMarshalInput)
if err != nil {
logging.GetLogger().Error("Error encoding flows: ", err)
return err
Expand All @@ -83,7 +99,7 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
// gzip
var b bytes.Buffer
w := gzip.NewWriter(&b)
w.Write([]byte(flowsString))
w.Write(flowsBytes)
w.Close()

currentStream := s.currentStream
Expand All @@ -106,13 +122,14 @@ func (s *Subscriber) StoreFlows(flows []*flow.Flow) error {
}

// New returns a new flows subscriber writing to an object storage service
func New(endpoint, region, bucket, accessKey, secretKey, objectPrefix string, maxSecondsPerStream int) *Subscriber {
func New(endpoint, region, bucket, accessKey, secretKey, objectPrefix string, maxSecondsPerStream int, flowTransformer flowtransformer.FlowTransformer) *Subscriber {
objectStoreClient := client.New(endpoint, region, accessKey, secretKey)
s := &Subscriber{
bucket: bucket,
objectPrefix: objectPrefix,
maxStreamDuration: time.Second * time.Duration(maxSecondsPerStream),
objectStoreClient: objectStoreClient,
flowTransformer: flowTransformer,
}
return s
}

0 comments on commit ddc21c0

Please sign in to comment.