Skip to content

Commit

Permalink
make _id a type ObjectId, refactored createTask, and storeMessageThread.
Browse files Browse the repository at this point in the history
  • Loading branch information
salisshuaibu11 committed Jan 25, 2025
1 parent fd06f96 commit 42715fc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
12 changes: 9 additions & 3 deletions examples/example-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ async function main() {

await taskDb.connect();


// // Store a task
await taskDb.createTask('task_001',{
_id: "task_001",
type: "ongoing",
const now = new Date();
await taskDb.createTask({
status: "pending",
type: "scheduled",
updatedAt: now,
createdAt: now,
scheduledFor: new Date(Date.now() + 1000 * 60 * 60 * 24), // 24 hours from now,
metadata: { description: "Sample task" },
});
Expand All @@ -24,6 +28,8 @@ async function main() {
await taskDb.executeTask(task._id);
}

console.log(tasks);

// Store a message thread
await taskDb.storeMessageThread("thread_001", "user_123", [
{
Expand Down
44 changes: 21 additions & 23 deletions packages/core/src/core/tasks-db.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MongoClient, Collection } from "mongodb";
import {MongoClient, Collection, type ObjectId} from "mongodb";

interface Task {
_id: string;
_id?: ObjectId;
type: "ongoing" | "scheduled";
status: "pending" | "completed";
createdAt?: Date;
Expand All @@ -28,11 +28,11 @@ export class KeyValueDB {
private client: MongoClient;
private tasksCollection!: Collection<Task>;
private threadsCollection!: Collection<MessageThread>;
private cache: Map<string, Task>; // In-memory cache
private cache: Map<ObjectId, Task>; // In-memory cache

constructor(private uri: string, private dbName: string = "myApp") {
this.client = new MongoClient(uri);
this.cache = new Map<string, Task>();
this.cache = new Map<ObjectId, Task>();
}

/**
Expand All @@ -54,13 +54,13 @@ export class KeyValueDB {
* Cache a task
* */
private cacheTask(task: Task): void {
this.cache.set(task._id, task);
this.cache.set(<ObjectId>task._id, task);
}

/**
* Fetch from cache or database
* */
private async fetchTaskFromCacheOrDB(taskId: string): Promise<Task | null> {
private async fetchTaskFromCacheOrDB(taskId: ObjectId): Promise<Task | null> {
// Check the in-memory cache first
if (this.cache.has(taskId)) {
return this.cache.get(taskId)!;
Expand All @@ -76,24 +76,17 @@ export class KeyValueDB {

/**
*
* Upsert a task (updates cache and DB)
* Create a new task (updates cache and DB)
* */
async createTask(taskId: string, taskData: Partial<Task>): Promise<void> {
const now = new Date();
const data = { ...taskData, updatedAt: now };
if (!taskData.createdAt) {
data.createdAt = now;
}

await this.tasksCollection.updateOne(
{ _id: taskId },
{ $set: data },
{ upsert: true } // If there is an existing task with this id - update it
async createTask(taskData: Task): Promise<ObjectId> {
const result = await this.tasksCollection.insertOne(
taskData
);

// Update the cache
const updatedTask = { ...data, _id: taskId } as Task;
this.cacheTask(updatedTask);
this.cacheTask(taskData);

return result.insertedId;
}

/**
Expand All @@ -115,12 +108,12 @@ export class KeyValueDB {
}

// Fetch a single task (uses cache)
async fetchTask(taskId: string): Promise<Task | null> {
async fetchTask(taskId: ObjectId): Promise<Task | null> {
return this.fetchTaskFromCacheOrDB(taskId);
}

// Execute a task and mark it as completed (cache and DB consistency)
async executeTask(taskId: string): Promise<void> {
async executeTask(taskId: ObjectId): Promise<void> {
const task = await this.fetchTaskFromCacheOrDB(taskId);
if (!task) {
throw new Error(`Task with ID ${taskId} not found`);
Expand Down Expand Up @@ -150,7 +143,12 @@ export class KeyValueDB {
messages,
createdAt: new Date(),
};
await this.threadsCollection.insertOne(threadData);

await this.threadsCollection.updateOne(
{ _id: threadId },
{ $set: threadData },
{ upsert: true }
);
}

// Fetch a message thread by ID
Expand Down

0 comments on commit 42715fc

Please sign in to comment.