Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-11652][2022Q4 RoadMap] DS support remote SSH task. #12736

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/docs/en/architecture/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ Location: `worker-server/conf/application.yaml`
|worker.registry-disconnect-strategy.max-waiting-time|100s|Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely |
|worker.task-execute-threads-full-policy|REJECT|If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution|

## SSH Session相关配置

位置:`worker-server/conf/application.yaml`

| Parameter | Default Value | Description |
|-------------------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------|
| ssh.session.pool.maxTotal | 20 | The maximum number of active ssh session instances in the pool. |
| ssh.session.pool.maxTotalPerKey | 10 | The maximum number of active ssh session of per ssh host & account in the pool. |
| ssh.session.pool.maxIdlePerKey | 4 | The maximum number of sleeping instances of per ssh host in the pool. |
| ssh.session.pool.blockWhenExhausted | true | When borrow an ssh session, whether to block waiting for a session when the pool is exhausted |
| ssh.session.pool.maxWaitDuration | 500ms | The maximum waiting time for the caller when the pool session is exhausted, it will be ignored if `blockWhenExhausted` is `false` |
| ssh.session.pool.minEvictableIdleDuration | 60s | The minimum time to allow idle session to live (or not be evicted) |
| ssh.session.pool.durationBetweenEvictionRuns | 60s | The interval at which the eviction thread pool scans the pool for free sessions |
| ssh.session.pool.removeAbandonedOnBorrow | true | Whether borrowObject performs abandoned session removal |
| ssh.session.pool.removeAbandonedTimeoutDuration | 30s | How long the object has been unused and returned after it was lent out and used for the last time is considered a leak |
| ssh.session.sftp.enableUploadMonitor | true | enable upload monitor thread when upload files to remote host |
| ssh.session.sftp.maxUploadRate | 256 | max upload rate (KB), if negative, will not limit |
| ssh.session.sftp.maxFileSize | 100 | max file size (MB), if negative, will not limit |

### Alert Server related configuration

Location: `alert-server/conf/application.yaml`
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/guide/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,14 @@ Create a task node in the workflow definition, select the worker group and the e

![create-environment](../../../img/new_ui/dev/security/create-namespace.png)

## Task Remote Host Management

> Usage task remote host

- Create a task node in the workflow definition, select the task remote host. When executing the task, the Worker will SSH to the remote host to execute the task.

> Add or update the task remote host

- After configuring the relevant information of the remote host, you can use the Test button to test whether the current host can execute remote SSH. Because the host that can SSH remotely is valid.

![create-task-remote-host](../../../img/new_ui/dev/security/create-task-remote-host.png)
19 changes: 19 additions & 0 deletions docs/docs/zh/architecture/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|worker.registry-disconnect-strategy.max-waiting-time|100s|当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
|worker.task-execute-threads-full-policy|REJECT|如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务|

## SSH Session相关配置

位置:`worker-server/conf/application.yaml`

