diff --git a/common/src/main/java/ru/tinkoff/kora/common/Context.java b/common/src/main/java/ru/tinkoff/kora/common/Context.java index 6b896bd10..9c712558c 100644 --- a/common/src/main/java/ru/tinkoff/kora/common/Context.java +++ b/common/src/main/java/ru/tinkoff/kora/common/Context.java @@ -96,8 +96,8 @@ public static Context current(CoroutineContext ctx) { public static CoroutineContext inject(CoroutineContext cctx, Context context) { var reactorContext = Reactor.inject(reactor.util.context.Context.of(Context.class, cctx), context); var coroutineContext = (CoroutineContext) (Object) ReactorContextKt.asCoroutineContext(reactorContext); - - return cctx.plus(coroutineContext).plus(asCoroutineContext(context)); + var contextElement = new CoroutineContextElement(context); + return cctx.plus(contextElement).plus(coroutineContext).plus(asCoroutineContext(context)); } public static CoroutineContext asCoroutineContext(Context ctx) { diff --git a/database/database-annotation-processor/build.gradle b/database/database-annotation-processor/build.gradle index 36bb9ddd7..809cd40c1 100644 --- a/database/database-annotation-processor/build.gradle +++ b/database/database-annotation-processor/build.gradle @@ -8,7 +8,9 @@ dependencies { testImplementation testFixtures(project(':annotation-processor-common')) testImplementation project(':database:database-common') testImplementation project(':database:database-jdbc') + testImplementation project(':database:database-vertx-common') testImplementation project(':database:database-vertx') + testImplementation project(':database:database-vertx-coroutines') testImplementation project(':database:database-r2dbc') testImplementation project(':database:database-cassandra') } diff --git a/database/database-symbol-processor/build.gradle b/database/database-symbol-processor/build.gradle index b77fbb7df..5ccb49e88 100644 --- a/database/database-symbol-processor/build.gradle +++ b/database/database-symbol-processor/build.gradle @@ -19,7 +19,9 @@ dependencies { testImplementation testFixtures(project(':annotation-processor-common')) testImplementation project(':database:database-common') testImplementation project(':database:database-jdbc') + testImplementation project(':database:database-vertx-common') testImplementation project(':database:database-vertx') + testImplementation project(':database:database-vertx-coroutines') testImplementation project(':database:database-r2dbc') testImplementation project(':database:database-cassandra') testImplementation(libs.kotlin.stdlib.lib) diff --git a/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/RepositoryBuilder.kt b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/RepositoryBuilder.kt index 93167c892..0fc2c0645 100644 --- a/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/RepositoryBuilder.kt +++ b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/RepositoryBuilder.kt @@ -6,7 +6,11 @@ import com.google.devtools.ksp.processing.KSPLogger import com.google.devtools.ksp.processing.Resolver import com.google.devtools.ksp.symbol.ClassKind import com.google.devtools.ksp.symbol.KSClassDeclaration -import com.squareup.kotlinpoet.* +import com.squareup.kotlinpoet.AnnotationSpec +import com.squareup.kotlinpoet.CodeBlock +import com.squareup.kotlinpoet.FunSpec +import com.squareup.kotlinpoet.ParameterSpec +import com.squareup.kotlinpoet.TypeSpec import com.squareup.kotlinpoet.ksp.addOriginatingKSFile import com.squareup.kotlinpoet.ksp.toClassName import com.squareup.kotlinpoet.ksp.toTypeName @@ -14,6 +18,7 @@ import org.slf4j.LoggerFactory import ru.tinkoff.kora.database.symbol.processor.cassandra.CassandraRepositoryGenerator import ru.tinkoff.kora.database.symbol.processor.jdbc.JdbcRepositoryGenerator import ru.tinkoff.kora.database.symbol.processor.r2dbc.R2DbcRepositoryGenerator +import ru.tinkoff.kora.database.symbol.processor.vertx.VertxCoroutineBasedRepositoryGenerator import ru.tinkoff.kora.database.symbol.processor.vertx.VertxRepositoryGenerator import ru.tinkoff.kora.kora.app.ksp.extendsKeepAop import ru.tinkoff.kora.ksp.common.KspCommonUtils.generated @@ -27,6 +32,7 @@ class RepositoryBuilder( private val availableGenerators = listOf( JdbcRepositoryGenerator(resolver), VertxRepositoryGenerator(resolver, kspLogger), + VertxCoroutineBasedRepositoryGenerator(resolver, kspLogger), R2DbcRepositoryGenerator(resolver), CassandraRepositoryGenerator(resolver), ) diff --git a/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxCoroutineBasedRepositoryGenerator.kt b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxCoroutineBasedRepositoryGenerator.kt new file mode 100644 index 000000000..45ac11b6d --- /dev/null +++ b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxCoroutineBasedRepositoryGenerator.kt @@ -0,0 +1,203 @@ +package ru.tinkoff.kora.database.symbol.processor.vertx + +import com.google.devtools.ksp.processing.KSPLogger +import com.google.devtools.ksp.processing.Resolver +import com.google.devtools.ksp.symbol.KSClassDeclaration +import com.google.devtools.ksp.symbol.KSFunction +import com.google.devtools.ksp.symbol.KSFunctionDeclaration +import com.squareup.kotlinpoet.AnnotationSpec +import com.squareup.kotlinpoet.ClassName +import com.squareup.kotlinpoet.CodeBlock +import com.squareup.kotlinpoet.FunSpec +import com.squareup.kotlinpoet.KModifier +import com.squareup.kotlinpoet.MemberName +import com.squareup.kotlinpoet.ParameterSpec +import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy +import com.squareup.kotlinpoet.PropertySpec +import com.squareup.kotlinpoet.TypeSpec +import com.squareup.kotlinpoet.ksp.toTypeName +import ru.tinkoff.kora.database.symbol.processor.DbUtils +import ru.tinkoff.kora.database.symbol.processor.DbUtils.addMapper +import ru.tinkoff.kora.database.symbol.processor.DbUtils.findQueryMethods +import ru.tinkoff.kora.database.symbol.processor.DbUtils.parseExecutorTag +import ru.tinkoff.kora.database.symbol.processor.DbUtils.queryMethodBuilder +import ru.tinkoff.kora.database.symbol.processor.DbUtils.resultMapperName +import ru.tinkoff.kora.database.symbol.processor.DbUtils.updateCount +import ru.tinkoff.kora.database.symbol.processor.Mapper +import ru.tinkoff.kora.database.symbol.processor.QueryWithParameters +import ru.tinkoff.kora.database.symbol.processor.RepositoryGenerator +import ru.tinkoff.kora.database.symbol.processor.model.QueryParameter +import ru.tinkoff.kora.database.symbol.processor.model.QueryParameterParser +import ru.tinkoff.kora.ksp.common.AnnotationUtils.findAnnotation +import ru.tinkoff.kora.ksp.common.AnnotationUtils.findValue +import ru.tinkoff.kora.ksp.common.CommonClassNames +import ru.tinkoff.kora.ksp.common.CommonClassNames.isFlow +import ru.tinkoff.kora.ksp.common.CommonClassNames.isList +import ru.tinkoff.kora.ksp.common.FieldFactory +import ru.tinkoff.kora.ksp.common.FunctionUtils.isFlow +import ru.tinkoff.kora.ksp.common.FunctionUtils.isSuspend +import ru.tinkoff.kora.ksp.common.parseMappingData + +class VertxCoroutineBasedRepositoryGenerator(private val resolver: Resolver, private val kspLogger: KSPLogger) : RepositoryGenerator { + private val runBlocking = MemberName("kotlinx.coroutines", "runBlocking") + private val repositoryInterface = resolver.getClassDeclarationByName(resolver.getKSNameFromString(VertxTypes.Coroutines.repository.canonicalName))?.asStarProjectedType() + override fun repositoryInterface() = repositoryInterface + + override fun generate(repositoryType: KSClassDeclaration, typeBuilder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder): TypeSpec { + this.enrichWithExecutor(repositoryType, typeBuilder, constructorBuilder) + val repositoryResolvedType = repositoryType.asStarProjectedType() + val resultMappers = FieldFactory(typeBuilder, constructorBuilder, "_result_mapper_") + val parameterMappers = FieldFactory(typeBuilder, constructorBuilder, "_parameter_mapper_") + for (method in repositoryType.findQueryMethods()) { + val methodType = method.asMemberOf(repositoryResolvedType) + val parameters = QueryParameterParser.parse(listOf(VertxTypes.sqlConnection, VertxTypes.sqlClient), method, methodType) + val queryAnnotation = method.findAnnotation(DbUtils.queryAnnotation)!! + val queryString = queryAnnotation.findValue("value")!! + val query = QueryWithParameters.parse(queryString, parameters) + val resultMapperName = this.parseResultMapper(method, parameters, methodType)?.let { resultMappers.addMapper(it) } + DbUtils.parseParameterMappers(method, parameters, query, VertxTypes.parameterColumnMapper) { VertxNativeTypes.findNativeType(it.toTypeName()) != null } + .forEach { parameterMappers.addMapper(it) } + val methodSpec = this.generate(method, methodType, query, parameters, resultMapperName, parameterMappers) + typeBuilder.addFunction(methodSpec) + } + + return typeBuilder + .addAnnotation( + AnnotationSpec.builder(ClassName("kotlin", "OptIn")) + .addMember("%T::class", ClassName("kotlinx.coroutines", "ExperimentalCoroutinesApi")) + .build() + ) + .primaryConstructor(constructorBuilder.build()) + .build() + } + + private fun generate(funDeclaration: KSFunctionDeclaration, function: KSFunction, query: QueryWithParameters, parameters: List, resultMapperName: String?, parameterMappers: FieldFactory): FunSpec { + var sql = query.rawQuery + query.parameters.indices.asSequence() + .map { query.parameters[it].sqlParameterName to "$" + (it + 1) } + .sortedByDescending { it.first.length } + .forEach { sql = sql.replace(":" + it.first, it.second) } + + val b = funDeclaration.queryMethodBuilder(resolver) + b.addCode("val _query = %T(\n %S,\n %S\n)\n", DbUtils.queryContext, query.rawQuery, sql) + val batchParam = parameters.firstOrNull { it is QueryParameter.BatchParameter } + val isSuspend = funDeclaration.isSuspend() + val isFlow = funDeclaration.isFlow() + ParametersToTupleBuilder.generate(b, query, funDeclaration, parameters, batchParam, parameterMappers) + val connectionParameter = parameters.asSequence().filterIsInstance().firstOrNull()?.variable?.name?.asString() + + if (isSuspend) { + b.addCode("return ") + } else { + b.addCode("return %M {\n", runBlocking) + } + + if (batchParam != null) { + if (connectionParameter == null) { + b.addCode("%T.awaitBatch(this.vertxConnectionFactory, _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper) + } else { + b.addCode("%T.awaitBatch(%N, this.vertxConnectionFactory.telemetry(), _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper, connectionParameter) + } + if (function.returnType == resolver.builtIns.unitType) { + b.addCode(" \n.let {}") + } + } else if (isFlow) { + if (connectionParameter == null) { + b.addCode("%T.flow(this.vertxConnectionFactory, _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, resultMapperName) + } else { + b.addCode("%T.flow(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, connectionParameter, resultMapperName) + } + } else { + if (function.returnType == resolver.builtIns.unitType) { + if (connectionParameter == null) { + b.addCode("%T.await(this.vertxConnectionFactory, _query, _tuple)", VertxTypes.Coroutines.repositoryHelper) + } else { + b.addCode("%T.await(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple)", VertxTypes.Coroutines.repositoryHelper, connectionParameter) + } + } else { + if (connectionParameter == null) { + b.addCode("%T.awaitSingleOrNull(this.vertxConnectionFactory, _query, _tuple", VertxTypes.Coroutines.repositoryHelper) + } else { + b.addCode("%T.awaitSingleOrNull(%N, this.vertxConnectionFactory.telemetry(), _query, _tuple", VertxTypes.Coroutines.repositoryHelper, connectionParameter) + } + if (function.returnType?.toTypeName() == updateCount) { + b.addCode(") { %T.extractUpdateCount(it) }", VertxTypes.rowSetMapper) + } else { + b.addCode(", %N)", resultMapperName) + } + if (function.returnType?.isMarkedNullable == false) { + b.addCode("!!") + } + } + } + b.addCode("\n") + if (!isSuspend) { + b.addCode(" }\n") + } + return b.build() + } + + private fun parseResultMapper(method: KSFunctionDeclaration, parameters: List, methodType: KSFunction): Mapper? { + for (parameter in parameters) { + if (parameter is QueryParameter.BatchParameter) { + return null + } + } + val returnType = methodType.returnType!! + val mapperName = method.resultMapperName() + val mappings = method.parseMappingData() + val resultSetMapper = mappings.getMapping(VertxTypes.rowSetMapper) + val rowMapper = mappings.getMapping(VertxTypes.rowMapper) + if (returnType.isFlow()) { + val flowParam = returnType.arguments[0] + val returnTypeName = flowParam.toTypeName().copy(false) + val mapperType = VertxTypes.rowMapper.parameterizedBy(returnTypeName) + if (rowMapper != null) { + return Mapper(rowMapper, mapperType, mapperName) + } + return Mapper(mapperType, mapperName) + } + val mapperType = VertxTypes.rowSetMapper.parameterizedBy(returnType.toTypeName()) + if (resultSetMapper != null) { + return Mapper(resultSetMapper, mapperType, mapperName) + } + if (rowMapper != null) { + if (returnType.isList()) { + return Mapper(rowMapper, mapperType, mapperName) { + CodeBlock.of("%T.listRowSetMapper(%L)", VertxTypes.rowSetMapper, it) + } + } else { + return Mapper(rowMapper, mapperType, mapperName) { + CodeBlock.of("%T.singleRowSetMapper(%L)", VertxTypes.rowSetMapper, it) + } + } + } + if (returnType == resolver.builtIns.unitType) { + return null + } + if (returnType.toTypeName() == updateCount) { + return null + } + return Mapper(mapperType, mapperName) + } + + private fun enrichWithExecutor(repositoryElement: KSClassDeclaration, builder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder) { + builder.addSuperinterface(VertxTypes.Coroutines.repository) + + builder.addProperty( + PropertySpec.builder("vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory, KModifier.OVERRIDE) + .build() + ) + val executorTag = repositoryElement.parseExecutorTag() + if (executorTag != null) { + constructorBuilder.addParameter( + ParameterSpec.builder("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory).addAnnotation( + AnnotationSpec.builder(CommonClassNames.tag).addMember("value = %L", executorTag).build() + ).build() + ) + } else { + constructorBuilder.addParameter("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory) + } + constructorBuilder.addStatement("this.vertxConnectionFactory = _vertxConnectionFactory") + } +} diff --git a/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxTypes.kt b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxTypes.kt index 30d029a7f..abe8e8327 100644 --- a/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxTypes.kt +++ b/database/database-symbol-processor/src/main/kotlin/ru/tinkoff/kora/database/symbol/processor/vertx/VertxTypes.kt @@ -17,4 +17,10 @@ object VertxTypes { val rowMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxRowMapper") val resultColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxResultColumnMapper") val parameterColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.parameter", "VertxParameterColumnMapper") + + object Coroutines { + val connectionFactory = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxConnectionFactory") + val repository = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepository") + val repositoryHelper = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepositoryHelper") + } } diff --git a/database/database-vertx-common/build.gradle b/database/database-vertx-common/build.gradle new file mode 100644 index 000000000..3e5901d7b --- /dev/null +++ b/database/database-vertx-common/build.gradle @@ -0,0 +1,17 @@ +apply from: "${project.rootDir}/kotlin-plugin.gradle" + +dependencies { + api project(":database:database-common") + api project(":vertx-common") + api project(":common") + + api(libs.vertx.pg.client) + compileOnly(libs.kotlin.stdlib.lib) + compileOnly(libs.kotlin.coroutines.reactor) + compileOnly(libs.kotlin.coroutines.jdk8) + implementation "com.ongres.scram:common:2.1" + implementation "com.ongres.scram:client:2.1" + + testImplementation project(':test:test-postgres') + testImplementation libs.reactor.test +} diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseBaseModule.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseBaseModule.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseBaseModule.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseBaseModule.java diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseConfig.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseConfig.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseConfig.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/VertxDatabaseConfig.java diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/parameter/VertxParameterColumnMapper.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/parameter/VertxParameterColumnMapper.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/parameter/VertxParameterColumnMapper.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/parameter/VertxParameterColumnMapper.java diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxResultColumnMapper.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxResultColumnMapper.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxResultColumnMapper.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxResultColumnMapper.java diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowMapper.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowMapper.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowMapper.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowMapper.java diff --git a/database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowSetMapper.java b/database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowSetMapper.java similarity index 100% rename from database/database-vertx/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowSetMapper.java rename to database/database-vertx-common/src/main/java/ru/tinkoff/kora/database/vertx/mapper/result/VertxRowSetMapper.java diff --git a/database/database-vertx-coroutines/build.gradle b/database/database-vertx-coroutines/build.gradle new file mode 100644 index 000000000..b0264c60e --- /dev/null +++ b/database/database-vertx-coroutines/build.gradle @@ -0,0 +1,19 @@ +apply from: "${project.rootDir}/kotlin-plugin.gradle" + +dependencies { + api project(":database:database-vertx-common") + api project(":vertx-common") + api project(":common") + + api(libs.vertx.pg.client) + api(libs.vertx.kotlin.coroutines) + compileOnly(libs.kotlin.stdlib.lib) + compileOnly(libs.kotlin.coroutines.reactor) + compileOnly(libs.kotlin.coroutines.reactive) + compileOnly(libs.kotlin.coroutines.jdk8) + implementation "com.ongres.scram:common:2.1" + implementation "com.ongres.scram:client:2.1" + + testImplementation project(':test:test-postgres') + testImplementation libs.reactor.test +} diff --git a/database/database-vertx-coroutines/src/main/java/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabaseModule.java b/database/database-vertx-coroutines/src/main/java/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabaseModule.java new file mode 100644 index 000000000..31d556b71 --- /dev/null +++ b/database/database-vertx-coroutines/src/main/java/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabaseModule.java @@ -0,0 +1,19 @@ +package ru.tinkoff.kora.database.vertx.coroutines; + +import com.typesafe.config.Config; +import io.netty.channel.EventLoopGroup; +import ru.tinkoff.kora.config.common.extractor.ConfigValueExtractor; +import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory; +import ru.tinkoff.kora.database.vertx.VertxDatabaseBaseModule; +import ru.tinkoff.kora.database.vertx.VertxDatabaseConfig; + +public interface VertxDatabaseModule extends VertxDatabaseBaseModule { + default VertxDatabaseConfig vertxDatabaseConfig(Config config, ConfigValueExtractor extractor) { + var value = config.getValue("db"); + return extractor.extract(value); + } + + default VertxDatabase vertxDatabase(VertxDatabaseConfig vertxDatabaseConfig, EventLoopGroup eventLoopGroup, DataBaseTelemetryFactory telemetryFactory) { + return new VertxDatabase(vertxDatabaseConfig, eventLoopGroup, telemetryFactory); + } +} diff --git a/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactory.kt b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactory.kt new file mode 100644 index 000000000..6212b3227 --- /dev/null +++ b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactory.kt @@ -0,0 +1,22 @@ +package ru.tinkoff.kora.database.vertx.coroutines + +import io.vertx.sqlclient.Pool +import io.vertx.sqlclient.SqlConnection +import kotlinx.coroutines.ExperimentalCoroutinesApi +import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry + +@ExperimentalCoroutinesApi +interface VertxConnectionFactory { + + suspend fun currentConnection(): SqlConnection? + + suspend fun newConnection(): SqlConnection + + fun pool(): Pool + + fun telemetry(): DataBaseTelemetry + + suspend fun withConnection(callback: suspend (SqlConnection) -> T): T + + suspend fun inTx(callback: suspend (SqlConnection) -> T): T +} diff --git a/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabase.kt b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabase.kt new file mode 100644 index 000000000..f68b8ae6e --- /dev/null +++ b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxDatabase.kt @@ -0,0 +1,119 @@ +package ru.tinkoff.kora.database.vertx.coroutines + +import io.netty.channel.EventLoopGroup +import io.vertx.kotlin.coroutines.await +import io.vertx.pgclient.PgPool +import io.vertx.sqlclient.Pool +import io.vertx.sqlclient.SqlConnection +import io.vertx.sqlclient.Transaction +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.reactor.mono +import reactor.core.publisher.Mono +import ru.tinkoff.kora.application.graph.Lifecycle +import ru.tinkoff.kora.application.graph.Wrapped +import ru.tinkoff.kora.common.Context +import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry +import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory +import ru.tinkoff.kora.database.vertx.VertxDatabaseConfig +import ru.tinkoff.kora.vertx.common.VertxUtil +import kotlin.coroutines.coroutineContext + +@ExperimentalCoroutinesApi +class VertxDatabase( + vertxDatabaseConfig: VertxDatabaseConfig, + eventLoopGroup: EventLoopGroup, + telemetryFactory: DataBaseTelemetryFactory +) : Lifecycle, Wrapped, VertxConnectionFactory { + + private val connectionKey: Context.Key = object : Context.Key() { + override fun copy(connection: SqlConnection): SqlConnection? { + return null + } + } + private val transactionKey: Context.Key = object : Context.Key() { + override fun copy(transaction: Transaction): Transaction? { + return null + } + } + private val pool: Pool + private val telemetry: DataBaseTelemetry + + init { + telemetry = telemetryFactory[vertxDatabaseConfig.poolName, "postgres", vertxDatabaseConfig.username] + pool = PgPool.pool(VertxUtil.customEventLoopVertx(eventLoopGroup), vertxDatabaseConfig.toPgConnectOptions(), vertxDatabaseConfig.toPgPoolOptions()) + } + + override suspend fun currentConnection(): SqlConnection? { + val ctx = Context.Kotlin.current(coroutineContext) + return ctx.get(this.connectionKey) + } + + override suspend fun newConnection(): SqlConnection { + return this.pool.connection + .await() + } + + override fun pool(): Pool = this.pool + + override fun telemetry(): DataBaseTelemetry = this.telemetry + + override suspend fun withConnection(callback: suspend (SqlConnection) -> T): T { + val ctx = Context.Kotlin.current(coroutineContext) + val currentConnection = ctx[connectionKey] + if (currentConnection != null) { + return callback(currentConnection) + } + var connection: SqlConnection? = null + return try { + connection = newConnection() + ctx[connectionKey] = connection + Context.Kotlin.inject(coroutineContext, ctx) + callback(connection) + } finally { + connection?.close() + ?.await() + ctx.remove(connectionKey) + } + } + + override suspend fun inTx(callback: suspend (SqlConnection) -> T): T { + return withConnection { connection -> + val ctx = Context.Kotlin.current(coroutineContext) + val currentTransaction = ctx[this.transactionKey] + if (currentTransaction != null) { + callback(connection) + } else { + val tx = connection.begin() + .await() + try { + ctx[this.transactionKey] = tx + val result = try { + callback(connection) + } catch (ex: Exception) { + tx.rollback() + .await() + throw ex + } + tx.commit() + .await() + result + } finally { + ctx.remove(this.transactionKey) + } + } + } + } + + override fun init(): Mono<*> = mono { + pool.query("SELECT 1") + .execute() + .await() + } + + override fun release(): Mono<*> = mono { + pool.close() + .await() + } + + override fun value(): Pool = this.pool +} diff --git a/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepository.kt b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepository.kt new file mode 100644 index 000000000..6073f42cc --- /dev/null +++ b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepository.kt @@ -0,0 +1,8 @@ +package ru.tinkoff.kora.database.vertx.coroutines + +import kotlinx.coroutines.ExperimentalCoroutinesApi + +@ExperimentalCoroutinesApi +interface VertxRepository { + val vertxConnectionFactory: VertxConnectionFactory +} diff --git a/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepositoryHelper.kt b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepositoryHelper.kt new file mode 100644 index 000000000..9534d3be3 --- /dev/null +++ b/database/database-vertx-coroutines/src/main/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxRepositoryHelper.kt @@ -0,0 +1,172 @@ +package ru.tinkoff.kora.database.vertx.coroutines + +import io.vertx.core.AsyncResult +import io.vertx.kotlin.coroutines.await +import io.vertx.sqlclient.PreparedStatement +import io.vertx.sqlclient.SqlClient +import io.vertx.sqlclient.SqlConnection +import io.vertx.sqlclient.Tuple +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.future.await +import kotlinx.coroutines.reactive.asFlow +import reactor.core.publisher.Flux +import ru.tinkoff.kora.common.Context +import ru.tinkoff.kora.database.common.QueryContext +import ru.tinkoff.kora.database.common.UpdateCount +import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry +import ru.tinkoff.kora.database.vertx.mapper.result.VertxRowMapper +import ru.tinkoff.kora.database.vertx.mapper.result.VertxRowSetMapper +import java.util.concurrent.CompletableFuture +import kotlin.coroutines.coroutineContext + +@ExperimentalCoroutinesApi +@Suppress("MemberVisibilityCanBePrivate") +object VertxRepositoryHelper { + + suspend fun await(connectionFactory: VertxConnectionFactory, query: QueryContext, params: Tuple?) { + awaitSingleOrNull(connectionFactory, query, params) {} + } + + suspend fun await(connection: SqlClient, t: DataBaseTelemetry, query: QueryContext, params: Tuple?) { + awaitSingleOrNull(connection, t, query, params) {} + } + + suspend fun awaitSingleOrNull(connectionFactory: VertxConnectionFactory, query: QueryContext, params: Tuple?, mapper: VertxRowSetMapper): T? { + val currentConnection: SqlConnection? = connectionFactory.currentConnection() + if (currentConnection != null) { + return awaitSingleOrNull(currentConnection, connectionFactory.telemetry(), query, params, mapper) + } + var connection: SqlConnection? = null + try { + connection = connectionFactory.newConnection() + return awaitSingleOrNull(connection, connectionFactory.telemetry(), query, params, mapper) + } finally { + connection?.close() + ?.await() + } + } + + suspend fun awaitSingleOrNull(connection: SqlClient, t: DataBaseTelemetry, query: QueryContext, params: Tuple?, mapper: VertxRowSetMapper): T? { + val currentCoroutineContext = coroutineContext + val ctx = Context.Kotlin.current(currentCoroutineContext) + val telemetry = t.createContext(ctx, query) + val future = CompletableFuture() + connection.preparedQuery(query.sql) + .execute(params) { rowSetEvent -> + Context.Kotlin.inject(currentCoroutineContext, ctx) + if (rowSetEvent.failed()) { + telemetry.close(rowSetEvent.cause()) + future.completeExceptionally(rowSetEvent.cause()) + } else { + val result: T = try { + val rowSet = rowSetEvent.result() + mapper.apply(rowSet) + } catch (e: Exception) { + telemetry.close(e) + future.completeExceptionally(e) + return@execute + } + telemetry.close(null) + future.complete(result) + } + } + return future.await() + } + + suspend fun awaitBatch(connectionFactory: VertxConnectionFactory, query: QueryContext, params: List?): UpdateCount { + val currentConnection: SqlConnection? = connectionFactory.currentConnection() + if (currentConnection != null) { + return awaitBatch(currentConnection, connectionFactory.telemetry(), query, params) + } + var connection: SqlConnection? = null + try { + connection = connectionFactory.newConnection() + return awaitBatch(connection, connectionFactory.telemetry(), query, params) + } finally { + connection?.close() + ?.await() + } + } + + suspend fun awaitBatch(connection: SqlClient, t: DataBaseTelemetry, query: QueryContext, params: List?): UpdateCount { + val currentCoroutineContext = coroutineContext + val ctx = Context.Kotlin.current(currentCoroutineContext) + val telemetry = t.createContext(ctx, query) + val future = CompletableFuture() + connection.preparedQuery(query.sql).executeBatch(params) { rowSetEvent -> + Context.Kotlin.inject(currentCoroutineContext, ctx) + if (rowSetEvent.failed()) { + telemetry.close(rowSetEvent.cause()) + future.completeExceptionally(rowSetEvent.cause()) + } else { + var result = 0 + try { + var rowSet = rowSetEvent.result() + while (rowSet != null) { + result += rowSet.rowCount() + rowSet = rowSet.next() + } + } catch (e: Exception) { + telemetry.close(e) + future.completeExceptionally(e) + return@executeBatch + } + telemetry.close(null) + future.complete(UpdateCount(result.toLong())) + } + } + return future.await() + } + + suspend fun flow(connectionFactory: VertxConnectionFactory, query: QueryContext, params: Tuple?, mapper: VertxRowMapper): Flow { + val currentConnection: SqlConnection? = connectionFactory.currentConnection() + if (currentConnection != null) { + return flow(currentConnection, connectionFactory.telemetry(), query, params, mapper) + } + + var connection: SqlConnection? = null + try { + connection = connectionFactory.newConnection() + return flow(connection, connectionFactory.telemetry(), query, params, mapper) + } finally { + connection?.close() + ?.await() + } + } + + suspend fun flow(connection: SqlConnection, telemetry: DataBaseTelemetry, query: QueryContext, params: Tuple?, mapper: VertxRowMapper): Flow { + val currentCoroutineContext = coroutineContext + val ctx = Context.Kotlin.current(currentCoroutineContext) + + return Flux.create { sink -> + Context.Kotlin.inject(currentCoroutineContext, ctx) + val tctx = telemetry.createContext(ctx, query) + connection.prepare(query.sql) { statementEvent: AsyncResult -> + if (statementEvent.failed()) { + tctx.close(statementEvent.cause()) + sink.error(statementEvent.cause()) + return@prepare + } + val stmt = statementEvent.result() + val stream = stmt.createStream(50, params).pause() + sink.onDispose { stream.close() } + sink.onRequest { stream.fetch(it) } + stream.exceptionHandler { throwable -> + stmt.close() + tctx.close(null) + sink.error(throwable) + } + stream.endHandler { + stmt.close() + tctx.close(null) + sink.complete() + } + stream.handler { row -> + val mappedRow = mapper.apply(row) + sink.next(mappedRow) + } + } + }.asFlow() + } +} diff --git a/database/database-vertx-coroutines/src/main/resources/modules.json b/database/database-vertx-coroutines/src/main/resources/modules.json new file mode 100644 index 000000000..94f8e2439 --- /dev/null +++ b/database/database-vertx-coroutines/src/main/resources/modules.json @@ -0,0 +1,8 @@ +[ + { + "tags": [], + "typeRegex": "ru.tinkoff.kora.database.vertx.coroutines.VertxConnectionFactory", + "moduleName": "ru.tinkoff.kora.database.vertx.coroutines.VertxDatabaseModule", + "artifact": "ru.tinkoff.kora:database-vertx-coroutines" + } +] diff --git a/database/database-vertx-coroutines/src/test/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactoryTest.kt b/database/database-vertx-coroutines/src/test/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactoryTest.kt new file mode 100644 index 000000000..d66b1dcff --- /dev/null +++ b/database/database-vertx-coroutines/src/test/kotlin/ru/tinkoff/kora/database/vertx/coroutines/VertxConnectionFactoryTest.kt @@ -0,0 +1,141 @@ +package ru.tinkoff.kora.database.vertx.coroutines + +import io.netty.channel.nio.NioEventLoopGroup +import io.vertx.sqlclient.Tuple +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.extension.ExtendWith +import ru.tinkoff.kora.database.common.QueryContext +import ru.tinkoff.kora.database.common.telemetry.DefaultDataBaseTelemetryFactory +import ru.tinkoff.kora.database.vertx.VertxDatabaseConfig +import ru.tinkoff.kora.database.vertx.mapper.result.VertxRowSetMapper +import ru.tinkoff.kora.test.postgres.PostgresParams +import ru.tinkoff.kora.test.postgres.PostgresParams.ResultSetMapper +import ru.tinkoff.kora.test.postgres.PostgresTestContainer +import ru.tinkoff.kora.vertx.common.VertxUtil +import java.sql.SQLException +import java.time.Duration +import java.util.function.Consumer + +@OptIn(ExperimentalCoroutinesApi::class) +@ExtendWith(PostgresTestContainer::class) +class VertxConnectionFactoryTest { + + @Test + fun testQuery(params: PostgresParams) { + params.execute( + """ + CREATE TABLE test_table(id BIGSERIAL, value VARCHAR); + INSERT INTO test_table(value) VALUES ('test1'); + INSERT INTO test_table(value) VALUES ('test2'); + + """ + .trimIndent() + ) + val id = "SELECT * FROM test_table WHERE value = :value" + val sql = "SELECT * FROM test_table WHERE value = $1" + + data class Entity(val id: Long, val value: String) + + val mapper = VertxRowSetMapper { rows -> + Assertions.assertThat(rows.size() == 1) + val row = rows.iterator().next() + Entity(row.getLong(0), row.getString(1)) + } + + withDb(params) { db -> + val entity = runBlocking { + db.withConnection { connection -> + VertxRepositoryHelper.awaitSingleOrNull(connection, db.telemetry(), QueryContext(id, sql), Tuple.of("test1"), mapper) + } + } + Assertions.assertThat(entity) + .isNotNull + .isEqualTo(Entity(1, "test1")) + } + } + + @Test + fun testTransaction(params: PostgresParams) { + params.execute("CREATE TABLE test_table(id BIGSERIAL, value VARCHAR);") + val id = "INSERT INTO test_table(value) VALUES ('test1');" + val sql = "INSERT INTO test_table(value) VALUES ('test1');" + val extractor = ResultSetMapper, RuntimeException> { rs -> + val result = ArrayList() + try { + while (rs.next()) { + result.add(rs.getString(1)) + } + } catch (sqlException: SQLException) { + throw RuntimeException(sqlException) + } + result + } + + withDb(params) { db -> + assertThrows { + runBlocking { + db.inTx { connection -> + VertxRepositoryHelper.await(connection, db.telemetry(), QueryContext(id, sql), Tuple.tuple()) + throw RuntimeException("test") + } + } + } + var values = params.query("SELECT value FROM test_table", extractor) + Assertions.assertThat(values).hasSize(0) + + runBlocking { + db.inTx { connection -> + VertxRepositoryHelper.await(connection, db.telemetry(), QueryContext(id, sql), Tuple.tuple()) + } + } + values = params.query("SELECT value FROM test_table", extractor) + Assertions.assertThat(values).hasSize(1) + } + } + + companion object { + + private var eventLoopGroup: NioEventLoopGroup? = null + + @BeforeAll + @JvmStatic + fun beforeAll() { + eventLoopGroup = NioEventLoopGroup(1, VertxUtil.vertxThreadFactory()) + } + + @AfterAll + @JvmStatic + fun afterAll() { + eventLoopGroup!!.shutdownGracefully() + } + + private fun withDb(params: PostgresParams, consumer: Consumer) { + val config = VertxDatabaseConfig( + params.user, + params.password, + params.host, + params.port, + params.db, + "test", + Duration.ofMillis(1000), + Duration.ofMillis(1000), + Duration.ofMillis(1000), + 1, + true + ) + val db = VertxDatabase(config, eventLoopGroup!!, DefaultDataBaseTelemetryFactory(null, null, null)) + db.init().block() + try { + consumer.accept(db) + } finally { + db.release().block() + } + } + } +} diff --git a/database/database-vertx/build.gradle b/database/database-vertx/build.gradle index 4e71406cf..c983844c9 100644 --- a/database/database-vertx/build.gradle +++ b/database/database-vertx/build.gradle @@ -1,13 +1,14 @@ apply from: "${project.rootDir}/kotlin-plugin.gradle" dependencies { - api project(":database:database-common") + api project(":database:database-vertx-common") api project(":vertx-common") api project(":common") api(libs.vertx.pg.client) compileOnly(libs.kotlin.stdlib.lib) compileOnly(libs.kotlin.coroutines.reactor) + compileOnly(libs.kotlin.coroutines.jdk8) implementation "com.ongres.scram:common:2.1" implementation "com.ongres.scram:client:2.1" diff --git a/database/database-vertx/src/main/kotlin/ru/tinkoff/kora/database/vertx/VertxQueryExecutorExtension.kt b/database/database-vertx/src/main/kotlin/ru/tinkoff/kora/database/vertx/VertxQueryExecutorExtension.kt deleted file mode 100644 index 7edb0bf6b..000000000 --- a/database/database-vertx/src/main/kotlin/ru/tinkoff/kora/database/vertx/VertxQueryExecutorExtension.kt +++ /dev/null @@ -1,34 +0,0 @@ -package ru.tinkoff.kora.database.vertx - -import io.vertx.sqlclient.* -import kotlinx.coroutines.reactor.awaitSingle -import kotlinx.coroutines.reactor.mono -import ru.tinkoff.kora.database.common.QueryContext -import kotlin.coroutines.coroutineContext - -suspend inline fun VertxConnectionFactory.withConnection(noinline callback: suspend (SqlClient) -> T): T { - val ctx = coroutineContext - return withConnection { - mono(ctx) { - callback.invoke(it) - } - } - .awaitSingle() -} - -suspend inline fun VertxConnectionFactory.inTx(noinline callback: suspend (SqlConnection) -> T): T { - val ctx = coroutineContext - return this.inTx { - mono(ctx) { - callback.invoke(it) - } - }.awaitSingle() -} -// -//suspend inline fun VertxConnectionFactory.query(connection: SqlConnection, query: QueryContext, noinline parameters: () -> Tuple, noinline mapper: (RowSet) -> T): T { -// return this.query(connection, query, parameters, mapper).awaitSingle() -//} -// -//suspend inline fun VertxConnectionFactory.query(query: QueryContext, noinline parameters: () -> Tuple, noinline mapper: (RowSet) -> T): T { -// return this.query(query, parameters, mapper).awaitSingle() -//} diff --git a/dependencies.gradle b/dependencies.gradle index 21e5d6646..07935991a 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -54,6 +54,7 @@ dependencyResolutionManagement { DependencyResolutionManagement it -> library('kotlin-coroutines-core', 'org.jetbrains.kotlinx', 'kotlinx-coroutines-core').versionRef('kotlin-coroutines') library('kotlin-coroutines-reactor', 'org.jetbrains.kotlinx', 'kotlinx-coroutines-reactor').versionRef('kotlin-coroutines') + library('kotlin-coroutines-reactive', 'org.jetbrains.kotlinx', 'kotlinx-coroutines-reactive').versionRef('kotlin-coroutines') library('kotlin-coroutines-jdk8', 'org.jetbrains.kotlinx', 'kotlinx-coroutines-jdk8').versionRef('kotlin-coroutines') library('jackson-core', 'com.fasterxml.jackson.core', 'jackson-core').versionRef('jackson') @@ -87,6 +88,7 @@ dependencyResolutionManagement { DependencyResolutionManagement it -> library('vertx-core', 'io.vertx', 'vertx-core').versionRef('vertx') library('vertx-pg-client', 'io.vertx', 'vertx-pg-client').versionRef('vertx') + library('vertx-kotlin-coroutines', 'io.vertx', 'vertx-lang-kotlin-coroutines').versionRef('vertx') library('r2dbc-spi', 'io.r2dbc', 'r2dbc-spi').version('1.0.0.RELEASE') library('r2dbc-pool', 'io.r2dbc', 'r2dbc-pool').version('1.0.0.RELEASE') diff --git a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt index 33f514b64..f5f3013eb 100644 --- a/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt +++ b/kora-app-symbol-processor/src/main/kotlin/ru/tinkoff/kora/kora/app/ksp/declaration/ComponentDeclaration.kt @@ -1,7 +1,13 @@ package ru.tinkoff.kora.kora.app.ksp.declaration -import com.google.devtools.ksp.symbol.* +import com.google.devtools.ksp.symbol.KSClassDeclaration +import com.google.devtools.ksp.symbol.KSDeclaration +import com.google.devtools.ksp.symbol.KSFunctionDeclaration +import com.google.devtools.ksp.symbol.KSType +import com.google.devtools.ksp.symbol.KSTypeArgument +import com.google.devtools.ksp.symbol.KSTypeParameter import com.squareup.kotlinpoet.TypeName +import com.squareup.kotlinpoet.ksp.toClassName import ru.tinkoff.kora.kora.app.ksp.ProcessingContext import ru.tinkoff.kora.kora.app.ksp.extension.ExtensionResult import ru.tinkoff.kora.ksp.common.CommonClassNames @@ -130,9 +136,7 @@ sealed interface ComponentDeclaration { fun fromDependency(ctx: ProcessingContext, classDeclaration: KSClassDeclaration): DiscoveredAsDependencyComponent { val constructor = classDeclaration.primaryConstructor - if (constructor == null) { - throw ProcessingErrorException("No primary constructor to parse component", classDeclaration) - } + ?: throw ProcessingErrorException("No primary constructor to parse component ${classDeclaration.toClassName().canonicalName}", classDeclaration) val type = classDeclaration.asType(listOf()) if (type.isError) { throw ProcessingErrorException("Component type is not resolvable in the current round of processing", classDeclaration) diff --git a/mkdocs/docs/features/database.md b/mkdocs/docs/features/database.md index bce54fbe5..82bd1c474 100644 --- a/mkdocs/docs/features/database.md +++ b/mkdocs/docs/features/database.md @@ -136,7 +136,9 @@ interface ComplexRepository extends JdbcRepository { ``` ### Vert.x -При подключении через `Vert.x` следует добавить `VertxDatabaseModule`. Внутри `VertxDatabaseModule` создаются экземпляры классов `VertxDatabaseConfig` и `VertxDatabase`. +При подключении через `Vert.x` следует добавить либо `VertxDatabaseModule` из `database-vertx`, либо `VertxDatabaseModule` из `database-vertx-coroutines`. +Последний нужен только для Kotlin проектов, в которых используются корутины. +Внутри `VertxDatabaseModule` создаются экземпляры классов `VertxDatabaseConfig` и `VertxDatabase`. Параметры, описанные в классе `VertxDatabaseConfig`: @@ -170,14 +172,30 @@ db { @Repository public interface WithExecutorAccessorRepository extends VertxRepository { default Mono selectTwo() { - return this.getVertxConnectionFactory().inTx(connection -> { - return getVertxConnectionFactory().query(connection, new QueryContext("SELECT 2", "SELECT 2"), Tuple::tuple, rs -> { + var vertxConnectionFactory = getVertxConnectionFactory(); + return vertxConnectionFactory.inTx(connection -> + VertxRepositoryHelper.mono(connection, vertxConnectionFactory.telemetry(), new QueryContext("SELECT 2", "SELECT 2"), Tuple.tuple(), rs -> { for (var row : rs) { return row.getInteger(1); } return null; - }); - }); + }) + ); + } +} +``` + +Для Kotlin проектов с корутинами `VertxConnectionFactory` и `VertxRepository` используются следующим способом: + +```kotlin +@Repository +interface WithExecutorAccessorRepository : VertxRepository { + suspend fun selectTwo(): Int { + return this.vertxConnectionFactory.inTx { connection -> + VertxRepositoryHelper.awaitSingleOrNull(connection, vertxConnectionFactory.telemetry(), QueryContext("SELECT 2", "SELECT 2"), Tuple.tuple()) { rs -> + rs.first().getInteger(1) + }!! + } } } ``` diff --git a/settings.gradle b/settings.gradle index a7b6f0cde..edf435d84 100644 --- a/settings.gradle +++ b/settings.gradle @@ -44,7 +44,9 @@ include( 'database:database-jdbc', 'database:database-jdbi', 'database:database-r2dbc', + 'database:database-vertx-common', 'database:database-vertx', + 'database:database-vertx-coroutines', 'database:database-flyway', 'database:database-cassandra', 'jms',