Skip to content

Commit

Permalink
added timeout blacklist to dean
Browse files Browse the repository at this point in the history
  • Loading branch information
nandanrao committed Aug 4, 2022
1 parent 800461b commit f93c431
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 68 deletions.
1 change: 1 addition & 0 deletions dean/dean.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Config struct {
Botserver string `env:"BOTSERVER_URL,required"`
Codes []string `env:"DEAN_FB_CODES,required" envSeparator:","`
ErrorTags []string `env:"DEAN_ERROR_TAGS,required" envSeparator:","`
TimeoutBlacklist []string `env:"DEAN_TIMEOUT_BLACKLIST,required" envSeparator:","`
ErrorInterval string `env:"DEAN_ERROR_INTERVAL,required"`
BlockedInterval string `env:"DEAN_BLOCKED_INTERVAL,required"`
RespondingInterval string `env:"DEAN_RESPONDING_INTERVAL,required"`
Expand Down
3 changes: 0 additions & 3 deletions dean/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod
github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY=
github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.0.4 h1:RHkX5ZUD9bl/kn0f9dYUWs1N7Nwvo1wwUYvKiR26Zco=
github.com/jackc/pgproto3/v2 v2.0.4/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
Expand Down Expand Up @@ -126,7 +125,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -161,7 +159,6 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
9 changes: 7 additions & 2 deletions dean/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,14 @@ func Timeouts(cfg *Config, conn *pgxpool.Pool) <-chan *ExternalEvent {
FROM states
WHERE
current_state = 'WAIT_EXTERNAL_EVENT' AND
timeout_date < $1`

timeout_date < $1 `
d := time.Now().UTC()

if len(cfg.TimeoutBlacklist) > 0 {
query += `AND current_form != ANY($2)`
return get(conn, getTimeout, query, d, cfg.TimeoutBlacklist)
}

return get(conn, getTimeout, query, d)
}

Expand Down
121 changes: 59 additions & 62 deletions dean/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,13 @@ import (
)

const (
surveySql = `drop table if exists surveys;
create table if not exists surveys(
userid VARCHAR NOT NULL,
shortcode VARCHAR NOT NULL,
messages_json JSON,
created TIMESTAMPTZ NOT NULL,
has_followup BOOL AS (messages_json->>'label.buttonHint.default' IS NOT NULL) STORED
);
drop table if exists credentials;
create table if not exists credentials(
userid VARCHAR NOT NULL,
entity VARCHAR NOT NULL,
key VARCHAR NOT NULL UNIQUE,
created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
details JSONB NOT NULL,
facebook_page_id VARCHAR AS (CASE WHEN entity = 'facebook_page' THEN details->>'id' ELSE NULL END) STORED,
UNIQUE(entity, key),
UNIQUE(facebook_page_id)
);
`

pageInsertSql = `INSERT INTO credentials(entity, key, userid, details) VALUES ('facebook_page', ($1)->>'id', 'owner', $1)`
surveyInsertSql = `INSERT INTO surveys(userid, shortcode, created, messages_json) VALUES ('owner', $1, $2, $3);`

stateSql = `drop table if exists states;
create table if not exists states(
userid VARCHAR NOT NULL,
pageid VARCHAR NOT NULL NOT NULL,
updated TIMESTAMPTZ NOT NULL,
current_state VARCHAR NOT NULL,
state_json JSON NOT NULL,
current_form VARCHAR AS (state_json->'forms'->>-1) STORED,
form_start_time TIMESTAMPTZ AS (CEILING((state_json->'md'->>'startTime')::INT/1000)::INT::TIMESTAMPTZ) STORED,
previous_with_token BOOL AS (state_json->'previousOutput'->>'token' IS NOT NULL) STORED,
previous_is_followup BOOL AS (state_json->'previousOutput'->>'followUp' IS NOT NULL) STORED,
timeout_date TIMESTAMPTZ AS (CASE
WHEN state_json->'wait'->>'type' = 'timeout' THEN (CEILING((state_json->>'waitStart')::INT/1000)::INT::TIMESTAMPTZ + (state_json->'wait'->>'value')::INTERVAL)
ELSE NULL
END) STORED,
fb_error_code varchar AS (state_json->'error'->>'code') STORED,
error_tag VARCHAR AS (state_json->'error'->>'tag') STORED,
next_retry TIMESTAMP AS ((FLOOR((POWER(2, JSON_ARRAY_LENGTH(state_json->'retries'))*60000 + (state_json->'retries'->>-1)::INT)::INT)/1000)::INT::TIMESTAMP) STORED,
CONSTRAINT "valid_state_json" CHECK (state_json ? 'state'),
PRIMARY KEY (userid, pageid)
);`
insertUserSql = `
INSERT INTO users(id, email)
VALUES ('e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', '[email protected]');
`
pageInsertSql = `INSERT INTO credentials(entity, key, userid, details) VALUES ('facebook_page', ($1)->>'id', 'e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', $1)`

surveyInsertSql = `INSERT INTO surveys(userid, formid, form, title, shortcode, created, messages) VALUES ('e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', 'formid', '{}', 'title', $1, $2, $3);`

insertQuery = `INSERT INTO
states(userid, pageid, updated, current_state, state_json)
Expand Down Expand Up @@ -85,8 +46,8 @@ func makeStateJson(startTime time.Time, form, previousOutput string) string {
func TestGetRespondingsGetsOnlyThoseInGivenInterval(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
Expand All @@ -108,14 +69,13 @@ func TestGetRespondingsGetsOnlyThoseInGivenInterval(t *testing.T) {
assert.Equal(t, 1, len(events))
assert.Equal(t, "foo", events[0].User)

mustExec(t, pool, "drop table states")
}

func TestGetRespondingsOnlyGetsThoseOutsideOfGracePeriod(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
Expand All @@ -137,14 +97,13 @@ func TestGetRespondingsOnlyGetsThoseOutsideOfGracePeriod(t *testing.T) {
assert.Equal(t, 1, len(events))
assert.Equal(t, "baz", events[0].User)

mustExec(t, pool, "drop table states")
}

func TestGetBlockedOnlyGetsThoseWithCodesInsideWindow(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
Expand All @@ -171,14 +130,13 @@ func TestGetBlockedOnlyGetsThoseWithCodesInsideWindow(t *testing.T) {
assert.Equal(t, 1, len(events))
assert.Equal(t, "foo", events[0].User)

mustExec(t, pool, "drop table states")
}

func TestGetBlockedOnlyGetsThoseWithNextRetryPassed(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
Expand Down Expand Up @@ -209,14 +167,13 @@ func TestGetBlockedOnlyGetsThoseWithNextRetryPassed(t *testing.T) {
assert.Equal(t, 1, len(events))
assert.Equal(t, "foo", events[0].User)

mustExec(t, pool, "drop table states")
}

func TestGetErroredGetsByTag(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
Expand All @@ -237,23 +194,23 @@ func TestGetErroredGetsByTag(t *testing.T) {
assert.Equal(t, 1, len(events))
assert.Equal(t, "foo", events[0].User)

mustExec(t, pool, "drop table states")
}

func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

ts := time.Now().UTC().Add(-30 * time.Minute)
ms := ts.Unix() * 1000

mustExec(t, pool, stateSql)
mustExec(t, pool, insertQuery,
"foo",
"bar",
ts,
"WAIT_EXTERNAL_EVENT",
fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT",
"forms": ["short1", "short2"],
"waitStart": %v,
"wait": { "type": "timeout", "value": "20 minutes"}}`, ms))

