Skip to content

Commit

Permalink
Reverting some changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kpachhai committed Jun 7, 2024
1 parent b75ac06 commit a5e1117
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 116 deletions.
49 changes: 18 additions & 31 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand All @@ -14,18 +12,18 @@ import (
"time"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/hypersdk/server"
"github.com/ava-labs/hypersdk/utils"
"github.com/joho/godotenv"
_ "github.com/lib/pq"
"go.uber.org/zap"

"github.com/nuklai/nuklai-feed/config"
"github.com/nuklai/nuklai-feed/manager"
frpc "github.com/nuklai/nuklai-feed/rpc"
"go.uber.org/zap"
)

var (
httpConfig = &http.Server{
httpConfig = server.HTTPConfig{
ReadTimeout: 60 * time.Second,
ReadHeaderTimeout: 60 * time.Second,
WriteTimeout: 60 * time.Second,
Expand All @@ -45,7 +43,7 @@ func HealthHandler(w http.ResponseWriter, r *http.Request) {
}

func main() {
err := godotenv.Overload()
err := godotenv.Overload() // Overload the environment variables with those from the .env file
if err != nil {
utils.Outf("{{red}}Error loading .env file{{/}}: %v\n", err)
os.Exit(1)
Expand All @@ -63,17 +61,20 @@ func main() {
log := l
log.Info("Logger initialized")

// Load config from environment variables
config, err := config.LoadConfigFromEnv()
if err != nil {
fatal(log, "cannot load config from environment variables", zap.Error(err))
}
log.Info("Config loaded from environment variables")

// Load recipient
if _, err := config.RecipientAddress(); err != nil {
fatal(log, "cannot parse recipient address", zap.Error(err))
}
log.Info("Loaded feed recipient", zap.String("address", config.Recipient))

// Create server
listenAddress := net.JoinHostPort(config.HTTPHost, fmt.Sprintf("%d", config.HTTPPort))
listener, err := net.Listen("tcp", listenAddress)
if err != nil {
Expand All @@ -90,9 +91,11 @@ func main() {
IdleTimeout: httpConfig.IdleTimeout,
}

// Add health check handler
mux.HandleFunc("/health", HealthHandler)
log.Info("Health handler added")

// Retry mechanism for PostgreSQL connection
var db *sql.DB
for i := 0; i < 10; i++ {
db, err = sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
Expand All @@ -115,6 +118,7 @@ func main() {
}
log.Info("Database connection established")

// Start manager with context handling
manager, err := manager.New(log, config, db)
if err != nil {
fatal(log, "cannot create manager", zap.Error(err))
Expand All @@ -129,39 +133,22 @@ func main() {
}
}()

// Add feed handler
feedServer := frpc.NewJSONRPCServer(manager)

mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "could not read request body", http.StatusInternalServerError)
return
}

var req frpc.JSONRPCRequest
err = json.Unmarshal(body, &req)
if err != nil {
log.Error("Failed to unmarshal JSON-RPC request", zap.Error(err))
http.Error(w, "invalid JSON-RPC request", http.StatusBadRequest)
return
}

response := feedServer.HandleRequest(req)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
})

handler, err := server.NewHandler(feedServer, "feed")
if err != nil {
fatal(log, "cannot create handler", zap.Error(err))
}
mux.Handle("/", handler)
log.Info("Feed handler added")

// Start server
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
log.Info("Triggering server shutdown", zap.Any("signal", sig))
cancel()
cancel() // Ensure context cancellation cascades down
_ = srv.Shutdown(ctx)
}()
log.Info("Server starting")
Expand Down
11 changes: 10 additions & 1 deletion rpc/jsonrpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@ package rpc

import (
"context"
"strings"

"github.com/ava-labs/hypersdk/requester"
"github.com/nuklai/nuklai-feed/manager"
)

const (
JSONRPCEndpoint = "/feed"
)

type JSONRPCClient struct {
requester *requester.EndpointRequester
}

// New creates a new client object.
func NewJSONRPCClient(uri string) *JSONRPCClient {
req := requester.New(uri, "")
uri = strings.TrimSuffix(uri, "/")
uri += JSONRPCEndpoint
req := requester.New(uri, "feed")
return &JSONRPCClient{
requester: req,
}
Expand Down Expand Up @@ -44,6 +52,7 @@ func (cli *JSONRPCClient) Feed(ctx context.Context, subnetID, chainID string, li
return resp.Feed, err
}

// UpdateNuklaiRPC updates the RPC url for Nuklai
func (cli *JSONRPCClient) UpdateNuklaiRPC(ctx context.Context, newNuklaiRPCUrl, adminToken string) (bool, error) {
resp := new(UpdateNuklaiRPCReply)
err := cli.requester.SendRequest(
Expand Down
98 changes: 14 additions & 84 deletions rpc/jsonrpc_server.go
Original file line number Diff line number Diff line change
@@ -1,101 +1,31 @@
package rpc

import (
"context"
"encoding/json"
"errors"
"net/http"

"github.com/ava-labs/hypersdk/codec"
"github.com/nuklai/nuklai-feed/manager"
"github.com/nuklai/nuklaivm/consts"
)

type JSONRPCServer struct {
m *manager.Manager
m Manager
}

func NewJSONRPCServer(m *manager.Manager) *JSONRPCServer {
func NewJSONRPCServer(m Manager) *JSONRPCServer {
return &JSONRPCServer{m}
}

type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
}

type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
Error *jsonrpcError `json:"error,omitempty"`
ID interface{} `json:"id"`
}

type jsonrpcError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}

func (j *JSONRPCServer) HandleRequest(req JSONRPCRequest) JSONRPCResponse {
var result interface{}
var jsonErr *jsonrpcError

switch req.Method {
case "feedInfo":
var params struct{}
err := json.Unmarshal(req.Params, &params)
if err != nil {
jsonErr = &jsonrpcError{Code: -32602, Message: "Invalid params"}
break
}
var reply FeedInfoReply
jsonErr = j.FeedInfo(params, &reply)
result = reply

case "feed":
var params FeedArgs
err := json.Unmarshal(req.Params, &params)
if err != nil {
jsonErr = &jsonrpcError{Code: -32602, Message: "Invalid params"}
break
}
var reply FeedReply
jsonErr = j.Feed(params, &reply)
result = reply

case "updateNuklaiRPC":
var params UpdateNuklaiRPCArgs
err := json.Unmarshal(req.Params, &params)
if err != nil {
jsonErr = &jsonrpcError{Code: -32602, Message: "Invalid params"}
break
}
var reply UpdateNuklaiRPCReply
jsonErr = j.UpdateNuklaiRPC(params, &reply)
result = reply

default:
jsonErr = &jsonrpcError{Code: -32601, Message: "Method not found"}
}

return JSONRPCResponse{
JSONRPC: "2.0",
Result: result,
Error: jsonErr,
ID: req.ID,
}
}

type FeedInfoReply struct {
Address string `json:"address"`
Fee uint64 `json:"fee"`
}

func (j *JSONRPCServer) FeedInfo(_ struct{}, reply *FeedInfoReply) *jsonrpcError {
addr, fee, err := j.m.GetFeedInfo(context.Background())
func (j *JSONRPCServer) FeedInfo(req *http.Request, _ *struct{}, reply *FeedInfoReply) (err error) {
addr, fee, err := j.m.GetFeedInfo(req.Context())
if err != nil {
return &jsonrpcError{Code: -32000, Message: err.Error()}
return err
}
reply.Address = codec.MustAddressBech32(consts.HRP, addr)
reply.Fee = fee
Expand All @@ -112,10 +42,10 @@ type FeedReply struct {
Feed []*manager.FeedObject `json:"feed"`
}

func (j *JSONRPCServer) Feed(args FeedArgs, reply *FeedReply) *jsonrpcError {
feed, err := j.m.GetFeed(context.Background(), args.SubnetID, args.ChainID, args.Limit)
func (j *JSONRPCServer) Feed(req *http.Request, args *FeedArgs, reply *FeedReply) (err error) {
feed, err := j.m.GetFeed(req.Context(), args.SubnetID, args.ChainID, args.Limit)
if err != nil {
return &jsonrpcError{Code: -32000, Message: err.Error()}
return err
}
reply.Feed = feed
return nil
Expand All @@ -130,13 +60,13 @@ type UpdateNuklaiRPCReply struct {
Success bool `json:"success"`
}

func (j *JSONRPCServer) UpdateNuklaiRPC(args UpdateNuklaiRPCArgs, reply *UpdateNuklaiRPCReply) *jsonrpcError {
func (j *JSONRPCServer) UpdateNuklaiRPC(req *http.Request, args *UpdateNuklaiRPCArgs, reply *UpdateNuklaiRPCReply) error {
if args.AdminToken != j.m.Config().AdminToken {
return &jsonrpcError{Code: -32000, Message: "unauthorized user"}
return errors.New("unauthorized user")
}
err := j.m.UpdateNuklaiRPC(context.Background(), args.NuklaiRPCUrl)
err := j.m.UpdateNuklaiRPC(req.Context(), args.NuklaiRPCUrl)
if err != nil {
return &jsonrpcError{Code: -32000, Message: err.Error()}
return err
}
reply.Success = true
return nil
Expand Down

0 comments on commit a5e1117

Please sign in to comment.