Skip to content

Commit

Permalink
2.x: Async persist (#259)
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler authored Nov 1, 2023
1 parent 268b92b commit de6927a
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 130 deletions.
52 changes: 0 additions & 52 deletions Sources/Hummingbird/AsyncAwaitSupport/Request+Persist+async.swift

This file was deleted.

86 changes: 33 additions & 53 deletions Sources/Hummingbird/Storage/MemoryPersistDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,43 @@
//
//===----------------------------------------------------------------------===//

#if os(Linux)
import Glibc
#else
import Darwin.C
#endif
import AsyncAlgorithms
import Atomics
import NIOCore

/// In memory driver for persist system for storing persistent cross request key/value pairs
public final class HBMemoryPersistDriver: HBPersistDriver {
public init(eventLoopGroupProvider: EventLoopGroupProvider = .singleton) {
let eventLoopGroup = eventLoopGroupProvider.eventLoopGroup
self.eventLoop = eventLoopGroup.next()
public actor HBMemoryPersistDriver<C: Clock>: HBPersistDriver where C.Duration == Duration {
public init(_ clock: C = .suspending) {
self.values = [:]
self.task = self.eventLoop.scheduleRepeatedTask(initialDelay: .hours(1), delay: .hours(1)) { _ in
self.tidy()
}
self.clock = clock
}

public func shutdown() {
self.task?.cancel()
public func create<Object: Codable>(key: String, value: Object, expires: Duration?) async throws {
guard self.values[key] == nil else { throw HBPersistError.duplicate }
self.values[key] = .init(value: value, expires: expires.map { self.clock.now.advanced(by: $0) })
}

public func create<Object: Codable>(key: String, value: Object, expires: TimeAmount?, request: HBRequest) -> EventLoopFuture<Void> {
return self.eventLoop.submit {
guard self.values[key] == nil else { throw HBPersistError.duplicate }
self.values[key] = .init(value: value, expires: expires)
}
public func set<Object: Codable>(key: String, value: Object, expires: Duration?) async throws {
self.values[key] = .init(value: value, expires: expires.map { self.clock.now.advanced(by: $0) })
}

public func set<Object: Codable>(key: String, value: Object, expires: TimeAmount?, request: HBRequest) -> EventLoopFuture<Void> {
return self.eventLoop.submit {
self.values[key] = .init(value: value, expires: expires)
}
public func get<Object: Codable>(key: String, as: Object.Type) async throws -> Object? {
guard let item = self.values[key] else { return nil }
guard let expires = item.expires else { return item.value as? Object }
guard self.clock.now <= expires else { return nil }
return item.value as? Object
}

public func get<Object: Codable>(key: String, as: Object.Type, request: HBRequest) -> EventLoopFuture<Object?> {
return self.eventLoop.submit {
guard let item = self.values[key] else { return nil }
guard let expires = item.epochExpires else { return item.value as? Object }
guard Item.getEpochTime() <= expires else { return nil }
return item.value as? Object
}
}

public func remove(key: String, request: HBRequest) -> EventLoopFuture<Void> {
return self.eventLoop.submit {
self.values[key] = nil
}
public func remove(key: String) async throws {
self.values[key] = nil
}

/// Delete any values that have expired
private func tidy() {
let currentTime = Item.getEpochTime()
let now = self.clock.now
self.values = self.values.compactMapValues {
if let expires = $0.epochExpires {
if expires > currentTime {
if let expires = $0.expires {
if expires > now {
return nil
}
}
Expand All @@ -77,26 +59,24 @@ public final class HBMemoryPersistDriver: HBPersistDriver {
struct Item {
/// value stored
let value: Codable
/// epoch time for when item expires
let epochExpires: Int?
/// time when item expires
let expires: C.Instant?

init(value: Codable, expires: TimeAmount?) {
init(value: Codable, expires: C.Instant?) {
self.value = value
self.epochExpires = expires.map { Self.getEpochTime() + Int($0.nanoseconds / 1_000_000_000) }
self.expires = expires
}
}

static func getEpochTime() -> Int {
var timeVal = timeval.init()
gettimeofday(&timeVal, nil)
return timeVal.tv_sec
public func run() async throws {
let cancelled = ManagedAtomic(false)
let timerSequence = AsyncTimerSequence(interval: .seconds(600), clock: .suspending)
.cancelOnGracefulShutdown()
for try await _ in timerSequence {
self.tidy()
}
}

let eventLoop: EventLoop
var values: [String: Item]
var task: RepeatedTask?
let clock: C
}

// We are able to conform HBMemoryPersistDriver to `@unchecked Sendable` as the value dictionary
// is only ever access on the one event loop and the task is only set in the `init`
extension HBMemoryPersistDriver: @unchecked Sendable {}
36 changes: 19 additions & 17 deletions Sources/Hummingbird/Storage/PersistDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,62 @@
//===----------------------------------------------------------------------===//

import NIOCore
import ServiceLifecycle

/// Protocol for driver supporting persistent Key/Value pairs across requests
public protocol HBPersistDriver {
public protocol HBPersistDriver: Service {
/// shutdown driver
func shutdown()
func shutdown() async throws

/// create key/value pair. If key already exist throw `HBPersistError.duplicate` error
/// - Parameters:
/// - key: Key to store value against
/// - value: Codable value to store
/// - expires: If non-nil defines time that value will expire
/// - request: Request making this call
func create<Object: Codable>(key: String, value: Object, expires: TimeAmount?, request: HBRequest) -> EventLoopFuture<Void>
func create<Object: Codable>(key: String, value: Object, expires: Duration?) async throws

/// set value for key. If value already exists overwrite it
/// - Parameters:
/// - key: Key to store value against
/// - value: Codable value to store
/// - expires: If non-nil defines time that value will expire
/// - request: Request making this call
func set<Object: Codable>(key: String, value: Object, expires: TimeAmount?, request: HBRequest) -> EventLoopFuture<Void>
func set<Object: Codable>(key: String, value: Object, expires: Duration?) async throws

/// get value for key
/// - Parameters:
/// - key: Key used to look for value
/// - as: Type you want value to be returned as. If it cannot be returned as this value then nil will be returned
/// - request: Request making this call
func get<Object: Codable>(key: String, as: Object.Type, request: HBRequest) -> EventLoopFuture<Object?>
func get<Object: Codable>(key: String, as: Object.Type) async throws -> Object?

/// remove value associated with key
/// - Parameters:
/// - key: Key used to look for value
/// - request: Request making this call
func remove(key: String, request: HBRequest) -> EventLoopFuture<Void>
func remove(key: String) async throws
}

extension HBPersistDriver {
/// default implemenation of shutdown()
public func shutdown() {}
public func shutdown() async throws {}

/// create key/value pair. If key already exist throw `HBPersistError.duplicate` error
/// - Parameters:
/// - key: Key to store value against
/// - value: Codable value to store
/// - request: Request making this call
func create<Object: Codable>(key: String, value: Object, request: HBRequest) -> EventLoopFuture<Void> {
self.create(key: key, value: value, expires: nil, request: request)
public func create<Object: Codable>(key: String, value: Object) async throws {
try await self.create(key: key, value: value, expires: nil)
}

/// set value for key. If value already exists overwrite it
/// - Parameters:
/// - key: Key to store value against
/// - value: Codable value to store
/// - expires: If non-nil defines time that value will expire
/// - request: Request making this call
func set<Object: Codable>(key: String, value: Object, request: HBRequest) -> EventLoopFuture<Void> {
self.set(key: key, value: value, expires: nil, request: request)
public func set<Object: Codable>(key: String, value: Object) async throws {
try await self.set(key: key, value: value, expires: nil)
}

public func run() async throws {
await ShutdownWaiter().wait()
try await self.shutdown()
}
}
38 changes: 38 additions & 0 deletions Sources/Hummingbird/Utils/ShutdownWaiter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import ServiceLifecycle

actor ShutdownWaiter {
private var taskContinuation: CheckedContinuation<Void, Never>?

init() {}

func wait() async {
await withGracefulShutdownHandler {
await withCheckedContinuation { continuation in
self.taskContinuation = continuation
}
} onGracefulShutdown: {
Task {
await self.stop()
}
}
}

private func stop() {
self.taskContinuation?.resume()
self.taskContinuation = nil
}
}
16 changes: 8 additions & 8 deletions Tests/HummingbirdTests/PersistTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ final class PersistTests: XCTestCase {
router.put("/persist/:tag") { request, context -> HTTPResponseStatus in
guard let buffer = request.body.buffer else { throw HBHTTPError(.badRequest) }
let tag = try context.parameters.require("tag")
try await persist.set(key: tag, value: String(buffer: buffer), request: request)
try await persist.set(key: tag, value: String(buffer: buffer))
return .ok
}
router.put("/persist/:tag/:time") { request, context -> HTTPResponseStatus in
guard let time = context.parameters.get("time", as: Int.self) else { throw HBHTTPError(.badRequest) }
guard let buffer = request.body.buffer else { throw HBHTTPError(.badRequest) }
let tag = try context.parameters.require("tag")
try await persist.set(key: tag, value: String(buffer: buffer), expires: .seconds(numericCast(time)), request: request)
try await persist.set(key: tag, value: String(buffer: buffer), expires: .seconds(time))
return .ok
}
router.get("/persist/:tag") { request, context -> String? in
guard let tag = context.parameters.get("tag", as: String.self) else { throw HBHTTPError(.badRequest) }
return try await persist.get(key: tag, as: String.self, request: request)
return try await persist.get(key: tag, as: String.self)
}
router.delete("/persist/:tag") { request, context -> HTTPResponseStatus in
guard let tag = context.parameters.get("tag", as: String.self) else { throw HBHTTPError(.badRequest) }
try await persist.remove(key: tag, request: request).get()
try await persist.remove(key: tag)
return .noContent
}
return (router, persist)
Expand All @@ -67,7 +67,7 @@ final class PersistTests: XCTestCase {
router.put("/create/:tag") { request, context -> HTTPResponseStatus in
guard let buffer = request.body.buffer else { throw HBHTTPError(.badRequest) }
let tag = try context.parameters.require("tag")
try await persist.create(key: tag, value: String(buffer: buffer), request: request)
try await persist.create(key: tag, value: String(buffer: buffer))
return .ok
}
let app = HBApplication(responder: router.buildResponder())
Expand All @@ -87,7 +87,7 @@ final class PersistTests: XCTestCase {
guard let buffer = request.body.buffer else { throw HBHTTPError(.badRequest) }
let tag = try context.parameters.require("tag")
do {
try await persist.create(key: tag, value: String(buffer: buffer), request: request)
try await persist.create(key: tag, value: String(buffer: buffer))
} catch let error as HBPersistError where error == .duplicate {
throw HBHTTPError(.conflict)
}
Expand Down Expand Up @@ -155,12 +155,12 @@ final class PersistTests: XCTestCase {
router.put("/codable/:tag") { request, context -> HTTPResponseStatus in
guard let tag = context.parameters.get("tag") else { throw HBHTTPError(.badRequest) }
guard let buffer = request.body.buffer else { throw HBHTTPError(.badRequest) }
try await persist.set(key: tag, value: TestCodable(buffer: String(buffer: buffer)), request: request)
try await persist.set(key: tag, value: TestCodable(buffer: String(buffer: buffer)))
return .ok
}
router.get("/codable/:tag") { request, context -> String? in
guard let tag = context.parameters.get("tag") else { throw HBHTTPError(.badRequest) }
let value = try await persist.get(key: tag, as: TestCodable.self, request: request)
let value = try await persist.get(key: tag, as: TestCodable.self)
return value?.buffer
}
let app = HBApplication(responder: router.buildResponder())
Expand Down

0 comments on commit de6927a

Please sign in to comment.