Skip to content

Commit

Permalink
Merge pull request #56 from boostcampwm-2024/feature-be-#52
Browse files Browse the repository at this point in the history
[BE] feature #52, #55 - 호가창 websocket 연결, 코인 종목 api
  • Loading branch information
SeongHyeon0409 authored Nov 12, 2024
2 parents e8c220f + 6da7d0d commit 2bf78c2
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 106 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/CICD.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ on:
- main
- dev
- dev-be
- feature-be-#52

jobs:
build_and_deploy:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ services:
context: .
dockerfile: dockerfile-server
image: seunggwan/corinee-server
restart: always
env_file:
- .env
ports:
Expand Down
5 changes: 4 additions & 1 deletion packages/server/common/upbit.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export const UPBIT_WEBSOCKET_URL = 'wss://api.upbit.com/websocket/v1';
export const UPBIT_RESTAPI_URL = 'https://api.upbit.com/v1/market/all?is_details=true'
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"

export const UPBIT_WEBSOCKET_CONNECTION_TIME = 3000
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"
export const UPBIT_UPDATED_COIN_INFO_TIME = 5000
export const UPBIT_UPDATED_ORDER_INFO_TIME = 1000
70 changes: 49 additions & 21 deletions packages/server/src/upbit/coin-list.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,70 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
import { UPBIT_IMAGE_URL, UPBIT_RESTAPI_URL } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';
import { UPBIT_UPDATED_COIN_INFO_TIME } from 'common/upbit';

