Skip to content

Commit

Permalink
v0.3.22: fix readable stream lock (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
taichunmin authored Nov 4, 2024
2 parents 82362fe + 1c0f9ac commit e96fb28
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 108 deletions.
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"module": "./dist/index.mjs",
"name": "chameleon-ultra.js",
"type": "commonjs",
"version": "0.3.21",
"version": "0.3.22",
"bugs": {
"url": "https://github.com/taichunmin/chameleon-ultra.js/issues"
},
Expand All @@ -30,31 +30,31 @@
"webbluetooth": "^3.2.1"
},
"devDependencies": {
"@tsconfig/node-lts": "^20.1.3",
"@tsconfig/node-lts": "^22.0.0",
"@types/debug": "^4.1.12",
"@types/finalhandler": "^1.2.3",
"@types/html-minifier": "^4.0.5",
"@types/jest": "^29.5.14",
"@types/livereload": "^0.9.5",
"@types/lodash": "^4.17.13",
"@types/node": "^22.8.4",
"@types/node": "^22.8.7",
"@types/pug": "^2.0.10",
"@types/serve-static": "^1.15.7",
"@types/uglify-js": "^3.17.5",
"@types/web-bluetooth": "^0.0.20",
"@typescript-eslint/eslint-plugin": "^7.18.0",
"@typescript-eslint/parser": "^7.18.0",
"chokidar": "^4.0.1",
"concurrently": "^9.0.1",
"concurrently": "^9.1.0",
"dayjs": "^1.11.13",
"dotenv": "^16.4.5",
"esbuild-plugins-node-modules-polyfill": "^1.6.6",
"esbuild-plugins-node-modules-polyfill": "^1.6.7",
"eslint": "^8.57.0",
"eslint-config-love": "^43",
"eslint-config-standard": "^17.1.0",
"eslint-plugin-import": "^2.31.0",
"eslint-plugin-local-rules": "^3.0.2",
"eslint-plugin-n": "^17.11.1",
"eslint-plugin-n": "^17.12.0",
"eslint-plugin-promise": "^7.1.0",
"eslint-plugin-pug": "^1.2.5",
"eslint-plugin-tsdoc": "^0.3.0",
Expand All @@ -74,8 +74,8 @@
"ts-node": "^10.9.2",
"tsup": "^8.3.5",
"tsx": "^4.19.2",
"typedoc": "^0.26.10",
"typedoc-plugin-mdn-links": "^3.3.5",
"typedoc": "^0.26.11",
"typedoc-plugin-mdn-links": "^3.3.6",
"typedoc-plugin-missing-exports": "^3.0.0",
"typedoc-plugin-rename-defaults": "^0.7.1",
"typescript": "^5.6.3",
Expand Down
61 changes: 22 additions & 39 deletions src/ChameleonUltra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ function toUpperHex (buf: Buffer): string {
export class ChameleonUltra {
#deviceMode: DeviceMode | null = null
#isDisconnecting: boolean = false
#readAsyncGenerator: ReadableToAsyncGenerator<Uint8Array> | null = null
#rxReader: ReadableStreamDefaultReader<Uint8Array> | null = null
#supportedCmds: Set<Cmd> = new Set<Cmd>()
readonly #emitErr: (err: Error) => void
readonly #hooks = new Map<string, ReturnType<typeof middlewareCompose>>()
Expand Down Expand Up @@ -226,9 +226,9 @@ export class ChameleonUltra {
// serial.readable pipeTo this.rxSink
const promiseConnected = new Promise<Date>(resolve => this.emitter.once('connected', resolve))
if (_.isNil(this.port.readable)) throw new Error('this.port.readable is nil')
this.#readAsyncGenerator = new ReadableToAsyncGenerator(this.port.readable)
if (this.isDfu()) void this.#startDfuReadAsyncGenerator()
else void this.#startUltraReadAsyncGenerator()
this.#rxReader = this.port.readable.getReader()
if (this.isDfu()) void this.#dfuStartReading()
else void this.#ultraStartReading()

const connectedAt = await promiseConnected
this.#debug('core', `connected at ${connectedAt.toISOString()}`)
Expand All @@ -241,14 +241,16 @@ export class ChameleonUltra {
}
}

async #startUltraReadAsyncGenerator (): Promise<void> {
const generator = this.#readAsyncGenerator
if (_.isNil(generator)) throw new Error('this.#readAsyncGenerator is nil')
async #ultraStartReading (): Promise<void> {
const reader = this.#rxReader
if (_.isNil(reader)) throw new Error('this.#rxReader is nil')

try {
const bufs: Buffer[] = []
this.emitter.emit('connected', new Date())
for await (const chunk of generator) {
while (true) {
const { done, value: chunk } = await reader.read().catch(err => { throw _.set(new Error(err.message), 'originalError', err) })
if (_.isNil(chunk)) break
bufs.push(Buffer.isBuffer(chunk) ? chunk : Buffer.fromView(chunk))
let concated = Buffer.concat(bufs.splice(0, bufs.length))
try {
Expand All @@ -274,6 +276,7 @@ export class ChameleonUltra {
} finally {
if (concated.length > 0) bufs.push(concated)
}
if (done) break
}
this.emitter.emit('disconnected', new Date())
} catch (err) {
Expand All @@ -282,14 +285,17 @@ export class ChameleonUltra {
}
}

async #startDfuReadAsyncGenerator (): Promise<void> {
const generator = this.#readAsyncGenerator
if (_.isNil(generator)) throw new Error('this.#readAsyncGenerator is nil')
async #dfuStartReading (): Promise<void> {
const reader = this.#rxReader
if (_.isNil(reader)) throw new Error('this.#rxReader is nil')

try {
this.emitter.emit('connected', new Date())
for await (const chunk of generator) {
while (true) {
const { done, value: chunk } = await reader.read().catch(err => { throw _.set(new Error(err.message), 'originalError', err) })
if (_.isNil(chunk)) break
this.emitter.emit('resp', new DfuFrame(Buffer.isBuffer(chunk) ? chunk : Buffer.fromView(chunk)))
if (done) break
}
this.emitter.emit('disconnected', new Date())
} catch (err) {
Expand Down Expand Up @@ -317,13 +323,14 @@ export class ChameleonUltra {
const promiseDisconnected: Promise<[Date, string | undefined]> = this.isConnected() ? new Promise(resolve => {
this.emitter.once('disconnected', (disconnected: Date, reason?: string) => { resolve([disconnected, reason]) })
}) : Promise.resolve([new Date(), err.message])
await this.#readAsyncGenerator?.reader?.cancel(err).catch(this.#emitErr)
await this.#rxReader?.cancel(err).catch(this.#emitErr)
if (this.port?.readable?.locked) this.#rxReader?.releaseLock() // if cancel() not implemented
await this.port?.writable?.close().catch(this.#emitErr)
this.#debug('core', `locked: readable = ${this.port?.readable?.locked ?? '?'}, writable = ${this.port?.writable?.locked ?? '?'}`)
this.port = null

const [disconnectedAt, reason] = await promiseDisconnected
this.#debug('core', `disconnected at ${disconnectedAt.toISOString()}, reason = ${reason ?? '?'}`)
this.#debug('core', `disconnected at ${disconnectedAt.toISOString()}, reason = ${reason ?? err.message}`)
} catch (err) {
throw _.merge(new Error(err.message ?? 'Failed to disconnect'), { originalError: err })
}
Expand Down Expand Up @@ -400,7 +407,7 @@ export class ChameleonUltra {
}): Promise<() => Promise<T>> {
try {
if (!this.isConnected()) await this.connect()
if (_.isNil(this.#readAsyncGenerator)) throw new Error('#readAsyncGenerator is undefined')
if (_.isNil(this.#rxReader)) throw new Error('#rxReader is undefined')
if (_.isNil(args.timeout)) args.timeout = this.readDefaultTimeout
const respGenerator = new EventAsyncGenerator<T>()
this.emitter.on('resp', respGenerator.onData)
Expand Down Expand Up @@ -4264,28 +4271,4 @@ function mfuCheckRespNakCrc16a (resp: Buffer): Buffer {
return data
}

class ReadableToAsyncGenerator<T> implements AsyncGenerator<T> {
readonly reader: ReadableStreamDefaultReader<T>

constructor (readable: ReadableStream<T>) {
this.reader = readable.getReader()
}

async next (): Promise<IteratorResult<T>> {
return await this.reader.read() as IteratorResult<T>
}

async return (): Promise<IteratorResult<T>> {
await this.reader.cancel()
return { done: true, value: undefined }
}

async throw (err: any): Promise<IteratorResult<T>> {
await this.reader.cancel(err)
return { done: true, value: undefined }
}

[Symbol.asyncIterator] (): AsyncGenerator<T> { return this }
}

export { Decoder as ResponseDecoder }
Loading

0 comments on commit e96fb28

Please sign in to comment.