diff --git a/src/SDK.ts b/src/SDK.ts index e595dbb22..c70d2fe92 100644 --- a/src/SDK.ts +++ b/src/SDK.ts @@ -10,6 +10,7 @@ import { SchemaColl, Variables, GraphQLRequest, GraphQLResponse, GraphQLClientOption, GraphQLQuery, GraphQLVariables, GraphQLResult, } from './utils/internalTypes' +import { SDKAsyncJob } from './SDKAsyncJob'; export const schemas: SchemaColl = [] @@ -22,6 +23,7 @@ export class SDK { net = new Net(this.schemas) fetch = new SDKFetch() + asyncJob = new SDKAsyncJob(this.fetch) socketClient: socket.Client database: Database | undefined diff --git a/src/SDKAsyncJob.ts b/src/SDKAsyncJob.ts new file mode 100644 index 000000000..af1bfee8c --- /dev/null +++ b/src/SDKAsyncJob.ts @@ -0,0 +1,260 @@ +import { Observable } from 'rxjs/Observable' +import { of } from 'rxjs/observable/of' +import { timer } from 'rxjs/observable/timer' +import { _throw } from 'rxjs/observable/throw' + +import { SDKFetch, SDKFetchOptions } from './SDKFetch' +import { appendQueryString, toQueryString } from './utils' + +interface PollingOptions { + /** + * 最大轮询数 + */ + maxTimes?: number + /** + * 轮询间隔 + */ + interval?: number + /** + * 最大轮询间隔 + */ + maxInterval?: number + /** + * 轮询间隔步长 + */ + intervalSteps?: number +} + +export interface AsyncJobOptions extends AsyncJobCallbacks { + timeoutSeconds?: number // 客户端愿意等待的时间 + readySid?: string // consumerId + pollingOptions?: PollingOptions +} + +interface AsyncJobCallbacks { + onPending?: () => void + onFulfilled?: (res: T) => void + onRejected?: (e: Error) => void +} + +type Options = SDKFetchOptions & AsyncJobOptions + +interface AsyncResult { + readyKey: string + result: null + timeout: boolean +} + +interface AsyncJobInfo { + isDone: boolean // job是否完成 + timeCost?: number // job消耗的毫秒数 + statusCode?: number // job执行响应码(response.status 的冗余) + request: { // 原始请求 + headers: string + url: string // 原始请求链接 + body?: {} + } + response?: { // 执行结果,如果项目还没有准备好,response可能为null + status: number // job响应码 + headers: string // job响应头 + body: T // job响应体 + } +} + +interface AsyncJobsRes { + result: Record> +} + +const DefaultPollInterval = 3000 +const DefaultSteps = 1.2 +const DefaultMaxPollInterval = 1000 * 10 +const DefaultMaxPollTimes = 7 + +export class SDKAsyncJob { + + constructor(private fetch: SDKFetch) {} + + get(path: string, query?: Object, options: Options = {}) { + const { onPending, onFulfilled, onRejected, timeoutSeconds, readySid, pollingOptions, ...rest } = options + const callbacks: AsyncJobCallbacks = { onPending, onFulfilled, onRejected } + + const q = this.normalizeQuery(query, timeoutSeconds, readySid) + return this.fetch.get(path, q, rest) + .switchMap((res: T | AsyncResult) => this.handleRes(res, callbacks, pollingOptions)) + } + + post(path: string, body?: any, options: Options = {}) { + const { onPending, onFulfilled, onRejected, timeoutSeconds, readySid, pollingOptions, ...rest } = options + const callbacks: AsyncJobCallbacks = { onPending, onFulfilled, onRejected } + + const query = this.normalizeQuery(void 0, timeoutSeconds, readySid) + const url = this.appendQueryToUrl(path, query) + return this.fetch.post(url, body, rest) + .switchMap((res: T | AsyncResult) => this.handleRes( res, callbacks, pollingOptions )) + } + + put(path: string, body?: any, options: Options = {}) { + const { onPending, onFulfilled, onRejected, timeoutSeconds, readySid, pollingOptions, ...rest } = options + const callbacks: AsyncJobCallbacks = { onPending, onFulfilled, onRejected } + + const query = this.normalizeQuery(void 0, timeoutSeconds, readySid) + const url = this.appendQueryToUrl(path, query) + return this.fetch.put(url, body, rest) + .switchMap((res: T | AsyncResult) => this.handleRes( res, callbacks, pollingOptions )) + } + + delete(path: string, body?: any, options: Options = {}) { + const { onPending, onFulfilled, onRejected, timeoutSeconds, readySid, pollingOptions, ...rest } = options + const callbacks: AsyncJobCallbacks = { onPending, onFulfilled, onRejected } + + const query = this.normalizeQuery(void 0, timeoutSeconds, readySid) + const url = this.appendQueryToUrl(path, query) + return this.fetch.delete(url, body, rest) + .switchMap((res: T | AsyncResult) => this.handleRes( res, callbacks, pollingOptions )) + } + + private appendQueryToUrl(url: string, q: Object) { + return appendQueryString(url, toQueryString(q)) + } + + private handleRes( + res: T | AsyncResult, + callbacks: AsyncJobCallbacks, + pollingOptions: PollingOptions = {}, + ) { + // 如果资源在正常时间范围内返回,则直接返回资源 + if (!this.isAsyncResult(res)) { + return of(res) + } + + if (callbacks.onPending) { + callbacks.onPending() + } + + const { + maxTimes = DefaultMaxPollTimes, + interval = DefaultPollInterval, + maxInterval = DefaultMaxPollInterval, + intervalSteps = DefaultSteps, + } = pollingOptions + + return this.waitingForJobDone(res.readyKey, callbacks, { + maxTimes, + interval, + maxInterval, + intervalSteps, + }) + } + + private waitingForJobDone( + key: string, + callbacks: AsyncJobCallbacks, + pollingOptions: Required, + ) { + const polling$ = this.getAsyncJobResult(key) // 轮询前先查询一次 + .switchMap(res => { + if (res === null) { + return this.polling( + key, + 0, + callbacks, + pollingOptions, + ) + } + if (callbacks.onFulfilled) { + callbacks.onFulfilled(res) + } + return of(res) + }) + + return polling$ + } + + private polling( + key: string, + times: number, + callbacks: AsyncJobCallbacks, + pollingOptions: Required, + ): Observable { + const { + intervalSteps, + maxInterval, + maxTimes, + interval, + } = pollingOptions + return timer(interval) + .switchMap(() => this.getAsyncJobResult(key)) + .switchMap(res => { + if (res === null) { + if (times > maxTimes) { + const e = new Error('Async job polling failed') + if (callbacks.onRejected) { + callbacks.onRejected(e) + } + return _throw(e) + } + + const nextTimes = times + 1 + const nextOptions = { + intervalSteps, + maxInterval, + maxTimes, + interval: Math.floor(Math.min(interval * intervalSteps, maxInterval)), + } + return this.polling(key, nextTimes, callbacks, nextOptions) + } + if (callbacks.onFulfilled) { + callbacks.onFulfilled(res) + } + return of(res) + }) + } + + private getAsyncJobResult(key: string): Observable { + return this.fetch.get>('async-jobs', { keys: [key] }) + .map(m => { + const r = m.result[key] + + if (r.response && r.response.status && r.response.status >= 400) { + throw new Error(`${r.response.body || r.response.status}`) + } + + if (r.isDone) { + if (!r.response) { + throw new Error('response is undefined') + } + return r.response.body + } + return null + }) + } + + /** + * 判断资源是否需要异步等待 + */ + private isAsyncResult(res: any): res is AsyncResult { + if (typeof res !== 'object') { + return false + } + + if ('timeout' in res && 'readyKey' in res) { + return true + } + + return false + } + + private normalizeQuery( + query?: Object, + timeoutSeconds: number = 3, // 默认等待 3 秒 + readySid: string = '', + ) { + return { + timeoutAsync: true, + timeoutSeconds, + readySid, + ...query, + } + } + +} diff --git a/src/SDKFetch.ts b/src/SDKFetch.ts index b056f07d5..589c192a0 100644 --- a/src/SDKFetch.ts +++ b/src/SDKFetch.ts @@ -7,8 +7,7 @@ import 'rxjs/add/operator/finally' import { Observable } from 'rxjs/Observable' import { Http, HttpErrorMessage, HttpResponseWithHeaders, getHttpWithResponseHeaders } from './Net/Http' import { UserMe } from './schemas/UserMe' -import { forEach, uuid } from './utils' -import { SDKLogger } from './utils/Logger' +import { appendQueryString, toQueryString, uuid } from './utils' export type SDKFetchOptions = { apiHost?: string @@ -294,51 +293,3 @@ export class SDKFetch { return this.get('users/me') } } - -const appendQueryString = (url: string, queryString: string) => { - if (!queryString) { - return url - } - if (url.slice(-1) === '?') { // '?' 是最后一个字符 - return `${url}${queryString}` - } - return url.indexOf('?') === -1 - ? `${url}?${queryString}` // '?' 不存在 - : `${url}&${queryString}` // '?' 存在,其后还有其他字符 -} - -const toQueryString = (query: any) => { - if (typeof query !== 'object' || !query) { - return '' - } - const result: string[] = [] - forEach(query, (val: any, key: string) => { - if (key === '_') { - SDKLogger.warn('query should not contain key \'_\', it will be ignored') - } else if (Array.isArray(val)) { - val.forEach(_val => { - result.push(`${key}=${encoded(_val)}`) - }) - } else if (typeof val !== 'undefined') { - result.push(`${key}=${encoded(val)}`) - } - }) - return result.join('&') -} - -/** - * encodeURIComponent 不会修改的字符有 A-Z a-z 0-9 - _ . ! ~ * ' ( ) - * - 参考自 https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/encodeURIComponent#Description - * 而被修改的字符,都会以 percent-encoding 方法替换 - * - 参考自 https://tools.ietf.org/html/rfc3986#section-2.4 - * - percent-encoding 的方法参考自 https://tools.ietf.org/html/rfc3986#section-2.1 - */ -const encodedRegExp = /^(%(\d|[a-fA-F]){2}|[a-zA-Z0-9]|-|_|\.|!|~|\*|'|\(|\))*$/ -// ^percent-encoded^ ^^^^^^^^^^^^^escaped^^^^^^^^^^^^^w - -const encoded = (value: {} | null): string => { - const maybeEncoded = String(value) - return encodedRegExp.test(maybeEncoded) - ? maybeEncoded - : encodeURIComponent(maybeEncoded) -} diff --git a/src/index.ts b/src/index.ts index 5e31b4078..b546473ad 100644 --- a/src/index.ts +++ b/src/index.ts @@ -23,6 +23,7 @@ export { Socket, eventToRE as socketEventToRE } export { SDK } from './SDK' export { SDKFetch, HttpHeaders } from './SDKFetch' +export { SDKAsyncJob, AsyncJobOptions } from './SDKAsyncJob' export { Net, CacheStrategy, Http, HttpErrorMessage, HttpError$, Page, batchService, FallbackWhen } from './Net' export { Database, ExecutorResult, QueryToken, OrderDescription, Query, Predicate } from './db' diff --git a/src/utils/helper.ts b/src/utils/helper.ts index 6462172c2..d9a5929b6 100644 --- a/src/utils/helper.ts +++ b/src/utils/helper.ts @@ -1,5 +1,6 @@ import { PagingQuery, UrlPagingQuery, SqlPagingQuery } from './internalTypes' import * as uuidv4 from 'uuid/v4' +import { SDKLogger } from './Logger' export function forEach (target: Array, eachFunc: (val: T, key: number) => void, inverse?: boolean): void @@ -199,3 +200,51 @@ export const hasMorePages = ( export const isNonNullable = (x: T): x is NonNullable => { return x != null } + +export const appendQueryString = (url: string, queryString: string) => { + if (!queryString) { + return url + } + if (url.slice(-1) === '?') { // '?' 是最后一个字符 + return `${url}${queryString}` + } + return url.indexOf('?') === -1 + ? `${url}?${queryString}` // '?' 不存在 + : `${url}&${queryString}` // '?' 存在,其后还有其他字符 +} + +/** + * encodeURIComponent 不会修改的字符有 A-Z a-z 0-9 - _ . ! ~ * ' ( ) + * - 参考自 https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/encodeURIComponent#Description + * 而被修改的字符,都会以 percent-encoding 方法替换 + * - 参考自 https://tools.ietf.org/html/rfc3986#section-2.4 + * - percent-encoding 的方法参考自 https://tools.ietf.org/html/rfc3986#section-2.1 + */ +const encodedRegExp = /^(%(\d|[a-fA-F]){2}|[a-zA-Z0-9]|-|_|\.|!|~|\*|'|\(|\))*$/ +// ^percent-encoded^ ^^^^^^^^^^^^^escaped^^^^^^^^^^^^^w + +export const encoded = (value: {} | null): string => { + const maybeEncoded = String(value) + return encodedRegExp.test(maybeEncoded) + ? maybeEncoded + : encodeURIComponent(maybeEncoded) +} + +export const toQueryString = (query: any) => { + if (typeof query !== 'object' || !query) { + return '' + } + const result: string[] = [] + forEach(query, (val: any, key: string) => { + if (key === '_') { + SDKLogger.warn('query should not contain key \'_\', it will be ignored') + } else if (Array.isArray(val)) { + val.forEach(_val => { + result.push(`${key}=${encoded(_val)}`) + }) + } else if (typeof val !== 'undefined') { + result.push(`${key}=${encoded(val)}`) + } + }) + return result.join('&') +} diff --git a/test/SDKAsyncJob.spec.ts b/test/SDKAsyncJob.spec.ts new file mode 100644 index 000000000..4c283493c --- /dev/null +++ b/test/SDKAsyncJob.spec.ts @@ -0,0 +1,325 @@ +import { expect } from 'chai' +import * as sinon from 'sinon' +import { describe, it, beforeEach, afterEach } from 'tman' + +import { SDKAsyncJob } from '../src/SDKAsyncJob' +import { SDKFetch } from '../src/SDKFetch' +import { Scheduler } from 'rxjs' +import { of } from 'rxjs/observable/of' + +const fetchMock = require('fetch-mock') + +const path = 'test' +const allowedMethods: ['get', 'post', 'put', 'delete'] = ['get', 'post', 'put', 'delete'] + +describe('SDKAsyncJob', () => { + + let sdkFetch: SDKFetch + let sdkAsyncJob: SDKAsyncJob + const apiHost = 'https://www.teambition.com/api' + const testUrl = `${apiHost}/${path}` + const pollingUrl = `${apiHost}/async-jobs` + const urlMatcher = new RegExp(testUrl) + const pollingUrlMatcher = new RegExp(pollingUrl) + + beforeEach(() => { + sdkFetch = new SDKFetch() + sdkFetch.setAPIHost(apiHost) + sdkAsyncJob = new SDKAsyncJob(sdkFetch) + }) + + afterEach(() => { + fetchMock.restore() + }) + + allowedMethods.forEach((method) => { + it(`method ${method} should no difference with SDKFetch.${method} if job respond in time`, function* () { + fetchMock.mock(urlMatcher, { result: 'foo' }) + let fetchResult: string + + yield sdkFetch[method]<{ result: string }>(path) + .subscribeOn(Scheduler.asap) + .do((res) => { + fetchResult = res.result + }) + + yield (sdkAsyncJob[method] as any)(path) + .subscribeOn(Scheduler.asap) + .do((res: { result: string }) => { + expect(res.result).to.equal(fetchResult) + }) + }) + }) + + it('should return response directly if response is not an object', function* () { + fetchMock.mock(urlMatcher, 'foo') + + yield sdkAsyncJob.get(path) + .subscribeOn(Scheduler.asap) + .do(res => { + expect(res).to.equal('foo') + }) + }) + + it('should start polling if response has "timeout"', function* () { + fetchMock.mock(urlMatcher, { + timeout: true, + readyKey: '233' + }) + + fetchMock.mock(pollingUrlMatcher, { + result: { + '233': { + isDone: true, + response: { + status: 200, + headers: '', + body: { result: 'foo' } + } + } + } + }) + + yield sdkAsyncJob.get<{ result: string }>(path) + .subscribeOn(Scheduler.asap) + .do((res) => { + expect(res.result).to.equal('foo') + }) + }) + + it('should continue polling until result responded', function* (this: any) { + this.timeout(1000 * 60) + + fetchMock.mock(urlMatcher, { + timeout: true, + readyKey: '233' + }) + + let pollingTimes = 0 + + fetchMock.mock(pollingUrlMatcher, () => { + if (pollingTimes === 2) { + return { + result: { + '233': { + isDone: true, + response: { + status: 200, + headers: '', + body: { result: 'foo' } + } + } + } + } + } else { + pollingTimes++ + return { + result: { + '233': { + isDone: false + } + } + } + } + }) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + pollingOptions: { + interval: 100, + } + }) + .subscribeOn(Scheduler.asap) + .do((res) => { + expect(res.result).to.equal('foo') + }) + }) + + it('should throw an error if status code >= 400', function* () { + fetchMock.mock(urlMatcher, { + timeout: true, + readyKey: '233' + }) + + fetchMock.mock(pollingUrlMatcher, { + result: { + '233': { + isDone: true, + response: { + status: 401, + body: 'something is wrong' + } + } + } + }) + + yield sdkAsyncJob.get<{ result: string }>(path) + .subscribeOn(Scheduler.asap) + .catch((e: Error) => { + expect(e.message).to.equal('something is wrong') + return of(null) + }) + }) + + it('should throw an error if response body is undefined', function* () { + fetchMock.mock(urlMatcher, { + timeout: true, + readyKey: '233' + }) + + fetchMock.mock(pollingUrlMatcher, { + result: { + '233': { + isDone: true + } + } + }) + + yield sdkAsyncJob.get<{ result: string }>(path) + .subscribeOn(Scheduler.asap) + .catch((e: Error) => { + expect(e.message).to.equal('response is undefined') + return of(null) + }) + }) +}) + +describe('SDKAsyncJob options', () => { + let sdkFetch: SDKFetch + let sdkAsyncJob: SDKAsyncJob + const apiHost = 'https://www.teambition.com/api' + const testUrl = `${apiHost}/${path}` + const pollingUrl = `${apiHost}/async-jobs` + const urlMatcher = new RegExp(testUrl) + const pollingUrlMatcher = new RegExp(pollingUrl) + const successResponse = { + result: { + '233': { + isDone: true, + response: { + status: 200, + headers: '', + body: { result: 'foo' } + } + } + } + } + + beforeEach(() => { + sdkFetch = new SDKFetch() + sdkFetch.setAPIHost(apiHost) + sdkAsyncJob = new SDKAsyncJob(sdkFetch) + fetchMock.mock(urlMatcher, { + timeout: true, + readyKey: '233' + }) + }) + + afterEach(() => { + fetchMock.restore() + }) + + it('should call "onFulfilled" if job finished', function* () { + fetchMock.mock(pollingUrlMatcher, successResponse) + + const callback = sinon.spy((res: any) => expect(res.result).to.equal('foo')) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + onFulfilled: callback + }) + .subscribeOn(Scheduler.asap) + + expect(callback).to.be.calledOnce + }) + + it('should call "onFulfilled" if job finished on polling', function* () { + let pollTimes = 0 + fetchMock.mock(pollingUrlMatcher, () => { + if (pollTimes === 1) { + return successResponse + } + pollTimes++ + return { + result: { + '233': { + isDone: false + } + } + } + }) + + const callback = sinon.spy((res: any) => expect(res.result).to.equal('foo')) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + onFulfilled: callback, + pollingOptions: { + interval: 10, + } + }) + .subscribeOn(Scheduler.asap) + + expect(callback).to.be.calledOnce + }) + + it('should call "onPending" if needs wait', function* () { + fetchMock.mock(pollingUrlMatcher, successResponse) + + const callback = sinon.spy(() => void 0) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + onPending: callback + }) + .subscribeOn(Scheduler.asap) + + expect(callback).to.be.calledOnce + }) + + it('should call "onRejected" if job failed', function* (this: any) { + this.timeout(1000 * 60) + fetchMock.mock(pollingUrlMatcher, { + result: { + '233': { + isDone: false + } + } + }) + + const callback = sinon.spy(() => void 0) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + onRejected: callback, + pollingOptions: { + interval: 100, + maxTimes: 1, + } + }) + .subscribeOn(Scheduler.asap) + .catch((_e: Error) => { + return of(null) + }) + + expect(callback).to.be.calledOnce + }) + + it('should throw error when exceed max polling times', function* (this: any) { + this.timeout(1000 * 60) + fetchMock.mock(pollingUrlMatcher, { + result: { + '233': { + isDone: false + } + } + }) + + yield sdkAsyncJob.get<{ result: string }>(path, void 0, { + pollingOptions: { + interval: 100, + maxTimes: 1, + } + }) + .subscribeOn(Scheduler.asap) + .catch((e: Error) => { + expect(e.message).to.equal('Async job polling failed') + return of(null) + }) + }) +}) diff --git a/test/app.ts b/test/app.ts index b05bab866..061beb1d3 100644 --- a/test/app.ts +++ b/test/app.ts @@ -10,6 +10,7 @@ export * from './utils/httpErrorSpec' export * from './mock/MockSpec' import './SDKFetch.spec' +import './SDKAsyncJob.spec' import './mock' import './apis' import './sockets'