From 6e6e896fbf1e9fda393d91fdd02db9812f90493d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Can=20G=C3=BCven?= Date: Mon, 6 May 2019 16:49:57 +0300 Subject: [PATCH 1/2] Implemented retry stream --- README.md | 24 ++++---- demo.ts | 9 +++ demo/demo-starter.ts | 99 --------------------------------- jest.config.js | 4 +- package.json | 2 +- src/cache-then-network.ts | 76 +++++++++++++------------ src/holder.ts | 8 +-- src/network.ts | 3 +- src/retry.ts | 43 ++++++++++++++ src/stream-factory.ts | 9 ++- src/warden-stream.ts | 14 ++--- test/cache-then-network.spec.ts | 2 +- test/holder.spec.ts | 8 ++- test/network.spec.ts | 3 +- 14 files changed, 133 insertions(+), 171 deletions(-) create mode 100644 demo.ts delete mode 100644 demo/demo-starter.ts create mode 100644 src/retry.ts diff --git a/README.md b/README.md index 0403e6e..e49e8ba 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,16 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati [![Codacy](https://api.codacy.com/project/badge/Grade/e806d72373414fd9818ab2a403f1b36d)](https://www.codacy.com/app/Acanguven/puzzle-warden?utm_source=github.com&utm_medium=referral&utm_content=puzzle-js/puzzle-warden&utm_campaign=Badge_Grade) ## Features -- 📥 **Smart Caching** Caches requests by converting HTTP requests to smart key strings. ✅ -- 🚧 **Request Holder** Stopping same request to be sent multiple times. ✅ -- 🔌 **Support** Warden can be used with anything but it supports [request](https://github.com/request/request) out of the box. ✅ -- 😎 **Easy Implementation** Warden can be easily implemented with a few lines of codes. ✅ -- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. 📝 -- 📇 **Schema Parser** Warden uses a schema which can be provided by you for parsing JSON faster. 📝 -- 🚥 **API Queue** Throttles API calls to protect target service. 📝 -- 👻 **Request Shadowing** Copies a fraction of traffic to a new deployment for observation. 📝 -- 🚉 **Reverse Proxy** It can be deployable as an external application which can serve as a reverse proxy. 📝 -- 📛 **Circuit Breaker** Immediately refuses new requests to provide time for the API to become healthy. 📝 +- 📥 **Smart Caching** Caches requests by converting HTTP requests to smart key strings. ✅ +- 🚧 **Request Holder** Stopping same request to be sent multiple times. ✅ +- 🔌 **Support** Warden can be used with anything but it supports [request](https://github.com/request/request) out of the box. ✅ +- 😎 **Easy Implementation** Warden can be easily implemented with a few lines of codes. ✅ +- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. 📝 +- 📇 **Schema Parser** Warden uses a schema which can be provided by you for parsing JSON faster. 📝 +- 🚥 **API Queue** Throttles API calls to protect target service. 📝 +- 👻 **Request Shadowing** Copies a fraction of traffic to a new deployment for observation. 📝 +- 🚉 **Reverse Proxy** It can be deployable as an external application which can serve as a reverse proxy. 📝 +- 📛 **Circuit Breaker** Immediately refuses new requests to provide time for the API to become healthy. 📝 ![Warden Achitecture](./warden_architecture.svg) @@ -35,11 +35,11 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati ### Installing Yarn -``` +```bash yarn add puzzle-warden ``` Npm -``` +```bash npm i puzzle-warden --save ``` diff --git a/demo.ts b/demo.ts new file mode 100644 index 0000000..4712122 --- /dev/null +++ b/demo.ts @@ -0,0 +1,9 @@ +const request = require("request"); + + +request({ + url: 'https://m.trendyol.com', + timeout: 5 +}, (err: any, res: any, data: any) => { + console.log(err.connect); +}); \ No newline at end of file diff --git a/demo/demo-starter.ts b/demo/demo-starter.ts deleted file mode 100644 index 6381d4f..0000000 --- a/demo/demo-starter.ts +++ /dev/null @@ -1,99 +0,0 @@ -import {Warden} from "../src/warden"; -import {CacheFactory} from "../src/cache-factory"; -import {RequestManager} from "../src/request-manager"; -import {Tokenizer} from "../src/tokenizer"; -import {StreamFactory} from "../src/stream-factory"; -import {RequestWrapper} from "../src/request-wrapper"; - -const request = require('request'); - -const cacheFactory = new CacheFactory(); - -const tokenizer = new Tokenizer(); -const requestWrapper = new RequestWrapper(); -const streamFactory = new StreamFactory(cacheFactory, requestWrapper); -const requestManager = new RequestManager(streamFactory, tokenizer); -const warden = new Warden(requestManager, requestWrapper); - - -warden.register('test', { - identifier: 'ty_{query.foo2}_{cookie.osman}', - cache: true, - holder: true -}); - -import https from "https"; -const agent = new https.Agent({ - keepAlive: true, - keepAliveMsecs: 1000, - maxSockets: 50 -}); -let input = 0; -let output = 0; -let failedToPush = 0; -let stop = false; - -// setTimeout(() => { -// stop = true; -// console.log(`${output}/${failedToPush}/${input}`); -// console.log(`${(output / 100).toFixed(2)} rps`); -// }, 30000); -let errorCount = 0; -let responseCount = 0; -const startTime = Date.now(); -const requestCount = 6000; -const newRequest = () => { - input++; - // - // warden.request('test', { - // url: `https://postman-echo.com/get?foo1=${Math.random().toFixed(2)}&foo2=${Math.random().toFixed(2)}`, - // headers: { - // cookie: `osman=${Math.random().toFixed(1)}` - // }, - // gzip: true, - // json: true, - // method: "get", - // agent, - // strictSSL: false, - // timeout: 1000 - // }, (err, response, data) => { - // if (!err && data) { - // responseCount++; - // } else { - // errorCount++; - // } - // output++; - // - // if(output >= requestCount){ - // console.log(Date.now() - startTime); - // } - // }); - // - request({ - url: `https://postman-echo.com/get?foo1=${Math.random().toFixed(2)}&foo2=${Math.random().toFixed(2)}`, - gzip: true, - json: true, - timeout: 1000, - headers: { - cookie: `osman=${Math.random().toFixed(1)}` - }, - agent, - strictSSL: false, - method: "get", - }, (err: any, res: any, data: any) => { - if (!err && data) { - responseCount++; - } else { - errorCount++; - } - output++; - - if(output >= requestCount){ - console.log(Date.now() - startTime); - } - }); - if (input <= requestCount) newRequest(); -}; - -newRequest(); -setTimeout(() => {}, 100000); diff --git a/jest.config.js b/jest.config.js index 95622d4..0dba61f 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,6 +1,6 @@ module.exports = { - preset: 'ts-jest', - testEnvironment: 'node', + preset: "ts-jest", + testEnvironment: "node", collectCoverageFrom: [ "src/**/*.ts", "!src/**/*.d.ts", diff --git a/package.json b/package.json index a776a84..adba0f9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "puzzle-warden", - "version": "1.3.0", + "version": "1.4.0", "main": "dist/index.js", "types": "dist/index.d.ts", "license": "MIT", diff --git a/src/cache-then-network.ts b/src/cache-then-network.ts index 7ef6ee5..b749dc3 100644 --- a/src/cache-then-network.ts +++ b/src/cache-then-network.ts @@ -4,47 +4,53 @@ import {StreamType} from "./stream-factory"; import {CachePlugin} from "./cache-factory"; import request from "request"; +interface CacheDecoratedResponse extends ResponseChunk { + cacheHit: boolean; +} + +interface CacheDecoratedRequest extends RequestChunk { + cacheHit: boolean; +} class CacheThenNetwork extends WardenStream { - private readonly storage: CachePlugin; - private readonly ms?: number; - - constructor(plugin: CachePlugin, ms?: number) { - super(StreamType.CACHE); - - this.ms = ms; - this.storage = plugin; - } - - async onResponse(chunk: ResponseChunk, callback: TransformCallback): Promise { - if (!chunk.cacheHit && !chunk.error && chunk.response) { - if(chunk.response.headers["set-cookie"]){ - console.warn('Detected dangerous response with set-cookie header, not caching', chunk.key); - }else{ - await this.storage.set(chunk.key, chunk.response, this.ms); - } + private readonly storage: CachePlugin; + private readonly ms?: number; + + constructor(plugin: CachePlugin, ms?: number) { + super(StreamType.CACHE); + + this.ms = ms; + this.storage = plugin; + } + + async onResponse(chunk: CacheDecoratedResponse, callback: TransformCallback): Promise { + if (!chunk.cacheHit && !chunk.error && chunk.response) { + if (chunk.response.headers["set-cookie"]) { + console.warn('Detected dangerous response with set-cookie header, not caching', chunk.key); + } else { + await this.storage.set(chunk.key, chunk.response, this.ms); + } + } + + callback(undefined, chunk); } - callback(undefined, chunk); - } - - async onRequest(chunk: RequestChunk, callback: TransformCallback): Promise { - const cachedData = await this.storage.get(chunk.key) as request.Response; - - if (cachedData) { - this.respond({ - key: chunk.key, - cb: chunk.cb, - response: cachedData, - cacheHit: true - }); - callback(undefined, null); - } else { - callback(undefined, chunk); + async onRequest(chunk: CacheDecoratedRequest, callback: TransformCallback): Promise { + const cachedData = await this.storage.get(chunk.key) as request.Response; + + if (cachedData) { + this.respond({ + ...chunk, + response: cachedData, + cacheHit: true + }); + callback(undefined, null); + } else { + callback(undefined, chunk); + } } - } } export { - CacheThenNetwork + CacheThenNetwork }; diff --git a/src/holder.ts b/src/holder.ts index 8f4f998..14d9b14 100644 --- a/src/holder.ts +++ b/src/holder.ts @@ -2,9 +2,6 @@ import {RequestChunk, ResponseChunk, WardenStream} from "./warden-stream"; import {TransformCallback} from "stream"; import {StreamType} from "./stream-factory"; -interface HolderConfiguration { - -} class Holder extends WardenStream { private holdQueue: { [key: string]: RequestChunk[] | null } = {}; @@ -20,10 +17,8 @@ class Holder extends WardenStream { if (holdQueue) { holdQueue.forEach(holdChunk => { this.respond({ + ...chunk, cb: holdChunk.cb, - key: chunk.key, - response: chunk.response, - error: chunk.error }); }); @@ -48,6 +43,5 @@ class Holder extends WardenStream { } export { - HolderConfiguration, Holder }; diff --git a/src/network.ts b/src/network.ts index 3494eac..5bd3dee 100644 --- a/src/network.ts +++ b/src/network.ts @@ -21,8 +21,7 @@ class Network extends WardenStream { onRequest(chunk: RequestChunk, callback: TransformCallback): void { this.requestWrapper.request[chunk.requestOptions.method](chunk.requestOptions, (error, response) => { this.respond({ - key: chunk.key, - cb: chunk.cb, + ...chunk, response, error }); diff --git a/src/retry.ts b/src/retry.ts new file mode 100644 index 0000000..1305d2e --- /dev/null +++ b/src/retry.ts @@ -0,0 +1,43 @@ +import {RequestChunk, ResponseChunk, WardenStream} from "./warden-stream"; +import {TransformCallback} from "stream"; + +interface RetryDecoratedResponse extends ResponseChunk { + retryCount: number; +} + +interface RetryDecoratedRequest extends RequestChunk { + retryCount: number; +} + +class Retry extends WardenStream { + retryLimit: number; + + constructor(retryLimit: number) { + super('Retry'); + + this.retryLimit = retryLimit; + } + + async onResponse(chunk: RetryDecoratedResponse, callback: TransformCallback): Promise { + if (chunk.error && chunk.retryCount <= this.retryLimit) { + this.request({ + ...chunk, + retryCount: chunk.retryCount + 1 + }); + callback(undefined, null); + } else { + callback(undefined, chunk); + } + } + + async onRequest(chunk: RetryDecoratedRequest, callback: TransformCallback): Promise { + callback(null, { + ...chunk, + retryCount: 0 + } as RequestChunk); + } +} + +export { + Retry +}; \ No newline at end of file diff --git a/src/stream-factory.ts b/src/stream-factory.ts index 569ba35..9242e8e 100644 --- a/src/stream-factory.ts +++ b/src/stream-factory.ts @@ -3,6 +3,7 @@ import {CacheFactory} from "./cache-factory"; import {StreamHead} from "./stream-head"; import {Holder} from "./holder"; import {RequestWrapper} from "./request-wrapper"; +import {Retry} from "./retry"; const enum StreamType { HOLDER = 'holder', @@ -10,12 +11,14 @@ const enum StreamType { NETWORK = 'network', QUEUE = 'queue', CIRCUIT = 'circuit', - HEAD = 'head' + HEAD = 'head', + RETRY = 'retry' } enum ConfigurableStream { HOLDER = StreamType.HOLDER, - CACHE = StreamType.CACHE + CACHE = StreamType.CACHE, + RETRY = StreamType.RETRY } @@ -41,6 +44,8 @@ class StreamFactory { // throw new Error('Not implemented'); case StreamType.NETWORK: return new Network(this.requestWrapper) as unknown as U; + case StreamType.RETRY: + return new Retry(3) as unknown as U; case StreamType.HEAD: return new StreamHead() as unknown as U; default: diff --git a/src/warden-stream.ts b/src/warden-stream.ts index 217e556..e1800cd 100644 --- a/src/warden-stream.ts +++ b/src/warden-stream.ts @@ -4,18 +4,18 @@ import {RequestCallback} from "request"; import {RequestOptions} from "./request-manager"; import request from "request"; + interface RequestChunk { key: string; requestOptions: RequestOptions; cb: RequestCallback; } -interface ResponseChunk { - cb: RequestCallback; - key: string; +interface ResponseChunk extends RequestChunk{ response?: request.Response; - error?: any; - cacheHit?: true; + error?: { + name: string + }; } interface WardenStreamer { @@ -112,11 +112,11 @@ abstract class WardenStream implements WardenStreamer { return wardenStream; } - respond(chunk: ResponseChunk): boolean { + respond(chunk: T): boolean { return this.responseStream.push(chunk); } - request(chunk: RequestChunk): boolean { + request(chunk: T): boolean { return this.requestStream.push(chunk); } diff --git a/test/cache-then-network.spec.ts b/test/cache-then-network.spec.ts index de12c5b..c5a679e 100644 --- a/test/cache-then-network.spec.ts +++ b/test/cache-then-network.spec.ts @@ -74,7 +74,7 @@ describe("[cache.ts]", () => { cb: chunk.cb, response: response as any, cacheHit: true - })).to.eq(true); + } as any)).to.eq(true); }); it("should return incoming response without caching if cache hit flag true", async () => { diff --git a/test/holder.spec.ts b/test/holder.spec.ts index 6b56d25..8d21b6c 100644 --- a/test/holder.spec.ts +++ b/test/holder.spec.ts @@ -80,10 +80,13 @@ describe("[holder.ts]", () => { const responseChunk = { key, response: {}, + requestOptions: {}, + error: undefined } as any; const requestChunk = { key, - cb: sandbox.stub() + cb: sandbox.stub(), + requestOptions: {} } as any; const requestSpy = sandbox.stub(); const respondSpy = sandbox.stub(holder, 'respond'); @@ -98,7 +101,8 @@ describe("[holder.ts]", () => { key: responseChunk.key, response: responseChunk.response, cb: requestChunk.cb, - error: undefined + error: undefined, + requestOptions: responseChunk.requestOptions })).to.eq(true); expect(spy.calledOnce).to.eq(true); expect(spy.calledWithExactly(undefined, null)).to.eq(true); diff --git a/test/network.spec.ts b/test/network.spec.ts index 7c45fdf..f15bcf7 100644 --- a/test/network.spec.ts +++ b/test/network.spec.ts @@ -74,7 +74,8 @@ describe("[network.ts]", () => { key: chunk.key, cb: chunk.cb, response: response as unknown as request.Response, - error: null + error: null as any, + requestOptions: chunk.requestOptions as any })).to.eq(true); }); From 227acfe2b7b85300d9e8560e8723ea72c3f172d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Can=20G=C3=BCven?= Date: Wed, 8 May 2019 10:51:22 +0300 Subject: [PATCH 2/2] Retry implemented --- CHANGELOG.md | 7 + README.md | 36 ++++- demo.ts | 9 -- src/cache-factory.ts | 9 +- src/request-manager.ts | 17 ++- src/retry.ts | 114 +++++++++++--- src/stream-factory.ts | 16 +- src/tokenizer.ts | 12 +- src/warden-stream.ts | 2 +- test/request-manager.spec.ts | 61 ++++++-- test/retry.spec.ts | 287 +++++++++++++++++++++++++++++++++++ test/stream-factory.spec.ts | 31 ++-- test/tokenizer.spec.ts | 15 ++ 13 files changed, 540 insertions(+), 76 deletions(-) delete mode 100644 demo.ts create mode 100644 test/retry.spec.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4779dbf..8520507 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [1.4.0] - 2019-05-08 +### Added +- Retry plugin for retrying failed responses +- Added generic identifiers +### Changed +- Identifiers are not required anymore + ## [1.3.0] - 2019-05-02 ### Added - Security for set-cookie header to prevent dangerous response caching with credentials. diff --git a/README.md b/README.md index e49e8ba..e509b8b 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati - 🚧 **Request Holder** Stopping same request to be sent multiple times. ✅ - 🔌 **Support** Warden can be used with anything but it supports [request](https://github.com/request/request) out of the box. ✅ - 😎 **Easy Implementation** Warden can be easily implemented with a few lines of codes. ✅ -- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. 📝 +- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. ✅ - 📇 **Schema Parser** Warden uses a schema which can be provided by you for parsing JSON faster. 📝 - 🚥 **API Queue** Throttles API calls to protect target service. 📝 - 👻 **Request Shadowing** Copies a fraction of traffic to a new deployment for observation. 📝 @@ -29,6 +29,7 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati - [Identifier](#identifier) - [Registering Route](#registering-route) - [Cache](#cache) +- [Retry](#retry) - [Holder](#holder) - [Api](#api) @@ -122,6 +123,8 @@ warden.register('test', { }); ``` +`identifier` is an optional field. If an identifier is not provided warden will be use generic identifier which is `${name}_${url}_${JSON.stringify({cookie, headers, query})}_${method}`. + ### Cache You can simply enable cache with default values using. @@ -176,7 +179,36 @@ Simple old school caching. Asks cache plugin if it has a valid cached response. ### Holder Holder prevents same HTTP requests to be sent at the same time. -Let's assume we have an identifier for a request: `{query.foo}`. We send a HTTP request `/product?foo=bar`. While waiting for the response, warden received another HTTP request to the same address which means both HTTP requests are converted to the same key. Then Warden stops the second request. After receiving the response from the first request, Warden returns both requests with the same response by sending only one HTTP request. +Let's assume we have an identifier for a request: `{query.foo}`. We send a HTTP request `/product?foo=bar`. While waiting for the response, warden received another HTTP request to the same address which means both HTTP requests are converted to the same key. Then Warden stops the second request. After receiving the response from the first request, Warden returns both requests with the same response by sending only one HTTP request. + +### Retry + +When the connection fails with one of ECONNRESET, ENOTFOUND, ESOCKETTIMEDOUT, ETIMEDOUT, ECONNREFUSED, EHOSTUNREACH, EPIPE, EAI_AGAIN or when an HTTP 5xx or 429 error occurrs, the request will automatically be re-attempted as these are often recoverable errors and will go away on retry. + +```js +warden.register('routeName', { + retry: { + delay: 100, + count: 1, + logger: (retryCount) => { + console.log(retryCount); + } + } +}); + +warden.register('routeName', { + retry: true // default settings +}); +``` + +Default values and properties + +| Property | Required | Default Value | Definition | +| :--- | :---: | ---: | :--- | +| delay | ❌ | 100 | Warden will wait for 100ms before retry | +| count | ❌ | 1 | It will try for 1 time by default | +| logger | ❌ | 1m | Logger will be called on each retry with retry count| + ### Api diff --git a/demo.ts b/demo.ts deleted file mode 100644 index 4712122..0000000 --- a/demo.ts +++ /dev/null @@ -1,9 +0,0 @@ -const request = require("request"); - - -request({ - url: 'https://m.trendyol.com', - timeout: 5 -}, (err: any, res: any, data: any) => { - console.log(err.connect); -}); \ No newline at end of file diff --git a/src/cache-factory.ts b/src/cache-factory.ts index 8a63a29..7a42c60 100644 --- a/src/cache-factory.ts +++ b/src/cache-factory.ts @@ -50,11 +50,10 @@ class CacheFactory { } getPlugin(plugin?: CACHE_PLUGIN): CachePlugin { - switch (plugin) { - case CACHE_PLUGIN.Memory: - return new MemoryCache(); - default: - return new MemoryCache(); + if (plugin === CACHE_PLUGIN.Memory) { + return new MemoryCache(); + } else { + return new MemoryCache(); } } diff --git a/src/request-manager.ts b/src/request-manager.ts index 4a5e35a..41cba7d 100644 --- a/src/request-manager.ts +++ b/src/request-manager.ts @@ -1,13 +1,14 @@ import {StreamHead} from "./stream-head"; import {KeyMaker, Tokenizer} from "./tokenizer"; +import * as request from "request"; import {RequestCallback} from "request"; import Url from "fast-url-parser"; -import {ConfigurableStream, StreamFactory, StreamType} from "./stream-factory"; +import {ConfigurableStream, StreamFactory} from "./stream-factory"; import {CacheConfiguration} from "./cache-factory"; import {WardenStream} from "./warden-stream"; import Cookie from "cookie"; -import {Network} from "./network"; -import * as request from "request"; +import {RetryInputConfiguration} from "./retry"; +import http from "http"; type KeyStreamPair = { keyMaker: KeyMaker; @@ -18,6 +19,8 @@ interface StreamMap { [routeName: string]: KeyStreamPair[]; } +const DEFAULT_IDENTIFIER = `u_{Date.now()}`; + interface RequestOptions extends request.CoreOptions { url: string; method: 'get' | 'post'; @@ -30,9 +33,10 @@ interface RequestOptions extends request.CoreOptions { interface RouteConfiguration { [key: string]: any; - identifier: string; + identifier?: string; cache?: CacheConfiguration | boolean; holder?: boolean; + retry?: RetryInputConfiguration | boolean | number; } class RequestManager { @@ -49,9 +53,9 @@ class RequestManager { } register(name: string, routeConfiguration: RouteConfiguration) { - const stream = this.streamFactory.create(StreamType.HEAD); + const stream = this.streamFactory.createHead(); let streamLink: WardenStream = stream; - const network = this.streamFactory.create(StreamType.NETWORK); + const network = this.streamFactory.createNetwork(); const keyMaker = this.tokenizer.tokenize(name, routeConfiguration.identifier); Object.values(ConfigurableStream).forEach((streamType: string) => { @@ -100,6 +104,7 @@ class RequestManager { } export { + DEFAULT_IDENTIFIER, RequestOptions, RouteConfiguration, RequestManager diff --git a/src/retry.ts b/src/retry.ts index 1305d2e..4b3364a 100644 --- a/src/retry.ts +++ b/src/retry.ts @@ -1,43 +1,113 @@ import {RequestChunk, ResponseChunk, WardenStream} from "./warden-stream"; import {TransformCallback} from "stream"; +import {StreamType} from "./stream-factory"; interface RetryDecoratedResponse extends ResponseChunk { - retryCount: number; + retryCount: number; } interface RetryDecoratedRequest extends RequestChunk { - retryCount: number; + retryCount: number; } +interface RetryConfiguration { + count: number; + delay: number; + logger?: (retry: number) => void; +} + +interface RetryInputConfiguration { + count?: number; + delay?: number; + logger?: (retry: number) => void; +} + +const DEFAULT_RETRY_CONFIGURATION = { + count: 1, + delay: 100, +}; + +const RETRYABLE_ERRORS = ['ECONNRESET', 'ENOTFOUND', 'ESOCKETTIMEDOUT', 'ETIMEDOUT', 'ECONNREFUSED', 'EHOSTUNREACH', 'EPIPE', 'EAI_AGAIN']; + class Retry extends WardenStream { - retryLimit: number; + private configuration: RetryConfiguration; + + constructor(configuration: RetryConfiguration) { + super(StreamType.RETRY); - constructor(retryLimit: number) { - super('Retry'); + this.configuration = configuration; + } - this.retryLimit = retryLimit; + static create(configuration: RetryInputConfiguration | true | number) { + if (typeof configuration === "boolean") { + return new Retry(DEFAULT_RETRY_CONFIGURATION); + } else if (typeof configuration === "number") { + return new Retry({ + count: configuration, + delay: 0 + }); + } else if (typeof configuration === "object") { + const count = configuration.count || 1; + const delay = configuration.delay || 0; + return new Retry({ + count, + delay, + logger: configuration.logger + }); + } else { + return new Retry(DEFAULT_RETRY_CONFIGURATION); } + } - async onResponse(chunk: RetryDecoratedResponse, callback: TransformCallback): Promise { - if (chunk.error && chunk.retryCount <= this.retryLimit) { - this.request({ - ...chunk, - retryCount: chunk.retryCount + 1 - }); - callback(undefined, null); - } else { - callback(undefined, chunk); - } + async onResponse(chunk: RetryDecoratedResponse, callback: TransformCallback): Promise { + if (chunk.retryCount <= this.configuration.count && this.shouldRetry(chunk)) { + if (this.configuration.delay > 0) { + setTimeout(() => { + this.retry(chunk); + }, this.configuration.delay); + } else { + this.retry(chunk); + } + callback(undefined, null); + } else { + callback(undefined, chunk); } + } - async onRequest(chunk: RetryDecoratedRequest, callback: TransformCallback): Promise { - callback(null, { - ...chunk, - retryCount: 0 - } as RequestChunk); + async onRequest(chunk: RetryDecoratedRequest, callback: TransformCallback): Promise { + callback(null, { + ...chunk, + retryCount: 0 + } as RequestChunk); + } + + private retry(chunk: RetryDecoratedResponse) { + if (this.configuration.logger) this.configuration.logger(chunk.retryCount + 1); + this.request({ + retryCount: chunk.retryCount + 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + }); + } + + private shouldRetry(chunk: RetryDecoratedResponse) { + const statusCode = chunk.response ? chunk.response.statusCode : null; + + if (statusCode && (statusCode === 429 || (500 <= statusCode && statusCode < 600))) { + return true; } + + if (chunk.error) { + return RETRYABLE_ERRORS.includes(chunk.error.code); + } + + return false; + } } export { - Retry + RetryInputConfiguration, + RetryConfiguration, + Retry }; \ No newline at end of file diff --git a/src/stream-factory.ts b/src/stream-factory.ts index 9242e8e..fe85227 100644 --- a/src/stream-factory.ts +++ b/src/stream-factory.ts @@ -34,7 +34,7 @@ class StreamFactory { this.requestWrapper = requestWrapper; } - create(streamType: string, configuration?: T) { + create(streamType: string, configuration: T) { switch (streamType) { case StreamType.CACHE: return this.cacheFactory.create(configuration as T) as unknown as U; @@ -42,16 +42,20 @@ class StreamFactory { return new Holder() as unknown as U; // case StreamType.CIRCUIT: // throw new Error('Not implemented'); - case StreamType.NETWORK: - return new Network(this.requestWrapper) as unknown as U; case StreamType.RETRY: - return new Retry(3) as unknown as U; - case StreamType.HEAD: - return new StreamHead() as unknown as U; + return Retry.create(configuration) as unknown as U; default: throw new Error('Unknown stream type'); } } + + createNetwork() { + return new Network(this.requestWrapper); + } + + createHead() { + return new StreamHead(); + } } export { diff --git a/src/tokenizer.ts b/src/tokenizer.ts index 93be700..be18f23 100644 --- a/src/tokenizer.ts +++ b/src/tokenizer.ts @@ -10,7 +10,11 @@ type KeyMaker = ( class Tokenizer { - tokenize(name: string, identifier: string): KeyMaker { + tokenize(name: string, identifier?: string): KeyMaker { + return identifier ? this.generateCustomIdentifier(name, identifier) : this.createGenericIdentifier(name); + } + + private generateCustomIdentifier(name: string, identifier: string) { const reversedIdentifier = reverseString(identifier); const interpolationsAdded = reversedIdentifier .replace(/{(?!\\)/g, `{$`) @@ -23,6 +27,12 @@ class Tokenizer { return fn(); } + + private createGenericIdentifier(name: string): KeyMaker { + return (url, cookie, headers, query, method) => { + return `${name}_${url}_${JSON.stringify({cookie, headers, query})}_${method}`; + }; + } } export { diff --git a/src/warden-stream.ts b/src/warden-stream.ts index e1800cd..74dbdaf 100644 --- a/src/warden-stream.ts +++ b/src/warden-stream.ts @@ -14,7 +14,7 @@ interface RequestChunk { interface ResponseChunk extends RequestChunk{ response?: request.Response; error?: { - name: string + code: string }; } diff --git a/test/request-manager.spec.ts b/test/request-manager.spec.ts index 4faa03c..e40dc29 100644 --- a/test/request-manager.spec.ts +++ b/test/request-manager.spec.ts @@ -2,7 +2,7 @@ import "reflect-metadata"; import {expect} from "chai"; import sinon, {SinonMock} from "sinon"; -import {RequestManager, RequestOptions, RouteConfiguration} from "../src/request-manager"; +import {DEFAULT_IDENTIFIER, RequestManager, RequestOptions, RouteConfiguration} from "../src/request-manager"; import {CacheFactory} from "../src/cache-factory"; import {StreamFactory, StreamType} from "../src/stream-factory"; import {Tokenizer} from "../src/tokenizer"; @@ -56,8 +56,8 @@ describe("[request-manager]", () => { connect: sandbox.stub().returnsArg(0) }; const keyMaker = sandbox.stub(); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); // Act @@ -67,6 +67,37 @@ describe("[request-manager]", () => { expect(headStream.connect.calledWithExactly(networkStream)).to.eq(true); }); + it("should register new route without any identifier", () => { + // Arrange + const name = faker.random.word(); + const routeConfiguration = {} as RouteConfiguration; + const headStream = { + connect: sandbox.stub().returnsArg(0) + }; + const networkStream = { + connect: sandbox.stub().returnsArg(0) + }; + const keyMaker = sandbox.stub(); + streamFactoryMock + .expects('createHead') + .withExactArgs() + .returns(headStream); + streamFactoryMock + .expects('createNetwork') + .withExactArgs() + .returns(networkStream); + tokenizerMock + .expects('tokenize') + .withExactArgs(name, undefined) + .returns(keyMaker); + + // Act + requestManager.register(name, routeConfiguration); + + // Assert + expect(headStream.connect.calledWithExactly(networkStream)).to.eq(true); + }); + it("should register new route without any plugin with boolean false", () => { // Arrange const name = faker.random.word(); @@ -82,8 +113,8 @@ describe("[request-manager]", () => { connect: sandbox.stub().returnsArg(0) }; const keyMaker = sandbox.stub(); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); // Act @@ -110,8 +141,8 @@ describe("[request-manager]", () => { connect: sandbox.stub().returnsArg(0) }; const keyMaker = sandbox.stub(); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); streamFactoryMock.expects('create').withExactArgs(StreamType.CACHE, routeConfiguration.cache).returns(cacheStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); @@ -143,8 +174,8 @@ describe("[request-manager]", () => { }; const key = faker.random.word(); const keyMaker = sandbox.stub().returns(key); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); const stub = sandbox.stub(); @@ -176,8 +207,8 @@ describe("[request-manager]", () => { }; const key = faker.random.word(); const keyMaker = sandbox.stub().returns(key); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); const stub = sandbox.stub(); @@ -212,8 +243,8 @@ describe("[request-manager]", () => { connect: sandbox.stub().returnsArg(0) }; const keyMaker = sandbox.stub(); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); // Act @@ -238,8 +269,8 @@ describe("[request-manager]", () => { connect: sandbox.stub().returnsArg(0) }; const keyMaker = sandbox.stub(); - streamFactoryMock.expects('create').withExactArgs(StreamType.HEAD).returns(headStream); - streamFactoryMock.expects('create').withExactArgs(StreamType.NETWORK).returns(networkStream); + streamFactoryMock.expects('createHead').withExactArgs().returns(headStream); + streamFactoryMock.expects('createNetwork').withExactArgs().returns(networkStream); tokenizerMock.expects('tokenize').withExactArgs(name, routeConfiguration.identifier).returns(keyMaker); // Act diff --git a/test/retry.spec.ts b/test/retry.spec.ts new file mode 100644 index 0000000..89f7f26 --- /dev/null +++ b/test/retry.spec.ts @@ -0,0 +1,287 @@ +import * as sinon from "sinon"; +import * as faker from "faker"; +import {expect} from "chai"; +import {Retry, RetryConfiguration} from "../src/retry"; +import {SinonStub} from "sinon"; + +const sandbox = sinon.createSandbox(); +let retryStream: Retry; + +const defaultConfiguration = { + count: 2, + delay: 100 +}; + +let clock: sinon.SinonFakeTimers; + +describe('[retry.ts]', () => { + beforeEach(() => { + clock = sandbox.useFakeTimers(); + retryStream = new Retry(defaultConfiguration); + }); + + afterEach(() => { + clock.restore(); + sandbox.verifyAndRestore(); + }); + + it('should create new Retry', () => { + // Arrange + const retryStream = new Retry(defaultConfiguration); + + // Assert + expect(retryStream).to.be.instanceOf(Retry); + }); + + it('should create new Retry from number', () => { + // Arrange + const retryConfiguration = 100; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should create new Retry from boolean', () => { + // Arrange + const retryConfiguration = true; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should create new Retry from configuration', () => { + // Arrange + const retryConfiguration = { + count: 2, + delay: 100 + }; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should create new Retry from configuration with default delay', () => { + // Arrange + const retryConfiguration = { + count: 2 + }; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should create new Retry from configuration with default count', () => { + // Arrange + const retryConfiguration = { + delay: 100 + }; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should create new Retry when provided configuration is not valid', () => { + // Arrange + const retryConfiguration = "" as unknown as RetryConfiguration; + + // Act + const retry = Retry.create(retryConfiguration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it('should and retry count to outgoing request for the first time', async () => { + // Arrange + const chunk = {}; + const spy = sandbox.stub(); + + // Act + await retryStream.onRequest(chunk as any, spy); + + // Assert + expect(spy.calledWith(null, { + ...chunk, + retryCount: 0 + })).to.eq(true); + }); + + it('should not retry incoming failed response if it is not error', async () => { + // Arrange + const chunk = { + retryCount: 0 + } as any; + const spy = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 0 + }); + const request = sandbox.stub(retry, 'request'); + + // Act + await retry.onResponse(chunk, spy); + + // Assert + expect(request.notCalled).to.eq(true); + expect(spy.calledWithExactly(undefined, chunk)).to.eq(true); + }); + + it('should retry incoming failed response if status code is 429 (too many requests)', async () => { + // Arrange + const chunk = { + retryCount: 0, + response: { + statusCode: 429 + } + } as any; + const spy = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 0 + }); + const request = sandbox.stub(retry, 'request') as SinonStub; + + // Act + await retry.onResponse(chunk, spy); + + // Assert + expect(request.calledWith({ + retryCount: 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + })).to.eq(true); + expect(spy.calledWithExactly(undefined, null)).to.eq(true); + }); + + it('should retry incoming failed response if status code is 500', async () => { + // Arrange + const chunk = { + retryCount: 0, + response: { + statusCode: 500 + } + } as any; + const spy = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 0 + }); + const request = sandbox.stub(retry, 'request') as SinonStub; + + // Act + await retry.onResponse(chunk, spy); + + // Assert + expect(request.calledWith({ + retryCount: 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + })).to.eq(true); + expect(spy.calledWithExactly(undefined, null)).to.eq(true); + }); + + it('should retry incoming failed response if the request errored with one of specific error types', async () => { + // Arrange + const chunk = { + error: { + code: 'ECONNRESET' + }, + retryCount: 0 + } as any; + const spy = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 0 + }); + const request = sandbox.stub(retry, 'request') as SinonStub; + + // Act + await retry.onResponse(chunk, spy); + + // Assert + expect(request.calledWith({ + retryCount: 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + })).to.eq(true); + expect(spy.calledWithExactly(undefined, null)).to.eq(true); + }); + + it('should call logger function if it is provided', async () => { + // Arrange + const chunk = { + error: { + code: 'ECONNRESET' + }, + retryCount: 0 + } as any; + const spy = sandbox.stub(); + const logger = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 0, + logger + }); + const request = sandbox.stub(retry, 'request') as SinonStub; + + // Act + await retry.onResponse(chunk, spy); + + // Assert + expect(request.calledWith({ + retryCount: 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + })).to.eq(true); + expect(logger.calledWithExactly(1)).to.eq(true); + expect(spy.calledWithExactly(undefined, null)).to.eq(true); + }); + + it('should retry incoming failed response if the request errored with delay', async () => { + // Arrange + const chunk = { + error: { + code: 'ECONNRESET' + }, + retryCount: 0 + } as any; + const spy = sandbox.stub(); + const retry = new Retry({ + count: 1, + delay: 100 + }); + const request = sandbox.stub(retry, 'request') as SinonStub; + + // Act + await retry.onResponse(chunk, spy); + clock.tick(110); + + // Assert + expect(request.calledWith({ + retryCount: 1, + cb: chunk.cb, + key: chunk.key, + requestOptions: chunk.requestOptions + })).to.eq(true); + expect(spy.calledWithExactly(undefined, null)).to.eq(true); + }); +}); \ No newline at end of file diff --git a/test/stream-factory.spec.ts b/test/stream-factory.spec.ts index 48440c9..7bf4fd7 100644 --- a/test/stream-factory.spec.ts +++ b/test/stream-factory.spec.ts @@ -6,9 +6,10 @@ import {StreamFactory, StreamType} from "../src/stream-factory"; import {Cache, CacheFactory} from "../src/cache-factory"; import {RequestWrapper} from "../src/request-wrapper"; import {Holder} from "../src/holder"; +import faker from "faker"; import {StreamHead} from "../src/stream-head"; import {Network} from "../src/network"; -import faker from "faker"; +import {Retry} from "../src/retry"; const sandbox = sinon.createSandbox(); @@ -43,9 +44,10 @@ describe("[stream-factory.ts]", () => { it("should return new cache instance", () => { // Act + const configuration = {}; const cacheResponse = {}; cacheFactoryMock.expects('create').returns(cacheResponse); - const cache = streamFactory.create(StreamType.CACHE); + const cache = streamFactory.create(StreamType.CACHE, configuration); // Assert expect(cache).to.eq(cacheResponse); @@ -53,33 +55,44 @@ describe("[stream-factory.ts]", () => { it("should return new Holder instance", () => { // Act - const holder = streamFactory.create(StreamType.HOLDER); + const configuration = {}; + const holder = streamFactory.create(StreamType.HOLDER, configuration); // Assert expect(holder).to.be.instanceOf(Holder); }); - it("should return new StreamHead type", () => { + it("should return new Retry instance", () => { + // Act + const configuration = {}; + const retry = streamFactory.create(StreamType.RETRY, configuration); + + // Assert + expect(retry).to.be.instanceOf(Retry); + }); + + it("should return new Head instance", () => { // Act - const head = streamFactory.create(StreamType.HEAD); + const head = streamFactory.createHead(); // Assert expect(head).to.be.instanceOf(StreamHead); }); - it("should return new Network", () => { + it("should return new Network instance", () => { // Act - const network = streamFactory.create(StreamType.NETWORK); + const network = streamFactory.createNetwork(); // Assert expect(network).to.be.instanceOf(Network); }); - it("should throw error when request stream type is unkonwn", () => { + it("should throw error when request stream type is unknown", () => { // Arrange + const configuration = {}; const test = () => { - streamFactory.create(faker.random.word()); + streamFactory.create(faker.random.word(), configuration); }; // Assert diff --git a/test/tokenizer.spec.ts b/test/tokenizer.spec.ts index cdb37f8..e469d0d 100644 --- a/test/tokenizer.spec.ts +++ b/test/tokenizer.spec.ts @@ -91,4 +91,19 @@ describe("[tokenizer.ts]", () => { expect(keyMaker).to.be.a('function'); expect(key).to.eq('_name___special_c_{escaped}_a_c_/he_key_e'); }); + + it('should create generic identifier', () => { + // Arrange + const tokenizer = new Tokenizer(); + const name = '_name_'; + const method = 'get'; + + // Act + const keyMaker = tokenizer.tokenize(name); + const key = keyMaker('/he',{test:'c'},{test:'a'},{test:'c'}, method); + + // Assert + expect(keyMaker).to.be.a('function'); + expect(key).to.eq('_name__/he_{"cookie":{"test":"c"},"headers":{"test":"a"},"query":{"test":"c"}}_get'); + }); });