Skip to content

Commit

Permalink
Expose query metadata in PostgresRowSequence
Browse files Browse the repository at this point in the history
  • Loading branch information
MahdiBM committed Aug 22, 2024
1 parent 9f84290 commit 2e98065
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
4 changes: 2 additions & 2 deletions Sources/PostgresNIO/New/PSQLRowStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final class PSQLRowStream: @unchecked Sendable {
}

internal let rowDescription: [RowDescription.Column]
private let lookupTable: [String: Int]
internal let lookupTable: [String: Int]
private var downstreamState: DownstreamState

init(
Expand Down Expand Up @@ -114,7 +114,7 @@ final class PSQLRowStream: @unchecked Sendable {
self.downstreamState = .consumed(.failure(error))
}

return PostgresRowSequence(producer.sequence, lookupTable: self.lookupTable, columns: self.rowDescription)
return PostgresRowSequence(producer.sequence, stream: self)
}

func demand() {
Expand Down
22 changes: 16 additions & 6 deletions Sources/PostgresNIO/New/PostgresRowSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ public struct PostgresRowSequence: AsyncSequence, Sendable {

typealias BackingSequence = NIOThrowingAsyncSequenceProducer<DataRow, Error, AdaptiveRowBuffer, PSQLRowStream>

let backing: BackingSequence
let lookupTable: [String: Int]
let columns: [RowDescription.Column]
private let backing: BackingSequence
private let stream: PSQLRowStream
var lookupTable: [String: Int] {
self.stream.lookupTable
}
var columns: [RowDescription.Column] {
self.stream.rowDescription
}

init(_ backing: BackingSequence, lookupTable: [String: Int], columns: [RowDescription.Column]) {
init(_ backing: BackingSequence, stream: PSQLRowStream) {
self.backing = backing
self.lookupTable = lookupTable
self.columns = columns
self.stream = stream
}

public func makeAsyncIterator() -> AsyncIterator {
Expand Down Expand Up @@ -67,6 +71,12 @@ extension PostgresRowSequence {
}
return result
}

public func collectWithMetadata() async throws -> (metadata: PostgresQueryMetadata?, rows: [PostgresRow]) {
let rows = try await self.collect()
let metadata = PostgresQueryMetadata(string: self.stream.commandTag)
return (metadata, rows)
}
}

struct AdaptiveRowBuffer: NIOAsyncSequenceProducerBackPressureStrategy {
Expand Down
37 changes: 29 additions & 8 deletions Tests/IntegrationTests/AsyncTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,31 @@ final class AsyncPostgresConnectionTests: XCTestCase {
}
}

func testSelect10kRowsAndCollect() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

let start = 1
let end = 10000

try await withTestConnection(on: eventLoop) { connection in
let rows = try await connection.query("SELECT generate_series(\(start), \(end));", logger: .psqlTest)
let (metadata, elements) = try await rows.collectWithMetadata()
var counter = 0
for row in elements {
let element = try row.decode(Int.self)
XCTAssertEqual(element, counter + 1)
counter += 1
}
XCTAssertEqual(metadata?.command, "SELECT")
XCTAssertEqual(metadata?.oid, nil)
XCTAssertEqual(metadata?.rows, 10000)

XCTAssertEqual(counter, end)
}
}

func testSelectActiveConnection() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
Expand Down Expand Up @@ -207,7 +232,7 @@ final class AsyncPostgresConnectionTests: XCTestCase {

try await withTestConnection(on: eventLoop) { connection in
// Max binds limit is UInt16.max which is 65535 which is 3 * 5 * 17 * 257
// Max columns limit is 1664, so we will only make 5 * 257 columns which is less
// Max columns limit appears to be ~1600, so we will only make 5 * 257 columns which is less
// Then we will insert 3 * 17 rows
// In the insertion, there will be a total of 3 * 17 * 5 * 257 == UInt16.max bindings
// If the test is successful, it means Postgres supports UInt16.max bindings
Expand Down Expand Up @@ -241,13 +266,9 @@ final class AsyncPostgresConnectionTests: XCTestCase {
unsafeSQL: "INSERT INTO table1 VALUES \(insertionValues)",
binds: binds
)
try await connection.query(insertionQuery, logger: .psqlTest)

let countQuery = PostgresQuery(unsafeSQL: "SELECT COUNT(*) FROM table1")
let countRows = try await connection.query(countQuery, logger: .psqlTest)
var countIterator = countRows.makeAsyncIterator()
let insertedRowsCount = try await countIterator.next()?.decode(Int.self, context: .default)
XCTAssertEqual(rowsCount, insertedRowsCount)
let result = try await connection.query(insertionQuery, logger: .psqlTest)
let metadata = try await result.collectWithMetadata().metadata
XCTAssertEqual(metadata?.rows, rowsCount)

let dropQuery = PostgresQuery(unsafeSQL: "DROP TABLE table1")
try await connection.query(dropQuery, logger: .psqlTest)
Expand Down

0 comments on commit 2e98065

Please sign in to comment.