Skip to content

Commit

Permalink
Merge pull request #34 from aliyun/feature/new_sequence
Browse files Browse the repository at this point in the history
de duplication of query feature names
  • Loading branch information
bruceding authored Oct 22, 2024
2 parents dfcd8a0 + c8ff4e9 commit 2134bc4
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 4 deletions.
6 changes: 4 additions & 2 deletions api/model_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ type Datasource struct {
Ak Ak `json:"-"`

TestMode bool `json:"-"`

HologresPrefix string `json:"-"`
}

func (d *Datasource) GenerateDSN(datasourceType string) (DSN string) {
if datasourceType == constants.Datasource_Type_Hologres {
if d.TestMode {
if d.Ak.SecurityToken != "" {
DSN = fmt.Sprintf("postgres://paifsslr$%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s",
DSN = fmt.Sprintf("postgres://%s%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", d.HologresPrefix,
d.Ak.AccesskeyId, d.Ak.AccesskeySecret, d.PublicAddress, d.Database, url.QueryEscape(d.Ak.SecurityToken))
} else {
DSN = fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&connect_timeout=10",
d.Ak.AccesskeyId, d.Ak.AccesskeySecret, d.PublicAddress, d.Database)
}
} else {
if d.Ak.SecurityToken != "" {
DSN = fmt.Sprintf("postgres://paifsslr$%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s",
DSN = fmt.Sprintf("postgres://%s%s:%s@%s/%s?sslmode=disable&connect_timeout=10&options=sts_token=%s", d.HologresPrefix,
d.Ak.AccesskeyId, d.Ak.AccesskeySecret, d.VpcAddress, d.Database, url.QueryEscape(d.Ak.SecurityToken))
} else {
DSN = fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&connect_timeout=10",
Expand Down
7 changes: 6 additions & 1 deletion domain/base_feature_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func NewBaseFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity
func (f *BaseFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) {
var selectFields []string
selectFields = append(selectFields, f.primaryKeyField.Name)
seenFields := make(map[string]bool)
seenFields[f.primaryKeyField.Name] = true
for _, featureName := range features {
if featureName == "*" {
selectFields = append(selectFields, f.featureFields...)
Expand All @@ -148,7 +150,10 @@ func (f *BaseFeatureView) GetOnlineFeatures(joinIds []interface{}, features []st
return nil, fmt.Errorf("feature name :%s not found in the featureview fields", featureName)
}

selectFields = append(selectFields, featureName)
if !seenFields[featureName] {
selectFields = append(selectFields, featureName)
seenFields[featureName] = true
}
}
}

Expand Down
16 changes: 15 additions & 1 deletion domain/sequence_feature_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEn
sequenceFeatureView.offline_2_online_seq_map[seqConfig.OfflineSeqName] = seqConfig.OnlineSeqName
}

seen := make(map[string]bool)
var uniqueSeqConfigs []*api.SeqConfig
for _, seqConfig := range sequenceFeatureView.sequenceConfig.SeqConfig {
if !seen[seqConfig.OnlineSeqName] {
uniqueSeqConfigs = append(uniqueSeqConfigs, seqConfig)
seen[seqConfig.OnlineSeqName] = true
}
}
sequenceFeatureView.sequenceConfig.SeqConfig = uniqueSeqConfigs

requiredElements1 := []string{"user_id", "item_id", "event"}
requiredElements2 := []string{"user_id", "item_id", "event", "timestamp"}
if len(sequenceFeatureView.sequenceConfig.DeduplicationMethod) == len(requiredElements1) {
Expand Down Expand Up @@ -105,6 +115,7 @@ func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEn
func (f *SequenceFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) {
sequenceConfig := f.sequenceConfig
onlineConfig := []*api.SeqConfig{}
seenFields := make(map[string]bool)

for _, feature := range features {
if feature == "*" {
Expand All @@ -115,7 +126,10 @@ func (f *SequenceFeatureView) GetOnlineFeatures(joinIds []interface{}, features
for _, seqConfig := range sequenceConfig.SeqConfig {
if seqConfig.OnlineSeqName == feature {
found = true
onlineConfig = append(onlineConfig, seqConfig)
if !seenFields[feature] {
onlineConfig = append(onlineConfig, seqConfig)
seenFields[feature] = true
}
break
}
}
Expand Down
10 changes: 10 additions & 0 deletions featurestore/feature_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func WithToken(token string) ClientOption {
}
}

func WithHologresPrefix(hologresPrefix string) ClientOption {
return func(e *FeatureStoreClient) {
e.hologresPrefix = hologresPrefix
}
}

type FeatureStoreClient struct {
// loopLoadData flag to invoke loopLoadProjectData function
loopLoadData bool
Expand Down Expand Up @@ -110,6 +116,9 @@ type FeatureStoreClient struct {

// sts token
token string

// hologres prefix for sts token
hologresPrefix string
}

func NewFeatureStoreClient(regionId, accessKeyId, accessKeySecret, projectName string, opts ...ClientOption) (*FeatureStoreClient, error) {
Expand Down Expand Up @@ -216,6 +225,7 @@ func (c *FeatureStoreClient) LoadProjectData() error {
p.OnlineDataSource = getDataSourceResponse.Datasource
p.OnlineDataSource.Ak = ak
p.OnlineDataSource.TestMode = c.testMode
p.OnlineDataSource.HologresPrefix = c.hologresPrefix

getDataSourceResponse, err = c.client.DatasourceApi.DatasourceDatasourceIdGet(p.OfflineDatasourceId, c.hologresPort, c.hologresPublicAddress)
if err != nil {
Expand Down

0 comments on commit 2134bc4

Please sign in to comment.