Building Scalable Microservices in Golang: Leveraging gRPC, RabbitMQ, and SQLite for Efficient Messaging and State Management with RESTful Automation
The initial idea after a bit of reasearch was the following:
A user sends a request to a rest api that invokes a gRPC. However that was a mistake, since the idea of the project was to be async, instead of directly calling the method from the restapi.
Then after some more reasearch I figured out that the best way implement the RabbitMQ and gRPC was the following: API --> RabbitMQ Queue that then on goes to call the Microservice, and then they communicate between each other via gRPC.
After getting my hands dirty with the RabbitMQ, I figured out I would need a way to call the callback function from the previous diagram, so I stumbled on the Request-reply pattern seen here :
This is the initial proto file, that I used to build the 2 microservices, allow them to communicate. They have a function called AddItem, that when it's called it needs a message(ItemRequest) and it returns the message(ItemResponse)
syntax = "proto3";
option go_package = "grpc/ms/pb";
service MyService {
rpc AddItem(ItemRequest) returns (ItemResponse) {}
}
message ItemRequest {
string name = 1;
}
message ItemResponse {
int64 id = 1;
string name = 2;
}
I first had to implement the AddItem function that we previously defined in the protofile
I'm just showing code snippets, if you want to find the full file it's server2/main.go
// function to add items to the database (CREATE)
func (s *server) AddItem(ctx context.Context, req *pb.ItemRequest) (*pb.ItemResponse, error) {
//insert the item into the database
result, err := s.db.Exec("INSERT INTO items(name) VALUES(?)", req.Name)
//check for errors, due to sqlite instance not running?
if err != nil {
return nil, fmt.Errorf("failed to add item: %v", err)
}
//get the id of the last inserted item
id, _ := result.LastInsertId()
return &pb.ItemResponse{
Id: id,
Name: req.Name,
}, nil
}
Afterwards I craete the db as well as the gRPC server
listener, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterMyServiceServer(s, &server{db: db})
With this Microservice, I had to implement a Customer as well as a Producer of requests
I'm just showing code snippets, if you want to find the full file it's server2/main.go
The following part is for the gRPC server -
// set up a connection to the existing gRPC server
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// create a gRPC client for the existing MyService
client := pb.NewMyServiceClient(conn)
// set up the intermediate gRPC server
listener, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// create the intermediate gRPC server
intermediateServer := grpc.NewServer()
pb.RegisterMyServiceServer(intermediateServer, &intermediateService{client: client})
And for the RabbitMQ Queue
// start RabbitMQ consumer
go func() {
connRabbit, err := amqp.Dial(rabbitURL)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
defer connRabbit.Close()
ch, err := connRabbit.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %v", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
First I establish a connection to the RabbitMQ queue
rabbitConn, err := amqp.Dial(rabbitURL)
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
defer rabbitConn.Close()
rabbitChannel, err := rabbitConn.Channel()
if err != nil {
log.Fatalf("failed to open a RabbitMQ channel: %v", err)
}
defer rabbitChannel.Close()
queue, err := rabbitChannel.QueueDeclare(queueName, false, false, false, false, nil)
if err != nil {
log.Fatalf("failed to declare a RabbitMQ queue: %v", err)
}
log.Printf("RabbitMQ queue declared: %v", queue)
defer rabbitChannel.Close()
Then I made an endpoint
e.POST("/add-item", func() error{...})
That was responsible for creating a CorrelationId as well as publishing the request/value of the request to the Queue, that will be picked up by the worker/server.
It returns a CorrelationId, since we don't know when the queue will be done. So a user can access the updated information, once it's processed
response := map[string]interface{}{
"message": "Item added and sent to RabbitMQ",
"corrId": corrId,
}
return c.JSON(http.StatusOK, response)
We send a post request to /add-item
{
"Name": "test"
}
and we get a server response of
{
"corrId": "HNLYNSEUBGETFISBMYWTWPSLEWODMRPQ",
"message": "Item added and sent to RabbitMQ"
}
We can see that the data is successfully send over the rabbitmq queue then onto the gRPC services
RestAPI: Echo Queues: RabbitMQ Server: gRPC
- RabbitMQ Docs/Tutorials
- gRPC docs
- RabbitMQ : Message Queues for beginners
- Where should you use gRPC? And where NOT to use it!
- Microservices communication using gRPC Protocol
- Scalable Microservice Architecture Using RabbitMQ RPC
Issue with Correlation ID for Client Responses
The setup for the Correlation ID, aimed at retrieving client responses post-queue, encountered a problem. Initially, I managed to get it to work smoothly, allowing clients to access their responses once the queue finished processing.
However, subsequent attempts faced a glitch. Despite troubleshooting efforts, it consistently broke after the initial success. Due to time constraints, I couldn't pinpoint the root cause and resolve the issue completely within the project's timeframe
I wasn't able to make the dockerfiles in time, since I had to reasearch how everything works, including golang...