Skip to content

Commit

Permalink
update multi-query statement support impl
Browse files Browse the repository at this point in the history
  • Loading branch information
rolang committed Dec 24, 2023
1 parent ead12e5 commit 9b3b820
Showing 1 changed file with 42 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,54 @@ private[dumbo] object Query {
case _ => false
}.void

// discard messages until ReadyForQuery, fail on ErrorResponse/CopyInResponse/CopyOutResponse
def discard(stmt: Statement[?]): F[Unit] = flatExpect {
case ReadyForQuery(_) => ().pure[F]
// Finish up any single or multi-query statement, discard returned completions and/or rows
// Fail with first encountered error
def finishUpDiscard(stmt: Statement[?], error: Option[SkunkException]): F[Unit] =
flatExpect {
case ReadyForQuery(_) =>
error match {
case None => ().pure[F]
case Some(e) => e.raiseError[F, Unit]
}

case ErrorResponse(e) =>
discard(stmt) *> history(Int.MaxValue).flatMap(hi =>
new PostgresErrorException(
sql = stmt.sql,
sqlOrigin = Some(stmt.origin),
info = e,
history = hi,
).raiseError[F, Unit]
)
case RowDescription(_) | RowData(_) | CommandComplete(_) | EmptyQueryResponse =>
finishUpDiscard(stmt, error)

// We don't support COPY FROM STDIN yet but we do need to be able to clean up if a user
// tries it.
case CopyInResponse(_) =>
send(CopyFail) *>
expect { case ErrorResponse(_) => } *>
discard(stmt) *> new CopyNotSupportedException(stmt).raiseError[F, Unit]
case NoticeResponse(info) =>
error match {
case None =>
for {
hi <- history(Int.MaxValue)
err = new PostgresErrorException(stmt.sql, Some(stmt.origin), info, hi)
c <- finishUpDiscard(stmt, Some(err))
} yield c
case _ => finishUpDiscard(stmt, error)
}

case CopyOutResponse(_) =>
finishCopyOut *> discard(stmt) *>
new CopyNotSupportedException(stmt).raiseError[F, Unit]
case ErrorResponse(info) =>
error match {
case None =>
for {
hi <- history(Int.MaxValue)
err = new PostgresErrorException(stmt.sql, Some(stmt.origin), info, hi)
c <- finishUpDiscard(stmt, Some(err))
} yield c
case _ => finishUpDiscard(stmt, error)
}

case _ => discard(stmt)
}
// We don't support COPY FROM STDIN yet but we do need to be able to clean up if a user
// tries it.
case CopyInResponse(_) =>
send(CopyFail) *>
expect { case ErrorResponse(_) => } *>
finishUpDiscard(stmt, error.orElse(new CopyNotSupportedException(stmt).some))

case CopyOutResponse(_) =>
finishCopyOut *> finishUpDiscard(stmt, error.orElse(new CopyNotSupportedException(stmt).some))
}

override def apply(command: Statement[Void]): F[Unit] = exchange("query") {
Trace[F].put("command.sql" -> command.sql) *> send(QueryMessage(command.sql)) *> discard(command)
Trace[F].put("command.sql" -> command.sql) *> send(QueryMessage(command.sql)) *> finishUpDiscard(command, None)
}

}
Expand Down

0 comments on commit 9b3b820

Please sign in to comment.