@Injectable()
export class CoinListService{
private coinCodeList: string[];
export class CoinListService implements OnModuleInit {
private coinRawList: any;
private coinCodeList: string[] = ["KRW-BTC"];
private coinNameList: Map<string, string>;
private timeoutId: NodeJS.Timeout | null = null;

onModuleInit() {
this.getCoinListFromUpbit();
}

constructor(private readonly httpService: HttpService) {}

async getCoinListFromUpbit() {
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);
try{
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);
this.coinRawList = response.data
}catch(error){
console.error('getCoinListFromUpbit error:', error);
}finally{
console.log(`코인 정보 최신화: ${Date()}`);
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(()=>this.getCoinListFromUpbit(),UPBIT_UPDATED_COIN_INFO_TIME)
}
}
getCoinNameList(){
return this.coinCodeList;
}
getAllCoinList(){
return this.coinCodeList
return this.coinRawList.map((coin) => this.coinAddNameAndUrl(coin));
}
getKRWCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("KRW"))
}
getBTCCoinList(){
return this.coinRawList.filter((coin)=>coin.market.startsWith("BTC"))
}
getUSDTCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("USDT"))
}
tempCoinAddNameAndUrl(message) {
coinAddNameAndUrl = (message) => {
message.name = this.coinNameList.get(message.code);
message.coin_img_url = this.getCoinImageURL(message.code);

return message;
}
convertToTickerDTO = (message: string) => {
const data = JSON.parse(message);
convertToTickerDTO = (message) => {
const data = message;
return {
name: this.coinNameList.get(data.code),
code: data.code,
coin_img_url: this.getCoinImageURL(data.code),
signed_change_price: data.signed_change_price,
opening_price: data.opening_price,
signed_change_rate: data.signed_change_rate,
trade_price: data.trade_price,
name: this.coinNameList.get(data.code),
code: data.code,
coin_img_url: this.getCoinImageURL(data.code),
signed_change_price: data.signed_change_price,
opening_price: data.opening_price,
signed_change_rate: data.signed_change_rate,
trade_price: data.trade_price,
}
}
private getCoinImageURL(code: string) {
Expand Down
18 changes: 18 additions & 0 deletions packages/server/src/upbit/coin-ticker-websocket.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { CoinTickerService } from './coin-ticker-websocket.service';

describe('CoinTickerService', () => {
let service: CoinTickerService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [CoinTickerService],
}).compile();

service = module.get<CoinTickerService>(CoinTickerService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
80 changes: 80 additions & 0 deletions packages/server/src/upbit/coin-ticker-websocket.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { CoinListService } from './coin-list.service';
import {
UPBIT_UPDATED_COIN_INFO_TIME,
UPBIT_WEBSOCKET_CONNECTION_TIME,
UPBIT_WEBSOCKET_URL,
} from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class CoinTickerService implements OnModuleInit {
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;
private coinLatestInfo = new Map();
constructor(
private readonly coinListService: CoinListService,
private readonly sseService: SseService,
) {}

onModuleInit() {
this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
this.connectWebSocket();
}

connectWebSocket() {
this.websocket.on('open', () => {
try {
console.log('CoinTickerWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
try{
const message = JSON.parse(data.toString());
this.sseService.coinTickerData(message);
this.sseService.setCoinLastestInfo(message);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}
});
this.websocket.on('close', () => {
try {
console.log('CoinTickerWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(
() => this.connectWebSocket(),
UPBIT_WEBSOCKET_CONNECTION_TIME
);
} catch (error) {
console.error('WebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('CoinTickerWebSocket 오류:', error);
});
}
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_COIN_INFO_TIME);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UpbitService } from './upbit.service';
import { OrderbookService } from './orderbook-websocket.service';

describe('UpbitService', () => {
let service: UpbitService;
describe('OrderbookService', () => {
let service: OrderbookService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UpbitService],
providers: [OrderbookService],
}).compile();

service = module.get<UpbitService>(UpbitService);
service = module.get<OrderbookService>(OrderbookService);
});

it('should be defined', () => {
Expand Down
72 changes: 72 additions & 0 deletions packages/server/src/upbit/orderbook-websocket.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { CoinListService } from './coin-list.service';
import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL, UPBIT_UPDATED_COIN_INFO_TIME, UPBIT_UPDATED_ORDER_INFO_TIME } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class OrderbookService implements OnModuleInit{
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;

constructor(
private readonly coinListService: CoinListService,
private readonly sseService: SseService
) {};

onModuleInit() {
this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
this.connectWebSocket()
}

connectWebSocket() {
this.websocket.on('open', () => {
try {
console.log('OrderbookWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.sseService.orderbookData(message);
} catch (error) {
console.error('OrderbookWebSocket 메시지 처리 중 오류 발생:', error);
}
});
this.websocket.on('close', () => {
try {
console.log('OrderbookWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME);
} catch (error) {
console.error('OrderbookWebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('OrderbookWebSocket 오류:', error);
});
}
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('OrderbookWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_ORDER_INFO_TIME);
}
}
}
61 changes: 52 additions & 9 deletions packages/server/src/upbit/sse.service.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Subject, Observable } from 'rxjs';
import { map, takeUntil, filter } from 'rxjs/operators';
import { CoinTickerService } from './coin-ticker-websocket.service';

@Injectable()
export class SseService implements OnModuleDestroy{
private eventStream$ = new Subject<any>();
private destroy$ = new Subject<void>();

sendEvent(data: any) {
this.eventStream$.next(data);
private coinTickerStream$ = new Subject<any>();
private orderbookStream$ = new Subject<any>();
private coinTickerdestroy$ = new Subject<void>();
private orderbookdestroy$ = new Subject<void>();
private coinLatestInfo = new Map();
constructor(
){}
coinTickerData(data: any) {
this.coinTickerStream$.next(data);
}

orderbookData(data:any){
this.orderbookStream$.next(data);
}
setCoinLastestInfo(coin){
this.coinLatestInfo.set(coin.code, coin);
}
initPriceStream(coins, dto: Function) {
const events: MessageEvent[] = [];
if (coins && typeof coins === 'string') {
coins = [coins];
}
coins.forEach(async (coin) => {
while (this.coinLatestInfo.get(coin) === undefined) {
await new Promise(resolve => setTimeout(resolve, 100));
}
const initData = this.coinLatestInfo.get(coin);
const setDto = dto(initData);
const msgEvent = new MessageEvent('price-update', {
data: JSON.stringify(setDto),
}) as MessageEvent;

events.push(msgEvent);
});

return events;
}
getPriceUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.eventStream$.asObservable().pipe(
takeUntil(this.destroy$),
return this.coinTickerStream$.asObservable().pipe(
takeUntil(this.coinTickerdestroy$),
filter((data)=>coins.includes(data.code)),
map((data) => {
const setDto = dto(data);
Expand All @@ -24,7 +54,20 @@ export class SseService implements OnModuleDestroy{
);
}

getOrderbookUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.orderbookStream$.asObservable().pipe(
takeUntil(this.orderbookdestroy$),
filter((data)=> coins.includes(data.code)),
map((data) => {
const setDto = dto(data);
return new MessageEvent('orderbook-update', {
data: JSON.stringify(setDto),
}) as MessageEvent;
}),
);
}
onModuleDestroy() {
this.destroy$.next();
this.coinTickerdestroy$.next();
this.orderbookdestroy$.next();
}
}
Loading

0 comments on commit 2bf78c2

Please sign in to comment.