From 6d23d7cf58f9e6cf0a87248653402f75c53c39e5 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:13:15 +0800 Subject: [PATCH 01/15] fix datasource error (#680) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../org/apache/linkis/metadata/service/impl/MdqServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java index 4fa9f49175..25afc1c44a 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java @@ -376,7 +376,7 @@ public String getTableLocation(MetadataQueryParam queryParam) { private int getTableFileNum(String tableLocation) throws IOException { int tableFileNum = 0; - if (StringUtils.isNotBlank(tableLocation)) { + if (StringUtils.isNotBlank(tableLocation) && getRootHdfs().exists(new Path(tableLocation))) { FileStatus tableFile = getFileStatus(tableLocation); tableFileNum = (int) getRootHdfs().getContentSummary(tableFile.getPath()).getFileCount(); } From 8b6c5c7834cba096a620ebfab977d627d6a22b8f Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:16:13 +0800 Subject: [PATCH 02/15] Code Optimization (#681) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Code Optimization * Code Optimization --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala | 4 ++-- .../engineplugin/io/executor/IoEngineConnExecutor.scala | 5 ++++- .../linkis/manager/engineplugin/io/utils/IOHelp.scala | 8 ++++---- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index cbfa94b232..7a3fdf0f7e 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -176,12 +176,12 @@ class UJESSQLResultSet( return } val metaTmp = resultSetResult.getMetadata - if ("NULL".equals(String.valueOf(metaTmp))) { + if (NULL_VALUE.equals(String.valueOf(metaTmp))) { val fileContentList = resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]] if (null != fileContentList) { resultSetMetaData.setColumnNameProperties(1, "linkis_string") resultSetMetaData.setDataTypeProperties(1, "String") - resultSetMetaData.setCommentPropreties(1, "NULL") + resultSetMetaData.setCommentPropreties(1, NULL_VALUE) } } else { metaData = metaTmp.asInstanceOf[util.List[util.Map[String, String]]] diff --git a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala index e8feb0d354..a9ec71ba00 100644 --- a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala @@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import com.google.gson.internal.LinkedTreeMap + class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10) extends ConcurrentComputationExecutor(outputLimit) with Logging { @@ -322,7 +324,8 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10) s"Creator ${methodEntity.creatorUser} for user ${methodEntity.proxyUser} init fs $methodEntity" ) var fsId = methodEntity.id - val properties = methodEntity.params(0).asInstanceOf[Map[String, String]] + val properties = + methodEntity.params(0).asInstanceOf[LinkedTreeMap[String, String]].asScala.toMap val proxyUser = methodEntity.proxyUser if (!fsProxyService.canProxyUser(methodEntity.creatorUser, proxyUser, methodEntity.fsType)) { throw new StorageErrorException( diff --git a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala index b6f35a2e8e..8734dd7191 100644 --- a/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala +++ b/linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/utils/IOHelp.scala @@ -62,16 +62,16 @@ object IOHelp { writer.toString() } else if (method.params.length == 3) { val position = - if (method.params(1).toString.toInt < 0) { + if (method.params(1).toString.toDouble.toInt < 0) { 0 } else { - method.params(1).toString.toInt + method.params(1).toString.toDouble.toInt } val fetchSize = - if (method.params(2).toString.toInt > maxPageSize) { + if (method.params(2).toString.toDouble.toInt > maxPageSize) { maxPageSize.toInt } else { - method.params(2).toString.toInt + method.params(2).toString.toDouble.toInt } if (position > 0) { inputStream.skip(position) From 37337c5a73961edb5c7589c5708f87de3e8854ac Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Wed, 18 Dec 2024 15:34:45 +0800 Subject: [PATCH 03/15] feat: Change the ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED configuration to be obtained at runtime (#684) * feat: Change the ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED configuration to be obtained at runtime * feat: Change the ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED configuration to be obtained at runtime --- .../execute/EngineExecutionContext.scala | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala index 4a13da3c25..35a1a1ac58 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala @@ -31,6 +31,7 @@ import org.apache.linkis.engineconn.acessible.executor.listener.event.{ import org.apache.linkis.engineconn.acessible.executor.log.LogHelper import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf import org.apache.linkis.engineconn.computation.executor.cs.CSTableResultSetWriter +import org.apache.linkis.engineconn.core.EngineConnObject import org.apache.linkis.engineconn.executor.ExecutorExecutionContext import org.apache.linkis.engineconn.executor.entity.Executor import org.apache.linkis.engineconn.executor.listener.{ @@ -191,15 +192,28 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String } else { var taskLog = log val limitLength = ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue - if ( - ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue && - log.length > limitLength - ) { - taskLog = s"${log.substring(0, limitLength)}..." - logger.info("The log is too long and will be intercepted,log limit length : {}", limitLength) + val limitEnableObj = + properties.get(ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.key) + val limitEnable = + if (limitEnableObj == null) { + ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue + } else { + limitEnableObj.toString.toBoolean + } + if (limitEnable) { + if (log.length > limitLength) { + taskLog = s"${log.substring(0, limitLength)}..." + logger.info( + "The log is too long and will be intercepted,log limit length : {}", + limitLength + ) + } } if (!AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) { - LogHelper.cacheLog(taskLog) + val taskLogs = taskLog.split("\n") + taskLogs.foreach(line => { + LogHelper.cacheLog(line) + }) } else { val listenerBus = getEngineSyncListenerBus getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog))) From 30a72f7a5c2b287f05ad7fd3cef0129425a3e444 Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Wed, 18 Dec 2024 15:43:50 +0800 Subject: [PATCH 04/15] feat: Parse UDF function name interface submission (#685) --- .../apache/linkis/udf/api/UDFRestfulApi.java | 33 +++++++++++ .../org/apache/linkis/udf/conf/Constants.java | 1 + .../org/apache/linkis/udf/utils/UdfUtils.java | 56 +++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java index 50916755a0..2edb76c15a 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java @@ -23,6 +23,7 @@ import org.apache.linkis.server.utils.ModuleUserUtils; import org.apache.linkis.storage.FSFactory; import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageUtils$; import org.apache.linkis.udf.conf.Constants; import org.apache.linkis.udf.entity.PythonModuleInfo; import org.apache.linkis.udf.entity.UDFInfo; @@ -1437,4 +1438,36 @@ public Message pythonUpload( .data("dependencies", dependencies) .data("fileName", fileName); } + + @ApiImplicitParam( + name = "path", + dataType = "String", + value = "path", + example = "file:///test-dir/test-sub-dir/test1012_01.py") + @RequestMapping(path = "/get-register-functions", method = RequestMethod.GET) + public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path") String path) { + if (StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_PY) + || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) { + if (StringUtils.startsWithIgnoreCase(path, StorageUtils$.MODULE$.FILE_SCHEMA())) { + try { + FsPath fsPath = new FsPath(path); + // 获取文件系统实例 + FileSystem fileSystem = (FileSystem) FSFactory.getFs(fsPath); + fileSystem.init(null); + if (fileSystem.canRead(fsPath)) { + return Message.ok() + .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path)); + } else { + return Message.error("您没有权限访问该文件"); + } + } catch (Exception e) { + return Message.error("解析文件失败,错误信息:" + e); + } + } else { + return Message.error("仅支持本地文件"); + } + } else { + return Message.error("仅支持.py和.scala文件"); + } + } } diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java index ad2692333d..14f93370fa 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java @@ -22,6 +22,7 @@ public class Constants { public static final String FILE_EXTENSION_PY = ".py"; public static final String FILE_EXTENSION_ZIP = ".zip"; + public static final String FILE_EXTENSION_SCALA = ".scala"; public static final String FILE_EXTENSION_TAR_GZ = ".tar.gz"; public static final String FILE_PERMISSION = "770"; public static final String DELIMITER_COMMA = ","; diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java index f72ff96dfe..bef01af34f 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java @@ -29,6 +29,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.linkis.common.utils.JsonUtils; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageUtils$; +import com.fasterxml.jackson.core.type.TypeReference; import org.springframework.web.multipart.MultipartFile; @@ -280,4 +284,56 @@ public static List extractDependencies(String content, String name) { } return modules; } + + public static List getRegisterFunctions(FileSystem fileSystem, FsPath fsPath, String path) + throws Exception { + try (InputStream is = fileSystem.read(fsPath)) { + // 将inputstream内容转换为字符串 + String content = IOUtils.toString(is, StandardCharsets.UTF_8); + if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_PY)) { + // 解析python文件 + return extractPythonMethodNames(path); + } else if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_SCALA)) { + // 解析scala代码 + return extractScalaMethodNames(content); + } else { + throw new UdfException(80041, "Unsupported file type: " + path); + } + } catch (IOException e) { + throw new UdfException(80042, "Failed to read file: " + path, e); + } + } + + public static List extractScalaMethodNames(String scalaCode) { + List methodNames = new ArrayList<>(); + // 正则表达式匹配方法定义,包括修饰符 + String regex = "(\\b(private|protected)\\b\\s+)?\\bdef\\s+(\\w+)\\b"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(scalaCode); + logger.info("use regex to get scala method names.. reg:{}", regex); + while (matcher.find()) { + String methodName = matcher.group(3); + methodNames.add(methodName); + } + + return methodNames; + } + + public static List extractPythonMethodNames(String udfPath) throws Exception { + String localPath = udfPath.replace(StorageUtils$.MODULE$.FILE_SCHEMA(), ""); + String exec = + Utils.exec( + (new String[] { + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath + })); + logger.info( + "execute python script to get python method name...{} {} {}", + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath); + // 将exec转换为List,exec为一个json数组 + return JsonUtils.jackson().readValue(exec, new TypeReference>() {}); + } } From aea92fda8deb302856e6435259b9b5011f59949d Mon Sep 17 00:00:00 2001 From: taoran1250 <543121890@qq.com> Date: Wed, 18 Dec 2024 16:08:16 +0800 Subject: [PATCH 05/15] Dev 1.10.0 webank merge udf register function (#686) * feat: Parse UDF function name interface submission * feat: Parse UDF function name interface submission --- .../admin/linkis_udf_get_python_methods.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 linkis-dist/package/admin/linkis_udf_get_python_methods.py diff --git a/linkis-dist/package/admin/linkis_udf_get_python_methods.py b/linkis-dist/package/admin/linkis_udf_get_python_methods.py new file mode 100644 index 0000000000..d3f8268141 --- /dev/null +++ b/linkis-dist/package/admin/linkis_udf_get_python_methods.py @@ -0,0 +1,18 @@ +import sys +import ast +import json + +def extract_method_names(file_path): + with open(file_path, 'r') as file: + code = file.read() + tree = ast.parse(code) + method_names = set() + + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + method_names.add(node.name) + + return json.dumps(list(method_names), indent=4) + +file_path = sys.argv[1] +print(extract_method_names(file_path)) From 0d5d9f59915cd8f61fd23f3e342b41094c11d902 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Thu, 19 Dec 2024 11:13:33 +0800 Subject: [PATCH 06/15] entrance suport priority queue (#679) * entrance suport priority queue * code optimization * code optimization for priority queue * code optimization * code optimization * update test * deal with Case * code optimization * optimization code * hive engine support extra opts --------- Co-authored-by: Casion --- .../linkis/scheduler/util/SchedulerUtils.java | 84 ++++++ .../conf/SchedulerConfiguration.scala | 17 ++ .../queue/PriorityLoopArrayQueue.scala | 270 ++++++++++++++++++ .../scheduler/queue/SchedulerEvent.scala | 12 + .../queue/fifoqueue/FIFOConsumerManager.scala | 5 +- .../ParallelConsumerManager.scala | 16 +- .../queue/PriorityLoopArrayQueueTest.java | 206 +++++++++++++ .../apache/linkis/scheduler/queue/Test.scala | 46 +++ .../scheduler/util/TestSchedulerUtils.scala | 73 +++++ .../linkis/entrance/EntranceServer.scala | 24 +- .../hive/conf/HiveEngineConfiguration.scala | 6 + .../HiveProcessEngineConnLaunchBuilder.scala | 12 + 12 files changed, 761 insertions(+), 10 deletions(-) create mode 100644 linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java create mode 100644 linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala create mode 100644 linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java create mode 100644 linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala create mode 100644 linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala diff --git a/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java new file mode 100644 index 0000000000..62191aa20e --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.scheduler.util; + +import org.apache.linkis.scheduler.conf.SchedulerConfiguration; + +import org.apache.commons.lang3.StringUtils; + +public class SchedulerUtils { + private static final String EVENT_ID_SPLIT = "_"; + private static final String ALL_CREATORS = "ALL_CREATORS"; + private static final String SPACIAL_USER_SPLIT = "_v_"; + + /** + * support priority queue with config username or creator + * + * @param groupName + * @return + */ + public static boolean isSupportPriority(String groupName) { + String users = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_USERS(); + if (StringUtils.isEmpty(users)) { + return false; + } + String userName = getUserFromGroupName(groupName); + if (StringUtils.isEmpty(userName)) { + return false; + } + String creators = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_CREATORS(); + creators = creators.toLowerCase(); + users = users.toLowerCase(); + if (ALL_CREATORS.equalsIgnoreCase(creators)) { + return users.contains(userName.toLowerCase()); + } else { + String creatorName = getCreatorFromGroupName(groupName); + return users.contains(userName.toLowerCase()) && creators.contains(creatorName.toLowerCase()); + } + } + + public static String getUserFromGroupName(String groupName) { + if (groupName.contains(SPACIAL_USER_SPLIT)) { + int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT); + int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT); + String user = groupName.substring(vIndex + 1, lastIndex); + return user; + } + String[] groupNames = groupName.split(EVENT_ID_SPLIT); + String user = groupNames[groupNames.length - 2]; + return user; + } + + public static String getEngineTypeFromGroupName(String groupName) { + String[] groupNames = groupName.split(EVENT_ID_SPLIT); + String ecType = groupNames[groupNames.length - 1]; + return ecType; + } + + public static String getCreatorFromGroupName(String groupName) { + if (groupName.contains(SPACIAL_USER_SPLIT)) { + int vIndex = groupName.lastIndexOf(SPACIAL_USER_SPLIT); + String creatorName = groupName.substring(0, vIndex); + return creatorName; + } + int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT); + int secondLastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT, lastIndex - 1); + String creatorName = groupName.substring(0, secondLastIndex); + return creatorName; + } +} diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala index e3b76ac4e7..24e7d3c4f0 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala @@ -36,4 +36,21 @@ object SchedulerConfiguration { val MAX_GROUP_ALTER_WAITING_SIZE = CommonVars("linkis.fifo.consumer.group.max.alter.waiting.size", 1000).getValue + // support fifo pfifo + val FIFO_QUEUE_STRATEGY = + CommonVars("linkis.fifo.queue.strategy", FIFO_SCHEDULER_STRATEGY).getValue + + val SUPPORT_PRIORITY_TASK_USERS = + CommonVars("linkis.fifo.queue.support.priority.users", "").getValue + + val SUPPORT_PRIORITY_TASK_CREATORS = + CommonVars("linkis.fifo.queue.support.priority.creators", "ALL_CREATORS").getValue + + val MAX_PRIORITY_QUEUE_CACHE_SIZE = + CommonVars("linkis.fifo.priority.queue.max.cache.size", 1000).getValue + + val ENGINE_PRIORITY_RUNTIME_KEY = "wds.linkis.engine.runtime.priority" + + val PFIFO_SCHEDULER_STRATEGY = "pfifo" + val FIFO_SCHEDULER_STRATEGY = "fifo" } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala new file mode 100644 index 0000000000..fd3fecc71b --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.scheduler.queue + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.scheduler.conf.SchedulerConfiguration + +import java.util +import java.util.Comparator +import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantReadWriteLock + +/** + * 优先级队列元素 + * @param element + * 实际元素 + * @param priority + * 优先级 + * @param index + * 唯一索引 + */ +case class PriorityQueueElement(element: Any, priority: Int, index: Int) + +/** + * 固定大小集合,元素满后会移除最先插入集合的元素 + * @param maxSize + * 集合大小 + * @tparam K + * @tparam V + */ +class FixedSizeCollection[K, V](val maxSize: Int) extends util.LinkedHashMap[K, V] { + // 当集合大小超过最大值时,返回true,自动删除最老的元素 + protected override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean = size > maxSize +} + +/** + * 优先级队列,优先级相同时先进先出 + * @param group + */ +class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging { + + private val maxCapacity = group.getMaximumCapacity + + /** 优先级队列 */ + private val priorityEventQueue = new PriorityBlockingQueue[PriorityQueueElement]( + group.getMaximumCapacity, + new Comparator[PriorityQueueElement] { + + override def compare(o1: PriorityQueueElement, o2: PriorityQueueElement): Int = + if (o1.priority != o2.priority) o2.priority - o1.priority + else o1.index - o2.index + + } + ) + + /** 累加器 1.越先进队列值越小,优先级相同时控制先进先出 2.队列元素唯一索引,不会重复 */ + private val index = new AtomicInteger + + /** 记录队列中当前所有元素索引,元素存入优先级队列时添加,从优先级队列移除时删除 */ + private val indexMap = new util.HashMap[Int, SchedulerEvent]() + + /** 记录已经消费的元素,会有固定缓存大小,默认1000,元素从优先级队列移除时添加 */ + private val fixedSizeCollection = + new FixedSizeCollection[Integer, SchedulerEvent]( + SchedulerConfiguration.MAX_PRIORITY_QUEUE_CACHE_SIZE + ) + + private val rwLock = new ReentrantReadWriteLock + + protected[this] var realSize = size + override def isEmpty: Boolean = size <= 0 + override def isFull: Boolean = size >= maxCapacity + def size: Int = priorityEventQueue.size + + /** + * 将元素添加进队列 + * @param element + * @return + */ + private def addToPriorityQueue(element: PriorityQueueElement): Boolean = { + priorityEventQueue.offer(element) + rwLock.writeLock.lock + Utils.tryFinally(indexMap.put(element.index, element.element.asInstanceOf[SchedulerEvent]))( + rwLock.writeLock.unlock() + ) + true + } + + /** + * 从队列中获取并移除元素 + * @return + */ + private def getAndRemoveTop: SchedulerEvent = { + val top: PriorityQueueElement = priorityEventQueue.take() + rwLock.writeLock.lock + Utils.tryFinally { + indexMap.remove(top.index) + fixedSizeCollection.put(top.index, top.element.asInstanceOf[SchedulerEvent]) + }(rwLock.writeLock.unlock()) + top.element.asInstanceOf[SchedulerEvent] + } + + override def remove(event: SchedulerEvent): Unit = { + get(event).foreach(x => x.cancel()) + } + + override def getWaitingEvents: Array[SchedulerEvent] = { + toIndexedSeq + .filter(x => + x.getState.equals(SchedulerEventState.Inited) || x.getState + .equals(SchedulerEventState.Scheduled) + ) + .toArray + } + + override def clearAll(): Unit = priorityEventQueue synchronized { + realSize = 0 + index.set(0) + priorityEventQueue.clear() + fixedSizeCollection.clear() + indexMap.clear() + } + + override def get(event: SchedulerEvent): Option[SchedulerEvent] = { + val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq + if (eventSeq.size > 0) Some(eventSeq(0)) else None + } + + /** + * 根据索引获取队列元素 + * @param index + * @return + */ + override def get(index: Int): Option[SchedulerEvent] = { + if (!indexMap.containsKey(index) && !fixedSizeCollection.containsKey(index)) { + throw new IllegalArgumentException( + "The index " + index + " has already been deleted, now index must be better than " + index + ) + } + rwLock.readLock().lock() + Utils.tryFinally { + if (fixedSizeCollection.get(index) != null) Option(fixedSizeCollection.get(index)) + else Option(indexMap.get(index)) + }(rwLock.readLock().unlock()) + } + + override def getGroup: Group = group + + override def setGroup(group: Group): Unit = { + this.group = group + } + + def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (size == 0) { + IndexedSeq.empty[SchedulerEvent] + } else { + priorityEventQueue + .toArray() + .map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent]) + .toIndexedSeq + } + + def add(event: SchedulerEvent): Int = { + // 每次添加的时候需要给计数器+1,优先级相同时,控制先进先出 + event.setIndex(index.addAndGet(1)) + addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex)) + event.getIndex + } + + override def waitingSize: Int = size + + /** + * Add one, if the queue is full, it will block until the queue is + * available(添加一个,如果队列满了,将会一直阻塞,直到队列可用) + * + * @return + * Return index subscript(返回index下标) + */ + override def put(event: SchedulerEvent): Int = { + add(event) + } + + /** + * Add one, return None if the queue is full(添加一个,如果队列满了,返回None) + * + * @return + */ + override def offer(event: SchedulerEvent): Option[Int] = { + if (isFull) None else Some(add(event)) + } + + /** + * Get the latest SchedulerEvent of a group, if it does not exist, it will block + * [
(获取某个group最新的SchedulerEvent,如果不存在,就一直阻塞
) This method will move the pointer(该方法会移动指针) + * + * @return + */ + override def take(): SchedulerEvent = { + getAndRemoveTop + } + + /** + * Get the latest SchedulerEvent of a group, if it does not exist, block the maximum waiting + * time
(获取某个group最新的SchedulerEvent,如果不存在,就阻塞到最大等待时间
) This method will move the + * pointer(该方法会移动指针) + * @param mills + * Maximum waiting time(最大等待时间) + * @return + */ + override def take(mills: Long): Option[SchedulerEvent] = { + if (waitingSize == 0) { + Thread.sleep(mills) + } + if (waitingSize == 0) None else Option(getAndRemoveTop) + } + + /** + * Get the latest SchedulerEvent of a group and move the pointer to the next one. If not, return + * directly to None 获取某个group最新的SchedulerEvent,并移动指针到下一个。如果没有,直接返回None + * + * @return + */ + override def poll(): Option[SchedulerEvent] = { + if (waitingSize == 0) None + else Option(getAndRemoveTop) + } + + /** + * Only get the latest SchedulerEvent of a group, and do not move the pointer. If not, return + * directly to None 只获取某个group最新的SchedulerEvent,并不移动指针。如果没有,直接返回None + * + * @return + */ + override def peek(): Option[SchedulerEvent] = { + val ele: PriorityQueueElement = priorityEventQueue.peek() + if (ele == null) None else Option(ele.element.asInstanceOf[SchedulerEvent]) + } + + /** + * Get the latest SchedulerEvent whose group satisfies the condition and does not move the + * pointer. If not, return directly to None 获取某个group满足条件的最新的SchedulerEvent,并不移动指针。如果没有,直接返回None + * @param op + * 满足的条件 + * @return + */ + override def peek(op: (SchedulerEvent) => Boolean): Option[SchedulerEvent] = { + val ele: PriorityQueueElement = priorityEventQueue.peek() + if (ele == null) return None + val event: Option[SchedulerEvent] = Option( + priorityEventQueue.peek().element.asInstanceOf[SchedulerEvent] + ) + if (op(event.get)) event else None + } + +} diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala index 4f384d2384..3e87a06930 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala @@ -32,9 +32,13 @@ trait SchedulerEvent extends Logging { protected var scheduledTime: Long = 0L protected var startTime: Long = 0L protected var endTime: Long = 0L + protected var priority: Int = 100 + protected var index: Int = 0 def getEndTime: Long = endTime def getStartTime: Long = startTime + def getPriority: Int = priority + def getIndex: Int = index /* * To be compatible with old versions. @@ -50,6 +54,14 @@ trait SchedulerEvent extends Logging { this synchronized notify() } + def setPriority(priority: Int): Unit = { + this.priority = priority + } + + def setIndex(index: Int): Unit = { + this.index = index + } + def turnToScheduled(): Boolean = if (!isWaiting) { false } else { diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala index e95e172e06..02091e4f79 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala @@ -19,10 +19,11 @@ package org.apache.linkis.scheduler.queue.fifoqueue import org.apache.linkis.common.utils.Utils import org.apache.linkis.scheduler.SchedulerContext +import org.apache.linkis.scheduler.conf.SchedulerConfiguration.FIFO_QUEUE_STRATEGY import org.apache.linkis.scheduler.errorcode.LinkisSchedulerErrorCodeSummary._ import org.apache.linkis.scheduler.exception.SchedulerErrorException import org.apache.linkis.scheduler.listener.ConsumerListener -import org.apache.linkis.scheduler.queue.{Consumer, ConsumerManager, Group, LoopArrayQueue} +import org.apache.linkis.scheduler.queue._ import java.text.MessageFormat import java.util.concurrent.{ExecutorService, ThreadPoolExecutor} @@ -34,7 +35,7 @@ class FIFOConsumerManager(groupName: String) extends ConsumerManager { private var group: Group = _ private var executorService: ThreadPoolExecutor = _ private var consumerListener: ConsumerListener = _ - private var consumerQueue: LoopArrayQueue = _ + private var consumerQueue: ConsumeQueue = _ private var consumer: Consumer = _ override def setSchedulerContext(schedulerContext: SchedulerContext): Unit = { diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala index c64158e6e8..b079f12006 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala @@ -19,9 +19,14 @@ package org.apache.linkis.scheduler.queue.parallelqueue import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils} import org.apache.linkis.scheduler.conf.SchedulerConfiguration +import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ + FIFO_QUEUE_STRATEGY, + PFIFO_SCHEDULER_STRATEGY +} import org.apache.linkis.scheduler.listener.ConsumerListener import org.apache.linkis.scheduler.queue._ import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer +import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriority import java.util.concurrent.{ExecutorService, TimeUnit} @@ -111,7 +116,16 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) val newConsumer = createConsumer(groupName) val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName) newConsumer.setGroup(group) - newConsumer.setConsumeQueue(new LoopArrayQueue(group)) + val fifoQueueStrategy: String = FIFO_QUEUE_STRATEGY.toLowerCase() + // 需要判断人员是否是指定部门 + val consumerQueue: ConsumeQueue = + if ( + PFIFO_SCHEDULER_STRATEGY + .equals(fifoQueueStrategy) && isSupportPriority(groupName) + ) { + new PriorityLoopArrayQueue(group) + } else new LoopArrayQueue(group) + newConsumer.setConsumeQueue(consumerQueue) consumerListener.foreach(_.onConsumerCreated(newConsumer)) newConsumer.start() newConsumer diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java new file mode 100644 index 0000000000..f0a8c82696 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.scheduler.queue; + +import org.apache.linkis.scheduler.queue.fifoqueue.FIFOGroup; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import scala.Option; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +class PriorityLoopArrayQueueTest { + AtomicInteger productCounter = new AtomicInteger(); + AtomicInteger consumerCounter = new AtomicInteger(); + + @Test + public void testConcurrentPutAndTake() throws Exception { + AtomicInteger counter = new AtomicInteger(); + FIFOGroup group = new FIFOGroup("test", 5000, 5000); + PriorityLoopArrayQueue queue = new PriorityLoopArrayQueue(group); + + // 获取开始时间的毫秒数 + long startTime = System.currentTimeMillis(); + // 三分钟的毫秒数 + long threeMinutesInMillis = 1 * 30 * 1000; + int genLen = 5; + int getLen = 7; + final CountDownLatch latch = new CountDownLatch(genLen + getLen + 1); + // 5 个生产者 + for (int i = 0; i < genLen; i++) { + final int id = i; + new Thread(() -> { + try{ + Thread.sleep(100 * id); + latch.countDown(); + latch.await(); + } catch (InterruptedException e){ + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName() + "开始生产:"); + while ((System.currentTimeMillis() - startTime) < threeMinutesInMillis) { + //生产 + try { + Thread.sleep(getRandom(200)); + product(counter, queue); + product(counter, queue); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + //消费 + //consume(queue); + } + System.out.println(Thread.currentThread().getName() + "结束生产:"); + }, "生产t-" + i).start(); + } + // 5 个消费者 + for (int i = 0; i < getLen; i++) { + final int id = i; + new Thread(() -> { + try{ + Thread.sleep(getRandom(200)); + latch.countDown(); + latch.await(); + } catch (InterruptedException e){ + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName() + "开始消费:"); + while (true) { + try { + Thread.sleep(getRandom(200)); + //消费 + consume(queue); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + }, "消费t-" + i).start(); + } + new Thread(() -> { + try { + Thread.sleep(100); + latch.countDown(); + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName() + "开始获取当前队列元素:"); + while ((System.currentTimeMillis() - startTime) < threeMinutesInMillis * 2) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("生产大小:" + productCounter.get()); + System.out.println("消费大小:" + consumerCounter.get()); + System.out.println("队列当前大小:" + queue.size()); + // 需要 去掉私有测试 + //System.out.println("index size: " + queue.indexMap().size()); + //System.out.println("cache size: " + queue.fixedSizeCollection().size()); + } + }).start(); + Thread.sleep(threeMinutesInMillis * 2); + System.out.println("product:" + productCounter.get() + ", consumer: " + consumerCounter.get()); + // 需要 去掉私有测试 + //Assertions.assertEquals(1000, queue.fixedSizeCollection().size()); + Assertions.assertEquals(productCounter.get(), consumerCounter.get()); + } + + //消费 + private void consume(PriorityLoopArrayQueue queue) { + SchedulerEvent take = null; + try { + take = queue.take(); + consumerCounter.addAndGet(1); + } catch (Exception e) { + throw new RuntimeException(e); + } + printEvent("消费" , take); + } + + //生产 + private void product(AtomicInteger counter, PriorityLoopArrayQueue queue) { + int i1 = counter.addAndGet(1); + //1000-重要,100-普通,10-不重要 + int[] proArr = {1000, 100, 10}; + int priority = getRandom(3); + String name = "item-" + i1 + "-" + priority; + System.out.println("生产:" + name); + Option offer = queue.offer(getJob(name, proArr[priority])); + if (offer.nonEmpty()) { + productCounter.addAndGet(1); + Option schedulerEventOption = queue.get((int) offer.get()); + printEvent("get:", schedulerEventOption.get()); + } else { + System.out.println("当前队列已满,大小:" + queue.size()); + } + } + @Test + void testFinally() { + + } + @Test + void enqueue() { + // 压测 offer take get + FIFOGroup group = new FIFOGroup("test", 100, 100); + PriorityLoopArrayQueue queue = new PriorityLoopArrayQueue(group); + Option idx = queue.offer(getJob("job1-1", 1)); + //插入测试 + Assertions.assertEquals(1, (int)idx.get()); + queue.offer(getJob("job2", 2)); + queue.offer(getJob("job3", 3)); + queue.offer(getJob("job1-2", 1)); + queue.offer(getJob("job5", 5)); + queue.offer(getJob("item1-3", 1)); + queue.offer(getJob("item6-1", 6)); + queue.offer(getJob("item4", 4)); + queue.offer(getJob("item6-2", 6)); + //peek 测试 + Option peek = queue.peek(); + Assertions.assertEquals("item6-1", peek.get().getId()); + while (queue.size() > 1) { + queue.take(); + } + SchedulerEvent event = queue.take(); + //优先级,以及先进先出测试 + Assertions.assertEquals("item1-3", event.getId()); + Assertions.assertEquals(1, event.priority()); + Assertions.assertEquals(6, event.getIndex()); + //缓存测试,需要设置 linkis.fifo.priority.queue.max.cache.size 为 5 + Assertions.assertThrows(IllegalArgumentException.class, () -> {queue.get(7);}); + + } + + private void printEvent(String opt, SchedulerEvent event) { + System.out.println("【" + Thread.currentThread().getName() + "】" + opt + ":" + event.getId() + ", priority: " + event.getPriority() + ", index: " + event.getIndex()); + } + private int getRandom(int bound){ + Random rand = new Random(); + int res = rand.nextInt(bound); + return res; + } + private UserJob getJob(String name, int priority) { + UserJob job = new UserJob(); + job.setId(name); + job.setPriority(priority); + return job; + } +} \ No newline at end of file diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala new file mode 100644 index 0000000000..e834099400 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.scheduler.queue + +import java.util +import java.util.{PriorityQueue, Queue} + +case class PriorityFIFOQueue() { + case class QueueItem(item: Queue[String], priority: Int) + + import java.util.Comparator + + val cNode: Comparator[QueueItem] = new Comparator[QueueItem]() { + override def compare(o1: QueueItem, o2: QueueItem): Int = o2.priority - o1.priority + } + + private val queue = new PriorityQueue[QueueItem](cNode) + private var _size = 0 + private var _count: Long = 0L + + def size: Int = _size + + def isEmpty: Boolean = _size == 0 + + def enqueue(item: String, priority: Int): Unit = { + val deque = new util.ArrayDeque[String]() + deque.add(item) + queue.add(QueueItem(deque, priority)) + } + +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala new file mode 100644 index 0000000000..ba281e4798 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.scheduler.util + +import org.apache.linkis.scheduler.util.SchedulerUtils.{ + getCreatorFromGroupName, + getEngineTypeFromGroupName, + getUserFromGroupName, + isSupportPriority +} + +import org.junit.jupiter.api.{Assertions, Test} + +class TestSchedulerUtils { + + @Test + def testIsSupportPriority: Unit = { + // set linkis.fifo.queue.support.priority.users=hadoop + // set linkis.fifo.queue.support.priority.creators=IDE or ALL_CREATORS + val bool: Boolean = isSupportPriority("IdE_haDoop_hive") + Assertions.assertEquals(true, bool) + } + + @Test + def testShellDangerCode: Unit = { + var groupName = "IDE_hadoop_hive" + var username: String = getUserFromGroupName(groupName) + var engineType: String = getEngineTypeFromGroupName(groupName) + var creator: String = getCreatorFromGroupName(groupName) + Assertions.assertEquals("hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("IDE", creator) + groupName = "APP_TEST_v_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("v_hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("APP_TEST", creator) + + groupName = "TEST_v_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("v_hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("TEST", creator) + + groupName = "APP_TEST_hadoop_hive" + username = getUserFromGroupName(groupName) + engineType = getEngineTypeFromGroupName(groupName) + creator = getCreatorFromGroupName(groupName) + Assertions.assertEquals("hadoop", username) + Assertions.assertEquals("hive", engineType) + Assertions.assertEquals("APP_TEST", creator) + } + +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index eed2929c23..04ede15574 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -17,14 +17,12 @@ package org.apache.linkis.entrance +import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException} import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.entrance.conf.EntranceConfiguration -import org.apache.linkis.entrance.conf.EntranceConfiguration.{ - ENABLE_JOB_TIMEOUT_CHECK, - ENTRANCE_TASK_TIMEOUT -} import org.apache.linkis.entrance.cs.CSEntranceHelper import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._ import org.apache.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException} @@ -35,13 +33,12 @@ import org.apache.linkis.entrance.utils.JobHistoryHelper import org.apache.linkis.governance.common.entity.job.JobRequest import org.apache.linkis.governance.common.utils.LoggerUtils import org.apache.linkis.protocol.constants.TaskConstant +import org.apache.linkis.protocol.utils.TaskUtils import org.apache.linkis.rpc.Sender +import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY} import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState} import org.apache.linkis.server.conf.ServerConfiguration -import org.apache.commons.lang3.StringUtils -import org.apache.commons.lang3.exception.ExceptionUtils - import java.text.MessageFormat import java.util import java.util.concurrent.TimeUnit @@ -174,6 +171,19 @@ abstract class EntranceServer extends Logging { ) } + Utils.tryAndWarn{ + // 如果是使用优先级队列,设置下优先级 + val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params) + val fifoStrategy: String = FIFO_QUEUE_STRATEGY + if (PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(fifoStrategy) && properties != null && !properties.isEmpty) { + val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY) + if (priorityValue != null) { + val value: Int = priorityValue.toString.toInt + job.setPriority(value) + } + } + } + getEntranceContext.getOrCreateScheduler().submit(job) val msg = LogUtils.generateInfo( s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted " diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala index ba6be619b6..8498cd6892 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala @@ -46,6 +46,12 @@ object HiveEngineConfiguration { val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue + val HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS = CommonVars( + "wds.linkis.hive.engineConn.java.extraOpts", + "", + "Specify the option parameter of the java process (please modify it carefully!!!)" + ) + val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala index 3aa760f824..b2dfb5d9a8 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala @@ -17,9 +17,12 @@ package org.apache.linkis.engineplugin.hive.launch +import org.apache.linkis.engineplugin.hive.conf.HiveEngineConfiguration.HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder +import org.apache.commons.lang3.StringUtils + import java.util import com.google.common.collect.Lists @@ -34,4 +37,13 @@ class HiveProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuil Lists.newArrayList("JarUDFLoadECMHook") } + override protected def getExtractJavaOpts: String = { + val hiveExtraOpts: String = HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS.getValue + if (StringUtils.isNotBlank(hiveExtraOpts)) { + super.getExtractJavaOpts + " " + hiveExtraOpts + } else { + super.getExtractJavaOpts + } + } + } From ec342a033dddf4cffa2ff041f8d6e73581b5b522 Mon Sep 17 00:00:00 2001 From: Yonghao Mei <73584269+mayinrain@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:01:17 +0800 Subject: [PATCH 07/15] Dev 1.10.0 webank (#687) * chore: 1.9.0 * feat: 1.10.0 --- linkis-web/package.json | 8 +- linkis-web/src/apps/PythonModule/.npmrc | 2 - linkis-web/src/apps/URM/i18n/common/en.json | 1 + linkis-web/src/apps/URM/i18n/common/zh.json | 1 + .../module/udfManagement/addFunctionModal.vue | 130 ++++++++++++++--- .../apps/URM/module/udfManagement/index.vue | 31 +++- .../src/apps/linkis/i18n/common/en.json | 1 + .../src/apps/linkis/i18n/common/zh.json | 1 + .../globalHistoryManagement/viewHistory.vue | 24 +++- .../apps/linkis/module/setting/setting.vue | 2 +- .../src/components/consoleComponent/term.vue | 133 ++++++++++++++++++ 11 files changed, 301 insertions(+), 33 deletions(-) delete mode 100644 linkis-web/src/apps/PythonModule/.npmrc create mode 100644 linkis-web/src/components/consoleComponent/term.vue diff --git a/linkis-web/package.json b/linkis-web/package.json index 12f4ca8b33..30481a2c1a 100644 --- a/linkis-web/package.json +++ b/linkis-web/package.json @@ -1,6 +1,6 @@ { "name": "linkis", - "version": "1.4.0", + "version": "1.10.0", "private": true, "scripts": { "serve": "vue-cli-service serve", @@ -25,7 +25,7 @@ }, "dependencies": { "@form-create/iview": "2.5.27", - "axios": "0.28.1", + "axios": "1.7.4", "dexie": "3.2.3", "dt-sql-parser": "3.0.5", "highlight.js": "10.7.0", @@ -49,7 +49,9 @@ "vue-router": "3.4.8", "vuedraggable": "2.24.3", "vuescroll": "4.16.1", - "worker-loader": "3.0.8" + "worker-loader": "3.0.8", + "xterm": "5.3.0", + "xterm-addon-fit": "0.8.0" }, "devDependencies": { "@intlify/vue-i18n-loader": "1.0.0", diff --git a/linkis-web/src/apps/PythonModule/.npmrc b/linkis-web/src/apps/PythonModule/.npmrc deleted file mode 100644 index dd4617f0fa..0000000000 --- a/linkis-web/src/apps/PythonModule/.npmrc +++ /dev/null @@ -1,2 +0,0 @@ -registry=http://wnpm.weoa.com:8001 -shamefully-hoist=true \ No newline at end of file diff --git a/linkis-web/src/apps/URM/i18n/common/en.json b/linkis-web/src/apps/URM/i18n/common/en.json index f6f8319cc8..5b4c887a00 100644 --- a/linkis-web/src/apps/URM/i18n/common/en.json +++ b/linkis-web/src/apps/URM/i18n/common/en.json @@ -60,6 +60,7 @@ "no": "No", "sprak": "sprak", "jarPackage": "jar Package", + "registerFunc": "Register Function", "scriptPath": "Script Path", "registerFunction": "Register Function", "registerFormat": "Register Format", diff --git a/linkis-web/src/apps/URM/i18n/common/zh.json b/linkis-web/src/apps/URM/i18n/common/zh.json index 942c66a7e2..2c6896869a 100644 --- a/linkis-web/src/apps/URM/i18n/common/zh.json +++ b/linkis-web/src/apps/URM/i18n/common/zh.json @@ -60,6 +60,7 @@ "no": "否", "sprak": "sprak", "jarPackage": "jar包", + "registerFunc": "注册函数", "scriptPath": "脚本路径", "registerFunction": "注册的函数", "registerFormat": "注册格式", diff --git a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue index 8f12aee69e..1dd8b22966 100644 --- a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue +++ b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue @@ -96,6 +96,38 @@ fs-type="share" @set-node="setNodePath"/> + + + + + + @@ -288,6 +322,8 @@ export default { }, }, title: '', + registerFunctions: [], + registerFunctionEditable: true, show: false, btnLabel: this.$t('message.common.ok'), modalHeight: '', @@ -552,7 +588,11 @@ export default { }; } }, - + handleFuncChange(val) { + if(this.model !== 1) { + this.setting.name = val; + } + }, init() { let { name, shared, description, path, udfName, directory, udfType, registerFormat, load, useFormat } = this.node; let fnType = 'Spark' @@ -575,9 +615,26 @@ export default { fnType, defaultLoad: !!load }); - this.$nextTick(() => { + this.$nextTick(async () => { this.$set(this.setting, this.getTypes(), path); + this.registerFunctionEditable = false; + if(this.getTypes() !== 'jarPath') { + if (/^[\w\u4e00-\u9fa5:.\\/]*(py|scala)$/.test(path)) { + try { + this.registerFunctions = await this.getRegisterFunction(path); + if(this.registerFunctions.length !== 0) { + this.registerFunctionEditable = false; + } + } catch (err) { + window.console.error(err); + this.registerFunctions = []; + this.registerFunctionEditable = true; + } + } + + } }); + }, close() { @@ -594,27 +651,31 @@ export default { } else if (this.node.udfType === 1) { this.setting.pyPara = conver(',', ')', 'indexOf', 'lastIndexOf'); } else { - const type = rf.slice(rf.indexOf('[') + 1, rf.indexOf(']')); + const typeStart = rf.indexOf('[') + 1; + const typeEnd = rf.lastIndexOf(']'); + const type = rf.slice(typeStart, typeEnd); window.console.log(type, rf, '====='); - // 如果存在多个逗号,就只用使用格式来截取,否则会出现多个类型填入input异常的问题 - if (type.indexOf(',') !== type.lastIndexOf(',')) { - // there are 2 case: - // 1. tuple, return params in (); - // 2. multi params, the first params is return params - if (type.indexOf('(') !== -1) { - // tuple - this.setting.scalaTypeL = type.slice(type.indexOf('('), type.indexOf(')') + 1) - this.setting.scalaTypeR = type.slice(type.indexOf(')')+2) - } else { - // multi params - this.setting.scalaTypeL = type.split(',')[0]; - this.setting.scalaTypeR = type.split(',').slice(1).toString(); + const findFirstValidComma = (str) => { + let brackets = 0; + for (let i = 0; i < str.length; i++) { + const char = str[i]; + // 计算所有类型的括号 + if (char === '[' || char === '(') brackets++; + if (char === ']' || char === ')') brackets--; + // 只在最外层找逗号 + if (char === ',' && brackets === 0) return i; } + return -1; + }; - this.showScalaRF = this.node.registerFormat; + const commaPos = findFirstValidComma(type); + + if (commaPos !== -1) { + this.setting.scalaTypeL = type.slice(0, commaPos).trim(); + this.setting.scalaTypeR = type.slice(commaPos + 1).trim(); } else { - this.setting.scalaTypeL = conver('[', ',', 'indexOf', 'indexOf'); - this.setting.scalaTypeR = conver(',', ']', 'indexOf', 'indexOf'); + this.setting.scalaTypeL = type.trim(); + this.setting.scalaTypeR = ''; } this.setting.scalapara = conver(',', ')', 'lastIndexOf', 'lastIndexOf'); } @@ -635,7 +696,6 @@ export default { this.$set(this.setting, `${type}R`, right); } }, - handleShareChange() { if (!this.isShareLoading) { this.isShareLoading = true; @@ -662,10 +722,10 @@ export default { udfType: this.fnType, isLeaf: true, directory, - clusterName + clusterName, }; if (this.model) { - postData = Object.assign(postData, { shared: false }); + postData = Object.assign(postData, { shared: false, defaultLoad }); this.$emit('update', postData); } else { postData = Object.assign(postData, { defaultLoad }); @@ -692,12 +752,36 @@ export default { resize(h) { this.modalHeight = h - 380 + 'px'; }, + // 获取注册函数 + async getRegisterFunction(path) { + const res = await api.fetch('/udf/get-register-functions', { path }, 'get'); + return res.functions + }, - setNodePath(node) { + async setNodePath(node) { ['jarPath', 'sparkPath', 'customPath'].forEach(item => { const temp = item === this.getTypes() ? node.path : ''; this.$set(this.setting, item, temp); }) + if(this.getTypes() !== 'jarPath') { + + + if (/^[\w\u4e00-\u9fa5:.\\/]*(py|scala)$/.test(node.path)) { + try { + this.registerFunctions = await this.getRegisterFunction(node.path); + if(this.registerFunctions.length !== 0) { + this.registerFunctionEditable = false; + } + } catch (err) { + window.console.error(err); + this.registerFunctions = []; + this.registerFunctionEditable = true; + } + } + + + } + }, getTypes() { diff --git a/linkis-web/src/apps/URM/module/udfManagement/index.vue b/linkis-web/src/apps/URM/module/udfManagement/index.vue index 870b6ddf2c..fc28df901a 100644 --- a/linkis-web/src/apps/URM/module/udfManagement/index.vue +++ b/linkis-web/src/apps/URM/module/udfManagement/index.vue @@ -360,7 +360,7 @@ export default { } }, // 新增函数 - addFunction(data) { + async addFunction(data) { if (this.loading) return this.loading = true; const params = { @@ -377,19 +377,24 @@ export default { clusterName: data.clusterName, directory: data.directory } - api + await api .fetch('/udf/add', {udfAddVo: params}, 'post') .then(() => { this.showAddModal(false) this.search() this.isLoading = false this.loading = false + if (data.defaultLoad) { + this.confirmKillIdle(data) + } }) .catch(() => { // this.list = [{}] this.loading = false this.isLoading = false + }) + }, // 更新 updateFunction(data) { @@ -417,12 +422,34 @@ export default { this.loading = false this.search() this.$Message.success(this.$t('message.linkis.udf.success')); + + console.log(data); + if (data.defaultLoad) { + this.confirmKillIdle(data) + } }) .catch(() => { this.isLoading = false this.loading = false }) }, + confirmKillIdle(data) { + console.log(data); + this.$Modal.confirm({ + title: this.$t('message.linkis.setting.killEngineTitle'), + content: this.$t('message.linkis.setting.killEngine'), + onOk: async () => { + try { + api.fetch("/linkisManager/rm/killEngineByCreatorEngineType", { + creator: '*', + engineType: data.udfType === 1 ? 'spark' : '*' + }) + } catch (err) { + window.console.warn(err) + } + } + }) + }, checkChange(v) { this.list = this.list.map(it => { it.checked = !it.disabled && v diff --git a/linkis-web/src/apps/linkis/i18n/common/en.json b/linkis-web/src/apps/linkis/i18n/common/en.json index 0f036dfeda..d65c595d6e 100644 --- a/linkis-web/src/apps/linkis/i18n/common/en.json +++ b/linkis-web/src/apps/linkis/i18n/common/en.json @@ -1,6 +1,7 @@ { "message": { "linkis": { + "diagnosticLog": "Diagnostic Log", "refresh": "Refresh", "tableSetting": "Table Setting", "downloadLog": "Download", diff --git a/linkis-web/src/apps/linkis/i18n/common/zh.json b/linkis-web/src/apps/linkis/i18n/common/zh.json index 1207ff676c..d8949eead2 100644 --- a/linkis-web/src/apps/linkis/i18n/common/zh.json +++ b/linkis-web/src/apps/linkis/i18n/common/zh.json @@ -1,6 +1,7 @@ { "message": { "linkis": { + "diagnosticLog": "诊断日志", "refresh": "刷新", "findNewVer": "检测到新版本", "whetherUpdateNow": "是否立即更新?", diff --git a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue index 56725cbf15..83e62d0002 100644 --- a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue +++ b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue @@ -1,3 +1,4 @@ + + + + \ No newline at end of file From 9f4133a11a4fead3b2eb4d86faf7e6fdf0a94eeb Mon Sep 17 00:00:00 2001 From: Yonghao Mei <73584269+mayinrain@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:35:26 +0800 Subject: [PATCH 08/15] Dev 1.10.0 webank (#688) * chore: 1.9.0 * feat: 1.10.0 * fix --- linkis-web/src/apps/URM/module/udfManagement/index.vue | 2 -- 1 file changed, 2 deletions(-) diff --git a/linkis-web/src/apps/URM/module/udfManagement/index.vue b/linkis-web/src/apps/URM/module/udfManagement/index.vue index fc28df901a..f6d738fee2 100644 --- a/linkis-web/src/apps/URM/module/udfManagement/index.vue +++ b/linkis-web/src/apps/URM/module/udfManagement/index.vue @@ -423,7 +423,6 @@ export default { this.search() this.$Message.success(this.$t('message.linkis.udf.success')); - console.log(data); if (data.defaultLoad) { this.confirmKillIdle(data) } @@ -434,7 +433,6 @@ export default { }) }, confirmKillIdle(data) { - console.log(data); this.$Modal.confirm({ title: this.$t('message.linkis.setting.killEngineTitle'), content: this.$t('message.linkis.setting.killEngine'), From 8b6fd5e571b1f7bcfbcb6c85a209965e20e62e80 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Fri, 20 Dec 2024 11:12:24 +0800 Subject: [PATCH 09/15] code format --- .../conf/SchedulerConfiguration.scala | 2 +- .../ParallelConsumerManager.scala | 4 +-- .../linkis/entrance/EntranceServer.scala | 19 ++++++++---- .../impl/LogPathCreateInterceptor.scala | 3 +- .../linkis/ujes/jdbc/UJESSQLResultSet.scala | 3 +- .../manager/am/conf/AMConfiguration.java | 3 +- .../manager/am/restful/EngineRestfulApi.java | 12 ++++---- .../external/yarn/YarnResourceRequester.java | 3 +- .../executor/HiveEngineConnExecutor.scala | 4 ++- .../monitor/scheduled/JobHistoryMonitor.java | 10 +++---- .../JobHistoryAnalyzeAlertSender.scala | 1 - .../StarrocksTimeKillAlertSender.scala | 3 +- .../jobtime/StarrocksTimeKillRule.scala | 6 ++-- .../HiveMetaWithPermissionServiceImpl.java | 7 ++--- .../restful/api/QueryRestfulApi.java | 11 +++---- .../apache/linkis/udf/api/UDFRestfulApi.java | 12 ++++---- .../org/apache/linkis/udf/utils/UdfUtils.java | 30 +++++++++---------- 17 files changed, 73 insertions(+), 60 deletions(-) diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala index 24e7d3c4f0..69c5ab4351 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala @@ -38,7 +38,7 @@ object SchedulerConfiguration { // support fifo pfifo val FIFO_QUEUE_STRATEGY = - CommonVars("linkis.fifo.queue.strategy", FIFO_SCHEDULER_STRATEGY).getValue + CommonVars("linkis.fifo.queue.strategy", "fifo").getValue val SUPPORT_PRIORITY_TASK_USERS = CommonVars("linkis.fifo.queue.support.priority.users", "").getValue diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala index b079f12006..777adc89e3 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala @@ -116,13 +116,13 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) val newConsumer = createConsumer(groupName) val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName) newConsumer.setGroup(group) - val fifoQueueStrategy: String = FIFO_QUEUE_STRATEGY.toLowerCase() // 需要判断人员是否是指定部门 val consumerQueue: ConsumeQueue = if ( PFIFO_SCHEDULER_STRATEGY - .equals(fifoQueueStrategy) && isSupportPriority(groupName) + .equalsIgnoreCase(FIFO_QUEUE_STRATEGY) && isSupportPriority(groupName) ) { + logger.info(s"use priority queue: ${groupName}") new PriorityLoopArrayQueue(group) } else new LoopArrayQueue(group) newConsumer.setConsumeQueue(consumerQueue) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index 04ede15574..3744b3c25c 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -17,8 +17,6 @@ package org.apache.linkis.entrance -import org.apache.commons.lang3.StringUtils -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException} import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} @@ -35,10 +33,17 @@ import org.apache.linkis.governance.common.utils.LoggerUtils import org.apache.linkis.protocol.constants.TaskConstant import org.apache.linkis.protocol.utils.TaskUtils import org.apache.linkis.rpc.Sender -import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ENGINE_PRIORITY_RUNTIME_KEY, FIFO_QUEUE_STRATEGY, PFIFO_SCHEDULER_STRATEGY} +import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{ + ENGINE_PRIORITY_RUNTIME_KEY, + FIFO_QUEUE_STRATEGY, + PFIFO_SCHEDULER_STRATEGY +} import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState} import org.apache.linkis.server.conf.ServerConfiguration +import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils + import java.text.MessageFormat import java.util import java.util.concurrent.TimeUnit @@ -171,11 +176,15 @@ abstract class EntranceServer extends Logging { ) } - Utils.tryAndWarn{ + Utils.tryAndWarn { // 如果是使用优先级队列,设置下优先级 val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params) val fifoStrategy: String = FIFO_QUEUE_STRATEGY - if (PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(fifoStrategy) && properties != null && !properties.isEmpty) { + if ( + PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase( + fifoStrategy + ) && properties != null && !properties.isEmpty + ) { val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY) if (priorityValue != null) { val value: Int = priorityValue.toString.toInt diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala index 414c42fdd3..5ed8d88fe0 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala @@ -17,7 +17,6 @@ package org.apache.linkis.entrance.interceptor.impl -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.linkis.common.exception.ErrorException import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.entrance.interceptor.EntranceInterceptor @@ -25,6 +24,8 @@ import org.apache.linkis.entrance.interceptor.exception.LogPathCreateException import org.apache.linkis.entrance.parser.ParserUtils import org.apache.linkis.governance.common.entity.job.JobRequest +import org.apache.commons.lang3.exception.ExceptionUtils + /** * Description:Log path generation interceptor, used to set the path log of the task(日志路径生成拦截器, * 用于设置task的路径日志) diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 7a3fdf0f7e..be37a77f91 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -177,7 +177,8 @@ class UJESSQLResultSet( } val metaTmp = resultSetResult.getMetadata if (NULL_VALUE.equals(String.valueOf(metaTmp))) { - val fileContentList = resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]] + val fileContentList = + resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]] if (null != fileContentList) { resultSetMetaData.setColumnNameProperties(1, "linkis_string") resultSetMetaData.setDataTypeProperties(1, "String") diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index 197e450ea3..903c58ada7 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -104,7 +104,8 @@ public class AMConfiguration { CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file,jdbc"); public static final CommonVars MULTI_USER_ENGINE_USER = CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser()); - public static final String UDF_KILL_ENGINE_TYPE = CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue(); + public static final String UDF_KILL_ENGINE_TYPE = + CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue(); public static final CommonVars ENGINE_LOCKER_MAX_TIME = CommonVars.apply("wds.linkis.manager.am.engine.locker.max.time", 1000 * 60 * 5); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index 223c1b2c23..9041d3f4e4 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -772,16 +772,16 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso } if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) { Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE.split(",")) - .forEach( - engine -> - engineStopService.stopUnlockECByUserCreatorAndECType( - userName, creatorStr, engine)); + .forEach( + engine -> + engineStopService.stopUnlockECByUserCreatorAndECType( + userName, creatorStr, engine)); } else { - engineStopService.stopUnlockECByUserCreatorAndECType( - userName, creatorStr, engineType); + engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType); } return Message.ok("Kill engineConn succeed"); } + static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException { String applicationName = jsonNode.get("applicationName").asText(); String instance = jsonNode.get("instance").asText(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java index 3195f1f2a8..048dc96e97 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java @@ -77,7 +77,8 @@ public NodeResource requestResourceInfo( String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); if (queueName.startsWith(queuePrefix)) { - logger.info("Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix); + logger.info( + "Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix); queueName = queueName.substring(queuePrefix.length()); } String realQueueName = queuePrefix + queueName; diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index e8eccd35a7..d50e4096f9 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -274,7 +274,9 @@ class HiveEngineConnExecutor( if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) - engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue") + engineExecutorContext.appendStdout( + s"Your task will be submitted to the $queueName queue" + ) } if (thread.isInterrupted) { logger.error( diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index ef02f182f2..08dca4214b 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -173,7 +173,7 @@ public void jobHistoryFinishedScan() { // 新增失败任务分析扫描 try { JobHistoryAnalyzeRule jobHistoryAnalyzeRule = - new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); + new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); scanner.addScanRule(jobHistoryAnalyzeRule); } catch (Exception e) { logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage()); @@ -252,21 +252,21 @@ public void jdbcUnfinishedAlertScan() { @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}") public void jdbcUnfinishedKillScan() { long id = - Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) - .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); long intervalMs = 7200 * 1000; long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; long endTime = System.currentTimeMillis(); long startTime = endTime - intervalMs; AnomalyScanner scanner = new DefaultScanner(); List fetchers = - JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); if (fetchers.isEmpty()) { logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); return; } StarrocksTimeKillRule starrocksTimeKillRule = - new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); + new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); scanner.addScanRule(starrocksTimeKillRule); JobMonitorUtils.run(scanner, fetchers, true); } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala index 517e854dec..2b17adb39b 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala @@ -28,7 +28,6 @@ import java.util import scala.collection.JavaConverters._ - class JobHistoryAnalyzeAlertSender() extends Observer with Logging { override def update(e: Event, jobHistroyList: scala.Any): Unit = {} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala index 67cbadd2e8..18553f228e 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala @@ -25,7 +25,6 @@ class StarrocksTimeKillAlertSender extends Observer with Logging { /** * Observer Pattern */ - override def update(e: Event, jobHistroyList: scala.Any): Unit = { - } + override def update(e: Event, jobHistroyList: scala.Any): Unit = {} } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala index dfcd934908..e5df6f3ff7 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala @@ -17,8 +17,6 @@ package org.apache.linkis.monitor.jobhistory.jobtime -import org.apache.commons.collections.MapUtils -import org.apache.commons.lang3.StringUtils import org.apache.linkis.common.utils.Logging import org.apache.linkis.monitor.constants.Constants import org.apache.linkis.monitor.core.ob.Observer @@ -27,8 +25,12 @@ import org.apache.linkis.monitor.jobhistory.entity.JobHistory import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} import org.apache.linkis.server.BDPJettyServerHelper +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils + import java.util import java.util.Locale + import scala.collection.JavaConverters._ class StarrocksTimeKillRule(hitObserver: Observer) diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java index 700a8aab3a..88a4d93b6e 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java @@ -103,11 +103,8 @@ public List> getTablesByDbNameAndOptionalUserName( queryParam.withRoles(roles); List> hiveTables = hiveMetaDao.getTablesByDbNameAndUserAndRolesFromDbPrvs(queryParam); - hiveTables.addAll( - hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam)); - return hiveTables.stream() - .distinct() - .collect(Collectors.toList()); + hiveTables.addAll(hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam)); + return hiveTables.stream().distinct().collect(Collectors.toList()); } else { log.info("user {} to getTablesByDbName no permission control", queryParam.getUserName()); return hiveMetaDao.getTablesByDbName(queryParam); diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index af87acb575..f40d028726 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -774,13 +774,14 @@ public Message getUserKeyValue( parms.put("nonce", SHAUtils.DOCTOR_NONCE); // doctor提供的token String token = SHAUtils.DOCTOR_TOKEN.getValue(); - if (StringUtils.isNotBlank(token)){ + if (StringUtils.isNotBlank(token)) { String signature = + SHAUtils.Encrypt( SHAUtils.Encrypt( - SHAUtils.Encrypt( - parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(), null) - + token, - null); + parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(), + null) + + token, + null); parms.put("signature", signature); return Message.ok().data("doctor", parms); } else { diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java index 2edb76c15a..81a1b7755a 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java @@ -1440,14 +1440,14 @@ public Message pythonUpload( } @ApiImplicitParam( - name = "path", - dataType = "String", - value = "path", - example = "file:///test-dir/test-sub-dir/test1012_01.py") + name = "path", + dataType = "String", + value = "path", + example = "file:///test-dir/test-sub-dir/test1012_01.py") @RequestMapping(path = "/get-register-functions", method = RequestMethod.GET) public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path") String path) { if (StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_PY) - || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) { + || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) { if (StringUtils.startsWithIgnoreCase(path, StorageUtils$.MODULE$.FILE_SCHEMA())) { try { FsPath fsPath = new FsPath(path); @@ -1456,7 +1456,7 @@ public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path" fileSystem.init(null); if (fileSystem.canRead(fsPath)) { return Message.ok() - .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path)); + .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path)); } else { return Message.error("您没有权限访问该文件"); } diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java index bef01af34f..f7401fc1f4 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java @@ -19,7 +19,10 @@ import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.common.utils.JsonUtils; import org.apache.linkis.common.utils.Utils; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageUtils$; import org.apache.linkis.udf.conf.Constants; import org.apache.linkis.udf.exception.UdfException; @@ -29,10 +32,6 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.linkis.common.utils.JsonUtils; -import org.apache.linkis.storage.fs.FileSystem; -import org.apache.linkis.storage.utils.StorageUtils$; -import com.fasterxml.jackson.core.type.TypeReference; import org.springframework.web.multipart.MultipartFile; @@ -47,6 +46,7 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import com.fasterxml.jackson.core.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -286,7 +286,7 @@ public static List extractDependencies(String content, String name) { } public static List getRegisterFunctions(FileSystem fileSystem, FsPath fsPath, String path) - throws Exception { + throws Exception { try (InputStream is = fileSystem.read(fsPath)) { // 将inputstream内容转换为字符串 String content = IOUtils.toString(is, StandardCharsets.UTF_8); @@ -322,17 +322,17 @@ public static List extractScalaMethodNames(String scalaCode) { public static List extractPythonMethodNames(String udfPath) throws Exception { String localPath = udfPath.replace(StorageUtils$.MODULE$.FILE_SCHEMA(), ""); String exec = - Utils.exec( - (new String[] { - Constants.PYTHON_COMMAND.getValue(), - Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", - localPath - })); + Utils.exec( + (new String[] { + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath + })); logger.info( - "execute python script to get python method name...{} {} {}", - Constants.PYTHON_COMMAND.getValue(), - Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", - localPath); + "execute python script to get python method name...{} {} {}", + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath); // 将exec转换为List,exec为一个json数组 return JsonUtils.jackson().readValue(exec, new TypeReference>() {}); } From b9283d17bd6f63421a2a4171567c7d975908ab53 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Fri, 20 Dec 2024 15:37:22 +0800 Subject: [PATCH 10/15] code optimization --- .../scala/org/apache/linkis/entrance/EntranceServer.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index 3744b3c25c..3ea8eab7eb 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -178,7 +178,10 @@ abstract class EntranceServer extends Logging { Utils.tryAndWarn { // 如果是使用优先级队列,设置下优先级 - val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params) + val configMap = params + .getOrDefault(TaskConstant.PARAMS, new util.HashMap[String, AnyRef]()) + .asInstanceOf[util.Map[String, AnyRef]] + val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(configMap) val fifoStrategy: String = FIFO_QUEUE_STRATEGY if ( PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase( From 7dd46d2ae51556f2c54b50e8374dcd4e4488e3b8 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Fri, 20 Dec 2024 16:35:35 +0800 Subject: [PATCH 11/15] deal with double --- .../linkis/entrance/EntranceServer.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index 3ea8eab7eb..3937c497ad 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -190,7 +190,7 @@ abstract class EntranceServer extends Logging { ) { val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY) if (priorityValue != null) { - val value: Int = priorityValue.toString.toInt + val value: Int = getPriority(priorityValue.toString) job.setPriority(value) } } @@ -316,6 +316,23 @@ abstract class EntranceServer extends Logging { startTimeOutCheck() } + val DOT = "." + val DEFAULT_PRIORITY = 100 + + private def getPriority(value: String): Int = { + var priority: Int = -1 + Utils.tryAndWarn({ + priority = + if (value.contains(DOT)) value.substring(0, value.indexOf(DOT)).toInt else value.toInt + }) + if (priority < 0 || priority > Integer.MAX_VALUE - 1) { + logger.warn(s"illegal queue priority: ${value}") + DEFAULT_PRIORITY + } else { + priority + } + } + } object EntranceServer { From 1db5e456b02ad421dc2c33a1143e20c636ab7ac3 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Fri, 20 Dec 2024 17:38:23 +0800 Subject: [PATCH 12/15] fix label npe --- .../monitor/EngineConnMonitor.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala index e89193418a..751bb2d1b2 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala @@ -22,28 +22,21 @@ import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.entity.{ExecutionNodeStatus, NodeExistStatus} -import org.apache.linkis.governance.common.protocol.engineconn.{ - RequestEngineStatusBatch, - ResponseEngineStatusBatch -} +import org.apache.linkis.governance.common.protocol.engineconn.{RequestEngineStatusBatch, ResponseEngineStatusBatch} import org.apache.linkis.governance.common.utils.GovernanceConstant import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, ResponseNodeStatus} import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf import org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, EngineConnTaskInfo} -import org.apache.linkis.orchestrator.listener.task.{ - TaskErrorResponseEvent, - TaskLogEvent, - TaskStatusEvent -} +import org.apache.linkis.orchestrator.listener.task.{TaskErrorResponseEvent, TaskLogEvent, TaskStatusEvent} import org.apache.linkis.rpc.Sender -import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper} +import org.apache.linkis.server.{BDPJettyServerHelper, toJavaMap} import java.util import java.util.concurrent.TimeUnit - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -203,7 +196,12 @@ object EngineConnMonitor extends Logging { val execTask = executor.getExecTask Utils.tryAndError { val labels: Array[Label[_]] = executor.getEngineConnExecutor.getLabels() - val engineType: String = LabelUtil.getEngineTypeLabel(labels.toList.asJava).getEngineType + val engineTypeKey = "engineType" + val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey)) + var engineType = "" + if (labelArray != null && labelArray.size > 0) { + engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType + } logger.warn( s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly." ) From e7f4ff1f7a2744b098f0f0cc60d6c44264ea880f Mon Sep 17 00:00:00 2001 From: Yonghao Mei <73584269+mayinrain@users.noreply.github.com> Date: Mon, 23 Dec 2024 15:49:11 +0800 Subject: [PATCH 13/15] Dev 1.10.0 webank (#689) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: 1.9.0 * feat: 1.10.0 * fix * upd: 新增loading展示 --- .../globalHistoryManagement/viewHistory.vue | 5 ++++- .../src/components/consoleComponent/term.vue | 22 ++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue index 83e62d0002..2f65033523 100644 --- a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue +++ b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue @@ -49,7 +49,7 @@ :visualParams="visualParams" /> - +