Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
Enrich Zipkin db tracing
Browse files Browse the repository at this point in the history
Add info regarding

- database type and address
- approximation for queries results size
  • Loading branch information
embs committed Nov 26, 2017
1 parent 9de2c86 commit b352f92
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 28 deletions.
5 changes: 4 additions & 1 deletion cmd/cataloguesvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func main() {
}

// Data domain.
db, err := sqlx.Open("mysql", *dsn)
var db catalogue.Database
sqlxdb, err := sqlx.Open("mysql", *dsn)
db = &catalogue.SqlxDb{Db: sqlxdb}
db = catalogue.DbTracingMiddleware()(db)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand Down
48 changes: 48 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package catalogue

import(
"context"
"database/sql"

"github.com/jmoiron/sqlx"
)

type Database interface {
Close() error
Ping() error
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Prepare(query string) (StmtMiddleware, error)
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

// SqlxDb meets the Database interface requirements
type SqlxDb struct {
// db is a reference for the underlying database implementation
Db *sqlx.DB
}

func (sqlxdb *SqlxDb) Close() error {
return sqlxdb.Db.Close()
}

func (sqlxdb *SqlxDb) Ping() error {
return sqlxdb.Db.Ping()
}

func (sqlxdb *SqlxDb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return sqlxdb.Db.Select(dest, query, args...)
}

func (sqlxdb *SqlxDb) Prepare(query string) (StmtMiddleware, error) {
sel, err := sqlxdb.Db.Prepare(query)
return StmtMiddleware{next: sel}, err
}

func (sqlxdb *SqlxDb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return sqlxdb.Db.Get(dest, query, args...)
}

func (sqlxdb *SqlxDb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return sqlxdb.Db.Query(query, args...)
}
87 changes: 87 additions & 0 deletions db_tracing_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package catalogue

import (
"context"
"database/sql"
"unsafe"

stdopentracing "github.com/opentracing/opentracing-go"
)

// Middleware decorates a database.
type DbMiddleware func(Database) Database

// DbTracingMiddleware traces database calls.
func DbTracingMiddleware() DbMiddleware {
return func(next Database) Database {
return dbTracingMiddleware{
next: next,
}
}
}

type dbTracingMiddleware struct {
next Database
}

type StmtMiddleware struct {
next *sql.Stmt
}

func (stmt StmtMiddleware) Close() error {
return stmt.next.Close()
}

func (stmt StmtMiddleware) QueryRow(ctx context.Context, args ...interface{}) *sql.Row {
span := startSpan(ctx, "rows from database")
rows := stmt.next.QueryRow(args...)
finishSpan(span, unsafe.Sizeof(rows))
return rows
}

func (mw dbTracingMiddleware) Close() error {
return mw.next.Close()
}

func (mw dbTracingMiddleware) Ping() error {
return mw.next.Ping()
}

func (mw dbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
span := startSpan(ctx, "socks from database")
err := mw.next.Select(ctx, dest, query, args...)
finishSpan(span, unsafe.Sizeof(dest))
return err
}

func (mw dbTracingMiddleware) Prepare(query string) (StmtMiddleware, error) {
return mw.next.Prepare(query)
}

func (mw dbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
span := startSpan(ctx, "get from database")
err := mw.next.Get(ctx, dest, query, args...)
finishSpan(span, unsafe.Sizeof(dest))
return err
}

func (mw dbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
span := startSpan(ctx, "query from database")
rows, err := mw.next.Query(ctx, query, args...)
finishSpan(span, unsafe.Sizeof(rows))
return rows, err
}

func startSpan(ctx context.Context, n string) stdopentracing.Span {
var span stdopentracing.Span
span, ctx = stdopentracing.StartSpanFromContext(ctx, n)
span.SetTag("span.kind", "client")
span.SetTag("db.type", "mysql")
span.SetTag("peer.address", "catalogue-db:3306")
return span
}

func finishSpan(span stdopentracing.Span, size uintptr) {
span.SetTag("db.query.result.size", size)
span.Finish()
}
8 changes: 4 additions & 4 deletions endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func MakeEndpoints(s Service, tracer stdopentracing.Tracer) Endpoints {
func MakeListEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(listRequest)
socks, err := s.List(req.Tags, req.Order, req.PageNum, req.PageSize)
socks, err := s.List(ctx, req.Tags, req.Order, req.PageNum, req.PageSize)
return listResponse{Socks: socks, Err: err}, err
}
}
Expand All @@ -45,7 +45,7 @@ func MakeListEndpoint(s Service) endpoint.Endpoint {
func MakeCountEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(countRequest)
n, err := s.Count(req.Tags)
n, err := s.Count(ctx, req.Tags)
return countResponse{N: n, Err: err}, err
}
}
Expand All @@ -54,15 +54,15 @@ func MakeCountEndpoint(s Service) endpoint.Endpoint {
func MakeGetEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(getRequest)
sock, err := s.Get(req.ID)
sock, err := s.Get(ctx, req.ID)
return getResponse{Sock: sock, Err: err}, err
}
}

// MakeTagsEndpoint returns an endpoint via the given service.
func MakeTagsEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
tags, err := s.Tags()
tags, err := s.Tags(ctx)
return tagsResponse{Tags: tags, Err: err}, err
}
}
Expand Down
17 changes: 9 additions & 8 deletions logging.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package catalogue

