Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #92 from zilliztech/feature_dbname
Browse files Browse the repository at this point in the history
support specify Milvus database and upsert
  • Loading branch information
wenhuiZilliz authored Aug 6, 2024
2 parents 8640c6b + 67312e1 commit 9979e85
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ datas to milvus 2.x.
|:-----------------------------------------|:------------------|
| [Milvus](https://milvus.io/) | 0.9.x, 1.x or 2.x |
| [Elasticsearch](https://www.elastic.co/) | 7.x or 8.x |
| go | 1.20.2 or later |
| go | 1.22.2 or later |

- Data Format Support

Expand Down
33 changes: 31 additions & 2 deletions README_2X.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ meta:
#......
...
```
- if you want to customize source or target milvus grpc connection request or receive message max size, you can add below config like:
- if want to customize source or target milvus grpc connection request or receive message max size, you can add like below cfg. (If not do this, there may be errors due to the request for data exceeding the default maximum limit of the Grcp server)
```yaml
...
source:
Expand All @@ -103,4 +103,33 @@ meta:
maxCallRecvMsgSize: 67108864
maxCallSendMsgSize: 268435456
...
...
...
```

- If Source Milvus collection is not in the database `default`, you can add `source.milvus2x.database` to specify database name.
```yaml
...
source:
milvus2x:
...
database: my_database
...
```
- If want to migrate data to Target Milvus collection (isn't `default` database), you can add `target.milvus2x.database` to specify database name, database name will auto create if not exists.
```yaml
...
target:
milvus2x:
...
database: my_database
...
```
- If want to use `upsert` write mode migrate data to Target Milvus, you can add `target.milvus2x.writeMode=upsert`. if use `upsert` mode means when the same primary key exists in your data of the source collection, it will be deduplicated.
```yaml
...
target:
milvus2x:
...
writeMode: upsert
...
```
10 changes: 10 additions & 0 deletions README_ES.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ target:
useSSL: true
```
- If want to migrate data to Target Milvus collection (isn't `default` database), you can add `target.milvus2x.database` to specify database name, database name will auto create if not exists.
```yaml
...
target:
milvus2x:
...
database: my_database
...
```

## migration.yaml reference

### `dumper`
Expand Down
2 changes: 2 additions & 0 deletions core/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ var LOAD_CHECK_BULK_STATE_INTERVAL = time.Second * 10 //second
var LOAD_CHECK_BACKLOG_INTERVAL = time.Second * 10 //second
// const SUB_FILE_SIZE = 1024 * 1024 * 512 //512MB
const SUB_FILE_SIZE = 1024 * 1024 * 300

const UPSERT = "upsert"
3 changes: 3 additions & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Milvus2xConfig struct {
GrpcMaxRecvMsgSize int
GrpcMaxSendMsgSize int

Database string
WriteMode string //insert or upsert

Version string //internal param
hashCache atomic.Uint32
}
Expand Down
3 changes: 3 additions & 0 deletions core/config/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func resolveTargetMilvus2xConfig(v *viper.Viper) *Milvus2xConfig {
Password: v.GetString("target.milvus2x.password"),
GrpcMaxRecvMsgSize: v.GetInt("target.milvus2x.grpc.maxCallRecvMsgSize"),
GrpcMaxSendMsgSize: v.GetInt("target.milvus2x.grpc.maxCallSendMsgSize"),
Database: v.GetString("target.milvus2x.database"),
WriteMode: v.GetString("target.milvus2x.writeMode"),
}
}

Expand Down Expand Up @@ -358,5 +360,6 @@ func resolveSourceMilvus2xConfig(v *viper.Viper) *Milvus2xConfig {
Password: v.GetString("source.milvus2x.password"),
GrpcMaxRecvMsgSize: v.GetInt("source.milvus2x.grpc.maxCallRecvMsgSize"),
GrpcMaxSendMsgSize: v.GetInt("source.milvus2x.grpc.maxCallSendMsgSize"),
Database: v.GetString("source.milvus2x.database"),
}
}
4 changes: 4 additions & 0 deletions core/dbclient/cus_field_milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,7 @@ func (this *CustomFieldMilvus2x) CheckLoadStatus(ctx context.Context, collection
func (cus *CustomFieldMilvus2x) StartBatchInsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error {
return cus.Milvus2x.StartBatchInsert(ctx, collection, data)
}

func (cus *CustomFieldMilvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error {
return cus.Milvus2x.StartBatchUpsert(ctx, collection, data)
}
49 changes: 47 additions & 2 deletions core/dbclient/milvus2x.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (this *Milvus2x) GetMilvus() client.Client {
return this.milvus
}

// NewMilvus2xClient 这里为 target milvus的统一创建入口,和source区分开
func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) {

log.Info("begin to new milvus2x client",
Expand Down Expand Up @@ -67,11 +68,20 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) {
return nil, err
}

log.Info("[Milvus2x] begin to test connect",
log.Info("[Milvus2x] begin to test target connect",
zap.String("endpoint", cfg.Endpoint),
zap.String("username", cfg.UserName),
zap.String("databaseName", cfg.Database),
zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize),
zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize))

if cfg.Database != "" {
err = useDatabase(cfg, milvus, ctx)
if err != nil {
return nil, err
}
}

_, err = milvus.HasCollection(ctx, "test")
if err != nil {
return nil, err
Expand All @@ -84,6 +94,31 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) {
return c, nil
}

func useDatabase(cfg *config.Milvus2xConfig, milvus client.Client, ctx context.Context) error {
dbs, err := milvus.ListDatabases(ctx)
if err != nil {
return err
}
var dbExists = false
for _, db := range dbs {
if db.Name == cfg.Database {
dbExists = true
break
}
}
if !dbExists {
err = milvus.CreateDatabase(ctx, cfg.Database)
if err != nil {
return err
}
}
err = milvus.UsingDatabase(ctx, cfg.Database)
if err != nil {
return err
}
return nil
}

func (this *Milvus2x) CheckNeedCreateCollection(ctx context.Context, createParam *common.CollectionParam) error {
log.Info("Begin to CheckNeedCreateCollection,", zap.String("collection", createParam.CollectionName))
exist, err := this.milvus.HasCollection(ctx, createParam.CollectionName)
Expand Down Expand Up @@ -243,6 +278,16 @@ func (this *Milvus2x) StartBatchInsert(ctx context.Context, collection string, d
log.L().Info("[Loader] BatchInsert return err", zap.Error(err))
return err
}
log.LL(ctx).Info("[Loader] success to batchInsert to Milvus", zap.String("col", collection))
log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection))
return nil
}

