Skip to content

Commit

Permalink
feat: stats for schema size (#5031)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Aug 29, 2024
1 parent e832eae commit 01b84f4
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
6 changes: 6 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ var (
"event_delivery_time": {
60, 300, 900, 1800, 2100, 2700, 3900, 4500, 5400, 9900, 11100, 12600, 21600, 23400, 43200, 45000, 82800, 86400, 88200, // 1m, 5m, 15m, 30m, 35m, 45m, 1h5m, 1h15m, 1h30m, 2h45m, 3h5m, 3h30m, 6h, 6h30m, 12h, 12h30m, 23h, 24h, 24h30m
},
"warehouse_schema_size": {
float64(10 * bytesize.B), float64(100 * bytesize.B),
float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB),
float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB),
float64(25 * bytesize.MB), float64(50 * bytesize.MB), float64(100 * bytesize.MB), float64(1 * bytesize.GB),
},
}

customBuckets = map[string][]float64{
Expand Down
2 changes: 2 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/alerta"
Expand Down Expand Up @@ -170,6 +171,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
dto.Warehouse,
f.conf,
f.logger.Child("warehouse"),
f.statsFactory,
),

upload: dto.Upload,
Expand Down
29 changes: 27 additions & 2 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ import (
"slices"
"sync"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

// deprecatedColumnsRegex
// This regex is used to identify deprecated columns in the warehouse
// Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840
Expand Down Expand Up @@ -54,15 +60,20 @@ type Schema struct {
schemaInWarehouseMu sync.RWMutex
unrecognizedSchemaInWarehouse model.Schema
unrecognizedSchemaInWarehouseMu sync.RWMutex

stats struct {
schemaSize stats.Histogram
}
}

func New(
db *sqlquerywrapper.DB,
warehouse model.Warehouse,
conf *config.Config,
logger logger.Logger,
statsFactory stats.Stats,
) *Schema {
return &Schema{
s := &Schema{
warehouse: warehouse,
schemaRepo: repo.NewWHSchemas(db),
stagingFileRepo: repo.NewStagingFiles(db),
Expand All @@ -71,6 +82,14 @@ func New(
skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false),
enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
}
s.stats.schemaSize = statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{
"module": "warehouse",
"workspaceId": warehouse.WorkspaceID,
"destType": warehouse.Destination.DestinationDefinition.Name,
"sourceId": warehouse.Source.ID,
"destinationId": warehouse.Destination.ID,
})
return s
}

// ConsolidateStagingFilesUsingLocalSchema
Expand Down Expand Up @@ -247,7 +266,13 @@ func (sh *Schema) UpdateLocalSchema(ctx context.Context, uploadID int64, updated
// 1. Inserts the updated schema into the local schema table
// 2. Updates the local schema instance
func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error {
_, err := sh.schemaRepo.Insert(ctx, &model.WHSchema{
updatedSchemaInBytes, err := json.Marshal(updatedSchema)
if err != nil {
return fmt.Errorf("marshaling schema: %w", err)
}
sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes)))

_, err = sh.schemaRepo.Insert(ctx, &model.WHSchema{
UploadID: uploadId,
SourceID: sh.warehouse.Source.ID,
Namespace: sh.warehouse.Namespace,
Expand Down
40 changes: 39 additions & 1 deletion warehouse/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"testing"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"

"github.com/samber/lo"
Expand Down Expand Up @@ -73,6 +76,7 @@ func (m *mockFetchSchemaRepo) FetchSchema(context.Context) (model.Schema, model.
}

func TestSchema_UpdateLocalSchema(t *testing.T) {
workspaceID := "test-workspace-id"
sourceID := "test_source_id"
destinationID := "test_destination_id"
namespace := "test_namespace"
Expand Down Expand Up @@ -151,8 +155,12 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
schemaMap: map[string]model.WHSchema{},
}

statsStore, err := memstats.New()
require.NoError(t, err)

s := Schema{
warehouse: model.Warehouse{
WorkspaceID: workspaceID,
Source: backendconfig.SourceT{
ID: sourceID,
},
Expand All @@ -165,10 +173,18 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
schemaRepo: mockRepo,
schemaInWarehouse: schemaInWarehouse,
}
tags := stats.Tags{
"module": "warehouse",
"workspaceId": s.warehouse.WorkspaceID,
"destType": s.warehouse.Destination.DestinationDefinition.Name,
"sourceId": s.warehouse.Source.ID,
"destinationId": s.warehouse.Destination.ID,
}
s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags)

ctx := context.Background()

err := s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema)
err = s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema)
if tc.wantError == nil {
require.NoError(t, err)
require.Equal(t, tc.wantSchema, s.localSchema)
Expand All @@ -178,6 +194,9 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
require.Empty(t, s.localSchema)
require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
}
marshalledSchema, err := json.Marshal(tc.mockSchema.Schema)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())

err = s.UpdateLocalSchemaWithWarehouse(ctx, uploadID)
if tc.wantError == nil {
Expand All @@ -188,7 +207,11 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError))
require.Empty(t, s.localSchema)
require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
require.EqualValues(t, float64(241), statsStore.Get("warehouse_schema_size", tags).LastValue())
}
marshalledSchema, err = json.Marshal(schemaInWarehouse)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())
})
}
}
Expand Down Expand Up @@ -1772,6 +1795,9 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
require.False(t, schemaChanged)
})
t.Run("schema changed", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)

testSchema := model.Schema{
tableName: model.TableSchema{
"test_int": "int",
Expand Down Expand Up @@ -1823,6 +1849,14 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
schemaRepo: mockSchemaRepo,
log: logger.NOP,
}
tags := stats.Tags{
"module": "warehouse",
"workspaceId": s.warehouse.WorkspaceID,
"destType": s.warehouse.Destination.DestinationDefinition.Name,
"sourceId": s.warehouse.Source.ID,
"destinationId": s.warehouse.Destination.ID,
}
s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags)

mockFetchSchemaRepo := &mockFetchSchemaRepo{
err: nil,
Expand All @@ -1839,6 +1873,10 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
require.Equal(t, schemaInWarehouse, mockSchemaRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
require.Equal(t, schemaInWarehouse, s.schemaInWarehouse)
require.Equal(t, schemaInWarehouse, s.unrecognizedSchemaInWarehouse)

marshalledSchema, err := json.Marshal(s.localSchema)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())
})
t.Run("schema not changed", func(t *testing.T) {
testSchema := model.Schema{
Expand Down

0 comments on commit 01b84f4

Please sign in to comment.