From 3cd44bfaf57c654186fc19b602bfbaef8418ad46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=83=E6=9E=AB?= Date: Mon, 14 Oct 2024 17:27:37 +0800 Subject: [PATCH 1/2] feat: add Hologres prefix support for sts --- api/model_datasource.go | 6 ++++-- featurestore/feature_store_client.go | 10 ++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/api/model_datasource.go b/api/model_datasource.go index 0b5e882..da808c8 100644 --- a/api/model_datasource.go +++ b/api/model_datasource.go @@ -26,13 +26,15 @@ type Datasource struct { Ak Ak `json:"-"` TestMode bool `json:"-"` + + HologresPrefix string `json:"-"` } func (d *Datasource) GenerateDSN(datasourceType string) (DSN string) { if datasourceType == constants.Datasource_Type_Hologres { if d.TestMode { if d.Ak.SecurityToken != "" { - DSN = fmt.Sprintf("postgres://paifsslr$%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", + DSN = fmt.Sprintf("postgres://%s%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", d.HologresPrefix, d.Ak.AccesskeyId, d.Ak.AccesskeySecret, d.PublicAddress, d.Database, url.QueryEscape(d.Ak.SecurityToken)) } else { DSN = fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&connect_timeout=10", @@ -40,7 +42,7 @@ func (d *Datasource) GenerateDSN(datasourceType string) (DSN string) { } } else { if d.Ak.SecurityToken != "" { - DSN = fmt.Sprintf("postgres://paifsslr$%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", + DSN = fmt.Sprintf("postgres://%s%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", d.HologresPrefix, d.Ak.AccesskeyId, d.Ak.AccesskeySecret, d.VpcAddress, d.Database, url.QueryEscape(d.Ak.SecurityToken)) } else { DSN = fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&connect_timeout=10", diff --git a/featurestore/feature_store_client.go b/featurestore/feature_store_client.go index db49795..40fd10b 100644 --- a/featurestore/feature_store_client.go +++ b/featurestore/feature_store_client.go @@ -77,6 +77,12 @@ func WithToken(token string) ClientOption { } } +func WithHologresPrefix(hologresPrefix string) ClientOption { + return func(e *FeatureStoreClient) { + e.hologresPrefix = hologresPrefix + } +} + type FeatureStoreClient struct { // loopLoadData flag to invoke loopLoadProjectData function loopLoadData bool @@ -110,6 +116,9 @@ type FeatureStoreClient struct { // sts token token string + + // hologres prefix for sts token + hologresPrefix string } func NewFeatureStoreClient(regionId, accessKeyId, accessKeySecret, projectName string, opts ...ClientOption) (*FeatureStoreClient, error) { @@ -216,6 +225,7 @@ func (c *FeatureStoreClient) LoadProjectData() error { p.OnlineDataSource = getDataSourceResponse.Datasource p.OnlineDataSource.Ak = ak p.OnlineDataSource.TestMode = c.testMode + p.OnlineDataSource.HologresPrefix = c.hologresPrefix getDataSourceResponse, err = c.client.DatasourceApi.DatasourceDatasourceIdGet(p.OfflineDatasourceId, c.hologresPort, c.hologresPublicAddress) if err != nil { From c8ff4e9e200abba0b0d7425ae2f51fbaacf03a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=83=E6=9E=AB?= Date: Tue, 15 Oct 2024 11:49:06 +0800 Subject: [PATCH 2/2] perf: de duplication of query feature names --- domain/base_feature_view.go | 7 ++++++- domain/sequence_feature_view.go | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/domain/base_feature_view.go b/domain/base_feature_view.go index f1dffc9..a4619ee 100644 --- a/domain/base_feature_view.go +++ b/domain/base_feature_view.go @@ -133,6 +133,8 @@ func NewBaseFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity func (f *BaseFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) { var selectFields []string selectFields = append(selectFields, f.primaryKeyField.Name) + seenFields := make(map[string]bool) + seenFields[f.primaryKeyField.Name] = true for _, featureName := range features { if featureName == "*" { selectFields = append(selectFields, f.featureFields...) @@ -148,7 +150,10 @@ func (f *BaseFeatureView) GetOnlineFeatures(joinIds []interface{}, features []st return nil, fmt.Errorf("feature name :%s not found in the featureview fields", featureName) } - selectFields = append(selectFields, featureName) + if !seenFields[featureName] { + selectFields = append(selectFields, featureName) + seenFields[featureName] = true + } } } diff --git a/domain/sequence_feature_view.go b/domain/sequence_feature_view.go index 2ede286..3cc7b26 100644 --- a/domain/sequence_feature_view.go +++ b/domain/sequence_feature_view.go @@ -43,6 +43,16 @@ func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEn sequenceFeatureView.offline_2_online_seq_map[seqConfig.OfflineSeqName] = seqConfig.OnlineSeqName } + seen := make(map[string]bool) + var uniqueSeqConfigs []*api.SeqConfig + for _, seqConfig := range sequenceFeatureView.sequenceConfig.SeqConfig { + if !seen[seqConfig.OnlineSeqName] { + uniqueSeqConfigs = append(uniqueSeqConfigs, seqConfig) + seen[seqConfig.OnlineSeqName] = true + } + } + sequenceFeatureView.sequenceConfig.SeqConfig = uniqueSeqConfigs + requiredElements1 := []string{"user_id", "item_id", "event"} requiredElements2 := []string{"user_id", "item_id", "event", "timestamp"} if len(sequenceFeatureView.sequenceConfig.DeduplicationMethod) == len(requiredElements1) { @@ -105,6 +115,7 @@ func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEn func (f *SequenceFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) { sequenceConfig := f.sequenceConfig onlineConfig := []*api.SeqConfig{} + seenFields := make(map[string]bool) for _, feature := range features { if feature == "*" { @@ -115,7 +126,10 @@ func (f *SequenceFeatureView) GetOnlineFeatures(joinIds []interface{}, features for _, seqConfig := range sequenceConfig.SeqConfig { if seqConfig.OnlineSeqName == feature { found = true - onlineConfig = append(onlineConfig, seqConfig) + if !seenFields[feature] { + onlineConfig = append(onlineConfig, seqConfig) + seenFields[feature] = true + } break } }