Skip to content

Commit

Permalink
Merge pull request #26 from xeptagondev/iverfiy-rdc-be
Browse files Browse the repository at this point in the history
Iverfiy rdc be
  • Loading branch information
palindaa authored Oct 18, 2024
2 parents 5eb676f + f9042cc commit b242517
Show file tree
Hide file tree
Showing 16 changed files with 1,402 additions and 92 deletions.
2 changes: 1 addition & 1 deletion apps/publisher/src/api-publisher/api-publisher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { catchError, concatMap, filter, map, reduce, scan, shareReplay, switchMa
import { SharedService } from "../shared/shared.service";
import { ApiPublisherHelper } from "./helper";

@Injectable({ scope: Scope.REQUEST })
@Injectable()
export class ApiPublisherService{
private report$: Observable<any> = this.shared.report$;
private meedanReport$: Observable<any> = this.shared.meedanReport$;
Expand Down
16 changes: 14 additions & 2 deletions apps/publisher/src/app/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class PublishReportDto {
@Controller()
export class AppController {
private readonly logger = new Logger('PublisherAppService');

constructor(
private readonly appService: AppService,
) {}
Expand Down Expand Up @@ -67,9 +67,21 @@ export class AppController {
this.logger.error(err);
throw new HttpException(err.message, 500);
})
);
);
}catch(e){
return new HttpException(e.message, 500)
}
}

// only for test
// @Get('subscribers')
// @ApiTags('subscribers')
// async subscribers(){
// try{
// return this.appService.notifySubscribers();
// }catch(e){
// return new HttpException(e.message, 500)
// }
// }

}
12 changes: 5 additions & 7 deletions apps/publisher/src/app/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { HttpModule, Module } from '@nestjs/common';


import { AppController } from './app.controller';
import { AppService } from './app.service';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -9,21 +8,20 @@ import { MeedanCheckClientModule } from '@iverify/meedan-check-client';
import { ApiClientModule } from '@iverify/api-client';
import { WpClientModule } from '@iverify/wp-client';
import { ApiPublisherModule } from '../api-publisher/api-publisher.module';


import { CronServicePublisher } from './cron.service';
import { ScheduleModule } from '@nestjs/schedule';
@Module({
imports: [
ScheduleModule.forRoot(),
SharedModule,
WpPublisherModule,
MeedanCheckClientModule,
WpClientModule,
ApiPublisherModule,
ApiClientModule,
HttpModule
HttpModule,
],
controllers: [AppController],
providers: [
AppService,
],
providers: [AppService,CronServicePublisher]
})
export class AppModule {}
25 changes: 14 additions & 11 deletions apps/publisher/src/app/app.service.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import { Injectable, Scope } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { combineLatest, Observable } from 'rxjs';
import { ApiPublisherService } from '../api-publisher/api-publisher.service';
import { SharedService } from '../shared/shared.service';
import { WpPublisherService } from '../wp-publisher/wp-publisher.service';

@Injectable({ scope: Scope.REQUEST })
@Injectable()
export class AppService {
itemsToBePublished$: Observable<any> = combineLatest(([
itemsToBePublished$: Observable<any> = combineLatest([
this.wpPublsher.post$,
this.apiPublisher.postToApi$
]))
this.apiPublisher.postToApi$,
]);

constructor(
private shared: SharedService,
private wpPublsher: WpPublisherService,
private apiPublisher: ApiPublisherService
){}
publishReportById(id: string){
private apiPublisher: ApiPublisherService,
) {}

publishReportById(id: string) {
this.shared.updateReportId(id);
return this.itemsToBePublished$;
}

}

notifySubscribers() {
return this.wpPublsher.sendSubscribesEmail$;
}
}
37 changes: 37 additions & 0 deletions apps/publisher/src/app/cron.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { HttpException,Injectable} from '@nestjs/common';
import { Cron} from '@nestjs/schedule';
import { AppService } from './app.service';

@Injectable()
export class CronServicePublisher {
@Cron('0 0 0 * * *', {
timeZone: process.env.CRON_TIMEZONE || 'UTC'
})
async handleCron() {
const start = new Date();
const startDate = start.toISOString();
console.log('Running hourly job',startDate );
try {
await this.analyze();
} catch (error) {
// this.logger.error(`Failed to run analyze: ${error.message}`, error.stack);
}
}

constructor(private appService: AppService) {}

private async analyze(): Promise<void> {
this.appService.notifySubscribers().subscribe({
next: (created) => {
console.log('Subscribers notified', created);
},
error: (error) => {
console.error(`Cron job error: ${error.message}`, error.stack);
throw new HttpException(error.message, 500);
},
complete: () => {
console.log('Notification process completed.');
},
});
}
}
14 changes: 7 additions & 7 deletions apps/publisher/src/shared/shared.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { MeedanCheckClientService } from "libs/meedan-check-client/src/lib/meeda
import { Observable, Subject } from "rxjs";
import { shareReplay, switchMap, take } from "rxjs/operators";

