Skip to content

Commit

Permalink
refactor: add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Attack825 committed Aug 30, 2023
1 parent e40c105 commit b50fcb2
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 55 deletions.
75 changes: 63 additions & 12 deletions src/services/favorite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
"context"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"net/http"
"os"
"syscall"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand All @@ -34,34 +41,78 @@ func main() {
}()

// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.LikeService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)
profiling.InitPyroscope("GuGoTik.FavoriteService")

log := logging.LogService(config.FavoriteRpcServerName)

lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.FavoriteRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.FavoriteRpcServerName, err)
}

srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg := prom.Client
reg.MustRegister(srvMetrics)

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
)

if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort)

var srv FavoriteServiceServerImpl
var probe healthImpl.ProbeImpl

favorite.RegisterFavoriteServiceServer(s, srv)

health.RegisterHealthServer(s, &probe)
defer CloseMQConn()
if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err)
}

srv.New()
log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.FavoriteRpcServerName, err)
srvMetrics.InitializeMetrics(s)

g := &run.Group{}
g.Add(func() error {
return s.Serve(lis)
}, func(err error) {
s.GracefulStop()
s.Stop()
log.Errorf("Rpc %s listen happens error for: %v", config.FavoriteRpcServerName, err)
})

httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
httpSrv.Handler = m
log.Infof("Promethus now running")
return httpSrv.ListenAndServe()
}, func(error) {
if err := httpSrv.Close(); err != nil {
log.Errorf("Prometheus %s listen happens error for: %v", config.FavoriteRpcServerName, err)
}
})

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

if err := g.Run(); err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when runing http server")
os.Exit(1)
}
}
74 changes: 62 additions & 12 deletions src/services/message/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
"context"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net"
"net/http"
"os"
"syscall"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand All @@ -36,31 +43,74 @@ func main() {
// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.ChatService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.MessageRpcServerName)

lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.MessageRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err)
}

var srv MessageServiceImpl
var probe healthImpl.ProbeImpl
srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)

chat.RegisterChatServiceServer(s, srv)
reg := prom.Client
reg.MustRegister(srvMetrics)

health.RegisterHealthServer(s, &probe)
s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
)

if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err)
}
srv.New()
log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err)

var srv MessageServiceImpl
var probe healthImpl.ProbeImpl
chat.RegisterChatServiceServer(s, srv)
health.RegisterHealthServer(s, &probe)

srv.New()
srvMetrics.InitializeMetrics(s)

g := &run.Group{}
g.Add(func() error {
return s.Serve(lis)
}, func(err error) {
s.GracefulStop()
s.Stop()
log.Errorf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err)
})

httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
httpSrv.Handler = m
log.Infof("Promethus now running")
return httpSrv.ListenAndServe()
}, func(error) {
if err := httpSrv.Close(); err != nil {
log.Errorf("Prometheus %s listen happens error for: %v", config.MessageRpcServerName, err)
}
})

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

if err := g.Run(); err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when runing http server")
os.Exit(1)
}
}
74 changes: 63 additions & 11 deletions src/services/publish/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ import (
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
"context"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"net"
"net/http"
"os"
"syscall"
)

func main() {
Expand All @@ -35,31 +42,76 @@ func main() {
// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.PublishService")

maxSize := 500 * 1024 * 1024
s := grpc.NewServer(
grpc.MaxRecvMsgSize(maxSize),
grpc.MaxSendMsgSize(maxSize),
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.PublishRpcServerName)
lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.PublishRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.PublishRpcServerName, err)
}

srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg := prom.Client
reg.MustRegister(srvMetrics)

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
)

if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort)

var srv PublishServiceImpl
var probe healthImpl.ProbeImpl
defer CloseMQConn()
publish.RegisterPublishServiceServer(s, srv)
health.RegisterHealthServer(s, &probe)
defer CloseMQConn()
if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err)
}
srv.New()
log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err)
srvMetrics.InitializeMetrics(s)

g := &run.Group{}
g.Add(func() error {
return s.Serve(lis)
}, func(err error) {
s.GracefulStop()
s.Stop()
log.Errorf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err)
})

httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
httpSrv.Handler = m
log.Infof("Promethus now running")
return httpSrv.ListenAndServe()
}, func(error) {
if err := httpSrv.Close(); err != nil {
log.Errorf("Prometheus %s listen happens error for: %v", config.PublishRpcServerName, err)
}
})

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

if err := g.Run(); err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when runing http server")
os.Exit(1)
}
}
Loading

0 comments on commit b50fcb2

Please sign in to comment.