From a5307f17650db7c475b3be3615b7b4c755653a49 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Wed, 28 Aug 2024 15:47:52 +0530 Subject: [PATCH] fix: using jsoniter --- .../lytics_bulk_upload/lyticsBulkUpload.go | 42 +++++++++++++------ .../lytics_bulk_upload/types.go | 16 +++---- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/lyticsBulkUpload.go b/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/lyticsBulkUpload.go index 85eaf769ed..4b02d4dfb0 100644 --- a/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/lyticsBulkUpload.go +++ b/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/lyticsBulkUpload.go @@ -4,7 +4,6 @@ import ( "bufio" "encoding/csv" "encoding/json" - stdjson "encoding/json" "fmt" "io" "net/http" @@ -13,6 +12,7 @@ import ( "time" "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" "github.com/tidwall/gjson" "github.com/rudderlabs/rudder-go-kit/bytesize" @@ -37,11 +37,11 @@ func NewLyticsBulkUploader(destinationName, authorization, baseEndpoint string) func NewManager(destination *backendconfig.DestinationT) (*LyticsBulkUploader, error) { destConfig := DestinationConfig{} - jsonConfig, err := stdjson.Marshal(destination.Config) + jsonConfig, err := json.Marshal(destination.Config) if err != nil { return nil, fmt.Errorf("error in marshalling destination config: %v", err) } - err = stdjson.Unmarshal(jsonConfig, &destConfig) + err = json.Unmarshal(jsonConfig, &destConfig) if err != nil { return nil, fmt.Errorf("error in unmarshalling destination config: %v", err) } @@ -92,8 +92,8 @@ func (b *LyticsBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, streamT } // Unmarshal Properties into a map of json.RawMessage - var fields map[string]json.RawMessage - if err := json.Unmarshal(data.Message.Properties, &fields); err != nil { + var fields map[string]interface{} + if err := jsoniter.Unmarshal(data.Message.Properties, &fields); err != nil { return err } @@ -104,11 +104,29 @@ func (b *LyticsBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, streamT for i, mapping := range streamTraitsMapping { if value, exists := fields[mapping.RudderProperty]; exists { // Convert the json.RawMessage value to a string - var valueStr string - if err := json.Unmarshal(value, &valueStr); err == nil { - csvRow[i] = valueStr - } else { - csvRow[i] = string(value) + switch v := value.(type) { + case jsoniter.RawMessage: + // Convert the json.RawMessage value to a string + var valueStr string + if err := jsoniter.Unmarshal(v, &valueStr); err == nil { + csvRow[i] = valueStr + } else { + csvRow[i] = string(v) + } + case []byte: + // Handle if the value is directly a []byte + var valueStr string + if err := jsoniter.Unmarshal(v, &valueStr); err == nil { + csvRow[i] = valueStr + } else { + csvRow[i] = string(v) + } + case string: + // If the value is already a string, use it directly + csvRow[i] = v + default: + // Handle other types (e.g., numbers, booleans) by converting to a string + csvRow[i] = fmt.Sprintf("%v", v) } } else { // Append an empty string if the RudderProperty is not found in fields @@ -195,7 +213,7 @@ func (b *LyticsBulkUploader) createCSVFile(existingFilePath string, streamTraits for scanner.Scan() { line := scanner.Text() var data Data - if err := json.Unmarshal([]byte(line), &data); err != nil { + if err := jsoniter.Unmarshal([]byte(line), &data); err != nil { // Collect the failed job ID actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID) continue @@ -289,7 +307,7 @@ func (e *LyticsBulkUploader) UploadBulkFile(data *HttpRequestData, filePath stri func (b *LyticsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { destination := asyncDestStruct.Destination filePath := asyncDestStruct.FileName - destConfig, err := json.Marshal(destination.Config) + destConfig, err := jsoniter.Marshal(destination.Config) if err != nil { eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ "module": "batch_router", diff --git a/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/types.go b/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/types.go index 861ce71f24..a8de9936a6 100644 --- a/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/types.go +++ b/router/batchrouter/asyncdestinationmanager/lytics_bulk_upload/types.go @@ -3,12 +3,12 @@ package lyticsBulkUpload import ( "context" "encoding/csv" - "encoding/json" "io" "net/http" "os" "time" + jsoniter "github.com/json-iterator/go" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) @@ -60,13 +60,13 @@ type ActionFileInfo struct { } type Message struct { - UserID string `json:"userId"` - AnonymousID string `json:"anonymousId"` - Event string `json:"event"` - Properties json.RawMessage `json:"properties"` - Traits json.RawMessage `json:"traits"` - Context json.RawMessage `json:"context"` - Timestamp time.Time `json:"timestamp"` + UserID string `json:"userId"` + AnonymousID string `json:"anonymousId"` + Event string `json:"event"` + Properties jsoniter.RawMessage `json:"properties"` + Traits jsoniter.RawMessage `json:"traits"` + Context jsoniter.RawMessage `json:"context"` + Timestamp time.Time `json:"timestamp"` } type Metadata struct {