Skip to content

Commit df4d80a

Browse files
committed
Add locking, corresponding error type.
1 parent b27d376 commit df4d80a

14 files changed

+175
-90
lines changed

src/actor.ts

+71-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import { Backend } from "./backend"
66
import { Observer } from "./helpers/observer"
7-
import { PublicKeys, Result, Error, Status } from "./interfaces"
7+
import { PublicKeys, Result, Error, ErrorType, Status } from "./interfaces"
88

99
interface PublicKeysResult extends Result {
1010
keys: PublicKeys
@@ -19,6 +19,29 @@ export interface Cache<T, G> {
1919
get(...args: any[]): Promise<T>
2020
}
2121

22+
type Task = [string, Date, number]
23+
24+
function timeout(ms: number) {
25+
return new Promise((resolve) => setTimeout(resolve, ms))
26+
}
27+
28+
export function locked<T, G extends Actor>(
29+
f: (this: G, ...args: any[]) => Promise<T | Error>
30+
): (this: G, ...args: any[]) => Promise<T | Error> {
31+
return async function (this: G, ...args: any[]): Promise<T | Error> {
32+
// we lock the task
33+
if ((await this.lock(f.name)) === false)
34+
return { status: Status.Failed } // we can't obtain a lock
35+
36+
const result = await f.apply(this, args)
37+
38+
// we unlock the task
39+
this.unlock(f.name)
40+
41+
return result
42+
}
43+
}
44+
2245
export function cached<T, G extends Actor>(
2346
f: (this: G, ...args: any[]) => Promise<T>,
2447
name: string
@@ -48,32 +71,43 @@ export function cached<T, G extends Actor>(
4871
}
4972

5073
async function getKeys(this: Actor): Promise<PublicKeysResult | Error> {
51-
const result = await this.backend.appointments.getKeys()
74+
const response = await this.backend.appointments.getKeys()
5275

53-
if ("code" in result)
76+
if ("code" in response)
5477
return {
5578
status: Status.Failed,
56-
error: result,
79+
error: {
80+
type: ErrorType.RPC,
81+
data: response,
82+
},
5783
}
5884

5985
return {
6086
status: Status.Succeeded,
61-
keys: result,
87+
keys: response,
6288
}
6389
}
6490

6591
export class Actor extends Observer {
66-
public keys = cached(getKeys, "keys")
92+
public keys = cached(locked(getKeys), "keys")
6793
public backend: Backend
6894
public actor: string
6995
public id: string
7096

97+
private _taskId: number
98+
private _tasks: Task[]
99+
private _locked: boolean
100+
71101
constructor(actor: string, id: string, backend: Backend) {
72102
// the ID will be used to address local storage so that e.g. we can
73103
// manage multiple providers, users etc. if necessary...
74104

75105
super()
76106

107+
this._taskId = 0
108+
this._tasks = []
109+
this._locked = false
110+
77111
this.actor = actor
78112
this.id = id
79113
this.backend = backend
@@ -97,9 +131,37 @@ export class Actor extends Observer {
97131
this.backend.temporary.set(`${this.actor}::${this.id}::${key}`, value)
98132
}
99133

100-
public unlock(key: string) {}
134+
unlock(task: string) {
135+
if (this._tasks.length === 0) return false // should never happen
136+
if (this._tasks[0][0] !== task) return false // wrong task order (should not happen)
137+
this._tasks = this._tasks.slice(1)
138+
return true
139+
}
101140

102-
public clearLocks() {}
141+
async lock(task: string) {
142+
if (this._tasks.find((t: Task) => t[0] === task) !== undefined) {
143+
console.warn(
144+
`Task ${this.actor}::${this.id}::${task} is already in queue, aborting...`
145+
)
146+
return false
147+
}
103148

104-
public lock(key: string) {}
149+
const taskId = this._taskId++
150+
this._tasks.push([task, new Date(), taskId])
151+
152+
while (true) {
153+
if (this._tasks.length === 0) return false // should not happen
154+
const [t, dt, id] = this._tasks[0]
155+
if (id === taskId) break // it's our turn
156+
if (new Date().getTime() - dt.getTime() > 1000 * 10)
157+
// tasks time out after 10 seconds
158+
this._tasks = this._tasks.slice(1)
159+
await timeout(10)
160+
}
161+
return true
162+
}
163+
164+
clearLocks() {
165+
this._tasks = []
166+
}
105167
}

src/backend/local.ts

-46
Original file line numberDiff line numberDiff line change
@@ -2,68 +2,22 @@
22
// Copyright (C) 2021-2021 The Kiebitz Authors
33
// README.md contains license information.
44

5-
function timeout(ms: number) {
6-
return new Promise((resolve) => setTimeout(resolve, ms))
7-
}
8-
95
import { Settings, Store } from "../interfaces"
106

11-
type Task = [string, Date, number]
12-
137
// The local backend
148
export class LocalBackend {
159
public store: Store
1610
public settings: Settings
1711

18-
private _taskId: number
19-
private _tasks: Task[]
20-
private _locked: boolean
21-
2212
constructor(settings: Settings, store: Store) {
2313
this.settings = settings
2414
this.store = store
25-
this._taskId = 0
26-
this._tasks = []
27-
this._locked = false
2815
}
2916

3017
get(key: string, defaultValue?: any) {
3118
return this.store.get(key, defaultValue)
3219
}
3320

34-
unlock(task: string) {
35-
if (this._tasks.length === 0) throw "should not happen"
36-
if (this._tasks[0][0] !== task) throw "wrong task"
37-
this._tasks = this._tasks.slice(1)
38-
console.log(`Finished task ${task}...`)
39-
}
40-
41-
clearLocks() {
42-
this._tasks = []
43-
}
44-
45-
async lock(task: string) {
46-
if (this._tasks.find((t: Task) => t[0] === task) !== undefined) {
47-
console.log(`task ${task} is already in queue, aborting...`)
48-
throw "already queued up" // there's already a task queued up
49-
}
50-
51-
const taskId = this._taskId++
52-
this._tasks.push([task, new Date(), taskId])
53-
54-
while (true) {
55-
if (this._tasks.length === 0) throw "should not happen"
56-
const [t, dt, id] = this._tasks[0]
57-
if (id === taskId) break // it's our turn
58-
if (new Date().getTime() - dt.getTime() > 1000 * 60 * 5)
59-
// tasks time out after 5 minutes
60-
this._tasks = this._tasks.slice(1)
61-
await timeout(10)
62-
}
63-
console.log(`Executing task ${task}...`)
64-
// now we go...
65-
}
66-
6721
set(key: string, data: any) {
6822
return this.store.set(key, data)
6923
}

src/interfaces/backend.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,17 @@ export interface Result {
2626
[key: string]: any
2727
}
2828

29+
export enum ErrorType {
30+
Locking = "locking",
31+
Crypto = "crypto",
32+
Data = "data",
33+
RPC = "rpc",
34+
}
35+
2936
export interface Error {
3037
status: Status.Failed
31-
error?: { [key: string]: any }
38+
error?: {
39+
type: ErrorType
40+
data?: { [key: string]: any }
41+
}
3242
}

src/mediator/providers.ts

+26-16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
Error,
88
Result,
99
Status,
10+
ErrorType,
1011
EncryptedProviderData,
1112
KeyPair,
1213
ProviderData,
@@ -20,26 +21,31 @@ interface ProvidersResult extends Result {
2021
export async function pendingProviders(
2122
this: Mediator
2223
): Promise<ProvidersResult | Error> {
23-
const providerData = await this.backend.appointments.getPendingProviderData(
24+
const response = await this.backend.appointments.getPendingProviderData(
2425
{},
2526
this.keyPairs!.signing
2627
)
2728

28-
if ("code" in providerData)
29+
if ("code" in response)
2930
return {
3031
status: Status.Failed,
31-
error: providerData,
32+
error: {
33+
type: ErrorType.RPC,
34+
data: response,
35+
},
3236
}
3337

34-
for (const pd of providerData) {
38+
for (const pd of response) {
3539
const decryptedData = await ecdhDecrypt(
3640
pd.encryptedData,
3741
this.keyPairs!.provider.privateKey
3842
)
3943
if (decryptedData === null)
4044
return {
4145
status: Status.Failed,
42-
error: pd,
46+
error: {
47+
type: ErrorType.Crypto,
48+
},
4349
}
4450

4551
// to do: verify provider data!
@@ -49,34 +55,38 @@ export async function pendingProviders(
4955

5056
return {
5157
status: Status.Succeeded,
52-
providers: providerData,
58+
providers: response,
5359
}
5460
}
5561

5662
export async function verifiedProviders(
5763
this: Mediator
5864
): Promise<ProvidersResult | Error> {
59-
const providerData =
60-
await this.backend.appointments.getVerifiedProviderData(
61-
{},
62-
this.keyPairs!.signing
63-
)
65+
const response = await this.backend.appointments.getVerifiedProviderData(
66+
{},
67+
this.keyPairs!.signing
68+
)
6469

65-
if ("code" in providerData)
70+
if ("code" in response)
6671
return {
6772
status: Status.Failed,
68-
error: providerData,
73+
error: {
74+
type: ErrorType.RPC,
75+
data: response,
76+
},
6977
}
7078

71-
for (const pd of providerData) {
79+
for (const pd of response) {
7280
const decryptedData = await ecdhDecrypt(
7381
pd.encryptedData,
7482
this.keyPairs!.provider.privateKey
7583
)
7684
if (decryptedData === null)
7785
return {
7886
status: Status.Failed,
79-
error: pd,
87+
error: {
88+
type: ErrorType.Crypto,
89+
},
8090
}
8191

8292
// to do: verify provider data!
@@ -86,6 +96,6 @@ export async function verifiedProviders(
8696

8797
return {
8898
status: Status.Succeeded,
89-
providers: providerData,
99+
providers: response,
90100
}
91101
}

src/provider/check-data.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { ecdhDecrypt } from "../crypto"
66
import {
77
Result,
88
Error,
9+
ErrorType,
910
Status,
1011
ConfirmedProviderData,
1112
ProviderData,
@@ -28,7 +29,10 @@ export async function checkData(
2829
if ("code" in response)
2930
return {
3031
status: Status.Failed,
31-
error: response,
32+
error: {
33+
type: ErrorType.RPC,
34+
data: response,
35+
},
3236
}
3337

3438
// to do: check signature

src/provider/get-appointments.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
SignedAppointment,
1111
Result,
1212
Error,
13+
ErrorType,
1314
} from "../interfaces"
1415
import { Provider } from "./"
1516

@@ -41,7 +42,10 @@ export async function getAppointments(
4142
if (!(response instanceof Array))
4243
return {
4344
status: Status.Failed,
44-
error: response,
45+
error: {
46+
type: ErrorType.RPC,
47+
data: response,
48+
},
4549
}
4650

4751
const newAppointments: Appointment[] = []

src/provider/publish-appointments.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33
// README.md contains license information.
44

55
import { randomBytes, sign } from "../crypto"
6-
import { Appointment, SignedData, Result, Error, Status } from "../interfaces"
6+
import {
7+
Appointment,
8+
SignedData,
9+
Result,
10+
Error,
11+
ErrorType,
12+
Status,
13+
} from "../interfaces"
714
import { Provider } from "./"
815

916
export async function publishAppointments(
@@ -48,17 +55,20 @@ export async function publishAppointments(
4855
status: Status.Succeeded,
4956
}
5057

51-
const result = await this.backend.appointments.publishAppointments(
58+
const response = await this.backend.appointments.publishAppointments(
5259
{
5360
appointments: signedAppointments,
5461
},
5562
this.keyPairs!.signing
5663
)
5764

58-
if (result !== "ok")
65+
if (response !== "ok")
5966
return {
6067
status: Status.Failed,
61-
error: result,
68+
error: {
69+
type: ErrorType.RPC,
70+
data: response,
71+
},
6272
}
6373

6474
return {

0 commit comments

Comments
 (0)