Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-engine-doris] update doris operator instance status by scheduler #666

Merged
merged 4 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
Expand Down Expand Up @@ -66,16 +67,16 @@ public class WsDorisOperatorInstance extends BaseDO {
@TableField("deployed")
private YesOrNo deployed;

@TableField("fe_status")
@TableField(value = "fe_status", updateStrategy = FieldStrategy.IGNORED)
private String feStatus;

@TableField("be_status")
@TableField(value = "be_status", updateStrategy = FieldStrategy.IGNORED)
private String beStatus;

@TableField("cn_status")
@TableField(value = "cn_status", updateStrategy = FieldStrategy.IGNORED)
private String cnStatus;

@TableField("broker_status")
@TableField(value = "broker_status", updateStrategy = FieldStrategy.IGNORED)
private String brokerStatus;

@TableField("remark")
Expand Down
5 changes: 5 additions & 0 deletions scaleph-engine/scaleph-engine-doris/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>scaleph-project</artifactId>
</dependency>

<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>scaleph-workflow-quartz</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.doris.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.DorisOperatorService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

@Slf4j
@Service
public class DorisOperatorInstanceStatusSyncJob extends AbstractWorkFlow {

@Autowired
private WsDorisOperatorInstanceService wsDorisOperatorInstanceService;
@Autowired
private DorisOperatorService dorisOperatorService;

public DorisOperatorInstanceStatusSyncJob() {
super("DORIS_OPERATOR_INSTANCE_STATUS_SYNC_JOB");
}

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
}

private void process() {
List<Long> ids = wsDorisOperatorInstanceService.listAll();
ids.forEach(this::doProcess);
log.debug("update doris operator instance status success! update size: {}", ids.size());
}

private void doProcess(Long id) {
try {
WsDorisOperatorInstanceDTO instanceDTO = wsDorisOperatorInstanceService.selectOne(id);
Optional<GenericKubernetesResource> optional = dorisOperatorService.get(instanceDTO);
if (optional.isPresent()) {
String json = JacksonUtil.toJsonString(optional.get().get("status"));
DorisClusterStatus status = JacksonUtil.parseJsonString(json, DorisClusterStatus.class);
wsDorisOperatorInstanceService.updateStatus(id, status);
} else {
wsDorisOperatorInstanceService.clearStatus(id);
}
} catch (Exception e) {
log.error("update doris operator instance status error! id: {}", id, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.engine.doris.service;

import cn.sliew.scaleph.engine.doris.operator.DorisCluster;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.dto.WsDorisOperatorInstanceDTO;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceAddParam;
import cn.sliew.scaleph.engine.doris.service.param.WsDorisOperatorInstanceListParam;
Expand All @@ -31,6 +32,8 @@ public interface WsDorisOperatorInstanceService {

Page<WsDorisOperatorInstanceDTO> list(WsDorisOperatorInstanceListParam param);

List<Long> listAll();

WsDorisOperatorInstanceDTO selectOne(Long id);

WsDorisOperatorInstanceDTO fromTemplate(Long templateId);
Expand All @@ -50,4 +53,8 @@ public interface WsDorisOperatorInstanceService {
void apply(Long id);

void shutdown(Long id);

int updateStatus(Long id, DorisClusterStatus status);

int clearStatus(Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cn.sliew.scaleph.dao.entity.master.ws.WsDorisOperatorInstance;
import cn.sliew.scaleph.dao.mapper.master.ws.WsDorisOperatorInstanceMapper;
import cn.sliew.scaleph.engine.doris.operator.DorisCluster;
import cn.sliew.scaleph.engine.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.engine.doris.service.DorisOperatorService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorInstanceService;
import cn.sliew.scaleph.engine.doris.service.WsDorisOperatorTemplateService;
Expand All @@ -45,6 +46,7 @@
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.stream.Collectors;

import static cn.sliew.milky.common.check.Ensures.checkState;

Expand Down Expand Up @@ -73,6 +75,15 @@ public Page<WsDorisOperatorInstanceDTO> list(WsDorisOperatorInstanceListParam pa
return result;
}

@Override
public List<Long> listAll() {
LambdaQueryWrapper<WsDorisOperatorInstance> queryWrapper = Wrappers.lambdaQuery(WsDorisOperatorInstance.class)
.eq(WsDorisOperatorInstance::getDeployed, YesOrNo.YES)
.select(WsDorisOperatorInstance::getId);
List<WsDorisOperatorInstance> wsDorisOperatorInstances = wsDorisOperatorInstanceMapper.selectList(queryWrapper);
return wsDorisOperatorInstances.stream().map(WsDorisOperatorInstance::getId).collect(Collectors.toList());
}

@Override
public WsDorisOperatorInstanceDTO selectOne(Long id) {
WsDorisOperatorInstance record = wsDorisOperatorInstanceMapper.selectById(id);
Expand Down Expand Up @@ -146,12 +157,15 @@ public int update(WsDorisOperatorInstanceUpdateParam param) {

@Override
public int deleteById(Long id) {
WsDorisOperatorInstanceDTO instanceDTO = selectOne(id);
checkState(instanceDTO.getDeployed() == YesOrNo.NO, () -> "doris instance already deployed! can't delete");
return wsDorisOperatorInstanceMapper.deleteById(id);
}

@Override
public int deleteBatch(List<Long> ids) {
return wsDorisOperatorInstanceMapper.deleteBatchIds(ids);
ids.forEach(this::deleteById);
return ids.size();
}

@Override
Expand All @@ -164,6 +178,10 @@ public void deploy(Long id) {
}
String yaml = Serialization.asYaml(dorisCluster);
dorisOperatorService.deploy(instanceDTO.getClusterCredentialId(), yaml);
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(instanceDTO.getId());
record.setDeployed(YesOrNo.YES);
wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
Expand All @@ -180,5 +198,42 @@ public void shutdown(Long id) {
DorisCluster dorisCluster = asYaml(instanceDTO);
String yaml = Serialization.asYaml(dorisCluster);
dorisOperatorService.shutdown(instanceDTO.getClusterCredentialId(), yaml);
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(instanceDTO.getId());
wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
public int updateStatus(Long id, DorisClusterStatus status) {
if (status == null) {
return -1;
}
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(id);
if (status.getFeStatus() != null) {
record.setFeStatus(JacksonUtil.toJsonString(status.getFeStatus()));
}
if (status.getBeStatus() != null) {
record.setBeStatus(JacksonUtil.toJsonString(status.getBeStatus()));
}
if (status.getCnStatus() != null) {
record.setCnStatus(JacksonUtil.toJsonString(status.getCnStatus()));
}
if (status.getBrokerStatus() != null) {
record.setBrokerStatus(JacksonUtil.toJsonString(status.getBrokerStatus()));
}
return wsDorisOperatorInstanceMapper.updateById(record);
}

@Override
public int clearStatus(Long id) {
WsDorisOperatorInstance record = new WsDorisOperatorInstance();
record.setId(id);
record.setDeployed(YesOrNo.NO);
record.setFeStatus(null);
record.setBeStatus(null);
record.setCnStatus(null);
record.setBrokerStatus(null);
return wsDorisOperatorInstanceMapper.updateById(record);
}
}
8 changes: 8 additions & 0 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,14 @@ export default {

'pages.project.doris.instance.detail': '实例详情',
'pages.project.doris.instance.detail.component': '集群组件',
'pages.project.doris.instance.detail.component.replicas': '全部',
'pages.project.doris.instance.detail.component.creating': '创建中',
'pages.project.doris.instance.detail.component.running': '运行中',
'pages.project.doris.instance.detail.component.failed': '已失败',
'pages.project.doris.instance.detail.component.fe': 'FE节点',
'pages.project.doris.instance.detail.component.be': 'BE节点',
'pages.project.doris.instance.detail.component.cn': '计算节点',
'pages.project.doris.instance.detail.component.broker': 'Broker节点',
'pages.project.doris.instance.detail.access': '连接信息',
'pages.project.doris.instance.detail.yaml': 'YAML',
'pages.project.doris.instance.detail.deploy': 'Deploy',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
<div>
<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
disabled={data.deployed?.value == '1'}
onConfirm={() => {
WsDorisOperatorInstanceService.deploy(data.id).then(response => {
if (response.success) {
Expand All @@ -23,14 +24,19 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
})
}}
>
<Button type="default" icon={<CaretRightOutlined/>}>
<Button
type="default"
icon={<CaretRightOutlined/>}
disabled={data.deployed?.value == '1'}
>
{intl.formatMessage({id: 'pages.project.doris.instance.detail.deploy'})}
</Button>
</Popconfirm>


<Popconfirm
title={intl.formatMessage({id: 'app.common.operate.submit.confirm.title'})}
disabled={data.deployed?.value == '0'}
onConfirm={() => {
WsDorisOperatorInstanceService.shutdown(data.id).then(response => {
if (response.success) {
Expand All @@ -39,7 +45,10 @@ const DorisInstanceDetailAction: React.FC<{ data: WsDorisOperatorInstance }> = (
})
}}
>
<Button icon={<CloseOutlined/>}>
<Button
icon={<CloseOutlined/>}
disabled={data.deployed?.value == '0'}
>
{intl.formatMessage({id: 'pages.project.doris.instance.detail.shutdown'})}
</Button>
</Popconfirm>
Expand Down
Loading