func (this *Milvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error {
_, err := this.milvus.Upsert(ctx, collection, "", data.Columns...)
if err != nil {
log.L().Info("[Loader] BatchUpsert return err", zap.Error(err))
return err
}
log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection))
return nil
}
10 changes: 7 additions & 3 deletions core/loader/cus_milvus2x_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ func (this *CustomMilvus2xLoader) compareResult(ctx context.Context) error {
return nil
}

func (this *CustomMilvus2xLoader) WriteByBatchInsert(ctx context.Context, data *milvus2x.Milvus2xData) error {
func (this *CustomMilvus2xLoader) BatchWrite(ctx context.Context, data *milvus2x.Milvus2xData) error {

log.LL(ctx).Info("[Loader] Begin to write data by batchInsert sdk to milvus", zap.String("collection", this.runtimeCollectionNames[0]))
return this.CusMilvus2x.StartBatchInsert(ctx, this.runtimeCollectionNames[0], data)
log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", this.runtimeCollectionNames[0]))
if this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT {
return this.CusMilvus2x.StartBatchUpsert(ctx, this.runtimeCollectionNames[0], data)
} else {
return this.CusMilvus2x.StartBatchInsert(ctx, this.runtimeCollectionNames[0], data)
}
}
2 changes: 1 addition & 1 deletion starter/migration/milvus2x_starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (starter *Starter) dumpByIterator(ctx context.Context, collCfg *milvus2xtyp

func (starter *Starter) loadByBatchInsert(ctx context.Context, dataChannel chan *milvus2x.Milvus2xData) error {
for data := range dataChannel {
err := starter.Loader.WriteByBatchInsert(ctx, data)
err := starter.Loader.BatchWrite(ctx, data)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion storage/milvus2x/milvus2_3_ver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (milvus23 *Milvus23VerClient) DescCollection(ctx context.Context, collectio
return collEntity, nil
}

// 这里统一给source创建milvus client, 和target区分开
func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, error) {

log.Info("[Milvus23x] begin to new milvus client", zap.String("endPoint", cfg.Endpoint))
Expand Down Expand Up @@ -148,12 +149,19 @@ func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, e
return nil, err
}

log.Info("[Milvus23x] begin to test connect",
log.Info("[Milvus23x] begin to test source connect",
zap.String("endpoint", cfg.Endpoint),
zap.String("username", cfg.UserName),
zap.String("databaseName", cfg.Database),
zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize),
zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize))

if cfg.Database != "" {
err := milvus.UsingDatabase(ctx, cfg.Database)
if err != nil {
return nil, err
}
}
_, err = milvus.HasCollection(ctx, "test")
if err != nil {
return nil, err
Expand Down

0 comments on commit 9979e85

Please sign in to comment.