diff --git a/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java
new file mode 100644
index 0000000000..62191aa20e
--- /dev/null
+++ b/linkis-commons/linkis-scheduler/src/main/java/org/apache/linkis/scheduler/util/SchedulerUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.scheduler.util;
+
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class SchedulerUtils {
+  private static final String EVENT_ID_SPLIT = "_";
+  private static final String ALL_CREATORS = "ALL_CREATORS";
+  private static final String SPECIAL_USER_SPLIT = "_v_";
+
+  /**
+   * Whether this group supports the priority queue, judged by the configured users and creators.
+   *
+   * @param groupName group name, e.g. "IDE_hadoop_hive"
+   * @return true if tasks of this group should enter the priority queue
+   */
+  public static boolean isSupportPriority(String groupName) {
+    String users = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_USERS();
+    if (StringUtils.isEmpty(users)) {
+      return false;
+    }
+    String userName = getUserFromGroupName(groupName);
+    if (StringUtils.isEmpty(userName)) {
+      return false;
+    }
+    String creators = SchedulerConfiguration.SUPPORT_PRIORITY_TASK_CREATORS();
+    creators = creators.toLowerCase();
+    users = users.toLowerCase();
+    if (ALL_CREATORS.equalsIgnoreCase(creators)) {
+      return users.contains(userName.toLowerCase());
+    } else {
+      String creatorName = getCreatorFromGroupName(groupName);
+      return users.contains(userName.toLowerCase()) && creators.contains(creatorName.toLowerCase());
+    }
+  }
+
+  public static String getUserFromGroupName(String groupName) {
+    if (groupName.contains(SPECIAL_USER_SPLIT)) {
+      int vIndex = groupName.lastIndexOf(SPECIAL_USER_SPLIT);
+      int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT);
+      String user = groupName.substring(vIndex + 1, lastIndex);
+      return user;
+    }
+    String[] groupNames = groupName.split(EVENT_ID_SPLIT);
+    String user = groupNames[groupNames.length - 2];
+    return user;
+  }
+
+  public static String getEngineTypeFromGroupName(String groupName) {
+    String[] groupNames = groupName.split(EVENT_ID_SPLIT);
+    String ecType = groupNames[groupNames.length - 1];
+    return ecType;
+  }
+
+  public static String getCreatorFromGroupName(String groupName) {
+    if (groupName.contains(SPECIAL_USER_SPLIT)) {
+      int vIndex = groupName.lastIndexOf(SPECIAL_USER_SPLIT);
+      String creatorName = groupName.substring(0, vIndex);
+      return creatorName;
+    }
+    int lastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT);
+    int secondLastIndex = groupName.lastIndexOf(EVENT_ID_SPLIT, lastIndex - 1);
+    String creatorName = groupName.substring(0, secondLastIndex);
+    return creatorName;
+  }
+}
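For reference, a usage sketch of the helpers above. Group names follow the "creator_user_engineType" convention, with "_v_" marking special users such as "v_hadoop"; the class name is hypothetical and the expected values are taken from the tests later in this patch.

import org.apache.linkis.scheduler.util.SchedulerUtils;

public class SchedulerUtilsExample {
  public static void main(String[] args) {
    String groupName = "IDE_hadoop_hive";
    // Parsing works from the right, so creators may themselves contain '_'
    System.out.println(SchedulerUtils.getCreatorFromGroupName(groupName)); // IDE
    System.out.println(SchedulerUtils.getUserFromGroupName(groupName)); // hadoop
    System.out.println(SchedulerUtils.getEngineTypeFromGroupName(groupName)); // hive
    // true only when linkis.fifo.queue.support.priority.users contains "hadoop"
    // and the creators list is ALL_CREATORS or contains "ide"
    // (all comparisons are case-insensitive).
    System.out.println(SchedulerUtils.isSupportPriority(groupName));
  }
}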
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
index e3b76ac4e7..69c5ab4351 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
@@ -36,4 +36,21 @@ object SchedulerConfiguration {
   val MAX_GROUP_ALTER_WAITING_SIZE =
     CommonVars("linkis.fifo.consumer.group.max.alter.waiting.size", 1000).getValue
 
+  // supported queue strategies: fifo (default) and pfifo (priority FIFO)
+  val FIFO_QUEUE_STRATEGY =
+    CommonVars("linkis.fifo.queue.strategy", "fifo").getValue
+
+  val SUPPORT_PRIORITY_TASK_USERS =
+    CommonVars("linkis.fifo.queue.support.priority.users", "").getValue
+
+  val SUPPORT_PRIORITY_TASK_CREATORS =
+    CommonVars("linkis.fifo.queue.support.priority.creators", "ALL_CREATORS").getValue
+
+  val MAX_PRIORITY_QUEUE_CACHE_SIZE =
+    CommonVars("linkis.fifo.priority.queue.max.cache.size", 1000).getValue
+
+  val ENGINE_PRIORITY_RUNTIME_KEY = "wds.linkis.engine.runtime.priority"
+
+  val PFIFO_SCHEDULER_STRATEGY = "pfifo"
+  val FIFO_SCHEDULER_STRATEGY = "fifo"
 }
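Putting the new keys together, a hedged sketch of how they might be set in linkis.properties to route a user's tasks into the priority queue (placement and user names illustrative, not shipped defaults except the cache size):

linkis.fifo.queue.strategy=pfifo
linkis.fifo.queue.support.priority.users=hadoop,alice
linkis.fifo.queue.support.priority.creators=ALL_CREATORS
linkis.fifo.priority.queue.max.cache.size=1000

An individual job can then raise its own priority by putting wds.linkis.engine.runtime.priority (an integer; larger values are consumed first) into its runtime params, as wired up in the EntranceServer change further down.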
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala
new file mode 100644
index 0000000000..fd3fecc71b
--- /dev/null
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueue.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.scheduler.queue
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration
+
+import java.util
+import java.util.Comparator
+import java.util.concurrent.PriorityBlockingQueue
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+/**
+ * Priority queue element
+ * @param element
+ *   the actual element
+ * @param priority
+ *   the priority
+ * @param index
+ *   a unique index
+ */
+case class PriorityQueueElement(element: Any, priority: Int, index: Int)
+
+/**
+ * Fixed-size collection: once full, the entry inserted earliest is evicted
+ * @param maxSize
+ *   maximum number of entries
+ * @tparam K
+ * @tparam V
+ */
+class FixedSizeCollection[K, V](val maxSize: Int) extends util.LinkedHashMap[K, V] {
+  // Returning true once the size exceeds the maximum makes LinkedHashMap drop the eldest entry
+  protected override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean = size > maxSize
+}
+
+/**
+ * Priority queue; elements of equal priority are consumed first-in-first-out
+ * @param group
+ */
+class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
+
+  private val maxCapacity = group.getMaximumCapacity
+
+  /** The underlying priority queue */
+  private val priorityEventQueue = new PriorityBlockingQueue[PriorityQueueElement](
+    group.getMaximumCapacity,
+    new Comparator[PriorityQueueElement] {
+
+      override def compare(o1: PriorityQueueElement, o2: PriorityQueueElement): Int =
+        if (o1.priority != o2.priority) o2.priority - o1.priority
+        else o1.index - o2.index
+
+    }
+  )
+
+  /**
+   * Accumulator: 1. earlier entries get smaller values, enforcing FIFO among equal priorities 2.
+   * serves as a unique, never-repeated index for queue elements
+   */
+  private val index = new AtomicInteger
+
+  /** Indexes of all elements currently in the queue: added on enqueue, removed on dequeue */
+  private val indexMap = new util.HashMap[Int, SchedulerEvent]()
+
+  /**
+   * Already-consumed elements, kept in a fixed-size cache (default 1000); an element is added here
+   * when it is removed from the priority queue
+   */
+  private val fixedSizeCollection =
+    new FixedSizeCollection[Integer, SchedulerEvent](
+      SchedulerConfiguration.MAX_PRIORITY_QUEUE_CACHE_SIZE
+    )
+
+  private val rwLock = new ReentrantReadWriteLock
+
+  protected[this] var realSize = size
+  override def isEmpty: Boolean = size <= 0
+  override def isFull: Boolean = size >= maxCapacity
+  def size: Int = priorityEventQueue.size
+
+  /**
+   * Add an element to the queue
+   * @param element
+   * @return
+   */
+  private def addToPriorityQueue(element: PriorityQueueElement): Boolean = {
+    priorityEventQueue.offer(element)
+    rwLock.writeLock.lock
+    Utils.tryFinally(indexMap.put(element.index, element.element.asInstanceOf[SchedulerEvent]))(
+      rwLock.writeLock.unlock()
+    )
+    true
+  }
+
+  /**
+   * Get and remove the head of the queue
+   * @return
+   */
+  private def getAndRemoveTop: SchedulerEvent = {
+    val top: PriorityQueueElement = priorityEventQueue.take()
+    rwLock.writeLock.lock
+    Utils.tryFinally {
+      indexMap.remove(top.index)
+      fixedSizeCollection.put(top.index, top.element.asInstanceOf[SchedulerEvent])
+    }(rwLock.writeLock.unlock())
+    top.element.asInstanceOf[SchedulerEvent]
+  }
+
+  override def remove(event: SchedulerEvent): Unit = {
+    get(event).foreach(x => x.cancel())
+  }
+
+  override def getWaitingEvents: Array[SchedulerEvent] = {
+    toIndexedSeq
+      .filter(x =>
+        x.getState.equals(SchedulerEventState.Inited) || x.getState
+          .equals(SchedulerEventState.Scheduled)
+      )
+      .toArray
+  }
+
+  override def clearAll(): Unit = priorityEventQueue synchronized {
+    realSize = 0
+    index.set(0)
+    priorityEventQueue.clear()
+    fixedSizeCollection.clear()
+    indexMap.clear()
+  }
+
+  override def get(event: SchedulerEvent): Option[SchedulerEvent] = {
+    val eventSeq = toIndexedSeq.filter(x => x.getId.equals(event.getId)).seq
+    if (eventSeq.size > 0) Some(eventSeq(0)) else None
+  }
+
+  /**
+   * Get a queue element by its index
+   * @param index
+   * @return
+   */
+  override def get(index: Int): Option[SchedulerEvent] = {
+    if (!indexMap.containsKey(index) && !fixedSizeCollection.containsKey(index)) {
+      throw new IllegalArgumentException(
+        "The element at index " + index + " has been consumed and evicted from the cache, it can no longer be fetched"
+      )
+    }
+    rwLock.readLock().lock()
+    Utils.tryFinally {
+      if (fixedSizeCollection.get(index) != null) Option(fixedSizeCollection.get(index))
+      else Option(indexMap.get(index))
+    }(rwLock.readLock().unlock())
+  }
+
+  override def getGroup: Group = group
+
+  override def setGroup(group: Group): Unit = {
+    this.group = group
+  }
+
+  def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (size == 0) {
+    IndexedSeq.empty[SchedulerEvent]
+  } else {
+    priorityEventQueue
+      .toArray()
+      .map(_.asInstanceOf[PriorityQueueElement].element.asInstanceOf[SchedulerEvent])
+      .toIndexedSeq
+  }
+
+  def add(event: SchedulerEvent): Int = {
+    // Increment the counter on every add; among equal priorities this keeps first-in-first-out order
+    event.setIndex(index.addAndGet(1))
+    addToPriorityQueue(PriorityQueueElement(event, event.getPriority, event.getIndex))
+    event.getIndex
+  }
+
+  override def waitingSize: Int = size
+
+  /**
+   * Add one, if the queue is full, it will block until the queue is
+   * available(添加一个,如果队列满了,将会一直阻塞,直到队列可用)
+   *
+   * @return
+   *   Return index subscript(返回index下标)
+   */
+  override def put(event: SchedulerEvent): Int = {
+    add(event)
+  }
+
+  /**
+   * Add one, return None if the queue is full(添加一个,如果队列满了,返回None)
+   *
+   * @return
+   */
+  override def offer(event: SchedulerEvent): Option[Int] = {
+    if (isFull) None else Some(add(event))
+  }
+
+  /**
+   * Get the latest SchedulerEvent of a group, if it does not exist, it will block
(获取某个group最新的SchedulerEvent,如果不存在,就一直阻塞
) This method will move the pointer(该方法会移动指针) + * + * @return + */ + override def take(): SchedulerEvent = { + getAndRemoveTop + } + + /** + * Get the latest SchedulerEvent of a group, if it does not exist, block the maximum waiting + * time
(获取某个group最新的SchedulerEvent,如果不存在,就阻塞到最大等待时间
) This method will move the
+   * pointer(该方法会移动指针)
+   * @param mills
+   *   Maximum waiting time(最大等待时间)
+   * @return
+   */
+  override def take(mills: Long): Option[SchedulerEvent] = {
+    if (waitingSize == 0) {
+      Thread.sleep(mills)
+    }
+    if (waitingSize == 0) None else Option(getAndRemoveTop)
+  }
+
+  /**
+   * Get the latest SchedulerEvent of a group and move the pointer to the next one. If not, return
+   * directly to None 获取某个group最新的SchedulerEvent,并移动指针到下一个。如果没有,直接返回None
+   *
+   * @return
+   */
+  override def poll(): Option[SchedulerEvent] = {
+    if (waitingSize == 0) None
+    else Option(getAndRemoveTop)
+  }
+
+  /**
+   * Only get the latest SchedulerEvent of a group, and do not move the pointer. If not, return
+   * directly to None 只获取某个group最新的SchedulerEvent,并不移动指针。如果没有,直接返回None
+   *
+   * @return
+   */
+  override def peek(): Option[SchedulerEvent] = {
+    val ele: PriorityQueueElement = priorityEventQueue.peek()
+    if (ele == null) None else Option(ele.element.asInstanceOf[SchedulerEvent])
+  }
+
+  /**
+   * Get the latest SchedulerEvent whose group satisfies the condition and does not move the
+   * pointer. If not, return directly to None 获取某个group满足条件的最新的SchedulerEvent,并不移动指针。如果没有,直接返回None
+   * @param op
+   *   the condition to satisfy
+   * @return
+   */
+  override def peek(op: (SchedulerEvent) => Boolean): Option[SchedulerEvent] = {
+    val ele: PriorityQueueElement = priorityEventQueue.peek()
+    if (ele == null) return None
+    val event: Option[SchedulerEvent] = Option(
+      ele.element.asInstanceOf[SchedulerEvent]
+    )
+    if (op(event.get)) event else None
+  }
+
+}
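The FixedSizeCollection above leans on LinkedHashMap.removeEldestEntry to cap the consumed-event cache. A minimal, self-contained Java illustration of that eviction mechanism (class name hypothetical):

import java.util.LinkedHashMap;
import java.util.Map;

class FixedSizeCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  FixedSizeCache(int maxSize) {
    this.maxSize = maxSize;
  }

  // LinkedHashMap calls this after every put; returning true evicts the eldest entry.
  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxSize;
  }

  public static void main(String[] args) {
    FixedSizeCache<Integer, String> cache = new FixedSizeCache<>(2);
    cache.put(1, "a");
    cache.put(2, "b");
    cache.put(3, "c");
    System.out.println(cache.keySet()); // [2, 3] -- entry 1 was evicted
  }
}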
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index 4f384d2384..3e87a06930 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -32,9 +32,13 @@ trait SchedulerEvent extends Logging {
   protected var scheduledTime: Long = 0L
   protected var startTime: Long = 0L
   protected var endTime: Long = 0L
+  protected var priority: Int = 100
+  protected var index: Int = 0
 
   def getEndTime: Long = endTime
   def getStartTime: Long = startTime
+  def getPriority: Int = priority
+  def getIndex: Int = index
 
   /*
    * To be compatible with old versions.
@@ -50,6 +54,14 @@ trait SchedulerEvent extends Logging {
     this synchronized notify()
   }
 
+  def setPriority(priority: Int): Unit = {
+    this.priority = priority
+  }
+
+  def setIndex(index: Int): Unit = {
+    this.index = index
+  }
+
   def turnToScheduled(): Boolean = if (!isWaiting) {
     false
   } else {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
index e95e172e06..02091e4f79 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
@@ -19,10 +19,11 @@ package org.apache.linkis.scheduler.queue.fifoqueue
 
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.scheduler.SchedulerContext
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration.FIFO_QUEUE_STRATEGY
 import org.apache.linkis.scheduler.errorcode.LinkisSchedulerErrorCodeSummary._
 import org.apache.linkis.scheduler.exception.SchedulerErrorException
 import org.apache.linkis.scheduler.listener.ConsumerListener
-import org.apache.linkis.scheduler.queue.{Consumer, ConsumerManager, Group, LoopArrayQueue}
+import org.apache.linkis.scheduler.queue._
 
 import java.text.MessageFormat
 import java.util.concurrent.{ExecutorService, ThreadPoolExecutor}
@@ -34,7 +35,7 @@ class FIFOConsumerManager(groupName: String) extends ConsumerManager {
   private var group: Group = _
   private var executorService: ThreadPoolExecutor = _
   private var consumerListener: ConsumerListener = _
-  private var consumerQueue: LoopArrayQueue = _
+  private var consumerQueue: ConsumeQueue = _
   private var consumer: Consumer = _
 
   override def setSchedulerContext(schedulerContext: SchedulerContext): Unit = {
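The ParallelConsumerManager hunk below decides per consumer group which queue implementation to create. A hedged Java rendering of the same rule (method and class name hypothetical; types and accessors are the ones this patch introduces):

import org.apache.linkis.scheduler.conf.SchedulerConfiguration;
import org.apache.linkis.scheduler.queue.ConsumeQueue;
import org.apache.linkis.scheduler.queue.Group;
import org.apache.linkis.scheduler.queue.LoopArrayQueue;
import org.apache.linkis.scheduler.queue.PriorityLoopArrayQueue;
import org.apache.linkis.scheduler.util.SchedulerUtils;

class QueueSelectionSketch {
  // Priority queue only when the configured strategy is "pfifo" AND this group's
  // user/creator is whitelisted; every other group keeps the plain FIFO ring.
  static ConsumeQueue chooseQueue(String groupName, Group group) {
    boolean pfifo =
        SchedulerConfiguration.PFIFO_SCHEDULER_STRATEGY()
            .equalsIgnoreCase(SchedulerConfiguration.FIFO_QUEUE_STRATEGY());
    if (pfifo && SchedulerUtils.isSupportPriority(groupName)) {
      return new PriorityLoopArrayQueue(group);
    }
    return new LoopArrayQueue(group);
  }
}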
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index c64158e6e8..777adc89e3 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -19,9 +19,14 @@ package org.apache.linkis.scheduler.queue.parallelqueue
 
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.scheduler.conf.SchedulerConfiguration
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
+  FIFO_QUEUE_STRATEGY,
+  PFIFO_SCHEDULER_STRATEGY
+}
 import org.apache.linkis.scheduler.listener.ConsumerListener
 import org.apache.linkis.scheduler.queue._
 import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer
+import org.apache.linkis.scheduler.util.SchedulerUtils.isSupportPriority
 
 import java.util.concurrent.{ExecutorService, TimeUnit}
 
@@ -111,7 +116,16 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
     val newConsumer = createConsumer(groupName)
     val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName)
     newConsumer.setGroup(group)
-    newConsumer.setConsumeQueue(new LoopArrayQueue(group))
+    // Use the priority queue only when this group's user/creator is configured to support it
+    val consumerQueue: ConsumeQueue =
+      if (
+        PFIFO_SCHEDULER_STRATEGY
+          .equalsIgnoreCase(FIFO_QUEUE_STRATEGY) && isSupportPriority(groupName)
+      ) {
+        logger.info(s"use priority queue: ${groupName}")
+        new PriorityLoopArrayQueue(group)
+      } else new LoopArrayQueue(group)
+    newConsumer.setConsumeQueue(consumerQueue)
     consumerListener.foreach(_.onConsumerCreated(newConsumer))
     newConsumer.start()
     newConsumer
diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java
new file mode 100644
index 0000000000..f0a8c82696
--- /dev/null
+++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/PriorityLoopArrayQueueTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.scheduler.queue;
+
+import org.apache.linkis.scheduler.queue.fifoqueue.FIFOGroup;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import scala.Option;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class PriorityLoopArrayQueueTest {
+  AtomicInteger productCounter = new AtomicInteger();
+  AtomicInteger consumerCounter = new AtomicInteger();
+
+  @Test
+  public void testConcurrentPutAndTake() throws Exception {
+    AtomicInteger counter = new AtomicInteger();
+    FIFOGroup group = new FIFOGroup("test", 5000, 5000);
+    PriorityLoopArrayQueue queue = new PriorityLoopArrayQueue(group);
+
+    // Start time in milliseconds
+    long startTime = System.currentTimeMillis();
+    // Test window: 30 seconds
+    long testWindowMillis = 30 * 1000;
+    int genLen = 5;
+    int getLen = 7;
+    final CountDownLatch latch = new CountDownLatch(genLen + getLen + 1);
+    // genLen (5) producer threads
+    for (int i = 0; i < genLen; i++) {
+      final int id = i;
+      new Thread(() -> {
+        try {
+          Thread.sleep(100 * id);
+          latch.countDown();
+          latch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        System.out.println(Thread.currentThread().getName() + " started producing");
+        while ((System.currentTimeMillis() - startTime) < testWindowMillis) {
+          // produce
+          try {
+            Thread.sleep(getRandom(200));
+            product(counter, queue);
+            product(counter, queue);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          // consume
+          //consume(queue);
+        }
+        System.out.println(Thread.currentThread().getName() + " finished producing");
+      }, "producer-" + i).start();
+    }
+    // getLen (7) consumer threads
+    for (int i = 0; i < getLen; i++) {
+      final int id = i;
+      new Thread(() -> {
+        try {
+          Thread.sleep(getRandom(200));
+          latch.countDown();
+          latch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        System.out.println(Thread.currentThread().getName() + " started consuming");
+        while (true) {
+          try {
+            Thread.sleep(getRandom(200));
+            // consume
+            consume(queue);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+      }, "consumer-" + i).start();
+    }
+    new Thread(() -> {
+      try {
+        Thread.sleep(100);
+        latch.countDown();
+        latch.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      System.out.println(Thread.currentThread().getName() + " started sampling the queue");
+      while ((System.currentTimeMillis() - startTime) < testWindowMillis * 2) {
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        System.out.println("produced: " + productCounter.get());
+        System.out.println("consumed: " + consumerCounter.get());
+        System.out.println("current queue size: " + queue.size());
+        // Requires relaxing the private fields to run
+        //System.out.println("index size: " + queue.indexMap().size());
+        //System.out.println("cache size: " + queue.fixedSizeCollection().size());
+      }
+    }).start();
+    Thread.sleep(testWindowMillis * 2);
+    System.out.println("product:" + productCounter.get() + ", consumer: " + consumerCounter.get());
+    // Requires relaxing the private fields to run
+    //Assertions.assertEquals(1000, queue.fixedSizeCollection().size());
+    Assertions.assertEquals(productCounter.get(), consumerCounter.get());
+  }
+
+  // consume
+  private void consume(PriorityLoopArrayQueue queue) {
+    SchedulerEvent take = null;
+    try {
+      take = queue.take();
+      consumerCounter.addAndGet(1);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    printEvent("consume", take);
+  }
+
+  // produce
+  private void product(AtomicInteger counter, PriorityLoopArrayQueue queue) {
+    int i1 = counter.addAndGet(1);
+    // 1000 = important, 100 = normal, 10 = unimportant
+    int[] proArr = {1000, 100, 10};
+    int priority = getRandom(3);
+    String name = "item-" + i1 + "-" + priority;
+    System.out.println("produce: " + name);
+    Option offer = queue.offer(getJob(name, proArr[priority]));
+    if (offer.nonEmpty()) {
+      productCounter.addAndGet(1);
+      Option schedulerEventOption = queue.get((int) offer.get());
+      printEvent("get:", schedulerEventOption.get());
+    } else {
+      System.out.println("queue is full, size: " + queue.size());
+    }
+  }
+
+  @Test
+  void testFinally() {
+
+  }
+
+  @Test
+  void enqueue() {
+    // exercise offer / take / get
+    FIFOGroup group = new FIFOGroup("test", 100, 100);
+    PriorityLoopArrayQueue queue = new PriorityLoopArrayQueue(group);
+    Option idx = queue.offer(getJob("job1-1", 1));
+    // insertion test
+    Assertions.assertEquals(1, (int) idx.get());
+    queue.offer(getJob("job2", 2));
+    queue.offer(getJob("job3", 3));
+    queue.offer(getJob("job1-2", 1));
+    queue.offer(getJob("job5", 5));
+    queue.offer(getJob("item1-3", 1));
+    queue.offer(getJob("item6-1", 6));
+    queue.offer(getJob("item4", 4));
+    queue.offer(getJob("item6-2", 6));
+    // peek test
+    Option peek = queue.peek();
+    Assertions.assertEquals("item6-1", peek.get().getId());
+    while (queue.size() > 1) {
+      queue.take();
+    }
+    SchedulerEvent event = queue.take();
+    // priority plus FIFO-within-priority test
+    Assertions.assertEquals("item1-3", event.getId());
+    Assertions.assertEquals(1, event.priority());
+    Assertions.assertEquals(6, event.getIndex());
+    // cache test; requires linkis.fifo.priority.queue.max.cache.size to be set to 5
+    Assertions.assertThrows(IllegalArgumentException.class, () -> {queue.get(7);});
+
+  }
+
+  private void printEvent(String opt, SchedulerEvent event) {
+    System.out.println("[" + Thread.currentThread().getName() + "] " + opt + ":" + event.getId() + ", priority: " + event.getPriority() + ", index: " + event.getIndex());
+  }
+
+  private int getRandom(int bound) {
+    Random rand = new Random();
+    int res = rand.nextInt(bound);
+    return res;
+  }
+
+  private UserJob getJob(String name, int priority) {
+    UserJob job = new UserJob();
+    job.setId(name);
+    job.setPriority(priority);
+    return job;
+  }
+}
\ No newline at end of file
diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala
new file mode 100644
index 0000000000..e834099400
--- /dev/null
+++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/Test.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.scheduler.queue
+
+import java.util
+import java.util.{PriorityQueue, Queue}
+
+case class PriorityFIFOQueue() {
+  case class QueueItem(item: Queue[String], priority: Int)
+
+  import java.util.Comparator
+
+  val cNode: Comparator[QueueItem] = new Comparator[QueueItem]() {
+    override def compare(o1: QueueItem, o2: QueueItem): Int = o2.priority - o1.priority
+  }
+
+  private val queue = new PriorityQueue[QueueItem](cNode)
+  private var _size = 0
+  private var _count: Long = 0L
+
+  def size: Int = _size
+
+  def isEmpty: Boolean = _size == 0
+
+  def enqueue(item: String, priority: Int): Unit = {
+    val deque = new util.ArrayDeque[String]()
+    deque.add(item)
+    queue.add(QueueItem(deque, priority))
+    // keep the size/insertion counters in step with the underlying queue
+    _size += 1
+    _count += 1
+  }
+
+}
diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala
new file mode 100644
index 0000000000..ba281e4798
--- /dev/null
+++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/util/TestSchedulerUtils.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.scheduler.util
+
+import org.apache.linkis.scheduler.util.SchedulerUtils.{
+  getCreatorFromGroupName,
+  getEngineTypeFromGroupName,
+  getUserFromGroupName,
+  isSupportPriority
+}
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestSchedulerUtils {
+
+  @Test
+  def testIsSupportPriority: Unit = {
+    // Requires linkis.fifo.queue.support.priority.users=hadoop and
+    // linkis.fifo.queue.support.priority.creators=IDE (or ALL_CREATORS) to be set
+    val bool: Boolean = isSupportPriority("IdE_haDoop_hive")
+    Assertions.assertEquals(true, bool)
+  }
+
+  @Test
+  def testGroupNameParsing: Unit = {
+    var groupName = "IDE_hadoop_hive"
+    var username: String = getUserFromGroupName(groupName)
+    var engineType: String = getEngineTypeFromGroupName(groupName)
+    var creator: String = getCreatorFromGroupName(groupName)
+    Assertions.assertEquals("hadoop", username)
+    Assertions.assertEquals("hive", engineType)
+    Assertions.assertEquals("IDE", creator)
+
+    groupName = "APP_TEST_v_hadoop_hive"
+    username = getUserFromGroupName(groupName)
+    engineType = getEngineTypeFromGroupName(groupName)
+    creator = getCreatorFromGroupName(groupName)
+    Assertions.assertEquals("v_hadoop", username)
+    Assertions.assertEquals("hive", engineType)
+    Assertions.assertEquals("APP_TEST", creator)
+
+    groupName = "TEST_v_hadoop_hive"
+    username = getUserFromGroupName(groupName)
+    engineType = getEngineTypeFromGroupName(groupName)
+    creator = getCreatorFromGroupName(groupName)
+    Assertions.assertEquals("v_hadoop", username)
+    Assertions.assertEquals("hive", engineType)
+    Assertions.assertEquals("TEST", creator)
+
+    groupName = "APP_TEST_hadoop_hive"
+    username = getUserFromGroupName(groupName)
+    engineType = getEngineTypeFromGroupName(groupName)
+    creator = getCreatorFromGroupName(groupName)
+    Assertions.assertEquals("hadoop", username)
+    Assertions.assertEquals("hive", engineType)
+    Assertions.assertEquals("APP_TEST", creator)
+  }
+
+}
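The EngineExecutionContext hunk below lets a task's runtime properties override the global send-log limit switch. A minimal sketch of the precedence rule it implements (class and method names hypothetical):

import java.util.Map;

class LogLimitResolver {
  // Per-task runtime value wins; otherwise fall back to the global configured default.
  static boolean limitEnabled(Map<String, Object> properties, String key, boolean globalDefault) {
    Object limitEnableObj = properties.get(key);
    return limitEnableObj == null ? globalDefault : Boolean.parseBoolean(limitEnableObj.toString());
  }
}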
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
index 4a13da3c25..35a1a1ac58 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
@@ -31,6 +31,7 @@ import org.apache.linkis.engineconn.acessible.executor.listener.event.{
 import org.apache.linkis.engineconn.acessible.executor.log.LogHelper
 import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
 import org.apache.linkis.engineconn.computation.executor.cs.CSTableResultSetWriter
+import org.apache.linkis.engineconn.core.EngineConnObject
 import org.apache.linkis.engineconn.executor.ExecutorExecutionContext
 import org.apache.linkis.engineconn.executor.entity.Executor
 import org.apache.linkis.engineconn.executor.listener.{
@@ -191,15 +192,28 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String
     } else {
       var taskLog = log
       val limitLength = ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue
-      if (
-        ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue &&
-        log.length > limitLength
-      ) {
-        taskLog = s"${log.substring(0, limitLength)}..."
-        logger.info("The log is too long and will be intercepted,log limit length : {}", limitLength)
+      val limitEnableObj =
+        properties.get(ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.key)
+      val limitEnable =
+        if (limitEnableObj == null) {
+          ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue
+        } else {
+          limitEnableObj.toString.toBoolean
+        }
+      if (limitEnable) {
+        if (log.length > limitLength) {
+          taskLog = s"${log.substring(0, limitLength)}..."
+          logger.info(
+            "The log is too long and will be truncated, log limit length: {}",
+            limitLength
+          )
+        }
       }
       if (!AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
-        LogHelper.cacheLog(taskLog)
+        val taskLogs = taskLog.split("\n")
+        taskLogs.foreach(line => {
+          LogHelper.cacheLog(line)
+        })
       } else {
         val listenerBus = getEngineSyncListenerBus
         getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog)))
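The EntranceServer change below reads wds.linkis.engine.runtime.priority from the job's runtime params and parses it with getPriority. A hedged sketch of that parsing (decimal strings are truncated; negative or unparseable values fall back to the default of 100; class name hypothetical):

class PriorityParser {
  static final int DEFAULT_PRIORITY = 100;

  static int parsePriority(String value) {
    try {
      int dot = value.indexOf('.');
      int priority = Integer.parseInt(dot >= 0 ? value.substring(0, dot) : value);
      return priority < 0 ? DEFAULT_PRIORITY : priority;
    } catch (NumberFormatException e) {
      return DEFAULT_PRIORITY; // e.g. "abc" or "" -> default
    }
  }
}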
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index eed2929c23..3937c497ad 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -21,10 +21,6 @@ import org.apache.linkis.common.exception.{ErrorException, LinkisException, Link
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.conf.EntranceConfiguration
-import org.apache.linkis.entrance.conf.EntranceConfiguration.{
-  ENABLE_JOB_TIMEOUT_CHECK,
-  ENTRANCE_TASK_TIMEOUT
-}
 import org.apache.linkis.entrance.cs.CSEntranceHelper
 import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
 import org.apache.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException}
@@ -35,7 +31,13 @@ import org.apache.linkis.entrance.utils.JobHistoryHelper
 import org.apache.linkis.governance.common.entity.job.JobRequest
 import org.apache.linkis.governance.common.utils.LoggerUtils
 import org.apache.linkis.protocol.constants.TaskConstant
+import org.apache.linkis.protocol.utils.TaskUtils
 import org.apache.linkis.rpc.Sender
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration.{
+  ENGINE_PRIORITY_RUNTIME_KEY,
+  FIFO_QUEUE_STRATEGY,
+  PFIFO_SCHEDULER_STRATEGY
+}
 import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
 import org.apache.linkis.server.conf.ServerConfiguration
 
@@ -174,6 +176,26 @@ abstract class EntranceServer extends Logging {
       )
     }
 
+    Utils.tryAndWarn {
+      // If the pfifo strategy is in use, set the job priority from the runtime params
+      val configMap = params
+        .getOrDefault(TaskConstant.PARAMS, new util.HashMap[String, AnyRef]())
+        .asInstanceOf[util.Map[String, AnyRef]]
+      val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(configMap)
+      val fifoStrategy: String = FIFO_QUEUE_STRATEGY
+      if (
+        PFIFO_SCHEDULER_STRATEGY.equalsIgnoreCase(
+          fifoStrategy
+        ) && properties != null && !properties.isEmpty
+      ) {
+        val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY)
+        if (priorityValue != null) {
+          val value: Int = getPriority(priorityValue.toString)
+          job.setPriority(value)
+        }
+      }
+    }
+
     getEntranceContext.getOrCreateScheduler().submit(job)
     val msg = LogUtils.generateInfo(
       s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted "
@@ -294,6 +316,23 @@ abstract class EntranceServer extends Logging {
     startTimeOutCheck()
   }
 
+  val DOT = "."
+  val DEFAULT_PRIORITY = 100
+
+  private def getPriority(value: String): Int = {
+    var priority: Int = -1
+    Utils.tryAndWarn({
+      priority =
+        if (value.contains(DOT)) value.substring(0, value.indexOf(DOT)).toInt else value.toInt
+    })
+    if (priority < 0 || priority > Integer.MAX_VALUE - 1) {
+      logger.warn(s"illegal queue priority: ${value}")
+      DEFAULT_PRIORITY
+    } else {
+      priority
+    }
+  }
+
 }
 
 object EntranceServer {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala
index 414c42fdd3..5ed8d88fe0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/LogPathCreateInterceptor.scala
@@ -17,7 +17,6 @@
 
 package org.apache.linkis.entrance.interceptor.impl
 
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.exception.ErrorException
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.entrance.interceptor.EntranceInterceptor
@@ -25,6 +24,8 @@ import org.apache.linkis.entrance.interceptor.exception.LogPathCreateException
 import org.apache.linkis.entrance.parser.ParserUtils
 import org.apache.linkis.governance.common.entity.job.JobRequest
 
+import org.apache.commons.lang3.exception.ExceptionUtils
+
 /**
  * Description:Log path generation interceptor, used to set the path log of the task(日志路径生成拦截器,
  * 用于设置task的路径日志)
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
index 7a3fdf0f7e..be37a77f91 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
@@ -177,7 +177,8 @@ class UJESSQLResultSet(
     }
     val metaTmp = resultSetResult.getMetadata
     if (NULL_VALUE.equals(String.valueOf(metaTmp))) {
-      val fileContentList = resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]]
+      val fileContentList =
+        resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]]
       if (null != fileContentList) {
         resultSetMetaData.setColumnNameProperties(1, "linkis_string")
         resultSetMetaData.setDataTypeProperties(1, "String")
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
index 197e450ea3..903c58ada7 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -104,7 +104,8 @@ public class AMConfiguration { CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file,jdbc"); public static final CommonVars MULTI_USER_ENGINE_USER = CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser()); - public static final String UDF_KILL_ENGINE_TYPE = CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue(); + public static final String UDF_KILL_ENGINE_TYPE = + CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue(); public static final CommonVars ENGINE_LOCKER_MAX_TIME = CommonVars.apply("wds.linkis.manager.am.engine.locker.max.time", 1000 * 60 * 5); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index 223c1b2c23..9041d3f4e4 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -772,16 +772,16 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso } if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) { Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE.split(",")) - .forEach( - engine -> - engineStopService.stopUnlockECByUserCreatorAndECType( - userName, creatorStr, engine)); + .forEach( + engine -> + engineStopService.stopUnlockECByUserCreatorAndECType( + userName, creatorStr, engine)); } else { - engineStopService.stopUnlockECByUserCreatorAndECType( - userName, creatorStr, engineType); + engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType); } return Message.ok("Kill engineConn succeed"); } + static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException { String applicationName = jsonNode.get("applicationName").asText(); String instance = jsonNode.get("instance").asText(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java index 3195f1f2a8..048dc96e97 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java @@ -77,7 +77,8 @@ public NodeResource requestResourceInfo( String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); if (queueName.startsWith(queuePrefix)) { - logger.info("Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix); + logger.info( + "Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix); queueName = queueName.substring(queuePrefix.length()); } String realQueueName = queuePrefix + queueName; diff --git 
a/linkis-dist/package/admin/linkis_udf_get_python_methods.py b/linkis-dist/package/admin/linkis_udf_get_python_methods.py new file mode 100644 index 0000000000..d3f8268141 --- /dev/null +++ b/linkis-dist/package/admin/linkis_udf_get_python_methods.py @@ -0,0 +1,18 @@ +import sys +import ast +import json + +def extract_method_names(file_path): + with open(file_path, 'r') as file: + code = file.read() + tree = ast.parse(code) + method_names = set() + + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + method_names.add(node.name) + + return json.dumps(list(method_names), indent=4) + +file_path = sys.argv[1] +print(extract_method_names(file_path)) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala index ba6be619b6..8498cd6892 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala @@ -46,6 +46,12 @@ object HiveEngineConfiguration { val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue + val HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS = CommonVars( + "wds.linkis.hive.engineConn.java.extraOpts", + "", + "Specify the option parameter of the java process (please modify it carefully!!!)" + ) + val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index e8eccd35a7..d50e4096f9 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -274,7 +274,9 @@ class HiveEngineConnExecutor( if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) - engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue") + engineExecutorContext.appendStdout( + s"Your task will be submitted to the $queueName queue" + ) } if (thread.isInterrupted) { logger.error( diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala index 3aa760f824..b2dfb5d9a8 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/launch/HiveProcessEngineConnLaunchBuilder.scala @@ -17,9 +17,12 @@ package org.apache.linkis.engineplugin.hive.launch +import org.apache.linkis.engineplugin.hive.conf.HiveEngineConfiguration.HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder +import org.apache.commons.lang3.StringUtils + import java.util import com.google.common.collect.Lists @@ -34,4 +37,13 @@ class HiveProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuil Lists.newArrayList("JarUDFLoadECMHook") } + override protected def getExtractJavaOpts: String = { + val hiveExtraOpts: String = HIVE_ENGINE_CONN_JAVA_EXTRA_OPTS.getValue + if (StringUtils.isNotBlank(hiveExtraOpts)) { + super.getExtractJavaOpts + " " + hiveExtraOpts + } else { + super.getExtractJavaOpts + } + } + } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index ef02f182f2..08dca4214b 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -173,7 +173,7 @@ public void jobHistoryFinishedScan() { // 新增失败任务分析扫描 try { JobHistoryAnalyzeRule jobHistoryAnalyzeRule = - new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); + new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); scanner.addScanRule(jobHistoryAnalyzeRule); } catch (Exception e) { logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage()); @@ -252,21 +252,21 @@ public void jdbcUnfinishedAlertScan() { @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}") public void jdbcUnfinishedKillScan() { long id = - Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) - .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); long intervalMs = 7200 * 1000; long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; long endTime = System.currentTimeMillis(); long startTime = endTime - intervalMs; AnomalyScanner scanner = new DefaultScanner(); List fetchers = - JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); if (fetchers.isEmpty()) { logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); return; } StarrocksTimeKillRule starrocksTimeKillRule = - new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); + new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); scanner.addScanRule(starrocksTimeKillRule); JobMonitorUtils.run(scanner, fetchers, true); } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala index 517e854dec..2b17adb39b 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala @@ -28,7 +28,6 @@ import java.util import scala.collection.JavaConverters._ - class JobHistoryAnalyzeAlertSender() extends Observer with Logging { override def update(e: Event, 
jobHistroyList: scala.Any): Unit = {} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala index 67cbadd2e8..18553f228e 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala @@ -25,7 +25,6 @@ class StarrocksTimeKillAlertSender extends Observer with Logging { /** * Observer Pattern */ - override def update(e: Event, jobHistroyList: scala.Any): Unit = { - } + override def update(e: Event, jobHistroyList: scala.Any): Unit = {} } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala index dfcd934908..e5df6f3ff7 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala @@ -17,8 +17,6 @@ package org.apache.linkis.monitor.jobhistory.jobtime -import org.apache.commons.collections.MapUtils -import org.apache.commons.lang3.StringUtils import org.apache.linkis.common.utils.Logging import org.apache.linkis.monitor.constants.Constants import org.apache.linkis.monitor.core.ob.Observer @@ -27,8 +25,12 @@ import org.apache.linkis.monitor.jobhistory.entity.JobHistory import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} import org.apache.linkis.server.BDPJettyServerHelper +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils + import java.util import java.util.Locale + import scala.collection.JavaConverters._ class StarrocksTimeKillRule(hitObserver: Observer) diff --git a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala index 92feb8a561..b0581967f1 100644 --- a/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala +++ b/linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala @@ -40,7 +40,6 @@ import java.net.InetAddress import scala.beans.BeanProperty import scala.collection.JavaConverters._ -import scala.collection.mutable import com.google.gson.reflect.TypeToken @@ -48,7 +47,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging @BeanProperty var ioClient: IOClient = _ - private val properties: mutable.HashMap[String, String] = mutable.HashMap[String, String]() + private var properties: java.util.Map[String, String] = new java.util.HashMap[String, String]() private var inited = false @@ -69,7 +68,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging label.setJobGroupId(IOClientUtils.generateJobGrupID()) } - def getProxyUser: String = StorageConfiguration.PROXY_USER.getValue(properties.asJava) + def getProxyUser: 
String = StorageConfiguration.PROXY_USER.getValue(properties) def getCreatorUser: String = StorageUtils.getJvmUser @@ -103,7 +102,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging } def initFS(methodName: String = "init"): Unit = { - if (!properties.contains(StorageConfiguration.PROXY_USER.key)) { + if (!properties.containsKey(StorageConfiguration.PROXY_USER.key)) { throw new StorageErrorException(NO_PROXY_USER.getErrorCode, NO_PROXY_USER.getErrorDesc) } bindEngineLabel.setIsJobGroupHead("true") @@ -117,7 +116,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging getProxyUser, getLocalIP, methodName, - Array(properties.toMap) + Array(properties) ), bindEngineLabel ) @@ -172,7 +171,7 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging case "init" => case "storageName" => return fsType case "setUser" => - properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; + properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]); return Unit case _ => if (inited) { @@ -185,23 +184,23 @@ class IOMethodInterceptor(fsType: String) extends MethodInterceptor with Logging method.getName match { case "init" => val user = - if (properties.contains(StorageConfiguration.PROXY_USER.key)) { - StorageConfiguration.PROXY_USER.getValue(properties.toMap) + if (properties.containsKey(StorageConfiguration.PROXY_USER.key)) { + StorageConfiguration.PROXY_USER.getValue(properties) } else { null } if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) { - properties ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala + properties = args(0).asInstanceOf[java.util.Map[String, String]] } if (StringUtils.isNoneBlank(user)) { - properties += StorageConfiguration.PROXY_USER.key -> user + properties.put(StorageConfiguration.PROXY_USER.key, user) } initFS() logger.warn(s"For user($user)inited a $fsType storage($id) .") Unit case "fsName" => fsType case "setUser" => - properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit + properties.put(StorageConfiguration.PROXY_USER.key, args(0).asInstanceOf[String]); Unit case "read" => if (!inited) throw new IllegalAccessException("storage has not been inited.") new IOInputStream(args) diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala index e89193418a..33e57a7c22 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala @@ -30,9 +30,12 @@ import org.apache.linkis.governance.common.utils.GovernanceConstant import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, ResponseNodeStatus} import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf import 
org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, EngineConnTaskInfo} +import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq} +import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor import org.apache.linkis.orchestrator.listener.task.{ TaskErrorResponseEvent, TaskLogEvent, @@ -41,6 +44,8 @@ import org.apache.linkis.orchestrator.listener.task.{ import org.apache.linkis.rpc.Sender import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper} +import org.apache.commons.lang3.StringUtils + import java.util import java.util.concurrent.TimeUnit @@ -53,6 +58,8 @@ object EngineConnMonitor extends Logging { private val ENGINECONN_LASTUPDATE_TIMEOUT = ComputationOrchestratorConf.ENGINECONN_LASTUPDATE_TIMEOUT.getValue.toLong + private val engineTypeKey = "engineType" + private[linkis] def addEngineExecutorStatusMonitor( engineConnExecutorCache: util.Map[EngineConnTaskInfo, CodeExecTaskExecutor] ): Unit = { @@ -203,7 +210,20 @@ object EngineConnMonitor extends Logging { val execTask = executor.getExecTask Utils.tryAndError { val labels: Array[Label[_]] = executor.getEngineConnExecutor.getLabels() - val engineType: String = LabelUtil.getEngineTypeLabel(labels.toList.asJava).getEngineType + var engineType = "" + val mark: Mark = executor.getMark + if (mark != null) { + val req: MarkReq = mark.getMarkReq + if (req != null) { + val engineTypeRef: AnyRef = req.labels.get(engineTypeKey) + if (engineTypeRef != null && engineTypeRef.toString.contains("-")) { + engineType = engineTypeRef.toString.split("-")(0) + } + } + } + if (StringUtils.isEmpty(engineType)) { + engineType = getEngineType(labels) + } logger.warn( s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly." 
           )
@@ -225,6 +245,15 @@ object EngineConnMonitor extends Logging {
     }
   }

+  private def getEngineType(labels: Array[Label[_]]): String = {
+    val labelArray: Array[Label[_]] = labels.filter(_.getLabelKey.equals(engineTypeKey))
+    var engineType = ""
+    if (labelArray != null && labelArray.size > 0) {
+      engineType = labelArray(0).asInstanceOf[EngineTypeLabel].getEngineType
+    }
+    engineType
+  }
+
   private def updateExecutorActivityTime(
       serviceInstance: ServiceInstance,
       engineConnExecutorCache: mutable.HashMap[ServiceInstance, ArrayBuffer[CodeExecTaskExecutor]]
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java
index 700a8aab3a..88a4d93b6e 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java
@@ -103,11 +103,8 @@ public List<Map<String, Object>> getTablesByDbNameAndOptionalUserName(
       queryParam.withRoles(roles);
       List<Map<String, Object>> hiveTables =
           hiveMetaDao.getTablesByDbNameAndUserAndRolesFromDbPrvs(queryParam);
-      hiveTables.addAll(
-          hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam));
-      return hiveTables.stream()
-          .distinct()
-          .collect(Collectors.toList());
+      hiveTables.addAll(hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam));
+      return hiveTables.stream().distinct().collect(Collectors.toList());
     } else {
       log.info("user {} to getTablesByDbName no permission control", queryParam.getUserName());
       return hiveMetaDao.getTablesByDbName(queryParam);
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
index 4fa9f49175..25afc1c44a 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/MdqServiceImpl.java
@@ -376,7 +376,7 @@ public String getTableLocation(MetadataQueryParam queryParam) {

   private int getTableFileNum(String tableLocation) throws IOException {
     int tableFileNum = 0;
-    if (StringUtils.isNotBlank(tableLocation)) {
+    if (StringUtils.isNotBlank(tableLocation) && getRootHdfs().exists(new Path(tableLocation))) {
       FileStatus tableFile = getFileStatus(tableLocation);
       tableFileNum = (int) getRootHdfs().getContentSummary(tableFile.getPath()).getFileCount();
     }
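The MdqServiceImpl hunk above avoids a FileNotFoundException when a table's HDFS location has already been removed: the file count is only computed after an explicit exists() check. A minimal standalone sketch of the same guard, assuming a plain Hadoop FileSystem handle instead of Linkis's getRootHdfs() helper (class and method names here are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class TableFileCountSketch {
      // Returns 0 instead of throwing when the location is blank or missing on HDFS.
      static long countTableFiles(FileSystem fs, String tableLocation) throws IOException {
        if (tableLocation == null || tableLocation.trim().isEmpty()) {
          return 0L;
        }
        Path location = new Path(tableLocation);
        if (!fs.exists(location)) { // the guard this hunk adds
          return 0L;
        }
        return fs.getContentSummary(location).getFileCount();
      }
    }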
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
index af87acb575..f40d028726 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
@@ -774,13 +774,14 @@ public Message getUserKeyValue(
     parms.put("nonce", SHAUtils.DOCTOR_NONCE);
     // token provided by doctor
     String token = SHAUtils.DOCTOR_TOKEN.getValue();
-    if (StringUtils.isNotBlank(token)){
+    if (StringUtils.isNotBlank(token)) {
       String signature =
+          SHAUtils.Encrypt(
           SHAUtils.Encrypt(
-              SHAUtils.Encrypt(
-                  parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(), null)
-                  + token,
-              null);
+                  parms.get("app_id") + SHAUtils.DOCTOR_NONCE + System.currentTimeMillis(),
+                  null)
+              + token,
+          null);
       parms.put("signature", signature);
       return Message.ok().data("doctor", parms);
     } else {
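The hunk above is only an indentation change, but the nested calls are easy to misread, so here is the same computation written out flat. A minimal sketch assuming only that SHAUtils.Encrypt(s, null) hashes its first argument; the wrapper class and variable names are illustrative:

    import java.util.function.UnaryOperator;

    class DoctorSignatureSketch {
      // 'sha' stands in for s -> SHAUtils.Encrypt(s, null).
      static String sign(String appId, String nonce, long ts, String token, UnaryOperator<String> sha) {
        String inner = sha.apply(appId + nonce + ts); // inner hash: app_id + nonce + timestamp
        return sha.apply(inner + token);              // outer hash: inner digest + doctor token
      }
    }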
diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java
index 50916755a0..81a1b7755a 100644
--- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java
+++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/api/UDFRestfulApi.java
@@ -23,6 +23,7 @@
 import org.apache.linkis.server.utils.ModuleUserUtils;
 import org.apache.linkis.storage.FSFactory;
 import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.utils.StorageUtils$;
 import org.apache.linkis.udf.conf.Constants;
 import org.apache.linkis.udf.entity.PythonModuleInfo;
 import org.apache.linkis.udf.entity.UDFInfo;
@@ -1437,4 +1438,36 @@ public Message pythonUpload(
         .data("dependencies", dependencies)
         .data("fileName", fileName);
   }
+
+  @ApiImplicitParam(
+      name = "path",
+      dataType = "String",
+      value = "path",
+      example = "file:///test-dir/test-sub-dir/test1012_01.py")
+  @RequestMapping(path = "/get-register-functions", method = RequestMethod.GET)
+  public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path") String path) {
+    if (StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_PY)
+        || StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) {
+      if (StringUtils.startsWithIgnoreCase(path, StorageUtils$.MODULE$.FILE_SCHEMA())) {
+        try {
+          FsPath fsPath = new FsPath(path);
+          // obtain a file system instance for the path
+          FileSystem fileSystem = (FileSystem) FSFactory.getFs(fsPath);
+          fileSystem.init(null);
+          if (fileSystem.canRead(fsPath)) {
+            return Message.ok()
+                .data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path));
+          } else {
+            return Message.error("You do not have permission to access this file");
+          }
+        } catch (Exception e) {
+          return Message.error("Failed to parse the file, error: " + e);
+        }
+      } else {
+        return Message.error("Only local files are supported");
+      }
+    } else {
+      return Message.error("Only .py and .scala files are supported");
+    }
+  }
 }
diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java
index ad2692333d..14f93370fa 100644
--- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java
+++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/conf/Constants.java
@@ -22,6 +22,7 @@ public class Constants {

   public static final String FILE_EXTENSION_PY = ".py";
   public static final String FILE_EXTENSION_ZIP = ".zip";
+  public static final String FILE_EXTENSION_SCALA = ".scala";
   public static final String FILE_EXTENSION_TAR_GZ = ".tar.gz";
   public static final String FILE_PERMISSION = "770";
   public static final String DELIMITER_COMMA = ",";
diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java
index f72ff96dfe..f7401fc1f4 100644
--- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java
+++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java
@@ -19,7 +19,10 @@

 import org.apache.linkis.common.conf.Configuration;
 import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.common.utils.JsonUtils;
 import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.utils.StorageUtils$;
 import org.apache.linkis.udf.conf.Constants;
 import org.apache.linkis.udf.exception.UdfException;

@@ -43,6 +46,7 @@
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;

+import com.fasterxml.jackson.core.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -280,4 +284,56 @@ public static List<String> extractDependencies(String content, String name) {
     }
     return modules;
   }
+
+  public static List<String> getRegisterFunctions(FileSystem fileSystem, FsPath fsPath, String path)
+      throws Exception {
+    try (InputStream is = fileSystem.read(fsPath)) {
+      // read the InputStream into a string
+      String content = IOUtils.toString(is, StandardCharsets.UTF_8);
+      if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_PY)) {
+        // parse the Python file
+        return extractPythonMethodNames(path);
+      } else if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_SCALA)) {
+        // parse the Scala code
+        return extractScalaMethodNames(content);
+      } else {
+        throw new UdfException(80041, "Unsupported file type: " + path);
+      }
+    } catch (IOException e) {
+      throw new UdfException(80042, "Failed to read file: " + path, e);
+    }
+  }
+
+  public static List<String> extractScalaMethodNames(String scalaCode) {
+    List<String> methodNames = new ArrayList<>();
+    // regex matching method definitions, including access modifiers
+    String regex = "(\\b(private|protected)\\b\\s+)?\\bdef\\s+(\\w+)\\b";
+    Pattern pattern = Pattern.compile(regex);
+    Matcher matcher = pattern.matcher(scalaCode);
+    logger.info("use regex to get scala method names.. reg:{}", regex);
reg:{}", regex); + while (matcher.find()) { + String methodName = matcher.group(3); + methodNames.add(methodName); + } + + return methodNames; + } + + public static List extractPythonMethodNames(String udfPath) throws Exception { + String localPath = udfPath.replace(StorageUtils$.MODULE$.FILE_SCHEMA(), ""); + String exec = + Utils.exec( + (new String[] { + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath + })); + logger.info( + "execute python script to get python method name...{} {} {}", + Constants.PYTHON_COMMAND.getValue(), + Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py", + localPath); + // 将exec转换为List,exec为一个json数组 + return JsonUtils.jackson().readValue(exec, new TypeReference>() {}); + } } diff --git a/linkis-web/package.json b/linkis-web/package.json index 12f4ca8b33..30481a2c1a 100644 --- a/linkis-web/package.json +++ b/linkis-web/package.json @@ -1,6 +1,6 @@ { "name": "linkis", - "version": "1.4.0", + "version": "1.10.0", "private": true, "scripts": { "serve": "vue-cli-service serve", @@ -25,7 +25,7 @@ }, "dependencies": { "@form-create/iview": "2.5.27", - "axios": "0.28.1", + "axios": "1.7.4", "dexie": "3.2.3", "dt-sql-parser": "3.0.5", "highlight.js": "10.7.0", @@ -49,7 +49,9 @@ "vue-router": "3.4.8", "vuedraggable": "2.24.3", "vuescroll": "4.16.1", - "worker-loader": "3.0.8" + "worker-loader": "3.0.8", + "xterm": "5.3.0", + "xterm-addon-fit": "0.8.0" }, "devDependencies": { "@intlify/vue-i18n-loader": "1.0.0", diff --git a/linkis-web/src/apps/PythonModule/.npmrc b/linkis-web/src/apps/PythonModule/.npmrc deleted file mode 100644 index dd4617f0fa..0000000000 --- a/linkis-web/src/apps/PythonModule/.npmrc +++ /dev/null @@ -1,2 +0,0 @@ -registry=http://wnpm.weoa.com:8001 -shamefully-hoist=true \ No newline at end of file diff --git a/linkis-web/src/apps/URM/i18n/common/en.json b/linkis-web/src/apps/URM/i18n/common/en.json index f6f8319cc8..5b4c887a00 100644 --- a/linkis-web/src/apps/URM/i18n/common/en.json +++ b/linkis-web/src/apps/URM/i18n/common/en.json @@ -60,6 +60,7 @@ "no": "No", "sprak": "sprak", "jarPackage": "jar Package", + "registerFunc": "Register Function", "scriptPath": "Script Path", "registerFunction": "Register Function", "registerFormat": "Register Format", diff --git a/linkis-web/src/apps/URM/i18n/common/zh.json b/linkis-web/src/apps/URM/i18n/common/zh.json index 942c66a7e2..2c6896869a 100644 --- a/linkis-web/src/apps/URM/i18n/common/zh.json +++ b/linkis-web/src/apps/URM/i18n/common/zh.json @@ -60,6 +60,7 @@ "no": "否", "sprak": "sprak", "jarPackage": "jar包", + "registerFunc": "注册函数", "scriptPath": "脚本路径", "registerFunction": "注册的函数", "registerFormat": "注册格式", diff --git a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue index 8f12aee69e..1dd8b22966 100644 --- a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue +++ b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue @@ -96,6 +96,38 @@ fs-type="share" @set-node="setNodePath"/> + + + + + + @@ -288,6 +322,8 @@ export default { }, }, title: '', + registerFunctions: [], + registerFunctionEditable: true, show: false, btnLabel: this.$t('message.common.ok'), modalHeight: '', @@ -552,7 +588,11 @@ export default { }; } }, - + handleFuncChange(val) { + if(this.model !== 1) { + this.setting.name = val; + } + }, init() { let { name, shared, description, path, udfName, directory, udfType, 
diff --git a/linkis-web/package.json b/linkis-web/package.json
index 12f4ca8b33..30481a2c1a 100644
--- a/linkis-web/package.json
+++ b/linkis-web/package.json
@@ -1,6 +1,6 @@
 {
   "name": "linkis",
-  "version": "1.4.0",
+  "version": "1.10.0",
   "private": true,
   "scripts": {
     "serve": "vue-cli-service serve",
@@ -25,7 +25,7 @@
   },
   "dependencies": {
     "@form-create/iview": "2.5.27",
-    "axios": "0.28.1",
+    "axios": "1.7.4",
     "dexie": "3.2.3",
     "dt-sql-parser": "3.0.5",
     "highlight.js": "10.7.0",
@@ -49,7 +49,9 @@
     "vue-router": "3.4.8",
     "vuedraggable": "2.24.3",
     "vuescroll": "4.16.1",
-    "worker-loader": "3.0.8"
+    "worker-loader": "3.0.8",
+    "xterm": "5.3.0",
+    "xterm-addon-fit": "0.8.0"
   },
   "devDependencies": {
     "@intlify/vue-i18n-loader": "1.0.0",
diff --git a/linkis-web/src/apps/PythonModule/.npmrc b/linkis-web/src/apps/PythonModule/.npmrc
deleted file mode 100644
index dd4617f0fa..0000000000
--- a/linkis-web/src/apps/PythonModule/.npmrc
+++ /dev/null
@@ -1,2 +0,0 @@
-registry=http://wnpm.weoa.com:8001
-shamefully-hoist=true
\ No newline at end of file
diff --git a/linkis-web/src/apps/URM/i18n/common/en.json b/linkis-web/src/apps/URM/i18n/common/en.json
index f6f8319cc8..5b4c887a00 100644
--- a/linkis-web/src/apps/URM/i18n/common/en.json
+++ b/linkis-web/src/apps/URM/i18n/common/en.json
@@ -60,6 +60,7 @@
     "no": "No",
     "sprak": "sprak",
     "jarPackage": "jar Package",
+    "registerFunc": "Register Function",
     "scriptPath": "Script Path",
     "registerFunction": "Register Function",
     "registerFormat": "Register Format",
diff --git a/linkis-web/src/apps/URM/i18n/common/zh.json b/linkis-web/src/apps/URM/i18n/common/zh.json
index 942c66a7e2..2c6896869a 100644
--- a/linkis-web/src/apps/URM/i18n/common/zh.json
+++ b/linkis-web/src/apps/URM/i18n/common/zh.json
@@ -60,6 +60,7 @@
     "no": "否",
     "sprak": "sprak",
     "jarPackage": "jar包",
+    "registerFunc": "注册函数",
     "scriptPath": "脚本路径",
     "registerFunction": "注册的函数",
     "registerFormat": "注册格式",
diff --git a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue
index 8f12aee69e..1dd8b22966 100644
--- a/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue
+++ b/linkis-web/src/apps/URM/module/udfManagement/addFunctionModal.vue
@@ -96,6 +96,38 @@
           fs-type="share"
           @set-node="setNodePath"/>
+
+
+
+
+
+
@@ -288,6 +322,8 @@ export default {
         },
       },
       title: '',
+      registerFunctions: [],
+      registerFunctionEditable: true,
      show: false,
      btnLabel: this.$t('message.common.ok'),
      modalHeight: '',
@@ -552,7 +588,11 @@ export default {
         };
       }
     },
-
+    handleFuncChange(val) {
+      if(this.model !== 1) {
+        this.setting.name = val;
+      }
+    },
     init() {
       let { name, shared, description, path, udfName, directory, udfType, registerFormat, load, useFormat } = this.node;
       let fnType = 'Spark'
@@ -575,9 +615,26 @@ export default {
         fnType,
         defaultLoad: !!load
       });
-      this.$nextTick(() => {
+      this.$nextTick(async () => {
         this.$set(this.setting, this.getTypes(), path);
+        this.registerFunctionEditable = false;
+        if(this.getTypes() !== 'jarPath') {
+          if (/^[\w\u4e00-\u9fa5:.\\/]*(py|scala)$/.test(path)) {
+            try {
+              this.registerFunctions = await this.getRegisterFunction(path);
+              if(this.registerFunctions.length !== 0) {
+                this.registerFunctionEditable = false;
+              }
+            } catch (err) {
+              window.console.error(err);
+              this.registerFunctions = [];
+              this.registerFunctionEditable = true;
+            }
+          }
+
+        }
       });
+
     },

     close() {
@@ -594,27 +651,31 @@ export default {
       } else if (this.node.udfType === 1) {
         this.setting.pyPara = conver(',', ')', 'indexOf', 'lastIndexOf');
       } else {
-        const type = rf.slice(rf.indexOf('[') + 1, rf.indexOf(']'));
+        const typeStart = rf.indexOf('[') + 1;
+        const typeEnd = rf.lastIndexOf(']');
+        const type = rf.slice(typeStart, typeEnd);
         window.console.log(type, rf, '=====');
-        // if there is more than one comma, slice by the register format only; otherwise multiple types may be filled into the input by mistake
-        if (type.indexOf(',') !== type.lastIndexOf(',')) {
-          // there are 2 case:
-          // 1. tuple, return params in ();
-          // 2. multi params, the first params is return params
-          if (type.indexOf('(') !== -1) {
-            // tuple
-            this.setting.scalaTypeL = type.slice(type.indexOf('('), type.indexOf(')') + 1)
-            this.setting.scalaTypeR = type.slice(type.indexOf(')')+2)
-          } else {
-            // multi params
-            this.setting.scalaTypeL = type.split(',')[0];
-            this.setting.scalaTypeR = type.split(',').slice(1).toString();
+        const findFirstValidComma = (str) => {
+          let brackets = 0;
+          for (let i = 0; i < str.length; i++) {
+            const char = str[i];
+            // count every kind of bracket
+            if (char === '[' || char === '(') brackets++;
+            if (char === ']' || char === ')') brackets--;
+            // only match commas at the outermost level
+            if (char === ',' && brackets === 0) return i;
           }
+          return -1;
+        };
-          this.showScalaRF = this.node.registerFormat;
+        const commaPos = findFirstValidComma(type);
+
+        if (commaPos !== -1) {
+          this.setting.scalaTypeL = type.slice(0, commaPos).trim();
+          this.setting.scalaTypeR = type.slice(commaPos + 1).trim();
         } else {
-          this.setting.scalaTypeL = conver('[', ',', 'indexOf', 'indexOf');
-          this.setting.scalaTypeR = conver(',', ']', 'indexOf', 'indexOf');
+          this.setting.scalaTypeL = type.trim();
+          this.setting.scalaTypeR = '';
         }
         this.setting.scalapara = conver(',', ')', 'lastIndexOf', 'lastIndexOf');
       }
@@ -635,7 +696,6 @@ export default {
         this.$set(this.setting, `${type}R`, right);
       }
     },
-
     handleShareChange() {
       if (!this.isShareLoading) {
         this.isShareLoading = true;
@@ -662,10 +722,10 @@ export default {
         udfType: this.fnType,
         isLeaf: true,
         directory,
-        clusterName
+        clusterName,
       };
       if (this.model) {
-        postData = Object.assign(postData, { shared: false });
+        postData = Object.assign(postData, { shared: false, defaultLoad });
         this.$emit('update', postData);
       } else {
         postData = Object.assign(postData, { defaultLoad });
@@ -692,12 +752,36 @@ export default {
     resize(h) {
       this.modalHeight = h - 380 + 'px';
     },
+    // fetch register functions for the chosen script
+    async getRegisterFunction(path) {
+      const res = await api.fetch('/udf/get-register-functions', { path }, 'get');
+      return res.functions
+    },

-    setNodePath(node) {
+    async setNodePath(node) {
       ['jarPath', 'sparkPath', 'customPath'].forEach(item => {
         const temp = item === this.getTypes() ? node.path : '';
         this.$set(this.setting, item, temp);
       })
+      if(this.getTypes() !== 'jarPath') {
+
+
+        if (/^[\w\u4e00-\u9fa5:.\\/]*(py|scala)$/.test(node.path)) {
+          try {
+            this.registerFunctions = await this.getRegisterFunction(node.path);
+            if(this.registerFunctions.length !== 0) {
+              this.registerFunctionEditable = false;
+            }
+          } catch (err) {
+            window.console.error(err);
+            this.registerFunctions = [];
+            this.registerFunctionEditable = true;
+          }
+        }
+
+
+      }
+
     },

     getTypes() {
diff --git a/linkis-web/src/apps/URM/module/udfManagement/index.vue b/linkis-web/src/apps/URM/module/udfManagement/index.vue
index 870b6ddf2c..f6d738fee2 100644
--- a/linkis-web/src/apps/URM/module/udfManagement/index.vue
+++ b/linkis-web/src/apps/URM/module/udfManagement/index.vue
@@ -360,7 +360,7 @@ export default {
       }
     },
     // add a new function
-    addFunction(data) {
+    async addFunction(data) {
      if (this.loading) return
      this.loading = true;
      const params = {
@@ -377,19 +377,24 @@ export default {
        clusterName: data.clusterName,
        directory: data.directory
      }
-      api
+      await api
        .fetch('/udf/add', {udfAddVo: params}, 'post')
        .then(() => {
          this.showAddModal(false)
          this.search()
          this.isLoading = false
          this.loading = false
+          if (data.defaultLoad) {
+            this.confirmKillIdle(data)
+          }
        })
        .catch(() => {
          // this.list = [{}]
          this.loading = false
          this.isLoading = false
+
        })
+
     },
     // update an existing function
     updateFunction(data) {
@@ -417,12 +422,32 @@ export default {
          this.loading = false
          this.search()
          this.$Message.success(this.$t('message.linkis.udf.success'));
+
+          if (data.defaultLoad) {
+            this.confirmKillIdle(data)
+          }
        })
        .catch(() => {
          this.isLoading = false
          this.loading = false
        })
     },
+    confirmKillIdle(data) {
+      this.$Modal.confirm({
+        title: this.$t('message.linkis.setting.killEngineTitle'),
+        content: this.$t('message.linkis.setting.killEngine'),
+        onOk: async () => {
+          try {
+            api.fetch("/linkisManager/rm/killEngineByCreatorEngineType", {
+              creator: '*',
+              engineType: data.udfType === 1 ? 'spark' : '*'
+            })
+          } catch (err) {
+            window.console.warn(err)
+          }
+        }
+      })
+    },
     checkChange(v) {
       this.list = this.list.map(it => {
         it.checked = !it.disabled && v
diff --git a/linkis-web/src/apps/linkis/i18n/common/en.json b/linkis-web/src/apps/linkis/i18n/common/en.json
index 0f036dfeda..d65c595d6e 100644
--- a/linkis-web/src/apps/linkis/i18n/common/en.json
+++ b/linkis-web/src/apps/linkis/i18n/common/en.json
@@ -1,6 +1,7 @@
 {
   "message": {
     "linkis": {
+      "diagnosticLog": "Diagnostic Log",
       "refresh": "Refresh",
       "tableSetting": "Table Setting",
       "downloadLog": "Download",
diff --git a/linkis-web/src/apps/linkis/i18n/common/zh.json b/linkis-web/src/apps/linkis/i18n/common/zh.json
index 1207ff676c..d8949eead2 100644
--- a/linkis-web/src/apps/linkis/i18n/common/zh.json
+++ b/linkis-web/src/apps/linkis/i18n/common/zh.json
@@ -1,6 +1,7 @@
 {
   "message": {
     "linkis": {
+      "diagnosticLog": "诊断日志",
       "refresh": "刷新",
       "findNewVer": "检测到新版本",
       "whetherUpdateNow": "是否立即更新?",
diff --git a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue
index 56725cbf15..2f65033523 100644
--- a/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue
+++ b/linkis-web/src/apps/linkis/module/globalHistoryManagement/viewHistory.vue
@@ -1,3 +1,4 @@
+
+
+
+
\ No newline at end of file