diff --git a/package-lock.json b/package-lock.json index 00832d7..8e6e644 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,7 +5,9 @@ "packages": { "": { "dependencies": { + "@effect/schema": "^0.50.0", "effect": "^2.0.0-next.56", + "ioredis": "^5.3.2", "orcid-id-ts": "^0.1.2" }, "devDependencies": { @@ -288,6 +290,15 @@ "node": ">=6.9.0" } }, + "node_modules/@effect/schema": { + "version": "0.50.0", + "resolved": "https://registry.npmjs.org/@effect/schema/-/schema-0.50.0.tgz", + "integrity": "sha512-kzFwbKDvv7TjHe0ZL1jq9KmZ6HzeVd8m4acpfRxfMKglEVQq/srT0JmvW1AD4kBLrN+4IAuCZBL892E9Z0YhWQ==", + "peerDependencies": { + "effect": "2.0.0-next.56", + "fast-check": "^3.13.2" + } + }, "node_modules/@esbuild/android-arm": { "version": "0.18.20", "resolved": "https://registry.npmjs.org/@esbuild/android-arm/-/android-arm-0.18.20.tgz", @@ -744,6 +755,11 @@ "integrity": "sha512-dvuCeX5fC9dXgJn9t+X5atfmgQAzUOWqS1254Gh0m6i8wKd10ebXkfNKiRK+1GWi/yTvvLDHpoxLr0xxxeslWw==", "dev": true }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.3", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.3.tgz", @@ -1336,6 +1352,14 @@ "node": ">=4" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "1.9.3", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", @@ -1375,7 +1399,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "dependencies": { "ms": "2.1.2" }, @@ -1425,6 +1448,14 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/dir-glob": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/dir-glob/-/dir-glob-3.0.1.tgz", @@ -1995,6 +2026,28 @@ "node": ">=0.10.0" } }, + "node_modules/fast-check": { + "version": "3.14.0", + "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-3.14.0.tgz", + "integrity": "sha512-9Z0zqASzDNjXBox/ileV/fd+4P+V/f3o4shM6QawvcdLFh8yjPG4h5BrHUZ8yzY6amKGDTAmRMyb/JZqe+dCgw==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/dubzzz" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fast-check" + } + ], + "peer": true, + "dependencies": { + "pure-rand": "^6.0.0" + }, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", @@ -2469,6 +2522,29 @@ "node": ">= 0.4" } }, + "node_modules/ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/is-array-buffer": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/is-array-buffer/-/is-array-buffer-3.0.2.tgz", @@ -2822,6 +2898,16 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", @@ -2886,8 +2972,7 @@ "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/mvdan-sh": { "version": "0.10.1", @@ -3173,6 +3258,22 @@ "node": ">=6" } }, + "node_modules/pure-rand": { + "version": "6.0.4", + "resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-6.0.4.tgz", + "integrity": "sha512-LA0Y9kxMYv47GIPJy6MI84fqTd2HmYZI83W/kM/SkKfDlajnZYfmXFTxkbY+xSBPkLJxltMa9hIkmdc29eguMA==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/dubzzz" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fast-check" + } + ], + "peer": true + }, "node_modules/queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", @@ -3193,6 +3294,25 @@ } ] }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/regexp.prototype.flags": { "version": "1.5.1", "resolved": "https://registry.npmjs.org/regexp.prototype.flags/-/regexp.prototype.flags-1.5.1.tgz", @@ -3437,6 +3557,11 @@ "node": ">=0.10.0" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/string.prototype.trim": { "version": "1.2.8", "resolved": "https://registry.npmjs.org/string.prototype.trim/-/string.prototype.trim-1.2.8.tgz", diff --git a/package.json b/package.json index 544aefd..0064317 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,9 @@ "typescript": "^5.3.2" }, "dependencies": { + "@effect/schema": "^0.50.0", "effect": "^2.0.0-next.56", + "ioredis": "^5.3.2", "orcid-id-ts": "^0.1.2" } } diff --git a/src/OrcidId.ts b/src/OrcidId.ts index d0e8e7f..f192164 100644 --- a/src/OrcidId.ts +++ b/src/OrcidId.ts @@ -1,6 +1,9 @@ +import { Schema } from '@effect/schema' import { Brand } from 'effect' import { isOrcid } from 'orcid-id-ts' export type OrcidId = Brand.Branded export const OrcidId = Brand.refined(isOrcid, s => Brand.error(`Expected ${s} to be an ORCID iD`)) + +export const OrcidIdSchema = Schema.string.pipe(Schema.fromBrand(OrcidId)) diff --git a/src/Program.ts b/src/Program.ts index 765b730..ba8fcf6 100644 --- a/src/Program.ts +++ b/src/Program.ts @@ -5,4 +5,4 @@ import * as Users from './Users.js' const processUser = (orcidId: OrcidId.OrcidId) => Effect.logInfo('Processing user').pipe(Effect.annotateLogs('orcidId', orcidId)) -export const program = Users.getUsers.pipe(Stream.runForEach(processUser)) +export const program = Users.getUsers.pipe(Stream.runForEach(processUser)).pipe(Effect.scoped) diff --git a/src/Redis.ts b/src/Redis.ts new file mode 100644 index 0000000..f4fb646 --- /dev/null +++ b/src/Redis.ts @@ -0,0 +1,22 @@ +import { Schema } from '@effect/schema' +import { Context, Data, Effect, type ReadonlyArray, Stream } from 'effect' +import type IoRedis from 'ioredis' + +export type Redis = IoRedis.Redis + +export const Redis = Context.Tag('IoRedis/Redis') + +export class RedisError extends Data.TaggedError('RedisError')<{ + readonly error: unknown +}> {} + +export const scanStream = ( + options: Parameters[0], +): Stream.Stream> => + Stream.unwrap( + Effect.gen(function* (_) { + const redis = yield* _(Redis) + + return Stream.fromAsyncIterable(redis.scanStream(options), error => new RedisError({ error })) + }), + ).pipe(Stream.map(Schema.decodeSync(Schema.array(Schema.string)))) diff --git a/src/Users.ts b/src/Users.ts index 06862b4..781f1dc 100644 --- a/src/Users.ts +++ b/src/Users.ts @@ -1,4 +1,17 @@ -import { Stream } from 'effect' +import { type ParseResult, Schema } from '@effect/schema' +import { ReadonlyArray, type Scope, Stream, String, identity } from 'effect' import * as OrcidId from './OrcidId.js' +import * as Redis from './Redis.js' -export const getUsers = Stream.fromIterable([OrcidId.OrcidId('0000-0001-8778-8651')]) +export const getUsers: Stream.Stream< + Redis.Redis | Scope.Scope, + Redis.RedisError | ParseResult.ParseError, + OrcidId.OrcidId +> = Redis.scanStream({ + match: 'orcid-token:*', +}).pipe( + Stream.mapConcat(identity), + Stream.map(String.split(':')), + Stream.map(ReadonlyArray.lastNonEmpty), + Stream.flatMap(Schema.decodeEither(OrcidId.OrcidIdSchema)), +) diff --git a/src/index.ts b/src/index.ts index fcac8f0..7db88db 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,19 @@ -import { Effect } from 'effect' +import { Effect, Layer } from 'effect' +import IoRedis from 'ioredis' import { program } from './Program.js' +import * as Redis from './Redis.js' -await Effect.runPromise(program) +const RedisLive = Layer.effect( + Redis.Redis, + Effect.gen(function* (_) { + const redis = new IoRedis.Redis() + + yield* _(Effect.addFinalizer(() => Effect.sync(() => redis.disconnect()))) + + return redis + }), +) + +const runnable = Effect.provide(program, RedisLive).pipe(Effect.scoped) + +await Effect.runPromise(runnable)