From 3e745e732fdade8a26856bd92026b44fd02d2787 Mon Sep 17 00:00:00 2001
From: zhaorongsheng
Date: Mon, 1 Jul 2024 10:20:40 +0800
Subject: [PATCH] [Improve] decrease memory usage when csv&gzip is on (#212)

Co-authored-by: zhaorongsheng
---
 .../doris/spark/load/StreamLoader.scala | 33 ++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 9481b6f7..06bb56ff 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -20,6 +20,7 @@ package org.apache.doris.spark.load
 import com.fasterxml.jackson.core.`type`.TypeReference
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.{ByteArrayOutputStream, IOException, InputStream}
 import java.net.{HttpURLConnection, URL}
 import java.nio.charset.StandardCharsets
 import java.util
@@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
 
     if (compressType.nonEmpty) {
       if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) {
-        val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+        val recordBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava)
           .format(format)
           .sep(columnSeparator)
           .delim(lineDelimiter)
           .schema(schema)
           .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
-        val content = recordBatchString.getContent
-        val compressedData = compressByGZ(content)
+        val compressedData = compressByGZ(recordBatchInputStream)
 
         entity = Some(new ByteArrayEntity(compressedData))
       } else {
@@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
     compressedData
   }
 
+  /**
+   * compress data by gzip
+   *
+   * @param contentInputStream data content
+   * @throws IOException
+   * @return compressed byte array data
+   */
+  @throws[IOException]
+  def compressByGZ(contentInputStream: InputStream): Array[Byte] = {
+    var compressedData: Array[Byte] = null
+    try {
+      val baos = new ByteArrayOutputStream
+      val gzipOutputStream = new GZIPOutputStream(baos)
+      try {
+        IOUtils.copy(contentInputStream, gzipOutputStream)
+        gzipOutputStream.finish()
+        compressedData = baos.toByteArray
+      } finally {
+        if (gzipOutputStream != null) gzipOutputStream.close()
+        if (baos != null) baos.close()
+      }
+    }
+    compressedData
+  }
+
   /**
    * handle stream load response
    *
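
Notes on the change: the previous code materialized the entire record batch as one
String via RecordBatchString.getContent and then compressed it, so the uncompressed
batch, its byte-array copy inside compressByGZ, and the compressed output were all
live at once. The new overload instead wraps the batch in a RecordBatchInputStream
and streams it through IOUtils.copy, so at any moment only a small fixed copy buffer
(8 KiB by default in commons-io) plus the growing compressed output are resident.

Below is a minimal, self-contained sketch of the same streaming-compression idea.
The object name, sample data, and main method are illustrative only and not part of
the connector; the sketch assumes commons-io on the classpath, which the connector
already depends on.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

import org.apache.commons.io.IOUtils

object GzipStreamSketch {

  // Copies `in` through a GZIPOutputStream chunk by chunk, so memory use is
  // bounded by the copy buffer plus the compressed bytes, not by the raw input.
  def compressByGZ(in: InputStream): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(baos)
    try {
      IOUtils.copy(in, gzip)
    } finally {
      gzip.close() // close() finishes the GZIP trailer, then closes baos
    }
    baos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val csv = "1,alice\n2,bob\n3,carol\n"
    val raw = csv.getBytes(StandardCharsets.UTF_8)
    val gz = compressByGZ(new ByteArrayInputStream(raw))
    println(s"raw=${raw.length} bytes, gzipped=${gz.length} bytes")
  }
}

One design note: the explicit gzipOutputStream.finish() in the patch is what
guarantees baos.toByteArray sees the complete GZIP trailer before the streams are
closed; relying on close() alone, as the sketch does, also works because
GZIPOutputStream.close() calls finish() before closing the underlying stream.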