Expand All @@ -263,6 +220,7 @@ func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) {
ts,
"WAIT_EXTERNAL_EVENT",
fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT",
"forms": ["short1", "short2"],
"waitStart": %v,
"wait": { "type": "timeout", "value": "40 minutes"}}`, ms))

Expand All @@ -275,22 +233,62 @@ func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) {

assert.Equal(t, "timeout", events[0].Event.Type)

ev, _ := json.Marshal(events[0].Event)
assert.Equal(t, string(ev), fmt.Sprintf(`{"type":"timeout","value":%v}`, ms))
}

func TestGetTimeoutsIgnoresBlacklistShortcodes(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

ts := time.Now().UTC().Add(-30 * time.Minute)
ms := ts.Unix() * 1000

mustExec(t, pool, insertQuery,
"foo",
"bar",
ts,
"WAIT_EXTERNAL_EVENT",
fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT",
"forms": ["short1", "short2"],
"waitStart": %v,
"wait": { "type": "timeout", "value": "20 minutes"}}`, ms))

mustExec(t, pool, insertQuery,
"baz",
"bar",
ts,
"WAIT_EXTERNAL_EVENT",
fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT",
"forms": ["short1", "short3"],
"waitStart": %v,
"wait": { "type": "timeout", "value": "20 minutes"}}`, ms))

cfg := &Config{TimeoutBlacklist: []string{"short3"}}
ch := Timeouts(cfg, pool)
events := getEvents(ch)

assert.Equal(t, 1, len(events))
assert.Equal(t, "foo", events[0].User)

assert.Equal(t, "timeout", events[0].Event.Type)

ev, _ := json.Marshal(events[0].Event)
assert.Equal(t, string(ev), fmt.Sprintf(`{"type":"timeout","value":%v}`, ms))

mustExec(t, pool, "drop table states")
}

func TestFollowUpsGetsOnlyThoseBetweenMinAndMaxAndIgnoresAllSortsOfThings(t *testing.T) {
pool := testPool()
defer pool.Close()
before(pool)

mustExec(t, pool, stateSql)
mustExec(t, pool, surveySql)

mustExec(t, pool, insertUserSql)
mustExec(t, pool, pageInsertSql, `{"id": "bar"}`)
mustExec(t, pool, pageInsertSql, `{"id": "qux"}`)
mustExec(t, pool, pageInsertSql, `{"id": "quux"}`)

mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-50*time.Hour), `{"label.buttonHint.default": "this is follow up"}`)
mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-40*time.Hour), `{"label.buttonHint.default": "this is follow up"}`)
mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-20*time.Hour), `{"label.other": "not a follow up"}`)
Expand Down Expand Up @@ -349,5 +347,4 @@ func TestFollowUpsGetsOnlyThoseBetweenMinAndMaxAndIgnoresAllSortsOfThings(t *tes
ev, _ := json.Marshal(events[0].Event)
assert.Equal(t, string(ev), `{"type":"follow_up","value":"foo"}`)

mustExec(t, pool, "drop table states")
}
17 changes: 16 additions & 1 deletion dean/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func mustExec(t testing.TB, conn *pgxpool.Pool, sql string, arguments ...interfa
}

func testPool() *pgxpool.Pool {
config, err := pgxpool.ParseConfig("postgres://root@localhost:5433/test")
config, err := pgxpool.ParseConfig("postgres://root@localhost:5432/chatroach")
handle(err)

ctx := context.Background()
Expand All @@ -42,3 +42,18 @@ func testPool() *pgxpool.Pool {

return pool
}


func resetDb(pool *pgxpool.Pool, tableNames []string) error {
query := ""
for _, table := range tableNames {
query += fmt.Sprintf("DELETE FROM %s; ", table)
}

_, err := pool.Exec(context.Background(), query)
return err
}

func before(pool *pgxpool.Pool) {
resetDb(pool, []string{"states", "surveys", "users"})
}

0 comments on commit f93c431

Please sign in to comment.