Skip to content

Commit

Permalink
Prevent usage of failed connections and improve connection management (
Browse files Browse the repository at this point in the history
  • Loading branch information
adamyeats authored Feb 11, 2025
1 parent d26f1cd commit 28f86d0
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 81 deletions.
66 changes: 42 additions & 24 deletions pkg/plugin/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func getPDCDialContext(settings Settings) (func(context.Context, string) (net.Co
return nil, errors.New("unable to cast SOCKS proxy dialer to context proxy dialer")
}

// Return a function matching the expected signature
return func(ctx context.Context, addr string) (net.Conn, error) {
return contextDialer.DialContext(ctx, "tcp", addr)
}, nil
Expand Down Expand Up @@ -180,30 +179,26 @@ func (h *Clickhouse) Connect(ctx context.Context, config backend.DataSourceInsta
httpHeaders[k] = v
}

timeout := time.Duration(t) * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

opts := &clickhouse.Options{
ClientInfo: clickhouse.ClientInfo{
Products: getClientInfoProducts(ctx),
},
TLS: tlsConfig,
Addr: []string{fmt.Sprintf("%s:%d", settings.Host, settings.Port)},
HttpUrlPath: settings.Path,
HttpHeaders: httpHeaders,
Addr: []string{fmt.Sprintf("%s:%d", settings.Host, settings.Port)},
Auth: clickhouse.Auth{
Username: settings.Username,
Password: settings.Password,
Database: settings.DefaultDatabase,
Password: settings.Password,
Username: settings.Username,
},
ClientInfo: clickhouse.ClientInfo{
Products: getClientInfoProducts(ctx),
},
Compression: &clickhouse.Compression{
Method: compression,
},
DialTimeout: time.Duration(t) * time.Second,
ReadTimeout: time.Duration(qt) * time.Second,
HttpHeaders: httpHeaders,
HttpUrlPath: settings.Path,
Protocol: protocol,
ReadTimeout: time.Duration(qt) * time.Second,
Settings: customSettings,
TLS: tlsConfig,
}

// dialCtx is used to create a connection to PDC, if it is enabled
Expand All @@ -215,21 +210,44 @@ func (h *Clickhouse) Connect(ctx context.Context, config backend.DataSourceInsta
opts.DialContext = dialCtx
}

ctx, cancel := context.WithTimeout(ctx, time.Duration(t)*time.Second)
defer cancel()

db := clickhouse.OpenDB(opts)

// Set connection pool settings
if i, err := strconv.Atoi(settings.ConnMaxLifetime); err == nil {
db.SetConnMaxLifetime(time.Duration(i) * time.Minute)
}
if i, err := strconv.Atoi(settings.MaxIdleConns); err == nil {
db.SetMaxIdleConns(i)
}
if i, err := strconv.Atoi(settings.MaxOpenConns); err == nil {
db.SetMaxOpenConns(i)
}

select {
case <-ctx.Done():
return db, fmt.Errorf("the operation was cancelled: %w", ctx.Err())
return nil, fmt.Errorf("the operation was cancelled before starting: %w", ctx.Err())
default:
err := db.PingContext(ctx)
if err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
log.DefaultLogger.Error("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
log.DefaultLogger.Error(err.Error())
}
return db, nil
// proceed
}

// `sqlds` normally calls `db.PingContext()` to check if the connection is alive,
// however, as ClickHouse returns its own non-standard `Exception` type, we need
// to handle it here so that we can log the error code, message and stack trace
if err := db.PingContext(ctx); err != nil {
if ctx.Err() != nil {
return nil, fmt.Errorf("the operation was cancelled during execution: %w", ctx.Err())
}

if exception, ok := err.(*clickhouse.Exception); ok {
log.DefaultLogger.Error("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
log.DefaultLogger.Error(err.Error())
}

return nil, err
}

return db, settings.isValid()
Expand Down
3 changes: 1 addition & 2 deletions pkg/plugin/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,11 @@ func TestHTTPConnect(t *testing.T) {
username := getEnv("CLICKHOUSE_USERNAME", "default")
password := getEnv("CLICKHOUSE_PASSWORD", "")
ssl := getEnv("CLICKHOUSE_SSL", "false")
path := "custom-path"
clickhouse := plugin.Clickhouse{}
t.Run("should not error when valid settings passed", func(t *testing.T) {
secure := map[string]string{}
secure["password"] = password
settings := backend.DataSourceInstanceSettings{JSONData: []byte(fmt.Sprintf(`{ "server": "%s", "port": %s, "path": "%s", "username": "%s", "secure": %s, "protocol": "http" }`, host, port, path, username, ssl)), DecryptedSecureJSONData: secure}
settings := backend.DataSourceInstanceSettings{JSONData: []byte(fmt.Sprintf(`{ "server": "%s", "port": %s, "username": "%s", "secure": %s, "protocol": "http" }`, host, port, username, ssl)), DecryptedSecureJSONData: secure}
_, err := clickhouse.Connect(context.Background(), settings, json.RawMessage{})
assert.Equal(t, nil, err)
})
Expand Down
19 changes: 17 additions & 2 deletions pkg/plugin/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ type Settings struct {

DefaultDatabase string `json:"defaultDatabase,omitempty"`

DialTimeout string `json:"dialTimeout,omitempty"`
QueryTimeout string `json:"queryTimeout,omitempty"`
ConnMaxLifetime string `json:"connMaxLifetime,omitempty"`
DialTimeout string `json:"dialTimeout,omitempty"`
QueryTimeout string `json:"queryTimeout,omitempty"`
MaxIdleConns string `json:"maxIdleConns,omitempty"`
MaxOpenConns string `json:"maxOpenConns,omitempty"`

HttpHeaders map[string]string `json:"-"`
ForwardGrafanaHeaders bool `json:"forwardGrafanaHeaders,omitempty"`
Expand Down Expand Up @@ -181,12 +184,24 @@ func LoadSettings(ctx context.Context, config backend.DataSourceInstanceSettings
}
}

// Set default values
if strings.TrimSpace(settings.DialTimeout) == "" {
settings.DialTimeout = "10"
}
if strings.TrimSpace(settings.QueryTimeout) == "" {
settings.QueryTimeout = "60"
}
if strings.TrimSpace(settings.ConnMaxLifetime) == "" {
settings.ConnMaxLifetime = "5"
}
if strings.TrimSpace(settings.MaxIdleConns) == "" {
settings.MaxIdleConns = "25"
}
if strings.TrimSpace(settings.MaxOpenConns) == "" {
settings.MaxOpenConns = "50"
}

// Load secure settings
password, ok := config.DecryptedSecureJSONData["password"]
if ok {
settings.Password = password
Expand Down
17 changes: 13 additions & 4 deletions pkg/plugin/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ func TestLoadSettings(t *testing.T) {
TlsCACert: "caCert",
TlsClientCert: "clientCert",
TlsClientKey: "clientKey",
ConnMaxLifetime: "5",
DialTimeout: "10",
MaxIdleConns: "25",
MaxOpenConns: "50",
QueryTimeout: "60",
HttpHeaders: map[string]string{
"test-plain-1": "value-1",
Expand Down Expand Up @@ -100,7 +103,10 @@ func TestLoadSettings(t *testing.T) {
InsecureSkipVerify: true,
TlsClientAuth: true,
TlsAuthWithCACert: true,
ConnMaxLifetime: "5",
DialTimeout: "10",
MaxIdleConns: "25",
MaxOpenConns: "50",
QueryTimeout: "60",
ProxyOptions: nil,
},
Expand All @@ -115,10 +121,13 @@ func TestLoadSettings(t *testing.T) {
},
},
wantSettings: Settings{
Host: "test",
Port: 443,
DialTimeout: "10",
QueryTimeout: "60",
Host: "test",
Port: 443,
ConnMaxLifetime: "5",
DialTimeout: "10",
MaxIdleConns: "25",
MaxOpenConns: "50",
QueryTimeout: "60",
},
wantErr: nil,
},
Expand Down
99 changes: 90 additions & 9 deletions src/components/configEditor/QuerySettingsConfig.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ describe('QuerySettingsConfig', () => {
it('should render', () => {
const result = render(
<QuerySettingsConfig
dialTimeout='10'
queryTimeout='10'
validateSql
connMaxLifetime={'5'}
dialTimeout={'5'}
maxIdleConns={'5'}
maxOpenConns={'5'}
queryTimeout={'5'}
validateSql={true}
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={() => {}}
Expand All @@ -22,6 +28,9 @@ describe('QuerySettingsConfig', () => {
const onDialTimeout = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={onDialTimeout}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={() => {}}
Expand All @@ -33,14 +42,17 @@ describe('QuerySettingsConfig', () => {
expect(input).toBeInTheDocument();
fireEvent.change(input, { target: { value: '10' } });
fireEvent.blur(input);
expect(onDialTimeout).toBeCalledTimes(1);
expect(onDialTimeout).toBeCalledWith(expect.any(Object));
expect(onDialTimeout).toHaveBeenCalledTimes(1);
expect(onDialTimeout).toHaveBeenCalledWith(expect.any(Object));
});

it('should call onQueryTimeout when changed', () => {
const onQueryTimeout = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={onQueryTimeout}
onValidateSqlChange={() => {}}
Expand All @@ -52,14 +64,17 @@ describe('QuerySettingsConfig', () => {
expect(input).toBeInTheDocument();
fireEvent.change(input, { target: { value: '10' } });
fireEvent.blur(input);
expect(onQueryTimeout).toBeCalledTimes(1);
expect(onQueryTimeout).toBeCalledWith(expect.any(Object));
expect(onQueryTimeout).toHaveBeenCalledTimes(1);
expect(onQueryTimeout).toHaveBeenCalledWith(expect.any(Object));
});

it('should call onValidateSqlChange when changed', () => {
const onValidateSqlChange = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={onValidateSqlChange}
Expand All @@ -70,7 +85,73 @@ describe('QuerySettingsConfig', () => {
const input = result.getByRole('checkbox');
expect(input).toBeInTheDocument();
fireEvent.click(input);
expect(onValidateSqlChange).toBeCalledTimes(1);
expect(onValidateSqlChange).toBeCalledWith(expect.any(Object));
expect(onValidateSqlChange).toHaveBeenCalledTimes(1);
expect(onValidateSqlChange).toHaveBeenCalledWith(expect.any(Object));
});

it('should call onConnMaxIdleConnsChange when changed', () => {
const onConnMaxIdleConnsChange = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={onConnMaxIdleConnsChange}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={() => {}}
/>
);
expect(result.container.firstChild).not.toBeNull();

const input = result.getByPlaceholderText(allLabels.components.Config.QuerySettingsConfig.maxIdleConns.placeholder);
expect(input).toBeInTheDocument();
fireEvent.change(input, { target: { value: '10' } });
fireEvent.blur(input);
expect(onConnMaxIdleConnsChange).toHaveBeenCalledTimes(1);
expect(onConnMaxIdleConnsChange).toHaveBeenCalledWith(expect.any(Object));
});

it('should call onConnMaxLifetimeChange when changed', () => {
const onConnMaxLifetimeChange = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={onConnMaxLifetimeChange}
onConnMaxOpenConnsChange={() => {}}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={() => {}}
/>
);
expect(result.container.firstChild).not.toBeNull();

const input = result.getByPlaceholderText(allLabels.components.Config.QuerySettingsConfig.connMaxLifetime.placeholder);
expect(input).toBeInTheDocument();
fireEvent.change(input, { target: { value: '10' } });
fireEvent.blur(input);
expect(onConnMaxLifetimeChange).toHaveBeenCalledTimes(1);
expect(onConnMaxLifetimeChange).toHaveBeenCalledWith(expect.any(Object));
});

it('should call onConnMaxOpenConnsChange when changed', () => {
const onConnMaxOpenConnsChange = jest.fn();
const result = render(
<QuerySettingsConfig
onConnMaxIdleConnsChange={() => {}}
onConnMaxLifetimeChange={() => {}}
onConnMaxOpenConnsChange={onConnMaxOpenConnsChange}
onDialTimeoutChange={() => {}}
onQueryTimeoutChange={() => {}}
onValidateSqlChange={() => {}}
/>
);
expect(result.container.firstChild).not.toBeNull();

const input = result.getByPlaceholderText(allLabels.components.Config.QuerySettingsConfig.maxOpenConns.placeholder);
expect(input).toBeInTheDocument();
fireEvent.change(input, { target: { value: '10' } });
fireEvent.blur(input);
expect(onConnMaxOpenConnsChange).toHaveBeenCalledTimes(1);
expect(onConnMaxOpenConnsChange).toHaveBeenCalledWith(expect.any(Object));
});
});
Loading

0 comments on commit 28f86d0

Please sign in to comment.