Skip to content

Commit

Permalink
fix: using jsoniter
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Aug 28, 2024
1 parent fca3914 commit a5307f1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"encoding/csv"
"encoding/json"
stdjson "encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -13,6 +12,7 @@ import (
"time"

"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/bytesize"
Expand All @@ -37,11 +37,11 @@ func NewLyticsBulkUploader(destinationName, authorization, baseEndpoint string)

func NewManager(destination *backendconfig.DestinationT) (*LyticsBulkUploader, error) {
destConfig := DestinationConfig{}
jsonConfig, err := stdjson.Marshal(destination.Config)
jsonConfig, err := json.Marshal(destination.Config)
if err != nil {
return nil, fmt.Errorf("error in marshalling destination config: %v", err)
}
err = stdjson.Unmarshal(jsonConfig, &destConfig)
err = json.Unmarshal(jsonConfig, &destConfig)
if err != nil {
return nil, fmt.Errorf("error in unmarshalling destination config: %v", err)
}
Expand Down Expand Up @@ -92,8 +92,8 @@ func (b *LyticsBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, streamT
}

// Unmarshal Properties into a map of json.RawMessage
var fields map[string]json.RawMessage
if err := json.Unmarshal(data.Message.Properties, &fields); err != nil {
var fields map[string]interface{}
if err := jsoniter.Unmarshal(data.Message.Properties, &fields); err != nil {
return err
}

Expand All @@ -104,11 +104,29 @@ func (b *LyticsBulkUploader) PopulateCsvFile(actionFile *ActionFileInfo, streamT
for i, mapping := range streamTraitsMapping {
if value, exists := fields[mapping.RudderProperty]; exists {
// Convert the json.RawMessage value to a string
var valueStr string
if err := json.Unmarshal(value, &valueStr); err == nil {
csvRow[i] = valueStr
} else {
csvRow[i] = string(value)
switch v := value.(type) {
case jsoniter.RawMessage:
// Convert the json.RawMessage value to a string
var valueStr string
if err := jsoniter.Unmarshal(v, &valueStr); err == nil {
csvRow[i] = valueStr
} else {
csvRow[i] = string(v)
}
case []byte:
// Handle if the value is directly a []byte
var valueStr string
if err := jsoniter.Unmarshal(v, &valueStr); err == nil {
csvRow[i] = valueStr
} else {
csvRow[i] = string(v)
}
case string:
// If the value is already a string, use it directly
csvRow[i] = v
default:
// Handle other types (e.g., numbers, booleans) by converting to a string
csvRow[i] = fmt.Sprintf("%v", v)
}
} else {
// Append an empty string if the RudderProperty is not found in fields
Expand Down Expand Up @@ -195,7 +213,7 @@ func (b *LyticsBulkUploader) createCSVFile(existingFilePath string, streamTraits
for scanner.Scan() {
line := scanner.Text()
var data Data
if err := json.Unmarshal([]byte(line), &data); err != nil {
if err := jsoniter.Unmarshal([]byte(line), &data); err != nil {
// Collect the failed job ID
actionFile.FailedJobIDs = append(actionFile.FailedJobIDs, data.Metadata.JobID)
continue
Expand Down Expand Up @@ -289,7 +307,7 @@ func (e *LyticsBulkUploader) UploadBulkFile(data *HttpRequestData, filePath stri
func (b *LyticsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
filePath := asyncDestStruct.FileName
destConfig, err := json.Marshal(destination.Config)
destConfig, err := jsoniter.Marshal(destination.Config)
if err != nil {
eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
"module": "batch_router",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package lyticsBulkUpload
import (
"context"
"encoding/csv"
"encoding/json"
"io"
"net/http"
"os"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)
Expand Down Expand Up @@ -60,13 +60,13 @@ type ActionFileInfo struct {
}

type Message struct {
UserID string `json:"userId"`
AnonymousID string `json:"anonymousId"`
Event string `json:"event"`
Properties json.RawMessage `json:"properties"`
Traits json.RawMessage `json:"traits"`
Context json.RawMessage `json:"context"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"userId"`
AnonymousID string `json:"anonymousId"`
Event string `json:"event"`
Properties jsoniter.RawMessage `json:"properties"`
Traits jsoniter.RawMessage `json:"traits"`
Context jsoniter.RawMessage `json:"context"`
Timestamp time.Time `json:"timestamp"`
}

type Metadata struct {
Expand Down

0 comments on commit a5307f1

Please sign in to comment.