From 8f2b56af310a723647348406e311ef93e928762f Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Fri, 1 Apr 2022 05:15:29 -0700 Subject: [PATCH] Binary protocol channeling over HTTP (#319) * Implement proto 0.14 (doesn't have the Describe message) * Implement binary protocol layering over HTTP Co-authored-by: James Clarke --- package.json | 3 +- src/baseConn.ts | 1012 ++++++++++++++++++++++++++++++++++++++ src/client.ts | 5 +- src/codecs/ifaces.ts | 5 - src/codecs/namedtuple.ts | 2 +- src/codecs/object.ts | 48 +- src/fetchConn.ts | 135 +++++ src/rawConn.ts | 996 +++---------------------------------- test/client.test.ts | 23 +- yarn.lock | 39 ++ 10 files changed, 1288 insertions(+), 980 deletions(-) create mode 100644 src/baseConn.ts create mode 100644 src/fetchConn.ts diff --git a/package.json b/package.json index 1bb13bd2e..f849cb7c4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "edgedb", - "version": "0.20.3", + "version": "0.20.4", "description": "The official Node.js client library for EdgeDB", "homepage": "https://edgedb.com/docs", "author": "EdgeDB ", @@ -28,6 +28,7 @@ "@types/node": "14", "get-stdin": "^7.0.0", "jest": "^26.6.3", + "node-fetch": "2.6.7", "nodemon": "^2.0.13", "prettier": "^2.3.2", "proposal-temporal": "^0.7.0", diff --git a/src/baseConn.ts b/src/baseConn.ts new file mode 100644 index 000000000..45c302a72 --- /dev/null +++ b/src/baseConn.ts @@ -0,0 +1,1012 @@ +/*! + * This source file is part of the EdgeDB open source project. + * + * Copyright 2019-present MagicStack Inc. and the EdgeDB authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {NullCodec, NULL_CODEC} from "./codecs/codecs"; +import {ICodec, uuid} from "./codecs/ifaces"; +import {NamedTupleCodec} from "./codecs/namedtuple"; +import {ObjectCodec} from "./codecs/object"; +import {CodecsRegistry} from "./codecs/registry"; +import {EmptyTupleCodec, EMPTY_TUPLE_CODEC, TupleCodec} from "./codecs/tuple"; +import {versionGreaterThanOrEqual} from "./utils"; +import * as errors from "./errors"; +import {resolveErrorCode} from "./errors/resolve"; +import { + HeaderCodes, + ParseOptions, + PrepareMessageHeaders, + ProtocolVersion, + QueryArgs, + ServerSettings, +} from "./ifaces"; +import { + ReadBuffer, + ReadMessageBuffer, + WriteBuffer, + WriteMessageBuffer, +} from "./primitives/buffer"; +import char, * as chars from "./primitives/chars"; +import Event from "./primitives/event"; +import LRU from "./primitives/lru"; + +export const PROTO_VER: ProtocolVersion = [0, 14]; +export const PROTO_VER_MIN: ProtocolVersion = [0, 9]; + +enum TransactionStatus { + TRANS_IDLE = 0, // connection idle + TRANS_ACTIVE = 1, // command in progress + TRANS_INTRANS = 2, // idle, within transaction block + TRANS_INERROR = 3, // idle, within failed transaction + TRANS_UNKNOWN = 4, // cannot determine status +} + +enum Capabilities { + MODIFICATONS = 0b00001, // query is not read-only + SESSION_CONFIG = 0b00010, // query contains session config change + TRANSACTION = 0b00100, // query contains start/commit/rollback of + // transaction or savepoint manipulation + DDL = 0b01000, // query contains DDL + PERSISTENT_CONFIG = 0b10000, // server or database config change +} + +const NO_TRANSACTION_CAPABILITIES_BYTES = Buffer.from([ + 255, + 255, + 255, + 255, + 255, + 255, + 255, + 255 & ~Capabilities.TRANSACTION & ~Capabilities.SESSION_CONFIG, +]); + +const OLD_ERROR_CODES = new Map([ + [0x05_03_00_01, 0x05_03_01_01], // TransactionSerializationError #2431 + [0x05_03_00_02, 0x05_03_01_02], // TransactionDeadlockError #2431 +]); + +export class BaseRawConnection { + protected connected: boolean = false; + protected alwaysUseOptimisticFlow: boolean = false; // XXX + + protected lastStatus: string | null; + + protected codecsRegistry: CodecsRegistry; + protected queryCodecCache: LRU; + + protected serverSecret: Buffer | null; + /** @internal */ serverSettings: ServerSettings; + private serverXactStatus: TransactionStatus; + + protected buffer: ReadMessageBuffer; + + protected messageWaiter: Event | null; + protected connWaiter: Event; + connAbortWaiter: Event; + + protected _abortedWith: Error | null = null; + + protocolVersion: ProtocolVersion = PROTO_VER; + + /** @internal */ + protected constructor( + registry: CodecsRegistry + ) { + this.buffer = new ReadMessageBuffer(); + + this.codecsRegistry = registry; + this.queryCodecCache = new LRU({capacity: 1000}); + + this.lastStatus = null; + + this.serverSecret = null; + this.serverSettings = {}; + this.serverXactStatus = TransactionStatus.TRANS_UNKNOWN; + + this.messageWaiter = null; + this.connWaiter = new Event(); + this.connAbortWaiter = new Event(); + } + + protected throwNotImplemented(method: string): never { + throw new Error(`method ${method} is not implemented`); + } + + protected async _waitForMessage(): Promise { + this.throwNotImplemented('_waitForMessage'); + } + + protected _sendData(data: Buffer): void { + this.throwNotImplemented('_sendData'); + } + + getConnAbortError(): Error { + return ( + this._abortedWith ?? new errors.InterfaceError(`client has been closed`) + ); + } + + protected _checkState(): void { + if (this.isClosed()) { + throw this.getConnAbortError(); + } + } + + protected _abortWithError(err: Error): void { + this._abortedWith = err; + this._abort(); + } + + protected _ignoreHeaders(): void { + let numFields = this.buffer.readInt16(); + while (numFields) { + this.buffer.readInt16(); + this.buffer.readLenPrefixedBuffer(); + numFields--; + } + } + + protected _abortWaiters(err: Error): void { + if (!this.connWaiter.done) { + this.connWaiter.setError(err); + } + this.messageWaiter?.setError(err); + this.messageWaiter = null; + } + + protected _parseHeaders(): Map { + const ret = new Map(); + let numFields = this.buffer.readInt16(); + while (numFields) { + const key = this.buffer.readInt16(); + const value = this.buffer.readLenPrefixedBuffer(); + ret.set(key, value); + numFields--; + } + return ret; + } + + private _parseDescribeTypeMessage(): [ + number, + ICodec, + ICodec, + number, + Buffer, + Buffer + ] { + const headers = this._parseHeaders(); + let capabilities = -1; + if (headers.has(HeaderCodes.capabilities)) { + capabilities = Number( + headers.get(HeaderCodes.capabilities)!.readBigInt64BE() + ); + } + + const cardinality: char = this.buffer.readChar(); + + const inTypeId = this.buffer.readUUID(); + const inTypeData = this.buffer.readLenPrefixedBuffer(); + + const outTypeId = this.buffer.readUUID(); + const outTypeData = this.buffer.readLenPrefixedBuffer(); + + this.buffer.finishMessage(); + + let inCodec = this.codecsRegistry.getCodec(inTypeId); + if (inCodec == null) { + inCodec = this.codecsRegistry.buildCodec( + inTypeData, + this.protocolVersion + ); + } + + let outCodec = this.codecsRegistry.getCodec(outTypeId); + if (outCodec == null) { + outCodec = this.codecsRegistry.buildCodec( + outTypeData, + this.protocolVersion + ); + } + + return [ + cardinality, + inCodec, + outCodec, + capabilities, + inTypeData, + outTypeData, + ]; + } + + protected _parseCommandCompleteMessage(): string { + this._ignoreHeaders(); + const status = this.buffer.readString(); + this.buffer.finishMessage(); + return status; + } + + protected _parseErrorMessage(): Error { + this.buffer.readChar(); // ignore severity + const code = this.buffer.readUInt32(); + const message = this.buffer.readString(); + this._ignoreHeaders(); // ignore attrs + const errorType = resolveErrorCode(OLD_ERROR_CODES.get(code) ?? code); + this.buffer.finishMessage(); + + const err = new errorType(message); + return err; + } + + protected _parseSyncMessage(): void { + this._parseHeaders(); // TODO: Reject Headers + const status = this.buffer.readChar(); + switch (status) { + case chars.$I: + this.serverXactStatus = TransactionStatus.TRANS_IDLE; + break; + case chars.$T: + this.serverXactStatus = TransactionStatus.TRANS_INTRANS; + break; + case chars.$E: + this.serverXactStatus = TransactionStatus.TRANS_INERROR; + break; + default: + this.serverXactStatus = TransactionStatus.TRANS_UNKNOWN; + } + + this.buffer.finishMessage(); + } + + private _parseDataMessages( + codec: ICodec, + result: Array | WriteBuffer + ): void { + const frb = ReadBuffer.alloc(); + const $D = chars.$D; + const buffer = this.buffer; + + if (Array.isArray(result)) { + while (buffer.takeMessageType($D)) { + buffer.consumeMessageInto(frb); + frb.discard(6); + result.push(codec.decode(frb)); + frb.finish(); + } + } else { + while (buffer.takeMessageType($D)) { + const msg = buffer.consumeMessage(); + result.writeChar($D); + result.writeInt32(msg.length + 4); + result.writeBuffer(msg); + } + } + } + + private _parseServerSettings(name: string, value: Buffer): void { + switch (name) { + case "suggested_pool_concurrency": + this.serverSettings.suggested_pool_concurrency = parseInt( + value.toString("utf8"), + 10 + ); + break; + case "system_config": + const buf = new ReadBuffer(value); + const typedescLen = buf.readInt32() - 16; + const typedescId = buf.readUUID(); + const typedesc = buf.readBuffer(typedescLen); + + let codec = this.codecsRegistry.getCodec(typedescId); + if (codec === null) { + codec = this.codecsRegistry.buildCodec( + typedesc, + this.protocolVersion + ); + } + + buf.discard(4); // discard data length int32 + const data = codec.decode(buf); + buf.finish(); + + this.serverSettings.system_config = data; + break; + default: + this.serverSettings[name] = value; + break; + } + } + + protected _fallthrough(): void { + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$S: { + const name = this.buffer.readString(); + const value = this.buffer.readLenPrefixedBuffer(); + this._parseServerSettings(name, value); + this.buffer.finishMessage(); + break; + } + + case chars.$L: { + const severity = this.buffer.readChar(); + const code = this.buffer.readUInt32(); + const message = this.buffer.readString(); + this._parseHeaders(); + this.buffer.finishMessage(); + + /* tslint:disable */ + console.info("SERVER MESSAGE", severity, code, message); + /* tslint:enable */ + + break; + } + + default: + // TODO: terminate connection + throw new Error( + `unexpected message type ${mtype} ("${chars.chr(mtype)}")` + ); + } + } + + async _parse( + query: string, + asJson: boolean, + expectOne: boolean, + alwaysDescribe: boolean, + options?: ParseOptions + ): Promise<[number, ICodec, ICodec, number, Buffer | null, Buffer | null]> { + const wb = new WriteMessageBuffer(); + const parseSendsTypeData = versionGreaterThanOrEqual( + this.protocolVersion, + [0, 14] + ); + + wb.beginMessage(chars.$P) + .writeHeaders({ + explicitObjectids: "true", + ...(options?.headers ?? {}), + allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES, + }) + .writeChar(asJson ? chars.$j : chars.$b) + .writeChar(expectOne ? chars.$o : chars.$m) + .writeString("") // statement name + .writeString(query) + .endMessage(); + + wb.writeSync(); + + this._sendData(wb.unwrap()); + + let cardinality: number | void; + let inTypeId: uuid | void; + let outTypeId: uuid | void; + let inCodec: ICodec | null; + let outCodec: ICodec | null; + let capabilities: number = -1; + let parsing = true; + let error: Error | null = null; + let inCodecData: Buffer | null = null; + let outCodecData: Buffer | null = null; + + while (parsing) { + if (!this.buffer.takeMessage()) { + await this._waitForMessage(); + } + + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$1: { + const headers = this._parseHeaders(); + if (headers.has(HeaderCodes.capabilities)) { + capabilities = Number( + headers.get(HeaderCodes.capabilities)!.readBigInt64BE() + ); + } + cardinality = this.buffer.readChar(); + + if (parseSendsTypeData) { + inTypeId = this.buffer.readUUID(); + inCodecData = this.buffer.readLenPrefixedBuffer(); + outTypeId = this.buffer.readUUID(); + outCodecData = this.buffer.readLenPrefixedBuffer(); + } else { + inTypeId = this.buffer.readUUID(); + outTypeId = this.buffer.readUUID(); + } + + this.buffer.finishMessage(); + break; + } + + case chars.$E: { + error = this._parseErrorMessage(); + break; + } + + case chars.$Z: { + this._parseSyncMessage(); + parsing = false; + break; + } + + default: + this._fallthrough(); + } + } + + if (error != null) { + throw error; + } + + if (inTypeId == null || outTypeId == null) { + throw new Error("did not receive in/out type ids in Parse response"); + } + + inCodec = this.codecsRegistry.getCodec(inTypeId); + outCodec = this.codecsRegistry.getCodec(outTypeId); + + if (inCodec == null && inCodecData != null) { + inCodec = this.codecsRegistry.buildCodec( + inCodecData, + this.protocolVersion + ); + } + + if (outCodec == null && outCodecData != null) { + outCodec = this.codecsRegistry.buildCodec( + outCodecData, + this.protocolVersion + ); + } + + if ( + inCodec == null || + outCodec == null || + (alwaysDescribe && !parseSendsTypeData) + ) { + if (parseSendsTypeData) { + // unreachable + throw new Error("in/out codecs were not sent"); + } + + wb.reset(); + wb.beginMessage(chars.$D) + .writeInt16(0) // no headers + .writeChar(chars.$T) + .writeString("") // statement name + .endMessage() + .writeSync(); + + this._sendData(wb.unwrap()); + + parsing = true; + while (parsing) { + if (!this.buffer.takeMessage()) { + await this._waitForMessage(); + } + + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$T: { + try { + [ + cardinality, + inCodec, + outCodec, + capabilities, + inCodecData, + outCodecData, + ] = this._parseDescribeTypeMessage(); + } catch (e: any) { + error = e; + } + break; + } + + case chars.$E: { + error = this._parseErrorMessage(); + break; + } + + case chars.$Z: { + this._parseSyncMessage(); + parsing = false; + break; + } + + default: + this._fallthrough(); + } + } + + if (error != null) { + throw error; + } + } + + if (cardinality == null || outCodec == null || inCodec == null) { + throw new Error( + "failed to receive type information in response to a Parse message" + ); + } + + return [ + cardinality, + inCodec, + outCodec, + capabilities, + inCodecData, + outCodecData, + ]; + } + + private _encodeArgs(args: QueryArgs, inCodec: ICodec): Buffer { + if (versionGreaterThanOrEqual(this.protocolVersion, [0, 12])) { + if (inCodec === NULL_CODEC) { + if (args != null) { + throw new errors.QueryArgumentError( + `This query does not contain any query parameters, ` + + `but query arguments were provided to the 'query*()' method` + ); + } + return NullCodec.BUFFER; + } + + if (inCodec instanceof ObjectCodec) { + return inCodec.encodeArgs(args); + } + + // Shouldn't ever happen. + throw new Error("invalid input codec"); + } else { + if (inCodec === EMPTY_TUPLE_CODEC) { + if (args != null) { + throw new errors.QueryArgumentError( + `This query does not contain any query parameters, ` + + `but query arguments were provided to the 'query*()' method` + ); + } + return EmptyTupleCodec.BUFFER; + } + + if ( + inCodec instanceof NamedTupleCodec || + inCodec instanceof TupleCodec + ) { + return inCodec.encodeArgs(args); + } + + // Shouldn't ever happen. + throw new Error("invalid input codec"); + } + } + + async _executeFlow( + args: QueryArgs, + inCodec: ICodec, + outCodec: ICodec, + result: Array | WriteBuffer + ): Promise { + const wb = new WriteMessageBuffer(); + wb.beginMessage(chars.$E) + .writeHeaders({allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES}) + .writeString("") // statement name + .writeBuffer(this._encodeArgs(args, inCodec)) + .endMessage() + .writeSync(); + + this._sendData(wb.unwrap()); + + let parsing = true; + let error: Error | null = null; + + while (parsing) { + if (!this.buffer.takeMessage()) { + await this._waitForMessage(); + } + + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$D: { + if (error == null) { + try { + this._parseDataMessages(outCodec, result); + } catch (e: any) { + error = e; + this.buffer.finishMessage(); + } + } else { + this.buffer.discardMessage(); + } + break; + } + + case chars.$C: { + this.lastStatus = this._parseCommandCompleteMessage(); + break; + } + + case chars.$E: { + error = this._parseErrorMessage(); + break; + } + + case chars.$Z: { + this._parseSyncMessage(); + parsing = false; + break; + } + + default: + this._fallthrough(); + } + } + + if (error != null) { + throw error; + } + } + + private async _optimisticExecuteFlow( + args: QueryArgs, + asJson: boolean, + expectOne: boolean, + requiredOne: boolean, + inCodec: ICodec, + outCodec: ICodec, + query: string, + result: Array | WriteBuffer, + options?: ParseOptions + ): Promise { + const wb = new WriteMessageBuffer(); + wb.beginMessage(chars.$O); + wb.writeHeaders({ + explicitObjectids: "true", + ...(options?.headers ?? {}), + allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES, + }); + wb.writeChar(asJson ? chars.$j : chars.$b); + wb.writeChar(expectOne ? chars.$o : chars.$m); + wb.writeString(query); + wb.writeBuffer(inCodec.tidBuffer); + wb.writeBuffer(outCodec.tidBuffer); + wb.writeBuffer(this._encodeArgs(args, inCodec)); + wb.endMessage(); + wb.writeSync(); + + this._sendData(wb.unwrap()); + + let reExec = false; + let error: Error | null = null; + let parsing = true; + let newCard: char | null = null; + let capabilities = -1; + + while (parsing) { + if (!this.buffer.takeMessage()) { + await this._waitForMessage(); + } + + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$D: { + if (error == null) { + try { + this._parseDataMessages(outCodec, result); + } catch (e: any) { + error = e; + this.buffer.finishMessage(); + } + } else { + this.buffer.discardMessage(); + } + break; + } + + case chars.$C: { + this.lastStatus = this._parseCommandCompleteMessage(); + break; + } + + case chars.$Z: { + this._parseSyncMessage(); + parsing = false; + break; + } + + case chars.$T: { + try { + [newCard, inCodec, outCodec, capabilities] = + this._parseDescribeTypeMessage(); + const key = this._getQueryCacheKey(query, asJson, expectOne); + this.queryCodecCache.set(key, [ + newCard, + inCodec, + outCodec, + capabilities, + ]); + reExec = true; + } catch (e: any) { + error = e; + } + break; + } + + case chars.$E: { + error = this._parseErrorMessage(); + break; + } + + default: + this._fallthrough(); + } + } + + if (error != null) { + throw error; + } + + if (reExec) { + this._validateFetchCardinality(newCard!, asJson, requiredOne); + return await this._executeFlow(args, inCodec, outCodec, result); + } + } + + private _getQueryCacheKey( + query: string, + asJson: boolean, + expectOne: boolean + ): string { + return [asJson, expectOne, query.length, query].join(";"); + } + + private _validateFetchCardinality( + card: char, + asJson: boolean, + requiredOne: boolean + ): void { + if (requiredOne && card === chars.$n) { + throw new errors.NoDataError( + `query executed via queryRequiredSingle${ + asJson ? "JSON" : "" + }() returned no data` + ); + } + } + + async fetch( + query: string, + args: QueryArgs = null, + asJson: boolean, + expectOne: boolean, + requiredOne: boolean = false + ): Promise { + this._checkState(); + + const key = this._getQueryCacheKey(query, asJson, expectOne); + const ret = new Array(); + + if (this.queryCodecCache.has(key)) { + const [card, inCodec, outCodec] = this.queryCodecCache.get(key)!; + this._validateFetchCardinality(card, asJson, requiredOne); + await this._optimisticExecuteFlow( + args, + asJson, + expectOne, + requiredOne, + inCodec, + outCodec, + query, + ret + ); + } else { + const [card, inCodec, outCodec, capabilities] = await this._parse( + query, + asJson, + expectOne, + false + ); + this._validateFetchCardinality(card, asJson, requiredOne); + this.queryCodecCache.set(key, [card, inCodec, outCodec, capabilities]); + if (this.alwaysUseOptimisticFlow) { + await this._optimisticExecuteFlow( + args, + asJson, + expectOne, + requiredOne, + inCodec, + outCodec, + query, + ret + ); + } else { + await this._executeFlow(args, inCodec, outCodec, ret); + } + } + + if (expectOne) { + if (requiredOne && !ret.length) { + throw new errors.NoDataError("query returned no data"); + } else { + return ret[0] ?? (asJson ? "null" : null); + } + } else { + if (ret && ret.length) { + if (asJson) { + return ret[0]; + } else { + return ret; + } + } else { + if (asJson) { + return "[]"; + } else { + return ret; + } + } + } + } + + getQueryCapabilities( + query: string, + asJson: boolean, + expectOne: boolean + ): number | null { + const key = this._getQueryCacheKey(query, asJson, expectOne); + return this.queryCodecCache.get(key)?.[3] ?? null; + } + + async execute( + query: string, + allowTransactionCommands: boolean = false + ): Promise { + this._checkState(); + + const wb = new WriteMessageBuffer(); + wb.beginMessage(chars.$Q) + .writeHeaders({ + allowCapabilities: !allowTransactionCommands + ? NO_TRANSACTION_CAPABILITIES_BYTES + : undefined, + }) + .writeString(query) // statement name + .endMessage(); + + this._sendData(wb.unwrap()); + + let error: Error | null = null; + let parsing = true; + + while (parsing) { + if (!this.buffer.takeMessage()) { + await this._waitForMessage(); + } + + const mtype = this.buffer.getMessageType(); + + switch (mtype) { + case chars.$C: { + this.lastStatus = this._parseCommandCompleteMessage(); + break; + } + + case chars.$Z: { + this._parseSyncMessage(); + parsing = false; + break; + } + + case chars.$E: { + error = this._parseErrorMessage(); + break; + } + + default: + this._fallthrough(); + } + } + + if (error != null) { + throw error; + } + } + + async resetState(): Promise { + if ( + this.connected && + this.serverXactStatus !== TransactionStatus.TRANS_IDLE + ) { + try { + await this.execute(`rollback`, true); + } catch { + this._abortWithError( + new errors.ClientConnectionClosedError("failed to reset state") + ); + } + } + } + + protected _abort(): void { + this.connected = false; + this._abortWaiters(this.getConnAbortError()); + if (!this.connAbortWaiter.done) { + this.connAbortWaiter.set(); + } + } + + isClosed(): boolean { + return !this.connected; + } + + async close(): Promise { + this._abort(); + } + + // These methods are exposed for use by EdgeDB Studio + public async rawParse( + query: string, + headers?: PrepareMessageHeaders + ): Promise<[ICodec, ICodec, Buffer, Buffer, ProtocolVersion]> { + const result = await this._parse(query, false, false, true, { + headers, + }); + return [ + result[1], + result[2], + result[4]!, + result[5]!, + this.protocolVersion, + ]; + } + + public async rawExecute( + query: string, + outCodec: ICodec, + headers?: PrepareMessageHeaders, + inCodec?: ICodec, + args: QueryArgs = null + ): Promise { + const result = new WriteBuffer(); + inCodec = + inCodec ?? + (versionGreaterThanOrEqual(this.protocolVersion, [0, 12]) + ? NULL_CODEC + : EMPTY_TUPLE_CODEC); + await this._optimisticExecuteFlow( + args, + false, + false, + false, + inCodec, + outCodec, + query, + result, + {headers} + ); + return result.unwrap(); + } + + public async rawExecuteScript(script: string): Promise { + await this.execute(script, true); + } +} diff --git a/src/client.ts b/src/client.ts index fcca1ac76..62a10868c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -179,8 +179,8 @@ export class ClientConnectionHolder { err instanceof errors.EdgeDBError && err.hasTag(errors.SHOULD_RETRY) && // query is readonly or it's a transaction serialization error - (conn.getQueryCapabilities(query, asJson, expectOne) === 0 - || err instanceof errors.TransactionConflictError) + (conn.getQueryCapabilities(query, asJson, expectOne) === 0 || + err instanceof errors.TransactionConflictError) ) { const rule = this.options.retryOptions.getRuleForException(err); if (iteration + 1 >= rule.attempts) { @@ -333,6 +333,7 @@ class ClientPool { const config = await this._getNormalizedConnectConfig(); const connection = await retryingConnect(config, this._codecsRegistry); + const suggestedConcurrency = connection.serverSettings.suggested_pool_concurrency; if ( diff --git a/src/codecs/ifaces.ts b/src/codecs/ifaces.ts index ba20703d3..c5f2af6b5 100644 --- a/src/codecs/ifaces.ts +++ b/src/codecs/ifaces.ts @@ -37,7 +37,6 @@ export interface ICodec { decode(buf: ReadBuffer): any; getSubcodecs(): ICodec[]; - getSubcodecsNames(): string[]; getKind(): CodecKind; getKnownTypeName(): string; } @@ -58,10 +57,6 @@ export abstract class Codec { getKnownTypeName(): string { return "anytype"; } - - getSubcodecsNames(): string[] { - return []; - } } export abstract class ScalarCodec extends Codec { diff --git a/src/codecs/namedtuple.ts b/src/codecs/namedtuple.ts index 66c2dcceb..0afdab402 100644 --- a/src/codecs/namedtuple.ts +++ b/src/codecs/namedtuple.ts @@ -114,7 +114,7 @@ export class NamedTupleCodec extends Codec implements ICodec, IArgsCodec { return Array.from(this.subCodecs); } - getSubcodecsNames(): string[] { + getNames(): string[] { return Array.from(this.names); } diff --git a/src/codecs/object.ts b/src/codecs/object.ts index 6530818b3..2ff7d86c6 100644 --- a/src/codecs/object.ts +++ b/src/codecs/object.ts @@ -20,11 +20,18 @@ import {ICodec, Codec, uuid, CodecKind} from "./ifaces"; import {ReadBuffer, WriteBuffer} from "../primitives/buffer"; import {ONE, AT_LEAST_ONE} from "./consts"; +const EDGE_POINTER_IS_IMPLICIT = 1 << 0; const EDGE_POINTER_IS_LINKPROP = 1 << 1; +export interface ObjectFieldInfo { + name: string; + implicit: boolean; + linkprop: boolean; +} + export class ObjectCodec extends Codec implements ICodec { private codecs: ICodec[]; - private names: string[]; + private fields: ObjectFieldInfo[]; private namesSet: Set; private cardinalities: number[]; @@ -39,17 +46,20 @@ export class ObjectCodec extends Codec implements ICodec { this.codecs = codecs; - const newNames: string[] = new Array(names.length); + this.fields = new Array(names.length); + this.namesSet = new Set(); + this.cardinalities = cards; + for (let i = 0; i < names.length; i++) { - if (flags[i] & EDGE_POINTER_IS_LINKPROP) { - newNames[i] = `@${names[i]}`; - } else { - newNames[i] = names[i]; - } + const isLinkprop = !!(flags[i] & EDGE_POINTER_IS_LINKPROP); + const name = isLinkprop ? `@${names[i]}` : names[i]; + this.fields[i] = { + name, + implicit: !!(flags[i] & EDGE_POINTER_IS_IMPLICIT), + linkprop: isLinkprop, + }; + this.namesSet.add(name); } - this.names = newNames; - this.namesSet = new Set(newNames); - this.cardinalities = cards; } encode(_buf: WriteBuffer, _object: any): void { @@ -57,7 +67,7 @@ export class ObjectCodec extends Codec implements ICodec { } encodeArgs(args: any): Buffer { - if (this.names[0] === "0") { + if (this.fields[0].name === "0") { return this._encodePositionalArgs(args); } return this._encodeNamedArgs(args); @@ -87,7 +97,7 @@ export class ObjectCodec extends Codec implements ICodec { const card = this.cardinalities[i]; if (card === ONE || card === AT_LEAST_ONE) { throw new Error( - `argument ${this.names[i]} is required, but received ${arg}` + `argument ${this.fields[i].name} is required, but received ${arg}` ); } elemData.writeInt32(-1); @@ -111,7 +121,7 @@ export class ObjectCodec extends Codec implements ICodec { } const keys = Object.keys(args); - const names = this.names; + const fields = this.fields; const namesSet = this.namesSet; const codecs = this.codecs; const codecsLen = codecs.length; @@ -127,7 +137,7 @@ export class ObjectCodec extends Codec implements ICodec { const elemData = new WriteBuffer(); for (let i = 0; i < codecsLen; i++) { - const key = names[i]; + const key = fields[i].name; const val = args[key]; elemData.writeInt32(0); // reserved bytes @@ -135,7 +145,7 @@ export class ObjectCodec extends Codec implements ICodec { const card = this.cardinalities[i]; if (card === ONE || card === AT_LEAST_ONE) { throw new Error( - `argument ${this.names[i]} is required, but received ${val}` + `argument ${this.fields[i].name} is required, but received ${val}` ); } elemData.writeInt32(-1); @@ -155,7 +165,7 @@ export class ObjectCodec extends Codec implements ICodec { decode(buf: ReadBuffer): any { const codecs = this.codecs; - const names = this.names; + const fields = this.fields; const els = buf.readUInt32(); if (els !== codecs.length) { @@ -169,7 +179,7 @@ export class ObjectCodec extends Codec implements ICodec { for (let i = 0; i < els; i++) { buf.discard(4); // reserved const elemLen = buf.readInt32(); - const name = names[i]; + const name = fields[i].name; let val = null; if (elemLen !== -1) { buf.sliceInto(elemBuf, elemLen); @@ -186,8 +196,8 @@ export class ObjectCodec extends Codec implements ICodec { return Array.from(this.codecs); } - getSubcodecsNames(): string[] { - return Array.from(this.names); + getFields(): ObjectFieldInfo[] { + return Array.from(this.fields); } getKind(): CodecKind { diff --git a/src/fetchConn.ts b/src/fetchConn.ts new file mode 100644 index 000000000..1c8ba0bfc --- /dev/null +++ b/src/fetchConn.ts @@ -0,0 +1,135 @@ +/*! + * This source file is part of the EdgeDB open source project. + * + * Copyright 2022-present MagicStack Inc. and the EdgeDB authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {CodecsRegistry} from "./codecs/registry"; +import {Address} from "./conUtils"; +import {BaseRawConnection} from "./baseConn"; +import Event from "./primitives/event"; +import * as chars from "./primitives/chars"; + +// @ts-ignore +if (typeof fetch === "undefined") { + // Pre 17.5 NodeJS environment. + // @ts-ignore + var fetch = require("node-fetch"); // tslint:disable-line +} + +interface FetchConfig { + address: Address | string; + database: string; +} + +export class FetchConnection extends BaseRawConnection { + private config: FetchConfig; + private addr: string; + + constructor( + config: FetchConfig, + registry: CodecsRegistry + ) { + super(registry); + this.config = config; + + this.addr = `${ + typeof this.config.address === "string" ? + config.address : `http://${config.address[0]}:${config.address[1]}` + }/admin/protocol/${config.database}`; + } + + protected async _waitForMessage(): Promise { + if (this.buffer.takeMessage()) { + return; + } + + if (this.messageWaiter == null || this.messageWaiter.done) { + throw new Error( + `message waiter was not initialized before waiting for response` + ); + } + + await this.messageWaiter.wait(); + } + + protected async __sendData(data: Buffer): Promise { + if (this.buffer.takeMessage()) { + const mtype = this.buffer.getMessageType(); + throw new Error( + `sending request before reading all data of the previous one: ` + + `${chars.chr(mtype)}`); + } + + if (this.messageWaiter != null && !this.messageWaiter.done) { + throw new Error( + `sending request before waiting for completion of the previous one` + ); + } + + this.messageWaiter = new Event(); + + try { + const resp: any = await fetch(this.addr, { + method: "post", + body: data, + headers: {"Content-Type": "application/x.edgedb"}, + }); + + if (!resp.ok) { + throw new Error( + `fetch failed with status code ${resp.status}: ${resp.statusText}` + ); + } + + const respData: any = await resp.arrayBuffer(); + const buf = Buffer.from(respData); + + let pause = false; + try { + pause = this.buffer.feed(buf); + } catch (e: any) { + this.messageWaiter.setError(e); + } + + if (pause) { + // unreachable + throw new Error('too much data received'); + } + + if (!this.buffer.takeMessage()) { + throw new Error('no binary protocol messages in the response'); + } + + this.messageWaiter.set(); + } catch (e) { + this.messageWaiter.setError(e); + } + } + + protected _sendData(data: Buffer): void { + this.__sendData(data); + } + + static create( + config: FetchConfig, + registry: CodecsRegistry + ): FetchConnection { + const conn = new FetchConnection(config, registry); + conn.connected = true; + conn.alwaysUseOptimisticFlow = true; + return conn; + } +} diff --git a/src/rawConn.ts b/src/rawConn.ts index ef6c74b05..25a68fb93 100644 --- a/src/rawConn.ts +++ b/src/rawConn.ts @@ -17,37 +17,16 @@ */ import {net, tls} from "./adapter.node"; -import {NullCodec, NULL_CODEC} from "./codecs/codecs"; -import {ICodec, uuid} from "./codecs/ifaces"; -import {NamedTupleCodec} from "./codecs/namedtuple"; -import {ObjectCodec} from "./codecs/object"; +import {PROTO_VER, PROTO_VER_MIN, BaseRawConnection} from "./baseConn"; import {CodecsRegistry} from "./codecs/registry"; -import {EmptyTupleCodec, EMPTY_TUPLE_CODEC, TupleCodec} from "./codecs/tuple"; import {Address, NormalizedConnectConfig} from "./conUtils"; -import * as errors from "./errors"; -import {resolveErrorCode} from "./errors/resolve"; -import { - HeaderCodes, - ParseOptions, - PrepareMessageHeaders, - ProtocolVersion, - QueryArgs, - ServerSettings, -} from "./ifaces"; -import { - ReadBuffer, - ReadMessageBuffer, - WriteBuffer, - WriteMessageBuffer, -} from "./primitives/buffer"; -import char, * as chars from "./primitives/chars"; +import {versionGreaterThan, versionGreaterThanOrEqual} from "./utils"; +import {ProtocolVersion} from "./ifaces"; +import {WriteMessageBuffer} from "./primitives/buffer"; import Event from "./primitives/event"; -import LRU from "./primitives/lru"; +import char, * as chars from "./primitives/chars"; import * as scram from "./scram"; -import {versionGreaterThan, versionGreaterThanOrEqual} from "./utils"; - -const PROTO_VER: ProtocolVersion = [0, 13]; -const PROTO_VER_MIN: ProtocolVersion = [0, 9]; +import * as errors from "./errors"; enum AuthenticationStatuses { AUTH_OK = 0, @@ -56,64 +35,11 @@ enum AuthenticationStatuses { AUTH_SASL_FINAL = 12, } -enum TransactionStatus { - TRANS_IDLE = 0, // connection idle - TRANS_ACTIVE = 1, // command in progress - TRANS_INTRANS = 2, // idle, within transaction block - TRANS_INERROR = 3, // idle, within failed transaction - TRANS_UNKNOWN = 4, // cannot determine status -} - -enum Capabilities { - MODIFICATONS = 0b00001, // query is not read-only - SESSION_CONFIG = 0b00010, // query contains session config change - TRANSACTION = 0b00100, // query contains start/commit/rollback of - // transaction or savepoint manipulation - DDL = 0b01000, // query contains DDL - PERSISTENT_CONFIG = 0b10000, // server or database config change -} +export class RawConnection extends BaseRawConnection { + private config: NormalizedConnectConfig; -const NO_TRANSACTION_CAPABILITIES_BYTES = Buffer.from([ - 255, - 255, - 255, - 255, - 255, - 255, - 255, - 255 & ~Capabilities.TRANSACTION & ~Capabilities.SESSION_CONFIG, -]); - -const OLD_ERROR_CODES = new Map([ - [0x05_03_00_01, 0x05_03_01_01], // TransactionSerializationError #2431 - [0x05_03_00_02, 0x05_03_01_02], // TransactionDeadlockError #2431 -]); - -export class RawConnection { private sock: net.Socket; - private config: NormalizedConnectConfig; private paused: boolean; - private connected: boolean = false; - - protected lastStatus: string | null; - - private codecsRegistry: CodecsRegistry; - private queryCodecCache: LRU; - - protected serverSecret: Buffer | null; - /** @internal */ serverSettings: ServerSettings; - private serverXactStatus: TransactionStatus; - - private buffer: ReadMessageBuffer; - - private messageWaiter: Event | null; - - private connWaiter: Event; - - protocolVersion: ProtocolVersion = PROTO_VER; - - private _abortedWith: Error | null = null; - connAbortWaiter: Event; /** @internal */ protected constructor( @@ -121,21 +47,9 @@ export class RawConnection { config: NormalizedConnectConfig, registry: CodecsRegistry ) { - this.buffer = new ReadMessageBuffer(); + super(registry); - this.codecsRegistry = registry; - this.queryCodecCache = new LRU({capacity: 1000}); - - this.lastStatus = null; - - this.serverSecret = null; - this.serverSettings = {}; - this.serverXactStatus = TransactionStatus.TRANS_UNKNOWN; - - this.messageWaiter = null; - - this.connWaiter = new Event(); - this.connAbortWaiter = new Event(); + this.config = config; this.paused = false; this.sock = sock; @@ -143,7 +57,7 @@ export class RawConnection { this.sock.on("error", this._onError.bind(this)); this.sock.on("data", this._onData.bind(this)); - if (this.sock instanceof tls.TLSSocket) { + if (tls.TLSSocket && this.sock instanceof tls.TLSSocket) { // This is bizarre, but "connect" can be fired before // "secureConnect" for some reason. The documentation // doesn't provide a clue why. We need to be able to validate @@ -155,41 +69,12 @@ export class RawConnection { this.sock.on("connect", this._onConnect.bind(this)); } this.sock.on("close", this._onClose.bind(this)); - - this.config = config; - } - - private async _waitForMessage(): Promise { - if (this.buffer.takeMessage()) { - return; - } - - if (this.paused) { - this.paused = false; - this.sock.resume(); - } - - this.sock.ref(); - this.messageWaiter = new Event(); - try { - await this.messageWaiter.wait(); - } finally { - this.sock.unref(); - } } private _onConnect(): void { this.connWaiter.set(); } - private _abortWaiters(err: Error): void { - if (!this.connWaiter.done) { - this.connWaiter.setError(err); - } - this.messageWaiter?.setError(err); - this.messageWaiter = null; - } - private _onClose(): void { if (!this.connected) { return; @@ -201,10 +86,10 @@ export class RawConnection { if (!this.connWaiter.done || this.messageWaiter) { /* This can happen, particularly, during the connect phase. - If the connection is aborted with a client-side timeout, there can be - a situation where the connection has actually been established, - and so `conn.sock.destroy` would simply close the socket, - without invoking the 'error' event. + If the connection is aborted with a client-side timeout, there can be + a situation where the connection has actually been established, + and so `conn.sock.destroy` would simply close the socket, + without invoking the 'error' event. */ this._abortWaiters(newErr); } @@ -219,7 +104,7 @@ export class RawConnection { this._abortWithError(newErr); } - private _onError(err: Error): void { + protected _onError(err: Error): void { const newErr = new errors.ClientConnectionClosedError( `network error: ${err}` ); @@ -234,23 +119,6 @@ export class RawConnection { } } - private _abortWithError(err: Error): void { - this._abortedWith = err; - this._abort(); - } - - getConnAbortError(): Error { - return ( - this._abortedWith ?? new errors.InterfaceError(`client has been closed`) - ); - } - - private _checkState(): void { - if (this.isClosed()) { - throw this.getConnAbortError(); - } - } - private _onData(data: Buffer): void { let pause = false; try { @@ -277,209 +145,62 @@ export class RawConnection { } } - private _ignoreHeaders(): void { - let numFields = this.buffer.readInt16(); - while (numFields) { - this.buffer.readInt16(); - this.buffer.readLenPrefixedBuffer(); - numFields--; - } - } - - private _parseHeaders(): Map { - const ret = new Map(); - let numFields = this.buffer.readInt16(); - while (numFields) { - const key = this.buffer.readInt16(); - const value = this.buffer.readLenPrefixedBuffer(); - ret.set(key, value); - numFields--; - } - return ret; - } - - private _parseDescribeTypeMessage(): [ - number, - ICodec, - ICodec, - number, - Buffer, - Buffer - ] { - const headers = this._parseHeaders(); - let capabilities = -1; - if (headers.has(HeaderCodes.capabilities)) { - capabilities = Number( - headers.get(HeaderCodes.capabilities)!.readBigInt64BE() - ); + protected async _waitForMessage(): Promise { + if (this.buffer.takeMessage()) { + return; } - const cardinality: char = this.buffer.readChar(); - - const inTypeId = this.buffer.readUUID(); - const inTypeData = this.buffer.readLenPrefixedBuffer(); - - const outTypeId = this.buffer.readUUID(); - const outTypeData = this.buffer.readLenPrefixedBuffer(); - - this.buffer.finishMessage(); - - let inCodec = this.codecsRegistry.getCodec(inTypeId); - if (inCodec == null) { - inCodec = this.codecsRegistry.buildCodec( - inTypeData, - this.protocolVersion - ); + if (this.paused) { + this.paused = false; + this.sock.resume(); } - let outCodec = this.codecsRegistry.getCodec(outTypeId); - if (outCodec == null) { - outCodec = this.codecsRegistry.buildCodec( - outTypeData, - this.protocolVersion - ); + this.sock.ref(); + this.messageWaiter = new Event(); + try { + await this.messageWaiter.wait(); + } finally { + this.sock.unref(); } - - return [ - cardinality, - inCodec, - outCodec, - capabilities, - inTypeData, - outTypeData, - ]; } - private _parseCommandCompleteMessage(): string { - this._ignoreHeaders(); - const status = this.buffer.readString(); - this.buffer.finishMessage(); - return status; - } - - private _parseErrorMessage(): Error { - this.buffer.readChar(); // ignore severity - const code = this.buffer.readUInt32(); - const message = this.buffer.readString(); - this._ignoreHeaders(); // ignore attrs - const errorType = resolveErrorCode(OLD_ERROR_CODES.get(code) ?? code); - this.buffer.finishMessage(); - - const err = new errorType(message); - return err; + protected _sendData(data: Buffer): void { + this.sock.write(data); } - private _parseSyncMessage(): void { - this._parseHeaders(); // TODO: Reject Headers - const status = this.buffer.readChar(); - switch (status) { - case chars.$I: - this.serverXactStatus = TransactionStatus.TRANS_IDLE; - break; - case chars.$T: - this.serverXactStatus = TransactionStatus.TRANS_INTRANS; - break; - case chars.$E: - this.serverXactStatus = TransactionStatus.TRANS_INERROR; - break; - default: - this.serverXactStatus = TransactionStatus.TRANS_UNKNOWN; + /** @internal */ + private static newSock( + addr: string | [string, number], + options?: tls.ConnectionOptions + ): net.Socket { + if (typeof addr === "string") { + // unix socket + return net.createConnection(addr); } - this.buffer.finishMessage(); - } - - private _parseDataMessages( - codec: ICodec, - result: Array | WriteBuffer - ): void { - const frb = ReadBuffer.alloc(); - const $D = chars.$D; - const buffer = this.buffer; - - if (Array.isArray(result)) { - while (buffer.takeMessageType($D)) { - buffer.consumeMessageInto(frb); - frb.discard(6); - result.push(codec.decode(frb)); - frb.finish(); - } - } else { - while (buffer.takeMessageType($D)) { - const msg = buffer.consumeMessage(); - result.writeChar($D); - result.writeInt32(msg.length + 4); - result.writeBuffer(msg); - } + const [host, port] = addr; + if (options == null) { + return net.createConnection(port, host); } - } - private _parseServerSettings(name: string, value: Buffer): void { - switch (name) { - case "suggested_pool_concurrency": - this.serverSettings.suggested_pool_concurrency = parseInt( - value.toString("utf8"), - 10 - ); - break; - case "system_config": - const buf = new ReadBuffer(value); - const typedescLen = buf.readInt32() - 16; - const typedescId = buf.readUUID(); - const typedesc = buf.readBuffer(typedescLen); - - let codec = this.codecsRegistry.getCodec(typedescId); - if (codec === null) { - codec = this.codecsRegistry.buildCodec( - typedesc, - this.protocolVersion - ); - } - - buf.discard(4); // discard data length int32 - const data = codec.decode(buf); - buf.finish(); + const opts = {...options, host, port}; + return tls.connect(opts); + } - this.serverSettings.system_config = data; - break; - default: - this.serverSettings[name] = value; - break; + protected _abort(): void { + if (this.sock && this.connected) { + this.sock.destroy(); } + super._abort(); } - private _fallthrough(): void { - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$S: { - const name = this.buffer.readString(); - const value = this.buffer.readLenPrefixedBuffer(); - this._parseServerSettings(name, value); - this.buffer.finishMessage(); - break; - } - - case chars.$L: { - const severity = this.buffer.readChar(); - const code = this.buffer.readUInt32(); - const message = this.buffer.readString(); - this._parseHeaders(); - this.buffer.finishMessage(); - - /* tslint:disable */ - console.info("SERVER MESSAGE", severity, code, message); - /* tslint:enable */ - - break; - } - - default: - // TODO: terminate connection - throw new Error( - `unexpected message type ${mtype} ("${chars.chr(mtype)}")` - ); + async close(): Promise { + if (this.sock && this.connected) { + this.sock.write( + new WriteMessageBuffer().beginMessage(chars.$X).endMessage().unwrap() + ); } + return await super.close(); } /** @internal */ @@ -516,8 +237,9 @@ export class RawConnection { } catch (e: any) { conn._abort(); if (timeoutHappened && e instanceof errors.ClientConnectionClosedError) { - /* A race between our timeout `timeoutCb` callback and the client - being actually connected. See the `ConnectionImpl._onClose` method. + /* + A race between our timeout `timeoutCb` callback and the client + being actually connected. See the `ConnectionImpl._onClose` method. */ throw new errors.ClientConnectionTimeoutError( `connection timed out (${config.connectTimeout}ms)` @@ -575,7 +297,7 @@ export class RawConnection { return conn; } - private async connect(): Promise { + protected async connect(): Promise { await this.connWaiter.wait(); if (this.sock instanceof tls.TLSSocket) { @@ -795,606 +517,4 @@ export class RawConnection { } } } - - async _parse( - query: string, - asJson: boolean, - expectOne: boolean, - alwaysDescribe: boolean, - options?: ParseOptions - ): Promise<[number, ICodec, ICodec, number, Buffer | null, Buffer | null]> { - const wb = new WriteMessageBuffer(); - - wb.beginMessage(chars.$P) - .writeHeaders({ - explicitObjectids: "true", - ...(options?.headers ?? {}), - allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES, - }) - .writeChar(asJson ? chars.$j : chars.$b) - .writeChar(expectOne ? chars.$o : chars.$m) - .writeString("") // statement name - .writeString(query) - .endMessage(); - - wb.writeSync(); - - this.sock.write(wb.unwrap()); - - let cardinality: number | void; - let inTypeId: uuid | void; - let outTypeId: uuid | void; - let inCodec: ICodec | null; - let outCodec: ICodec | null; - let capabilities: number = -1; - let parsing = true; - let error: Error | null = null; - let inCodecData: Buffer | null = null; - let outCodecData: Buffer | null = null; - - while (parsing) { - if (!this.buffer.takeMessage()) { - await this._waitForMessage(); - } - - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$1: { - const headers = this._parseHeaders(); - if (headers.has(HeaderCodes.capabilities)) { - capabilities = Number( - headers.get(HeaderCodes.capabilities)!.readBigInt64BE() - ); - } - cardinality = this.buffer.readChar(); - inTypeId = this.buffer.readUUID(); - outTypeId = this.buffer.readUUID(); - this.buffer.finishMessage(); - break; - } - - case chars.$E: { - error = this._parseErrorMessage(); - break; - } - - case chars.$Z: { - this._parseSyncMessage(); - parsing = false; - break; - } - - default: - this._fallthrough(); - } - } - - if (error != null) { - throw error; - } - - if (inTypeId == null || outTypeId == null) { - throw new Error("did not receive in/out type ids in Parse response"); - } - - inCodec = this.codecsRegistry.getCodec(inTypeId); - outCodec = this.codecsRegistry.getCodec(outTypeId); - - if (inCodec == null || outCodec == null || alwaysDescribe) { - wb.reset(); - wb.beginMessage(chars.$D) - .writeInt16(0) // no headers - .writeChar(chars.$T) - .writeString("") // statement name - .endMessage() - .writeSync(); - - this.sock.write(wb.unwrap()); - - parsing = true; - while (parsing) { - if (!this.buffer.takeMessage()) { - await this._waitForMessage(); - } - - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$T: { - try { - [ - cardinality, - inCodec, - outCodec, - capabilities, - inCodecData, - outCodecData, - ] = this._parseDescribeTypeMessage(); - } catch (e: any) { - error = e; - } - break; - } - - case chars.$E: { - error = this._parseErrorMessage(); - break; - } - - case chars.$Z: { - this._parseSyncMessage(); - parsing = false; - break; - } - - default: - this._fallthrough(); - } - } - - if (error != null) { - throw error; - } - } - - if (cardinality == null || outCodec == null || inCodec == null) { - throw new Error( - "failed to receive type information in response to a Parse message" - ); - } - - return [ - cardinality, - inCodec, - outCodec, - capabilities, - inCodecData, - outCodecData, - ]; - } - - private _encodeArgs(args: QueryArgs, inCodec: ICodec): Buffer { - if (versionGreaterThanOrEqual(this.protocolVersion, [0, 12])) { - if (inCodec === NULL_CODEC) { - if (args != null) { - throw new errors.QueryArgumentError( - `This query does not contain any query parameters, ` + - `but query arguments were provided to the 'query*()' method` - ); - } - return NullCodec.BUFFER; - } - - if (inCodec instanceof ObjectCodec) { - return inCodec.encodeArgs(args); - } - - // Shouldn't ever happen. - throw new Error("invalid input codec"); - } else { - if (inCodec === EMPTY_TUPLE_CODEC) { - if (args != null) { - throw new errors.QueryArgumentError( - `This query does not contain any query parameters, ` + - `but query arguments were provided to the 'query*()' method` - ); - } - return EmptyTupleCodec.BUFFER; - } - - if ( - inCodec instanceof NamedTupleCodec || - inCodec instanceof TupleCodec - ) { - return inCodec.encodeArgs(args); - } - - // Shouldn't ever happen. - throw new Error("invalid input codec"); - } - } - - async _executeFlow( - args: QueryArgs | Buffer, - inCodec: ICodec, - outCodec: ICodec, - result: Array | WriteBuffer - ): Promise { - const wb = new WriteMessageBuffer(); - wb.beginMessage(chars.$E) - .writeHeaders({allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES}) - .writeString("") // statement name - .writeBuffer( - args instanceof Buffer ? args : this._encodeArgs(args, inCodec) - ) - .endMessage() - .writeSync(); - - this.sock.write(wb.unwrap()); - - let parsing = true; - let error: Error | null = null; - - while (parsing) { - if (!this.buffer.takeMessage()) { - await this._waitForMessage(); - } - - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$D: { - if (error == null) { - try { - this._parseDataMessages(outCodec, result); - } catch (e: any) { - error = e; - this.buffer.finishMessage(); - } - } else { - this.buffer.discardMessage(); - } - break; - } - - case chars.$C: { - this.lastStatus = this._parseCommandCompleteMessage(); - break; - } - - case chars.$E: { - error = this._parseErrorMessage(); - break; - } - - case chars.$Z: { - this._parseSyncMessage(); - parsing = false; - break; - } - - default: - this._fallthrough(); - } - } - - if (error != null) { - throw error; - } - } - - private async _optimisticExecuteFlow( - args: QueryArgs, - asJson: boolean, - expectOne: boolean, - requiredOne: boolean, - inCodec: ICodec, - outCodec: ICodec, - query: string, - result: Array - ): Promise { - const wb = new WriteMessageBuffer(); - wb.beginMessage(chars.$O); - wb.writeHeaders({ - allowCapabilities: NO_TRANSACTION_CAPABILITIES_BYTES, - explicitObjectids: "true", - }); - wb.writeChar(asJson ? chars.$j : chars.$b); - wb.writeChar(expectOne ? chars.$o : chars.$m); - wb.writeString(query); - wb.writeBuffer(inCodec.tidBuffer); - wb.writeBuffer(outCodec.tidBuffer); - wb.writeBuffer(this._encodeArgs(args, inCodec)); - wb.endMessage(); - wb.writeSync(); - - this.sock.write(wb.unwrap()); - - let reExec = false; - let error: Error | null = null; - let parsing = true; - let newCard: char | null = null; - let capabilities = -1; - - while (parsing) { - if (!this.buffer.takeMessage()) { - await this._waitForMessage(); - } - - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$D: { - if (error == null) { - try { - this._parseDataMessages(outCodec, result); - } catch (e: any) { - error = e; - this.buffer.finishMessage(); - } - } else { - this.buffer.discardMessage(); - } - break; - } - - case chars.$C: { - this.lastStatus = this._parseCommandCompleteMessage(); - break; - } - - case chars.$Z: { - this._parseSyncMessage(); - parsing = false; - break; - } - - case chars.$T: { - try { - [newCard, inCodec, outCodec, capabilities] = - this._parseDescribeTypeMessage(); - const key = this._getQueryCacheKey(query, asJson, expectOne); - this.queryCodecCache.set(key, [ - newCard, - inCodec, - outCodec, - capabilities, - ]); - reExec = true; - } catch (e: any) { - error = e; - } - break; - } - - case chars.$E: { - error = this._parseErrorMessage(); - break; - } - - default: - this._fallthrough(); - } - } - - if (error != null) { - throw error; - } - - if (reExec) { - this._validateFetchCardinality(newCard!, asJson, requiredOne); - return await this._executeFlow(args, inCodec, outCodec, result); - } - } - - private _getQueryCacheKey( - query: string, - asJson: boolean, - expectOne: boolean - ): string { - return [asJson, expectOne, query.length, query].join(";"); - } - - private _validateFetchCardinality( - card: char, - asJson: boolean, - requiredOne: boolean - ): void { - if (requiredOne && card === chars.$n) { - throw new errors.NoDataError( - `query executed via queryRequiredSingle${ - asJson ? "JSON" : "" - }() returned no data` - ); - } - } - - async fetch( - query: string, - args: QueryArgs = null, - asJson: boolean, - expectOne: boolean, - requiredOne: boolean = false - ): Promise { - this._checkState(); - - const key = this._getQueryCacheKey(query, asJson, expectOne); - const ret = new Array(); - - if (this.queryCodecCache.has(key)) { - const [card, inCodec, outCodec] = this.queryCodecCache.get(key)!; - this._validateFetchCardinality(card, asJson, requiredOne); - await this._optimisticExecuteFlow( - args, - asJson, - expectOne, - requiredOne, - inCodec, - outCodec, - query, - ret - ); - } else { - const [card, inCodec, outCodec, capabilities] = await this._parse( - query, - asJson, - expectOne, - false - ); - this._validateFetchCardinality(card, asJson, requiredOne); - this.queryCodecCache.set(key, [card, inCodec, outCodec, capabilities]); - await this._executeFlow(args, inCodec, outCodec, ret); - } - - if (expectOne) { - if (requiredOne && !ret.length) { - throw new errors.NoDataError("query returned no data"); - } else { - return ret[0] ?? (asJson ? "null" : null); - } - } else { - if (ret && ret.length) { - if (asJson) { - return ret[0]; - } else { - return ret; - } - } else { - if (asJson) { - return "[]"; - } else { - return ret; - } - } - } - } - - getQueryCapabilities( - query: string, - asJson: boolean, - expectOne: boolean - ): number | null { - const key = this._getQueryCacheKey(query, asJson, expectOne); - return this.queryCodecCache.get(key)?.[3] ?? null; - } - - async execute( - query: string, - allowTransactionCommands: boolean = false - ): Promise { - this._checkState(); - - const wb = new WriteMessageBuffer(); - wb.beginMessage(chars.$Q) - .writeHeaders({ - allowCapabilities: !allowTransactionCommands - ? NO_TRANSACTION_CAPABILITIES_BYTES - : undefined, - }) - .writeString(query) // statement name - .endMessage(); - - this.sock.write(wb.unwrap()); - - let error: Error | null = null; - let parsing = true; - - while (parsing) { - if (!this.buffer.takeMessage()) { - await this._waitForMessage(); - } - - const mtype = this.buffer.getMessageType(); - - switch (mtype) { - case chars.$C: { - this.lastStatus = this._parseCommandCompleteMessage(); - break; - } - - case chars.$Z: { - this._parseSyncMessage(); - parsing = false; - break; - } - - case chars.$E: { - error = this._parseErrorMessage(); - break; - } - - default: - this._fallthrough(); - } - } - - if (error != null) { - throw error; - } - } - - async resetState(): Promise { - if ( - this.connected && - this.serverXactStatus !== TransactionStatus.TRANS_IDLE - ) { - try { - await this.execute(`rollback`, true); - } catch { - this._abortWithError( - new errors.ClientConnectionClosedError("failed to reset state") - ); - } - } - } - - private _abort(): void { - if (this.sock && this.connected) { - this.sock.destroy(); - } - this.connected = false; - this._abortWaiters(this.getConnAbortError()); - if (!this.connAbortWaiter.done) { - this.connAbortWaiter.set(); - } - } - - isClosed(): boolean { - return !this.connected; - } - - async close(): Promise { - if (this.sock && this.connected) { - this.sock.write( - new WriteMessageBuffer().beginMessage(chars.$X).endMessage().unwrap() - ); - } - this._abort(); - } - - /** @internal */ - private static newSock( - addr: string | [string, number], - options?: tls.ConnectionOptions - ): net.Socket { - if (typeof addr === "string") { - // unix socket - return net.createConnection(addr); - } - - const [host, port] = addr; - if (options == null) { - return net.createConnection(port, host); - } - - const opts = {...options, host, port}; - return tls.connect(opts); - } - - // These methods are exposed for use by EdgeDB Studio - public async rawParse( - query: string, - headers?: PrepareMessageHeaders - ): Promise<[Buffer, Buffer, ProtocolVersion]> { - const result = await this._parse(query, false, false, true, { - headers, - }); - return [result[4]!, result[5]!, this.protocolVersion]; - } - - public async rawExecute(encodedArgs: Buffer | null = null): Promise { - const result = new WriteBuffer(); - let inCodec = EMPTY_TUPLE_CODEC; - if (versionGreaterThanOrEqual(this.protocolVersion, [0, 12])) { - inCodec = NULL_CODEC; - } - await this._executeFlow( - encodedArgs, // arguments - inCodec, // inCodec -- to encode lack of arguments. - EMPTY_TUPLE_CODEC, // outCodec -- does not matter, it will not be used. - result - ); - return result.unwrap(); - } } diff --git a/test/client.test.ts b/test/client.test.ts index d90e53a10..c9528015a 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1110,7 +1110,6 @@ test("fetch: set of arrays", async () => { limit 1 `); - res = res.sets; expect(res).toEqual([[1, 2], [1]]); expect(res.length).toBe(2); @@ -1566,19 +1565,15 @@ test("'implicit*' headers", async () => { const registry = new _CodecsRegistry(); const con = await retryingConnect(config, registry); try { - const [_, outCodecData, protocolVersion] = await con.rawParse( - `SELECT schema::Function { + const query = `SELECT schema::Function { name - }`, - { - implicitTypenames: "true", - implicitLimit: "5", - } - ); - const resultData = await con.rawExecute(); - - const registry = new _CodecsRegistry(); - const codec = registry.buildCodec(outCodecData, protocolVersion); + }`; + const headers = { + implicitTypenames: "true", + implicitLimit: "5", + } as const; + const [_, outCodec] = await con.rawParse(query, headers); + const resultData = await con.rawExecute(query, outCodec, headers); const result = new Array(); const buf = new _ReadBuffer(resultData); @@ -1593,7 +1588,7 @@ test("'implicit*' headers", async () => { buf.sliceInto(codecReadBuf, len - 4); codecReadBuf.discard(6); - const val = codec.decode(codecReadBuf); + const val = outCodec.decode(codecReadBuf); result.push(val); } diff --git a/yarn.lock b/yarn.lock index 4d68baa50..1293abf5e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2024,6 +2024,7 @@ __metadata: "@types/node": 14 get-stdin: ^7.0.0 jest: ^26.6.3 + node-fetch: 2.6.7 nodemon: ^2.0.13 prettier: ^2.3.2 proposal-temporal: ^0.7.0 @@ -4367,6 +4368,20 @@ __metadata: languageName: node linkType: hard +"node-fetch@npm:2.6.7": + version: 2.6.7 + resolution: "node-fetch@npm:2.6.7" + dependencies: + whatwg-url: ^5.0.0 + peerDependencies: + encoding: ^0.1.0 + peerDependenciesMeta: + encoding: + optional: true + checksum: 8d816ffd1ee22cab8301c7756ef04f3437f18dace86a1dae22cf81db8ef29c0bf6655f3215cb0cdb22b420b6fe141e64b26905e7f33f9377a7fa59135ea3e10b + languageName: node + linkType: hard + "node-gyp@npm:latest": version: 8.4.0 resolution: "node-gyp@npm:8.4.0" @@ -5805,6 +5820,13 @@ __metadata: languageName: node linkType: hard +"tr46@npm:~0.0.3": + version: 0.0.3 + resolution: "tr46@npm:0.0.3" + checksum: 726321c5eaf41b5002e17ffbd1fb7245999a073e8979085dacd47c4b4e8068ff5777142fc6726d6ca1fd2ff16921b48788b87225cbc57c72636f6efa8efbffe3 + languageName: node + linkType: hard + "ts-jest@npm:^26.5.2": version: 26.5.6 resolution: "ts-jest@npm:26.5.6" @@ -6187,6 +6209,13 @@ typescript@^4.5.2: languageName: node linkType: hard +"webidl-conversions@npm:^3.0.0": + version: 3.0.1 + resolution: "webidl-conversions@npm:3.0.1" + checksum: c92a0a6ab95314bde9c32e1d0a6dfac83b578f8fa5f21e675bc2706ed6981bc26b7eb7e6a1fab158e5ce4adf9caa4a0aee49a52505d4d13c7be545f15021b17c + languageName: node + linkType: hard + "webidl-conversions@npm:^5.0.0": version: 5.0.0 resolution: "webidl-conversions@npm:5.0.0" @@ -6217,6 +6246,16 @@ typescript@^4.5.2: languageName: node linkType: hard +"whatwg-url@npm:^5.0.0": + version: 5.0.0 + resolution: "whatwg-url@npm:5.0.0" + dependencies: + tr46: ~0.0.3 + webidl-conversions: ^3.0.0 + checksum: b8daed4ad3356cc4899048a15b2c143a9aed0dfae1f611ebd55073310c7b910f522ad75d727346ad64203d7e6c79ef25eafd465f4d12775ca44b90fa82ed9e2c + languageName: node + linkType: hard + "whatwg-url@npm:^8.0.0, whatwg-url@npm:^8.5.0": version: 8.7.0 resolution: "whatwg-url@npm:8.7.0"