Skip to content

Commit

Permalink
simplify inspector session
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Oct 24, 2023
1 parent 66a5cf6 commit 1c0387f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 21 deletions.
33 changes: 13 additions & 20 deletions pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,14 @@ import (

const DefaultBufferSize = 1000

// Session wraps a channel of records and provides:
// 1. a way to send records to it asynchronously
// 2. a way to know if it's closed or not
// Session represents a single inspector session. Records are continuously sent
// into channel C. If the buffer of C is full, records will be dropped. C will
// be closed once the session is removed from the inspector.
type Session struct {
C chan record.Record

id string
componentID string
logger log.CtxLogger
}

// send a record to the session's channel.
// If the channel has already reached its capacity,
// the record will be ignored.
func (s *Session) send(ctx context.Context, r record.Record) {
select {
case s.C <- r:
default:
s.logger.
Warn(ctx).
Str(log.InspectorSessionID, s.id).
Msg("session buffer full, record will be dropped")
}
}

// Inspector is attached to an inspectable pipeline component
Expand Down Expand Up @@ -81,12 +66,13 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector {

// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
// The session will be closed and removed from the inspector when the context is
// closed.
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: uuid.NewString(),
componentID: componentID,
logger: i.logger.WithComponent("inspector.Session"),
}

i.add(s)
Expand Down Expand Up @@ -115,7 +101,14 @@ func (i *Inspector) Send(ctx context.Context, r record.Record) {
i.lock.Lock()
defer i.lock.Unlock()
for _, s := range i.sessions {
s.send(ctx, rClone)
select {
case s.C <- rClone:
default:
i.logger.
Warn(ctx).
Str(log.InspectorSessionID, s.id).
Msg("session buffer full, record will be dropped")
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestInspector_Send_SlowConsumer(t *testing.T) {
s := underTest.NewSession(context.Background(), "test-id")

for i := 0; i < bufferSize+1; i++ {
s.send(
underTest.Send(
context.Background(),
record.Record{
Position: record.Position(fmt.Sprintf("test-pos-%v", i)),
Expand Down

0 comments on commit 1c0387f

Please sign in to comment.