From 151b679c19182842bbaa65d8b9e53651d43e86fc Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:24:20 +0800 Subject: [PATCH 1/7] Dev 1.10.0 monitor starrocks update (#667) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * starrocks monitor update * Code optimization * Code optimization --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../monitor/scheduled/JobHistoryMonitor.java | 58 +++++++-- .../linkis/monitor/until/JobMonitorUtils.java | 8 +- .../mapper/common/JobHistoryMapper.xml | 2 +- .../jobhistory/JobHistoryDataFetcher.scala | 94 ++++++------- .../jobtime/JobTimeExceedRule.scala | 16 +-- ...a => StarrocksTimeExceedAlertSender.scala} | 2 +- .../jobtime/StarrocksTimeExceedRule.scala | 28 +--- .../StarrocksTimeKillAlertSender.scala | 31 +++++ .../jobtime/StarrocksTimeKillHitEvent.scala | 22 ++++ .../jobtime/StarrocksTimeKillRule.scala | 123 ++++++++++++++++++ 10 files changed, 279 insertions(+), 105 deletions(-) rename linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/{StarrocksTimeExceedAlterSender.scala => StarrocksTimeExceedAlertSender.scala} (98%) create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index 1a0deb7b48..ef02f182f2 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -29,10 +29,7 @@ import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender; import org.apache.linkis.monitor.jobhistory.index.JobIndexRule; import org.apache.linkis.monitor.jobhistory.index.JobIndexSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule; -import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedAlterSender; -import org.apache.linkis.monitor.jobhistory.jobtime.StarrocksTimeExceedRule; +import org.apache.linkis.monitor.jobhistory.jobtime.*; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule; import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule; @@ -102,7 +99,7 @@ public void jobHistoryFinishedScan() { logger.info("Get JobHistoryId from cache ID:" + id); } List fetchers = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "updated_time"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "finished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; } @@ -188,7 +185,7 @@ public void jobHistoryFinishedScan() { JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender()); 
scannerIndex.addScanRule(jobIndexRule); List createFetcher = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "department"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, ""); JobMonitorUtils.run(scannerIndex, createFetcher, true); } @@ -205,7 +202,7 @@ public void jobHistoryUnfinishedScan() { AnomalyScanner scanner = new DefaultScanner(); boolean shouldStart = false; List fetchers = - JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "created_time"); + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "unfinished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; } @@ -225,9 +222,52 @@ public void jobHistoryUnfinishedScan() { jobTimeAlerts.keySet(), new JobTimeExceedAlertSender(jobTimeAlerts)); scanner.addScanRule(jobTimeExceedRule); } + JobMonitorUtils.run(scanner, fetchers, shouldStart); + } + + /** * Scans every 10 minutes, covering tasks from the last two hours. Alert requirement: alert parameters are configured in the admin console. */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedAlertScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedAlertScan generated 0 dataFetchers, please check input"); + return; + } StarrocksTimeExceedRule starrocksTimeExceedRule = - new StarrocksTimeExceedRule(new StarrocksTimeExceedAlterSender()); + new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender()); scanner.addScanRule(starrocksTimeExceedRule); - JobMonitorUtils.run(scanner, fetchers, shouldStart); + JobMonitorUtils.run(scanner, fetchers, true); + } + + /** * Scans every 10 minutes, covering tasks from the last two hours, and triggers a kill when the criteria are met. Kill requirement: kill parameters are configured on the data source. */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedKillScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedKillScan generated 0 dataFetchers, please check input"); + return; + } + StarrocksTimeKillRule starrocksTimeKillRule = + new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); + scanner.addScanRule(starrocksTimeKillRule); + JobMonitorUtils.run(scanner, fetchers, true); } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java index 3366014c89..66dd4a3b68 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java +++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java @@ -44,14 +44,14 @@ public static void run(AnomalyScanner scanner, List fetchers, Boole } public static List generateFetchers( - long startTime, long endTime, long maxIntervalMs, long id, String timeType) { + long startTime, long endTime, long maxIntervalMs, long id, String jobStatus) { List ret = new ArrayList<>(); long pe = endTime; long ps; while (pe > startTime) { ps = Math.max(pe - maxIntervalMs, startTime); String[] fetcherArgs = - new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), timeType}; + new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), jobStatus}; ret.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( "Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: " + new Date(pe)); @@ -61,11 +61,11 @@ public static List generateFetchers( } public static List generateFetchersfortime( - long startTime, long endTime, long id, String timeType) { + long startTime, long endTime, long id, String jobStatus) { List fetchers = new ArrayList<>(); String[] fetcherArgs = new String[] { - String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), timeType + String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), jobStatus }; fetchers.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( diff --git a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml index f6807b8857..c711af0f5d 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml +++ b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml @@ -127,7 +127,7 @@ FROM linkis_ps_job_history_group_history job JOIN linkis_org_user org ON job.submit_user = org.user_name - job.id > #{id} + job.id >= #{id} and job.submit_user = #{umUser} and job.engine_type = #{engineType} and job.created_time >= #{startDate} AND job.created_time #{endDate} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala index 4f43d86d40..4f553847f6 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala @@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: JobHistoryMapper) "Wrong input for JobHistoryDataFetcher. 
DataType: " + args.getClass.getCanonicalName ) } - if (args != null && args.length == 2) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - mapper - .search(null, null, null, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else if (args != null && args.length == 4) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - if ( - StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3) - .asInstanceOf[String] - .equals("updated_time") - ) { - val list = new util.ArrayList[String]() - Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) - mapper - .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else { - var list = new util.ArrayList[String]() - Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) - if (args(3).asInstanceOf[String].equals("department")) { - list = null; - } - mapper - .searchByCache(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] + if (args != null) { + val start = args(0).asInstanceOf[String].toLong + val end = args(1).asInstanceOf[String].toLong + // 根据参数数量进行不同的处理 + args.length match { + // 参数数量为2,则数据库查询仅筛选开始和结束时间 + case 2 => + mapper + .search(null, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 参数数量为4,根据第四个参数进行不同的查询 + case 4 => + val id = args(2).asInstanceOf[String].toLong + val parm = args(3).asInstanceOf[String] + parm match { + // 筛选任务包含id,时间,已完成状态任务 + case "finished_job" => + val list = new util.ArrayList[String]() + Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间,未完成状态任务 + case "unfinished_job" => + var list = new util.ArrayList[String]() + Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCache(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间 + case _ => + mapper + .searchByCache(id, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + } + case _ => + throw new AnomalyScannerException( + 21304, + "Wrong input for JobHistoryDataFetcher. 
Data: " + args + ) } } else { throw new AnomalyScannerException( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala index 9e633496a7..0367bfc05e 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala @@ -73,7 +73,6 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer) val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]() for (sd <- data.asScala) { if (sd != null && sd.getData() != null) { - var idLong = 0L for (d <- sd.getData().asScala) { if (d.isInstanceOf[JobHistory]) { val jobHistory = d.asInstanceOf[JobHistory] @@ -84,24 +83,11 @@ class JobTimeExceedRule(thresholds: util.Set[String], hitObserver: Observer) alertData.add(d.asInstanceOf[JobHistory]) } } - if (idLong == 0L || jobHistory.getId < idLong) { - idLong = jobHistory.getId - } + scanRuleList.put("jobhistoryScan", jobHistory.getId) } else { logger.warn("Ignored wrong input data Type : " + d + ", " + d.getClass.getCanonicalName) } } - if (idLong > 0L) { - val id = Optional - .ofNullable(CacheUtils.cacheBuilder.getIfPresent("jobhistoryScan")) - .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue) - if (id == 0) { - scanRuleList.put("jobhistoryScan", idLong) - } - if (id > idLong) { - scanRuleList.put("jobhistoryScan", idLong) - } - } } else { logger.warn("Ignored null scanned data") } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala similarity index 98% rename from linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala rename to linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala index 492d1f0a23..4e6e707335 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlterSender.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala @@ -32,7 +32,7 @@ import java.util import scala.collection.JavaConverters.asScalaBufferConverter -class StarrocksTimeExceedAlterSender extends Observer with Logging { +class StarrocksTimeExceedAlertSender extends Observer with Logging { /** * Observer Pattern diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala index 8521af6b57..b616c5c02c 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala @@ -37,6 +37,8 @@ class StarrocksTimeExceedRule(hitObserver: Observer) extends AbstractScanRule(event = 
new StarrocksTimeExceedHitEvent, observer = hitObserver) with Logging { + private val scanRuleList = CacheUtils.cacheBuilder + /** * if data match the pattern, return true and trigger observer should call isMatched() * @@ -52,6 +54,7 @@ class StarrocksTimeExceedRule(hitObserver: Observer) val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]() for (scannedData <- data.asScala) { if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; for (jobHistory <- scannedData.getData().asScala) { jobHistory match { case job: JobHistory => @@ -80,27 +83,10 @@ class StarrocksTimeExceedRule(hitObserver: Observer) alertData.add(job) } } - // 获取超时kill配置信息 - if (StringUtils.isNotBlank(job.getParams)) { - val connectParamsMap = MapUtils.getMap( - datasourceConfMap, - "connectParams", - new util.HashMap[AnyRef, AnyRef] - ) - val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "") - logger.info("starock killTime: {}", killTime) - if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) { - if (StringUtils.isNotBlank(killTime)) { - val timeoutInSeconds = timeValue.toDouble - val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong - if (elapse > timeoutInMillis) { - // 触发kill任务 - HttpsUntils.killJob(job) - } - } - } - } -// } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID) } case _ => logger.warn( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala new file mode 100644 index 0000000000..67cbadd2e8 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.linkis.monitor.jobhistory.jobtime + + import org.apache.linkis.common.utils.Logging + import org.apache.linkis.monitor.core.ob.{Event, Observer} + + class StarrocksTimeKillAlertSender extends Observer with Logging { + + /** + * Observer Pattern + */ + override def update(e: Event, jobHistoryList: scala.Any): Unit = { + } + + } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala new file mode 100644 index 0000000000..7cdabf33a8 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillHitEvent.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.linkis.monitor.jobhistory.jobtime + + import org.apache.linkis.monitor.core.ob.SingleObserverEvent + + class StarrocksTimeKillHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala new file mode 100644 index 0000000000..dfcd934908 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.linkis.monitor.jobhistory.jobtime + + import org.apache.commons.collections.MapUtils + import org.apache.commons.lang3.StringUtils + import org.apache.linkis.common.utils.Logging + import org.apache.linkis.monitor.constants.Constants + import org.apache.linkis.monitor.core.ob.Observer + import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} + import org.apache.linkis.monitor.jobhistory.entity.JobHistory + import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} + import org.apache.linkis.server.BDPJettyServerHelper + + import java.util + import java.util.Locale + import scala.collection.JavaConverters._ + + class StarrocksTimeKillRule(hitObserver: Observer) + extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = hitObserver) + with Logging { + + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + + if (!getHitEvent().isRegistered || data == null) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + for (scannedData <- data.asScala) { + if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; + for (jobHistory <- scannedData.getData().asScala) { + jobHistory match { + case job: JobHistory => + val status = job.getStatus.toUpperCase(Locale.getDefault) + val engineType = job.getEngineType.toUpperCase(Locale.getDefault) + if ( + Constants.UNFINISHED_JOB_STATUS + .contains(status) && engineType.equals( + Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault) + ) + ) { + // Compute how long the task has been running + val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime + // Fetch the timeout-kill configuration from the data source + if (StringUtils.isNotBlank(job.getParams)) { + val connectParamsMap = MapUtils.getMap( + getDatasourceConf(job), + "connectParams", + new util.HashMap[AnyRef, AnyRef] + ) + val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "") + logger.info("starrocks killTime: {}", killTime) + if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) { + if (StringUtils.isNotBlank(killTime)) { + val timeoutInSeconds = killTime.toDouble + val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong + if (elapse > timeoutInMillis) { + // Trigger the task kill + HttpsUntils.killJob(job) + } + } + } + } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedKillScan", taskMinID) + } + case _ => + logger.warn( + "Ignored wrong input data Type : " + jobHistory + ", " + jobHistory.getClass.getCanonicalName + ) + } + } + } else { + logger.warn("Ignored null scanned data") + } + } + true + } + + private def getDatasourceConf(job: JobHistory): util.Map[_, _] = { + // Extract the datasource name from the task parameters + val paramMap = + BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]]) + val configurationMap = + MapUtils.getMap(paramMap, "configuration", new util.HashMap[String, String]()) + val runtimeMap = + MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]()) + val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "") + // Fetch the datasource configuration + if (StringUtils.isNotBlank(datasourceName)) { + HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName) + } else { + new util.HashMap[String, String]() + } + } + +} From 4b0c2dbf47226b7e410e5619f4802a7eeac12c8f Mon Sep 17 
00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:28:12 +0800 Subject: [PATCH 2/7] Dev 1.10.0 hive engine add yarn log (#669) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * hive engine add real yarn queue print * Optimization of Hive Log Printing --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../executor/conf/ComputationExecutorConf.scala | 2 +- .../executor/execute/ComputationExecutor.scala | 7 +++++-- .../hive/conf/HiveEngineConfiguration.scala | 6 ++++++ .../hive/creation/HiveEngineConnFactory.scala | 10 +++------- .../executor/HiveEngineConcurrentConnExecutor.scala | 4 ++++ .../hive/executor/HiveEngineConnExecutor.scala | 2 ++ 6 files changed, 21 insertions(+), 10 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index 575f0165fa..41dc4b9783 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -35,7 +35,7 @@ object ComputationExecutorConf { val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars( "linkis.engineconn.print.task.params.skip.keys", - "jobId", + "jobId,wds.linkis.rm.yarnqueue", "skip to print params key at job logs" ) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index 5a06ca007f..d084c0dd82 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -414,10 +414,13 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = { val sb = new StringBuilder - EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) => // skip log jobId because it corresponding jobid when the ec created - if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) { + if ( + !ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue + .split(",") + .exists(_.equals(key)) + ) { sb.append(s"${key}=${value}\n") } }) diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala index 4de8f02f52..ba6be619b6 100644 --- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala @@ -46,4 +46,10 @@ object HiveEngineConfiguration { val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue + val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" + + val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" + + val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" + } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala index a9b217074d..43d845e6f3 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala @@ -59,10 +59,6 @@ import scala.collection.JavaConverters._ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory with Logging { - private val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename" - private val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue" - private val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name" - override protected def newExecutor( id: Int, engineCreationContext: EngineCreationContext, @@ -188,10 +184,10 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w } .foreach { case (k, v) => logger.info(s"key is $k, value is $v") - if (BDP_QUEUE_NAME.equals(k)) { - hiveConf.set(HIVE_QUEUE_NAME, v) + if (HiveEngineConfiguration.BDP_QUEUE_NAME.equals(k)) { + hiveConf.set(HiveEngineConfiguration.HIVE_QUEUE_NAME, v) if ("tez".equals(HiveEngineConfiguration.HIVE_ENGINE_TYPE)) { - hiveConf.set(HIVE_TEZ_QUEUE_NAME, v) + hiveConf.set(HiveEngineConfiguration.HIVE_TEZ_QUEUE_NAME, v) } } else hiveConf.set(k, v) } diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17a..b5abb36530 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -241,6 +241,10 @@ class HiveEngineConcurrentConnExecutor( engineExecutorContext.appendStdout( s"Your hive taskId: $taskId has $numberOfJobs MR jobs to do" ) + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout( + s"Your task will be submitted to the $queueName queue" + ) } logger.info(s"there are ${numberOfJobs} jobs.") diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 5499cb3d62..e8eccd35a7 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -273,6 +273,8 @@ class HiveEngineConnExecutor( } if (numberOfMRJobs > 0) { engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do") + val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME) + engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue") } if (thread.isInterrupted) { logger.error( From 46a1d1ddf3fd2e8b87eec4aeee5abdbd032fc167 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:28:53 +0800 Subject: [PATCH 3/7] Dev 1.10.0 sdk support shell return (#670) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add a method to obtain the result set, getResultSet is used to obtain the result set * jdbc driver support shell return * jdbc driver support shell return * Code optimization --------- Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../linkis/ujes/jdbc/UJESSQLResultSet.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 37c379a7ca..cbfa94b232 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -175,13 +175,23 @@ class UJESSQLResultSet( if (null == resultSetResult) { return } - metaData = resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]] - if (null != metaData) { - for (cursor <- 1 to metaData.size()) { - val col = metaData.get(cursor - 1) - resultSetMetaData.setColumnNameProperties(cursor, col.get("columnName")) - resultSetMetaData.setDataTypeProperties(cursor, col.get("dataType")) - resultSetMetaData.setCommentPropreties(cursor, col.get("comment")) + val metaTmp = resultSetResult.getMetadata + if ("NULL".equals(String.valueOf(metaTmp))) { + val fileContentList = resultSetResult.getFileContent.asInstanceOf[util.List[util.List[String]]] + if (null != fileContentList) { + resultSetMetaData.setColumnNameProperties(1, "linkis_string") + resultSetMetaData.setDataTypeProperties(1, "String") + resultSetMetaData.setCommentPropreties(1, "NULL") + } + } else { + metaData = metaTmp.asInstanceOf[util.List[util.Map[String, String]]] + if (null != metaData) { + for (cursor <- 1 to metaData.size()) { + val col = metaData.get(cursor - 1) + resultSetMetaData.setColumnNameProperties(cursor, col.get("columnName")) + resultSetMetaData.setDataTypeProperties(cursor, col.get("dataType")) + resultSetMetaData.setCommentPropreties(cursor, col.get("comment")) + } } } } @@ -194,12 +204,6 @@ class UJESSQLResultSet( resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]] } - def getResultSet(): util.ArrayList[util.ArrayList[String]] = { - resultSetResultInit() - resultSetInit() - resultSetRow - } - private def init(): Unit = { resultSetResultInit() metaDataInit() From af0b5e416de78044b980ec962dd66b226f893c34 Mon Sep 17 00:00:00 2001 From: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:29:37 +0800 Subject: [PATCH 4/7] Slow query repair (#672) 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: “v_kkhuang” <“420895376@qq.com”> --- .../org/apache/linkis/metadata/hive/dao/HiveMetaDao.java | 6 +++++- .../service/impl/HiveMetaWithPermissionServiceImpl.java | 9 ++++++++- .../src/main/resources/mapper/common/HiveMetaDao.xml | 6 ++++-- .../apache/linkis/metadata/hive/dao/HiveMetaDaoTest.java | 3 ++- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dao/HiveMetaDao.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dao/HiveMetaDao.java index f5404f0e67..f81456085a 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dao/HiveMetaDao.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dao/HiveMetaDao.java @@ -55,7 +55,11 @@ List getDbsByUserAndRoles( */ List getAllDbs(); - List> getTablesByDbNameAndUserAndRoles(MetadataQueryParam queryParam); + List> getTablesByDbNameAndUserAndRolesFromDbPrvs( + MetadataQueryParam queryParam); + + List> getTablesByDbNameAndUserAndRolesFromTblPrvs( + MetadataQueryParam queryParam); List> getTablesByDbName(MetadataQueryParam queryParam); diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java index c130693f60..700a8aab3a 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/service/impl/HiveMetaWithPermissionServiceImpl.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; @@ -100,7 +101,13 @@ public List> getTablesByDbNameAndOptionalUserName( if (flag) { List roles = hiveMetaDao.getRolesByUser(queryParam.getUserName()); queryParam.withRoles(roles); - return hiveMetaDao.getTablesByDbNameAndUserAndRoles(queryParam); + List> hiveTables = + hiveMetaDao.getTablesByDbNameAndUserAndRolesFromDbPrvs(queryParam); + hiveTables.addAll( + hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam)); + return hiveTables.stream() + .distinct() + .collect(Collectors.toList()); } else { log.info("user {} to getTablesByDbName no permission control", queryParam.getUserName()); return hiveMetaDao.getTablesByDbName(queryParam); diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/resources/mapper/common/HiveMetaDao.xml b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/resources/mapper/common/HiveMetaDao.xml index 80dd7b6e45..e4e0ed0fef 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/resources/mapper/common/HiveMetaDao.xml +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/resources/mapper/common/HiveMetaDao.xml @@ -98,7 +98,7 @@ GROUP BY NAME order by NAME - select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, 
t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER from DB_PRIVS t1 inner join TBLS t2 on t1.DB_ID = t2.DB_ID and t1.DB_PRIV in ('SELECT','ALL') @@ -110,7 +110,9 @@ #{id} ) - union + order by NAME; + +
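Note on PATCH 4/7: the mapper change splits the former single UNION query into two statements (getTablesByDbNameAndUserAndRolesFromDbPrvs and getTablesByDbNameAndUserAndRolesFromTblPrvs), so HiveMetaWithPermissionServiceImpl now merges the two result lists and deduplicates them in Java rather than in SQL. Below is a minimal sketch of that merge pattern; the class and method names (TablePrivsMergeSketch, mergeTables) are hypothetical stand-ins and not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class TablePrivsMergeSketch {

  // Combines tables visible via DB-level privileges with tables visible via
  // table-level privileges, then drops rows that appear in both lists.
  // Stream.distinct() relies on Map.equals(), so two rows collapse only when
  // every column (NAME, TYPE, CREATE_TIME, ...) matches, which mirrors the
  // deduplication the removed SQL UNION performed.
  static List<Map<String, Object>> mergeTables(
      List<Map<String, Object>> fromDbPrvs, List<Map<String, Object>> fromTblPrvs) {
    List<Map<String, Object>> merged = new ArrayList<>(fromDbPrvs);
    merged.addAll(fromTblPrvs);
    return merged.stream().distinct().collect(Collectors.toList());
  }
}

One consequence of this split: each statement can carry its own ORDER BY, and the merged list keeps encounter order (DB-level results first) rather than a single global sort.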