From 6e6e896fbf1e9fda393d91fdd02db9812f90493d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Can=20G=C3=BCven?= Date: Mon, 6 May 2019 16:49:57 +0300 Subject: [PATCH] Implemented retry stream --- README.md | 24 ++++---- demo.ts | 9 +++ demo/demo-starter.ts | 99 --------------------------------- jest.config.js | 4 +- package.json | 2 +- src/cache-then-network.ts | 76 +++++++++++++------------ src/holder.ts | 8 +-- src/network.ts | 3 +- src/retry.ts | 43 ++++++++++++++ src/stream-factory.ts | 9 ++- src/warden-stream.ts | 14 ++--- test/cache-then-network.spec.ts | 2 +- test/holder.spec.ts | 8 ++- test/network.spec.ts | 3 +- 14 files changed, 133 insertions(+), 171 deletions(-) create mode 100644 demo.ts delete mode 100644 demo/demo-starter.ts create mode 100644 src/retry.ts diff --git a/README.md b/README.md index 0403e6e..e49e8ba 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,16 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati [![Codacy](https://api.codacy.com/project/badge/Grade/e806d72373414fd9818ab2a403f1b36d)](https://www.codacy.com/app/Acanguven/puzzle-warden?utm_source=github.com&utm_medium=referral&utm_content=puzzle-js/puzzle-warden&utm_campaign=Badge_Grade) ## Features -- 📥 **Smart Caching** Caches requests by converting HTTP requests to smart key strings. ✅ -- 🚧 **Request Holder** Stopping same request to be sent multiple times. ✅ -- 🔌 **Support** Warden can be used with anything but it supports [request](https://github.com/request/request) out of the box. ✅ -- 😎 **Easy Implementation** Warden can be easily implemented with a few lines of codes. ✅ -- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. 📝 -- 📇 **Schema Parser** Warden uses a schema which can be provided by you for parsing JSON faster. 📝 -- 🚥 **API Queue** Throttles API calls to protect target service. 📝 -- 👻 **Request Shadowing** Copies a fraction of traffic to a new deployment for observation. 📝 -- 🚉 **Reverse Proxy** It can be deployable as an external application which can serve as a reverse proxy. 📝 -- 📛 **Circuit Breaker** Immediately refuses new requests to provide time for the API to become healthy. 📝 +- 📥 **Smart Caching** Caches requests by converting HTTP requests to smart key strings. ✅ +- 🚧 **Request Holder** Stopping same request to be sent multiple times. ✅ +- 🔌 **Support** Warden can be used with anything but it supports [request](https://github.com/request/request) out of the box. ✅ +- 😎 **Easy Implementation** Warden can be easily implemented with a few lines of codes. ✅ +- 🔁 **Request Retry** Requests will automatically be re-attempted on recoverable errors. 📝 +- 📇 **Schema Parser** Warden uses a schema which can be provided by you for parsing JSON faster. 📝 +- 🚥 **API Queue** Throttles API calls to protect target service. 📝 +- 👻 **Request Shadowing** Copies a fraction of traffic to a new deployment for observation. 📝 +- 🚉 **Reverse Proxy** It can be deployable as an external application which can serve as a reverse proxy. 📝 +- 📛 **Circuit Breaker** Immediately refuses new requests to provide time for the API to become healthy. 📝 ![Warden Achitecture](./warden_architecture.svg) @@ -35,11 +35,11 @@ Warden is an outgoing request optimizer for creating fast and scalable applicati ### Installing Yarn -``` +```bash yarn add puzzle-warden ``` Npm -``` +```bash npm i puzzle-warden --save ``` diff --git a/demo.ts b/demo.ts new file mode 100644 index 0000000..4712122 --- /dev/null +++ b/demo.ts @@ -0,0 +1,9 @@ +const request = require("request"); + + +request({ + url: 'https://m.trendyol.com', + timeout: 5 +}, (err: any, res: any, data: any) => { + console.log(err.connect); +}); \ No newline at end of file diff --git a/demo/demo-starter.ts b/demo/demo-starter.ts deleted file mode 100644 index 6381d4f..0000000 --- a/demo/demo-starter.ts +++ /dev/null @@ -1,99 +0,0 @@ -import {Warden} from "../src/warden"; -import {CacheFactory} from "../src/cache-factory"; -import {RequestManager} from "../src/request-manager"; -import {Tokenizer} from "../src/tokenizer"; -import {StreamFactory} from "../src/stream-factory"; -import {RequestWrapper} from "../src/request-wrapper"; - -const request = require('request'); - -const cacheFactory = new CacheFactory(); - -const tokenizer = new Tokenizer(); -const requestWrapper = new RequestWrapper(); -const streamFactory = new StreamFactory(cacheFactory, requestWrapper); -const requestManager = new RequestManager(streamFactory, tokenizer); -const warden = new Warden(requestManager, requestWrapper); - - -warden.register('test', { - identifier: 'ty_{query.foo2}_{cookie.osman}', - cache: true, - holder: true -}); - -import https from "https"; -const agent = new https.Agent({ - keepAlive: true, - keepAliveMsecs: 1000, - maxSockets: 50 -}); -let input = 0; -let output = 0; -let failedToPush = 0; -let stop = false; - -// setTimeout(() => { -// stop = true; -// console.log(`${output}/${failedToPush}/${input}`); -// console.log(`${(output / 100).toFixed(2)} rps`); -// }, 30000); -let errorCount = 0; -let responseCount = 0; -const startTime = Date.now(); -const requestCount = 6000; -const newRequest = () => { - input++; - // - // warden.request('test', { - // url: `https://postman-echo.com/get?foo1=${Math.random().toFixed(2)}&foo2=${Math.random().toFixed(2)}`, - // headers: { - // cookie: `osman=${Math.random().toFixed(1)}` - // }, - // gzip: true, - // json: true, - // method: "get", - // agent, - // strictSSL: false, - // timeout: 1000 - // }, (err, response, data) => { - // if (!err && data) { - // responseCount++; - // } else { - // errorCount++; - // } - // output++; - // - // if(output >= requestCount){ - // console.log(Date.now() - startTime); - // } - // }); - // - request({ - url: `https://postman-echo.com/get?foo1=${Math.random().toFixed(2)}&foo2=${Math.random().toFixed(2)}`, - gzip: true, - json: true, - timeout: 1000, - headers: { - cookie: `osman=${Math.random().toFixed(1)}` - }, - agent, - strictSSL: false, - method: "get", - }, (err: any, res: any, data: any) => { - if (!err && data) { - responseCount++; - } else { - errorCount++; - } - output++; - - if(output >= requestCount){ - console.log(Date.now() - startTime); - } - }); - if (input <= requestCount) newRequest(); -}; - -newRequest(); -setTimeout(() => {}, 100000); diff --git a/jest.config.js b/jest.config.js index 95622d4..0dba61f 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,6 +1,6 @@ module.exports = { - preset: 'ts-jest', - testEnvironment: 'node', + preset: "ts-jest", + testEnvironment: "node", collectCoverageFrom: [ "src/**/*.ts", "!src/**/*.d.ts", diff --git a/package.json b/package.json index a776a84..adba0f9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "puzzle-warden", - "version": "1.3.0", + "version": "1.4.0", "main": "dist/index.js", "types": "dist/index.d.ts", "license": "MIT", diff --git a/src/cache-then-network.ts b/src/cache-then-network.ts index 7ef6ee5..b749dc3 100644 --- a/src/cache-then-network.ts +++ b/src/cache-then-network.ts @@ -4,47 +4,53 @@ import {StreamType} from "./stream-factory"; import {CachePlugin} from "./cache-factory"; import request from "request"; +interface CacheDecoratedResponse extends ResponseChunk { + cacheHit: boolean; +} + +interface CacheDecoratedRequest extends RequestChunk { + cacheHit: boolean; +} class CacheThenNetwork extends WardenStream { - private readonly storage: CachePlugin; - private readonly ms?: number; - - constructor(plugin: CachePlugin, ms?: number) { - super(StreamType.CACHE); - - this.ms = ms; - this.storage = plugin; - } - - async onResponse(chunk: ResponseChunk, callback: TransformCallback): Promise { - if (!chunk.cacheHit && !chunk.error && chunk.response) { - if(chunk.response.headers["set-cookie"]){ - console.warn('Detected dangerous response with set-cookie header, not caching', chunk.key); - }else{ - await this.storage.set(chunk.key, chunk.response, this.ms); - } + private readonly storage: CachePlugin; + private readonly ms?: number; + + constructor(plugin: CachePlugin, ms?: number) { + super(StreamType.CACHE); + + this.ms = ms; + this.storage = plugin; + } + + async onResponse(chunk: CacheDecoratedResponse, callback: TransformCallback): Promise { + if (!chunk.cacheHit && !chunk.error && chunk.response) { + if (chunk.response.headers["set-cookie"]) { + console.warn('Detected dangerous response with set-cookie header, not caching', chunk.key); + } else { + await this.storage.set(chunk.key, chunk.response, this.ms); + } + } + + callback(undefined, chunk); } - callback(undefined, chunk); - } - - async onRequest(chunk: RequestChunk, callback: TransformCallback): Promise { - const cachedData = await this.storage.get(chunk.key) as request.Response; - - if (cachedData) { - this.respond({ - key: chunk.key, - cb: chunk.cb, - response: cachedData, - cacheHit: true - }); - callback(undefined, null); - } else { - callback(undefined, chunk); + async onRequest(chunk: CacheDecoratedRequest, callback: TransformCallback): Promise { + const cachedData = await this.storage.get(chunk.key) as request.Response; + + if (cachedData) { + this.respond({ + ...chunk, + response: cachedData, + cacheHit: true + }); + callback(undefined, null); + } else { + callback(undefined, chunk); + } } - } } export { - CacheThenNetwork + CacheThenNetwork }; diff --git a/src/holder.ts b/src/holder.ts index 8f4f998..14d9b14 100644 --- a/src/holder.ts +++ b/src/holder.ts @@ -2,9 +2,6 @@ import {RequestChunk, ResponseChunk, WardenStream} from "./warden-stream"; import {TransformCallback} from "stream"; import {StreamType} from "./stream-factory"; -interface HolderConfiguration { - -} class Holder extends WardenStream { private holdQueue: { [key: string]: RequestChunk[] | null } = {}; @@ -20,10 +17,8 @@ class Holder extends WardenStream { if (holdQueue) { holdQueue.forEach(holdChunk => { this.respond({ + ...chunk, cb: holdChunk.cb, - key: chunk.key, - response: chunk.response, - error: chunk.error }); }); @@ -48,6 +43,5 @@ class Holder extends WardenStream { } export { - HolderConfiguration, Holder }; diff --git a/src/network.ts b/src/network.ts index 3494eac..5bd3dee 100644 --- a/src/network.ts +++ b/src/network.ts @@ -21,8 +21,7 @@ class Network extends WardenStream { onRequest(chunk: RequestChunk, callback: TransformCallback): void { this.requestWrapper.request[chunk.requestOptions.method](chunk.requestOptions, (error, response) => { this.respond({ - key: chunk.key, - cb: chunk.cb, + ...chunk, response, error }); diff --git a/src/retry.ts b/src/retry.ts new file mode 100644 index 0000000..1305d2e --- /dev/null +++ b/src/retry.ts @@ -0,0 +1,43 @@ +import {RequestChunk, ResponseChunk, WardenStream} from "./warden-stream"; +import {TransformCallback} from "stream"; + +interface RetryDecoratedResponse extends ResponseChunk { + retryCount: number; +} + +interface RetryDecoratedRequest extends RequestChunk { + retryCount: number; +} + +class Retry extends WardenStream { + retryLimit: number; + + constructor(retryLimit: number) { + super('Retry'); + + this.retryLimit = retryLimit; + } + + async onResponse(chunk: RetryDecoratedResponse, callback: TransformCallback): Promise { + if (chunk.error && chunk.retryCount <= this.retryLimit) { + this.request({ + ...chunk, + retryCount: chunk.retryCount + 1 + }); + callback(undefined, null); + } else { + callback(undefined, chunk); + } + } + + async onRequest(chunk: RetryDecoratedRequest, callback: TransformCallback): Promise { + callback(null, { + ...chunk, + retryCount: 0 + } as RequestChunk); + } +} + +export { + Retry +}; \ No newline at end of file diff --git a/src/stream-factory.ts b/src/stream-factory.ts index 569ba35..9242e8e 100644 --- a/src/stream-factory.ts +++ b/src/stream-factory.ts @@ -3,6 +3,7 @@ import {CacheFactory} from "./cache-factory"; import {StreamHead} from "./stream-head"; import {Holder} from "./holder"; import {RequestWrapper} from "./request-wrapper"; +import {Retry} from "./retry"; const enum StreamType { HOLDER = 'holder', @@ -10,12 +11,14 @@ const enum StreamType { NETWORK = 'network', QUEUE = 'queue', CIRCUIT = 'circuit', - HEAD = 'head' + HEAD = 'head', + RETRY = 'retry' } enum ConfigurableStream { HOLDER = StreamType.HOLDER, - CACHE = StreamType.CACHE + CACHE = StreamType.CACHE, + RETRY = StreamType.RETRY } @@ -41,6 +44,8 @@ class StreamFactory { // throw new Error('Not implemented'); case StreamType.NETWORK: return new Network(this.requestWrapper) as unknown as U; + case StreamType.RETRY: + return new Retry(3) as unknown as U; case StreamType.HEAD: return new StreamHead() as unknown as U; default: diff --git a/src/warden-stream.ts b/src/warden-stream.ts index 217e556..e1800cd 100644 --- a/src/warden-stream.ts +++ b/src/warden-stream.ts @@ -4,18 +4,18 @@ import {RequestCallback} from "request"; import {RequestOptions} from "./request-manager"; import request from "request"; + interface RequestChunk { key: string; requestOptions: RequestOptions; cb: RequestCallback; } -interface ResponseChunk { - cb: RequestCallback; - key: string; +interface ResponseChunk extends RequestChunk{ response?: request.Response; - error?: any; - cacheHit?: true; + error?: { + name: string + }; } interface WardenStreamer { @@ -112,11 +112,11 @@ abstract class WardenStream implements WardenStreamer { return wardenStream; } - respond(chunk: ResponseChunk): boolean { + respond(chunk: T): boolean { return this.responseStream.push(chunk); } - request(chunk: RequestChunk): boolean { + request(chunk: T): boolean { return this.requestStream.push(chunk); } diff --git a/test/cache-then-network.spec.ts b/test/cache-then-network.spec.ts index de12c5b..c5a679e 100644 --- a/test/cache-then-network.spec.ts +++ b/test/cache-then-network.spec.ts @@ -74,7 +74,7 @@ describe("[cache.ts]", () => { cb: chunk.cb, response: response as any, cacheHit: true - })).to.eq(true); + } as any)).to.eq(true); }); it("should return incoming response without caching if cache hit flag true", async () => { diff --git a/test/holder.spec.ts b/test/holder.spec.ts index 6b56d25..8d21b6c 100644 --- a/test/holder.spec.ts +++ b/test/holder.spec.ts @@ -80,10 +80,13 @@ describe("[holder.ts]", () => { const responseChunk = { key, response: {}, + requestOptions: {}, + error: undefined } as any; const requestChunk = { key, - cb: sandbox.stub() + cb: sandbox.stub(), + requestOptions: {} } as any; const requestSpy = sandbox.stub(); const respondSpy = sandbox.stub(holder, 'respond'); @@ -98,7 +101,8 @@ describe("[holder.ts]", () => { key: responseChunk.key, response: responseChunk.response, cb: requestChunk.cb, - error: undefined + error: undefined, + requestOptions: responseChunk.requestOptions })).to.eq(true); expect(spy.calledOnce).to.eq(true); expect(spy.calledWithExactly(undefined, null)).to.eq(true); diff --git a/test/network.spec.ts b/test/network.spec.ts index 7c45fdf..f15bcf7 100644 --- a/test/network.spec.ts +++ b/test/network.spec.ts @@ -74,7 +74,8 @@ describe("[network.ts]", () => { key: chunk.key, cb: chunk.cb, response: response as unknown as request.Response, - error: null + error: null as any, + requestOptions: chunk.requestOptions as any })).to.eq(true); });