diff --git a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterAlertQuotaController.java b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterAlertQuotaController.java index 37709a26..6243e043 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterAlertQuotaController.java +++ b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterAlertQuotaController.java @@ -1,15 +1,7 @@ package com.datasophon.api.controller; -import java.io.IOException; import java.util.*; -import com.datasophon.api.service.FrameServiceService; -import com.datasophon.common.model.AlertItem; -import com.datasophon.common.model.Generators; -import com.datasophon.common.utils.FreemakerUtils; -import com.datasophon.dao.entity.FrameServiceEntity; -import freemarker.template.TemplateException; -import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import com.datasophon.common.utils.Result; @@ -29,44 +21,6 @@ public class ClusterAlertQuotaController { @Autowired private ClusterAlertQuotaService clusterAlertQuotaService; - @Autowired - private FrameServiceService service; - - - /** - * 列表 - */ - @RequestMapping("/generateAlertYml") - public Result list(Integer clusterId) throws IOException, TemplateException { - List list = clusterAlertQuotaService.list(); - List serviceList = service.list(); - for (FrameServiceEntity serviceEntity : serviceList) { - Generators generators = new Generators(); - generators.setFilename(serviceEntity.getServiceName().toLowerCase()+".yml"); - generators.setConfigFormat("prometheus"); - generators.setOutputDirectory("D:\\360downloads\\test\\"); - ArrayList alertItems = new ArrayList<>(); - for (ClusterAlertQuota clusterAlertQuota : list) { - if(clusterAlertQuota.getServiceCategory().equals(serviceEntity.getServiceName()) && StringUtils.isNotBlank(clusterAlertQuota.getServiceRoleName())){ - AlertItem alertItem = new AlertItem(); - alertItem.setAlertName(clusterAlertQuota.getAlertQuotaName()); - alertItem.setAlertExpr(clusterAlertQuota.getAlertExpr()+" "+ clusterAlertQuota.getCompareMethod()+" "+clusterAlertQuota.getAlertThreshold()); - alertItem.setClusterId(clusterId); - alertItem.setServiceRoleName(clusterAlertQuota.getServiceRoleName()); - alertItem.setAlertLevel(clusterAlertQuota.getAlertLevel().getDesc()); - alertItem.setAlertAdvice(clusterAlertQuota.getAlertAdvice()); - alertItem.setTriggerDuration(clusterAlertQuota.getTriggerDuration()); - alertItems.add(alertItem); - } - } - if(alertItems.size() > 0){ - FreemakerUtils.generatePromAlertFile(generators,alertItems,serviceEntity.getServiceName()); - } - } - - return Result.success(); - } - /** * 信息 diff --git a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java index 2afd7038..e010f489 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java +++ b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java @@ -95,10 +95,8 @@ public Result update(@RequestBody ClusterServiceInstanceEntity clusterServiceIns * 删除 */ @RequestMapping("/delete") - public Result delete(@RequestBody Integer[] ids){ - clusterServiceInstanceService.removeByIds(Arrays.asList(ids)); - - return Result.success(); + public Result delete(Integer serviceInstanceIds){ + return clusterServiceInstanceService.delServiceInstance(serviceInstanceIds); } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ServiceExecuteResultActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/ServiceExecuteResultActor.java index 7ded8823..df584b41 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ServiceExecuteResultActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ServiceExecuteResultActor.java @@ -7,13 +7,11 @@ import com.datasophon.common.command.SubmitActiveTaskNodeCommand; import com.datasophon.common.enums.ServiceExecuteState; import com.datasophon.common.enums.ServiceRoleType; -import com.datasophon.common.model.DAGGraph; -import com.datasophon.common.model.ServiceExecuteResultMessage; -import com.datasophon.common.model.ServiceNode; -import com.datasophon.common.model.ServiceRoleInfo; +import com.datasophon.common.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,23 +35,24 @@ public void onReceive(Object message) throws Throwable { ServiceNode servicNode = dag.getNode(node); if (result.getServiceRoleType().equals(ServiceRoleType.MASTER)) { if (result.getServiceExecuteState().equals(ServiceExecuteState.ERROR)) { - //该节点master角色操作失败,移动到error列表 + //move to error list errorTaskList.put(node, ""); activeTaskList.remove(node); readyToSubmitTaskList.remove(node); completeTaskList.put(node, ""); - //更改指令执行状态,依赖该节点的下游服务指令状态改为取消 - logger.info("{} master roles failed , cancel all next node by hostCommandId {}",node,servicNode.getMasterRoles().get(0).getHostCommandId()); - String hostCommandId = servicNode.getMasterRoles().get(0).getHostCommandId(); - ProcessUtils.updateCommandStateToFailed( hostCommandId); + //cancel all next node + logger.info("{} master roles failed , cancel all next node by commandId {}", node, servicNode.getCommandId()); + List commandIds = new ArrayList(); + listCancelCommand(dag,node,commandIds); + ProcessUtils.updateCommandStateToFailed(commandIds); } else if (result.getServiceExecuteState().equals(ServiceExecuteState.SUCCESS)) { - //该节点master角色指令执行完毕,开始执行worker节点操作 + //submit worker node ServiceNode serviceNode = dag.getNode(node); List elseRoles = serviceNode.getElseRoles(); if (elseRoles.size() > 0) { logger.info("start to submit worker/client roles"); for (ServiceRoleInfo elseRole : serviceNode.getElseRoles()) { - ActorRef serviceActor = ActorUtils.getLocalActor(WorkerServiceActor.class, result.getClusterCode() + "-serviceActor-" + node+"-"+elseRole.getHostname()); + ActorRef serviceActor = ActorUtils.getLocalActor(WorkerServiceActor.class, result.getClusterCode() + "-serviceActor-" + node + "-" + elseRole.getHostname()); ProcessUtils.buildExecuteServiceRoleCommand( result.getClusterId(), result.getCommandType(), @@ -75,7 +74,7 @@ public void onReceive(Object message) throws Throwable { readyToSubmitTaskList.remove(node); } logger.info("start to submit next node"); - tellToSubmitActiveTaskNode(result, dag, activeTaskList, errorTaskList, readyToSubmitTaskList, completeTaskList, submitTaskNodeActor,node); + tellToSubmitActiveTaskNode(result, dag, activeTaskList, errorTaskList, readyToSubmitTaskList, completeTaskList, submitTaskNodeActor, node); } } } else { @@ -83,6 +82,17 @@ public void onReceive(Object message) throws Throwable { } } + public void listCancelCommand(DAGGraph dag, String node, List commandIds) { + if (dag.getSubsequentNodes(node).size() == 0) { + return; + } + Set subsequentNodes = dag.getSubsequentNodes(node); + for (String subsequentNode : subsequentNodes) { + commandIds.add(dag.getNode(subsequentNode).getCommandId()); + listCancelCommand(dag, subsequentNode, commandIds); + } + } + private void tellToSubmitActiveTaskNode(ServiceExecuteResultMessage result, DAGGraph dag, Map activeTaskList, @@ -92,7 +102,7 @@ private void tellToSubmitActiveTaskNode(ServiceExecuteResultMessage result, ActorRef submitTaskNodeActor, String node) { Set subsequentNodes = dag.getSubsequentNodes(node); - logger.info("{}'s subsequent nodes is {}", node , subsequentNodes.toString()); + logger.info("{}'s subsequent nodes is {}", node, subsequentNodes.toString()); for (String subsequentNode : subsequentNodes) { readyToSubmitTaskList.put(subsequentNode, ""); } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/SubmitTaskNodeActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/SubmitTaskNodeActor.java index e63afc58..621630c7 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/SubmitTaskNodeActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/SubmitTaskNodeActor.java @@ -44,6 +44,9 @@ public void onReceive(Object message) throws Throwable { for (String previousNode : previousNodes) { if (errorTaskList.containsKey(previousNode)) { readyToSubmitTaskList.remove(node); + } + if(!completeTaskList.containsKey(previousNode)){ + readyToSubmitTaskList.remove(node); continue; } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java index e8c98b71..dffef4c3 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java @@ -25,5 +25,7 @@ public interface ClusterServiceInstanceService extends IService getObsoleteService(Integer id); List getStoppedRoleInstanceOnHost(Integer clusterId, String hostname, ServiceRoleState state); + + void reomveRoleInstance(Integer serviceInstanceId); } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java index 38f464b1..36373cfd 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java @@ -24,9 +24,11 @@ import com.datasophon.dao.mapper.ClusterServiceInstanceMapper; +import org.springframework.transaction.annotation.Transactional; @Service("clusterServiceInstanceService") +@Transactional public class ClusterServiceInstanceServiceImpl extends ServiceImpl implements ClusterServiceInstanceService { @Autowired @@ -50,9 +52,6 @@ public class ClusterServiceInstanceServiceImpl extends ServiceImpl list = roleInstanceService.getRunningServiceRoleInstanceListByServiceId(serviceInstanceId); + if(list.size() > 0){ + return true; + } + return false; + } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java index 981eafd0..524de08a 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java @@ -254,4 +254,11 @@ public List getStoppedRoleInstanceOnHost(Integ .eq(Constants.HOSTNAME, hostname) .eq(Constants.SERVICE_ROLE_STATE, state)); } + + @Override + public void reomveRoleInstance(Integer serviceInstanceId) { + this.remove(new QueryWrapper() + .eq(Constants.SERVICE_ID,serviceInstanceId) + .eq(Constants.SERVICE_ROLE_STATE,ServiceRoleState.STOP)); + } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java index 0249d73a..62441969 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java @@ -151,38 +151,37 @@ public static void saveHostInstallInfo(StartWorkerMessage message, String cluste clusterHostService.save(clusterHostEntity); } - public static void updateCommandStateToFailed(String hostCommandId) { - logger.info("hostCommandId is {}", hostCommandId); - //worker以及下游节点全部取消 - ClusterServiceCommandHostCommandService service = SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class); - ClusterServiceCommandHostCommandEntity hostCommand = service.getByHostCommandId(hostCommandId); - logger.info("hostCommandName is {}", hostCommand.getCommandName()); - ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor"); - List hostCommandList = service.getHostCommandListByCommandId(hostCommand.getCommandId()); - for (ClusterServiceCommandHostCommandEntity hostCommandEntity : hostCommandList) { - if (hostCommandEntity.getCommandState() == CommandState.RUNNING && hostCommandEntity.getHostCommandId() != hostCommandId) { - logger.info("{} host command set to failed", hostCommandEntity.getCommandName()); - hostCommandEntity.setCommandState(CommandState.FAILED); - hostCommandEntity.setCommandProgress(100); - service.updateByHostCommandId(hostCommandEntity); - UpdateCommandHostMessage message = new UpdateCommandHostMessage(); - message.setCommandId(hostCommand.getCommandId()); - message.setCommandHostId(hostCommandEntity.getCommandHostId()); - message.setHostname(hostCommandEntity.getHostname()); - if (hostCommand.getServiceRoleType() == RoleType.MASTER) { - message.setServiceRoleType(ServiceRoleType.MASTER); - } else { - message.setServiceRoleType(ServiceRoleType.WORKER); + public static void updateCommandStateToFailed(List commandIds) { + for (String commandId : commandIds) { + logger.info("command id is {}", commandId); + //cancel worker and sub node + ClusterServiceCommandHostCommandService service = SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class); + ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor"); + List hostCommandList = service.getHostCommandListByCommandId(commandId); + for (ClusterServiceCommandHostCommandEntity hostCommandEntity : hostCommandList) { + if (hostCommandEntity.getCommandState() == CommandState.RUNNING) { + logger.info("{} host command set to failed", hostCommandEntity.getCommandName()); + hostCommandEntity.setCommandState(CommandState.FAILED); + hostCommandEntity.setCommandProgress(100); + service.updateByHostCommandId(hostCommandEntity); + UpdateCommandHostMessage message = new UpdateCommandHostMessage(); + message.setCommandId(commandId); + message.setCommandHostId(hostCommandEntity.getCommandHostId()); + message.setHostname(hostCommandEntity.getHostname()); + if (hostCommandEntity.getServiceRoleType() == RoleType.MASTER) { + message.setServiceRoleType(ServiceRoleType.MASTER); + } else { + message.setServiceRoleType(ServiceRoleType.WORKER); + } + ActorUtils.actorSystem.scheduler().scheduleOnce( + FiniteDuration.apply(3L, TimeUnit.SECONDS), + commandActor, + message, + ActorUtils.actorSystem.dispatcher(), + ActorRef.noSender()); } - ActorUtils.actorSystem.scheduler().scheduleOnce( - FiniteDuration.apply(3L, TimeUnit.SECONDS), - commandActor, - message, - ActorUtils.actorSystem.dispatcher(), - ActorRef.noSender()); } } - } public static void tellCommandActorResult(String serviceName, ExecuteServiceRoleCommand executeServiceRoleCommand, ServiceExecuteState state) { diff --git a/datasophon-common/src/main/java/com/datasophon/common/utils/FreemakerUtils.java b/datasophon-common/src/main/java/com/datasophon/common/utils/FreemakerUtils.java deleted file mode 100644 index 74de6255..00000000 --- a/datasophon-common/src/main/java/com/datasophon/common/utils/FreemakerUtils.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.datasophon.common.utils; - -import com.datasophon.common.Constants; -import com.datasophon.common.model.AlertItem; -import com.datasophon.common.model.Generators; -import com.datasophon.common.model.ServiceConfig; -import com.sun.org.apache.bcel.internal.Const; -import freemarker.template.Configuration; -import freemarker.template.Template; -import freemarker.template.TemplateException; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - - -public class FreemakerUtils { - - public static void generateConfigFile(Generators generators, List configs, String decompressPackageName) throws IOException, TemplateException { - // 1.加载模板 - // 创建核心配置对象 - Configuration config = new Configuration(Configuration.getVersion()); - // 设置加载的目录 - config.setClassForTemplateLoading(FreemakerUtils.class, "/templates"); // ""代表当前包 - - Map data = new HashMap<>(); - // 得到模板对象 - String configFormat = generators.getConfigFormat(); - Template template = null; - if ("xml".equals(configFormat)) { - template = config.getTemplate("xml.ftl"); - } - if ("properties".equals(configFormat)) { - template = config.getTemplate("properties.ftl"); - } - if ("prometheus".equals(configFormat)) { - template = config.getTemplate("alert.yml"); - } - if ("custom".equals(configFormat)) { - template = config.getTemplate(generators.getTemplateName()); - data = configs.stream().collect(Collectors.toMap(key -> key.getName(), value -> value.getValue())); - } - - data.put("itemList", configs); - // 3.产生输出 - processOut(generators, template, data, decompressPackageName); - } - - public static void generatePromAlertFile(Generators generators, List configs, String serviceName) throws IOException, TemplateException { - // 创建核心配置对象 - Configuration config = new Configuration(Configuration.getVersion()); - // 设置加载的目录 - config.setClassForTemplateLoading(FreemakerUtils.class, "/templates"); // ""代表当前包 - // 得到模板对象 - String configFormat = generators.getConfigFormat(); - Template template = null; - - if ("prometheus".equals(configFormat)) { - template = config.getTemplate("alert.yml"); - } - - Map data = new HashMap<>(); - data.put("itemList", configs); - data.put("serviceName", serviceName); - // 3.产生输出 - processOut(generators, template, data, serviceName); - } - - - public static void generatePromScrapeConfig(Generators generators, List configs, String serviceName) throws IOException, TemplateException { - // 创建核心配置对象 - Configuration config = new Configuration(Configuration.getVersion()); - // 设置加载的目录 - config.setClassForTemplateLoading(FreemakerUtils.class, "/templates"); // ""代表当前包 - // 得到模板对象 - Template template = config.getTemplate("scrape.ftl"); - - Map data = new HashMap<>(); - data.put("itemList", configs); - // 3.产生输出 - processOut(generators, template, data, serviceName); - } - - private static void processOut(Generators generators, Template template, Map data, String decompressPackageName) throws IOException, TemplateException { -// String packagePath = Constants.INSTALL_PATH + Constants.SLASH + decompressPackageName + Constants.SLASH; - if(generators.getOutputDirectory().contains(Constants.COMMA)){ - for (String outPutDir : generators.getOutputDirectory().split(",")) { - FileWriter out = new FileWriter(new File( outPutDir + generators.getFilename())); - template.process(data, out); - out.close(); - } - }else{ - FileWriter out = new FileWriter(new File( generators.getOutputDirectory() + generators.getFilename())); - template.process(data, out); - out.close(); - } - - - } -}