Skip to content

Commit

Permalink
make sure the runPromise is fulfilled or precondition
Browse files Browse the repository at this point in the history
  • Loading branch information
MahdiBM committed Aug 3, 2024
1 parent 7a8b29c commit 876d352
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions Sources/PostgresNIO/New/PostgresChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,10 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
case .establishSSLConnection:
self.establishSSLConnection(context: context, promise: runPromise)
case .read:
precondition(runPromise == nil, "This Promise will leak.")
context.read()
case .wait:
precondition(runPromise == nil, "This Promise will leak.")
break
case .sendStartupMessage(let authContext):
self.encoder.startup(user: authContext.username, database: authContext.database, options: authContext.additionalParameters)
Expand All @@ -346,18 +348,28 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
self.encoder.saslResponse(bytes)
context.writeAndFlush(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: runPromise)
case .closeConnectionAndCleanup(let cleanupContext):
precondition(runPromise == nil, "This Promise will leak.")
self.closeConnectionAndCleanup(cleanupContext, context: context)
case .fireChannelInactive:
precondition(runPromise == nil, "This Promise will leak.")
context.fireChannelInactive()
case .sendParseDescribeSync(let name, let query, let bindingDataTypes):
self.sendParseDescribeAndSyncMessage(statementName: name, query: query, bindingDataTypes: bindingDataTypes, context: context, promise: runPromise)
self.sendParseDescribeAndSyncMessage(
statementName: name,
query: query,
bindingDataTypes: bindingDataTypes,
context: context,
promise: runPromise
)
case .sendBindExecuteSync(let executeStatement):
self.sendBindExecuteAndSyncMessage(executeStatement: executeStatement, context: context, promise: runPromise)
case .sendParseDescribeBindExecuteSync(let query):
self.sendParseDescribeBindExecuteAndSyncMessage(query: query, context: context, promise: runPromise)
case .succeedQuery(let promise, with: let result):
precondition(runPromise == nil, "This Promise will leak.")
self.succeedQuery(promise, result: result, context: context)
case .failQuery(let promise, with: let error, let cleanupContext):
precondition(runPromise == nil, "This Promise will leak.")
promise.fail(error)
if let cleanupContext = cleanupContext {
self.closeConnectionAndCleanup(cleanupContext, context: context)
Expand All @@ -367,6 +379,7 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
self.rowStream!.receive(rows)

case .forwardStreamComplete(let buffer, let commandTag):
precondition(runPromise == nil, "This Promise will leak.")
guard let rowStream = self.rowStream else {
// if the stream was cancelled we don't have it here anymore.
return
Expand All @@ -378,6 +391,7 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
rowStream.receive(completion: .success(commandTag))

case .forwardStreamError(let error, let read, let cleanupContext):
precondition(runPromise == nil, "This Promise will leak.")
self.rowStream!.receive(completion: .failure(error))
self.rowStream = nil
if let cleanupContext = cleanupContext {
Expand All @@ -400,33 +414,40 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
return self.run(action, with: context, promise: runPromise)
}
case .fireEventReadyForQuery:
precondition(runPromise == nil, "This Promise will leak.")
context.fireUserInboundEventTriggered(PSQLEvent.readyForQuery)
case .closeConnection(let promise):
precondition(runPromise == nil, "This Promise will leak.")
if context.channel.isActive {
// The normal, graceful termination procedure is that the frontend sends a Terminate
// message and immediately closes the connection. On receipt of this message, the
// backend closes the connection and terminates.
self.encoder.terminate()
context.writeAndFlush(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: runPromise)
context.writeAndFlush(self.wrapOutboundOut(self.encoder.flushBuffer()))
}
context.close(mode: .all, promise: promise)
case .succeedPreparedStatementCreation(let promise, with: let rowDescription):
precondition(runPromise == nil, "This Promise will leak.")
promise.succeed(rowDescription)
case .failPreparedStatementCreation(let promise, with: let error, let cleanupContext):
precondition(runPromise == nil, "This Promise will leak.")
promise.fail(error)
if let cleanupContext = cleanupContext {
self.closeConnectionAndCleanup(cleanupContext, context: context)
}
case .sendCloseSync(let sendClose):
self.sendCloseAndSyncMessage(sendClose, context: context, promise: runPromise)
case .succeedClose(let closeContext):
precondition(runPromise == nil, "This Promise will leak.")
closeContext.promise.succeed(Void())
case .failClose(let closeContext, with: let error, let cleanupContext):
precondition(runPromise == nil, "This Promise will leak.")
closeContext.promise.fail(error)
if let cleanupContext = cleanupContext {
self.closeConnectionAndCleanup(cleanupContext, context: context)
}
case .forwardNotificationToListeners(let notification):
precondition(runPromise == nil, "This Promise will leak.")
self.forwardNotificationToListeners(notification, context: context)
}
}
Expand Down

0 comments on commit 876d352

Please sign in to comment.