From f93c4317dd0409ef01e711e2f911ed2459d17756 Mon Sep 17 00:00:00 2001 From: Nandan Rao Date: Thu, 4 Aug 2022 07:20:26 +0200 Subject: [PATCH] added timeout blacklist to dean --- dean/dean.go | 1 + dean/go.sum | 3 -- dean/queries.go | 9 +++- dean/queries_test.go | 121 +++++++++++++++++++++---------------------- dean/test_helpers.go | 17 +++++- 5 files changed, 83 insertions(+), 68 deletions(-) diff --git a/dean/dean.go b/dean/dean.go index 554284e0..9120d45f 100644 --- a/dean/dean.go +++ b/dean/dean.go @@ -50,6 +50,7 @@ type Config struct { Botserver string `env:"BOTSERVER_URL,required"` Codes []string `env:"DEAN_FB_CODES,required" envSeparator:","` ErrorTags []string `env:"DEAN_ERROR_TAGS,required" envSeparator:","` + TimeoutBlacklist []string `env:"DEAN_TIMEOUT_BLACKLIST,required" envSeparator:","` ErrorInterval string `env:"DEAN_ERROR_INTERVAL,required"` BlockedInterval string `env:"DEAN_BLOCKED_INTERVAL,required"` RespondingInterval string `env:"DEAN_RESPONDING_INTERVAL,required"` diff --git a/dean/go.sum b/dean/go.sum index a1ab7610..0defe1e9 100644 --- a/dean/go.sum +++ b/dean/go.sum @@ -39,7 +39,6 @@ github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.0.4 h1:RHkX5ZUD9bl/kn0f9dYUWs1N7Nwvo1wwUYvKiR26Zco= github.com/jackc/pgproto3/v2 v2.0.4/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= @@ -126,7 +125,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -161,7 +159,6 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/dean/queries.go b/dean/queries.go index 5588f2c6..1c5b54cb 100644 --- a/dean/queries.go +++ b/dean/queries.go @@ -118,9 +118,14 @@ func Timeouts(cfg *Config, conn *pgxpool.Pool) <-chan *ExternalEvent { FROM states WHERE current_state = 'WAIT_EXTERNAL_EVENT' AND - timeout_date < $1` - + timeout_date < $1 ` d := time.Now().UTC() + + if len(cfg.TimeoutBlacklist) > 0 { + query += `AND current_form != ANY($2)` + return get(conn, getTimeout, query, d, cfg.TimeoutBlacklist) + } + return get(conn, getTimeout, query, d) } diff --git a/dean/queries_test.go b/dean/queries_test.go index 70dd3a76..c260bccf 100644 --- a/dean/queries_test.go +++ b/dean/queries_test.go @@ -10,52 +10,13 @@ import ( ) const ( - surveySql = `drop table if exists surveys; - create table if not exists surveys( - userid VARCHAR NOT NULL, - shortcode VARCHAR NOT NULL, - messages_json JSON, - created TIMESTAMPTZ NOT NULL, - has_followup BOOL AS (messages_json->>'label.buttonHint.default' IS NOT NULL) STORED - ); - - drop table if exists credentials; - create table if not exists credentials( - userid VARCHAR NOT NULL, - entity VARCHAR NOT NULL, - key VARCHAR NOT NULL UNIQUE, - created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, - details JSONB NOT NULL, - facebook_page_id VARCHAR AS (CASE WHEN entity = 'facebook_page' THEN details->>'id' ELSE NULL END) STORED, - UNIQUE(entity, key), - UNIQUE(facebook_page_id) - ); - ` - - pageInsertSql = `INSERT INTO credentials(entity, key, userid, details) VALUES ('facebook_page', ($1)->>'id', 'owner', $1)` - surveyInsertSql = `INSERT INTO surveys(userid, shortcode, created, messages_json) VALUES ('owner', $1, $2, $3);` - - stateSql = `drop table if exists states; - create table if not exists states( - userid VARCHAR NOT NULL, - pageid VARCHAR NOT NULL NOT NULL, - updated TIMESTAMPTZ NOT NULL, - current_state VARCHAR NOT NULL, - state_json JSON NOT NULL, - current_form VARCHAR AS (state_json->'forms'->>-1) STORED, - form_start_time TIMESTAMPTZ AS (CEILING((state_json->'md'->>'startTime')::INT/1000)::INT::TIMESTAMPTZ) STORED, - previous_with_token BOOL AS (state_json->'previousOutput'->>'token' IS NOT NULL) STORED, - previous_is_followup BOOL AS (state_json->'previousOutput'->>'followUp' IS NOT NULL) STORED, - timeout_date TIMESTAMPTZ AS (CASE - WHEN state_json->'wait'->>'type' = 'timeout' THEN (CEILING((state_json->>'waitStart')::INT/1000)::INT::TIMESTAMPTZ + (state_json->'wait'->>'value')::INTERVAL) - ELSE NULL - END) STORED, - fb_error_code varchar AS (state_json->'error'->>'code') STORED, - error_tag VARCHAR AS (state_json->'error'->>'tag') STORED, - next_retry TIMESTAMP AS ((FLOOR((POWER(2, JSON_ARRAY_LENGTH(state_json->'retries'))*60000 + (state_json->'retries'->>-1)::INT)::INT)/1000)::INT::TIMESTAMP) STORED, - CONSTRAINT "valid_state_json" CHECK (state_json ? 'state'), - PRIMARY KEY (userid, pageid) - );` + insertUserSql = ` + INSERT INTO users(id, email) + VALUES ('e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', 'test@test.com'); + ` + pageInsertSql = `INSERT INTO credentials(entity, key, userid, details) VALUES ('facebook_page', ($1)->>'id', 'e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', $1)` + + surveyInsertSql = `INSERT INTO surveys(userid, formid, form, title, shortcode, created, messages) VALUES ('e49cbb6b-45e1-4b9d-9516-094c63cc6ca2', 'formid', '{}', 'title', $1, $2, $3);` insertQuery = `INSERT INTO states(userid, pageid, updated, current_state, state_json) @@ -85,8 +46,8 @@ func makeStateJson(startTime time.Time, form, previousOutput string) string { func TestGetRespondingsGetsOnlyThoseInGivenInterval(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", @@ -108,14 +69,13 @@ func TestGetRespondingsGetsOnlyThoseInGivenInterval(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, "foo", events[0].User) - mustExec(t, pool, "drop table states") } func TestGetRespondingsOnlyGetsThoseOutsideOfGracePeriod(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", @@ -137,14 +97,13 @@ func TestGetRespondingsOnlyGetsThoseOutsideOfGracePeriod(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, "baz", events[0].User) - mustExec(t, pool, "drop table states") } func TestGetBlockedOnlyGetsThoseWithCodesInsideWindow(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", @@ -171,14 +130,13 @@ func TestGetBlockedOnlyGetsThoseWithCodesInsideWindow(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, "foo", events[0].User) - mustExec(t, pool, "drop table states") } func TestGetBlockedOnlyGetsThoseWithNextRetryPassed(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", @@ -209,14 +167,13 @@ func TestGetBlockedOnlyGetsThoseWithNextRetryPassed(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, "foo", events[0].User) - mustExec(t, pool, "drop table states") } func TestGetErroredGetsByTag(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", @@ -237,23 +194,23 @@ func TestGetErroredGetsByTag(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, "foo", events[0].User) - mustExec(t, pool, "drop table states") } func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) ts := time.Now().UTC().Add(-30 * time.Minute) ms := ts.Unix() * 1000 - mustExec(t, pool, stateSql) mustExec(t, pool, insertQuery, "foo", "bar", ts, "WAIT_EXTERNAL_EVENT", fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT", + "forms": ["short1", "short2"], "waitStart": %v, "wait": { "type": "timeout", "value": "20 minutes"}}`, ms)) @@ -263,6 +220,7 @@ func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) { ts, "WAIT_EXTERNAL_EVENT", fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT", + "forms": ["short1", "short2"], "waitStart": %v, "wait": { "type": "timeout", "value": "40 minutes"}}`, ms)) @@ -275,22 +233,62 @@ func TestGetTimeoutsGetsOnlyExpiredTimeouts(t *testing.T) { assert.Equal(t, "timeout", events[0].Event.Type) + ev, _ := json.Marshal(events[0].Event) + assert.Equal(t, string(ev), fmt.Sprintf(`{"type":"timeout","value":%v}`, ms)) +} + +func TestGetTimeoutsIgnoresBlacklistShortcodes(t *testing.T) { + pool := testPool() + defer pool.Close() + before(pool) + + ts := time.Now().UTC().Add(-30 * time.Minute) + ms := ts.Unix() * 1000 + + mustExec(t, pool, insertQuery, + "foo", + "bar", + ts, + "WAIT_EXTERNAL_EVENT", + fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT", + "forms": ["short1", "short2"], + "waitStart": %v, + "wait": { "type": "timeout", "value": "20 minutes"}}`, ms)) + + mustExec(t, pool, insertQuery, + "baz", + "bar", + ts, + "WAIT_EXTERNAL_EVENT", + fmt.Sprintf(`{"state": "WAIT_EXTERNAL_EVENT", + "forms": ["short1", "short3"], + "waitStart": %v, + "wait": { "type": "timeout", "value": "20 minutes"}}`, ms)) + + cfg := &Config{TimeoutBlacklist: []string{"short3"}} + ch := Timeouts(cfg, pool) + events := getEvents(ch) + + assert.Equal(t, 1, len(events)) + assert.Equal(t, "foo", events[0].User) + + assert.Equal(t, "timeout", events[0].Event.Type) + ev, _ := json.Marshal(events[0].Event) assert.Equal(t, string(ev), fmt.Sprintf(`{"type":"timeout","value":%v}`, ms)) - mustExec(t, pool, "drop table states") } func TestFollowUpsGetsOnlyThoseBetweenMinAndMaxAndIgnoresAllSortsOfThings(t *testing.T) { pool := testPool() defer pool.Close() + before(pool) - mustExec(t, pool, stateSql) - mustExec(t, pool, surveySql) - + mustExec(t, pool, insertUserSql) mustExec(t, pool, pageInsertSql, `{"id": "bar"}`) mustExec(t, pool, pageInsertSql, `{"id": "qux"}`) mustExec(t, pool, pageInsertSql, `{"id": "quux"}`) + mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-50*time.Hour), `{"label.buttonHint.default": "this is follow up"}`) mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-40*time.Hour), `{"label.buttonHint.default": "this is follow up"}`) mustExec(t, pool, surveyInsertSql, "with_followup", time.Now().UTC().Add(-20*time.Hour), `{"label.other": "not a follow up"}`) @@ -349,5 +347,4 @@ func TestFollowUpsGetsOnlyThoseBetweenMinAndMaxAndIgnoresAllSortsOfThings(t *tes ev, _ := json.Marshal(events[0].Event) assert.Equal(t, string(ev), `{"type":"follow_up","value":"foo"}`) - mustExec(t, pool, "drop table states") } diff --git a/dean/test_helpers.go b/dean/test_helpers.go index 98ba75e6..89cc6318 100644 --- a/dean/test_helpers.go +++ b/dean/test_helpers.go @@ -33,7 +33,7 @@ func mustExec(t testing.TB, conn *pgxpool.Pool, sql string, arguments ...interfa } func testPool() *pgxpool.Pool { - config, err := pgxpool.ParseConfig("postgres://root@localhost:5433/test") + config, err := pgxpool.ParseConfig("postgres://root@localhost:5432/chatroach") handle(err) ctx := context.Background() @@ -42,3 +42,18 @@ func testPool() *pgxpool.Pool { return pool } + + +func resetDb(pool *pgxpool.Pool, tableNames []string) error { + query := "" + for _, table := range tableNames { + query += fmt.Sprintf("DELETE FROM %s; ", table) + } + + _, err := pool.Exec(context.Background(), query) + return err +} + +func before(pool *pgxpool.Pool) { + resetDb(pool, []string{"states", "surveys", "users"}) +}