The transaction does not seem to take effect in this case #652

Open
funky-eyes opened this issue Nov 19, 2022 · 13 comments
Labels
status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged

Comments

@funky-eyes

public interface DistributedLockRepository extends ReactiveCrudRepository<DistributedLock, String> {

    @Query("SELECT * FROM distributed_lock WHERE lock_key = :#{[0]} for update ")
    Mono<DistributedLock> findByLockKey(String lockKey);

}
    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return false;
        }
    }
Caused by: io.r2dbc.spi.R2dbcTimeoutException: Lock wait timeout exceeded; try restarting transaction
	at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
		at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
		at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:70)
		at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
		at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
		at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
		at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
		at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
		at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
		at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
		at dev.miku.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:340)
		at dev.miku.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:103)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:187)
		at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:95)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
		at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
		at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
		at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:829)

With the code above, the transaction does not take effect: the row is first queried with SELECT ... FOR UPDATE and then updated, but the UPDATE fails because it cannot obtain an exclusive lock (x-lock) in the database.

@mp911de
Member

mp911de commented Nov 22, 2022

`UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?` doesn't seem to use database-specific locks. `Lock wait timeout exceeded; try restarting transaction` indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.

Have you checked what transaction is holding the lock?

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Nov 22, 2022
@funky-eyes
Author

> `UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?` doesn't seem to use database-specific locks. `Lock wait timeout exceeded; try restarting transaction` indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.
>
> Have you checked what transaction is holding the lock?

The lock is held by `distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())`. That means either its transaction does not extend to the UPDATE statement, or it is not the same transaction as the one running the UPDATE.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Nov 22, 2022
@funky-eyes
Author

funky-eyes commented Nov 22, 2022

The 'for update' and the 'update' do not run in the same transaction, which causes the 'update' to time out while waiting for an x-lock in the database. How do I keep 'select for update' and 'update' in the same transaction? @mp911de

@mp911de
Member

mp911de commented Nov 22, 2022

Make sure that you have a single connection factory to obtain connections from. Any routing might cause interference.

@funky-eyes
Author

> Make sure that you have a single connection factory to obtain connections from. Any routing might cause interference.

You can try this example; the transaction does not take effect.

@funky-eyes
Author

@ConditionalOnBean(DatabaseClient.class)
@Component
public class R2dbcDistributedLockerDAO implements DistributedLocker {
    private static final Logger LOGGER = LoggerFactory.getLogger(R2dbcDistributedLockerDAO.class);

    private final BeanCopier distributedLockDOToEntity = BeanCopier.create(DistributedLockDO.class, DistributedLock.class, false);

    @Resource
    private DistributedLockRepository distributedLockRepository;

    @Resource
    private TransactionalOperator operator;

    /**
     * Instantiates a new Log store data base dao.
     */
    public R2dbcDistributedLockerDAO() {
    }

    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return false;
        }
    }
}
@SpringBootApplication( exclude = R2dbcAutoConfiguration.class)
public class ServerApplication {
    public static void main(String[] args) throws IOException {
        // run the spring-boot application
        SpringApplication.run(ServerApplication.class, args);
    }
}
@Configuration
@Import(R2dbcDataAutoConfiguration.class)
public class R2dbcAutoConfiguration {
}
@Configuration
@EnableConfigurationProperties(R2dbcProperties.class)
@AutoConfigureBefore(R2dbcAutoConfiguration.class)
public class R2dbcConfiguration extends AbstractDataSourceProvider {

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        return DatabaseClient.builder().connectionFactory(connectionFactory)
            .bindMarkers(dialect.getBindMarkersFactory()).build();
    }

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
        R2dbcDialect dialect = DialectResolver.getDialect(databaseClient.getConnectionFactory());
        return new R2dbcEntityTemplate(databaseClient, dialect);
    }

    @Bean
    public ConnectionPool connectionFactory(R2dbcProperties r2dbcProperties) {
        String url = getUrl();
        ConnectionInfo connectionInfo = URLParser.parser(url);
        String[] dbPeer = connectionInfo.getDbPeer().split(":");
        String host = dbPeer[0];
        int port = Integer.parseInt(dbPeer[1]);
        ConnectionFactoryOptions.Builder options = ConnectionFactoryOptions.builder()
            .option(DRIVER, getDBType().name().toLowerCase()).option(HOST, host).option(USER, getUser())
            .option(PORT, port).option(PASSWORD, getPassword()).option(DATABASE, connectionInfo.getDbInstance())
            .option(CONNECT_TIMEOUT, Duration.ofMillis(getMaxWait()));
        String paramUrl = url.substring(url.indexOf("?") + 1);
        if (StringUtils.isNotBlank(paramUrl)) {
            String useSSL = "useSSL";
            if (paramUrl.contains(useSSL)) {
                String[] params = paramUrl.split("&");
                for (String param : params) {
                    if (param.contains(useSSL)) {
                        options.option(SSL, Boolean.parseBoolean(param.split("=")[1]));
                        break;
                    }
                }
            }
        }
        ConnectionFactory connectionFactory = ConnectionFactories.get(options.build());
        R2dbcProperties.Pool pool = r2dbcProperties.getPool();
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactory);
        map.from(Duration.ofMillis(getMaxWait())).to(builder::maxIdleTime);
        map.from(pool.getMaxAcquireTime()).to(builder::maxAcquireTime);
        map.from(pool.getMaxCreateConnectionTime()).to(builder::maxCreateConnectionTime);
        map.from(getMinConn()).to(builder::initialSize);
        map.from(getMaxConn()).to(builder::maxSize);
        map.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
        map.from(pool.getValidationDepth()).to(builder::validationDepth);
        return new ConnectionPool(builder.build());
    }

    @Bean
    public R2dbcMappingContext r2dbcMappingContext(ObjectProvider<NamingStrategy> namingStrategy,
        R2dbcCustomConversions r2dbcCustomConversions) {
        R2dbcMappingContext relationalMappingContext =
            new R2dbcMappingContext(namingStrategy.getIfAvailable(() -> NamingStrategy.INSTANCE));
        relationalMappingContext.setSimpleTypeHolder(r2dbcCustomConversions.getSimpleTypeHolder());
        return relationalMappingContext;
    }

    @Bean
    public MappingR2dbcConverter r2dbcConverter(R2dbcMappingContext mappingContext,
        R2dbcCustomConversions r2dbcCustomConversions) {
        return new MappingR2dbcConverter(mappingContext, r2dbcCustomConversions);
    }

    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        List<Object> converters = new ArrayList<>(dialect.getConverters());
        converters.addAll(R2dbcCustomConversions.STORE_CONVERTERS);
        return new R2dbcCustomConversions(
            CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder(), converters), Collections.emptyList());
    }
}

apache/incubator-seata#4926

@funky-eyes
Author

funky-eyes commented Nov 22, 2022

I changed the code to the following and the transaction took effect. I will keep observing and learning tomorrow. Thanks for your help, @mp911de.

    @Transactional
    public Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Mono.from(connectionFactory.create()).flatMap(connection -> Mono.from(connection.beginTransaction())
                .then(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from)
                .delayUntil(bool -> bool ? connection.commitTransaction() : connection.rollbackTransaction())
                .doFinally(c -> connection.close()));
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return Mono.just(false);
        }
    }

@funky-eyes
Author

funky-eyes commented Nov 23, 2022

Changing the above code to use TransactionalOperator makes the transaction ineffective again:

            return Boolean.TRUE
                .equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from).as(operator::transactional).block());
 Lock wait timeout exceeded; try restarting transaction
	at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
		at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)

@funky-eyes
Author

Both transaction examples above fail, except that they no longer output 'Lock wait timeout exceeded'.
Can you give me an example of 'select for update' combined with 'update/save'?
I need the 'for update' to hold the x-lock in the database so that my update is correct.
@mp911de

@funky-eyes
Author

Can anyone tell me what to do in this application scenario?

@funky-eyes
Author

help

@mp911de
Member

mp911de commented Dec 12, 2022

`return distributedLockRepository.save(distributedLock).block()` does not participate in a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return `Mono`/`Flux` or use JDBC. Never call `.block()`, as that is the source of your transaction context issues.
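
To illustrate the context-propagation point, here is a toy model in plain Java. It is a sketch under stated assumptions, not Reactor's or Spring's API: `Op`, `ContextDemo`, `statement` and `transactional` are hypothetical names invented for this example, and connections are just strings. The model assumes that the transactional wrapper binds a connection into the subscriber context, that a composed step sees that same context, and that `block()` starts an independent subscription with an empty context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy stand-in for a reactive type (NOT Reactor's API): work that runs against a subscriber context.
interface Op<T> {
    T run(Map<String, String> ctx);

    // Like flatMap: the dependent step runs with the SAME context as this step.
    default <R> Op<R> flatMap(Function<T, Op<R>> next) {
        return ctx -> next.apply(run(ctx)).run(ctx);
    }

    // Like map: a plain synchronous transformation of the value.
    default <R> Op<R> map(Function<T, R> fn) {
        return ctx -> fn.apply(run(ctx));
    }

    // Like block(): starts an independent subscription with an EMPTY context.
    default T block() {
        return run(new HashMap<>());
    }
}

final class ContextDemo {
    // Hypothetical repository call: uses the connection bound in the context,
    // or takes a fresh one from the pool when the context has none.
    static Op<String> statement(String sql, AtomicInteger pool) {
        return ctx -> {
            String conn = ctx.containsKey("connection")
                ? ctx.get("connection")
                : "conn-" + pool.incrementAndGet();
            return sql + " on " + conn;
        };
    }

    // Hypothetical transactional wrapper: binds one connection into the context for the whole chain.
    static <T> Op<T> transactional(Op<T> op, AtomicInteger pool) {
        return ctx -> {
            Map<String, String> withTx = new HashMap<>(ctx);
            withTx.put("connection", "conn-" + pool.incrementAndGet());
            return op.run(withTx);
        };
    }

    // Mirrors the reported code: the inner block() cannot see the bound connection.
    static String withBlock() {
        AtomicInteger pool = new AtomicInteger();
        Op<String> chain = statement("SELECT", pool)
            .map(row -> row + " | " + statement("UPDATE", pool).block());
        return transactional(chain, pool).block();
    }

    // Composed variant: the UPDATE is part of the same chain and shares the context.
    static String withFlatMap() {
        AtomicInteger pool = new AtomicInteger();
        Op<String> chain = statement("SELECT", pool)
            .flatMap(row -> statement("UPDATE", pool).map(upd -> row + " | " + upd));
        return transactional(chain, pool).block();
    }
}
```

In this model, `withBlock()` runs the UPDATE on a second connection while the first one still holds the row lock taken by the SELECT, and `withFlatMap()` runs both statements on the same connection. In Reactor terms, the corresponding change would be to replace `map(... save(...).block())` with `flatMap(... save(...).thenReturn(true))` and to return `Mono<Boolean>` from `acquireLock`. Note also that `map` is never invoked on an empty `Mono`, so the `distributedLock != null` checks cannot reach the insert branch; `switchIfEmpty(Mono.defer(...))` is the usual place for the "no row yet" case.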

@funky-eyes
Author

> `return distributedLockRepository.save(distributedLock).block()` does not participate in a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return `Mono`/`Flux` or use JDBC. Never call `.block()`, as that is the source of your transaction context issues.

A transaction often depends on the results of an earlier query, so how can dirty reads be avoided if the query and the write are not in the same transaction? Can you tell me how to run both the query and the insert in a single transaction with R2DBC?
