diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 9ff7011f93..d16a7b9660 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -146,6 +146,12 @@ jobs: class: org.apache.streampark.e2e.cases.Flink117OnYarnClusterDeployTest - name: Flink118OnYarnClusterDeployTest class: org.apache.streampark.e2e.cases.Flink118OnYarnClusterDeployTest + - name: FlinkSQL116OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest + - name: FlinkSQL117OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest + - name: FlinkSQL118OnYarnTest + class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest env: RECORDING_PATH: /tmp/recording-${{ matrix.case.name }} steps: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index 57d8b53419..cacdc81f1a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -18,6 +18,7 @@ package org.apache.streampark.console.base.util; import org.apache.streampark.common.conf.ConfigKeys; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.commons.lang3.StringUtils; @@ -90,4 +91,9 @@ public static File getAppClientDir() { public static File getPluginDir() { return getAppDir(PLUGINS); } + + public static boolean isHaEnable() { + return SystemPropertyUtils.getBoolean("high-availability.enable", false); + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/AlertConfigMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/AlertConfigMapper.java index d87349423d..0040d550a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/AlertConfigMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/AlertConfigMapper.java @@ -19,11 +19,8 @@ import org.apache.streampark.console.core.entity.AlertConfig; -import org.apache.ibatis.annotations.Param; - import com.baomidou.mybatisplus.core.mapper.BaseMapper; public interface AlertConfigMapper extends BaseMapper { - AlertConfig selectAlertConfByName(@Param("alertConfig") AlertConfig alertConfig); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index ebcaef12fb..e4300cc648 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -116,8 +116,7 @@ private void initConfig() { } private void initRegistryService() { - boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); - if (enable) { + if (WebUtils.isHaEnable()) { RegistryService registryService = SpringContextUtils.getBean(RegistryService.class); registryService.registry(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java index 0a60c4c1d2..8f8fd36a34 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java @@ -61,7 +61,7 @@ public interface DistributedTaskService extends IService { * @param appId Long * @return boolean */ - public boolean isLocalProcessing(Long appId); + boolean isLocalProcessing(Long appId); /** * Save Distributed Task. @@ -70,5 +70,5 @@ public interface DistributedTaskService extends IService { * @param autoStart boolean * @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT */ - public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action); + void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java index 75280e3640..f6dcaf7176 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java @@ -54,12 +54,9 @@ public class AlertConfigServiceImpl extends ServiceImpl page(Long userId, RestRequest request) { // build query conditions - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(userId != null, AlertConfig::getUserId, userId); - Page page = MybatisPager.getPage(request); - IPage resultPage = getBaseMapper().selectPage(page, wrapper); - + IPage resultPage = + this.lambdaQuery().eq(userId != null, AlertConfig::getUserId, userId).page(page); Page result = new Page<>(); if (CollectionUtils.isNotEmpty(resultPage.getRecords())) { result.setRecords( @@ -71,8 +68,7 @@ public IPage page(Long userId, RestRequest request) { @Override public boolean exist(AlertConfig alertConfig) { - AlertConfig confByName = this.baseMapper.selectAlertConfByName(alertConfig); - return confByName != null; + return this.lambdaQuery().eq(AlertConfig::getAlertName, alertConfig.getAlertName()).exists(); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java index aa344b1431..f8ccdac432 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java @@ -35,7 +35,6 @@ import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; import org.apache.streampark.console.core.service.application.FlinkApplicationManageService; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -70,9 +69,7 @@ public class FlinkApplicationBackupServiceImpl @Override public IPage getPage(FlinkApplicationBackup bakParam, RestRequest request) { Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkApplicationBackup::getAppId, bakParam.getAppId()); - return this.baseMapper.selectPage(page, queryWrapper); + return this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, bakParam.getAppId()).page(page); } @Override @@ -135,11 +132,12 @@ public void rollback(FlinkApplicationBackup bakParam) { public void revoke(FlinkApplication appParam) { Page page = new Page<>(); page.setCurrent(0).setSize(1).setSearchCount(false); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkApplicationBackup::getAppId, appParam.getId()) - .orderByDesc(FlinkApplicationBackup::getCreateTime); - Page backUpPages = baseMapper.selectPage(page, queryWrapper); + Page backUpPages = this.lambdaQuery().eq( + FlinkApplicationBackup::getAppId, + appParam.getId()) + .orderByDesc(FlinkApplicationBackup::getCreateTime).page(page); + if (!backUpPages.getRecords().isEmpty()) { FlinkApplicationBackup backup = backUpPages.getRecords().get(0); String path = backup.getPath(); @@ -151,9 +149,7 @@ public void revoke(FlinkApplication appParam) { @Override public void remove(FlinkApplication appParam) { try { - baseMapper.delete( - new LambdaQueryWrapper() - .eq(FlinkApplicationBackup::getAppId, appParam.getId())); + this.lambdaUpdate().eq(FlinkApplicationBackup::getAppId, appParam.getId()).remove(); appParam .getFsOperator() .delete( @@ -169,10 +165,8 @@ public void remove(FlinkApplication appParam) { @Override public void rollbackFlinkSql(FlinkApplication appParam, FlinkSql flinkSqlParam) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkApplicationBackup::getAppId, appParam.getId()) - .eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId()); - FlinkApplicationBackup backUp = baseMapper.selectOne(queryWrapper); + FlinkApplicationBackup backUp = this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, appParam.getId()) + .eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId()).one(); ApiAlertException.throwIfNull( backUp, "Application backup can't be null. Rollback flink sql failed."); // rollback config and sql diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java index fcd6008cc1..db30f140e5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java @@ -85,7 +85,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.benmanes.caffeine.cache.Cache; @@ -631,10 +630,8 @@ public Map listAppIdPipelineStatusMap(List appId if (CollectionUtils.isEmpty(appIds)) { return new HashMap<>(); } - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .in(ApplicationBuildPipeline::getAppId, appIds); - - List appBuildPipelines = baseMapper.selectList(queryWrapper); + List appBuildPipelines = + this.lambdaQuery().in(ApplicationBuildPipeline::getAppId, appIds).list(); if (CollectionUtils.isEmpty(appBuildPipelines)) { return new HashMap<>(); } @@ -644,8 +641,7 @@ public Map listAppIdPipelineStatusMap(List appId @Override public void removeByAppId(Long appId) { - baseMapper.delete( - new LambdaQueryWrapper().eq(ApplicationBuildPipeline::getAppId, appId)); + this.lambdaUpdate().eq(ApplicationBuildPipeline::getAppId, appId).remove(); } /** diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java index 2f8a02dbce..b4b79e9f63 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java @@ -30,10 +30,7 @@ import org.apache.streampark.console.core.service.FlinkEffectiveService; import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; @@ -92,13 +89,10 @@ public synchronized void create(FlinkApplication appParam, Boolean latest) { } public void setLatest(Long appId, Long configId) { - LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(); - updateWrapper.set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId); - this.update(updateWrapper); - - updateWrapper.clear(); - updateWrapper.set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId); - this.update(updateWrapper); + this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId) + .update(); + this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId) + .update(); } @Override @@ -187,9 +181,8 @@ public void setLatestOrEffective(Boolean latest, Long configId, Long appId) { @Override public void toEffective(Long appId, Long configId) { - LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(); - updateWrapper.eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false); - this.update(updateWrapper); + this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false) + .update(); effectiveService.saveOrUpdate(appId, EffectiveTypeEnum.CONFIG, configId); } @@ -225,11 +218,8 @@ public IPage getPage(FlinkApplicationConfig config, Rest @Override public List list(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkApplicationConfig::getAppId, appId) - .orderByDesc(FlinkApplicationConfig::getVersion); - - List configList = this.baseMapper.selectList(queryWrapper); + List configList = this.lambdaQuery().eq(FlinkApplicationConfig::getAppId, appId) + .orderByDesc(FlinkApplicationConfig::getVersion).list(); fillEffectiveField(appId, configList); return configList; } @@ -237,9 +227,8 @@ public List list(Long appId) { @Override public synchronized String readTemplate() { if (flinkConfTemplate == null) { - try { - Resource resource = resourceLoader.getResource("classpath:flink-application.conf"); - Scanner scanner = new Scanner(resource.getInputStream()); + Resource resource = resourceLoader.getResource("classpath:flink-application.conf"); + try (Scanner scanner = new Scanner(resource.getInputStream())) { StringBuilder stringBuffer = new StringBuilder(); while (scanner.hasNextLine()) { stringBuffer.append(scanner.nextLine()).append(System.lineSeparator()); @@ -257,8 +246,7 @@ public synchronized String readTemplate() { @Override public void removeByAppId(Long appId) { - baseMapper.delete( - new LambdaQueryWrapper().eq(FlinkApplicationConfig::getAppId, appId)); + this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).remove(); } private void fillEffectiveField(Long id, List configList) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java index 6927945973..d602eb7f4e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationInfoServiceImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; @@ -251,14 +250,12 @@ public boolean checkAlter(FlinkApplication appParam) { @Override public boolean existsByTeamId(Long teamId) { - return baseMapper.exists( - new LambdaQueryWrapper().eq(FlinkApplication::getTeamId, teamId)); + return this.lambdaQuery().eq(FlinkApplication::getTeamId, teamId).exists(); } @Override public boolean existsByUserId(Long userId) { - return baseMapper.exists( - new LambdaQueryWrapper().eq(FlinkApplication::getUserId, userId)); + return this.lambdaQuery().eq(FlinkApplication::getUserId, userId).exists(); } @Override @@ -273,17 +270,13 @@ public boolean existsRunningByClusterId(Long clusterId) { @Override public boolean existsByClusterId(Long clusterId) { - return baseMapper.exists( - new LambdaQueryWrapper().eq(FlinkApplication::getFlinkClusterId, clusterId)); + return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId, clusterId).exists(); } @Override public Integer countByClusterId(Long clusterId) { - return baseMapper - .selectCount( - new LambdaQueryWrapper().eq(FlinkApplication::getFlinkClusterId, - clusterId)) - .intValue(); + return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId, + clusterId).count().intValue(); } @Override @@ -293,8 +286,7 @@ public Integer countAffectedByClusterId(Long clusterId, String dbType) { @Override public boolean existsByFlinkEnvId(Long flinkEnvId) { - return baseMapper.exists( - new LambdaQueryWrapper().eq(FlinkApplication::getVersionId, flinkEnvId)); + return this.lambdaQuery().eq(FlinkApplication::getVersionId, flinkEnvId).exists(); } @Override @@ -434,8 +426,8 @@ public AppExistsStateEnum checkExists(FlinkApplication appParam) { return AppExistsStateEnum.INVALID; } - FlinkApplication application = baseMapper.selectOne( - new LambdaQueryWrapper().eq(FlinkApplication::getJobName, jobName)); + FlinkApplication application = this.lambdaQuery().eq(FlinkApplication::getJobName, jobName).one(); + if (application != null && !application.getId().equals(appParamId)) { return AppExistsStateEnum.IN_DB; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index 2e84467755..c6987825a0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -66,11 +66,9 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.support.SFunction; +import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.annotations.VisibleForTesting; @@ -317,10 +315,8 @@ private AppControl getAppControl(FlinkApplication record) { @Override public void changeOwnership(Long userId, Long targetUserId) { - LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper() - .eq(FlinkApplication::getUserId, userId) - .set(FlinkApplication::getUserId, targetUserId); - this.baseMapper.update(null, updateWrapper); + this.lambdaUpdate().eq(FlinkApplication::getUserId, userId) + .set(FlinkApplication::getUserId, targetUserId).update(); } @SneakyThrows @@ -375,8 +371,7 @@ public boolean create(FlinkApplication appParam) { } private boolean existsByJobName(String jobName) { - return baseMapper.exists( - new LambdaQueryWrapper().eq(FlinkApplication::getJobName, jobName)); + return this.lambdaQuery().eq(FlinkApplication::getJobName, jobName).exists(); } @SuppressWarnings("checkstyle:WhitespaceAround") @@ -669,14 +664,12 @@ private void updateFlinkSqlJob(FlinkApplication application, FlinkApplication ap @Override public void updateRelease(FlinkApplication appParam) { - LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(); - updateWrapper.eq(FlinkApplication::getId, appParam.getId()); - updateWrapper.set(FlinkApplication::getRelease, appParam.getRelease()); - updateWrapper.set(FlinkApplication::getBuild, appParam.getBuild()); - if (appParam.getOptionState() != null) { - updateWrapper.set(FlinkApplication::getOptionState, appParam.getOptionState()); - } - this.update(updateWrapper); + this.lambdaUpdate() + .eq(FlinkApplication::getId, appParam.getId()) + .set(FlinkApplication::getRelease, appParam.getRelease()) + .set(FlinkApplication::getBuild, appParam.getBuild()) + .set(appParam.getOptionState() != null, FlinkApplication::getOptionState, appParam.getOptionState()) + .update(); } @Override @@ -693,31 +686,31 @@ public List listByTeamId(Long teamId) { public List listByTeamIdAndDeployModes( Long teamId, @Nonnull Collection deployModeEnums) { - return getBaseMapper() - .selectList( - new LambdaQueryWrapper() - .eq((SFunction) FlinkApplication::getTeamId, - teamId) - .in( - FlinkApplication::getDeployMode, - deployModeEnums.stream() - .map(FlinkDeployMode::getMode) - .collect(Collectors.toSet()))); + return this.lambdaQuery() + .eq((SFunction) FlinkApplication::getTeamId, + teamId) + .in( + FlinkApplication::getDeployMode, + deployModeEnums.stream() + .map(FlinkDeployMode::getMode) + .collect(Collectors.toSet())) + .list(); } @Override public boolean checkBuildAndUpdate(FlinkApplication appParam) { boolean build = appParam.getBuild(); if (!build) { - LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(); - updateWrapper.eq(FlinkApplication::getId, appParam.getId()); + LambdaUpdateChainWrapper update = this.lambdaUpdate() + .eq(FlinkApplication::getId, appParam.getId()); if (appParam.isRunning()) { - updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.NEED_RESTART.get()); + update.set(FlinkApplication::getRelease, ReleaseStateEnum.NEED_RESTART.get()); } else { - updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.DONE.get()); - updateWrapper.set(FlinkApplication::getOptionState, OptionStateEnum.NONE.getValue()); + update + .set(FlinkApplication::getRelease, ReleaseStateEnum.DONE.get()) + .set(FlinkApplication::getOptionState, OptionStateEnum.NONE.getValue()); } - this.update(updateWrapper); + this.update(update); // backup if (appParam.isFlinkSqlJob()) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java index b569d53478..9811743b5c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java @@ -69,10 +69,8 @@ public class SparkApplicationBackupServiceImpl @Override public IPage getPage(SparkApplicationBackup bakParam, RestRequest request) { - Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SparkApplicationBackup::getAppId, bakParam.getAppId()); - return this.baseMapper.selectPage(page, queryWrapper); + return this.lambdaQuery().eq(SparkApplicationBackup::getAppId, bakParam.getAppId()) + .page(MybatisPager.getPage(request)); } @Override @@ -135,11 +133,9 @@ public void rollback(SparkApplicationBackup bakParam) { public void revoke(SparkApplication appParam) { Page page = new Page<>(); page.setCurrent(0).setSize(1).setSearchCount(false); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SparkApplicationBackup::getAppId, appParam.getId()) - .orderByDesc(SparkApplicationBackup::getCreateTime); - - Page backUpPages = baseMapper.selectPage(page, queryWrapper); + Page backUpPages = + this.lambdaQuery().eq(SparkApplicationBackup::getAppId, appParam.getId()) + .orderByDesc(SparkApplicationBackup::getCreateTime).page(page); if (!backUpPages.getRecords().isEmpty()) { SparkApplicationBackup backup = backUpPages.getRecords().get(0); String path = backup.getPath(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java index fb72212747..d870bd2d14 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java @@ -489,10 +489,8 @@ public Map listAppIdPipelineStatusMap(List appId if (CollectionUtils.isEmpty(appIds)) { return new HashMap<>(); } - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .in(ApplicationBuildPipeline::getAppId, appIds); - - List appBuildPipelines = baseMapper.selectList(queryWrapper); + List appBuildPipelines = + this.lambdaQuery().in(ApplicationBuildPipeline::getAppId, appIds).list(); if (CollectionUtils.isEmpty(appBuildPipelines)) { return new HashMap<>(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java index 592d8ab2f7..4886d0baa7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java @@ -190,9 +190,8 @@ public void setLatestOrEffective(Boolean latest, Long configId, Long appId) { @Override public void toEffective(Long appId, Long configId) { - LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(); - updateWrapper.eq(SparkApplicationConfig::getAppId, appId).set(SparkApplicationConfig::getLatest, false); - this.update(updateWrapper); + this.lambdaUpdate().eq(SparkApplicationConfig::getAppId, appId).set(SparkApplicationConfig::getLatest, false) + .update(); effectiveService.saveOrUpdate(appId, EffectiveTypeEnum.SPARKCONFIG, configId); } @@ -228,11 +227,9 @@ public IPage getPage(SparkApplicationConfig config, Rest @Override public List list(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + List configList = this.lambdaQuery() .eq(SparkApplicationConfig::getAppId, appId) - .orderByDesc(SparkApplicationConfig::getVersion); - - List configList = this.baseMapper.selectList(queryWrapper); + .orderByDesc(SparkApplicationConfig::getVersion).list(); fillEffectiveField(appId, configList); return configList; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java index 4ec68298b5..0bf25adbc8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java @@ -60,7 +60,6 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.baomidou.mybatisplus.core.toolkit.support.SFunction; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.annotations.VisibleForTesting; @@ -580,16 +579,14 @@ public List listByTeamId(Long teamId) { public List listByTeamIdAndDeployModes( Long teamId, Collection deployModeEnums) { - return getBaseMapper() - .selectList( - new LambdaQueryWrapper() - .eq((SFunction) SparkApplication::getTeamId, - teamId) - .in( - SparkApplication::getDeployMode, - deployModeEnums.stream() - .map(SparkDeployMode::getMode) - .collect(Collectors.toSet()))); + + return this.lambdaQuery().eq(SparkApplication::getTeamId, teamId) + .in( + SparkApplication::getDeployMode, + deployModeEnums.stream() + .map(SparkDeployMode::getMode) + .collect(Collectors.toSet())) + .list(); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java index fa5f56ab1c..7d33973e3a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.streampark.console.base.mybatis.entity.BaseEntity; import org.apache.streampark.console.base.util.ConsistentHash; import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.FlinkTaskItem; import org.apache.streampark.console.core.bean.SparkTaskItem; import org.apache.streampark.console.core.entity.DistributedTask; @@ -199,6 +200,9 @@ public void removeServer(String serverId) { */ @Override public boolean isLocalProcessing(Long appId) { + if (!WebUtils.isHaEnable()) { + return true; + } return consistentHash.get(appId).equals(serverId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 5f18bf3648..70f1a480d6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -298,14 +298,13 @@ public Boolean existsByFlinkEnvId(Long flinkEnvId) { @Override public List listByDeployModes( Collection deployModeEnums) { - return getBaseMapper() - .selectList( - new LambdaQueryWrapper() - .in( - FlinkCluster::getDeployMode, - deployModeEnums.stream() - .map(FlinkDeployMode::getMode) - .collect(Collectors.toSet()))); + + return this.lambdaQuery().in( + FlinkCluster::getDeployMode, + deployModeEnums.stream() + .map(FlinkDeployMode::getMode) + .collect(Collectors.toSet())) + .list(); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java index 84e1620e34..e1fbaf7f0e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java @@ -138,8 +138,7 @@ public FlinkEnv getByAppId(Long appId) { @Override public FlinkEnv getDefault() { - return this.baseMapper.selectOne( - new LambdaQueryWrapper().eq(FlinkEnv::getIsDefault, true)); + return this.lambdaQuery().eq(FlinkEnv::getIsDefault, true).one(); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index 3a93e42f73..a815748fa5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -453,20 +453,18 @@ private void expire(FlinkSavepoint entity) { cpThreshold = CHECKPOINT == CheckPointTypeEnum.of(entity.getType()) ? cpThreshold - 1 : cpThreshold; if (cpThreshold == 0) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkSavepoint::getAppId, entity.getAppId()) - .eq(FlinkSavepoint::getType, CHECKPOINT.get()); - this.remove(queryWrapper); + this.lambdaUpdate().eq(FlinkSavepoint::getAppId, entity.getAppId()) + .eq(FlinkSavepoint::getType, CHECKPOINT.get()).remove(); return; } - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .select(FlinkSavepoint::getTriggerTime) + Page savepointPage = this.lambdaQuery().select(FlinkSavepoint::getTriggerTime) .eq(FlinkSavepoint::getAppId, entity.getAppId()) .eq(FlinkSavepoint::getType, CHECKPOINT.get()) - .orderByDesc(FlinkSavepoint::getTriggerTime); + .orderByDesc(FlinkSavepoint::getTriggerTime) + .page( + new Page<>(1, cpThreshold + 1)); - Page savepointPage = this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper); if (CollectionUtils.isEmpty(savepointPage.getRecords()) || savepointPage.getRecords().size() <= cpThreshold) { return; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java index 842c0017c8..80d5b95fb4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java @@ -81,11 +81,9 @@ public FlinkSql getEffective(Long appId, boolean decode) { public FlinkSql getLatestFlinkSql(Long appId, boolean decode) { Page page = new Page<>(); page.setCurrent(0).setSize(1).setSearchCount(false); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + Page flinkSqlPage = this.lambdaQuery() .eq(FlinkSql::getAppId, appId) - .orderByDesc(FlinkSql::getVersion); - - Page flinkSqlPage = baseMapper.selectPage(page, queryWrapper); + .orderByDesc(FlinkSql::getVersion).page(page); return Optional.ofNullable(flinkSqlPage.getRecords()) .filter(records -> !records.isEmpty()) .map(records -> records.get(0)) @@ -124,11 +122,8 @@ public void setCandidate(CandidateTypeEnum candidateTypeEnum, Long appId, Long s @Override public List listFlinkSqlHistory(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(FlinkSql::getAppId, appId) - .orderByDesc(FlinkSql::getVersion); - - List sqlList = this.baseMapper.selectList(queryWrapper); + List sqlList = + this.lambdaQuery().eq(FlinkSql::getAppId, appId).orderByDesc(FlinkSql::getVersion).list(); FlinkSql effective = getEffective(appId, false); if (effective != null) { sqlList.stream() @@ -165,8 +160,7 @@ public void cleanCandidate(Long id) { @Override public void removeByAppId(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(FlinkSql::getAppId, appId); - baseMapper.delete(queryWrapper); + this.lambdaUpdate().eq(FlinkSql::getAppId, appId).remove(); } @Override @@ -219,8 +213,7 @@ public List listByTeamId(Long teamId) { public IPage getPage(Long appId, RestRequest request) { request.setSortField("version"); Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(FlinkSql::getAppId, appId); - IPage sqlList = this.baseMapper.selectPage(page, queryWrapper); + IPage sqlList = this.lambdaQuery().eq(FlinkSql::getAppId, appId).page(page); FlinkSql effectiveSql = baseMapper.getEffective(appId); if (effectiveSql != null) { for (FlinkSql sql : sqlList.getRecords()) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java index 462ee8ddc1..e1aac54b1f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java @@ -25,9 +25,7 @@ import org.apache.streampark.console.core.service.MessageService; import org.apache.streampark.console.core.websocket.WebSocketEndpoint; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -49,11 +47,10 @@ public void push(Message message) { @Override public IPage getUnReadPage(NoticeTypeEnum noticeTypeEnum, RestRequest request) { - Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + return this.lambdaQuery() .eq(Message::getIsRead, false) .orderByDesc(Message::getCreateTime) - .eq(Message::getType, noticeTypeEnum); - return this.baseMapper.selectPage(page, queryWrapper); + .eq(Message::getType, noticeTypeEnum) + .page(MybatisPager.getPage(request)); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index f645267d78..1cf2b9c8c3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -53,7 +53,8 @@ public class RegistryServiceImpl implements RegistryService { private String zkAddress; private ZooKeeper zk; private String nodePath; - private Watcher watcher = event -> { + + private final Watcher watcher = event -> { if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged && event.getPath().equals(REGISTRY_PATH)) { handleNodeChanges(); @@ -72,7 +73,6 @@ public void registry() { try { zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181"); zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher); - if (zk.exists(REGISTRY_PATH, false) == null) { zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index 2972c2f8cc..98ddc1145d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -241,8 +241,7 @@ public void remove(Long id) { } public List listByTeamId(Long teamId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Resource::getTeamId, teamId); - return baseMapper.selectList(queryWrapper); + return this.lambdaQuery().eq(Resource::getTeamId, teamId).list(); } /** diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java index a3a0b2b72a..9027721470 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java @@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.model.AuthConfig; @@ -69,21 +68,16 @@ public void loadSettings() { @Override public Setting get(String key) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Setting::getSettingKey, key); - return this.getOne(queryWrapper); + return this.lambdaQuery().eq(Setting::getSettingKey, key).one(); } @Override public boolean update(Setting setting) { try { String value = StringUtils.trimToNull(setting.getSettingValue()); - setting.setSettingValue(value); - - Setting entity = new Setting(); - entity.setSettingValue(setting.getSettingValue()); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Setting::getSettingKey, - setting.getSettingKey()); - this.update(entity, queryWrapper); + this.lambdaUpdate().eq(Setting::getSettingKey, setting.getSettingKey()) + .set(Setting::getSettingValue, value) + .update(); getMavenConfig().updateConfig(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java index 1a70349837..3581b727ca 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java @@ -81,11 +81,9 @@ public SparkSql getEffective(Long appId, boolean decode) { public SparkSql getLatestSparkSql(Long appId, boolean decode) { Page page = new Page<>(); page.setCurrent(0).setSize(1).setSearchCount(false); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + Page sparkSqlPage = this.lambdaQuery() .eq(SparkSql::getAppId, appId) - .orderByDesc(SparkSql::getVersion); - - Page sparkSqlPage = baseMapper.selectPage(page, queryWrapper); + .orderByDesc(SparkSql::getVersion).page(page); return Optional.ofNullable(sparkSqlPage.getRecords()) .filter(records -> !records.isEmpty()) .map(records -> records.get(0)) @@ -124,11 +122,8 @@ public void setCandidate(CandidateTypeEnum candidateTypeEnum, Long appId, Long s @Override public List listSparkSqlHistory(Long appId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(SparkSql::getAppId, appId) - .orderByDesc(SparkSql::getVersion); - - List sqlList = this.baseMapper.selectList(queryWrapper); + List sqlList = this.lambdaQuery().eq(SparkSql::getAppId, appId) + .orderByDesc(SparkSql::getVersion).list(); SparkSql effective = getEffective(appId, false); if (effective != null) { sqlList.stream() @@ -219,8 +214,7 @@ public List listByTeamId(Long teamId) { public IPage getPage(Long appId, RestRequest request) { request.setSortField("version"); Page page = MybatisPager.getPage(request); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(SparkSql::getAppId, appId); - IPage sqlList = this.baseMapper.selectPage(page, queryWrapper); + IPage sqlList = this.lambdaQuery().eq(SparkSql::getAppId, appId).page(page); SparkSql effectiveSql = baseMapper.getEffective(appId); if (effectiveSql != null) { for (SparkSql sql : sqlList.getRecords()) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java index fec9f83538..45c027ddc4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java @@ -30,7 +30,6 @@ import org.apache.streampark.console.system.service.TeamService; import org.apache.streampark.console.system.service.UserService; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -70,8 +69,7 @@ public void removeByUserId(Long userId) { @Override public void removeByTeamId(Long teamId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Member::getTeamId, teamId); - this.remove(queryWrapper); + this.lambdaUpdate().eq(Member::getTeamId, teamId).remove(); } @Override @@ -102,16 +100,15 @@ public Member getByTeamIdUserName(Long teamId, String userName) { private Member findByUserId(Long teamId, Long userId) { ApiAlertException.throwIfNull(teamId, "The team id is required."); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + return this.lambdaQuery() .eq(Member::getTeamId, teamId) - .eq(Member::getUserId, userId); - return baseMapper.selectOne(queryWrapper); + .eq(Member::getUserId, userId) + .one(); } @Override public List listUserIdsByRoleId(Long roleId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Member::getRoleId, roleId); - List memberList = baseMapper.selectList(queryWrapper); + List memberList = this.lambdaQuery().eq(Member::getRoleId, roleId).list(); return memberList.stream().map(Member::getUserId).collect(Collectors.toList()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java index 12a040342f..f1907df3b9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java @@ -78,9 +78,8 @@ public List listMenus(Long userId, Long teamId) { String.format("The userId:[%s] not found", userId))); // Admin has the permission for all menus. if (UserTypeEnum.ADMIN == user.getUserType()) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(Menu::getType, "0") - .orderByAsc(Menu::getOrderNum); - return this.list(queryWrapper); + return this.lambdaQuery().eq(Menu::getType, "0") + .orderByAsc(Menu::getOrderNum).list(); } return this.baseMapper.selectMenus(userId, teamId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java index a24260a834..d1c3eb9d7e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java @@ -21,7 +21,6 @@ import org.apache.streampark.console.system.mapper.RoleMenuMapper; import org.apache.streampark.console.system.service.RoleMenuService; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; @@ -39,22 +38,17 @@ public class RoleMenuServiceImpl extends ServiceImpl @Override @Transactional public void removeByRoleId(Long roleId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(RoleMenu::getRoleId, roleId); - baseMapper.delete(queryWrapper); + this.lambdaUpdate().eq(RoleMenu::getRoleId, roleId).remove(); } @Override @Transactional public void removeByMenuIds(String[] menuIds) { - List menuIdList = Arrays.asList(menuIds); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().in(RoleMenu::getMenuId, - menuIdList); - baseMapper.delete(queryWrapper); + this.lambdaUpdate().in(RoleMenu::getRoleId, Arrays.asList(menuIds)).remove(); } @Override public List listByRoleId(String roleId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(RoleMenu::getRoleId, roleId); - return baseMapper.selectList(queryWrapper); + return this.lambdaQuery().eq(RoleMenu::getRoleId, roleId).list(); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java index cf9de0521a..7f484f916f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java @@ -44,7 +44,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -93,8 +92,7 @@ public class UserServiceImpl extends ServiceImpl implements Us @Override public User getByUsername(String username) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(User::getUsername, username); - return baseMapper.selectOne(queryWrapper); + return this.lambdaQuery().eq(User::getUsername, username).one(); } @Override @@ -110,10 +108,7 @@ public IPage getPage(User user, RestRequest request) { @Override public void updateLoginTime(String username) { - User user = new User(); - user.setLastLoginTime(new Date()); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(User::getUsername, username); - this.baseMapper.update(user, queryWrapper); + this.lambdaUpdate().eq(User::getUsername, username).set(User::getLastLoginTime, new Date()).update(); } @Override @@ -186,8 +181,7 @@ public String resetPassword(String username) { String password = ShaHashUtils.encrypt(salt, newPassword); user.setSalt(salt); user.setPassword(password); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(User::getUsername, username); - this.baseMapper.update(user, queryWrapper); + this.lambdaUpdate().eq(User::getUsername, username).update(user); return newPassword; } diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml index 19561276da..22995ffebd 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml @@ -18,10 +18,5 @@ - + diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java index ab76ac7197..a08141a9a9 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java @@ -124,13 +124,6 @@ void testStartFlinkApplicationOnYarnApplicationMode() { applicationsPage.startApplication(applicationName); - Awaitility.await() - .untilAsserted( - () -> assertThat(applicationsPage.applicationsList) - .as("Applications list should contain started application") - .extracting(WebElement::getText) - .anyMatch(it -> it.contains("RUNNING"))); - Awaitility.await() .untilAsserted( () -> assertThat(applicationsPage.applicationsList) @@ -142,7 +135,7 @@ void testStartFlinkApplicationOnYarnApplicationMode() { @Test @Order(4) @SneakyThrows - void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { + void testCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); final ApplicationsPage applicationsPage = new ApplicationsPage(browser); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java index 9403a69f37..c2cdaff273 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java @@ -123,13 +123,6 @@ void testStartFlinkApplicationOnYarnApplicationMode() { applicationsPage.startApplication(applicationName); - Awaitility.await() - .untilAsserted( - () -> assertThat(applicationsPage.applicationsList) - .as("Applications list should contain started application") - .extracting(WebElement::getText) - .anyMatch(it -> it.contains("RUNNING"))); - Awaitility.await() .untilAsserted( () -> assertThat(applicationsPage.applicationsList) @@ -141,7 +134,7 @@ void testStartFlinkApplicationOnYarnApplicationMode() { @Test @Order(4) @SneakyThrows - void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { + void testCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); final ApplicationsPage applicationsPage = new ApplicationsPage(browser); diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java index 2951b5dbd9..df8e126cd9 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java @@ -123,13 +123,6 @@ void testStartFlinkApplicationOnYarnApplicationMode() { applicationsPage.startApplication(applicationName); - Awaitility.await() - .untilAsserted( - () -> assertThat(applicationsPage.applicationsList) - .as("Applications list should contain started application") - .extracting(WebElement::getText) - .anyMatch(it -> it.contains("RUNNING"))); - Awaitility.await() .untilAsserted( () -> assertThat(applicationsPage.applicationsList) @@ -141,7 +134,7 @@ void testStartFlinkApplicationOnYarnApplicationMode() { @Test @Order(4) @SneakyThrows - void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() { + void testCancelFlinkApplicationOnYarnApplicationMode() { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); final ApplicationsPage applicationsPage = new ApplicationsPage(browser); @@ -245,7 +238,6 @@ void testStartFlinkApplicationOnYarnPerJobMode() { @Order(9) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() { - Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); final ApplicationsPage applicationsPage = new ApplicationsPage(browser); applicationsPage.startApplication(applicationName); @@ -330,13 +322,6 @@ void testStartFlinkApplicationOnYarnSessionMode() { applicationsPage.startApplication(applicationName); - Awaitility.await() - .untilAsserted( - () -> assertThat(applicationsPage.applicationsList) - .as("Applications list should contain started application") - .extracting(WebElement::getText) - .anyMatch(it -> it.contains("RUNNING"))); - Awaitility.await() .untilAsserted( () -> assertThat(applicationsPage.applicationsList) @@ -349,9 +334,7 @@ void testStartFlinkApplicationOnYarnSessionMode() { @Order(14) @SneakyThrows void testRestartAndCancelFlinkApplicationOnYarnSessionMode() { - Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - applicationsPage.startApplication(applicationName); Awaitility.await() @@ -375,9 +358,7 @@ void testRestartAndCancelFlinkApplicationOnYarnSessionMode() { @Order(15) void testDeleteFlinkApplicationOnYarnSessionMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - applicationsPage.deleteApplication(applicationName); - Awaitility.await() .untilAsserted( () -> {