Skip to content

Commit

Permalink
feat: OBS-381 - extract push request validation into func
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Fiedorowicz <[email protected]>
  • Loading branch information
mfiedorowicz committed Feb 2, 2024
1 parent 5d057d4 commit a065f49
Showing 1 changed file with 35 additions and 32 deletions.
67 changes: 35 additions & 32 deletions diode-server/distributor/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,40 +91,15 @@ func (c *Component) Stop() error {

// Push handles a push request
func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushResponse, error) {
reqID := in.GetId()
if reqID == "" {
return nil, fmt.Errorf("id is empty")
if err := validatePushRequest(in); err != nil {
return nil, err
}

reqStream := in.GetStream()
if reqStream == "" {
reqStream = DefaultRequestStream
}

producerAppName := in.GetProducerAppName()
if producerAppName == "" {
return nil, fmt.Errorf("producer app name is empty")
}

producerAppVersion := in.GetProducerAppVersion()
if producerAppVersion == "" {
return nil, fmt.Errorf("producer app version is empty")
}

sdkName := in.GetSdkName()
if sdkName == "" {
return nil, fmt.Errorf("sdk name is empty")
}

sdkVersion := in.GetSdkVersion()
if sdkVersion == "" {
return nil, fmt.Errorf("sdk version is empty")
}

if len(in.GetData()) < 1 {
return nil, fmt.Errorf("data is empty")
}

errs := make([]string, 0)

for i, v := range in.GetData() {
Expand All @@ -139,12 +114,12 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo
continue
}
msg := map[string]interface{}{
"id": reqID,
"id": in.GetId(),
"stream": reqStream,
"producer_app_name": producerAppName,
"producer_app_version": producerAppVersion,
"sdk_name": sdkName,
"sdk_version": sdkVersion,
"producer_app_name": in.GetProducerAppName(),
"producer_app_version": in.GetProducerAppVersion(),
"sdk_name": in.GetSdkName(),
"sdk_version": in.GetSdkVersion(),
"data": encodedEntity,
"ts": v.GetTimestamp().String(),
"ingestion_ts": time.Now().UnixNano(),
Expand All @@ -159,3 +134,31 @@ func (c *Component) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushRespo

return &pb.PushResponse{Errors: errs}, nil
}

func validatePushRequest(in *pb.PushRequest) error {
if in.GetId() == "" {
return fmt.Errorf("id is empty")
}

if in.GetProducerAppName() == "" {
return fmt.Errorf("producer app name is empty")
}

if in.GetProducerAppVersion() == "" {
return fmt.Errorf("producer app version is empty")
}

if in.GetSdkName() == "" {
return fmt.Errorf("sdk name is empty")
}

if in.GetSdkVersion() == "" {
return fmt.Errorf("sdk version is empty")
}

if len(in.GetData()) < 1 {
return fmt.Errorf("data is empty")
}

return nil
}

0 comments on commit a065f49

Please sign in to comment.