import (
"context"
"strings"
"time"

Expand All @@ -22,7 +23,7 @@ type loggingMiddleware struct {
logger log.Logger
}

func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) {
func (mw loggingMiddleware) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) {
defer func(begin time.Time) {
mw.logger.Log(
"method", "List",
Expand All @@ -35,10 +36,10 @@ func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize
"took", time.Since(begin),
)
}(time.Now())
return mw.next.List(tags, order, pageNum, pageSize)
return mw.next.List(ctx, tags, order, pageNum, pageSize)
}

func (mw loggingMiddleware) Count(tags []string) (n int, err error) {
func (mw loggingMiddleware) Count(ctx context.Context, tags []string) (n int, err error) {
defer func(begin time.Time) {
mw.logger.Log(
"method", "Count",
Expand All @@ -48,10 +49,10 @@ func (mw loggingMiddleware) Count(tags []string) (n int, err error) {
"took", time.Since(begin),
)
}(time.Now())
return mw.next.Count(tags)
return mw.next.Count(ctx, tags)
}

func (mw loggingMiddleware) Get(id string) (s Sock, err error) {
func (mw loggingMiddleware) Get(ctx context.Context, id string) (s Sock, err error) {
defer func(begin time.Time) {
mw.logger.Log(
"method", "Get",
Expand All @@ -61,10 +62,10 @@ func (mw loggingMiddleware) Get(id string) (s Sock, err error) {
"took", time.Since(begin),
)
}(time.Now())
return mw.next.Get(id)
return mw.next.Get(ctx, id)
}

func (mw loggingMiddleware) Tags() (tags []string, err error) {
func (mw loggingMiddleware) Tags(ctx context.Context) (tags []string, err error) {
defer func(begin time.Time) {
mw.logger.Log(
"method", "Tags",
Expand All @@ -73,7 +74,7 @@ func (mw loggingMiddleware) Tags() (tags []string, err error) {
"took", time.Since(begin),
)
}(time.Now())
return mw.next.Tags()
return mw.next.Tags(ctx)
}

func (mw loggingMiddleware) Health() (health []Health) {
Expand Down
30 changes: 15 additions & 15 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ package catalogue
// catalogue service. Everything here is agnostic to the transport (HTTP).

import (
"context"
"errors"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/jmoiron/sqlx"
)

// Service is the catalogue service, providing read operations on a saleable
// catalogue of sock products.
type Service interface {
List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue
Count(tags []string) (int, error) // GET /catalogue/size
Get(id string) (Sock, error) // GET /catalogue/{id}
Tags() ([]string, error) // GET /tags
List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue
Count(ctx context.Context, tags []string) (int, error) // GET /catalogue/size
Get(ctx context.Context, id string) (Sock, error) // GET /catalogue/{id}
Tags(ctx context.Context) ([]string, error) // GET /tags
Health() []Health // GET /health
}

Expand Down Expand Up @@ -56,19 +56,19 @@ var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.pr

// NewCatalogueService returns an implementation of the Service interface,
// with connection to an SQL database.
func NewCatalogueService(db *sqlx.DB, logger log.Logger) Service {
func NewCatalogueService(db Database, logger log.Logger) Service {
return &catalogueService{
db: db,
logger: logger,
}
}

type catalogueService struct {
db *sqlx.DB
db Database
logger log.Logger
}

func (s *catalogueService) List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) {
func (s *catalogueService) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) {
var socks []Sock
query := baseQuery

Expand All @@ -93,7 +93,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i

query += ";"

err := s.db.Select(&socks, query, args...)
err := s.db.Select(ctx, &socks, query, args...)
if err != nil {
s.logger.Log("database error", err)
return []Sock{}, ErrDBConnection
Expand All @@ -111,7 +111,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i
return socks, nil
}

func (s *catalogueService) Count(tags []string) (int, error) {
func (s *catalogueService) Count(ctx context.Context, tags []string) (int, error) {
query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id"

var args []interface{}
Expand All @@ -137,7 +137,7 @@ func (s *catalogueService) Count(tags []string) (int, error) {
defer sel.Close()

var count int
err = sel.QueryRow(args...).Scan(&count)
err = sel.QueryRow(ctx, args...).Scan(&count)

if err != nil {
s.logger.Log("database error", err)
Expand All @@ -147,11 +147,11 @@ func (s *catalogueService) Count(tags []string) (int, error) {
return count, nil
}

func (s *catalogueService) Get(id string) (Sock, error) {
func (s *catalogueService) Get(ctx context.Context, id string) (Sock, error) {
query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;"

var sock Sock
err := s.db.Get(&sock, query, id)
err := s.db.Get(ctx, &sock, query, id)
if err != nil {
s.logger.Log("database error", err)
return Sock{}, ErrNotFound
Expand Down Expand Up @@ -181,10 +181,10 @@ func (s *catalogueService) Health() []Health {
return health
}

func (s *catalogueService) Tags() ([]string, error) {
func (s *catalogueService) Tags(ctx context.Context) ([]string, error) {
var tags []string
query := "SELECT name FROM tag;"
rows, err := s.db.Query(query)
rows, err := s.db.Query(ctx, query)
if err != nil {
s.logger.Log("database error", err)
return []string{}, ErrDBConnection
Expand Down

0 comments on commit b352f92

Please sign in to comment.