Skip to content

Commit

Permalink
poc: generalized approach to extracting steps (#2065)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulfryk committed Nov 28, 2024
1 parent 4d1029e commit f3787de
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 30 deletions.
54 changes: 54 additions & 0 deletions modules/core/src/main/scala/doobie/hi/connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ object connection {
loggingInfo
)

def executionWithResultSet[A](
prepared: PreparedExecutionWithResultSet[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] = executeWithResultSet(
prepared.create,
prepared.prep,
prepared.exec,
prepared.process,
loggingInfo
)

/** Create and execute a PreparedStatement which immediately returns the result without reading from a ResultSet. The
* most common case is executing an INSERT/UPDATE and it returning the rows inserted/updated. If the query you're
* executing returns a ResultSet, use `executeWithResultSet` instead for better logging and resource cleanup.
Expand Down Expand Up @@ -116,6 +127,17 @@ object connection {
loggingInfo
)

def executeWithoutResultSet[A](
prepared: PreparedExecutionWithoutResultSet[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] =
executeWithoutResultSet(
prepared.create,
prepared.prep,
prepared.exec,
loggingInfo
)

private def execImpl[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
Expand Down Expand Up @@ -226,6 +248,18 @@ object connection {
} yield ele
}

def stream[A: Read](
prepared: PreparedExecutionStream,
loggingInfo: LoggingInfo
): Stream[ConnectionIO, A] =
stream[A](
prepared.create,
prepared.prep,
prepared.exec,
prepared.chunkSize,
loggingInfo
)

// Old implementation, used by deprecated methods
private def liftStream[A: Read](
chunkSize: Int,
Expand Down Expand Up @@ -543,4 +577,24 @@ object connection {
// val nativeTypeMap: ConnectionIO[Map[String, JdbcType]] = {
// getMetaData(IFDMD.getTypeInfo.flatMap(IFDMD.embed(_, HRS.list[(String, JdbcType)].map(_.toMap))))
// }

final case class PreparedExecutionWithResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
process: ResultSetIO[A]
)

final case class PreparedExecutionWithoutResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[A]
)

final case class PreparedExecutionStream(
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
chunkSize: Int
)
}
42 changes: 12 additions & 30 deletions modules/core/src/main/scala/doobie/util/query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import doobie.free.connection.ConnectionIO
import doobie.free.preparedstatement.PreparedStatementIO
import doobie.free.resultset.ResultSetIO
import doobie.free.{connection as IFC, preparedstatement as IFPS}
import doobie.hi.connection.PreparedExecutionWithResultSet
import doobie.hi.{connection as IHC, preparedstatement as IHPS, resultset as IHRS}
import doobie.util.MultiVersionTypeSupport.=:=
import doobie.util.analysis.Analysis
Expand All @@ -20,8 +21,6 @@ import doobie.util.log.{LoggingInfo, Parameters}
import doobie.util.pos.Pos
import fs2.Stream

import java.sql.{PreparedStatement, ResultSet}

/** Module defining queries parameterized by input and output types. */
object query {

Expand Down Expand Up @@ -153,14 +152,22 @@ object query {
toConnectionIO(a, IHRS.nel[B])

private def toConnectionIO[C](a: A, rsio: ResultSetIO[C]): ConnectionIO[C] =
PreparedExecution(sql, a, rsio).execute(mkLoggingInfo(a))
IHC.executionWithResultSet(preparedExecution(sql, a, rsio), mkLoggingInfo(a))

private def toConnectionIOAlteringExecution[C](
a: A,
rsio: ResultSetIO[C],
fn: PreparedExecutionUpdate[C]
): ConnectionIO[C] =
fn(PreparedExecution(sql, a, rsio)).execute(mkLoggingInfo(a))
IHC.executionWithResultSet(fn(preparedExecution(sql, a, rsio)), mkLoggingInfo(a))

private def preparedExecution[C](sql: String, a: A, rsio: ResultSetIO[C]): PreparedExecutionWithResultSet[C] =
PreparedExecutionWithResultSet(
create = IFC.prepareStatement(sql),
prep = IHPS.set(a),
exec = IFPS.executeQuery,
process = rsio
)

private def mkLoggingInfo(a: A): LoggingInfo =
LoggingInfo(
Expand Down Expand Up @@ -258,32 +265,7 @@ object query {

}

type PreparedExecutionUpdate[A] = PreparedExecution[A] => PreparedExecution[A]

final case class PreparedExecution[C](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
process: ResultSetIO[C]
) { ctx =>
private[util] def execute(loggingInfo: LoggingInfo) = IHC.executeWithResultSet(
create = ctx.create,
prep = ctx.prep,
exec = ctx.exec,
process = ctx.process,
loggingInfo = loggingInfo
)
}

private object PreparedExecution {
def apply[C, A](sql: String, a: A, rsio: ResultSetIO[C])(implicit w: Write[A]): PreparedExecution[C] =
PreparedExecution(
create = IFC.prepareStatement(sql),
prep = IHPS.set(a),
exec = IFPS.executeQuery,
process = rsio
)
}
type PreparedExecutionUpdate[A] = PreparedExecutionWithResultSet[A] => PreparedExecutionWithResultSet[A]

/** An abstract query closed over its input arguments and yielding values of type `B`, without a specified
* disposition. Methods provided on `[[Query0]]` allow the query to be interpreted as a stream or program in
Expand Down

0 comments on commit f3787de

Please sign in to comment.