Skip to content

Commit

Permalink
feat: 코인 종목 api, 호가창 websocket 연결
Browse files Browse the repository at this point in the history
  • Loading branch information
SeongHyeon0409 committed Nov 12, 2024
1 parent ec14ae2 commit 5dfd209
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 170 deletions.
5 changes: 4 additions & 1 deletion packages/server/common/upbit.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export const UPBIT_WEBSOCKET_URL = 'wss://api.upbit.com/websocket/v1';
export const UPBIT_RESTAPI_URL = 'https://api.upbit.com/v1/market/all?is_details=true'
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"

export const UPBIT_WEBSOCKET_CONNECTION_TIME = 3000
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"
export const UPBIT_UPDATED_COIN_INFO_TIME = 5000
export const UPBIT_UPDATED_ORDER_INFO_TIME = 1000
48 changes: 37 additions & 11 deletions packages/server/src/upbit/coin-list.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,51 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
import { UPBIT_IMAGE_URL, UPBIT_RESTAPI_URL } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';
import { UPBIT_UPDATED_COIN_INFO_TIME } from 'common/upbit';

@Injectable()
export class CoinListService{
private coinCodeList: string[];
private coinRawList: any;
private coinCodeList: string[] = ["KRW-BTC"];
private coinNameList: Map<string, string>;
constructor(private readonly httpService: HttpService) {}
private timeoutId: NodeJS.Timeout | null = null;

constructor(private readonly httpService: HttpService) {
this.getCoinListFromUpbit();
}

async getCoinListFromUpbit() {
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);
try{
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinRawList = response.data;
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);

}catch(error){
}finally{
console.log(`코인 정보 최신화: ${Date()}`);
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(()=>this.getCoinListFromUpbit(),UPBIT_UPDATED_COIN_INFO_TIME)
}
}
getCoinNameList(){
return this.coinCodeList;
}
getAllCoinList(){
return this.coinCodeList
return this.coinRawList;
}
getKRWCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("KRW"))
}
getBTCCoinList(){
return this.coinRawList.filter((coin)=>coin.market.startsWith("BTC"))
}
getUSDTCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("USDT"))
}
tempCoinAddNameAndUrl(message) {
message.name = this.coinNameList.get(message.code);
Expand Down
10 changes: 5 additions & 5 deletions packages/server/src/upbit/coin-ticker-websocket.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UpbitService } from './upbit.service';
import { CoinTickerService } from './coin-ticker-websocket.service';

describe('UpbitService', () => {
let service: UpbitService;
describe('CoinTickerService', () => {
let service: CoinTickerService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UpbitService],
providers: [CoinTickerService],
}).compile();

service = module.get<UpbitService>(UpbitService);
service = module.get<CoinTickerService>(CoinTickerService);
});

