From 5dfd2095c960b944120af289f2594893036bca88 Mon Sep 17 00:00:00 2001 From: Seonghyeon0409 Date: Tue, 12 Nov 2024 17:27:02 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=BD=94=EC=9D=B8=20=EC=A2=85=EB=AA=A9?= =?UTF-8?q?=20api,=20=ED=98=B8=EA=B0=80=EC=B0=BD=20websocket=20=EC=97=B0?= =?UTF-8?q?=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/server/common/upbit.ts | 5 +- .../server/src/upbit/coin-list.service.ts | 48 ++++++++--- .../coin-ticker-websocket.service.spec.ts | 10 +-- .../upbit/coin-ticker-websocket.service.ts | 83 ++++++++++++------- .../upbit/orderbook-websocket.service.spec.ts | 8 +- .../src/upbit/orderbook-websocket.service.ts | 68 +++++++++------ packages/server/src/upbit/sse.service.ts | 36 ++++++-- packages/server/src/upbit/upbit.controller.ts | 26 +++++- packages/server/src/upbit/upbit.module.ts | 4 +- .../server/src/upbit/upbit.service.spec.ts | 18 ---- packages/server/src/upbit/upbit.service.ts | 60 -------------- 11 files changed, 196 insertions(+), 170 deletions(-) delete mode 100644 packages/server/src/upbit/upbit.service.spec.ts delete mode 100644 packages/server/src/upbit/upbit.service.ts diff --git a/packages/server/common/upbit.ts b/packages/server/common/upbit.ts index 4d225393..c6ff8a43 100644 --- a/packages/server/common/upbit.ts +++ b/packages/server/common/upbit.ts @@ -1,4 +1,7 @@ export const UPBIT_WEBSOCKET_URL = 'wss://api.upbit.com/websocket/v1'; export const UPBIT_RESTAPI_URL = 'https://api.upbit.com/v1/market/all?is_details=true' +export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/" + export const UPBIT_WEBSOCKET_CONNECTION_TIME = 3000 -export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/" \ No newline at end of file +export const UPBIT_UPDATED_COIN_INFO_TIME = 5000 +export const UPBIT_UPDATED_ORDER_INFO_TIME = 1000 \ No newline at end of file diff --git a/packages/server/src/upbit/coin-list.service.ts b/packages/server/src/upbit/coin-list.service.ts index bbefb944..eb523013 100644 --- a/packages/server/src/upbit/coin-list.service.ts +++ b/packages/server/src/upbit/coin-list.service.ts @@ -2,25 +2,51 @@ import { Injectable, OnModuleInit } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { firstValueFrom } from 'rxjs'; import { UPBIT_IMAGE_URL, UPBIT_RESTAPI_URL } from 'common/upbit'; -import { CoinTickerDto } from './dtos/coin-ticker.dto'; +import { UPBIT_UPDATED_COIN_INFO_TIME } from 'common/upbit'; @Injectable() export class CoinListService{ - private coinCodeList: string[]; + private coinRawList: any; + private coinCodeList: string[] = ["KRW-BTC"]; private coinNameList: Map; - constructor(private readonly httpService: HttpService) {} + private timeoutId: NodeJS.Timeout | null = null; + + constructor(private readonly httpService: HttpService) { + this.getCoinListFromUpbit(); + } async getCoinListFromUpbit() { - const response = await firstValueFrom( - this.httpService.get(UPBIT_RESTAPI_URL), - ); - this.coinCodeList = response.data.map((coin) => coin.market); - this.coinNameList = new Map( - response.data.map((coin) => [coin.market, coin.korean_name]), - ); + try{ + const response = await firstValueFrom( + this.httpService.get(UPBIT_RESTAPI_URL), + ); + this.coinRawList = response.data; + this.coinCodeList = response.data.map((coin) => coin.market); + this.coinNameList = new Map( + response.data.map((coin) => [coin.market, coin.korean_name]), + ); + + }catch(error){ + }finally{ + console.log(`코인 정보 최신화: ${Date()}`); + if (this.timeoutId) clearTimeout(this.timeoutId); + this.timeoutId = setTimeout(()=>this.getCoinListFromUpbit(),UPBIT_UPDATED_COIN_INFO_TIME) + } + } + getCoinNameList(){ + return this.coinCodeList; } getAllCoinList(){ - return this.coinCodeList + return this.coinRawList; + } + getKRWCoinList(){ + return this.coinRawList.filter((coin) => coin.market.startsWith("KRW")) + } + getBTCCoinList(){ + return this.coinRawList.filter((coin)=>coin.market.startsWith("BTC")) + } + getUSDTCoinList(){ + return this.coinRawList.filter((coin) => coin.market.startsWith("USDT")) } tempCoinAddNameAndUrl(message) { message.name = this.coinNameList.get(message.code); diff --git a/packages/server/src/upbit/coin-ticker-websocket.service.spec.ts b/packages/server/src/upbit/coin-ticker-websocket.service.spec.ts index 51a18bed..9022c70a 100644 --- a/packages/server/src/upbit/coin-ticker-websocket.service.spec.ts +++ b/packages/server/src/upbit/coin-ticker-websocket.service.spec.ts @@ -1,15 +1,15 @@ import { Test, TestingModule } from '@nestjs/testing'; -import { UpbitService } from './upbit.service'; +import { CoinTickerService } from './coin-ticker-websocket.service'; -describe('UpbitService', () => { - let service: UpbitService; +describe('CoinTickerService', () => { + let service: CoinTickerService; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ - providers: [UpbitService], + providers: [CoinTickerService], }).compile(); - service = module.get(UpbitService); + service = module.get(CoinTickerService); }); it('should be defined', () => { diff --git a/packages/server/src/upbit/coin-ticker-websocket.service.ts b/packages/server/src/upbit/coin-ticker-websocket.service.ts index 0c524d33..ba55e50a 100644 --- a/packages/server/src/upbit/coin-ticker-websocket.service.ts +++ b/packages/server/src/upbit/coin-ticker-websocket.service.ts @@ -2,59 +2,78 @@ import { Injectable, OnModuleInit } from '@nestjs/common'; import * as WebSocket from 'ws'; import { SseService } from './sse.service'; import { CoinListService } from './coin-list.service'; -import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL } from 'common/upbit'; +import { + UPBIT_UPDATED_COIN_INFO_TIME, + UPBIT_WEBSOCKET_CONNECTION_TIME, + UPBIT_WEBSOCKET_URL, +} from 'common/upbit'; import { CoinTickerDto } from './dtos/coin-ticker.dto'; @Injectable() -export class CoinTickerService implements OnModuleInit{ +export class CoinTickerService implements OnModuleInit { private websocket: WebSocket; + private sending: Boolean = false; + private timeoutId: NodeJS.Timeout | null = null; constructor( private readonly coinListService: CoinListService, - private readonly sseService: SseService - ) {}; + private readonly sseService: SseService, + ) {} onModuleInit() { this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL); - this.connectWebSocket() + this.connectWebSocket(); } connectWebSocket() { - this.websocket.on('open', async () => { - await this.coinListService.getCoinListFromUpbit(); - const coin_list = this.coinListService.getAllCoinList(); - console.log('WebSocket 연결 성공'); - const subscribeMessage = JSON.stringify([ - { ticket: 'test' }, - { type: 'ticker', codes: coin_list }, - ]); - this.websocket.send(subscribeMessage); + this.websocket.on('open', () => { + try { + console.log('CoinTickerWebSocket 연결이 열렸습니다.'); + this.sendWebSocket(); + } catch (error) { + console.error('sendWebSocket 실행 중 오류 발생:', error); + } }); this.websocket.on('message', (data) => { - const message = JSON.parse(data.toString()); - const temp = this.coinListService.tempCoinAddNameAndUrl(message); - this.sseService.sendEvent(temp); - //현재는 전부 보냅니다. - // const coinTick: CoinTickerDto = dtoMethod(message); - // this.sseService.sendEvent(coinTick); + try{ + const message = JSON.parse(data.toString()); + this.sseService.coinTickerData(message); + }catch(error){ + console.error('CoinTickerWebSocket 오류:', error); + } }); this.websocket.on('close', () => { - console.log('WebSocket 연결이 닫혔습니다. 재연결 시도 중...'); - setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME); + try { + console.log('CoinTickerWebSocket 연결이 닫혔습니다. 재연결 시도 중...'); + setTimeout( + () => this.connectWebSocket(), + UPBIT_WEBSOCKET_CONNECTION_TIME + ); + } catch (error) { + console.error('WebSocket 재연결 설정 중 오류 발생:', error); + } }); this.websocket.on('error', (error) => { - console.error('WebSocket 오류:', error); + console.error('CoinTickerWebSocket 오류:', error); }); } - async sendWebSocket(){ - await this.coinListService.getCoinListFromUpbit(); - const coin_list = this.coinListService.getAllCoinList(); - console.log('WebSocket 연결 성공'); - const subscribeMessage = JSON.stringify([ - { ticket: 'test' }, - { type: 'ticker', codes: coin_list }, - ]); - this.websocket.send(subscribeMessage); + async sendWebSocket() { + if (this.sending) return; + this.sending = true; + try{ + const coin_list = this.coinListService.getCoinNameList(); + const subscribeMessage = JSON.stringify([ + { ticket: 'test' }, + { type: 'ticker', codes: coin_list }, + ]); + this.websocket.send(subscribeMessage); + }catch(error){ + console.error('CoinTickerWebSocket 오류:', error); + }finally{ + this.sending = false; + if (this.timeoutId) clearTimeout(this.timeoutId); + this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_COIN_INFO_TIME); + } } } diff --git a/packages/server/src/upbit/orderbook-websocket.service.spec.ts b/packages/server/src/upbit/orderbook-websocket.service.spec.ts index 3a74b044..3c7cef46 100644 --- a/packages/server/src/upbit/orderbook-websocket.service.spec.ts +++ b/packages/server/src/upbit/orderbook-websocket.service.spec.ts @@ -1,15 +1,15 @@ import { Test, TestingModule } from '@nestjs/testing'; -import { UpbitService } from './upbit.service'; +import { OrderbookService } from './orderbook-websocket.service'; describe('OrderbookService', () => { - let service: UpbitService; + let service: OrderbookService; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ - providers: [UpbitService], + providers: [OrderbookService], }).compile(); - service = module.get(UpbitService); + service = module.get(OrderbookService); }); it('should be defined', () => { diff --git a/packages/server/src/upbit/orderbook-websocket.service.ts b/packages/server/src/upbit/orderbook-websocket.service.ts index cb7c2d90..3ec17f08 100644 --- a/packages/server/src/upbit/orderbook-websocket.service.ts +++ b/packages/server/src/upbit/orderbook-websocket.service.ts @@ -1,13 +1,15 @@ import { Injectable, OnModuleInit } from '@nestjs/common'; import * as WebSocket from 'ws'; import { SseService } from './sse.service'; -import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL } from 'common/upbit'; -import { CoinTickerDto } from './dtos/coin-ticker.dto'; import { CoinListService } from './coin-list.service'; +import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL, UPBIT_UPDATED_COIN_INFO_TIME, UPBIT_UPDATED_ORDER_INFO_TIME } from 'common/upbit'; +import { CoinTickerDto } from './dtos/coin-ticker.dto'; @Injectable() export class OrderbookService implements OnModuleInit{ private websocket: WebSocket; + private sending: Boolean = false; + private timeoutId: NodeJS.Timeout | null = null; constructor( private readonly coinListService: CoinListService, @@ -20,35 +22,51 @@ export class OrderbookService implements OnModuleInit{ } connectWebSocket() { - this.websocket.on('open', async () => { - await this.coinListService.getCoinListFromUpbit(); - const coin_list = this.coinListService.getAllCoinList(); - console.log('WebSocket 연결 성공'); - const subscribeMessage = JSON.stringify([ - { ticket: 'test' }, - { type: 'ticker', codes: coin_list }, - ]); - this.websocket.send(subscribeMessage); + this.websocket.on('open', () => { + try { + console.log('OrderbookWebSocket 연결이 열렸습니다.'); + this.sendWebSocket(); + } catch (error) { + console.error('sendWebSocket 실행 중 오류 발생:', error); + } + }); + this.websocket.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this.sseService.orderbookData(message); + } catch (error) { + console.error('OrderbookWebSocket 메시지 처리 중 오류 발생:', error); + } }); - this.websocket.on('close', () => { - console.log('WebSocket 연결이 닫혔습니다. 재연결 시도 중...'); - setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME); + try { + console.log('OrderbookWebSocket 연결이 닫혔습니다. 재연결 시도 중...'); + setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME); + } catch (error) { + console.error('OrderbookWebSocket 재연결 설정 중 오류 발생:', error); + } }); this.websocket.on('error', (error) => { - console.error('WebSocket 오류:', error); + console.error('OrderbookWebSocket 오류:', error); }); } - sendWebSocket(dtoMethod: Function, coins: string[]){ - let message; - this.websocket.on('message', (data) => { - message = JSON.parse(data.toString()); - const temp = coins.includes(message.code) ? this.coinListService.tempCoinAddNameAndUrl(message) : null; - this.sseService.sendEvent(temp); - //현재는 전부 보냅니다. - // const coinTick: CoinTickerDto = dtoMethod(message); - // this.sseService.sendEvent(coinTick); - }); + async sendWebSocket() { + if (this.sending) return; + this.sending = true; + try{ + const coin_list = this.coinListService.getCoinNameList(); + const subscribeMessage = JSON.stringify([ + { ticket: 'test' }, + { type: 'ticker', codes: coin_list }, + ]); + this.websocket.send(subscribeMessage); + }catch(error){ + console.error('OrderbookWebSocket 오류:', error); + }finally{ + this.sending = false; + if (this.timeoutId) clearTimeout(this.timeoutId); + this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_ORDER_INFO_TIME); + } } } diff --git a/packages/server/src/upbit/sse.service.ts b/packages/server/src/upbit/sse.service.ts index 661f2a80..906351bb 100644 --- a/packages/server/src/upbit/sse.service.ts +++ b/packages/server/src/upbit/sse.service.ts @@ -4,16 +4,22 @@ import { map, takeUntil, filter } from 'rxjs/operators'; @Injectable() export class SseService implements OnModuleDestroy{ - private eventStream$ = new Subject(); - private destroy$ = new Subject(); - - sendEvent(data: any) { - this.eventStream$.next(data); + private coinTickerStream$ = new Subject(); + private orderbookStream$ = new Subject(); + private coinTickerdestroy$ = new Subject(); + private orderbookdestroy$ = new Subject(); + private coinTicker = 0; + coinTickerData(data: any) { + this.coinTicker++; + this.coinTickerStream$.next(data); + } + orderbookData(data:any){ + this.orderbookStream$.next(data); } getPriceUpdatesStream(coins, dto:Function): Observable { - return this.eventStream$.asObservable().pipe( - takeUntil(this.destroy$), + return this.coinTickerStream$.asObservable().pipe( + takeUntil(this.coinTickerdestroy$), filter((data)=>coins.includes(data.code)), map((data) => { //const setDto = dto(data); @@ -25,7 +31,21 @@ export class SseService implements OnModuleDestroy{ ); } + getOrderbookUpdatesStream(coins, dto:Function): Observable { + return this.orderbookStream$.asObservable().pipe( + takeUntil(this.orderbookdestroy$), + filter((data)=> coins.includes(data.code)), + map((data) => { + //const setDto = dto(data); + return new MessageEvent('orderbook-update', { + //data: JSON.stringify(setDto), + data: JSON.stringify(data), + }) as MessageEvent; + }), + ); + } onModuleDestroy() { - this.destroy$.next(); + this.coinTickerdestroy$.next(); + this.orderbookdestroy$.next(); } } diff --git a/packages/server/src/upbit/upbit.controller.ts b/packages/server/src/upbit/upbit.controller.ts index 48c4fc18..890d0279 100644 --- a/packages/server/src/upbit/upbit.controller.ts +++ b/packages/server/src/upbit/upbit.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Sse, Query } from '@nestjs/common'; +import { Controller, Sse, Query, Get } from '@nestjs/common'; import { Observable, Subject } from 'rxjs'; import { SseService } from './sse.service'; import { CoinTickerService } from './coin-ticker-websocket.service'; @@ -15,14 +15,32 @@ export class UpbitController { @Sse('price-updates') priceUpdates(@Query('coins') coins:string[]): Observable { - this.coinTickerService.sendWebSocket(); - return this.sseService.getPriceUpdatesStream(coins,this.coinListService.convertToTickerDTO); + return this.sseService.getPriceUpdatesStream(coins,this.coinListService.tempCoinAddNameAndUrl); + } + @Sse('orderbook') + orderbookUpdates(@Query('coins') coins:string[]): Observable { + return this.sseService.getOrderbookUpdatesStream(coins,this.coinListService.convertToTickerDTO); } - // 상세페이지용 // @Sse('price-updates-detail') // priceUpdatesDetail(@Query('coins') coins:string[]): Observable { // this.upbitService.connectWebSocket(coins); // return this.sseService.getPriceUpdatesStream(); // } + @Get('market/all') + getAllMarkets() { + return this.coinListService.getAllCoinList(); + } + @Get('market/krw') + getKRWMarkets() { + return this.coinListService.getKRWCoinList(); + } + @Get('market/btc') + getBTCMarkets() { + return this.coinListService.getBTCCoinList(); + } + @Get('market/usdt') + getUSDTMarkets() { + return this.coinListService.getUSDTCoinList(); + } } diff --git a/packages/server/src/upbit/upbit.module.ts b/packages/server/src/upbit/upbit.module.ts index 0eff14bf..61780795 100644 --- a/packages/server/src/upbit/upbit.module.ts +++ b/packages/server/src/upbit/upbit.module.ts @@ -4,10 +4,10 @@ import { UpbitController } from './upbit.controller'; import { CoinListService } from './coin-list.service' import { HttpModule } from '@nestjs/axios'; import { SseService } from './sse.service'; - +import { OrderbookService } from './orderbook-websocket.service'; @Module({ imports: [HttpModule], - providers: [CoinTickerService, CoinListService, SseService], + providers: [CoinTickerService, CoinListService, SseService,OrderbookService], controllers: [UpbitController] }) export class UpbitModule {} diff --git a/packages/server/src/upbit/upbit.service.spec.ts b/packages/server/src/upbit/upbit.service.spec.ts deleted file mode 100644 index 51a18bed..00000000 --- a/packages/server/src/upbit/upbit.service.spec.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { UpbitService } from './upbit.service'; - -describe('UpbitService', () => { - let service: UpbitService; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - providers: [UpbitService], - }).compile(); - - service = module.get(UpbitService); - }); - - it('should be defined', () => { - expect(service).toBeDefined(); - }); -}); diff --git a/packages/server/src/upbit/upbit.service.ts b/packages/server/src/upbit/upbit.service.ts deleted file mode 100644 index ad682940..00000000 --- a/packages/server/src/upbit/upbit.service.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { Injectable, OnModuleInit } from '@nestjs/common'; -import * as WebSocket from 'ws'; -import { SseService } from './sse.service'; -import { CoinListService } from './coin-list.service'; -import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL } from 'common/upbit'; -import { CoinTickerDto } from './dtos/coin-ticker.dto'; - -@Injectable() -export class UpbitService implements OnModuleInit{ - private websocket: WebSocket; - - constructor( - private readonly coinListService: CoinListService, - private readonly sseService: SseService - ) {}; - - onModuleInit() { - this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL); - this.connectWebSocket() - } - - connectWebSocket() { - this.websocket.on('open', async () => { - await this.coinListService.getCoinListFromUpbit(); - const coin_list = this.coinListService.getAllCoinList(); - console.log('WebSocket 연결 성공'); - const subscribeMessage = JSON.stringify([ - { ticket: 'test' }, - { type: 'ticker', codes: coin_list }, - ]); - this.websocket.send(subscribeMessage); - }); - this.websocket.on('message', (data) => { - const message = JSON.parse(data.toString()); - const temp = this.coinListService.tempCoinAddNameAndUrl(message); - this.sseService.sendEvent(temp); - //현재는 전부 보냅니다. - // const coinTick: CoinTickerDto = dtoMethod(message); - // this.sseService.sendEvent(coinTick); - }); - this.websocket.on('close', () => { - console.log('WebSocket 연결이 닫혔습니다. 재연결 시도 중...'); - setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME); - }); - - this.websocket.on('error', (error) => { - console.error('WebSocket 오류:', error); - }); - } - async sendWebSocket(){ - await this.coinListService.getCoinListFromUpbit(); - const coin_list = this.coinListService.getAllCoinList(); - console.log('WebSocket 연결 성공'); - const subscribeMessage = JSON.stringify([ - { ticket: 'test' }, - { type: 'ticker', codes: coin_list }, - ]); - this.websocket.send(subscribeMessage); - } -}