Skip to content

Commit

Permalink
Merge pull request #1919 from hashicorp/llb-session-connection-deadlocks
Browse files Browse the repository at this point in the history
chore: Merge session connection deadlock branch to main
  • Loading branch information
irenarindos authored Mar 15, 2022
2 parents ff42d46 + bbc12d4 commit 066e478
Show file tree
Hide file tree
Showing 37 changed files with 2,444 additions and 1,196 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ Canonical reference for changes, improvements, and bugfixes for Boundary.
## Next

### Bug Fixes

* sessions: Sessions and session connections have been refactored
to better isolate transactions and prevent resource contention that caused deadlocks.
([Issue](https://github.com/hashicorp/boundary/issues/1812),
[PR](https://github.com/hashicorp/boundary/pull/1919))
* scheduler: Fix bug that caused erroneous logs when racing controllers
attempted to run jobs
([Issue](https://github.com/hashicorp/boundary/issues/1903),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestServer_ShutdownWorker(t *testing.T) {

// Connection should fail, and the session should be closed on the DB.
sConn.TestSendRecvFail(t)
sess.ExpectConnectionStateOnController(ctx, t, controllerCmd.controller.SessionRepoFn, session.StatusClosed)
sess.ExpectConnectionStateOnController(ctx, t, controllerCmd.controller.ConnectionRepoFn, session.StatusClosed)

// We're done! Shutdown the controller, and that's it.
close(controllerCmd.ShutdownCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ begin;
after insert on session_connection
for each row execute procedure insert_new_connection_state();

-- Replaced in 27/01_disable_terminate_session.up.sql
-- update_connection_state_on_closed_reason() is used in an update trigger on the
-- session_connection table. it will insert a state of "closed" in
-- session_connection_state for the closed session connection.
Expand Down Expand Up @@ -284,6 +285,7 @@ begin;
create trigger insert_session_connection_state before insert on session_connection_state
for each row execute procedure insert_session_connection_state();

-- Removed in 27/01_disable_terminate_session.up.sql
-- terminate_session_if_possible takes a session id and terminates the session
-- if the following conditions are met:
-- * the session is expired and all its connections are closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ begin;
for each row
execute function wh_insert_session_connection();

-- Updated in 27/01_disable_terminate_session.up.sql
-- wh_update_session_connection returns an after update trigger for the
-- session_connection table which updates a row in
-- wh_session_connection_accumulating_fact for the session connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ begin;
alter table wh_session_connection_accumulating_fact
alter column credential_group_key drop default;

-- Updated in 27/01_disable_terminate_session.up.sql
-- replaces function from 15/01_wh_rename_key_columns.up.sql
drop trigger wh_insert_session_connection on session_connection;
drop function wh_insert_session_connection;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
begin;
-- Replaces the function from 0/51_connection.up.sql so that it no longer
-- calls terminate_session_if_possible. The trigger is dropped first because
-- it depends on the function.
drop trigger update_connection_state_on_closed_reason on session_connection;
drop function update_connection_state_on_closed_reason();
-- update_connection_state_on_closed_reason() is the trigger function used by
-- the update trigger of the same name on session_connection. When the updated
-- row has a closed_reason, it records a 'closed' state for the connection in
-- session_connection_state unless such a state is already present.
create function update_connection_state_on_closed_reason()
  returns trigger
as $$
begin
  -- nothing to record until the connection has a closed_reason.
  if new.closed_reason is null then
    return new;
  end if;

  -- only insert a 'closed' state when the connection doesn't have one yet.
  if not exists (
    select
      from session_connection_state cs
     where cs.connection_id = new.public_id
       and cs.state         = 'closed'
  ) then
    insert into session_connection_state
      (connection_id, state)
    values
      (new.public_id, 'closed');
  end if;

  return new;
end;
$$ language plpgsql;

-- Runs update_connection_state_on_closed_reason() once per row, after an
-- update that targets the closed_reason column of session_connection.
create trigger
update_connection_state_on_closed_reason
after update of closed_reason on session_connection
for each row execute procedure update_connection_state_on_closed_reason();

-- Remove terminate_session_if_possible, defined in 0/51_connection.up.sql.
-- The update_connection_state_on_closed_reason() function recreated in this
-- migration does not call it.
drop function terminate_session_if_possible;

commit;
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
begin;
-- Updates the definition from 16/05_wh_credential_dimension.up.sql to remove
-- the call to wh_rollup_connections(new.session_id) from the function.
-- The trigger is dropped first because it depends on the function.
drop trigger wh_insert_session_connection on session_connection;
drop function wh_insert_session_connection();
-- wh_insert_session_connection() is the trigger function behind the after
-- insert trigger of the same name on session_connection. It inserts one row
-- into wh_session_connection_accumulating_fact for the new connection,
-- combining the start time of the connection's 'authorized' state with the
-- host, user and credential group keys stored for its session in
-- wh_session_accumulating_fact.
create function wh_insert_session_connection()
  returns trigger
as $$
declare
  inserted_fact wh_session_connection_accumulating_fact%rowtype;
begin
  with
  -- dimension keys recorded for the session this connection belongs to.
  session_keys (host_dim_key, user_dim_key, credential_group_dim_key) as (
    select sf.host_key,
           sf.user_key,
           sf.credential_group_key
      from wh_session_accumulating_fact as sf
     where sf.session_id = new.session_id
  ),
  -- date key, time key and raw timestamp of the 'authorized' state.
  authorized_state (date_dim_key, time_dim_key, ts) as (
    select wh_date_key(scs.start_time),
           wh_time_key(scs.start_time),
           scs.start_time
      from session_connection_state as scs
     where scs.connection_id = new.public_id
       and scs.state         = 'authorized'
  )
  insert into wh_session_connection_accumulating_fact (
              connection_id,
              session_id,
              host_key,
              user_key,
              credential_group_key,
              connection_authorized_date_key,
              connection_authorized_time_key,
              connection_authorized_time,
              client_tcp_address,
              client_tcp_port_number,
              endpoint_tcp_address,
              endpoint_tcp_port_number,
              bytes_up,
              bytes_down
  )
  select new.public_id,
         new.session_id,
         session_keys.host_dim_key,
         session_keys.user_dim_key,
         session_keys.credential_group_dim_key,
         authorized_state.date_dim_key,
         authorized_state.time_dim_key,
         authorized_state.ts,
         new.client_tcp_address,
         new.client_tcp_port,
         new.endpoint_tcp_address,
         new.endpoint_tcp_port,
         new.bytes_up,
         new.bytes_down
    from authorized_state
   cross join session_keys
  -- strict: raise an error unless exactly one fact row was inserted.
  returning * into strict inserted_fact;

  return null;
end;
$$ language plpgsql;

-- Runs wh_insert_session_connection() once for every row inserted into
-- session_connection.
create trigger wh_insert_session_connection
after insert on session_connection
for each row
execute function wh_insert_session_connection();

-- Updates the definition from 0/69_wh_session_facts.up.sql to remove the
-- call to wh_rollup_connections(new.session_id) from the function.
-- The trigger is dropped first because it depends on the function.
drop trigger wh_update_session_connection on session_connection;
drop function wh_update_session_connection;
-- wh_update_session_connection() is the trigger function behind the after
-- update trigger of the same name on session_connection. It copies the
-- connection's tcp addresses, tcp ports and byte counters onto the matching
-- row of wh_session_connection_accumulating_fact.
create function wh_update_session_connection()
  returns trigger
as $$
declare
  changed_fact wh_session_connection_accumulating_fact%rowtype;
begin
     update wh_session_connection_accumulating_fact as fact
        set client_tcp_address       = new.client_tcp_address,
            client_tcp_port_number   = new.client_tcp_port,
            endpoint_tcp_address     = new.endpoint_tcp_address,
            endpoint_tcp_port_number = new.endpoint_tcp_port,
            bytes_up                 = new.bytes_up,
            bytes_down               = new.bytes_down
      where fact.connection_id = new.public_id
  -- strict: raise an error unless exactly one fact row was updated.
  returning * into strict changed_fact;

  return null;
end;
$$ language plpgsql;

-- Runs wh_update_session_connection() once for every updated row of
-- session_connection; the trigger is not restricted to specific columns.
create trigger wh_update_session_connection
after update on session_connection
for each row
execute function wh_update_session_connection();

-- wh_session_rollup() is a trigger function for the session table. When the
-- updated session has a termination_reason, it calls wh_rollup_connections
-- for that session, provided the session has at least one row in
-- session_connection.
create function wh_session_rollup()
  returns trigger
as $$
begin
  -- sessions without a termination_reason are not rolled up.
  if new.termination_reason is null then
    return null;
  end if;

  -- Rollup will fail if no connections were made for a session
  if exists (
    select
      from session_connection
     where session_id = new.public_id
  ) then
    perform wh_rollup_connections(new.public_id);
  end if;

  return null;
end;
$$ language plpgsql;

-- Runs wh_session_rollup() once per row, after an update that targets the
-- termination_reason column of session.
create trigger
wh_rollup_connections_on_session_termination
after update of termination_reason on session
for each row execute procedure wh_session_rollup();

commit;
3 changes: 2 additions & 1 deletion internal/db/schema/migrations/oss/postgres_24_01_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestMigrations_SessionState(t *testing.T) {
require.Equal(want, state)

sessionRepo, err := session.NewRepository(rw, rw, kmsCache)
connectionRepo, err := session.NewConnectionRepository(ctx, rw, rw, kmsCache)
require.NoError(err)

// Ensure session is cancelled
Expand All @@ -138,7 +139,7 @@ func TestMigrations_SessionState(t *testing.T) {
require.Equal([]string{"canceled"}, sessionTermReason)

// Ensure connection is also cancelled
connections, err := sessionRepo.ListConnectionsBySessionId(ctx, repoSessionId)
connections, err := connectionRepo.ListConnectionsBySessionId(ctx, repoSessionId)
require.NoError(err)

var connTermReason []string
Expand Down
1 change: 1 addition & 0 deletions internal/servers/controller/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ type (
PluginHostRepoFactory func() (*pluginhost.Repository, error)
HostPluginRepoFactory func() (*hostplugin.Repository, error)
SessionRepoFactory func() (*session.Repository, error)
ConnectionRepoFactory func() (*session.ConnectionRepository, error)
TargetRepoFactory func() (*target.Repository, error)
)
17 changes: 10 additions & 7 deletions internal/servers/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Controller struct {
PasswordAuthRepoFn common.PasswordAuthRepoFactory
ServersRepoFn common.ServersRepoFactory
SessionRepoFn common.SessionRepoFactory
ConnectionRepoFn common.ConnectionRepoFactory
StaticHostRepoFn common.StaticRepoFactory
PluginHostRepoFn common.PluginHostRepoFactory
HostPluginRepoFn common.HostPluginRepoFactory
Expand Down Expand Up @@ -235,7 +236,9 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
c.SessionRepoFn = func() (*session.Repository, error) {
return session.NewRepository(dbase, dbase, c.kms)
}

c.ConnectionRepoFn = func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, dbase, dbase, c.kms)
}
return c, nil
}

Expand Down Expand Up @@ -290,21 +293,21 @@ func (c *Controller) registerJobs() error {
return err
}

if err := c.registerSessionCleanupJob(); err != nil {
if err := c.registerSessionConnectionCleanupJob(); err != nil {
return err
}

return nil
}

// registerSessionCleanupJob is a helper method to abstract
// registering the session cleanup job specifically.
func (c *Controller) registerSessionCleanupJob() error {
sessionCleanupJob, err := newSessionCleanupJob(c.SessionRepoFn, int(c.conf.StatusGracePeriodDuration.Seconds()))
// registerSessionConnectionCleanupJob is a helper method to abstract
// registering the session connection cleanup job specifically.
func (c *Controller) registerSessionConnectionCleanupJob() error {
sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(c.ConnectionRepoFn, c.conf.StatusGracePeriodDuration)
if err != nil {
return fmt.Errorf("error creating session cleanup job: %w", err)
}
if err = c.scheduler.RegisterJob(c.baseContext, sessionCleanupJob); err != nil {
if err = c.scheduler.RegisterJob(c.baseContext, sessionConnectionCleanupJob); err != nil {
return fmt.Errorf("error registering session cleanup job: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2628,6 +2628,9 @@ func TestAuthorizeSession(t *testing.T) {
sessionRepoFn := func() (*session.Repository, error) {
return session.NewRepository(rw, rw, kms)
}
connectionRepoFn := func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, rw, rw, kms)
}
staticHostRepoFn := func() (*static.Repository, error) {
return static.NewRepository(rw, rw, kms)
}
Expand Down Expand Up @@ -2758,7 +2761,7 @@ func TestAuthorizeSession(t *testing.T) {
require.NoError(t, err)

// Tell our DB that there is a worker ready to serve the data
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, &sync.Map{}, kms)
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, &sync.Map{}, kms)
_, err = workerService.Status(ctx, &spbs.StatusRequest{
Worker: &spb.Server{
PrivateId: "testworker",
Expand Down Expand Up @@ -2874,6 +2877,9 @@ func TestAuthorizeSessionTypedCredentials(t *testing.T) {
sessionRepoFn := func() (*session.Repository, error) {
return session.NewRepository(rw, rw, kms)
}
connectionRepoFn := func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, rw, rw, kms)
}
staticHostRepoFn := func() (*static.Repository, error) {
return static.NewRepository(rw, rw, kms)
}
Expand Down Expand Up @@ -3037,7 +3043,7 @@ func TestAuthorizeSessionTypedCredentials(t *testing.T) {
require.NoError(t, err)

// Tell our DB that there is a worker ready to serve the data
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, &sync.Map{}, kms)
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, &sync.Map{}, kms)
_, err = workerService.Status(ctx, &spbs.StatusRequest{
Worker: &spb.Server{
PrivateId: "testworker",
Expand Down Expand Up @@ -3126,6 +3132,9 @@ func TestAuthorizeSession_Errors(t *testing.T) {
sessionRepoFn := func() (*session.Repository, error) {
return session.NewRepository(rw, rw, kms)
}
connectionRepoFn := func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, rw, rw, kms)
}
staticHostRepoFn := func() (*static.Repository, error) {
return static.NewRepository(rw, rw, kms)
}
Expand Down Expand Up @@ -3165,7 +3174,7 @@ func TestAuthorizeSession_Errors(t *testing.T) {
store := vault.TestCredentialStore(t, conn, wrapper, proj.GetPublicId(), v.Addr, tok, sec.Auth.Accessor)

workerExists := func(tar target.Target) (version uint32) {
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, &sync.Map{}, kms)
workerService := workers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, &sync.Map{}, kms)
_, err := workerService.Status(context.Background(), &spbs.StatusRequest{
Worker: &spb.Server{
PrivateId: "testworker",
Expand Down
Loading

0 comments on commit 066e478

Please sign in to comment.