Skip to content

Commit

Permalink
[Improve] decrease memory usage when csv&gzip is on (apache#212)
Browse files Browse the repository at this point in the history
Co-authored-by: zhaorongsheng <[email protected]>
  • Loading branch information
zhaorongsheng and zhaorongsheng authored Jul 1, 2024
1 parent 1df1c96 commit 3e745e7
Showing 1 changed file with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.doris.spark.load
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.json.JsonMapper
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
Expand All @@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}

import java.io.{ByteArrayOutputStream, IOException}
import java.io.{ByteArrayOutputStream, IOException, InputStream}
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util
Expand Down Expand Up @@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader

if (compressType.nonEmpty) {
if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) {
val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava)
.format(format)
.sep(columnSeparator)
.delim(lineDelimiter)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
val content = recordBatchString.getContent
val compressedData = compressByGZ(content)
val compressedData = compressByGZ(recodeBatchInputStream)
entity = Some(new ByteArrayEntity(compressedData))
}
else {
Expand Down Expand Up @@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
compressedData
}

/**
 * Compress the full contents of an input stream with gzip.
 *
 * The data is streamed through the compressor in fixed-size chunks, so only
 * the compressed output is buffered in memory — the uncompressed content is
 * never materialized as a whole (the point of the csv+gzip memory fix).
 *
 * @param contentInputStream stream of raw, uncompressed data; it is fully
 *                           consumed but NOT closed here — the caller owns it
 * @throws IOException if reading from the stream or compressing fails
 * @return the gzip-compressed bytes
 */
@throws[IOException]
def compressByGZ(contentInputStream: InputStream): Array[Byte] = {
  val byteSink = new ByteArrayOutputStream
  val gzipOut = new GZIPOutputStream(byteSink)
  try {
    // Plain stdlib copy loop; 8 KiB matches common stream-copy buffer sizes.
    val buffer = new Array[Byte](8192)
    var bytesRead = contentInputStream.read(buffer)
    while (bytesRead != -1) {
      gzipOut.write(buffer, 0, bytesRead)
      bytesRead = contentInputStream.read(buffer)
    }
    // Flush the gzip trailer so the byte array is a complete gzip document.
    gzipOut.finish()
  } finally {
    // Closing the GZIPOutputStream also closes the underlying byteSink
    // (ByteArrayOutputStream.close() is a documented no-op anyway).
    gzipOut.close()
  }
  byteSink.toByteArray
}

/**
* handle stream load response
*
Expand Down

0 comments on commit 3e745e7

Please sign in to comment.