diff --git a/runner/buckets.go b/runner/buckets.go index 36e66cd640..5deb831076 100644 --- a/runner/buckets.go +++ b/runner/buckets.go @@ -21,6 +21,12 @@ var ( "event_delivery_time": { 60, 300, 900, 1800, 2100, 2700, 3900, 4500, 5400, 9900, 11100, 12600, 21600, 23400, 43200, 45000, 82800, 86400, 88200, // 1m, 5m, 15m, 30m, 35m, 45m, 1h5m, 1h15m, 1h30m, 2h45m, 3h5m, 3h30m, 6h, 6h30m, 12h, 12h30m, 23h, 24h, 24h30m }, + "warehouse_schema_size": { + float64(10 * bytesize.B), float64(100 * bytesize.B), + float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB), + float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB), + float64(25 * bytesize.MB), float64(50 * bytesize.MB), float64(100 * bytesize.MB), float64(1 * bytesize.GB), + }, } customBuckets = map[string][]float64{ diff --git a/warehouse/router/upload.go b/warehouse/router/upload.go index a8cf3b4a4a..b8558985ac 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -15,6 +15,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/services/alerta" @@ -170,6 +171,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo dto.Warehouse, f.conf, f.logger.Child("warehouse"), + f.statsFactory, ), upload: dto.Upload, diff --git a/warehouse/schema/schema.go b/warehouse/schema/schema.go index b864421458..6f56fdc97f 100644 --- a/warehouse/schema/schema.go +++ b/warehouse/schema/schema.go @@ -8,10 +8,14 @@ import ( "slices" "sync" + jsoniter "github.com/json-iterator/go" "github.com/samber/lo" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/internal/repo" @@ -19,6 +23,8 @@ import ( whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) +var json = jsoniter.ConfigCompatibleWithStandardLibrary + // deprecatedColumnsRegex // This regex is used to identify deprecated columns in the warehouse // Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840 @@ -54,6 +60,10 @@ type Schema struct { schemaInWarehouseMu sync.RWMutex unrecognizedSchemaInWarehouse model.Schema unrecognizedSchemaInWarehouseMu sync.RWMutex + + stats struct { + schemaSize stats.Histogram + } } func New( @@ -61,8 +71,9 @@ func New( warehouse model.Warehouse, conf *config.Config, logger logger.Logger, + statsFactory stats.Stats, ) *Schema { - return &Schema{ + s := &Schema{ warehouse: warehouse, schemaRepo: repo.NewWHSchemas(db), stagingFileRepo: repo.NewStagingFiles(db), @@ -71,6 +82,14 @@ func New( skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false), enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false), } + s.stats.schemaSize = statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{ + "module": "warehouse", + "workspaceId": warehouse.WorkspaceID, + "destType": warehouse.Destination.DestinationDefinition.Name, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + }) + return s } // ConsolidateStagingFilesUsingLocalSchema @@ -247,7 +266,13 @@ func (sh *Schema) UpdateLocalSchema(ctx context.Context, uploadID int64, updated // 1. Inserts the updated schema into the local schema table // 2. Updates the local schema instance func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error { - _, err := sh.schemaRepo.Insert(ctx, &model.WHSchema{ + updatedSchemaInBytes, err := json.Marshal(updatedSchema) + if err != nil { + return fmt.Errorf("marshaling schema: %w", err) + } + sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes))) + + _, err = sh.schemaRepo.Insert(ctx, &model.WHSchema{ UploadID: uploadId, SourceID: sh.warehouse.Source.ID, Namespace: sh.warehouse.Namespace, diff --git a/warehouse/schema/schema_test.go b/warehouse/schema/schema_test.go index 2d6688f034..3bba325645 100644 --- a/warehouse/schema/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -7,6 +7,9 @@ import ( "testing" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/samber/lo" @@ -73,6 +76,7 @@ func (m *mockFetchSchemaRepo) FetchSchema(context.Context) (model.Schema, model. } func TestSchema_UpdateLocalSchema(t *testing.T) { + workspaceID := "test-workspace-id" sourceID := "test_source_id" destinationID := "test_destination_id" namespace := "test_namespace" @@ -151,8 +155,12 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { schemaMap: map[string]model.WHSchema{}, } + statsStore, err := memstats.New() + require.NoError(t, err) + s := Schema{ warehouse: model.Warehouse{ + WorkspaceID: workspaceID, Source: backendconfig.SourceT{ ID: sourceID, }, @@ -165,10 +173,18 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { schemaRepo: mockRepo, schemaInWarehouse: schemaInWarehouse, } + tags := stats.Tags{ + "module": "warehouse", + "workspaceId": s.warehouse.WorkspaceID, + "destType": s.warehouse.Destination.DestinationDefinition.Name, + "sourceId": s.warehouse.Source.ID, + "destinationId": s.warehouse.Destination.ID, + } + s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags) ctx := context.Background() - err := s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema) + err = s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema) if tc.wantError == nil { require.NoError(t, err) require.Equal(t, tc.wantSchema, s.localSchema) @@ -178,6 +194,9 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { require.Empty(t, s.localSchema) require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema) } + marshalledSchema, err := json.Marshal(tc.mockSchema.Schema) + require.NoError(t, err) + require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue()) err = s.UpdateLocalSchemaWithWarehouse(ctx, uploadID) if tc.wantError == nil { @@ -188,7 +207,11 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError)) require.Empty(t, s.localSchema) require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema) + require.EqualValues(t, float64(241), statsStore.Get("warehouse_schema_size", tags).LastValue()) } + marshalledSchema, err = json.Marshal(schemaInWarehouse) + require.NoError(t, err) + require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue()) }) } } @@ -1772,6 +1795,9 @@ func TestSchema_SyncRemoteSchema(t *testing.T) { require.False(t, schemaChanged) }) t.Run("schema changed", func(t *testing.T) { + statsStore, err := memstats.New() + require.NoError(t, err) + testSchema := model.Schema{ tableName: model.TableSchema{ "test_int": "int", @@ -1823,6 +1849,14 @@ func TestSchema_SyncRemoteSchema(t *testing.T) { schemaRepo: mockSchemaRepo, log: logger.NOP, } + tags := stats.Tags{ + "module": "warehouse", + "workspaceId": s.warehouse.WorkspaceID, + "destType": s.warehouse.Destination.DestinationDefinition.Name, + "sourceId": s.warehouse.Source.ID, + "destinationId": s.warehouse.Destination.ID, + } + s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags) mockFetchSchemaRepo := &mockFetchSchemaRepo{ err: nil, @@ -1839,6 +1873,10 @@ func TestSchema_SyncRemoteSchema(t *testing.T) { require.Equal(t, schemaInWarehouse, mockSchemaRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema) require.Equal(t, schemaInWarehouse, s.schemaInWarehouse) require.Equal(t, schemaInWarehouse, s.unrecognizedSchemaInWarehouse) + + marshalledSchema, err := json.Marshal(s.localSchema) + require.NoError(t, err) + require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue()) }) t.Run("schema not changed", func(t *testing.T) { testSchema := model.Schema{