Skip to content
This repository has been archived by the owner on Mar 14, 2023. It is now read-only.

Commit

Permalink
single ws connection per client and max 5 concurrent compress jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
clouless committed Dec 30, 2018
1 parent af4355c commit be9e96b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 47 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kartoffelstampf-client",
"version": "2.1.2",
"version": "2.3.0",
"license": "MIT",
"scripts": {
"ng": "ng",
Expand Down
52 changes: 35 additions & 17 deletions src/app/services/backend.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from '../types/kartoffelstampf-server';
import { Observable, Subject, throwError } from 'rxjs';
import { HttpClient, HttpHeaders, HttpErrorResponse } from '@angular/common/http';
import { catchError } from 'rxjs/operators';
import { catchError, filter, takeWhile } from 'rxjs/operators';

const httpOptions = {
headers: new HttpHeaders({
Expand All @@ -22,8 +22,14 @@ export class BackendService {
private restApiBaseUrl: string;
private webSocketBaseUrl: string;

private ws: WebSocket;
private subject = new Subject<KartoffelstampfTerminalOutputEntry>();

constructor(private http: HttpClient) {
const self = this;
//
// Autodetect URLs
//
const hostname = window.location.hostname;
const protocol = window.location.protocol;
const port = window.location.port;
Expand All @@ -41,6 +47,22 @@ export class BackendService {
this.restApiBaseUrl = `http://localhost:9999`;
this.webSocketBaseUrl = `ws://localhost:9999`;
}
//
// Connect
//
self.ws = new WebSocket(`${this.webSocketBaseUrl}/`);
self.ws.onclose = function(event: CloseEvent) {
console.log('websocket onclose', event);
self.subject.complete();
};
self.ws.onmessage = function(event: MessageEvent) {
const kartoffelstampfTerminalOutputEntry: KartoffelstampfTerminalOutputEntry = JSON.parse(event.data);
self.subject.next(kartoffelstampfTerminalOutputEntry);
};
self.ws.onerror = function(event: ErrorEvent) {
console.log('websocket onerror', event);
self.subject.complete();
};
}

getDownloadUrl(temporaryFileName: string, originalFileName: string) {
Expand All @@ -55,22 +77,18 @@ export class BackendService {
}

runCompressCommand(compressInstruction: KartoffelstampfCompressInstruction): Observable<KartoffelstampfTerminalOutputEntry> {
const ws = new WebSocket(`${this.webSocketBaseUrl}/`);
const subject = new Subject<KartoffelstampfTerminalOutputEntry>();
ws.onopen = function (event) {
ws.send(JSON.stringify(compressInstruction));
};
ws.onmessage = function(event: MessageEvent) {
const kartoffelstampfTerminalOutputEntry: KartoffelstampfTerminalOutputEntry = JSON.parse(event.data);
subject.next(kartoffelstampfTerminalOutputEntry);
};
ws.onerror = function (event) {
console.log('websocket onerror', event);
};
ws.onclose = function (event) {
subject.complete();
};
return subject.asObservable();
this.ws.send(JSON.stringify(compressInstruction));
// Use single websocket connection and distinguish messages by compressInstruction
// The last message sent by the server per compressJob should be type=DONE. This is where we unsubscribe.
return this.subject
.asObservable()
.pipe(
filter(e =>
e.compressInstruction.compressType === compressInstruction.compressType &&
e.compressInstruction.temporaryFileName === compressInstruction.temporaryFileName
),
takeWhile(data => data.type !== 'DONE'),
);
}

}
7 changes: 2 additions & 5 deletions src/app/types/kartoffelstampf-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
// Typings: https://github.com/codeclou/karteoffelstampf-server
//

export interface KartoffelstampfTerminalOutputEntryPayload {
text: string;
}

export interface KartoffelstampfTerminalOutputEntry {
payload: KartoffelstampfTerminalOutputEntryPayload;
payload: any;
type: string;
compressInstruction: KartoffelstampfCompressInstruction;
}

export interface KartoffelstampfImageUploadRequest {
Expand Down
55 changes: 31 additions & 24 deletions src/app/upload-page/upload-page.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Component, OnInit, OnDestroy } from '@angular/core';
import { BackendService } from '../services/backend.service';
import { KartoffelstampfTerminalOutputEntry, KartoffelstampfCompressInstruction } from '../types/kartoffelstampf-server';
import { TerminalLine, CompressImageJobItem } from '../types/kartoffelstampf-client';
import { finalize, takeUntil } from 'rxjs/operators';
import { finalize, takeUntil, takeWhile, endWith } from 'rxjs/operators';
import { Subject, throwError, of, EMPTY } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { HttpErrorResponse } from '@angular/common/http';
Expand Down Expand Up @@ -57,6 +57,8 @@ export class UploadPageComponent implements OnInit, OnDestroy {
uiStateDragLeave = true;

activeStep = 1;
concurrentJobLimit = 5;
concurrentJobCount = 0;

constructor(private backendService: BackendService) { }

Expand Down Expand Up @@ -146,30 +148,35 @@ export class UploadPageComponent implements OnInit, OnDestroy {

runCompressCommand(job: CompressImageJobItem) {
const self = this;
self.backendService.runCompressCommand(<KartoffelstampfCompressInstruction>{
compressType: KartoffelstampfCompressInstruction.COMPRESS_TYPE_LOSSLESS,
temporaryFileName: job.temporaryFileName,
})
.pipe(
finalize(() => {
job.compressDone = true;
}),
takeUntil(self.preDestroy)
)
.subscribe(data => {
if (data.type === 'compressResult') {
job.compressedSize = data.payload['compressedSize'];
} else {
const terminalLine = new TerminalLine(data);
const previousTerminalLine = job.terminalLines[job.terminalLines.length - 1];
if (previousTerminalLine !== undefined &&
previousTerminalLine.clearLine === true &&
terminalLine.clearLine === true) {
job.terminalLines.pop();
}
job.terminalLines.push(terminalLine);
const intervallId = setInterval(function() {
if (self.concurrentJobCount < self.concurrentJobLimit) {
clearInterval(intervallId);
self.concurrentJobCount = self.concurrentJobCount + 1;
self.backendService.runCompressCommand(<KartoffelstampfCompressInstruction>{
compressType: KartoffelstampfCompressInstruction.COMPRESS_TYPE_LOSSLESS,
temporaryFileName: job.temporaryFileName,
})
.pipe(
finalize(() => self.concurrentJobCount = self.concurrentJobCount - 1),
takeUntil(self.preDestroy),
)
.subscribe(data => {
if (data.type === 'compressResult') {
job.compressedSize = data.payload['compressedSize'];
job.compressDone = true;
} else {
const terminalLine = new TerminalLine(data);
const previousTerminalLine = job.terminalLines[job.terminalLines.length - 1];
if (previousTerminalLine !== undefined &&
previousTerminalLine.clearLine === true &&
terminalLine.clearLine === true) {
job.terminalLines.pop();
}
job.terminalLines.push(terminalLine);
}
});
}
});
}, 300);
}

}

0 comments on commit be9e96b

Please sign in to comment.