Skip to content

Coroutines based database-vertx implementation #106

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/src/main/java/ru/tinkoff/kora/common/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public static Context current(CoroutineContext ctx) {
public static CoroutineContext inject(CoroutineContext cctx, Context context) {
var reactorContext = Reactor.inject(reactor.util.context.Context.of(Context.class, cctx), context);
var coroutineContext = (CoroutineContext) (Object) ReactorContextKt.asCoroutineContext(reactorContext);

return cctx.plus(coroutineContext).plus(asCoroutineContext(context));
var contextElement = new CoroutineContextElement(context);
return cctx.plus(contextElement).plus(coroutineContext).plus(asCoroutineContext(context));
}

public static CoroutineContext asCoroutineContext(Context ctx) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ru.tinkoff.kora.common

import kotlin.coroutines.CoroutineContext

fun CoroutineContext.currentKoraContext(): Context = Context.Kotlin.current(this)

fun CoroutineContext.inject(context: Context): CoroutineContext = Context.Kotlin.inject(this, context)
2 changes: 2 additions & 0 deletions database/database-annotation-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ dependencies {
testImplementation testFixtures(project(':annotation-processor-common'))
testImplementation project(':database:database-common')
testImplementation project(':database:database-jdbc')
testImplementation project(':database:database-vertx-common')
testImplementation project(':database:database-vertx')
testImplementation project(':database:database-vertx-coroutines')
testImplementation project(':database:database-r2dbc')
testImplementation project(':database:database-cassandra')
}
Expand Down
2 changes: 2 additions & 0 deletions database/database-symbol-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ dependencies {
testImplementation testFixtures(project(':annotation-processor-common'))
testImplementation project(':database:database-common')
testImplementation project(':database:database-jdbc')
testImplementation project(':database:database-vertx-common')
testImplementation project(':database:database-vertx')
testImplementation project(':database:database-vertx-coroutines')
testImplementation project(':database:database-r2dbc')
testImplementation project(':database:database-cassandra')
testImplementation(libs.kotlin.stdlib.lib)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import com.google.devtools.ksp.processing.KSPLogger
import com.google.devtools.ksp.processing.Resolver
import com.google.devtools.ksp.symbol.ClassKind
import com.google.devtools.ksp.symbol.KSClassDeclaration
import com.squareup.kotlinpoet.*
import com.squareup.kotlinpoet.AnnotationSpec
import com.squareup.kotlinpoet.CodeBlock
import com.squareup.kotlinpoet.FunSpec
import com.squareup.kotlinpoet.ParameterSpec
import com.squareup.kotlinpoet.TypeSpec
import com.squareup.kotlinpoet.ksp.addOriginatingKSFile
import com.squareup.kotlinpoet.ksp.toClassName
import com.squareup.kotlinpoet.ksp.toTypeName
import org.slf4j.LoggerFactory
import ru.tinkoff.kora.database.symbol.processor.cassandra.CassandraRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.jdbc.JdbcRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.r2dbc.R2DbcRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.vertx.VertxCoroutineBasedRepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.vertx.VertxRepositoryGenerator
import ru.tinkoff.kora.kora.app.ksp.extendsKeepAop
import ru.tinkoff.kora.ksp.common.KspCommonUtils.generated
Expand All @@ -27,6 +32,7 @@ class RepositoryBuilder(
private val availableGenerators = listOf(
JdbcRepositoryGenerator(resolver),
VertxRepositoryGenerator(resolver, kspLogger),
VertxCoroutineBasedRepositoryGenerator(resolver, kspLogger),
R2DbcRepositoryGenerator(resolver),
CassandraRepositoryGenerator(resolver),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package ru.tinkoff.kora.database.symbol.processor.vertx

import com.google.devtools.ksp.processing.KSPLogger
import com.google.devtools.ksp.processing.Resolver
import com.google.devtools.ksp.symbol.KSClassDeclaration
import com.google.devtools.ksp.symbol.KSFunction
import com.google.devtools.ksp.symbol.KSFunctionDeclaration
import com.squareup.kotlinpoet.AnnotationSpec
import com.squareup.kotlinpoet.ClassName
import com.squareup.kotlinpoet.CodeBlock
import com.squareup.kotlinpoet.FunSpec
import com.squareup.kotlinpoet.KModifier
import com.squareup.kotlinpoet.MemberName
import com.squareup.kotlinpoet.ParameterSpec
import com.squareup.kotlinpoet.ParameterizedTypeName.Companion.parameterizedBy
import com.squareup.kotlinpoet.PropertySpec
import com.squareup.kotlinpoet.TypeSpec
import com.squareup.kotlinpoet.ksp.toTypeName
import ru.tinkoff.kora.database.symbol.processor.DbUtils
import ru.tinkoff.kora.database.symbol.processor.DbUtils.addMapper
import ru.tinkoff.kora.database.symbol.processor.DbUtils.findQueryMethods
import ru.tinkoff.kora.database.symbol.processor.DbUtils.parseExecutorTag
import ru.tinkoff.kora.database.symbol.processor.DbUtils.queryMethodBuilder
import ru.tinkoff.kora.database.symbol.processor.DbUtils.resultMapperName
import ru.tinkoff.kora.database.symbol.processor.DbUtils.updateCount
import ru.tinkoff.kora.database.symbol.processor.Mapper
import ru.tinkoff.kora.database.symbol.processor.QueryWithParameters
import ru.tinkoff.kora.database.symbol.processor.RepositoryGenerator
import ru.tinkoff.kora.database.symbol.processor.model.QueryParameter
import ru.tinkoff.kora.database.symbol.processor.model.QueryParameterParser
import ru.tinkoff.kora.ksp.common.AnnotationUtils.findAnnotation
import ru.tinkoff.kora.ksp.common.AnnotationUtils.findValue
import ru.tinkoff.kora.ksp.common.CommonClassNames
import ru.tinkoff.kora.ksp.common.CommonClassNames.isFlow
import ru.tinkoff.kora.ksp.common.CommonClassNames.isList
import ru.tinkoff.kora.ksp.common.FieldFactory
import ru.tinkoff.kora.ksp.common.FunctionUtils.isFlow
import ru.tinkoff.kora.ksp.common.FunctionUtils.isSuspend
import ru.tinkoff.kora.ksp.common.parseMappingData

class VertxCoroutineBasedRepositoryGenerator(private val resolver: Resolver, private val kspLogger: KSPLogger) : RepositoryGenerator {
private val runBlocking = MemberName("kotlinx.coroutines", "runBlocking")
private val repositoryInterface = resolver.getClassDeclarationByName(resolver.getKSNameFromString(VertxTypes.Coroutines.repository.canonicalName))?.asStarProjectedType()
override fun repositoryInterface() = repositoryInterface

override fun generate(repositoryType: KSClassDeclaration, typeBuilder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder): TypeSpec {
this.enrichWithExecutor(repositoryType, typeBuilder, constructorBuilder)
val repositoryResolvedType = repositoryType.asStarProjectedType()
val resultMappers = FieldFactory(typeBuilder, constructorBuilder, "_result_mapper_")
val parameterMappers = FieldFactory(typeBuilder, constructorBuilder, "_parameter_mapper_")
for (method in repositoryType.findQueryMethods()) {
val methodType = method.asMemberOf(repositoryResolvedType)
val parameters = QueryParameterParser.parse(listOf(VertxTypes.sqlConnection, VertxTypes.sqlClient), method, methodType)
val queryAnnotation = method.findAnnotation(DbUtils.queryAnnotation)!!
val queryString = queryAnnotation.findValue<String>("value")!!
val query = QueryWithParameters.parse(queryString, parameters)
val resultMapperName = this.parseResultMapper(method, parameters, methodType)?.let { resultMappers.addMapper(it) }
DbUtils.parseParameterMappers(method, parameters, query, VertxTypes.parameterColumnMapper) { VertxNativeTypes.findNativeType(it.toTypeName()) != null }
.forEach { parameterMappers.addMapper(it) }
val methodSpec = this.generate(method, methodType, query, parameters, resultMapperName, parameterMappers)
typeBuilder.addFunction(methodSpec)
}

return typeBuilder
.addAnnotation(
AnnotationSpec.builder(ClassName("kotlin", "OptIn"))
.addMember("%T::class", ClassName("kotlinx.coroutines", "ExperimentalCoroutinesApi"))
.build()
)
.primaryConstructor(constructorBuilder.build())
.build()
}

private fun generate(funDeclaration: KSFunctionDeclaration, function: KSFunction, query: QueryWithParameters, parameters: List<QueryParameter>, resultMapperName: String?, parameterMappers: FieldFactory): FunSpec {
var sql = query.rawQuery
query.parameters.indices.asSequence()
.map { query.parameters[it].sqlParameterName to "$" + (it + 1) }
.sortedByDescending { it.first.length }
.forEach { sql = sql.replace(":" + it.first, it.second) }

val b = funDeclaration.queryMethodBuilder(resolver)
b.addCode("val _query = %T(\n %S,\n %S\n)\n", DbUtils.queryContext, query.rawQuery, sql)
val batchParam = parameters.firstOrNull { it is QueryParameter.BatchParameter }
val isSuspend = funDeclaration.isSuspend()
val isFlow = funDeclaration.isFlow()
ParametersToTupleBuilder.generate(b, query, funDeclaration, parameters, batchParam, parameterMappers)
val connectionParameter = parameters.asSequence().filterIsInstance<QueryParameter.ConnectionParameter>().firstOrNull()?.variable?.name?.asString()

if (isSuspend) {
b.addCode("return ")
} else {
b.addCode("return %M {\n", runBlocking)
}

if (batchParam != null) {
if (connectionParameter == null) {
b.addCode("%T.awaitBatch(vertxConnectionFactory, _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.awaitBatch(%N, vertxConnectionFactory.telemetry(), _query, _batchParams)", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
if (function.returnType == resolver.builtIns.unitType) {
b.addCode(" \n.let {}")
}
} else if (isFlow) {
if (connectionParameter == null) {
b.addCode("%T.flow(vertxConnectionFactory, _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, resultMapperName)
} else {
b.addCode("%T.flow(%N, vertxConnectionFactory.telemetry(), _query, _tuple, %N)", VertxTypes.Coroutines.repositoryHelper, connectionParameter, resultMapperName)
}
} else {
if (function.returnType == resolver.builtIns.unitType) {
if (connectionParameter == null) {
b.addCode("%T.await(vertxConnectionFactory, _query, _tuple)", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.await(%N, vertxConnectionFactory.telemetry(), _query, _tuple)", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
} else {
if (connectionParameter == null) {
b.addCode("%T.awaitSingleOrNull(vertxConnectionFactory, _query, _tuple", VertxTypes.Coroutines.repositoryHelper)
} else {
b.addCode("%T.awaitSingleOrNull(%N, vertxConnectionFactory.telemetry(), _query, _tuple", VertxTypes.Coroutines.repositoryHelper, connectionParameter)
}
if (function.returnType?.toTypeName() == updateCount) {
b.addCode(") { %T.extractUpdateCount(it) }", VertxTypes.rowSetMapper)
} else {
b.addCode(", %N)", resultMapperName)
}
if (function.returnType?.isMarkedNullable == false) {
b.addCode("!!")
}
}
}
b.addCode("\n")
if (!isSuspend) {
b.addCode(" }\n")
}
return b.build()
}

private fun parseResultMapper(method: KSFunctionDeclaration, parameters: List<QueryParameter>, methodType: KSFunction): Mapper? {
for (parameter in parameters) {
if (parameter is QueryParameter.BatchParameter) {
return null
}
}
val returnType = methodType.returnType!!
val mapperName = method.resultMapperName()
val mappings = method.parseMappingData()
val resultSetMapper = mappings.getMapping(VertxTypes.rowSetMapper)
val rowMapper = mappings.getMapping(VertxTypes.rowMapper)
if (returnType.isFlow()) {
val flowParam = returnType.arguments[0]
val returnTypeName = flowParam.toTypeName().copy(false)
val mapperType = VertxTypes.rowMapper.parameterizedBy(returnTypeName)
if (rowMapper != null) {
return Mapper(rowMapper, mapperType, mapperName)
}
return Mapper(mapperType, mapperName)
}
val mapperType = VertxTypes.rowSetMapper.parameterizedBy(returnType.toTypeName())
if (resultSetMapper != null) {
return Mapper(resultSetMapper, mapperType, mapperName)
}
if (rowMapper != null) {
if (returnType.isList()) {
return Mapper(rowMapper, mapperType, mapperName) {
CodeBlock.of("%T.listRowSetMapper(%L)", VertxTypes.rowSetMapper, it)
}
} else {
return Mapper(rowMapper, mapperType, mapperName) {
CodeBlock.of("%T.singleRowSetMapper(%L)", VertxTypes.rowSetMapper, it)
}
}
}
if (returnType == resolver.builtIns.unitType) {
return null
}
if (returnType.toTypeName() == updateCount) {
return null
}
return Mapper(mapperType, mapperName)
}

private fun enrichWithExecutor(repositoryElement: KSClassDeclaration, builder: TypeSpec.Builder, constructorBuilder: FunSpec.Builder) {
builder.addSuperinterface(VertxTypes.Coroutines.repository)

builder.addProperty(
PropertySpec.builder("vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory, KModifier.OVERRIDE)
.build()
)
val executorTag = repositoryElement.parseExecutorTag()
if (executorTag != null) {
constructorBuilder.addParameter(
ParameterSpec.builder("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory).addAnnotation(
AnnotationSpec.builder(CommonClassNames.tag).addMember("value = %L", executorTag).build()
).build()
)
} else {
constructorBuilder.addParameter("_vertxConnectionFactory", VertxTypes.Coroutines.connectionFactory)
}
constructorBuilder.addStatement("this.vertxConnectionFactory = _vertxConnectionFactory")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ object VertxTypes {
val rowMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxRowMapper")
val resultColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.result", "VertxResultColumnMapper")
val parameterColumnMapper = ClassName("ru.tinkoff.kora.database.vertx.mapper.parameter", "VertxParameterColumnMapper")

object Coroutines {
val connectionFactory = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxConnectionFactory")
val repository = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepository")
val repositoryHelper = ClassName("ru.tinkoff.kora.database.vertx.coroutines", "VertxRepositoryHelper")
}
}
17 changes: 17 additions & 0 deletions database/database-vertx-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apply from: "${project.rootDir}/kotlin-plugin.gradle"

dependencies {
api project(":database:database-common")
api project(":vertx-common")
api project(":common")

api(libs.vertx.pg.client)
compileOnly(libs.kotlin.stdlib.lib)
compileOnly(libs.kotlin.coroutines.reactor)
compileOnly(libs.kotlin.coroutines.jdk8)
implementation "com.ongres.scram:common:2.1"
implementation "com.ongres.scram:client:2.1"

testImplementation project(':test:test-postgres')
testImplementation libs.reactor.test
}
19 changes: 19 additions & 0 deletions database/database-vertx-coroutines/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apply from: "${project.rootDir}/kotlin-plugin.gradle"

dependencies {
api project(":database:database-vertx-common")
api project(":vertx-common")
api project(":common")

api(libs.vertx.pg.client)
api(libs.vertx.kotlin.coroutines)
compileOnly(libs.kotlin.stdlib.lib)
compileOnly(libs.kotlin.coroutines.reactor)
compileOnly(libs.kotlin.coroutines.reactive)
compileOnly(libs.kotlin.coroutines.jdk8)
implementation "com.ongres.scram:common:2.1"
implementation "com.ongres.scram:client:2.1"

testImplementation project(':test:test-postgres')
testImplementation libs.reactor.test
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ru.tinkoff.kora.database.vertx.coroutines;

import com.typesafe.config.Config;
import io.netty.channel.EventLoopGroup;
import ru.tinkoff.kora.config.common.extractor.ConfigValueExtractor;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory;
import ru.tinkoff.kora.database.vertx.VertxDatabaseBaseModule;
import ru.tinkoff.kora.database.vertx.VertxDatabaseConfig;

public interface VertxDatabaseModule extends VertxDatabaseBaseModule {
default VertxDatabaseConfig vertxDatabaseConfig(Config config, ConfigValueExtractor<VertxDatabaseConfig> extractor) {
var value = config.getValue("db");
return extractor.extract(value);
}

default VertxDatabase vertxDatabase(VertxDatabaseConfig vertxDatabaseConfig, EventLoopGroup eventLoopGroup, DataBaseTelemetryFactory telemetryFactory) {
return new VertxDatabase(vertxDatabaseConfig, eventLoopGroup, telemetryFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.tinkoff.kora.database.vertx.coroutines

import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.SqlConnection
import kotlinx.coroutines.ExperimentalCoroutinesApi
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry

@ExperimentalCoroutinesApi
interface VertxConnectionFactory {

suspend fun currentConnection(): SqlConnection?

suspend fun newConnection(): SqlConnection

fun pool(): Pool

fun telemetry(): DataBaseTelemetry

suspend fun <T: Any?> withConnection(callback: suspend (SqlConnection) -> T): T

suspend fun <T: Any?> inTx(callback: suspend (SqlConnection) -> T): T
}
Loading