| 参数 | 默认值 | 描述 |
|-------------------------------------------------|-------|--------------------------------------------------------------------------------------------------|
| ssh.session.pool.maxTotal | 20 | Worker能够同时创建SSH Session的最大数量,当对象池满,新请求将阻塞或直接拒绝 |
| ssh.session.pool.maxTotalPerKey | 10 | Worker能够对单一主机 + 用户创建SSH Session的最大数量。当该主机+用户创建的SSH Session满,则Worker对该主机+用户的SSH ession请求将阻塞或直接拒绝。 |
| ssh.session.pool.maxIdlePerKey | 4 | Worker能够对单一主机 + 用户创建SSH Session的最大空闲数量。当释放的主机+用户创建的SSH Session超过该值,则Worker将直接销毁新收回的SSH Session。 |
| ssh.session.pool.blockWhenExhausted | true | 当对象池满,是否阻塞新请求。true为阻塞,false则为直接异常返回。推荐使用阻塞策略。 |
| ssh.session.pool.maxWaitDuration | 500ms | 该值仅当`blockWhenExhausted`设置为`true`时生效。表示请求最大阻塞时长。建议该值必须设置 |
| ssh.session.pool.minEvictableIdleDuration | 60s | 允许空闲SSH Session的最大存活时间。假如空闲SSH Session超过该值未被使用,将被驱逐 |
| ssh.session.pool.durationBetweenEvictionRuns | 60s | 检测是否驱逐空闲SSH Session的周期 |
| ssh.session.pool.removeAbandonedOnBorrow | true | 当每次从池子中借取SSH Session时,去扫描可以被丢弃的SSH Session |
| ssh.session.pool.removeAbandonedTimeoutDuration | 30s | SSH Session借出去最后一次使用后,多长时间未使用和归还就认为是泄漏 |
| ssh.session.sftp.enableUploadMonitor | true | 当通过SFTP上传文件时,是否开启进度监控 |
| ssh.session.sftp.maxUploadRate | 256 | SFTP的最大速率,单位为KB,若为负数则不限制 |
| ssh.session.sftp.maxFileSize | 100 | 单任务能够SFTP的最大文件大小,单位为MB,若为负数则不限制 |

## Alert Server相关配置

位置:`alert-server/conf/application.yaml`
Expand Down
12 changes: 12 additions & 0 deletions docs/docs/zh/guide/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,15 @@ worker:
- 创建和授权后,在相关k8s任务选择命名空间时下拉可选,如果k8s集群名字是`ds_null_k8s`是测试模式,不会真正操作集群.

![create-environment](../../../img/new_ui/dev/security/create-namespace.png)

## 任务远程主机管理

> 使用任务远程主机

- 在工作流定义中创建任务节点选择远程主机,任务将通过SSH到远程主机上执行任务.

> 创建/更新任务远程主机

- 配置远程主机的相关信息之后,可以通过测试按钮测试当前主机是否能够执行远程SSH。因为能够远程SSH的主机才是有效的.

