Skip to content

Commit 30b6dba

Browse files
committed
Remove concurrencyhandler package and update httpclient_client.go
1 parent 0c066b7 commit 30b6dba

8 files changed

+176
-206
lines changed

concurrenyhandler/dynamic_token_adjustment.go renamed to concurrency/dynamic_token_adjustment.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
// concurrencyhandler/dynamic_token_adjustment.go
1+
// concurrency/dynamic_token_adjustment.go
22

3-
package concurrencyhandler
3+
package concurrency
44

55
import (
66
"time"
@@ -87,9 +87,9 @@ func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics() {
8787
// the concurrency limit is decreased, and vice versa. The method ensures that the concurrency
8888
// limit remains within the bounds defined by the system's best practices.
8989
func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency() {
90-
ch.PerfMetrics.lock.Lock()
91-
averageResponseTime := ch.PerfMetrics.TotalResponseTime / time.Duration(ch.PerfMetrics.TotalRequests)
92-
ch.PerfMetrics.lock.Unlock()
90+
ch.Metrics.Lock.Lock()
91+
averageResponseTime := ch.Metrics.TotalResponseTime / time.Duration(ch.Metrics.TotalRequests)
92+
ch.Metrics.Lock.Unlock()
9393

9494
historicalAverageAcquisitionTime := ch.HistoricalAverageAcquisitionTime()
9595

@@ -194,6 +194,6 @@ func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duratio
194194
// GetPerformanceMetrics returns the current performance metrics of the ConcurrencyHandler.
195195
// This includes counts of total requests, retries, rate limit errors, total response time,
196196
// and token wait time.
197-
func (ch *ConcurrencyHandler) GetPerformanceMetrics() *PerformanceMetrics {
198-
return ch.PerfMetrics
197+
func (ch *ConcurrencyHandler) GetPerformanceMetrics() *ConcurrencyMetrics {
198+
return ch.Metrics
199199
}

concurrenyhandler/handler.go renamed to concurrency/handler.go

+12-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
// concurrencyhandler/handler.go
2-
package concurrencyhandler
1+
// concurrency/handler.go
2+
package concurrency
33

44
import (
55
"sync"
@@ -22,38 +22,37 @@ type ConcurrencyHandler struct {
2222
AcquisitionTimes []time.Duration
2323
lock sync.Mutex
2424
lastTokenAcquisitionTime time.Time
25-
PerfMetrics *PerformanceMetrics
25+
Metrics *ConcurrencyMetrics
2626
}
2727

28-
// PerformanceMetrics captures various metrics related to the client's
29-
// interactions with the API.
30-
type PerformanceMetrics struct {
28+
// ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
29+
type ConcurrencyMetrics struct {
3130
TotalRequests int64
3231
TotalRetries int64
3332
TotalRateLimitErrors int64
3433
TotalResponseTime time.Duration
3534
TokenWaitTime time.Duration
36-
lock sync.Mutex // Protects performance metrics fields
35+
Lock sync.Mutex // Protects performance metrics fields
3736
}
3837

39-
// NewConcurrencyManager initializes a new ConcurrencyManager with the given
40-
// concurrency limit, logger, and perf metrics. The ConcurrencyManager ensures
38+
// NewConcurrencyHandler initializes a new ConcurrencyHandler with the given
39+
// concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures
4140
// no more than a certain number of concurrent requests are made.
4241
// It uses a semaphore to control concurrency.
43-
func NewConcurrencyHandler(limit int, logger logger.Logger, perfMetrics *PerformanceMetrics) *ConcurrencyHandler {
42+
func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
4443
return &ConcurrencyHandler{
4544
sem: make(chan struct{}, limit),
4645
logger: logger,
4746
AcquisitionTimes: []time.Duration{},
48-
PerfMetrics: perfMetrics,
47+
Metrics: metrics,
4948
}
5049
}
5150

52-
// requestIDKey is type used as a key for storing and retrieving
51+
// RequestIDKey is type used as a key for storing and retrieving
5352
// request-specific identifiers from a context.Context object. This private
5453
// type ensures that the key is distinct and prevents accidental value
5554
// retrieval or conflicts with other context keys. The value associated
5655
// with this key in a context is typically a UUID that uniquely identifies
5756
// a request being processed by the ConcurrencyManager, allowing for
5857
// fine-grained control and tracking of concurrent HTTP requests.
59-
type requestIDKey struct{}
58+
type RequestIDKey struct{}

concurrenyhandler/metrics.go renamed to concurrency/metrics.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
// concurrencyhandler/metrics.go
2-
package concurrencyhandler
1+
// concurrency/metrics.go
2+
package concurrency
33

44
import "time"
55

@@ -54,10 +54,10 @@ func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration {
5454
// This function should be called after each HTTP request to keep track of the
5555
// ConcurrencyHandler's performance over time.
5656
func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration) {
57-
ch.PerfMetrics.lock.Lock()
58-
defer ch.PerfMetrics.lock.Unlock()
59-
ch.PerfMetrics.TotalResponseTime += duration
60-
ch.PerfMetrics.TotalRequests++
57+
ch.Metrics.Lock.Lock()
58+
defer ch.Metrics.Lock.Unlock()
59+
ch.Metrics.TotalResponseTime += duration
60+
ch.Metrics.TotalRequests++
6161
}
6262

6363
// Min returns the smaller of the two integers.

concurrency/semaphore.go

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// concurrency/semaphore.go
2+
/* package provides utilities to manage concurrency control. The Concurrency Manager
3+
ensures no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro)
4+
are sent at the same time. This is managed using a semaphore */
5+
package concurrency
6+
7+
import (
8+
"context"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"go.uber.org/zap"
13+
)
14+
15+
// AcquireConcurrencyToken acquires a concurrency token to regulate the number of concurrent
16+
// operations within predefined limits, ensuring system stability and adherence to concurrency policies.
17+
// This function initiates a token acquisition process that involves generating a unique request ID
18+
// for tracking purposes and attempting to acquire a token within a specified timeout to prevent
19+
// indefinite blocking. Successful acquisition updates performance metrics and associates the
20+
// unique request ID with the provided context for enhanced traceability and management of
21+
// concurrent requests.
22+
//
23+
// Parameters:
24+
// - ctx: A parent context used as the basis for the token acquisition attempt, facilitating
25+
// appropriate cancellation and timeout handling in line with best practices for concurrency control.
26+
//
27+
// Returns:
28+
// - context.Context: A derived context that includes the unique request ID, offering a mechanism
29+
// for associating subsequent operations with the acquired concurrency token and facilitating
30+
// effective request tracking and management.
31+
// - uuid.UUID: The unique request ID generated as part of the token acquisition process, serving
32+
// as an identifier for the acquired token and enabling detailed tracking and auditing of
33+
// concurrent operations.
34+
// - error: An error that signals failure to acquire a concurrency token within the allotted timeout,
35+
// or due to other encountered issues, ensuring that potential problems in concurrency control
36+
// are surfaced and can be addressed.
37+
//
38+
// Usage:
39+
// This function is a critical component of concurrency control and should be invoked prior to
40+
// executing operations that require regulation of concurrency. The returned context, enhanced
41+
// with the unique request ID, should be utilized in the execution of these operations to maintain
42+
// consistency in tracking and managing concurrency tokens. The unique request ID also facilitates
43+
// detailed auditing and troubleshooting of the concurrency control mechanism.
44+
//
45+
// Example:
46+
// ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())
47+
//
48+
// if err != nil {
49+
// // Handle token acquisition failure
50+
// }
51+
//
52+
// defer concurrencyHandler.Release(requestID)
53+
// // Proceed with the operation using the modified context
54+
func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error) {
55+
log := ch.logger
56+
57+
// Measure the token acquisition start time
58+
tokenAcquisitionStart := time.Now()
59+
60+
// Generate a unique request ID for this acquisition
61+
requestID := uuid.New()
62+
63+
// Create a new context with a timeout for acquiring the concurrency token
64+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
65+
defer cancel()
66+
67+
// Attempt to acquire a token from the semaphore within the given context timeout
68+
select {
69+
case ch.sem <- struct{}{}: // Successfully acquired a token
70+
// Calculate the duration it took to acquire the token and record it
71+
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
72+
ch.lock.Lock()
73+
ch.AcquisitionTimes = append(ch.AcquisitionTimes, tokenAcquisitionDuration)
74+
ch.Metrics.Lock.Lock()
75+
ch.Metrics.TokenWaitTime += tokenAcquisitionDuration
76+
ch.Metrics.TotalRequests++ // Increment total requests count
77+
ch.Metrics.Lock.Unlock()
78+
ch.lock.Unlock()
79+
80+
// Logging the acquisition
81+
utilizedTokens := len(ch.sem)
82+
availableTokens := cap(ch.sem) - utilizedTokens
83+
log.Debug("Acquired concurrency token", zap.String("RequestID", requestID.String()), zap.Duration("AcquisitionTime", tokenAcquisitionDuration), zap.Int("UtilizedTokens", utilizedTokens), zap.Int("AvailableTokens", availableTokens))
84+
85+
// Add the acquired request ID to the context for use in subsequent operations
86+
ctxWithRequestID := context.WithValue(ctx, RequestIDKey{}, requestID)
87+
88+
// Return the updated context, the request ID, and nil error to indicate success
89+
return ctxWithRequestID, requestID, nil
90+
91+
case <-ctxWithTimeout.Done(): // Failed to acquire a token within the timeout
92+
log.Error("Failed to acquire concurrency token", zap.Error(ctxWithTimeout.Err()))
93+
return ctx, requestID, ctxWithTimeout.Err()
94+
}
95+
}
96+
97+
// ReleaseConcurrencyToken returns a token back to the semaphore pool, allowing other
98+
// operations to proceed. It uses the provided requestID for structured logging,
99+
// aiding in tracking and debugging the release of concurrency tokens.
100+
func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID) {
101+
<-ch.sem // Release a token back to the semaphore
102+
103+
ch.lock.Lock()
104+
defer ch.lock.Unlock()
105+
106+
// Update the list of acquisition times by removing the time related to the released token
107+
// This step is optional and depends on whether you track acquisition times per token or not
108+
109+
utilizedTokens := len(ch.sem) // Tokens currently in use
110+
availableTokens := cap(ch.sem) - utilizedTokens // Tokens available for use
111+
112+
// Log the release of the concurrency token for auditing and debugging purposes
113+
ch.logger.Debug("Released concurrency token",
114+
zap.String("RequestID", requestID.String()),
115+
zap.Int("UtilizedTokens", utilizedTokens),
116+
zap.Int("AvailableTokens", availableTokens),
117+
)
118+
}

concurrenyhandler/semaphore.go

-124
This file was deleted.

0 commit comments

Comments
 (0)