Skip to content

Commit

Permalink
[Improve] mybatis lambdaQueryWrapper improvements (apache#4139)
Browse files Browse the repository at this point in the history
* [Improve] mybatis-plus LambdaQueryWrapper improvements

* [Improve] mybatis-plus query page bug fixed.

* [Improve] minor improvement

* [Improve] code style improvement

* [Improve] e2e workflow improvement

* [Improve] isLocalProcessing bug fixed.
  • Loading branch information
wolfboys authored Dec 1, 2024
1 parent b85b5d4 commit b65b87d
Show file tree
Hide file tree
Showing 33 changed files with 150 additions and 272 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ jobs:
class: org.apache.streampark.e2e.cases.Flink117OnYarnClusterDeployTest
- name: Flink118OnYarnClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink118OnYarnClusterDeployTest
- name: FlinkSQL116OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
- name: FlinkSQL117OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
- name: FlinkSQL118OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -90,4 +91,9 @@ public static File getAppClientDir() {
public static File getPluginDir() {
return getAppDir(PLUGINS);
}

public static boolean isHaEnable() {
return SystemPropertyUtils.getBoolean("high-availability.enable", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

import org.apache.streampark.console.core.entity.AlertConfig;

import org.apache.ibatis.annotations.Param;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface AlertConfigMapper extends BaseMapper<AlertConfig> {

AlertConfig selectAlertConfByName(@Param("alertConfig") AlertConfig alertConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ private void initConfig() {
}

private void initRegistryService() {
boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
if (enable) {
if (WebUtils.isHaEnable()) {
RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param appId Long
* @return boolean
*/
public boolean isLocalProcessing(Long appId);
boolean isLocalProcessing(Long appId);

/**
* Save Distributed Task.
Expand All @@ -70,5 +70,5 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param autoStart boolean
* @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT
*/
public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,9 @@ public class AlertConfigServiceImpl extends ServiceImpl<AlertConfigMapper, Alert
@Override
public IPage<AlertConfigParams> page(Long userId, RestRequest request) {
// build query conditions
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(userId != null, AlertConfig::getUserId, userId);

Page<AlertConfig> page = MybatisPager.getPage(request);
IPage<AlertConfig> resultPage = getBaseMapper().selectPage(page, wrapper);

IPage<AlertConfig> resultPage =
this.lambdaQuery().eq(userId != null, AlertConfig::getUserId, userId).page(page);
Page<AlertConfigParams> result = new Page<>();
if (CollectionUtils.isNotEmpty(resultPage.getRecords())) {
result.setRecords(
Expand All @@ -71,8 +68,7 @@ public IPage<AlertConfigParams> page(Long userId, RestRequest request) {

@Override
public boolean exist(AlertConfig alertConfig) {
AlertConfig confByName = this.baseMapper.selectAlertConfByName(alertConfig);
return confByName != null;
return this.lambdaQuery().eq(AlertConfig::getAlertName, alertConfig.getAlertName()).exists();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
Expand Down Expand Up @@ -70,9 +69,7 @@ public class FlinkApplicationBackupServiceImpl
@Override
public IPage<FlinkApplicationBackup> getPage(FlinkApplicationBackup bakParam, RestRequest request) {
Page<FlinkApplicationBackup> page = MybatisPager.getPage(request);
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, bakParam.getAppId());
return this.baseMapper.selectPage(page, queryWrapper);
return this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, bakParam.getAppId()).page(page);
}

@Override
Expand Down Expand Up @@ -135,11 +132,12 @@ public void rollback(FlinkApplicationBackup bakParam) {
public void revoke(FlinkApplication appParam) {
Page<FlinkApplicationBackup> page = new Page<>();
page.setCurrent(0).setSize(1).setSearchCount(false);
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId())
.orderByDesc(FlinkApplicationBackup::getCreateTime);

Page<FlinkApplicationBackup> backUpPages = baseMapper.selectPage(page, queryWrapper);
Page<FlinkApplicationBackup> backUpPages = this.lambdaQuery().eq(
FlinkApplicationBackup::getAppId,
appParam.getId())
.orderByDesc(FlinkApplicationBackup::getCreateTime).page(page);

if (!backUpPages.getRecords().isEmpty()) {
FlinkApplicationBackup backup = backUpPages.getRecords().get(0);
String path = backup.getPath();
Expand All @@ -151,9 +149,7 @@ public void revoke(FlinkApplication appParam) {
@Override
public void remove(FlinkApplication appParam) {
try {
baseMapper.delete(
new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId()));
this.lambdaUpdate().eq(FlinkApplicationBackup::getAppId, appParam.getId()).remove();
appParam
.getFsOperator()
.delete(
Expand All @@ -169,10 +165,8 @@ public void remove(FlinkApplication appParam) {

@Override
public void rollbackFlinkSql(FlinkApplication appParam, FlinkSql flinkSqlParam) {
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId())
.eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId());
FlinkApplicationBackup backUp = baseMapper.selectOne(queryWrapper);
FlinkApplicationBackup backUp = this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, appParam.getId())
.eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId()).one();
ApiAlertException.throwIfNull(
backUp, "Application backup can't be null. Rollback flink sql failed.");
// rollback config and sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.Cache;
Expand Down Expand Up @@ -631,10 +630,8 @@ public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> appId
if (CollectionUtils.isEmpty(appIds)) {
return new HashMap<>();
}
LambdaQueryWrapper<ApplicationBuildPipeline> queryWrapper = new LambdaQueryWrapper<ApplicationBuildPipeline>()
.in(ApplicationBuildPipeline::getAppId, appIds);

List<ApplicationBuildPipeline> appBuildPipelines = baseMapper.selectList(queryWrapper);
List<ApplicationBuildPipeline> appBuildPipelines =
this.lambdaQuery().in(ApplicationBuildPipeline::getAppId, appIds).list();
if (CollectionUtils.isEmpty(appBuildPipelines)) {
return new HashMap<>();
}
Expand All @@ -644,8 +641,7 @@ public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> appId

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<ApplicationBuildPipeline>().eq(ApplicationBuildPipeline::getAppId, appId));
this.lambdaUpdate().eq(ApplicationBuildPipeline::getAppId, appId).remove();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
import org.apache.streampark.console.core.service.FlinkEffectiveService;
import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -92,13 +89,10 @@ public synchronized void create(FlinkApplication appParam, Boolean latest) {
}

public void setLatest(Long appId, Long configId) {
LambdaUpdateWrapper<FlinkApplicationConfig> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId);
this.update(updateWrapper);

updateWrapper.clear();
updateWrapper.set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId);
this.update(updateWrapper);
this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId)
.update();
this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId)
.update();
}

@Override
Expand Down Expand Up @@ -187,9 +181,8 @@ public void setLatestOrEffective(Boolean latest, Long configId, Long appId) {

@Override
public void toEffective(Long appId, Long configId) {
LambdaUpdateWrapper<FlinkApplicationConfig> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false);
this.update(updateWrapper);
this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false)
.update();
effectiveService.saveOrUpdate(appId, EffectiveTypeEnum.CONFIG, configId);
}

Expand Down Expand Up @@ -225,21 +218,17 @@ public IPage<FlinkApplicationConfig> getPage(FlinkApplicationConfig config, Rest

@Override
public List<FlinkApplicationConfig> list(Long appId) {
LambdaQueryWrapper<FlinkApplicationConfig> queryWrapper = new LambdaQueryWrapper<FlinkApplicationConfig>()
.eq(FlinkApplicationConfig::getAppId, appId)
.orderByDesc(FlinkApplicationConfig::getVersion);

List<FlinkApplicationConfig> configList = this.baseMapper.selectList(queryWrapper);
List<FlinkApplicationConfig> configList = this.lambdaQuery().eq(FlinkApplicationConfig::getAppId, appId)
.orderByDesc(FlinkApplicationConfig::getVersion).list();
fillEffectiveField(appId, configList);
return configList;
}

@Override
public synchronized String readTemplate() {
if (flinkConfTemplate == null) {
try {
Resource resource = resourceLoader.getResource("classpath:flink-application.conf");
Scanner scanner = new Scanner(resource.getInputStream());
Resource resource = resourceLoader.getResource("classpath:flink-application.conf");
try (Scanner scanner = new Scanner(resource.getInputStream())) {
StringBuilder stringBuffer = new StringBuilder();
while (scanner.hasNextLine()) {
stringBuffer.append(scanner.nextLine()).append(System.lineSeparator());
Expand All @@ -257,8 +246,7 @@ public synchronized String readTemplate() {

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<FlinkApplicationConfig>().eq(FlinkApplicationConfig::getAppId, appId));
this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).remove();
}

private void fillEffectiveField(Long id, List<FlinkApplicationConfig> configList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -251,14 +250,12 @@ public boolean checkAlter(FlinkApplication appParam) {

@Override
public boolean existsByTeamId(Long teamId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getTeamId, teamId));
return this.lambdaQuery().eq(FlinkApplication::getTeamId, teamId).exists();
}

@Override
public boolean existsByUserId(Long userId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getUserId, userId));
return this.lambdaQuery().eq(FlinkApplication::getUserId, userId).exists();
}

@Override
Expand All @@ -273,17 +270,13 @@ public boolean existsRunningByClusterId(Long clusterId) {

@Override
public boolean existsByClusterId(Long clusterId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getFlinkClusterId, clusterId));
return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId, clusterId).exists();
}

@Override
public Integer countByClusterId(Long clusterId) {
return baseMapper
.selectCount(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getFlinkClusterId,
clusterId))
.intValue();
return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId,
clusterId).count().intValue();
}

@Override
Expand All @@ -293,8 +286,7 @@ public Integer countAffectedByClusterId(Long clusterId, String dbType) {

@Override
public boolean existsByFlinkEnvId(Long flinkEnvId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getVersionId, flinkEnvId));
return this.lambdaQuery().eq(FlinkApplication::getVersionId, flinkEnvId).exists();
}

@Override
Expand Down Expand Up @@ -434,8 +426,8 @@ public AppExistsStateEnum checkExists(FlinkApplication appParam) {
return AppExistsStateEnum.INVALID;
}

FlinkApplication application = baseMapper.selectOne(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getJobName, jobName));
FlinkApplication application = this.lambdaQuery().eq(FlinkApplication::getJobName, jobName).one();

if (application != null && !application.getId().equals(appParamId)) {
return AppExistsStateEnum.IN_DB;
}
Expand Down
Loading

0 comments on commit b65b87d

Please sign in to comment.