![create-task-remote-host](../../../img/new_ui/dev/security/create-task-remote-host.png)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ public class ApiFuncIdentificationConstant {

public static final String MONITOR_STATISTICS_VIEW = "monitor:statistics:view";

public static final String TASK_REMOTE_HOST_VIEW = ("security:task-remote-host:view");

public static final String TASK_REMOTE_HOST_CREATE = ("security:task-remote-host:create");

public static final String TASK_REMOTE_HOST_EDIT = ("security:task-remote-host:update");

public static final String TASK_REMOTE_HOST_DELETE = ("security:task-remote-host:delete");

public final static Map<ExecuteType, String> map = new HashMap<ExecuteType, String>();

static {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.controller;

import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_REMOTE_HOST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_REMOTE_HOST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_REMOTE_HOST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_REMOTE_HOST_PAGE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.SUCCESS;
import static org.apache.dolphinscheduler.api.enums.Status.TEST_CONNECT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_REMOTE_HOST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.VARIFY_TASK_REMOTE_HOST_ERROR;

import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.TaskRemoteHostDTO;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskRemoteHostService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.vo.TaskRemoteHostVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
* task remote host controller
*/
@Tag(name = "TASK_REMOTE_HOST_TAG")
@RestController
@RequestMapping("/remote_host")
public class TaskRemoteHostController {

@Autowired
private TaskRemoteHostService taskRemoteHostService;

@Operation(summary = "createTaskRemoteHost", description = "CREATE_TASK_REMOTE_HOST_NOTES")
@PostMapping()
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_REMOTE_HOST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createTaskRemoteHost(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody TaskRemoteHostDTO taskRemoteHostDTO) {
int result = taskRemoteHostService.createTaskRemoteHost(loginUser, taskRemoteHostDTO);
return result > 0 ? Result.success() : Result.error(CREATE_TASK_REMOTE_HOST_ERROR);
}

@Operation(summary = "updateTaskRemoteHost", description = "UPDATE_TASK_REMOTE_HOST_NOTES")
@PutMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_TASK_REMOTE_HOST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateTaskRemoteHost(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("code") Long code,
@RequestBody TaskRemoteHostDTO taskRemoteHostDTO) {
int result = taskRemoteHostService.updateTaskRemoteHost(code, loginUser, taskRemoteHostDTO);
return result > 0 ? Result.success() : Result.error(UPDATE_TASK_REMOTE_HOST_ERROR);
}

@Operation(summary = "delete", description = "DELETE_TASK_REMOTE_HOST_NOTES")
@Parameters({
@Parameter(name = "code", description = "TASK_REMOTE_HOST_CODE", schema = @Schema(implementation = long.class, example = "123456", required = true))
})
@DeleteMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_TASK_REMOTE_HOST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteTaskRemoteHost(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("code") Long code) {
int result = taskRemoteHostService.deleteByCode(code, loginUser);
return result > 0 ? Result.success() : Result.error(DELETE_TASK_REMOTE_HOST_ERROR);
}

@Operation(summary = "queryTaskRemoteHostListPaging", description = "QUERY_TASK_REMOTE_HOST_PAGE_NOTES")
@Parameters({
@Parameter(name = "searchVal", description = "SEARCH_VAL", schema = @Schema(implementation = String.class)),
@Parameter(name = "pageSize", description = "PAGE_SIZE", required = true, schema = @Schema(implementation = int.class, example = "20")),
@Parameter(name = "pageNo", description = "PAGE_NO", required = true, schema = @Schema(implementation = int.class, example = "1"))
})
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_REMOTE_HOST_PAGE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<PageInfo<TaskRemoteHostVO>> queryTaskRemoteHostListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize,
@RequestParam("pageNo") Integer pageNo) {
PageInfo<TaskRemoteHostVO> taskRemoteHostVOPageInfo =
taskRemoteHostService.queryTaskRemoteHostListPaging(loginUser, searchVal, pageNo, pageSize);
return Result.success(taskRemoteHostVOPageInfo);
}

@Operation(summary = "queryTaskRemoteHostList", description = "QUERY_ALL_TASK_REMOTE_HOST_LIST_NOTES")
@GetMapping(value = "/query-remote-host-list")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_REMOTE_HOST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<List<TaskRemoteHostVO>> queryTaskRemoteHostList(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
List<TaskRemoteHostVO> taskRemoteHostVOList = taskRemoteHostService.queryAllTaskRemoteHosts(loginUser);
return Result.success(taskRemoteHostVOList);
}

@Operation(summary = "textConnect", description = "TEXT_CONNECT_HOST_NOTES")
@PostMapping(value = "/test-connect")
@ResponseStatus(HttpStatus.OK)
@ApiException(TEST_CONNECT_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result testConnect(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'loginUser' is never used.
@RequestBody TaskRemoteHostDTO taskRemoteHostDTO) {
boolean result = taskRemoteHostService.testConnect(taskRemoteHostDTO);
return result ? Result.success(SUCCESS) : Result.error(TEST_CONNECT_ERROR);
}

@Operation(summary = "verifyTaskRemoteHost", description = "VERIFY_TASK_REMOTE_HOST_NOTES")
@Parameters({
@Parameter(name = "taskRemoteHostName", description = "TASK_REMOTE_HOST_NAME", required = true, schema = @Schema(implementation = String.class))
})
@PostMapping(value = "/verify-host")
@ResponseStatus(HttpStatus.OK)
@ApiException(VARIFY_TASK_REMOTE_HOST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result verifyTaskRemoteHost(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'loginUser' is never used.
@RequestParam(value = "taskRemoteHostName") String taskRemoteHostName) {
boolean result = taskRemoteHostService.verifyTaskRemoteHost(taskRemoteHostName);
return result ? Result.success() : Result.error(VARIFY_TASK_REMOTE_HOST_ERROR);
}

}
Loading