From e4232e5e4da7ede5c8e4b08b184006fcec3907c7 Mon Sep 17 00:00:00 2001 From: liugddx Date: Sun, 4 Jun 2023 20:27:48 +0800 Subject: [PATCH 01/21] add milvus connector --- .../connector-milvus/pom.xml | 59 +++++++++ .../seatunnel/milvus/config/MilvusConfig.java | 99 ++++++++++++++++ .../milvus/config/MilvusOptions.java | 52 ++++++++ .../exception/MilvusConnectorErrorCode.java | 21 ++++ .../exception/MilvusConnectorException.java | 31 +++++ .../seatunnel/milvus/sink/MilvusSink.java | 84 +++++++++++++ .../milvus/sink/MilvusSinkFactory.java | 46 +++++++ .../milvus/sink/MilvusSinkWriter.java | 112 ++++++++++++++++++ seatunnel-connectors-v2/pom.xml | 1 + 9 files changed, 505 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-milvus/pom.xml create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml b/seatunnel-connectors-v2/connector-milvus/pom.xml new file mode 100644 index 00000000000..42e84e3687d --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-milvus + SeaTunnel : Connectors V2 : Milvus + + + 2.2.2 + 0.12.0 + + + + + + io.milvus + milvus-sdk-java + ${milvus.version} + + + + com.theokanning.openai-gpt3-java + service + ${openai.sdk.version} + + + + org.apache.seatunnel + connector-common + ${project.version} + + + + + diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java new file mode 100644 index 00000000000..acd1f774cef --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.sink.SeaTunnelSink; + +import java.io.Serializable; + +/** + * Utility class to milvus configuration options, used by {@link SeaTunnelSink}. + */ +public class MilvusConfig implements Serializable { + + + public static final Option MILVUS_HOST = + Options.key("milvus_host") + .stringType() + .noDefaultValue() + .withDescription("The milvus host"); + + public static final Option MILVUS_PORT = + Options.key("milvus_port") + .intType() + .defaultValue(19530) + .withDescription("This port is for gRPC. Default is 19530"); + + public static final Option USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("The username of milvus server."); + + public static final Option PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("The password of milvus server."); + + public static final Option COLLECTION_NAME = + Options.key("collection_name") + .stringType() + .noDefaultValue() + .withDescription("A collection of milvus, which is similar to a table in a relational database."); + + public static final Option PARTITION_FIELD = + Options.key("partition_field") + .stringType() + .noDefaultValue() + .withDescription("Partition fields, which must be included in the collection's schema."); + + + public static final Option OPENAI_ENGINE = + Options.key("openai_engine") + .stringType() + .noDefaultValue() + .withDescription("Text embedding model. Default is 'text-embedding-ada-002'"); + + public static final Option OPENAI_API_KEY = + Options.key("openai_api_key") + .stringType() + .noDefaultValue() + .withDescription("Use your own Open AI API Key here."); + + + + public static final Option DIMENSION = + Options.key("dimension") + .intType() + .noDefaultValue() + .withDescription("Embeddings size."); + + + + public static final Option EMBEDDINGS_FIELDS = + Options.key("embeddings_fields") + .stringType() + .noDefaultValue() + .withDescription("Fields to be embedded,They use`,`for splitting"); + + + +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java new file mode 100644 index 00000000000..4ab77db1aa3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java @@ -0,0 +1,52 @@ +package org.apache.seatunnel.connectors.seatunnel.milvus.config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class MilvusOptions implements Serializable { + + private String milvusHost; + private Integer milvusPort; + private String collectionName; + private String partitionField; + private String userName; + private String password; + private String openaiEngine; + private String openaiApiKey; + private Integer dimension; + private String embeddingsFields; + + public MilvusOptions(Config config){ + this.milvusHost = config.getString(MilvusConfig.MILVUS_HOST.key()); + this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key()); + this.collectionName = config.getString(MilvusConfig.COLLECTION_NAME.key()); + this.userName = config.getString(MilvusConfig.USERNAME.key()); + this.password = config.getString(MilvusConfig.PASSWORD.key()); + + if (config.hasPath(MilvusConfig.PARTITION_FIELD.key())){ + this.partitionField = config.getString(MilvusConfig.PARTITION_FIELD.key()); + } + if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())){ + this.openaiEngine = config.getString(MilvusConfig.OPENAI_ENGINE.key()); + } + if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())){ + this.openaiApiKey = config.getString(MilvusConfig.OPENAI_API_KEY.key()); + } + if (config.hasPath(MilvusConfig.DIMENSION.key())){ + this.dimension = config.getInt(MilvusConfig.DIMENSION.key()); + } + if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())){ + this.embeddingsFields = config.getString(MilvusConfig.EMBEDDINGS_FIELDS.key()); + } + + } + + + + +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java new file mode 100644 index 00000000000..66e656c1fd9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.exception; + +public class MilvusConnectorErrorCode { +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java new file mode 100644 index 00000000000..8d8da35d3be --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class MilvusConnectorException extends SeaTunnelRuntimeException { + public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + public MilvusConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java new file mode 100644 index 00000000000..c77f1c969fd --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.sink; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class MilvusSink extends AbstractSimpleSink { + + private SeaTunnelRowType seaTunnelRowType; + + private MilvusOptions milvusOptions; + + @Override + public String getPluginName() { + return "Milvus"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + milvusOptions = new MilvusOptions(pluginConfig); + CheckResult result = + CheckConfigUtil.checkAllExists( + pluginConfig, + MilvusConfig.MILVUS_HOST.key(), + MilvusConfig.MILVUS_PORT.key(), + MilvusConfig.COLLECTION_NAME.key()); + if (!result.isSuccess()) { + throw new MilvusConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, result.getMsg())); + } + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new MilvusSinkWriter(seaTunnelRowType, milvusOptions); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java new file mode 100644 index 00000000000..23b4ecc3372 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.sink; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; + +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.*; + +@AutoService(Factory.class) +public class MilvusSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Milvus"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + MILVUS_HOST, + MILVUS_PORT, + COLLECTION_NAME, + USERNAME, + PASSWORD) + .optional(PARTITION_FIELD, OPENAI_ENGINE, OPENAI_API_KEY, DIMENSION, EMBEDDINGS_FIELDS) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java new file mode 100644 index 00000000000..ad8595dc0d8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.sink; + +import com.theokanning.openai.embedding.EmbeddingRequest; +import com.theokanning.openai.embedding.EmbeddingResult; +import com.theokanning.openai.service.OpenAiService; +import io.milvus.client.MilvusServiceClient; +import io.milvus.param.ConnectParam; +import io.milvus.param.collection.FlushParam; +import io.milvus.param.dml.InsertParam; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class MilvusSinkWriter extends AbstractSinkWriter { + + private final SeaTunnelRowType seaTunnelRowType; + + private final MilvusServiceClient milvusClient; + + private final MilvusOptions milvusOptions; + + private OpenAiService service; + + public MilvusSinkWriter(SeaTunnelRowType seaTunnelRowType, MilvusOptions milvusOptions) { + this.seaTunnelRowType = seaTunnelRowType; + this.milvusOptions = milvusOptions; + ConnectParam connectParam = ConnectParam.newBuilder() + .withHost(milvusOptions.getMilvusHost()) + .withPort(milvusOptions.getMilvusPort()) + .withAuthorization(milvusOptions.getUserName(),milvusOptions.getPassword()) + .build(); + milvusClient = new MilvusServiceClient(connectParam); + + if (milvusOptions.getEmbeddingsFields() != null){ + service = new OpenAiService(milvusOptions.getOpenaiApiKey()); + } + + } + + + @Override + public void write(SeaTunnelRow element) throws IOException { + + List fields = new ArrayList<>(); + + InsertParam.Builder builder = InsertParam.newBuilder(); + + builder = builder.withCollectionName(milvusOptions.getCollectionName()); + + for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) { + if (milvusOptions.getPartitionField() != null && milvusOptions.getPartitionField().equals(seaTunnelRowType.getFieldName(i))){ + builder.withPartitionName(String.valueOf(element.getField(i))); + } + if (milvusOptions.getEmbeddingsFields() != null){ + List embeddingsFields = Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); + if (embeddingsFields.contains(seaTunnelRowType.getFieldName(i))){ + EmbeddingResult embeddings = service.createEmbeddings(EmbeddingRequest.builder().model(milvusOptions.getOpenaiEngine()).input(Collections.singletonList(String.valueOf(element.getField(i)))).build()); + List embedding = embeddings.getData().get(0).getEmbedding(); + InsertParam.Field field = new InsertParam.Field(seaTunnelRowType.getFieldName(i), Collections.singletonList(embedding)); + fields.add(field); + continue; + } + } + InsertParam.Field field = new InsertParam.Field(seaTunnelRowType.getFieldName(i), Collections.singletonList(element.getField(i))); + fields.add(field); + } + + if (milvusOptions.getPartitionField() != null){ + builder.withPartitionName(milvusOptions.getPartitionField()); + } + + InsertParam build = builder.withFields(fields).build(); + + milvusClient.insert(build); + } + + + @Override + public List snapshotState(long checkpointId) throws IOException { + milvusClient.flush(FlushParam.newBuilder().addCollectionName(milvusOptions.getCollectionName()).build()); + return Collections.emptyList(); + } + + @Override + public void close() throws IOException { + milvusClient.close(); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 8af51a0d0e5..6f70ecd3323 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -73,6 +73,7 @@ connector-hbase connector-rocketmq connector-paimon + connector-milvus From 8ed28ab574a8a32ffcbae0b2bb48dc334ec10456 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 5 Jun 2023 13:22:37 +0800 Subject: [PATCH 02/21] fix code style --- .../seatunnel/milvus/config/MilvusConfig.java | 24 ++------ .../milvus/config/MilvusOptions.java | 20 +++--- .../exception/MilvusConnectorErrorCode.java | 21 ------- .../exception/MilvusConnectorException.java | 1 + .../seatunnel/milvus/sink/MilvusSink.java | 9 ++- .../milvus/sink/MilvusSinkFactory.java | 28 ++++++--- .../milvus/sink/MilvusSinkWriter.java | 61 ++++++++++++------- 7 files changed, 79 insertions(+), 85 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java index acd1f774cef..e0219cbafd7 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java @@ -23,12 +23,9 @@ import java.io.Serializable; -/** - * Utility class to milvus configuration options, used by {@link SeaTunnelSink}. - */ +/** Utility class to milvus configuration options, used by {@link SeaTunnelSink}. */ public class MilvusConfig implements Serializable { - public static final Option MILVUS_HOST = Options.key("milvus_host") .stringType() @@ -57,14 +54,15 @@ public class MilvusConfig implements Serializable { Options.key("collection_name") .stringType() .noDefaultValue() - .withDescription("A collection of milvus, which is similar to a table in a relational database."); + .withDescription( + "A collection of milvus, which is similar to a table in a relational database."); public static final Option PARTITION_FIELD = Options.key("partition_field") .stringType() .noDefaultValue() - .withDescription("Partition fields, which must be included in the collection's schema."); - + .withDescription( + "Partition fields, which must be included in the collection's schema."); public static final Option OPENAI_ENGINE = Options.key("openai_engine") @@ -78,22 +76,12 @@ public class MilvusConfig implements Serializable { .noDefaultValue() .withDescription("Use your own Open AI API Key here."); - - public static final Option DIMENSION = - Options.key("dimension") - .intType() - .noDefaultValue() - .withDescription("Embeddings size."); - - + Options.key("dimension").intType().noDefaultValue().withDescription("Embeddings size."); public static final Option EMBEDDINGS_FIELDS = Options.key("embeddings_fields") .stringType() .noDefaultValue() .withDescription("Fields to be embedded,They use`,`for splitting"); - - - } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java index 4ab77db1aa3..778ad2d5295 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java @@ -1,8 +1,9 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.config; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import lombok.AllArgsConstructor; import lombok.Data; -import org.apache.seatunnel.shade.com.typesafe.config.Config; import java.io.Serializable; @@ -21,32 +22,27 @@ public class MilvusOptions implements Serializable { private Integer dimension; private String embeddingsFields; - public MilvusOptions(Config config){ + public MilvusOptions(Config config) { this.milvusHost = config.getString(MilvusConfig.MILVUS_HOST.key()); this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key()); this.collectionName = config.getString(MilvusConfig.COLLECTION_NAME.key()); this.userName = config.getString(MilvusConfig.USERNAME.key()); this.password = config.getString(MilvusConfig.PASSWORD.key()); - if (config.hasPath(MilvusConfig.PARTITION_FIELD.key())){ + if (config.hasPath(MilvusConfig.PARTITION_FIELD.key())) { this.partitionField = config.getString(MilvusConfig.PARTITION_FIELD.key()); } - if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())){ + if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())) { this.openaiEngine = config.getString(MilvusConfig.OPENAI_ENGINE.key()); } - if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())){ + if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())) { this.openaiApiKey = config.getString(MilvusConfig.OPENAI_API_KEY.key()); } - if (config.hasPath(MilvusConfig.DIMENSION.key())){ + if (config.hasPath(MilvusConfig.DIMENSION.key())) { this.dimension = config.getInt(MilvusConfig.DIMENSION.key()); } - if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())){ + if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())) { this.embeddingsFields = config.getString(MilvusConfig.EMBEDDINGS_FIELDS.key()); } - } - - - - } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java deleted file mode 100644 index 66e656c1fd9..00000000000 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.milvus.exception; - -public class MilvusConnectorErrorCode { -} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java index 8d8da35d3be..88ee8d105a4 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java @@ -24,6 +24,7 @@ public class MilvusConnectorException extends SeaTunnelRuntimeException { public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { super(seaTunnelErrorCode, errorMessage); } + public MilvusConnectorException( SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { super(seaTunnelErrorCode, errorMessage, cause); diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java index c77f1c969fd..9942763189f 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -17,7 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; -import com.google.auto.service.AutoService; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -33,7 +34,8 @@ import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; -import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; import java.io.IOException; @@ -78,7 +80,8 @@ public SeaTunnelDataType getConsumedType() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + public AbstractSinkWriter createWriter(SinkWriter.Context context) + throws IOException { return new MilvusSinkWriter(seaTunnelRowType, milvusOptions); } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java index 23b4ecc3372..b1aad544746 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java @@ -17,12 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; -import com.google.auto.service.AutoService; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.*; +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.COLLECTION_NAME; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.DIMENSION; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.EMBEDDINGS_FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_HOST; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_PORT; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.OPENAI_API_KEY; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.OPENAI_ENGINE; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.PARTITION_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.USERNAME; @AutoService(Factory.class) public class MilvusSinkFactory implements TableSinkFactory { @@ -34,13 +44,13 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required( - MILVUS_HOST, - MILVUS_PORT, - COLLECTION_NAME, - USERNAME, - PASSWORD) - .optional(PARTITION_FIELD, OPENAI_ENGINE, OPENAI_API_KEY, DIMENSION, EMBEDDINGS_FIELDS) + .required(MILVUS_HOST, MILVUS_PORT, COLLECTION_NAME, USERNAME, PASSWORD) + .optional( + PARTITION_FIELD, + OPENAI_ENGINE, + OPENAI_API_KEY, + DIMENSION, + EMBEDDINGS_FIELDS) .build(); } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index ad8595dc0d8..61676512295 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -17,6 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; + import com.theokanning.openai.embedding.EmbeddingRequest; import com.theokanning.openai.embedding.EmbeddingResult; import com.theokanning.openai.service.OpenAiService; @@ -24,10 +29,6 @@ import io.milvus.param.ConnectParam; import io.milvus.param.collection.FlushParam; import io.milvus.param.dml.InsertParam; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; import java.io.IOException; import java.util.ArrayList; @@ -48,20 +49,19 @@ public class MilvusSinkWriter extends AbstractSinkWriter { public MilvusSinkWriter(SeaTunnelRowType seaTunnelRowType, MilvusOptions milvusOptions) { this.seaTunnelRowType = seaTunnelRowType; this.milvusOptions = milvusOptions; - ConnectParam connectParam = ConnectParam.newBuilder() - .withHost(milvusOptions.getMilvusHost()) - .withPort(milvusOptions.getMilvusPort()) - .withAuthorization(milvusOptions.getUserName(),milvusOptions.getPassword()) - .build(); + ConnectParam connectParam = + ConnectParam.newBuilder() + .withHost(milvusOptions.getMilvusHost()) + .withPort(milvusOptions.getMilvusPort()) + .withAuthorization(milvusOptions.getUserName(), milvusOptions.getPassword()) + .build(); milvusClient = new MilvusServiceClient(connectParam); - if (milvusOptions.getEmbeddingsFields() != null){ + if (milvusOptions.getEmbeddingsFields() != null) { service = new OpenAiService(milvusOptions.getOpenaiApiKey()); } - } - @Override public void write(SeaTunnelRow element) throws IOException { @@ -72,24 +72,39 @@ public void write(SeaTunnelRow element) throws IOException { builder = builder.withCollectionName(milvusOptions.getCollectionName()); for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) { - if (milvusOptions.getPartitionField() != null && milvusOptions.getPartitionField().equals(seaTunnelRowType.getFieldName(i))){ + if (milvusOptions.getPartitionField() != null + && milvusOptions.getPartitionField().equals(seaTunnelRowType.getFieldName(i))) { builder.withPartitionName(String.valueOf(element.getField(i))); } - if (milvusOptions.getEmbeddingsFields() != null){ - List embeddingsFields = Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); - if (embeddingsFields.contains(seaTunnelRowType.getFieldName(i))){ - EmbeddingResult embeddings = service.createEmbeddings(EmbeddingRequest.builder().model(milvusOptions.getOpenaiEngine()).input(Collections.singletonList(String.valueOf(element.getField(i)))).build()); + if (milvusOptions.getEmbeddingsFields() != null) { + List embeddingsFields = + Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); + if (embeddingsFields.contains(seaTunnelRowType.getFieldName(i))) { + EmbeddingResult embeddings = + service.createEmbeddings( + EmbeddingRequest.builder() + .model(milvusOptions.getOpenaiEngine()) + .input( + Collections.singletonList( + String.valueOf(element.getField(i)))) + .build()); List embedding = embeddings.getData().get(0).getEmbedding(); - InsertParam.Field field = new InsertParam.Field(seaTunnelRowType.getFieldName(i), Collections.singletonList(embedding)); + InsertParam.Field field = + new InsertParam.Field( + seaTunnelRowType.getFieldName(i), + Collections.singletonList(embedding)); fields.add(field); continue; } } - InsertParam.Field field = new InsertParam.Field(seaTunnelRowType.getFieldName(i), Collections.singletonList(element.getField(i))); + InsertParam.Field field = + new InsertParam.Field( + seaTunnelRowType.getFieldName(i), + Collections.singletonList(element.getField(i))); fields.add(field); } - if (milvusOptions.getPartitionField() != null){ + if (milvusOptions.getPartitionField() != null) { builder.withPartitionName(milvusOptions.getPartitionField()); } @@ -98,10 +113,12 @@ public void write(SeaTunnelRow element) throws IOException { milvusClient.insert(build); } - @Override public List snapshotState(long checkpointId) throws IOException { - milvusClient.flush(FlushParam.newBuilder().addCollectionName(milvusOptions.getCollectionName()).build()); + milvusClient.flush( + FlushParam.newBuilder() + .addCollectionName(milvusOptions.getCollectionName()) + .build()); return Collections.emptyList(); } From cead8d727a0b784425a68c5c3f89f8f564dd3da0 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 5 Jun 2023 15:29:27 +0800 Subject: [PATCH 03/21] add connector-milvus in dist --- seatunnel-dist/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 80ef7209da3..bd8dfd2a4e2 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -509,6 +509,13 @@ provided + + org.apache.seatunnel + connector-milvus + ${project.version} + provided + + com.aliyun.phoenix From 5724ef009fffd3db5630690e3aaa707ab9fa76b8 Mon Sep 17 00:00:00 2001 From: liugddx Date: Mon, 5 Jun 2023 23:13:10 +0800 Subject: [PATCH 04/21] close openai client --- .../connectors/seatunnel/milvus/sink/MilvusSinkWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 61676512295..21af7ef3c26 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -125,5 +125,6 @@ public List snapshotState(long checkpointId) throws IOException { @Override public void close() throws IOException { milvusClient.close(); + service.shutdownExecutor(); } } From 6fb2120723961ac95ca1f7f5a93bb75e0bec6b4e Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 6 Jun 2023 12:21:09 +0800 Subject: [PATCH 05/21] add license header --- .../seatunnel/milvus/config/MilvusOptions.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java index 778ad2d5295..68591930f44 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.milvus.config; import org.apache.seatunnel.shade.com.typesafe.config.Config; From 4568e9be12ba9f7ce411a5ca66ddf981d9fb8e6a Mon Sep 17 00:00:00 2001 From: liugddx Date: Wed, 7 Jun 2023 22:24:00 +0800 Subject: [PATCH 06/21] fix some bug --- .../seatunnel/milvus/sink/MilvusSinkWriter.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 21af7ef3c26..f2efa6196b1 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; +import io.milvus.grpc.MutationResult; +import io.milvus.param.R; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -35,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class MilvusSinkWriter extends AbstractSinkWriter { @@ -89,10 +92,11 @@ public void write(SeaTunnelRow element) throws IOException { String.valueOf(element.getField(i)))) .build()); List embedding = embeddings.getData().get(0).getEmbedding(); + List collect = embedding.stream().map(Double::floatValue).collect(Collectors.toList()); InsertParam.Field field = new InsertParam.Field( seaTunnelRowType.getFieldName(i), - Collections.singletonList(embedding)); + Collections.singletonList(collect)); fields.add(field); continue; } @@ -110,7 +114,7 @@ public void write(SeaTunnelRow element) throws IOException { InsertParam build = builder.withFields(fields).build(); - milvusClient.insert(build); + handleResponseStatus(milvusClient.insert(build)); } @Override @@ -127,4 +131,10 @@ public void close() throws IOException { milvusClient.close(); service.shutdownExecutor(); } + + private void handleResponseStatus(R r) { + if (r.getStatus() != R.Status.Success.getCode()) { + throw new RuntimeException(r.getMessage()); + } + } } From 46be4a88314a85ffba9f1d5fc302c348f589ba0b Mon Sep 17 00:00:00 2001 From: liugddx Date: Wed, 7 Jun 2023 23:43:57 +0800 Subject: [PATCH 07/21] add doc --- docs/en/connector-v2/sink/Milvus.md | 84 +++++++++++++++++++ plugin-mapping.properties | 1 + .../seatunnel/milvus/config/MilvusConfig.java | 5 +- .../milvus/config/MilvusOptions.java | 12 +-- .../milvus/sink/MilvusSinkFactory.java | 8 +- .../milvus/sink/MilvusSinkWriter.java | 6 +- 6 files changed, 97 insertions(+), 19 deletions(-) create mode 100644 docs/en/connector-v2/sink/Milvus.md diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md new file mode 100644 index 00000000000..0efddc8191e --- /dev/null +++ b/docs/en/connector-v2/sink/Milvus.md @@ -0,0 +1,84 @@ +# Milvus + +> Milvus sink connector + +## Description + +Write data to Apache milvus. + +## Key features + +- [ ] [exactly-once](file:///Users/liugddx/code/incubator-seatunnel/docs/en/concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------------|--------|----------|------------------------| +| milvus_host | String | Yes | - | +| milvus_port | Int | No | 19530 | +| username | String | Yes | - | +| password | String | Yes | - | +| collection_name | String | No | - | +| partition_field | String | No | - | +| openai_engine | String | No | text-embedding-ada-002 | +| openai_api_key | String | No | - | +| embeddings_fields | String | No | - | + +### milvus_host [string] + +The milvus host. + +### milvus_port [int] + +This port is for gRPC. Default is 19530. + +### username [String] + +The username of milvus server. + +### password [String] + +The password of milvus server. + +### collection_name [String] + +A collection of milvus, which is similar to a table in a relational database. + +### partition_field [String] + +Partition fields, which must be included in the collection's schema. + +### openai_engine [String] + +Text embedding model. Default is 'text-embedding-ada-002'. + +### openai_api_key [String] + +Use your own Open AI API Key here. + +### embeddings_fields [String] + +Fields to be embedded,They use`,`for splitting. + +## Examples + +```hocon +sink { + Milvus { + milvus_host = localhost + milvus_port = 19530 + username = root + password = Milvus + collection_name = title_db + openai_engine = text-embedding-ada-002 + openai_api_key = sk-xxxx + embeddings_fields = title_2 + } +``` + +## Changelog + +### next version + +- Add Milvus Sink Connector + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index de6593b4523..d2a743a1021 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -110,3 +110,4 @@ seatunnel.source.Rocketmq = connector-rocketmq seatunnel.sink.Rocketmq = connector-rocketmq seatunnel.source.Paimon = connector-paimon seatunnel.sink.Paimon = connector-paimon +seatunnel.sink.Milvus = connector-milvus \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java index e0219cbafd7..bf12ba60d1d 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java @@ -67,7 +67,7 @@ public class MilvusConfig implements Serializable { public static final Option OPENAI_ENGINE = Options.key("openai_engine") .stringType() - .noDefaultValue() + .defaultValue("text-embedding-ada-002") .withDescription("Text embedding model. Default is 'text-embedding-ada-002'"); public static final Option OPENAI_API_KEY = @@ -76,9 +76,6 @@ public class MilvusConfig implements Serializable { .noDefaultValue() .withDescription("Use your own Open AI API Key here."); - public static final Option DIMENSION = - Options.key("dimension").intType().noDefaultValue().withDescription("Embeddings size."); - public static final Option EMBEDDINGS_FIELDS = Options.key("embeddings_fields") .stringType() diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java index 68591930f44..1d15776dc41 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java @@ -36,12 +36,15 @@ public class MilvusOptions implements Serializable { private String password; private String openaiEngine; private String openaiApiKey; - private Integer dimension; private String embeddingsFields; public MilvusOptions(Config config) { this.milvusHost = config.getString(MilvusConfig.MILVUS_HOST.key()); - this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key()); + if (config.hasPath(MilvusConfig.MILVUS_PORT.key())) { + this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key()); + } else { + this.milvusPort = MilvusConfig.MILVUS_PORT.defaultValue(); + } this.collectionName = config.getString(MilvusConfig.COLLECTION_NAME.key()); this.userName = config.getString(MilvusConfig.USERNAME.key()); this.password = config.getString(MilvusConfig.PASSWORD.key()); @@ -51,13 +54,12 @@ public MilvusOptions(Config config) { } if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())) { this.openaiEngine = config.getString(MilvusConfig.OPENAI_ENGINE.key()); + } else { + this.openaiEngine = MilvusConfig.OPENAI_ENGINE.defaultValue(); } if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())) { this.openaiApiKey = config.getString(MilvusConfig.OPENAI_API_KEY.key()); } - if (config.hasPath(MilvusConfig.DIMENSION.key())) { - this.dimension = config.getInt(MilvusConfig.DIMENSION.key()); - } if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())) { this.embeddingsFields = config.getString(MilvusConfig.EMBEDDINGS_FIELDS.key()); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java index b1aad544746..f0904bf7787 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java @@ -24,7 +24,6 @@ import com.google.auto.service.AutoService; import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.COLLECTION_NAME; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.DIMENSION; import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.EMBEDDINGS_FIELDS; import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_HOST; import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_PORT; @@ -45,12 +44,7 @@ public String factoryIdentifier() { public OptionRule optionRule() { return OptionRule.builder() .required(MILVUS_HOST, MILVUS_PORT, COLLECTION_NAME, USERNAME, PASSWORD) - .optional( - PARTITION_FIELD, - OPENAI_ENGINE, - OPENAI_API_KEY, - DIMENSION, - EMBEDDINGS_FIELDS) + .optional(PARTITION_FIELD, OPENAI_ENGINE, OPENAI_API_KEY, EMBEDDINGS_FIELDS) .build(); } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index f2efa6196b1..ada399c8333 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; -import io.milvus.grpc.MutationResult; -import io.milvus.param.R; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -29,6 +27,7 @@ import com.theokanning.openai.service.OpenAiService; import io.milvus.client.MilvusServiceClient; import io.milvus.param.ConnectParam; +import io.milvus.param.R; import io.milvus.param.collection.FlushParam; import io.milvus.param.dml.InsertParam; @@ -92,7 +91,8 @@ public void write(SeaTunnelRow element) throws IOException { String.valueOf(element.getField(i)))) .build()); List embedding = embeddings.getData().get(0).getEmbedding(); - List collect = embedding.stream().map(Double::floatValue).collect(Collectors.toList()); + List collect = + embedding.stream().map(Double::floatValue).collect(Collectors.toList()); InsertParam.Field field = new InsertParam.Field( seaTunnelRowType.getFieldName(i), From 83f22883307a33049113ce62c767eb38469e76ac Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 8 Jun 2023 00:01:35 +0800 Subject: [PATCH 08/21] add collection check --- .../seatunnel/milvus/sink/MilvusSink.java | 2 +- .../milvus/sink/MilvusSinkWriter.java | 42 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java index 9942763189f..4aeca66b9b9 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -82,6 +82,6 @@ public SeaTunnelDataType getConsumedType() { @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { - return new MilvusSinkWriter(seaTunnelRowType, milvusOptions); + return new MilvusSinkWriter(milvusOptions); } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index ada399c8333..755e0daca0b 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.sink; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; @@ -26,10 +25,15 @@ import com.theokanning.openai.embedding.EmbeddingResult; import com.theokanning.openai.service.OpenAiService; import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.DescribeCollectionResponse; import io.milvus.param.ConnectParam; import io.milvus.param.R; +import io.milvus.param.collection.DescribeCollectionParam; +import io.milvus.param.collection.FieldType; import io.milvus.param.collection.FlushParam; +import io.milvus.param.collection.HasCollectionParam; import io.milvus.param.dml.InsertParam; +import io.milvus.response.DescCollResponseWrapper; import java.io.IOException; import java.util.ArrayList; @@ -40,16 +44,15 @@ public class MilvusSinkWriter extends AbstractSinkWriter { - private final SeaTunnelRowType seaTunnelRowType; - private final MilvusServiceClient milvusClient; private final MilvusOptions milvusOptions; private OpenAiService service; - public MilvusSinkWriter(SeaTunnelRowType seaTunnelRowType, MilvusOptions milvusOptions) { - this.seaTunnelRowType = seaTunnelRowType; + private final List fields; + + public MilvusSinkWriter(MilvusOptions milvusOptions) { this.milvusOptions = milvusOptions; ConnectParam connectParam = ConnectParam.newBuilder() @@ -59,6 +62,25 @@ public MilvusSinkWriter(SeaTunnelRowType seaTunnelRowType, MilvusOptions milvusO .build(); milvusClient = new MilvusServiceClient(connectParam); + handleResponseStatus( + milvusClient.hasCollection( + HasCollectionParam.newBuilder() + .withCollectionName(milvusOptions.getCollectionName()) + .build())); + + R describeCollectionResponseR = + milvusClient.describeCollection( + DescribeCollectionParam.newBuilder() + .withCollectionName(milvusOptions.getCollectionName()) + .build()); + + handleResponseStatus(describeCollectionResponseR); + + DescCollResponseWrapper wrapper = + new DescCollResponseWrapper(describeCollectionResponseR.getData()); + + fields = wrapper.getFields(); + if (milvusOptions.getEmbeddingsFields() != null) { service = new OpenAiService(milvusOptions.getOpenaiApiKey()); } @@ -73,15 +95,15 @@ public void write(SeaTunnelRow element) throws IOException { builder = builder.withCollectionName(milvusOptions.getCollectionName()); - for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) { + for (int i = 0; i < this.fields.size(); i++) { if (milvusOptions.getPartitionField() != null - && milvusOptions.getPartitionField().equals(seaTunnelRowType.getFieldName(i))) { + && milvusOptions.getPartitionField().equals(this.fields.get(i).getName())) { builder.withPartitionName(String.valueOf(element.getField(i))); } if (milvusOptions.getEmbeddingsFields() != null) { List embeddingsFields = Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); - if (embeddingsFields.contains(seaTunnelRowType.getFieldName(i))) { + if (embeddingsFields.contains(this.fields.get(i).getName())) { EmbeddingResult embeddings = service.createEmbeddings( EmbeddingRequest.builder() @@ -95,7 +117,7 @@ public void write(SeaTunnelRow element) throws IOException { embedding.stream().map(Double::floatValue).collect(Collectors.toList()); InsertParam.Field field = new InsertParam.Field( - seaTunnelRowType.getFieldName(i), + this.fields.get(i).getName(), Collections.singletonList(collect)); fields.add(field); continue; @@ -103,7 +125,7 @@ public void write(SeaTunnelRow element) throws IOException { } InsertParam.Field field = new InsertParam.Field( - seaTunnelRowType.getFieldName(i), + this.fields.get(i).getName(), Collections.singletonList(element.getField(i))); fields.add(field); } From 4adf01a739deacc8989a42aef0224dfb6082fb52 Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 8 Jun 2023 00:08:28 +0800 Subject: [PATCH 09/21] fix dead line --- docs/en/connector-v2/sink/Milvus.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 0efddc8191e..30d83f27f7c 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -8,7 +8,7 @@ Write data to Apache milvus. ## Key features -- [ ] [exactly-once](file:///Users/liugddx/code/incubator-seatunnel/docs/en/concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) ## Options From 95e54b4a6bfeac18462047bfbb95ad7c58f5219a Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 10:30:37 +0800 Subject: [PATCH 10/21] add some argument check --- .../exception/MilvusConnectorErrorCode.java | 42 +++++++++ .../milvus/sink/MilvusSinkWriter.java | 92 ++++++++++++++++--- 2 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java new file mode 100644 index 00000000000..96c163aaeb5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorErrorCode.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.milvus.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum MilvusConnectorErrorCode implements SeaTunnelErrorCode { + RESPONSE_FAILED("Milvus-01", "The server returns an exception."); + + private final String code; + private final String description; + + MilvusConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 755e0daca0b..bc8208a3eb7 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -20,11 +20,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import com.theokanning.openai.embedding.EmbeddingRequest; import com.theokanning.openai.embedding.EmbeddingResult; import com.theokanning.openai.service.OpenAiService; import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.DataType; import io.milvus.grpc.DescribeCollectionResponse; import io.milvus.param.ConnectParam; import io.milvus.param.R; @@ -42,6 +44,10 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; +import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorErrorCode.RESPONSE_FAILED; + public class MilvusSinkWriter extends AbstractSinkWriter { private final MilvusServiceClient milvusClient; @@ -50,7 +56,7 @@ public class MilvusSinkWriter extends AbstractSinkWriter { private OpenAiService service; - private final List fields; + private final List metaFields; public MilvusSinkWriter(MilvusOptions milvusOptions) { this.milvusOptions = milvusOptions; @@ -79,7 +85,7 @@ public MilvusSinkWriter(MilvusOptions milvusOptions) { DescCollResponseWrapper wrapper = new DescCollResponseWrapper(describeCollectionResponseR.getData()); - fields = wrapper.getFields(); + this.metaFields = wrapper.getFields(); if (milvusOptions.getEmbeddingsFields() != null) { service = new OpenAiService(milvusOptions.getOpenaiApiKey()); @@ -95,15 +101,31 @@ public void write(SeaTunnelRow element) throws IOException { builder = builder.withCollectionName(milvusOptions.getCollectionName()); - for (int i = 0; i < this.fields.size(); i++) { + for (int i = 0; i < this.metaFields.size(); i++) { + + FieldType fieldType = this.metaFields.get(i); + + if (fieldType.isPrimaryKey()) { + if (!(element.getField(i) instanceof Number) + && !(element.getField(i) instanceof String)) { + throw new MilvusConnectorException( + ILLEGAL_ARGUMENT, "Primary key field only supports number and string."); + } + } + if (milvusOptions.getPartitionField() != null - && milvusOptions.getPartitionField().equals(this.fields.get(i).getName())) { + && milvusOptions.getPartitionField().equals(fieldType.getName())) { builder.withPartitionName(String.valueOf(element.getField(i))); } if (milvusOptions.getEmbeddingsFields() != null) { List embeddingsFields = Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); - if (embeddingsFields.contains(this.fields.get(i).getName())) { + if (embeddingsFields.contains(fieldType.getName())) { + if (fieldType.getDataType() != DataType.BinaryVector + || fieldType.getDataType() != DataType.FloatVector) { + throw new MilvusConnectorException( + ILLEGAL_ARGUMENT, "Vector field only supports binary and float."); + } EmbeddingResult embeddings = service.createEmbeddings( EmbeddingRequest.builder() @@ -117,23 +139,20 @@ public void write(SeaTunnelRow element) throws IOException { embedding.stream().map(Double::floatValue).collect(Collectors.toList()); InsertParam.Field field = new InsertParam.Field( - this.fields.get(i).getName(), - Collections.singletonList(collect)); + fieldType.getName(), Collections.singletonList(collect)); fields.add(field); continue; } } + + judgmentParameterType(fieldType, element.getField(i)); + InsertParam.Field field = new InsertParam.Field( - this.fields.get(i).getName(), - Collections.singletonList(element.getField(i))); + fieldType.getName(), Collections.singletonList(element.getField(i))); fields.add(field); } - if (milvusOptions.getPartitionField() != null) { - builder.withPartitionName(milvusOptions.getPartitionField()); - } - InsertParam build = builder.withFields(fields).build(); handleResponseStatus(milvusClient.insert(build)); @@ -156,7 +175,52 @@ public void close() throws IOException { private void handleResponseStatus(R r) { if (r.getStatus() != R.Status.Success.getCode()) { - throw new RuntimeException(r.getMessage()); + throw new MilvusConnectorException(RESPONSE_FAILED, r.getMessage(), r.getException()); + } + } + + private void judgmentParameterType(FieldType fieldType, Object value) { + switch (fieldType.getDataType()) { + case Bool: + if (!(value instanceof Boolean)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + case Int8: + case Int16: + case Int32: + if (!(value instanceof Integer) + && !(value instanceof Byte) + && !(value instanceof Short)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + case Int64: + if (!(value instanceof Long) + && !(value instanceof Integer) + && !(value instanceof Byte) + && !(value instanceof Short)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + case Float: + if (!(value instanceof Float)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + case Double: + if (!(value instanceof Float) && !(value instanceof Double)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + case VarChar: + if (!(value instanceof String)) { + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); + } + default: + throw new MilvusConnectorException( + UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } } } From 2abebf413aeb84e2ae59c0a067f2943c789ed3c9 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 11:13:33 +0800 Subject: [PATCH 11/21] add e2e test case --- .../connector-milvus-v2-e2e/pom.xml | 36 ++++++++ .../e2e/connector/milvus/MilvusIT.java | 85 +++++++++++++++++++ .../src/test/resources/fake_to_milvus.conf | 81 ++++++++++++++++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 4 files changed, 203 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml new file mode 100644 index 00000000000..47ccff4ff70 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + 2.3.2-SNAPSHOT + + + org.example + connector-milvus-v2-e2e + + SeaTunnel : E2E : Connector V2 : Milvus + + + 2.2.2 + + + + + + org.apache.seatunnel + connector-milvus + ${project.version} + test + + + io.milvus + milvus-sdk-java + ${milvus.version} + test + + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java new file mode 100644 index 00000000000..6c4e6580785 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/java/org/apache/seatunnel/e2e/connector/milvus/MilvusIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.milvus; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.QueryResults; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.dml.QueryParam; + +import java.io.IOException; + +@Disabled +public class MilvusIT extends TestSuiteBase implements TestResource { + + private MilvusServiceClient milvusClient; + + private final String host = "172.30.10.80"; + + private final Integer port = 19530; + + private final String username = "root"; + + private final String password = "Milvus"; + + private final String collectionName = "title_db"; + + @BeforeEach + @Override + public void startUp() throws Exception { + ConnectParam connectParam = + ConnectParam.newBuilder() + .withHost(host) + .withPort(port) + .withAuthorization(username, password) + .build(); + milvusClient = new MilvusServiceClient(connectParam); + } + + @AfterEach + @Override + public void tearDown() throws Exception { + if (milvusClient != null) { + milvusClient.close(); + } + } + + @TestTemplate + public void testMilvus(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/fake_to_milvus.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + R query = + milvusClient.query( + QueryParam.newBuilder().withCollectionName(collectionName).build()); + + Assertions.assertEquals(5, query.getData().getFieldsDataCount()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf new file mode 100644 index 00000000000..223070fe744 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/src/test/resources/fake_to_milvus.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of batch processing in SeaTunnel config +###### + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + + FakeSource { + schema = { + fields { + c_int = int + c_array = array + c_tinyint = tinyint + c_smallint = smallint + c_bigint = bigint + c_float = float + c_double = double + c_bol = "boolean" + } + } + result_table_name = "fake" + } + +} + +transform { +} + +sink { + Milvus { + milvus_host = 172.30.10.80 + milvus_port = 19530 + username = root + password = Milvus + collection_name = title_db + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 070ab6e936f..aa7c200ba4f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -58,6 +58,7 @@ connector-rocketmq-e2e connector-pulsar-e2e connector-paimon-e2e + connector-milvus-v2-e2e From 4b7c4b5c91a6b3aefae459368350e1efbe4a9a43 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 11:15:48 +0800 Subject: [PATCH 12/21] add e2e test case --- .../seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml index 47ccff4ff70..2e6154b1b79 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml @@ -8,7 +8,6 @@ 2.3.2-SNAPSHOT - org.example connector-milvus-v2-e2e SeaTunnel : E2E : Connector V2 : Milvus From 6a5137168765ae27ac3fe961277d69d50ba2a98e Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 11:30:36 +0800 Subject: [PATCH 13/21] add license --- .../connector-milvus-v2-e2e/pom.xml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml index 2e6154b1b79..f7a66562709 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml @@ -1,4 +1,18 @@ + 4.0.0 From d3fd9c06857f56eb96b9c98536455bdeb16e93da Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 11:34:17 +0800 Subject: [PATCH 14/21] add license --- .../connector-milvus-v2-e2e/pom.xml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml index f7a66562709..a88f2378438 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-v2-e2e/pom.xml @@ -19,7 +19,7 @@ org.apache.seatunnel seatunnel-connector-v2-e2e - 2.3.2-SNAPSHOT + ${revision} connector-milvus-v2-e2e @@ -32,6 +32,12 @@ + + org.apache.seatunnel + connector-fake + ${project.version} + test + org.apache.seatunnel connector-milvus From f7d01b8d3b1e594dc691d931b8d85bda04831516 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 13 Jun 2023 13:07:53 +0800 Subject: [PATCH 15/21] fix some bug --- .../seatunnel/milvus/sink/MilvusSinkWriter.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index bc8208a3eb7..6eefb71efb6 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -106,7 +106,10 @@ public void write(SeaTunnelRow element) throws IOException { FieldType fieldType = this.metaFields.get(i); if (fieldType.isPrimaryKey()) { - if (!(element.getField(i) instanceof Number) + if (!(element.getField(i) instanceof Long) + && !(element.getField(i) instanceof Integer) + && !(element.getField(i) instanceof Byte) + && !(element.getField(i) instanceof Short) && !(element.getField(i) instanceof String)) { throw new MilvusConnectorException( ILLEGAL_ARGUMENT, "Primary key field only supports number and string."); @@ -122,7 +125,7 @@ public void write(SeaTunnelRow element) throws IOException { Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); if (embeddingsFields.contains(fieldType.getName())) { if (fieldType.getDataType() != DataType.BinaryVector - || fieldType.getDataType() != DataType.FloatVector) { + && fieldType.getDataType() != DataType.FloatVector) { throw new MilvusConnectorException( ILLEGAL_ARGUMENT, "Vector field only supports binary and float."); } @@ -186,6 +189,7 @@ private void judgmentParameterType(FieldType fieldType, Object value) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; case Int8: case Int16: case Int32: @@ -195,6 +199,7 @@ private void judgmentParameterType(FieldType fieldType, Object value) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; case Int64: if (!(value instanceof Long) && !(value instanceof Integer) @@ -203,21 +208,25 @@ private void judgmentParameterType(FieldType fieldType, Object value) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; case Float: if (!(value instanceof Float)) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; case Double: if (!(value instanceof Float) && !(value instanceof Double)) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; case VarChar: if (!(value instanceof String)) { throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); } + break; default: throw new MilvusConnectorException( UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE.getDescription()); From d0c00f07eeea029da2915e8b73c202b8e98081b3 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Fri, 16 Jun 2023 11:43:12 +0800 Subject: [PATCH 16/21] add cdc feature --- docs/en/connector-v2/sink/Milvus.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 30d83f27f7c..1cc39e66d1b 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -9,6 +9,7 @@ Write data to Apache milvus. ## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) ## Options From 92b11c87e632670a31e7d1340692ea4531077310 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Mon, 21 Aug 2023 10:40:49 +0800 Subject: [PATCH 17/21] format doc --- docs/en/connector-v2/sink/Milvus.md | 86 ++++++++++++----------------- 1 file changed, 34 insertions(+), 52 deletions(-) diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 1cc39e66d1b..9d33f54d196 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -1,65 +1,47 @@ -# Milvus - > Milvus sink connector -## Description +## Support These Engines -Write data to Apache milvus. +> Spark
+> Flink
+> SeaTunnel Zeta
-## Key features +## Key Features - [ ] [exactly-once](../../concept/connector-v2-features.md) - [ ] [cdc](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|-------------------|--------|----------|------------------------| -| milvus_host | String | Yes | - | -| milvus_port | Int | No | 19530 | -| username | String | Yes | - | -| password | String | Yes | - | -| collection_name | String | No | - | -| partition_field | String | No | - | -| openai_engine | String | No | text-embedding-ada-002 | -| openai_api_key | String | No | - | -| embeddings_fields | String | No | - | - -### milvus_host [string] - -The milvus host. - -### milvus_port [int] - -This port is for gRPC. Default is 19530. - -### username [String] - -The username of milvus server. - -### password [String] - -The password of milvus server. - -### collection_name [String] - -A collection of milvus, which is similar to a table in a relational database. - -### partition_field [String] - -Partition fields, which must be included in the collection's schema. - -### openai_engine [String] - -Text embedding model. Default is 'text-embedding-ada-002'. - -### openai_api_key [String] - -Use your own Open AI API Key here. +## Description -### embeddings_fields [String] +Write data to Apache milvus. -Fields to be embedded,They use`,`for splitting. +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------| +| milvus_host | String | Yes | - | The milvus host. | +| milvus_port | Int | No | 19530 | This port is for gRPC. Default is 19530. | +| username | String | Yes | - | The username of milvus server. | +| password | String | Yes | - | The password of milvus server. | +| collection_name | String | No | - | A collection of milvus, which is similar to a table in a relational database. | +| partition_field | String | No | - | Partition fields, which must be included in the collection's schema. | +| openai_engine | String | No | text-embedding-ada-002 | Text embedding model. Default is 'text-embedding-ada-002'. | +| openai_api_key | String | No | - | Use your own Open AI API Key here. | +| embeddings_fields | String | No | - | Fields to be embedded,They use`,`for splitting. | + +### Data Type Mapping + +| Milvus Data type | SeaTunnel Data type | +|------------------|---------------------| +| Bool | BOOLEAN | +| Int8 | TINYINT | +| Int16 | SMALLINT | +| Int32 | INT | +| Int64 | BIGINT | +| Float | FLOAT | +| Double | DOUBLE | +| VarChar | DECIMAL | +| String | STRING | ## Examples From fe55be8029a9d1a5a81c958bd391cecef6f0b3ea Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 24 Aug 2023 20:00:56 +0800 Subject: [PATCH 18/21] format doc --- docs/en/connector-v2/sink/Milvus.md | 27 ++++++ .../seatunnel/milvus/config/MilvusConfig.java | 84 ---------------- .../milvus/config/MilvusOptions.java | 95 +++++++++++-------- .../milvus/config/MilvusSinkConfig.java | 57 +++++++++++ .../seatunnel/milvus/sink/MilvusSink.java | 20 ++-- .../milvus/sink/MilvusSinkFactory.java | 13 +-- .../milvus/sink/MilvusSinkWriter.java | 37 ++++---- 7 files changed, 168 insertions(+), 165 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java create mode 100644 seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 9d33f54d196..85ead00f647 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -46,6 +46,32 @@ Write data to Apache milvus. ## Examples ```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + LocalFile { + schema { + fields { + bookID = string + title_1 = string + title_2 = string + } + } + path = "/tmp/milvus_test/book" + file_format_type = "csv" + } +} + +transform { +} + sink { Milvus { milvus_host = localhost @@ -57,6 +83,7 @@ sink { openai_api_key = sk-xxxx embeddings_fields = title_2 } +} ``` ## Changelog diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java deleted file mode 100644 index bf12ba60d1d..00000000000 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusConfig.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.milvus.config; - -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.api.sink.SeaTunnelSink; - -import java.io.Serializable; - -/** Utility class to milvus configuration options, used by {@link SeaTunnelSink}. */ -public class MilvusConfig implements Serializable { - - public static final Option MILVUS_HOST = - Options.key("milvus_host") - .stringType() - .noDefaultValue() - .withDescription("The milvus host"); - - public static final Option MILVUS_PORT = - Options.key("milvus_port") - .intType() - .defaultValue(19530) - .withDescription("This port is for gRPC. Default is 19530"); - - public static final Option USERNAME = - Options.key("username") - .stringType() - .noDefaultValue() - .withDescription("The username of milvus server."); - - public static final Option PASSWORD = - Options.key("password") - .stringType() - .noDefaultValue() - .withDescription("The password of milvus server."); - - public static final Option COLLECTION_NAME = - Options.key("collection_name") - .stringType() - .noDefaultValue() - .withDescription( - "A collection of milvus, which is similar to a table in a relational database."); - - public static final Option PARTITION_FIELD = - Options.key("partition_field") - .stringType() - .noDefaultValue() - .withDescription( - "Partition fields, which must be included in the collection's schema."); - - public static final Option OPENAI_ENGINE = - Options.key("openai_engine") - .stringType() - .defaultValue("text-embedding-ada-002") - .withDescription("Text embedding model. Default is 'text-embedding-ada-002'"); - - public static final Option OPENAI_API_KEY = - Options.key("openai_api_key") - .stringType() - .noDefaultValue() - .withDescription("Use your own Open AI API Key here."); - - public static final Option EMBEDDINGS_FIELDS = - Options.key("embeddings_fields") - .stringType() - .noDefaultValue() - .withDescription("Fields to be embedded,They use`,`for splitting"); -} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java index 1d15776dc41..f64b3fd731b 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusOptions.java @@ -17,51 +17,66 @@ package org.apache.seatunnel.connectors.seatunnel.milvus.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import lombok.AllArgsConstructor; -import lombok.Data; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; import java.io.Serializable; -@Data -@AllArgsConstructor public class MilvusOptions implements Serializable { - private String milvusHost; - private Integer milvusPort; - private String collectionName; - private String partitionField; - private String userName; - private String password; - private String openaiEngine; - private String openaiApiKey; - private String embeddingsFields; + public static final Option MILVUS_HOST = + Options.key("milvus_host") + .stringType() + .noDefaultValue() + .withDescription("The milvus host"); + + public static final Option MILVUS_PORT = + Options.key("milvus_port") + .intType() + .defaultValue(19530) + .withDescription("This port is for gRPC. Default is 19530"); + + public static final Option USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("The username of milvus server."); + + public static final Option PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("The password of milvus server."); + + public static final Option COLLECTION_NAME = + Options.key("collection_name") + .stringType() + .noDefaultValue() + .withDescription( + "A collection of milvus, which is similar to a table in a relational database."); + + public static final Option PARTITION_FIELD = + Options.key("partition_field") + .stringType() + .noDefaultValue() + .withDescription( + "Partition fields, which must be included in the collection's schema."); + + public static final Option OPENAI_ENGINE = + Options.key("openai_engine") + .stringType() + .defaultValue("text-embedding-ada-002") + .withDescription("Text embedding model. Default is 'text-embedding-ada-002'"); - public MilvusOptions(Config config) { - this.milvusHost = config.getString(MilvusConfig.MILVUS_HOST.key()); - if (config.hasPath(MilvusConfig.MILVUS_PORT.key())) { - this.milvusPort = config.getInt(MilvusConfig.MILVUS_PORT.key()); - } else { - this.milvusPort = MilvusConfig.MILVUS_PORT.defaultValue(); - } - this.collectionName = config.getString(MilvusConfig.COLLECTION_NAME.key()); - this.userName = config.getString(MilvusConfig.USERNAME.key()); - this.password = config.getString(MilvusConfig.PASSWORD.key()); + public static final Option OPENAI_API_KEY = + Options.key("openai_api_key") + .stringType() + .noDefaultValue() + .withDescription("Use your own Open AI API Key here."); - if (config.hasPath(MilvusConfig.PARTITION_FIELD.key())) { - this.partitionField = config.getString(MilvusConfig.PARTITION_FIELD.key()); - } - if (config.hasPath(MilvusConfig.OPENAI_ENGINE.key())) { - this.openaiEngine = config.getString(MilvusConfig.OPENAI_ENGINE.key()); - } else { - this.openaiEngine = MilvusConfig.OPENAI_ENGINE.defaultValue(); - } - if (config.hasPath(MilvusConfig.OPENAI_API_KEY.key())) { - this.openaiApiKey = config.getString(MilvusConfig.OPENAI_API_KEY.key()); - } - if (config.hasPath(MilvusConfig.EMBEDDINGS_FIELDS.key())) { - this.embeddingsFields = config.getString(MilvusConfig.EMBEDDINGS_FIELDS.key()); - } - } + public static final Option EMBEDDINGS_FIELDS = + Options.key("embeddings_fields") + .stringType() + .noDefaultValue() + .withDescription("Fields to be embedded,They use`,`for splitting"); } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java new file mode 100644 index 00000000000..56aff7d6550 --- /dev/null +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.milvus.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.io.Serializable; + +@Setter +@Getter +@ToString +public class MilvusSinkConfig implements Serializable { + + private String milvusHost; + private Integer milvusPort; + private String collectionName; + private String partitionField; + private String userName; + private String password; + private String openaiEngine; + private String openaiApiKey; + private String embeddingsFields; + + public static MilvusSinkConfig of(ReadonlyConfig config) { + MilvusSinkConfig sinkConfig = new MilvusSinkConfig(); + sinkConfig.setMilvusHost(config.get(MilvusOptions.MILVUS_HOST)); + sinkConfig.setMilvusPort(config.get(MilvusOptions.MILVUS_PORT)); + sinkConfig.setCollectionName(config.get(MilvusOptions.COLLECTION_NAME)); + config.getOptional(MilvusOptions.PARTITION_FIELD).ifPresent(sinkConfig::setPartitionField); + config.getOptional(MilvusOptions.USERNAME).ifPresent(sinkConfig::setUserName); + config.getOptional(MilvusOptions.PASSWORD).ifPresent(sinkConfig::setPassword); + config.getOptional(MilvusOptions.OPENAI_ENGINE).ifPresent(sinkConfig::setOpenaiEngine); + config.getOptional(MilvusOptions.OPENAI_API_KEY).ifPresent(sinkConfig::setOpenaiApiKey); + config.getOptional(MilvusOptions.EMBEDDINGS_FIELDS) + .ifPresent(sinkConfig::setEmbeddingsFields); + + return sinkConfig; + } +} diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java index 4aeca66b9b9..1ac7b551b38 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -31,20 +32,18 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import com.google.auto.service.AutoService; -import java.io.IOException; - @AutoService(SeaTunnelSink.class) public class MilvusSink extends AbstractSimpleSink { private SeaTunnelRowType seaTunnelRowType; - private MilvusOptions milvusOptions; + private MilvusSinkConfig milvusSinkConfig; @Override public String getPluginName() { @@ -53,13 +52,13 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - milvusOptions = new MilvusOptions(pluginConfig); + milvusSinkConfig = MilvusSinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig)); CheckResult result = CheckConfigUtil.checkAllExists( pluginConfig, - MilvusConfig.MILVUS_HOST.key(), - MilvusConfig.MILVUS_PORT.key(), - MilvusConfig.COLLECTION_NAME.key()); + MilvusOptions.MILVUS_HOST.key(), + MilvusOptions.MILVUS_PORT.key(), + MilvusOptions.COLLECTION_NAME.key()); if (!result.isSuccess()) { throw new MilvusConnectorException( SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, @@ -80,8 +79,7 @@ public SeaTunnelDataType getConsumedType() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { - return new MilvusSinkWriter(milvusOptions); + public AbstractSinkWriter createWriter(SinkWriter.Context context) { + return new MilvusSinkWriter(milvusSinkConfig); } } diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java index f0904bf7787..92f9205cfc1 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java @@ -23,18 +23,7 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.COLLECTION_NAME; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.EMBEDDINGS_FIELDS; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_HOST; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.MILVUS_PORT; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.OPENAI_API_KEY; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.OPENAI_ENGINE; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.PARTITION_FIELD; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusConfig.USERNAME; - -@AutoService(Factory.class) -public class MilvusSinkFactory implements TableSinkFactory { +static @AutoService(Factory.class) public class MilvusSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { return "Milvus"; diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 6eefb71efb6..0ed33489325 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -19,7 +19,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import com.theokanning.openai.embedding.EmbeddingRequest; @@ -52,32 +52,33 @@ public class MilvusSinkWriter extends AbstractSinkWriter { private final MilvusServiceClient milvusClient; - private final MilvusOptions milvusOptions; + private final MilvusSinkConfig milvusSinkConfig; private OpenAiService service; private final List metaFields; - public MilvusSinkWriter(MilvusOptions milvusOptions) { - this.milvusOptions = milvusOptions; + public MilvusSinkWriter(MilvusSinkConfig milvusSinkConfig) { + this.milvusSinkConfig = milvusSinkConfig; ConnectParam connectParam = ConnectParam.newBuilder() - .withHost(milvusOptions.getMilvusHost()) - .withPort(milvusOptions.getMilvusPort()) - .withAuthorization(milvusOptions.getUserName(), milvusOptions.getPassword()) + .withHost(milvusSinkConfig.getMilvusHost()) + .withPort(milvusSinkConfig.getMilvusPort()) + .withAuthorization( + milvusSinkConfig.getUserName(), milvusSinkConfig.getPassword()) .build(); milvusClient = new MilvusServiceClient(connectParam); handleResponseStatus( milvusClient.hasCollection( HasCollectionParam.newBuilder() - .withCollectionName(milvusOptions.getCollectionName()) + .withCollectionName(milvusSinkConfig.getCollectionName()) .build())); R describeCollectionResponseR = milvusClient.describeCollection( DescribeCollectionParam.newBuilder() - .withCollectionName(milvusOptions.getCollectionName()) + .withCollectionName(milvusSinkConfig.getCollectionName()) .build()); handleResponseStatus(describeCollectionResponseR); @@ -87,8 +88,8 @@ public MilvusSinkWriter(MilvusOptions milvusOptions) { this.metaFields = wrapper.getFields(); - if (milvusOptions.getEmbeddingsFields() != null) { - service = new OpenAiService(milvusOptions.getOpenaiApiKey()); + if (milvusSinkConfig.getEmbeddingsFields() != null) { + service = new OpenAiService(milvusSinkConfig.getOpenaiApiKey()); } } @@ -99,7 +100,7 @@ public void write(SeaTunnelRow element) throws IOException { InsertParam.Builder builder = InsertParam.newBuilder(); - builder = builder.withCollectionName(milvusOptions.getCollectionName()); + builder = builder.withCollectionName(milvusSinkConfig.getCollectionName()); for (int i = 0; i < this.metaFields.size(); i++) { @@ -116,13 +117,13 @@ public void write(SeaTunnelRow element) throws IOException { } } - if (milvusOptions.getPartitionField() != null - && milvusOptions.getPartitionField().equals(fieldType.getName())) { + if (milvusSinkConfig.getPartitionField() != null + && milvusSinkConfig.getPartitionField().equals(fieldType.getName())) { builder.withPartitionName(String.valueOf(element.getField(i))); } - if (milvusOptions.getEmbeddingsFields() != null) { + if (milvusSinkConfig.getEmbeddingsFields() != null) { List embeddingsFields = - Arrays.asList(milvusOptions.getEmbeddingsFields().split(",")); + Arrays.asList(milvusSinkConfig.getEmbeddingsFields().split(",")); if (embeddingsFields.contains(fieldType.getName())) { if (fieldType.getDataType() != DataType.BinaryVector && fieldType.getDataType() != DataType.FloatVector) { @@ -132,7 +133,7 @@ public void write(SeaTunnelRow element) throws IOException { EmbeddingResult embeddings = service.createEmbeddings( EmbeddingRequest.builder() - .model(milvusOptions.getOpenaiEngine()) + .model(milvusSinkConfig.getOpenaiEngine()) .input( Collections.singletonList( String.valueOf(element.getField(i)))) @@ -165,7 +166,7 @@ public void write(SeaTunnelRow element) throws IOException { public List snapshotState(long checkpointId) throws IOException { milvusClient.flush( FlushParam.newBuilder() - .addCollectionName(milvusOptions.getCollectionName()) + .addCollectionName(milvusSinkConfig.getCollectionName()) .build()); return Collections.emptyList(); } From 39e809a31fa8d372dec08299652309cbc4c1a1c4 Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 24 Aug 2023 20:11:11 +0800 Subject: [PATCH 19/21] fix error --- .../seatunnel/milvus/sink/MilvusSink.java | 22 +++---------------- .../milvus/sink/MilvusSinkFactory.java | 17 +++++++++++--- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java index 1ac7b551b38..6f266e46ed6 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java @@ -20,21 +20,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig; -import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException; import com.google.auto.service.AutoService; @@ -53,19 +48,8 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { milvusSinkConfig = MilvusSinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig)); - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - MilvusOptions.MILVUS_HOST.key(), - MilvusOptions.MILVUS_PORT.key(), - MilvusOptions.COLLECTION_NAME.key()); - if (!result.isSuccess()) { - throw new MilvusConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } + ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)) + .validate(new MilvusSinkFactory().optionRule()); } @Override diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java index 92f9205cfc1..a256425d756 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java @@ -20,10 +20,12 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusOptions; import com.google.auto.service.AutoService; -static @AutoService(Factory.class) public class MilvusSinkFactory implements TableSinkFactory { +@AutoService(Factory.class) +public class MilvusSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { return "Milvus"; @@ -32,8 +34,17 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(MILVUS_HOST, MILVUS_PORT, COLLECTION_NAME, USERNAME, PASSWORD) - .optional(PARTITION_FIELD, OPENAI_ENGINE, OPENAI_API_KEY, EMBEDDINGS_FIELDS) + .required( + MilvusOptions.MILVUS_HOST, + MilvusOptions.MILVUS_PORT, + MilvusOptions.COLLECTION_NAME, + MilvusOptions.USERNAME, + MilvusOptions.PASSWORD) + .optional( + MilvusOptions.PARTITION_FIELD, + MilvusOptions.OPENAI_ENGINE, + MilvusOptions.OPENAI_API_KEY, + MilvusOptions.EMBEDDINGS_FIELDS) .build(); } } From c13adcb482796cf84e6eb711a267dbed7b052806 Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 24 Aug 2023 23:55:03 +0800 Subject: [PATCH 20/21] fix error --- .../milvus/sink/MilvusSinkWriter.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 0ed33489325..89c4c1d227d 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -106,12 +106,13 @@ public void write(SeaTunnelRow element) throws IOException { FieldType fieldType = this.metaFields.get(i); + Object field = element.getField(i); if (fieldType.isPrimaryKey()) { - if (!(element.getField(i) instanceof Long) - && !(element.getField(i) instanceof Integer) - && !(element.getField(i) instanceof Byte) - && !(element.getField(i) instanceof Short) - && !(element.getField(i) instanceof String)) { + if (!(field instanceof Long) + && !(field instanceof Integer) + && !(field instanceof Byte) + && !(field instanceof Short) + && !(field instanceof String)) { throw new MilvusConnectorException( ILLEGAL_ARGUMENT, "Primary key field only supports number and string."); } @@ -119,7 +120,7 @@ public void write(SeaTunnelRow element) throws IOException { if (milvusSinkConfig.getPartitionField() != null && milvusSinkConfig.getPartitionField().equals(fieldType.getName())) { - builder.withPartitionName(String.valueOf(element.getField(i))); + builder.withPartitionName(String.valueOf(field)); } if (milvusSinkConfig.getEmbeddingsFields() != null) { List embeddingsFields = @@ -136,25 +137,25 @@ public void write(SeaTunnelRow element) throws IOException { .model(milvusSinkConfig.getOpenaiEngine()) .input( Collections.singletonList( - String.valueOf(element.getField(i)))) + String.valueOf(field))) .build()); List embedding = embeddings.getData().get(0).getEmbedding(); List collect = embedding.stream().map(Double::floatValue).collect(Collectors.toList()); - InsertParam.Field field = + InsertParam.Field insertField = new InsertParam.Field( fieldType.getName(), Collections.singletonList(collect)); - fields.add(field); + fields.add(insertField); continue; } } - judgmentParameterType(fieldType, element.getField(i)); + judgmentParameterType(fieldType, field); - InsertParam.Field field = + InsertParam.Field insertField = new InsertParam.Field( - fieldType.getName(), Collections.singletonList(element.getField(i))); - fields.add(field); + fieldType.getName(), Collections.singletonList(field)); + fields.add(insertField); } InsertParam build = builder.withFields(fields).build(); From f6ad8ad4e2c0b4c15af8bd9a44e6eedd562e4a99 Mon Sep 17 00:00:00 2001 From: liugddx Date: Thu, 24 Aug 2023 23:57:03 +0800 Subject: [PATCH 21/21] fix error --- .../connectors/seatunnel/milvus/sink/MilvusSinkWriter.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 89c4c1d227d..5fe5ff9a504 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -135,9 +135,7 @@ public void write(SeaTunnelRow element) throws IOException { service.createEmbeddings( EmbeddingRequest.builder() .model(milvusSinkConfig.getOpenaiEngine()) - .input( - Collections.singletonList( - String.valueOf(field))) + .input(Collections.singletonList(String.valueOf(field))) .build()); List embedding = embeddings.getData().get(0).getEmbedding(); List collect = @@ -153,8 +151,7 @@ public void write(SeaTunnelRow element) throws IOException { judgmentParameterType(fieldType, field); InsertParam.Field insertField = - new InsertParam.Field( - fieldType.getName(), Collections.singletonList(field)); + new InsertParam.Field(fieldType.getName(), Collections.singletonList(field)); fields.add(insertField); }