@Injectable({ scope: Scope.REQUEST })
@Injectable()
export class SharedService{
private _reportId: Subject<string> = new Subject<string>();
reportId$: Observable<string> = this._reportId.asObservable().pipe(take(1), shareReplay(1));
Expand All @@ -12,19 +12,19 @@ export class SharedService{
wpPost$: Observable<string> = this._wpPost.asObservable().pipe(take(1), shareReplay(1));

report$: Observable<any> = this.reportId$.pipe(
switchMap(id => this.checkClient.getReport(id)),
shareReplay(1)
switchMap(id => this.checkClient.getReport(id)),
shareReplay(1)
)
meedanReport$: Observable<any> = this.reportId$.pipe(
switchMap(id => this.checkClient.getMeedanReport(id)),
shareReplay(1)
switchMap(id => this.checkClient.getMeedanReport(id)),
shareReplay(1)
)
private sub = this.report$.subscribe();
private wpPostSub = this.wpPost$.subscribe();


constructor(private checkClient: MeedanCheckClientService){}

updateReportId(id: string){
this._reportId.next(id);
}
Expand Down
51 changes: 48 additions & 3 deletions apps/publisher/src/wp-publisher/wp-publisher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import { CreateCategoryDto } from "libs/wp-client/src/lib/interfaces/create-cate
import { CommentStatus, CreatePostDto, PostFormat, PostStatus } from "libs/wp-client/src/lib/interfaces/create-post.dto";
import { CreateTagDto } from "libs/wp-client/src/lib/interfaces/create-tag.dto";
import { WpClientService } from "libs/wp-client/src/lib/wp-client.service";
import { combineLatest, from, iif, Observable, of, zip } from "rxjs";
import { combineLatest, forkJoin, from, iif, Observable, of, zip } from "rxjs";
import { catchError, concatMap, filter, map, reduce, scan, shareReplay, switchMap, take, tap, withLatestFrom } from "rxjs/operators";
import { SharedService } from "../shared/shared.service";
import { WpPublisherHelper } from "./wp-publisher-helper.service";
import { EmailService } from "@iverify/email/src";
import { DateTime } from 'luxon';

@Injectable({ scope: Scope.REQUEST })
@Injectable()
export class WpPublisherService{
private reportId$: Observable<string> = this.shared.reportId$;
private report$: Observable<any> = this.shared.report$.pipe(tap(report => console.log('Report: ', JSON.stringify(report))));
Expand Down Expand Up @@ -76,7 +77,7 @@ export class WpPublisherService{
tap(wpPost => {
this.shared.updateWpPost(wpPost);
if (data.postDto.email_address) {
this.emailService.submittedFactCheckContent(data.postDto.email_address, wpPost.link).catch(err => {
this.emailService.submittedFactCheckContent(data.postDto.email_address,data.postDto.title,wpPost.link , this.formatDate(data.postDto.date)).catch(err => {
console.error('Error sending post published email:', err);
});
}
Expand All @@ -87,6 +88,45 @@ export class WpPublisherService{
))
);

subscribersList$:Observable<string[]> = this.wpClient.getWPSubscribers();

latestPosts$:Observable<string[]> = this.wpClient.getPostsFromDate();

sendSubscribesEmail$: Observable<any> = combineLatest([this.subscribersList$, this.latestPosts$]).pipe(
switchMap(([subscribersList, latestPosts]) => {
// Handle the case where latestPosts might be null or undefined
console.log('latestPosts', latestPosts)
const postObservables = (latestPosts ?? []).map((post: any) =>
this.wpClient.getPostsThumbnail(post.featured_media).pipe(
map((thumbnail: any) => ({
link: post.link,
title: post.title.rendered,
thumbnail: thumbnail?.guid?.rendered || '',
date: this.formatDate(post.date)
})),
catchError(() => of({
link: post.link,
title: post.title.rendered,
thumbnail: 'https://rdc.i-verify.org/wp-content/uploads/2024/10/Frame-1.jpg',
date: this.formatDate(post.date)
}))
)
);

return forkJoin(postObservables).pipe(
switchMap((formattedPosts) => {
if (subscribersList.length > 0 && formattedPosts.length > 0) {
return this.emailService.sendEmailForSubscribers(subscribersList.join(', '), formattedPosts);
} else {
return of(null);
}
})
);
})
);



constructor(
private http: HttpService,
private shared: SharedService,
Expand All @@ -95,6 +135,11 @@ export class WpPublisherService{
private emailService: EmailService
){}

private formatDate(inputDate) {
// Parse the input date and format it to the desired output
return DateTime.fromISO(inputDate).toFormat('MMMM d, yyyy');
}

private tagsIds(tags: string[]): Observable<number[]>{
const tagsIds$: Observable<number[]> = of(tags).pipe(
switchMap(tags => iif(()=> !!tags.length, this.createManyTags(tags), of([]))),
Expand Down
13 changes: 13 additions & 0 deletions apps/triage/src/app/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,17 @@ export class AppController {
return new HttpException(e.message, 500)
}
}
// test end point for UW
// @Get('radio-messages')
// @ApiTags('Radio Messages')
// async testRadioMessages(){
// try{
// const created: number = await this.appService.pullRadioMessages();
// return created;
// console.log('Items created: ', created);
// } catch(e){
// throw new HttpException(e.message, 500);
// }

// }
}
1 change: 0 additions & 1 deletion apps/triage/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import { PerspectiveClientModule } from '@iverify/perspective-client/src';
import { UnitedwaveClientModule } from '@iverify/unitedwave-client';
import { ApiClientModule, ApiClientService } from '@iverify/api-client/src';
import { TranslateService } from './TranslateService/TranslateService';
import { S3Module } from '@iverify/s3/src/lib/s3.module';
@Module({
imports: [
HttpModule,
Expand Down
53 changes: 30 additions & 23 deletions apps/triage/src/app/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,41 @@ export class AppService {
startTime = lastMeedanReport[0].node?.tasks?.edges?.find(task => task.node?.label === this.config.originalPostTimeField).node?.first_response_value
}
}
let list = await this.unitedwaveClient.getPosts(startTime).toPromise();
let createdItems = [];
while(list && list.length !== 0) {
this.logger.log(`${list.length} posts found from radio. Creating Meedan Check items...`);
let lastTime;

for (let i = list.length - 1; i >= 0; i--) {
this.logger.log('Creating item...')
const post = list[i]
const item = await this.checkClient.createItemFromRadio(post?.clip_url, post?.clip_name, post?.source_text, post?.date_reported).toPromise();
console.log('item: ', item)
if(!item.error) createdItems = [...createdItems, item];
lastTime = post?.date_reported
}
this.logger.log('United wave response', JSON.stringify(list))
if (lastTime) {
list = await this.unitedwaveClient.getPosts(lastTime).toPromise();
}
else {
list = []
const postsCount = await this.unitedwaveClient.getPostsCount().toPromise();
this.logger.log('Latest unitedwaveClient count', postsCount)
const postsPerPage = 50;
const pageCount = Math.ceil(Number(postsCount) / postsPerPage);
const pageIndex = pageCount - 1;
let createdItems = [];
let createdPosts = [];
for(let page = pageIndex ; page >=0 ; page --) {
const list = await this.unitedwaveClient.getPosts(page,startTime).toPromise();
for (let i = list.length - 1; i >= 0; i--) {
const post = list[i];
//check duplication
const isDuplicate = this.isDuplicatePost(post, createdPosts);
if (!isDuplicate) {
const item = await this.checkClient.createItemFromRadio(post?.clip_url, post?.clip_name, post?.source_text, post?.date_reported).toPromise();
if(!item.error) {
createdItems = [...createdItems, item];
createdPosts = [...createdPosts, post];
}
}
}
}
}
this.logger.log(`Created ${createdItems.length} items.`)
return createdItems.length;
}

private isDuplicatePost(post, createdPosts) {
return createdPosts.some((createdPost) =>
createdPost.clip_url === post?.clip_url &&
createdPost.clip_name === post?.clip_name &&
createdPost.source_text === post?.source_text &&
createdPost.date_reported === post?.date_reported
);
}

async analyze(startDate: string, endDate: string): Promise<number> {
try{
const lists = await this.ctClient.getLists().toPromise();
Expand All @@ -92,7 +101,6 @@ export class AppService {
for(const post of uniqueToxicPosts){
this.logger.log('Creating item...')
const item = await this.checkClient.createItem(post.postUrl, post.toxicScores).toPromise();
console.log('item: ', item)
if(!item.error) createdItems = [...createdItems, item];
}
this.logger.log(`Created ${createdItems.length} items.`)
Expand All @@ -113,7 +121,6 @@ export class AppService {
endDate - ${endDate}`);
this.logger.log(`Max post scan limit - ${this.config.postScanLimit}`)
const res = await this.ctClient.getPosts(listId, pagination.count, pagination.offset, startDate, endDate,token).toPromise();
//console.log('getToxicPostsByList', res)
this.logger.log(`Received ${res.posts.length} posts. Analyzing...`)
let postsCount = 0;
for(const post of res['posts']){
Expand Down
Loading

0 comments on commit b242517

Please sign in to comment.