From 42715fc9528c3b91729ccb7427abab9ac6ef6e55 Mon Sep 17 00:00:00 2001 From: Salisu Shuaibu Date: Sat, 25 Jan 2025 04:30:56 -0800 Subject: [PATCH] make _id a type ObjectId, refactored createTask, and storeMessageThread. --- examples/example-tasks.ts | 12 ++++++-- packages/core/src/core/tasks-db.ts | 44 ++++++++++++++---------------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/examples/example-tasks.ts b/examples/example-tasks.ts index d4794eab..7eeacea2 100644 --- a/examples/example-tasks.ts +++ b/examples/example-tasks.ts @@ -10,10 +10,14 @@ async function main() { await taskDb.connect(); + // // Store a task - await taskDb.createTask('task_001',{ - _id: "task_001", - type: "ongoing", + const now = new Date(); + await taskDb.createTask({ + status: "pending", + type: "scheduled", + updatedAt: now, + createdAt: now, scheduledFor: new Date(Date.now() + 1000 * 60 * 60 * 24), // 24 hours from now, metadata: { description: "Sample task" }, }); @@ -24,6 +28,8 @@ async function main() { await taskDb.executeTask(task._id); } + console.log(tasks); + // Store a message thread await taskDb.storeMessageThread("thread_001", "user_123", [ { diff --git a/packages/core/src/core/tasks-db.ts b/packages/core/src/core/tasks-db.ts index 8c396d0b..d60f221f 100644 --- a/packages/core/src/core/tasks-db.ts +++ b/packages/core/src/core/tasks-db.ts @@ -1,7 +1,7 @@ -import { MongoClient, Collection } from "mongodb"; +import {MongoClient, Collection, type ObjectId} from "mongodb"; interface Task { - _id: string; + _id?: ObjectId; type: "ongoing" | "scheduled"; status: "pending" | "completed"; createdAt?: Date; @@ -28,11 +28,11 @@ export class KeyValueDB { private client: MongoClient; private tasksCollection!: Collection; private threadsCollection!: Collection; - private cache: Map; // In-memory cache + private cache: Map; // In-memory cache constructor(private uri: string, private dbName: string = "myApp") { this.client = new MongoClient(uri); - this.cache = new Map(); + this.cache = new Map(); } /** @@ -54,13 +54,13 @@ export class KeyValueDB { * Cache a task * */ private cacheTask(task: Task): void { - this.cache.set(task._id, task); + this.cache.set(task._id, task); } /** * Fetch from cache or database * */ - private async fetchTaskFromCacheOrDB(taskId: string): Promise { + private async fetchTaskFromCacheOrDB(taskId: ObjectId): Promise { // Check the in-memory cache first if (this.cache.has(taskId)) { return this.cache.get(taskId)!; @@ -76,24 +76,17 @@ export class KeyValueDB { /** * - * Upsert a task (updates cache and DB) + * Create a new task (updates cache and DB) * */ - async createTask(taskId: string, taskData: Partial): Promise { - const now = new Date(); - const data = { ...taskData, updatedAt: now }; - if (!taskData.createdAt) { - data.createdAt = now; - } - - await this.tasksCollection.updateOne( - { _id: taskId }, - { $set: data }, - { upsert: true } // If there is an existing task with this id - update it + async createTask(taskData: Task): Promise { + const result = await this.tasksCollection.insertOne( + taskData ); // Update the cache - const updatedTask = { ...data, _id: taskId } as Task; - this.cacheTask(updatedTask); + this.cacheTask(taskData); + + return result.insertedId; } /** @@ -115,12 +108,12 @@ export class KeyValueDB { } // Fetch a single task (uses cache) - async fetchTask(taskId: string): Promise { + async fetchTask(taskId: ObjectId): Promise { return this.fetchTaskFromCacheOrDB(taskId); } // Execute a task and mark it as completed (cache and DB consistency) - async executeTask(taskId: string): Promise { + async executeTask(taskId: ObjectId): Promise { const task = await this.fetchTaskFromCacheOrDB(taskId); if (!task) { throw new Error(`Task with ID ${taskId} not found`); @@ -150,7 +143,12 @@ export class KeyValueDB { messages, createdAt: new Date(), }; - await this.threadsCollection.insertOne(threadData); + + await this.threadsCollection.updateOne( + { _id: threadId }, + { $set: threadData }, + { upsert: true } + ); } // Fetch a message thread by ID