Fastify plugins for Stackbox applications.
pnpm install @stackbox/fp-plugins
# or
npm install @stackbox/fp-plugins
This package provides a collection of Fastify plugins designed to enhance Stackbox applications. These plugins extend Fastify's functionality while maintaining its performance and developer experience.
A Fastify plugin that integrates an event bus system into your application, enabling efficient event-driven communication between different parts of your application.
- RabbitMQ (`rabbitmq`)
- Google Cloud Pub/Sub (`gcp-pubsub`)
- Azure Service Bus (`azure-servicebus`)
- In-process (`in-process`) - for development and testing
app.register(Plugins.EventBus, {
// Required: type of message broker to use
busType: "rabbitmq" | "gcp-pubsub" | "azure-servicebus" | "in-process",
// Required for gcp-pubsub and azure-servicebus
topic: "your-topic-name",
// Required for azure-servicebus
namespace: "your-namespace",
// Required: define event handlers
handlers: [
{
file: "module-name",
handlers: {
"event-name": async function (msg, req) {
// Handle the event
},
},
},
],
// Required: message validation function
validateMsg: (event, payload, req) => {
// Validate the message
},
// Required: error processing function
processError: (err, ctx) => {
// Process the error
return { err, status: 500 };
},
// Optional: disable the /event-bus/publish/:event route
disableEventPublishRoute: false,
// Optional: control concurrency of event handlers
actionConcurrency: 1,
// Optional: Prometheus registry for metrics
registry: new Registry(),
});
- `RABBITMQ_URL`: RabbitMQ connection URL
- `K_SERVICE`: Service name for queue naming
- `EVENT_TOPIC`: GCP Pub/Sub topic name
- `EVENT_SUBSCRIPTION`: GCP Pub/Sub subscription name
- `EVENT_SUBSCRIPTION_MAX_MESSAGES`: Maximum concurrent messages (default: 10)
- `EVENT_NAMESPACE`: Azure Service Bus namespace
- `EVENT_TOPIC`: Azure Service Bus topic
- `EVENT_SUBSCRIPTION`: Azure Service Bus subscription
- `EVENT_SUBSCRIPTION_MAX_CONCURRENT_CALLS`: Maximum concurrent calls (default: 10)
- `EVENT_RETRY_BASE_DELAY`: Base delay for exponential backoff in ms (default: 5000)
- `EVENT_RETRY_MAX_DELAY`: Maximum delay for retries in ms (default: 60000)
import { EventBus, Plugins, EventBusOptions, EventMessage } from "@stackbox/fp-plugins";
const app = fastify();
// Register event bus
app.register(Plugins.EventBus, {
busType: "rabbitmq",
validateMsg: (event, payload) => {
// Validate event and payload
console.log(`Validating message: ${event}`);
},
processError: (err, ctx) => {
console.error(`Error processing message: ${err.message}`);
return { err, status: 500 };
},
handlers: [
{
file: "orderModule",
handlers: {
"order.created": async function (msg, req) {
// Process order created event
console.log(`Processing order: ${msg.data.orderId}`);
},
"order.cancelled": async function (msg, req) {
// Process order cancelled event
console.log(`Processing cancelled order: ${msg.data.orderId}`);
},
},
},
],
});
// Publishing events
app.post("/create-order", async (req, reply) => {
// Create order logic...
// Publish event
req.EventBus.publish("order.created", {
orderId: "order-123",
customerId: "customer-456",
});
return { success: true };
});
// Delayed events (process after specified milliseconds)
app.post("/schedule-reminder", async (req, reply) => {
req.EventBus.publish(
"reminder.send",
{
userId: "user-123",
message: "Reminder to complete your profile",
},
3600000,
); // Process after 1 hour
return { scheduled: true };
});
To consume events from external services, use the `CreateEventConsumer` function:
import { CreateEventConsumer } from "@stackbox/fp-plugins";
// Create consumer that matches your EventBus busType
const consumer = await CreateEventConsumer(app, "rabbitmq");
// Close consumer when application shuts down
app.addHook("onClose", async () => {
await consumer.close();
});
A plugin that provides an abstraction for file storage operations across different providers:
- Local filesystem
- AWS S3
- Google Cloud Storage
- Azure Blob Storage
- MinIO
app.register(Plugins.FileStore, {
// Required: type of storage provider to use
type: "local" | "s3" | "gcs" | "azureBlob" | "minio",
});
- `LOCAL_STORAGE_DIR`: Directory for file storage (default: system temp directory)
- `AWS_S3_REGION`: AWS region (default: "us-east-1")
- `S3_BUCKET`: S3 bucket name (required)
- Standard AWS authentication environment variables
The plugin also supports other AWS authentication methods including:
- ECS/EC2 instance roles
- AWS IAM roles for service accounts (IRSA)
- Web identity providers
- AWS profiles
- `STORAGE_BUCKET`: GCS bucket name (required)
- Standard GCP authentication environment variables
The plugin also supports other GCP authentication methods including:
- GKE Workload Identity
- Compute Engine service accounts
- GCP Application Default Credentials (ADC)
- Service account key files (not recommended for production)
- `AZURE_STORAGE_ACCOUNT_URL`: Azure storage account URL (required)
- `AZURE_STORAGE_CONTAINER`: Azure storage container name (required)
- Standard Azure authentication environment variables
The plugin also supports other Azure authentication methods including:
- Managed Identities for Azure resources
- Azure AD workload identity
- Azure service principals
- Azure DefaultAzureCredential chain
- `MINIO_ENDPOINT`: MinIO server endpoint (required)
- `MINIO_ACCESS_KEY_ID`: MinIO access key (required)
- `MINIO_SECRET_ACCESS_KEY`: MinIO secret key (required)
- `MINIO_REGION`: MinIO region (default: "us-east-1")
- `MINIO_BUCKET`: MinIO bucket name (required)
The plugin provides a `FileStore` interface with the following methods:
interface FileStore {
exists(filepath: string): Promise<boolean>;
getInfo(filepath: string): Promise<FileInfo | null>;
save(
filepath: string,
contentType: string,
data: string | Buffer,
): Promise<void>;
getAsBuffer(filepath: string): Promise<Buffer>;
getAsStream(filepath: string): Promise<NodeJS.ReadableStream>;
copyFromStream(
filepath: string,
contentType: string,
stream: stream.Readable,
): Promise<void>;
copyFromLocalFile(
filepath: string,
contentType: string,
localFilepath: string,
): Promise<void>;
}
interface FileInfo {
size: number;
contentType: string;
lastModified: Date;
}
import { Plugins, FileStore } from "@stackbox/fp-plugins";
import { fastify } from "fastify";
const app = fastify();
// Register file store plugin
app.register(Plugins.FileStore, {
type: "s3", // Choose the appropriate storage type
});
// Using the file store
app.post("/upload", async (request, reply) => {
const { filepath, contentType, data } = request.body;
// Check if file exists
const exists = await request.server.FileStore.exists(filepath);
// Get file info (returns null if file doesn't exist)
const fileInfo = await request.server.FileStore.getInfo(filepath);
if (fileInfo) {
console.log(`File size: ${fileInfo.size}, Content type: ${fileInfo.contentType}`);
}
// Save file
await request.server.FileStore.save(filepath, contentType, data);
// Get file as buffer
const fileContent = await request.server.FileStore.getAsBuffer(filepath);
// Get file as stream
const fileStream = await request.server.FileStore.getAsStream(filepath);
// Copy from stream
await request.server.FileStore.copyFromStream(
filepath,
contentType,
someReadableStream,
);
// Copy from local file
await request.server.FileStore.copyFromLocalFile(
filepath,
contentType,
"/path/to/local/file",
);
return { success: true };
});
- Node.js 18+
- pnpm
pnpm install
- `pnpm test` - Run tests
- `pnpm run test:coverage` - Run tests with coverage
- `pnpm run build` - Build the project
- `pnpm run pretty` - Format code
Tests are written using Jest and are located alongside the source files, using the `.spec.ts` extension.
Contributions are welcome! Please feel free to submit a Pull Request.