forked from apache/doris-spark-connector
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature](load) Support loader with copy into (apache#190)
- Loading branch information
1 parent
a9725bf
commit 6f50c34
Showing
9 changed files
with
565 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package org.apache.doris.spark.exception; | ||
|
||
/**
 * Checked exception named for the copy-into load path.
 *
 * <p>The class carries no state of its own; each constructor simply forwards
 * its arguments to the matching {@link Exception} constructor.
 */
public class CopyIntoException extends Exception {

    /** Creates an exception with neither a detail message nor a cause. */
    public CopyIntoException() {
        super();
    }

    /**
     * Creates an exception with a detail message only.
     *
     * @param message the detail message
     */
    public CopyIntoException(String message) {
        super(message);
    }

    /**
     * Creates an exception with a detail message and the underlying cause.
     *
     * @param message the detail message
     * @param cause   the throwable that triggered this exception
     */
    public CopyIntoException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that wraps the underlying cause.
     *
     * @param cause the throwable that triggered this exception
     */
    public CopyIntoException(Throwable cause) {
        super(cause);
    }

    /**
     * Full-control constructor for subclasses; all four arguments are passed
     * unchanged to {@link Exception#Exception(String, Throwable, boolean, boolean)}.
     *
     * @param message            the detail message
     * @param cause              the throwable that triggered this exception
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace is writable
     */
    protected CopyIntoException(
            String message,
            Throwable cause,
            boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
82 changes: 82 additions & 0 deletions
82
spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package org.apache.doris.spark.util; | ||
|
||
import java.util.Map; | ||
import java.util.Properties; | ||
import java.util.StringJoiner; | ||
import java.util.List; | ||
import java.util.Arrays; | ||
|
||
/**
 * Builds the text of a {@code COPY INTO <table> FROM @~('{...}*') PROPERTIES (...)}
 * statement from a target table, a file-name pattern, a data type and a set of
 * user-supplied properties.
 *
 * <p>The supplied {@link Properties} object is never modified: the entries
 * this builder forces ({@code copy.async}, {@code file.type} and, for JSON,
 * {@code file.strip_outer_array}) are applied to a private copy.
 */
public class CopySQLBuilder {
    private static final String COPY_ASYNC = "copy.async";
    private static final String FILE_TYPE = "file.type";
    private static final String FORMAT_KEY = "format";
    private static final String FIELD_DELIMITER_KEY = "column_separator";
    private static final String LINE_DELIMITER_KEY = "line_delimiter";
    private static final String COMPRESSION = "compression";
    private static final String STRIP_OUTER_ARRAY = "file.strip_outer_array";
    private static final String JSON_TYPE = "JSON";

    /** Property keys that are emitted with a {@code file.} prefix. */
    static final List<String> PREFIX_LIST =
            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, COMPRESSION);

    private final String fileName;
    private final Properties copyIntoProps;
    private final String tableIdentifier;
    private final String dataType;

    /**
     * @param data_type       value emitted as the {@code file.type} property; the exact
     *                        string {@code "JSON"} additionally adds
     *                        {@code file.strip_outer_array=false}
     * @param copyIntoProps   user properties to render into the {@code PROPERTIES} clause;
     *                        read but never modified by this builder
     * @param tableIdentifier table name placed after {@code COPY INTO}
     * @param fileName        used as a {@link String#format} pattern, see {@link #buildCopySQL()}
     */
    public CopySQLBuilder(String data_type, Properties copyIntoProps, String tableIdentifier, String fileName) {
        this.dataType = data_type;
        this.fileName = fileName;
        this.tableIdentifier = tableIdentifier;
        this.copyIntoProps = copyIntoProps;
    }

    /**
     * Renders the COPY INTO statement.
     *
     * <p>{@code fileName} is passed to {@link String#format} as the pattern with the
     * closing fragment <code>}*')</code> as its only argument, so the stage path is only
     * closed if the pattern contains a {@code %s}.
     * NOTE(review): presumably the caller always supplies a name ending in {@code %s};
     * confirm against the loader that constructs this builder.
     *
     * <p>Property values are inserted between single quotes without escaping.
     * NOTE(review): a value containing a single quote would break the statement;
     * confirm whether callers can pass such values.
     *
     * @return the statement text; the order of the rendered properties follows the
     *         iteration order of {@link Properties} and is therefore unspecified
     */
    public String buildCopySQL() {
        // Work on a private copy so repeated builds (or other users of the same
        // Properties object) never observe the entries forced below.
        Properties effectiveProps = new Properties();
        effectiveProps.putAll(copyIntoProps);

        // copy into must be sync
        effectiveProps.put(COPY_ASYNC, false);
        effectiveProps.put(FILE_TYPE, dataType);
        if (JSON_TYPE.equals(dataType)) {
            effectiveProps.put(STRIP_OUTER_ARRAY, "false");
        }

        StringJoiner props = new StringJoiner(",");
        for (Map.Entry<Object, Object> entry : effectiveProps.entrySet()) {
            String rawKey = String.valueOf(entry.getKey());
            // "format" is dropped: the file type is already carried by file.type above.
            if (FORMAT_KEY.equals(rawKey)) {
                continue;
            }
            props.add(String.format("'%s'='%s'", concatPropPrefix(rawKey), String.valueOf(entry.getValue())));
        }

        return new StringBuilder()
                .append("COPY INTO ").append(tableIdentifier)
                .append(" FROM @~('{").append(String.format(fileName, "}*')"))
                .append(" PROPERTIES (")
                .append(props)
                .append(" )")
                .toString();
    }

    /** Returns {@code key} with a {@code file.} prefix when it is listed in {@link #PREFIX_LIST}. */
    private String concatPropPrefix(String key) {
        return PREFIX_LIST.contains(key) ? "file." + key : key;
    }
}
67 changes: 67 additions & 0 deletions
67
spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package org.apache.doris.spark.util; | ||
|
||
import com.google.common.base.Preconditions; | ||
import org.apache.commons.codec.binary.Base64; | ||
import org.apache.http.HttpEntity; | ||
import org.apache.http.HttpHeaders; | ||
import org.apache.http.client.methods.HttpPost; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class HttpPostBuilder { | ||
String url; | ||
Map<String, String> header; | ||
HttpEntity httpEntity; | ||
|
||
public HttpPostBuilder() { | ||
header = new HashMap<>(); | ||
} | ||
|
||
public HttpPostBuilder setUrl(String url) { | ||
this.url = url; | ||
return this; | ||
} | ||
|
||
public HttpPostBuilder addCommonHeader() { | ||
header.put(HttpHeaders.EXPECT, "100-continue"); | ||
return this; | ||
} | ||
|
||
public HttpPostBuilder baseAuth(String encoded) { | ||
header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded); | ||
return this; | ||
} | ||
|
||
public HttpPostBuilder setEntity(HttpEntity httpEntity) { | ||
this.httpEntity = httpEntity; | ||
return this; | ||
} | ||
|
||
public HttpPost build() { | ||
Preconditions.checkNotNull(url); | ||
Preconditions.checkNotNull(httpEntity); | ||
HttpPost put = new HttpPost(url); | ||
header.forEach(put::setHeader); | ||
put.setEntity(httpEntity); | ||
return put; | ||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package org.apache.doris.spark.util; | ||
|
||
import com.google.common.base.Preconditions; | ||
import org.apache.commons.codec.binary.Base64; | ||
import org.apache.http.HttpEntity; | ||
import org.apache.http.HttpHeaders; | ||
import org.apache.http.client.methods.HttpPut; | ||
import org.apache.http.entity.StringEntity; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class HttpPutBuilder { | ||
String url; | ||
Map<String, String> header; | ||
HttpEntity httpEntity; | ||
public HttpPutBuilder() { | ||
header = new HashMap<>(); | ||
} | ||
|
||
public HttpPutBuilder setUrl(String url) { | ||
this.url = url; | ||
return this; | ||
} | ||
|
||
public HttpPutBuilder addFileName(String fileName){ | ||
header.put("fileName", fileName); | ||
return this; | ||
} | ||
|
||
public HttpPutBuilder setEmptyEntity() { | ||
try { | ||
this.httpEntity = new StringEntity(""); | ||
} catch (Exception e) { | ||
throw new IllegalArgumentException(e); | ||
} | ||
return this; | ||
} | ||
|
||
public HttpPutBuilder addCommonHeader() { | ||
header.put(HttpHeaders.EXPECT, "100-continue"); | ||
return this; | ||
} | ||
|
||
public HttpPutBuilder baseAuth(String encoded) { | ||
header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded); | ||
return this; | ||
} | ||
|
||
public HttpPutBuilder setEntity(HttpEntity httpEntity) { | ||
this.httpEntity = httpEntity; | ||
return this; | ||
} | ||
|
||
public HttpPut build() { | ||
Preconditions.checkNotNull(url); | ||
Preconditions.checkNotNull(httpEntity); | ||
HttpPut put = new HttpPut(url); | ||
header.forEach(put::setHeader); | ||
put.setEntity(httpEntity); | ||
return put; | ||
} | ||
} |
Oops, something went wrong.