diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 437eabe0..6ac74674 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -144,4 +144,7 @@ public interface ConfigurationOptions {
 
     String DORIS_HTTPS_KEY_STORE_PASSWORD = "doris.https.key-store-password";
 
+    String LOAD_MODE = "doris.sink.load.mode";
+    String DEFAULT_LOAD_MODE = "stream_load";
+
 }
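Reviewer note: with these two constants the sink loader becomes switchable purely through configuration (see generateLoader in DorisWriter at the end of this patch). A minimal usage sketch, assuming the connector's existing batch write options; only doris.sink.load.mode is new here:

    df.write
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")
      .option("doris.table.identifier", "db.tbl")
      .option("user", "root")
      .option("password", "")
      .option("doris.sink.load.mode", "copy_into") // default: "stream_load"
      .save()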
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
new file mode 100644
index 00000000..7d40a1ce
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.exception;
+
+public class CopyIntoException extends Exception {
+    public CopyIntoException() {
+        super();
+    }
+    public CopyIntoException(String message) {
+        super(message);
+    }
+    public CopyIntoException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    public CopyIntoException(Throwable cause) {
+        super(cause);
+    }
+    protected CopyIntoException(String message, Throwable cause,
+                                boolean enableSuppression,
+                                boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
index 60e39494..5f8d6ee1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
@@ -33,6 +33,9 @@ public class RespContent {
     @JsonProperty(value = "TxnId")
     private long TxnId;
 
+    @JsonProperty(value = "msg")
+    private String msg;
+
     @JsonProperty(value = "Label")
     private String Label;
 
@@ -108,4 +111,7 @@ public boolean isSuccess() {
         return DORIS_SUCCESS_STATUS.contains(getStatus());
     }
 
+    public boolean isCopyIntoSuccess() {
+        return "success".equals(this.msg);
+    }
 }
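Reviewer note: the comparison is written constant-first because "msg" only appears in copy/query responses; on a plain stream load response the field stays null. A hypothetical sanity check, using the Jackson ObjectMapper the class already depends on:

    val mapper = new ObjectMapper()
    val ok = mapper.readValue("""{"msg": "success"}""", classOf[RespContent])
    assert(ok.isCopyIntoSuccess)
    val other = mapper.readValue("""{"Status": "Success"}""", classOf[RespContent])
    assert(!other.isCopyIntoSuccess) // msg is null here; "success".equals(null) is simply false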
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
new file mode 100644
index 00000000..70dddd50
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+
+public class CopySQLBuilder {
+    private static final String COPY_SYNC = "copy.async";
+    private static final String FILE_TYPE = "file.type";
+    private static final String FORMAT_KEY = "format";
+    private static final String FIELD_DELIMITER_KEY = "column_separator";
+    private static final String LINE_DELIMITER_KEY = "line_delimiter";
+    private static final String COMPRESSION = "compression";
+
+    private final String fileName;
+    private Properties copyIntoProps;
+    private String tableIdentifier;
+    private String dataType;
+
+    public CopySQLBuilder(String dataType, Properties copyIntoProps, String tableIdentifier, String fileName) {
+        this.dataType = dataType;
+        this.fileName = fileName;
+        this.tableIdentifier = tableIdentifier;
+        this.copyIntoProps = copyIntoProps;
+    }
+
+    public String buildCopySQL() {
+        StringBuilder sb = new StringBuilder();
+        // fileName carries a trailing %s placeholder supplied by the caller, so
+        // formatting it with "}*')" yields the glob pattern {<fileName>}*')
+        sb.append("COPY INTO ").append(tableIdentifier).append(" FROM @~('{")
+                .append(String.format(fileName, "}*')")).append(" PROPERTIES (");
+
+        // copy into must run synchronously, so async is disabled
+        copyIntoProps.put(COPY_SYNC, "false");
+        copyIntoProps.put(FILE_TYPE, dataType);
+        if (dataType.equals("JSON")) {
+            copyIntoProps.put("file.strip_outer_array", "false");
+        }
+        StringJoiner props = new StringJoiner(",");
+        for (Map.Entry<Object, Object> entry : copyIntoProps.entrySet()) {
+            // skip the raw format key; it is already mapped to file.type above
+            if (!FORMAT_KEY.equals(String.valueOf(entry.getKey()))) {
+                String key = concatPropPrefix(String.valueOf(entry.getKey()));
+                String value = String.valueOf(entry.getValue());
+                String prop = String.format("'%s'='%s'", key, value);
+                props.add(prop);
+            }
+        }
+        sb.append(props).append(" )");
+        return sb.toString();
+    }
+
+    private static final List<String> PREFIX_LIST =
+            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, COMPRESSION);
+
+    private String concatPropPrefix(String key) {
+        if (PREFIX_LIST.contains(key)) {
+            return "file." + key;
+        }
+        if (FORMAT_KEY.equals(key)) {
+            return "file.type";
+        }
+        return key;
+    }
+}
\ No newline at end of file
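Reviewer note: a sketch of what the builder emits, derived from the code above. Property iteration order comes from a Hashtable, so it is unspecified; the uuid is shortened for readability:

    val props = new Properties()
    props.put("format", "csv")
    val sql = new CopySQLBuilder("CSV", props, "db.tbl", "3f6b9a%s").buildCopySQL()
    // roughly: COPY INTO db.tbl FROM @~('{3f6b9a}*') PROPERTIES ('copy.async'='false','file.type'='CSV' )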
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
new file mode 100644
index 00000000..d88d04b5
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPostBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+
+    public HttpPostBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPostBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPostBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPostBuilder baseAuth(String encoded) {
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+        return this;
+    }
+
+    public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPost build() {
+        Preconditions.checkNotNull(url);
+        Preconditions.checkNotNull(httpEntity);
+        HttpPost post = new HttpPost(url);
+        header.forEach(post::setHeader);
+        post.setEntity(httpEntity);
+        return post;
+    }
+}
\ No newline at end of file
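Reviewer note: this builder is exercised once per batch, for the copy/query call. A sketch of the request it produces, with the JSON body shape taken from buildCopyIntoRequest in CopyIntoLoader below; host and credentials are illustrative:

    val body = """{"sql":"COPY INTO db.tbl FROM @~('{3f6b9a}*') PROPERTIES ('copy.async'='false','file.type'='CSV' )"}"""
    val post = new HttpPostBuilder()
      .setUrl("http://fe_host:8030/copy/query")
      .baseAuth(Base64.getEncoder.encodeToString("root:".getBytes(StandardCharsets.UTF_8)))
      .setEntity(new StringEntity(body))
      .build()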
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
new file mode 100644
index 00000000..e46b0990
--- /dev/null
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+
+    public HttpPutBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPutBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPutBuilder addFileName(String fileName) {
+        header.put("fileName", fileName);
+        return this;
+    }
+
+    public HttpPutBuilder setEmptyEntity() {
+        try {
+            this.httpEntity = new StringEntity("");
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    public HttpPutBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPutBuilder baseAuth(String encoded) {
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+        return this;
+    }
+
+    public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPut build() {
+        Preconditions.checkNotNull(url);
+        Preconditions.checkNotNull(httpEntity);
+        HttpPut put = new HttpPut(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
\ No newline at end of file
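Reviewer note: the PUT builder serves both halves of the upload handshake implemented by the loader below (flow per handleGetUploadAddressResponse); host, file name, and authEncoded are illustrative:

    // 1) ask the FE for an upload location; the loader expects a 307 redirect back
    val addressReq = new HttpPutBuilder()
      .setUrl("http://fe_host:8030/copy/upload")
      .addCommonHeader()
      .addFileName("3f6b9a")
      .setEmptyEntity()
      .baseAuth(authEncoded) // base64 of "user:password", as computed in getAuthEncoded
      .build()
    // 2) PUT the serialized batch to the address from the redirect's Location header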
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
new file mode 100644
index 00000000..a42e111b
--- /dev/null
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.CopyIntoException
+import org.apache.doris.spark.rest.models.RespContent
+import org.apache.doris.spark.util.{CopySQLBuilder, HttpPostBuilder, HttpPutBuilder}
+import org.apache.hadoop.util.StringUtils.escapeString
+import org.apache.http.{HttpEntity, HttpStatus}
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.entity.{BufferedHttpEntity, ByteArrayEntity, StringEntity}
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
+import org.apache.http.util.EntityUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.{ByteArrayOutputStream, IOException}
+import java.nio.charset.StandardCharsets
+import java.util.zip.GZIPOutputStream
+import java.util.{Base64, Properties, UUID}
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+case class CopyIntoResponse(code: Int, msg: String, content: String)
+
+class CopyIntoLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader {
+
+  private final val LOG: Logger = LoggerFactory.getLogger(classOf[CopyIntoLoader])
+
+  private val hostPort: String = settings.getProperty(ConfigurationOptions.DORIS_FENODES)
+
+  private val tableIdentifier: String = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+
+  private val OBJECT_MAPPER = new ObjectMapper
+
+  private val copyIntoProps: Properties = getCopyIntoProps
+
+  private val format: DataFormat = DataFormat.valueOf(copyIntoProps.getOrDefault("format", "csv").toString.toUpperCase)
+
+  private val LOAD_URL_PATTERN = "http://%s/copy/upload"
+
+  private val COMMIT_PATTERN = "http://%s/copy/query"
+
+  private val authEncoded: String = getAuthEncoded
+
+  /**
+   * execute load
+   *
+   * @param iterator row data iterator
+   * @param schema   row data schema
+   * @return commit message
+   */
+  override def load(iterator: Iterator[InternalRow], schema: StructType): Option[CommitMessage] = {
+
+    var msg: Option[CommitMessage] = None
+
+    val fileName: String = UUID.randomUUID().toString
+    val client: CloseableHttpClient = getHttpClient
+    val currentLoadUrl: String = String.format(LOAD_URL_PATTERN, hostPort)
+
+    Try {
+      val uploadAddressRequest = buildUploadAddressRequest(currentLoadUrl, fileName)
+      val uploadAddressResponse = client.execute(uploadAddressRequest.build())
+      val uploadAddress = handleGetUploadAddressResponse(uploadAddressResponse)
+      val uploadFileRequest = buildUploadFileRequest(uploadAddress, iterator, schema)
+      val uploadFileResponse = client.execute(uploadFileRequest.build())
+      handleUploadFileResponse(uploadFileResponse)
+      val copyIntoRequest = buildCopyIntoRequest(s"${fileName}%s")
+      val copyIntoResponse = client.execute(copyIntoRequest.build())
+      handleExecuteCopyIntoResponse(copyIntoResponse)
+      msg = Some(CommitMessage(fileName))
+    } match {
+      case Success(_) => client.close()
+      case Failure(e) =>
+        client.close()
+        LOG.error(s"Copy into failed, err: ${ExceptionUtils.getStackTrace(e)}")
+        if (e.isInstanceOf[CopyIntoException]) throw e
+        throw new CopyIntoException(s"failed to load data on $currentLoadUrl", e)
+    }
+    msg
+  }
+
+  /**
+   * handle the response of the copy into statement execution
+   *
+   * @param copyIntoResponse http response of the copy sql request
+   */
+  private def handleExecuteCopyIntoResponse(copyIntoResponse: CloseableHttpResponse): Unit = {
+    val code = copyIntoResponse.getStatusLine.getStatusCode
+    val msg = copyIntoResponse.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new BufferedHttpEntity(copyIntoResponse.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code != HttpStatus.SC_OK) {
+      LOG.error(s"Execute copy sql status is not OK, status: ${loadResponse.code}, response: $loadResponse")
+      throw new CopyIntoException(s"Execute copy sql error, http status: ${loadResponse.code}, response: $loadResponse")
+    } else {
+      try {
+        val respContent = OBJECT_MAPPER.readValue(loadResponse.content, classOf[RespContent])
+        if (!respContent.isCopyIntoSuccess) {
+          LOG.error(s"Execute copy sql status is not success, status: ${respContent.getStatus}, response: $loadResponse")
+          throw new CopyIntoException(s"Execute copy sql error, load status: ${respContent.getStatus}, response: $loadResponse")
+        }
+        LOG.info("Execute copy sql Response:{}", loadResponse)
+      } catch {
+        case e: IOException =>
+          throw new CopyIntoException(e)
+      }
+    }
+  }
+
+  private def handleUploadFileResponse(uploadFileResponse: CloseableHttpResponse): Unit = {
+    val code = uploadFileResponse.getStatusLine.getStatusCode
+    val msg = uploadFileResponse.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new BufferedHttpEntity(uploadFileResponse.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code != HttpStatus.SC_OK) {
+      LOG.error(s"Upload file status is not OK, status: ${loadResponse.code}, response: $loadResponse")
+      throw new CopyIntoException(s"Upload file error, http status: ${loadResponse.code}, response: $loadResponse")
+    } else {
+      LOG.info(s"Upload file success, status: ${loadResponse.code}, response: $loadResponse")
+    }
+  }
+
+  private def buildUploadFileRequest(uploadAddress: String, iterator: Iterator[InternalRow], schema: StructType): HttpPutBuilder = {
+    new HttpPutBuilder().setUrl(uploadAddress).addCommonHeader().setEntity(generateHttpEntity(iterator, schema))
+  }
+
+  /**
+   * commit transaction
+   *
+   * @param msg commit message
+   */
+  override def commit(msg: CommitMessage): Unit = ???
+
+  /**
+   * abort transaction
+   *
+   * @param msg commit message
+   */
+  override def abort(msg: CommitMessage): Unit = ???
+
+  private def buildCopyIntoRequest(fileName: String): HttpPostBuilder = {
+
+    val copySQLBuilder: CopySQLBuilder = new CopySQLBuilder(format.toString, copyIntoProps, tableIdentifier, fileName)
+    val copySql: String = copySQLBuilder.buildCopySQL()
+    LOG.info(s"build copy sql is $copySql")
+    val objectNode: ObjectNode = OBJECT_MAPPER.createObjectNode()
+    objectNode.put("sql", copySql)
+
+    new HttpPostBuilder().setUrl(String.format(COMMIT_PATTERN, hostPort))
+      .baseAuth(authEncoded)
+      .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(objectNode)))
+  }
+
+  private def buildUploadAddressRequest(url: String, fileName: String): HttpPutBuilder = {
+    new HttpPutBuilder().setUrl(url).addCommonHeader().addFileName(fileName).setEmptyEntity().baseAuth(authEncoded)
+  }
+
+  private def getAuthEncoded: String = {
+    val user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER)
+    val passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD)
+    Base64.getEncoder.encodeToString(s"$user:$passwd".getBytes(StandardCharsets.UTF_8))
+  }
+
+  private def generateHttpEntity(iterator: Iterator[InternalRow], schema: StructType): HttpEntity = {
+
+    var entity: Option[HttpEntity] = None
+
+    val compressType = copyIntoProps.getProperty("compression")
+    val columnSeparator = escapeString(copyIntoProps.getOrDefault("file.column_separator", "\t").toString)
+    val lineDelimiter = escapeString(copyIntoProps.getOrDefault("file.line_delimiter", "\n").toString)
+    val addDoubleQuotes = copyIntoProps.getOrDefault("add_double_quotes", "false").toString.toBoolean
+    val streamingPassthrough: Boolean = isStreaming && settings.getBooleanProperty(
+      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
+      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT)
+
+    val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+      .format(format)
+      .sep(columnSeparator)
+      .delim(lineDelimiter)
+      .schema(schema)
+      .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
+    val content = recordBatchString.getContent
+
+    if (compressType != null && !compressType.isEmpty) {
+      if ("gz".equalsIgnoreCase(compressType) && format == DataFormat.CSV) {
+        val compressedData = compressByGZ(content)
+        entity = Some(new ByteArrayEntity(compressedData))
+      } else {
+        throw new CopyIntoException(s"Not support the compress type [$compressType] for the format [$format]")
+      }
+    } else {
+      entity = Some(new ByteArrayEntity(content.getBytes(StandardCharsets.UTF_8)))
+    }
+
+    entity.get
+  }
+
+  private def getCopyIntoProps: Properties = {
+    val props = settings.asProperties().asScala.filter(_._1.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX))
+      .map { case (k, v) => (k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v) }
+    if (props.getOrElse("add_double_quotes", "false").toBoolean) {
+      LOG.info("add_double_quotes is set for csv mode, also setting trim_double_quotes to true for the copy into props.")
+      props.put("trim_double_quotes", "true")
+    }
+    props.remove("columns")
+    val properties = new Properties()
+    properties.putAll(props.mapValues(_.toString).asJava)
+    properties
+  }
+
+  @throws[IOException]
+  def compressByGZ(content: String): Array[Byte] = {
+    val baos = new ByteArrayOutputStream
+    val gzipOutputStream = new GZIPOutputStream(baos)
+    try {
+      gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8))
+      gzipOutputStream.finish()
+    } finally {
+      gzipOutputStream.close()
+      baos.close()
+    }
+    baos.toByteArray
+  }
+
+  private def getHttpClient: CloseableHttpClient = {
+    // keep redirect handling manual so the 307 upload address can be captured
+    HttpClients.custom().disableRedirectHandling().build()
+  }
+
+  @throws[CopyIntoException]
+  private def handleGetUploadAddressResponse(response: CloseableHttpResponse): String = {
+    val code = response.getStatusLine.getStatusCode
+    val msg = response.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new BufferedHttpEntity(response.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code == HttpStatus.SC_TEMPORARY_REDIRECT) {
+      val uploadAddress: String = response.getFirstHeader("location").getValue
+      LOG.info(s"Get upload address Response: $loadResponse")
+      LOG.info(s"Redirect to s3: $uploadAddress")
+      uploadAddress
+    } else {
+      LOG.error(s"Failed to get the redirected address, status ${loadResponse.code}, reason ${loadResponse.msg}, response ${loadResponse.content}")
+      throw new CopyIntoException("Could not get the redirected address.")
+    }
+  }
+}
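Reviewer note: compressByGZ is public, so it can be spot-checked in isolation with a plain JDK round-trip; a sketch, assuming a CopyIntoLoader instance named loader:

    import java.io.ByteArrayInputStream
    import java.util.zip.GZIPInputStream
    import scala.io.Source

    val bytes = loader.compressByGZ("a,b\n1,2")
    val decoded = Source.fromInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)), "UTF-8").mkString
    assert(decoded == "a,b\n1,2")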
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 57cacd65..25e86aae 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -365,7 +365,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
 
     val compressType = streamLoadProps.get("compress_type")
     val columnSeparator = escapeString(streamLoadProps.getOrElse("column_separator", "\t"))
-    val lineDelimiter = escapeString(streamLoadProps.getOrElse("line_delimiter", "\t"))
+    val lineDelimiter = escapeString(streamLoadProps.getOrElse("line_delimiter", "\n"))
     val addDoubleQuotes = streamLoadProps.getOrElse("add_double_quotes", "false").toBoolean
     val streamingPassthrough: Boolean = isStreaming && settings.getBooleanProperty(
       ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 26491dfc..bd8e9f7d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -18,7 +18,7 @@
 package org.apache.doris.spark.writer
 
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.load.{CommitMessage, Loader, StreamLoader}
+import org.apache.doris.spark.load.{CommitMessage, CopyIntoLoader, Loader, StreamLoader}
 import org.apache.doris.spark.sql.Utils
 import org.apache.doris.spark.txn.TransactionHandler
 import org.apache.doris.spark.txn.listener.DorisTransactionListener
@@ -42,6 +42,7 @@ class DorisWriter(settings: SparkSettings,
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
 
   private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+  private val loadMode: String = settings.getProperty(ConfigurationOptions.LOAD_MODE, ConfigurationOptions.DEFAULT_LOAD_MODE)
   private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
     ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -210,9 +211,11 @@ class DorisWriter(settings: SparkSettings,
   @throws[IllegalArgumentException]
   private def generateLoader: Loader = {
-    val loadMode = settings.getProperty("load_mode", "stream_load")
-    if ("stream_load".equalsIgnoreCase(loadMode)) new StreamLoader(settings, isStreaming)
-    else throw new IllegalArgumentException(s"Unsupported load mode: $loadMode")
+    // toLowerCase preserves the case-insensitive matching of the old code path
+    loadMode.toLowerCase match {
+      case "stream_load" => new StreamLoader(settings, isStreaming)
+      case "copy_into" => new CopyIntoLoader(settings, isStreaming)
+      case _ => throw new IllegalArgumentException(s"Unsupported load mode: $loadMode")
+    }
   }
 
   def getTransactionHandler: TransactionHandler = txnHandler
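Closing reviewer note: since CopyIntoLoader takes the same isStreaming flag as StreamLoader, a structured streaming sink can opt into the new path as well. A sketch, assuming the connector's existing streaming options; paths and hosts are illustrative:

    df.writeStream
      .format("doris")
      .option("checkpointLocation", "/tmp/doris-ckpt")
      .option("doris.fenodes", "fe_host:8030")
      .option("doris.table.identifier", "db.tbl")
      .option("user", "root")
      .option("password", "")
      .option("doris.sink.load.mode", "copy_into")
      .start()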