Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BE] feature #52, #55 - 호가창 websocket 연결, 코인 종목 api #56

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/CICD.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ on:
- main
- dev
- dev-be
- feature-be-#52

jobs:
build_and_deploy:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ services:
context: .
dockerfile: dockerfile-server
image: seunggwan/corinee-server
restart: always
env_file:
- .env
ports:
Expand Down
5 changes: 4 additions & 1 deletion packages/server/common/upbit.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export const UPBIT_WEBSOCKET_URL = 'wss://api.upbit.com/websocket/v1';
export const UPBIT_RESTAPI_URL = 'https://api.upbit.com/v1/market/all?is_details=true'
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"

export const UPBIT_WEBSOCKET_CONNECTION_TIME = 3000
export const UPBIT_IMAGE_URL = "https://static.upbit.com/logos/"
export const UPBIT_UPDATED_COIN_INFO_TIME = 5000
export const UPBIT_UPDATED_ORDER_INFO_TIME = 1000
70 changes: 49 additions & 21 deletions packages/server/src/upbit/coin-list.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,70 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
import { UPBIT_IMAGE_URL, UPBIT_RESTAPI_URL } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';
import { UPBIT_UPDATED_COIN_INFO_TIME } from 'common/upbit';

@Injectable()
export class CoinListService{
private coinCodeList: string[];
export class CoinListService implements OnModuleInit {
private coinRawList: any;
private coinCodeList: string[] = ["KRW-BTC"];
private coinNameList: Map<string, string>;
private timeoutId: NodeJS.Timeout | null = null;

onModuleInit() {
this.getCoinListFromUpbit();
}

constructor(private readonly httpService: HttpService) {}

async getCoinListFromUpbit() {
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);
try{
const response = await firstValueFrom(
this.httpService.get(UPBIT_RESTAPI_URL),
);
this.coinCodeList = response.data.map((coin) => coin.market);
this.coinNameList = new Map(
response.data.map((coin) => [coin.market, coin.korean_name]),
);
this.coinRawList = response.data
}catch(error){
console.error('getCoinListFromUpbit error:', error);
}finally{
console.log(`코인 정보 최신화: ${Date()}`);
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(()=>this.getCoinListFromUpbit(),UPBIT_UPDATED_COIN_INFO_TIME)
}
}
getCoinNameList(){
return this.coinCodeList;
}
getAllCoinList(){
return this.coinCodeList
return this.coinRawList.map((coin) => this.coinAddNameAndUrl(coin));
}
getKRWCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("KRW"))
}
getBTCCoinList(){
return this.coinRawList.filter((coin)=>coin.market.startsWith("BTC"))
}
getUSDTCoinList(){
return this.coinRawList.filter((coin) => coin.market.startsWith("USDT"))
}
tempCoinAddNameAndUrl(message) {
coinAddNameAndUrl = (message) => {
message.name = this.coinNameList.get(message.code);
message.coin_img_url = this.getCoinImageURL(message.code);

return message;
}
convertToTickerDTO = (message: string) => {
const data = JSON.parse(message);
convertToTickerDTO = (message) => {
const data = message;
return {
name: this.coinNameList.get(data.code),
code: data.code,
coin_img_url: this.getCoinImageURL(data.code),
signed_change_price: data.signed_change_price,
opening_price: data.opening_price,
signed_change_rate: data.signed_change_rate,
trade_price: data.trade_price,
name: this.coinNameList.get(data.code),
code: data.code,
coin_img_url: this.getCoinImageURL(data.code),
signed_change_price: data.signed_change_price,
opening_price: data.opening_price,
signed_change_rate: data.signed_change_rate,
trade_price: data.trade_price,
}
}
private getCoinImageURL(code: string) {
Expand Down
18 changes: 18 additions & 0 deletions packages/server/src/upbit/coin-ticker-websocket.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { CoinTickerService } from './coin-ticker-websocket.service';

describe('CoinTickerService', () => {
let service: CoinTickerService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [CoinTickerService],
}).compile();

service = module.get<CoinTickerService>(CoinTickerService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
80 changes: 80 additions & 0 deletions packages/server/src/upbit/coin-ticker-websocket.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { CoinListService } from './coin-list.service';
import {
UPBIT_UPDATED_COIN_INFO_TIME,
UPBIT_WEBSOCKET_CONNECTION_TIME,
UPBIT_WEBSOCKET_URL,
} from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class CoinTickerService implements OnModuleInit {
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;
private coinLatestInfo = new Map();
constructor(
private readonly coinListService: CoinListService,
private readonly sseService: SseService,
) {}

onModuleInit() {
this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
this.connectWebSocket();
}

connectWebSocket() {
this.websocket.on('open', () => {
try {
console.log('CoinTickerWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
try{
const message = JSON.parse(data.toString());
this.sseService.coinTickerData(message);
this.sseService.setCoinLastestInfo(message);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}
});
this.websocket.on('close', () => {
try {
console.log('CoinTickerWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(
() => this.connectWebSocket(),
UPBIT_WEBSOCKET_CONNECTION_TIME
);
} catch (error) {
console.error('WebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('CoinTickerWebSocket 오류:', error);
});
}
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('CoinTickerWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_COIN_INFO_TIME);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Test, TestingModule } from '@nestjs/testing';
import { UpbitService } from './upbit.service';
import { OrderbookService } from './orderbook-websocket.service';

describe('UpbitService', () => {
let service: UpbitService;
describe('OrderbookService', () => {
let service: OrderbookService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [UpbitService],
providers: [OrderbookService],
}).compile();

service = module.get<UpbitService>(UpbitService);
service = module.get<OrderbookService>(OrderbookService);
});

it('should be defined', () => {
Expand Down
72 changes: 72 additions & 0 deletions packages/server/src/upbit/orderbook-websocket.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as WebSocket from 'ws';
import { SseService } from './sse.service';
import { CoinListService } from './coin-list.service';
import { UPBIT_WEBSOCKET_CONNECTION_TIME, UPBIT_WEBSOCKET_URL, UPBIT_UPDATED_COIN_INFO_TIME, UPBIT_UPDATED_ORDER_INFO_TIME } from 'common/upbit';
import { CoinTickerDto } from './dtos/coin-ticker.dto';

@Injectable()
export class OrderbookService implements OnModuleInit{
private websocket: WebSocket;
private sending: Boolean = false;
private timeoutId: NodeJS.Timeout | null = null;

constructor(
private readonly coinListService: CoinListService,
private readonly sseService: SseService
) {};

onModuleInit() {
this.websocket = new WebSocket(UPBIT_WEBSOCKET_URL);
this.connectWebSocket()
}

connectWebSocket() {
this.websocket.on('open', () => {
try {
console.log('OrderbookWebSocket 연결이 열렸습니다.');
this.sendWebSocket();
} catch (error) {
console.error('sendWebSocket 실행 중 오류 발생:', error);
}
});
this.websocket.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.sseService.orderbookData(message);
} catch (error) {
console.error('OrderbookWebSocket 메시지 처리 중 오류 발생:', error);
}
});
this.websocket.on('close', () => {
try {
console.log('OrderbookWebSocket 연결이 닫혔습니다. 재연결 시도 중...');
setTimeout(() => this.connectWebSocket(), UPBIT_WEBSOCKET_CONNECTION_TIME);
} catch (error) {
console.error('OrderbookWebSocket 재연결 설정 중 오류 발생:', error);
}
});

this.websocket.on('error', (error) => {
console.error('OrderbookWebSocket 오류:', error);
});
}
async sendWebSocket() {
if (this.sending) return;
this.sending = true;
try{
const coin_list = this.coinListService.getCoinNameList();
const subscribeMessage = JSON.stringify([
{ ticket: 'test' },
{ type: 'ticker', codes: coin_list },
]);
this.websocket.send(subscribeMessage);
}catch(error){
console.error('OrderbookWebSocket 오류:', error);
}finally{
this.sending = false;
if (this.timeoutId) clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(() => this.sendWebSocket(), UPBIT_UPDATED_ORDER_INFO_TIME);
}
}
}
61 changes: 52 additions & 9 deletions packages/server/src/upbit/sse.service.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Subject, Observable } from 'rxjs';
import { map, takeUntil, filter } from 'rxjs/operators';
import { CoinTickerService } from './coin-ticker-websocket.service';

@Injectable()
export class SseService implements OnModuleDestroy{
private eventStream$ = new Subject<any>();
private destroy$ = new Subject<void>();

sendEvent(data: any) {
this.eventStream$.next(data);
private coinTickerStream$ = new Subject<any>();
private orderbookStream$ = new Subject<any>();
private coinTickerdestroy$ = new Subject<void>();
private orderbookdestroy$ = new Subject<void>();
private coinLatestInfo = new Map();
constructor(
){}
coinTickerData(data: any) {
this.coinTickerStream$.next(data);
}

orderbookData(data:any){
this.orderbookStream$.next(data);
}
setCoinLastestInfo(coin){
this.coinLatestInfo.set(coin.code, coin);
}
initPriceStream(coins, dto: Function) {
const events: MessageEvent[] = [];
if (coins && typeof coins === 'string') {
coins = [coins];
}
coins.forEach(async (coin) => {
while (this.coinLatestInfo.get(coin) === undefined) {
await new Promise(resolve => setTimeout(resolve, 100));
}
const initData = this.coinLatestInfo.get(coin);
const setDto = dto(initData);
const msgEvent = new MessageEvent('price-update', {
data: JSON.stringify(setDto),
}) as MessageEvent;

events.push(msgEvent);
});

return events;
}
getPriceUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.eventStream$.asObservable().pipe(
takeUntil(this.destroy$),
return this.coinTickerStream$.asObservable().pipe(
takeUntil(this.coinTickerdestroy$),
filter((data)=>coins.includes(data.code)),
map((data) => {
const setDto = dto(data);
Expand All @@ -24,7 +54,20 @@ export class SseService implements OnModuleDestroy{
);
}

getOrderbookUpdatesStream(coins, dto:Function): Observable<MessageEvent> {
return this.orderbookStream$.asObservable().pipe(
takeUntil(this.orderbookdestroy$),
filter((data)=> coins.includes(data.code)),
map((data) => {
const setDto = dto(data);
return new MessageEvent('orderbook-update', {
data: JSON.stringify(setDto),
}) as MessageEvent;
}),
);
}
onModuleDestroy() {
this.destroy$.next();
this.coinTickerdestroy$.next();
this.orderbookdestroy$.next();
}
}
Loading
Loading