it('should be defined', () => {
Expand Down
83 changes: 51 additions & 32 deletions packages/server/src/upbit/coin-ticker-websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,78 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { CoinListService } from './coin-list.service';
import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL } from 'common/upbit';
import {
UPBIT_UPDATED_COIN_INFO_TIME,
UPBIT_WEBSOCKET_CONNECTION_TIME,
UPBIT_WEBSOCKET_URL,
} from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class CoinTickerService implements OnModuleInit{
export class CoinTickerService implements OnModuleInit {
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;

constructor(
private readonly coinListService: CoinListService,
private readonly sseService: SseService
) {};
private readonly sseService: SseService,
) {}

onModuleInit() {
this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
this.connectWebSocket()
this.connectWebSocket();
}

connectWebSocket() {
this.websocket.on('open', async () => {
await this.coinListService.getCoinListFromUpbit();
const coin_list = this.coinListService.getAllCoinList();
console.log('WebSocket 연결 성공');
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
this.websocket.on('open', () => {
try {
console.log('CoinTickerWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
const message = JSON.parse(data.toString());
const temp = this.coinListService.tempCoinAddNameAndUrl(message);
this.sseService.sendEvent(temp);
//현재는 전부 보냅니다.
// const coinTick: CoinTickerDto = dtoMethod(message);
// this.sseService.sendEvent(coinTick);
try{
const message = JSON.parse(data.toString());
this.sseService.coinTickerData(message);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}
});
this.websocket.on('close', () => {
console.log('WebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME);
try {
console.log('CoinTickerWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(
() => this.connectWebSocket(),
UPBIT_WEBSOCKET_CONNECTION_TIME
);
} catch (error) {
console.error('WebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('WebSocket 오류:', error);
console.error('CoinTickerWebSocket 오류:', error);
});
}
async sendWebSocket(){
await this.coinListService.getCoinListFromUpbit();
const coin_list = this.coinListService.getAllCoinList();
console.log('WebSocket 연결 성공');
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_COIN_INFO_TIME);
}
}
}
8 changes: 4 additions & 4 deletions packages/server/src/upbit/orderbook-websocket.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UpbitService } from './upbit.service';
import { OrderbookService } from './orderbook-websocket.service';

describe('OrderbookService', () => {
let service: UpbitService;
let service: OrderbookService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UpbitService],
providers: [OrderbookService],
}).compile();

service = module.get<UpbitService>(UpbitService);
service = module.get<OrderbookService>(OrderbookService);
});

it('should be defined', () => {
Expand Down
68 changes: 43 additions & 25 deletions packages/server/src/upbit/orderbook-websocket.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';
import { CoinListService } from './coin-list.service';
import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL, UPBIT_UPDATED_COIN_INFO_TIME, UPBIT_UPDATED_ORDER_INFO_TIME } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class OrderbookService implements OnModuleInit{
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;

constructor(
private readonly coinListService: CoinListService,
Expand All @@ -20,35 +22,51 @@ export class OrderbookService implements OnModuleInit{
}

connectWebSocket() {
this.websocket.on('open', async () => {
await this.coinListService.getCoinListFromUpbit();
const coin_list = this.coinListService.getAllCoinList();
console.log('WebSocket 연결 성공');
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
this.websocket.on('open', () => {
try {
console.log('OrderbookWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.sseService.orderbookData(message);
} catch (error) {
console.error('OrderbookWebSocket 메시지 처리 중 오류 발생:', error);
}
});

this.websocket.on('close', () => {
console.log('WebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME);
try {
console.log('OrderbookWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME);
} catch (error) {
console.error('OrderbookWebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('WebSocket 오류:', error);
console.error('OrderbookWebSocket 오류:', error);
});
}
sendWebSocket(dtoMethod: Function, coins: string[]){
let message;
this.websocket.on('message', (data) => {
message = JSON.parse(data.toString());
const temp = coins.includes(message.code) ? this.coinListService.tempCoinAddNameAndUrl(message) : null;
this.sseService.sendEvent(temp);
//현재는 전부 보냅니다.
// const coinTick: CoinTickerDto = dtoMethod(message);
// this.sseService.sendEvent(coinTick);
});
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('OrderbookWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_ORDER_INFO_TIME);
}
}
}
36 changes: 28 additions & 8 deletions packages/server/src/upbit/sse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import { map, takeUntil, filter } from 'rxjs/operators';

@Injectable()
export class SseService implements OnModuleDestroy{
private eventStream$ = new Subject<any>();
private destroy$ = new Subject<void>();

sendEvent(data: any) {
this.eventStream$.next(data);
private coinTickerStream$ = new Subject<any>();
private orderbookStream$ = new Subject<any>();
private coinTickerdestroy$ = new Subject<void>();
private orderbookdestroy$ = new Subject<void>();
private coinTicker = 0;
coinTickerData(data: any) {
this.coinTicker++;
this.coinTickerStream$.next(data);
}
orderbookData(data:any){
this.orderbookStream$.next(data);
}

getPriceUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.eventStream$.asObservable().pipe(
takeUntil(this.destroy$),
return this.coinTickerStream$.asObservable().pipe(
takeUntil(this.coinTickerdestroy$),
filter((data)=>coins.includes(data.code)),
map((data) => {
//const setDto = dto(data);
Expand All @@ -25,7 +31,21 @@ export class SseService implements OnModuleDestroy{
);
}

getOrderbookUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.orderbookStream$.asObservable().pipe(
takeUntil(this.orderbookdestroy$),
filter((data)=> coins.includes(data.code)),
map((data) => {
//const setDto = dto(data);
return new MessageEvent('orderbook-update', {
//data: JSON.stringify(setDto),
data: JSON.stringify(data),
}) as MessageEvent;
}),
);
}
onModuleDestroy() {
this.destroy$.next();
this.coinTickerdestroy$.next();
this.orderbookdestroy$.next();
}
}
Loading

0 comments on commit 5dfd209

Please sign in to comment.