From f3787de545899508fc800efa53638ae9a48fd253 Mon Sep 17 00:00:00 2001 From: Jakub Strojewski Date: Thu, 28 Nov 2024 12:58:43 +0000 Subject: [PATCH] poc: generalized approach to extracting steps (#2065) --- .../src/main/scala/doobie/hi/connection.scala | 54 +++++++++++++++++++ .../src/main/scala/doobie/util/query.scala | 42 +++++---------- 2 files changed, 66 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/scala/doobie/hi/connection.scala b/modules/core/src/main/scala/doobie/hi/connection.scala index abdf38deb..d40a1b0c0 100644 --- a/modules/core/src/main/scala/doobie/hi/connection.scala +++ b/modules/core/src/main/scala/doobie/hi/connection.scala @@ -87,6 +87,17 @@ object connection { loggingInfo ) + def executionWithResultSet[A]( + prepared: PreparedExecutionWithResultSet[A], + loggingInfo: LoggingInfo + ): ConnectionIO[A] = executeWithResultSet( + prepared.create, + prepared.prep, + prepared.exec, + prepared.process, + loggingInfo + ) + /** Create and execute a PreparedStatement which immediately returns the result without reading from a ResultSet. The * most common case is executing an INSERT/UPDATE and it returning the rows inserted/updated. If the query you're * executing returns a ResultSet, use `executeWithResultSet` instead for better logging and resource cleanup. @@ -116,6 +127,17 @@ object connection { loggingInfo ) + def executeWithoutResultSet[A]( + prepared: PreparedExecutionWithoutResultSet[A], + loggingInfo: LoggingInfo + ): ConnectionIO[A] = + executeWithoutResultSet( + prepared.create, + prepared.prep, + prepared.exec, + loggingInfo + ) + private def execImpl[A]( create: ConnectionIO[PreparedStatement], prep: PreparedStatementIO[Unit], @@ -226,6 +248,18 @@ object connection { } yield ele } + def stream[A: Read]( + prepared: PreparedExecutionStream, + loggingInfo: LoggingInfo + ): Stream[ConnectionIO, A] = + stream[A]( + prepared.create, + prepared.prep, + prepared.exec, + prepared.chunkSize, + loggingInfo + ) + // Old implementation, used by deprecated methods private def liftStream[A: Read]( chunkSize: Int, @@ -543,4 +577,24 @@ object connection { // val nativeTypeMap: ConnectionIO[Map[String, JdbcType]] = { // getMetaData(IFDMD.getTypeInfo.flatMap(IFDMD.embed(_, HRS.list[(String, JdbcType)].map(_.toMap)))) // } + + final case class PreparedExecutionWithResultSet[A]( + create: ConnectionIO[PreparedStatement], + prep: PreparedStatementIO[Unit], + exec: PreparedStatementIO[ResultSet], + process: ResultSetIO[A] + ) + + final case class PreparedExecutionWithoutResultSet[A]( + create: ConnectionIO[PreparedStatement], + prep: PreparedStatementIO[Unit], + exec: PreparedStatementIO[A] + ) + + final case class PreparedExecutionStream( + create: ConnectionIO[PreparedStatement], + prep: PreparedStatementIO[Unit], + exec: PreparedStatementIO[ResultSet], + chunkSize: Int + ) } diff --git a/modules/core/src/main/scala/doobie/util/query.scala b/modules/core/src/main/scala/doobie/util/query.scala index 7a8453c7e..7da75b8c2 100644 --- a/modules/core/src/main/scala/doobie/util/query.scala +++ b/modules/core/src/main/scala/doobie/util/query.scala @@ -11,6 +11,7 @@ import doobie.free.connection.ConnectionIO import doobie.free.preparedstatement.PreparedStatementIO import doobie.free.resultset.ResultSetIO import doobie.free.{connection as IFC, preparedstatement as IFPS} +import doobie.hi.connection.PreparedExecutionWithResultSet import doobie.hi.{connection as IHC, preparedstatement as IHPS, resultset as IHRS} import doobie.util.MultiVersionTypeSupport.=:= import doobie.util.analysis.Analysis @@ -20,8 +21,6 @@ import doobie.util.log.{LoggingInfo, Parameters} import doobie.util.pos.Pos import fs2.Stream -import java.sql.{PreparedStatement, ResultSet} - /** Module defining queries parameterized by input and output types. */ object query { @@ -153,14 +152,22 @@ object query { toConnectionIO(a, IHRS.nel[B]) private def toConnectionIO[C](a: A, rsio: ResultSetIO[C]): ConnectionIO[C] = - PreparedExecution(sql, a, rsio).execute(mkLoggingInfo(a)) + IHC.executionWithResultSet(preparedExecution(sql, a, rsio), mkLoggingInfo(a)) private def toConnectionIOAlteringExecution[C]( a: A, rsio: ResultSetIO[C], fn: PreparedExecutionUpdate[C] ): ConnectionIO[C] = - fn(PreparedExecution(sql, a, rsio)).execute(mkLoggingInfo(a)) + IHC.executionWithResultSet(fn(preparedExecution(sql, a, rsio)), mkLoggingInfo(a)) + + private def preparedExecution[C](sql: String, a: A, rsio: ResultSetIO[C]): PreparedExecutionWithResultSet[C] = + PreparedExecutionWithResultSet( + create = IFC.prepareStatement(sql), + prep = IHPS.set(a), + exec = IFPS.executeQuery, + process = rsio + ) private def mkLoggingInfo(a: A): LoggingInfo = LoggingInfo( @@ -258,32 +265,7 @@ object query { } - type PreparedExecutionUpdate[A] = PreparedExecution[A] => PreparedExecution[A] - - final case class PreparedExecution[C]( - create: ConnectionIO[PreparedStatement], - prep: PreparedStatementIO[Unit], - exec: PreparedStatementIO[ResultSet], - process: ResultSetIO[C] - ) { ctx => - private[util] def execute(loggingInfo: LoggingInfo) = IHC.executeWithResultSet( - create = ctx.create, - prep = ctx.prep, - exec = ctx.exec, - process = ctx.process, - loggingInfo = loggingInfo - ) - } - - private object PreparedExecution { - def apply[C, A](sql: String, a: A, rsio: ResultSetIO[C])(implicit w: Write[A]): PreparedExecution[C] = - PreparedExecution( - create = IFC.prepareStatement(sql), - prep = IHPS.set(a), - exec = IFPS.executeQuery, - process = rsio - ) - } + type PreparedExecutionUpdate[A] = PreparedExecutionWithResultSet[A] => PreparedExecutionWithResultSet[A] /** An abstract query closed over its input arguments and yielding values of type `B`, without a specified * disposition. Methods provided on `[[Query0]]` allow the query to be interpreted as a stream or program in