From bef663fbcb5aa7443368d14917c1fd0231e17077 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 02:33:02 +0300 Subject: [PATCH 01/16] feat(pass-nessage-subject-as-header): added prototype for nats subject as "x-nats-subject" header pass --- go.work.sum | 44 ++++++ natsjobs/item.go | 1 + natsjobs/listener.go | 4 + .../.rr-nats-message-subject-as-header.yaml | 36 +++++ tests/jobs_nats_test.go | 126 ++++++++++++++++++ .../jobs/jobs_ok_with_subject_header.php | 29 ++++ 6 files changed, 240 insertions(+) create mode 100644 tests/configs/.rr-nats-message-subject-as-header.yaml create mode 100644 tests/php_test_files/jobs/jobs_ok_with_subject_header.php diff --git a/go.work.sum b/go.work.sum index 968d5bb..512abcf 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= +cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= cloud.google.com/go/accessapproval v1.7.1 h1:/5YjNhR6lzCvmJZAnByYkfEgWjfAKwYP6nkuTk6nKFE= cloud.google.com/go/accessapproval v1.7.2 h1:W55SFrY6EVlcmmRGUk0rGhuy3j4fn7UtEocib/zADVE= @@ -17,6 +18,8 @@ cloud.google.com/go/aiplatform v1.51.1 h1:g+y03dll9HnX9U0oBKIqUOI+8VQWT1QJF12VGx cloud.google.com/go/aiplatform v1.51.1/go.mod h1:kY3nIMAVQOK2XDqDPHaOuD9e+FdMA6OOpfBjsvaFSOo= cloud.google.com/go/aiplatform v1.52.0 h1:TbbUvAujxXlSlbG5+XBtJEEEUyGjtyJxZ/VIlvz9Dps= cloud.google.com/go/aiplatform v1.52.0/go.mod h1:pwZMGvqe0JRkI1GWSZCtnAfrR4K1bv65IHILGA//VEU= +cloud.google.com/go/aiplatform v1.54.0 h1:wH7OYl9Vq/5tupok0BPTFY9xaTLb0GxkReHtB5PF7cI= +cloud.google.com/go/aiplatform v1.54.0/go.mod h1:pwZMGvqe0JRkI1GWSZCtnAfrR4K1bv65IHILGA//VEU= cloud.google.com/go/analytics v0.21.3 h1:TFBC1ZAqX9/jL56GEXdLrVe5vT3I22bDVWyDwZX4IEg= cloud.google.com/go/analytics v0.21.4 h1:SScWR8i/M8h7h3lFKtOYcj0r4272aL+KvRRrsu39Vec= cloud.google.com/go/analytics v0.21.4/go.mod h1:zZgNCxLCy8b2rKKVfC1YkC2vTrpfZmeRCySM3aUbskA= @@ -121,6 +124,8 @@ cloud.google.com/go/cloudbuild v1.14.1 h1:Tp0ITIlFam7T8K/TyeceITtpw1f8+KxVKwYyiy cloud.google.com/go/cloudbuild v1.14.1/go.mod h1:K7wGc/3zfvmYWOWwYTgF/d/UVJhS4pu+HAy7PL7mCsU= cloud.google.com/go/cloudbuild v1.14.3 h1:hP6LDes9iqeppgGbmCkB3C3MvS12gJe5i4ZGtnnIO5c= cloud.google.com/go/cloudbuild v1.14.3/go.mod h1:eIXYWmRt3UtggLnFGx4JvXcMj4kShhVzGndL1LwleEM= +cloud.google.com/go/cloudbuild v1.15.0 h1:9IHfEMWdCklJ1cwouoiQrnxmP0q3pH7JUt8Hqx4Qbck= +cloud.google.com/go/cloudbuild v1.15.0/go.mod h1:eIXYWmRt3UtggLnFGx4JvXcMj4kShhVzGndL1LwleEM= cloud.google.com/go/clouddms v1.7.0 h1:vTcaFaFZTZZ11gXB6aZHdAx+zn30P8YJw4X/S3NC+VQ= cloud.google.com/go/clouddms v1.7.0/go.mod h1:MW1dC6SOtI/tPNCciTsXtsGNEM0i0OccykPvv3hiYeM= cloud.google.com/go/clouddms v1.7.1 h1:LrtqeR2xKV3juG5N7eeUgW+PqdMClOWH2U9PN3EpfFw= @@ -134,17 +139,25 @@ cloud.google.com/go/cloudtasks v1.12.4 h1:5xXuFfAjg0Z5Wb81j2GAbB3e0bwroCeSF+5jBn cloud.google.com/go/cloudtasks v1.12.4/go.mod h1:BEPu0Gtt2dU6FxZHNqqNdGqIG86qyWKBPGnsb7udGY0= cloud.google.com/go/compute v1.19.0 h1:+9zda3WGgW1ZSTlVppLCYFIr48Pa35q1uG2N1itbCEQ= cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/contactcenterinsights v1.10.0 h1:YR2aPedGVQPpFBZXJnPkqRj8M//8veIZZH5ZvICoXnI= cloud.google.com/go/contactcenterinsights v1.11.1 h1:dEfCjtdYjS3n8/1HEKbJaOL31l3dEs3q9aeaNsyrJBc= cloud.google.com/go/contactcenterinsights v1.11.1/go.mod h1:FeNP3Kg8iteKM80lMwSk3zZZKVxr+PGnAId6soKuXwE= cloud.google.com/go/contactcenterinsights v1.11.3 h1:Ui14kRKgQ3mVrMRkiBNzjdJIfFAN2qqiu9993ec9+jw= cloud.google.com/go/contactcenterinsights v1.11.3/go.mod h1:HHX5wrz5LHVAwfI2smIotQG9x8Qd6gYilaHcLLLmNis= +cloud.google.com/go/contactcenterinsights v1.12.0 h1:wP41IUA4ucMVooj/TP53jd7vbNjWrDkAPOeulVJGT5U= +cloud.google.com/go/contactcenterinsights v1.12.0/go.mod h1:HHX5wrz5LHVAwfI2smIotQG9x8Qd6gYilaHcLLLmNis= cloud.google.com/go/container v1.26.0 h1:SszQdI0qlyKsImz8/l26rpTZMyqvaH9yfua7rirDZvY= cloud.google.com/go/container v1.26.0/go.mod h1:YJCmRet6+6jnYYRS000T6k0D0xUXQgBSaJ7VwI8FBj4= cloud.google.com/go/container v1.26.1 h1:1CXjOL/dZZ2jXX1CYWqlxmXqJbZo8HwQX4DJxLzgQWo= cloud.google.com/go/container v1.26.1/go.mod h1:5smONjPRUxeEpDG7bMKWfDL4sauswqEtnBK1/KKpR04= cloud.google.com/go/container v1.27.1 h1:ZfLRiFM9ddFE92SlA28rknI6YJMz5Z5huAQK+FKWxIQ= cloud.google.com/go/container v1.27.1/go.mod h1:b1A1gJeTBXVLQ6GGw9/9M4FG94BEGsqJ5+t4d/3N7O4= +cloud.google.com/go/container v1.28.0 h1:/o82CFWXIYnT9p/07SnRgybqL3Pmmu86jYIlzlJVUBY= +cloud.google.com/go/container v1.28.0/go.mod h1:b1A1gJeTBXVLQ6GGw9/9M4FG94BEGsqJ5+t4d/3N7O4= cloud.google.com/go/containeranalysis v0.11.0 h1:/EsoP+UTIjvl4yqrLA4WgUG83kwQhqZmbXEfqirT2LM= cloud.google.com/go/containeranalysis v0.11.0/go.mod h1:4n2e99ZwpGxpNcz+YsFT1dfOHPQFGcAC8FN2M2/ne/U= cloud.google.com/go/containeranalysis v0.11.1 h1:PHh4KTcMpCjYgxfV+TzvP24wolTGP9lGbqh9sBNHxjs= @@ -157,6 +170,8 @@ cloud.google.com/go/datacatalog v1.18.1 h1:xJp9mZrc2HPaoxIz3sP9pCmf/impifweQ/yGG cloud.google.com/go/datacatalog v1.18.1/go.mod h1:TzAWaz+ON1tkNr4MOcak8EBHX7wIRX/gZKM+yTVsv+A= cloud.google.com/go/datacatalog v1.18.3 h1:zmdxP6nOjN5Qb1rtu9h4kbEVwerQ6Oshf+t747QJUew= cloud.google.com/go/datacatalog v1.18.3/go.mod h1:5FR6ZIF8RZrtml0VUao22FxhdjkoG+a0866rEnObryM= +cloud.google.com/go/datacatalog v1.19.0 h1:rbYNmHwvAOOwnW2FPXYkaK3Mf1MmGqRzK0mMiIEyLdo= +cloud.google.com/go/datacatalog v1.19.0/go.mod h1:5FR6ZIF8RZrtml0VUao22FxhdjkoG+a0866rEnObryM= cloud.google.com/go/dataflow v0.9.1 h1:VzG2tqsk/HbmOtq/XSfdF4cBvUWRK+S+oL9k4eWkENQ= cloud.google.com/go/dataflow v0.9.2 h1:cpu2OeNxnYVadAIXETLRS5riz3KUR8ErbTojAQTFJVg= cloud.google.com/go/dataflow v0.9.2/go.mod h1:vBfdBZ/ejlTaYIGB3zB4T08UshH70vbtZeMD+urnUSo= @@ -183,6 +198,8 @@ cloud.google.com/go/dataplex v1.10.1 h1:8Irss8sIalm/X8r0Masv5KJRkddcxov3TiW8W96F cloud.google.com/go/dataplex v1.10.1/go.mod h1:1MzmBv8FvjYfc7vDdxhnLFNskikkB+3vl475/XdCDhs= cloud.google.com/go/dataplex v1.11.1 h1:+malGTMnHubsSi0D6dbr3aqp86dKs0t4yAdmxKZGUm4= cloud.google.com/go/dataplex v1.11.1/go.mod h1:mHJYQQ2VEJHsyoC0OdNyy988DvEbPhqFs5OOLffLX0c= +cloud.google.com/go/dataplex v1.11.2 h1:AfFFR15Ifh4U+Me1IBztrSd5CrasTODzy3x8KtDyHdc= +cloud.google.com/go/dataplex v1.11.2/go.mod h1:mHJYQQ2VEJHsyoC0OdNyy988DvEbPhqFs5OOLffLX0c= cloud.google.com/go/dataproc v1.12.0 h1:W47qHL3W4BPkAIbk4SWmIERwsWBaNnWm0P2sdx3YgGU= cloud.google.com/go/dataproc/v2 v2.2.0 h1:jKijbdsERm2hy/5dFl/LeQN+7CNssLdGXQYBMvMH/M4= cloud.google.com/go/dataproc/v2 v2.2.0/go.mod h1:lZR7AQtwZPvmINx5J87DSOOpTfof9LVZju6/Qo4lmcY= @@ -190,6 +207,8 @@ cloud.google.com/go/dataproc/v2 v2.2.1 h1:BPjIIkTCAOHUkMtWKqae55qEku5K09LVbQ46LY cloud.google.com/go/dataproc/v2 v2.2.1/go.mod h1:QdAJLaBjh+l4PVlVZcmrmhGccosY/omC1qwfQ61Zv/o= cloud.google.com/go/dataproc/v2 v2.2.3 h1:snv4EQfh1BfQ/HZS2MGbOqCgwEzYE/j6f30XFOTsgXg= cloud.google.com/go/dataproc/v2 v2.2.3/go.mod h1:G5R6GBc9r36SXv/RtZIVfB8SipI+xVn0bX5SxUzVYbY= +cloud.google.com/go/dataproc/v2 v2.3.0 h1:tTVP9tTxmc8fixxOd/8s6Q6Pz/+yzn7r7XdZHretQH0= +cloud.google.com/go/dataproc/v2 v2.3.0/go.mod h1:G5R6GBc9r36SXv/RtZIVfB8SipI+xVn0bX5SxUzVYbY= cloud.google.com/go/dataqna v0.8.1 h1:ITpUJep04hC9V7C+gcK390HO++xesQFSUJ7S4nSnF3U= cloud.google.com/go/dataqna v0.8.2 h1:vJ9JVKDgDG7AQMbTD8pdWaogJ4c/yHn0qer+q0nFIaw= cloud.google.com/go/dataqna v0.8.2/go.mod h1:KNEqgx8TTmUipnQsScOoDpq/VlXVptUqVMZnt30WAPs= @@ -210,6 +229,8 @@ cloud.google.com/go/deploy v1.13.1 h1:eV5MdoQJGdac/k7D97SDjD8iLE4jCzL42UCAgG6j0i cloud.google.com/go/deploy v1.13.1/go.mod h1:8jeadyLkH9qu9xgO3hVWw8jVr29N1mnW42gRJT8GY6g= cloud.google.com/go/deploy v1.14.2 h1:OWVwtGy+QeQGPT3yc8bJu6yANoPFpXniCgl7bJu5u88= cloud.google.com/go/deploy v1.14.2/go.mod h1:e5XOUI5D+YGldyLNZ21wbp9S8otJbBE4i88PtO9x/2g= +cloud.google.com/go/deploy v1.15.0 h1:ZdmYzRMTGkVyP1nXEUat9FpbJGJemDcNcx82RSSOElc= +cloud.google.com/go/deploy v1.15.0/go.mod h1:e5XOUI5D+YGldyLNZ21wbp9S8otJbBE4i88PtO9x/2g= cloud.google.com/go/dialogflow v1.43.0 h1:0hBV5ipVbhYNKCyiBoM47bUt+43Kd8eWXhBr+pwUSTw= cloud.google.com/go/dialogflow v1.43.0/go.mod h1:pDUJdi4elL0MFmt1REMvFkdsUTYSHq+rTCS8wg0S3+M= cloud.google.com/go/dialogflow v1.44.1 h1:Ml/hgEzU3AN0tjNSSv4/QmG1nqwYEsiCySKMkWMqUmI= @@ -254,6 +275,8 @@ cloud.google.com/go/filestore v1.7.2 h1:/Nnk5pOoY1Lx6A42hJ2eBYcBfqKvLcnh8fV4egop cloud.google.com/go/filestore v1.7.2/go.mod h1:TYOlyJs25f/omgj+vY7/tIG/E7BX369triSPzE4LdgE= cloud.google.com/go/filestore v1.7.4 h1:twtI5/89kf9QW7MqDic9fsUbH5ZLIDV1MVsRmu9iu2E= cloud.google.com/go/filestore v1.7.4/go.mod h1:S5JCxIbFjeBhWMTfIYH2Jx24J6BqjwpkkPl+nBA5DlI= +cloud.google.com/go/filestore v1.8.0 h1:/+wUEGwk3x3Kxomi2cP5dsR8+SIXxo7M0THDjreFSYo= +cloud.google.com/go/filestore v1.8.0/go.mod h1:S5JCxIbFjeBhWMTfIYH2Jx24J6BqjwpkkPl+nBA5DlI= cloud.google.com/go/firestore v1.9.0 h1:IBlRyxgGySXu5VuW0RgGFlTtLukSnNkpDiEOMkQkmpA= cloud.google.com/go/firestore v1.13.0 h1:/3S4RssUV4GO/kvgJZB+tayjhOfyAHs+KcpJgRVu/Qk= cloud.google.com/go/firestore v1.13.0/go.mod h1:QojqqOh8IntInDUSTAh0c8ZsPYAr68Ma8c5DWOy8xb8= @@ -446,6 +469,8 @@ cloud.google.com/go/recaptchaenterprise/v2 v2.8.1 h1:06V6+edT20PcrFJfH0TVWMZpZCU cloud.google.com/go/recaptchaenterprise/v2 v2.8.1/go.mod h1:JZYZJOeZjgSSTGP4uz7NlQ4/d1w5hGmksVgM0lbEij0= cloud.google.com/go/recaptchaenterprise/v2 v2.8.3 h1:UaV9C58snc5IsRQ6NN65jmRGnTdPT7mYZzK4Vbun+ik= cloud.google.com/go/recaptchaenterprise/v2 v2.8.3/go.mod h1:Dak54rw6lC2gBY8FBznpOCAR58wKf+R+ZSJRoeJok4w= +cloud.google.com/go/recaptchaenterprise/v2 v2.8.4 h1:KOlLHLv3h3HwcZAkx91ubM3Oztz3JtT3ZacAJhWDorQ= +cloud.google.com/go/recaptchaenterprise/v2 v2.8.4/go.mod h1:Dak54rw6lC2gBY8FBznpOCAR58wKf+R+ZSJRoeJok4w= cloud.google.com/go/recommendationengine v0.8.1 h1:nMr1OEVHuDambRn+/y4RmNAmnR/pXCuHtH0Y4tCgGRQ= cloud.google.com/go/recommendationengine v0.8.2 h1:odf0TZXtwoZ5kJaWBlaE9D0AV+WJLLs+/SRSuE4T/ds= cloud.google.com/go/recommendationengine v0.8.2/go.mod h1:QIybYHPK58qir9CV2ix/re/M//Ty10OxjnnhWdaKS1Y= @@ -487,6 +512,8 @@ cloud.google.com/go/scheduler v1.10.2 h1:lgUd1D84JEgNzzHRlcZEIoQ6Ny10YWe8RNH1knh cloud.google.com/go/scheduler v1.10.2/go.mod h1:O3jX6HRH5eKCA3FutMw375XHZJudNIKVonSCHv7ropY= cloud.google.com/go/scheduler v1.10.4 h1:LXm6L6IYW3Fy8lxU7kvT7r6JiW/noxn2gItJmsvwzV4= cloud.google.com/go/scheduler v1.10.4/go.mod h1:MTuXcrJC9tqOHhixdbHDFSIuh7xZF2IysiINDuiq6NI= +cloud.google.com/go/scheduler v1.10.5 h1:eMEettHlFhG5pXsoHouIM5nRT+k+zU4+GUvRtnxhuVI= +cloud.google.com/go/scheduler v1.10.5/go.mod h1:MTuXcrJC9tqOHhixdbHDFSIuh7xZF2IysiINDuiq6NI= cloud.google.com/go/secretmanager v1.11.1 h1:cLTCwAjFh9fKvU6F13Y4L9vPcx9yiWPyWXE4+zkuEQs= cloud.google.com/go/secretmanager v1.11.2 h1:52Z78hH8NBWIqbvIG0wi0EoTaAmSx99KIOAmDXIlX0M= cloud.google.com/go/secretmanager v1.11.2/go.mod h1:MQm4t3deoSub7+WNwiC4/tRYgDBHJgJPvswqQVB1Vss= @@ -521,11 +548,15 @@ cloud.google.com/go/spanner v1.50.0 h1:QrJFOpaxCXdXF+GkiruLz642PHxkdj68PbbnLw3O2 cloud.google.com/go/spanner v1.50.0/go.mod h1:eGj9mQGK8+hkgSVbHNQ06pQ4oS+cyc4tXXd6Dif1KoM= cloud.google.com/go/spanner v1.51.0 h1:l3exhhsVMKsx1E7Xd1QajYSvHmI1KZoWPW5tRxIIdvQ= cloud.google.com/go/spanner v1.51.0/go.mod h1:c5KNo5LQ1X5tJwma9rSQZsXNBDNvj4/n8BVc3LNahq0= +cloud.google.com/go/spanner v1.53.0 h1:/NzWQJ1MEhdRcffiutRKbW/AIGVKhcTeivWTDjEyCCo= +cloud.google.com/go/spanner v1.53.0/go.mod h1:liG4iCeLqm5L3fFLU5whFITqP0e0orsAW1uUSrd4rws= cloud.google.com/go/speech v1.19.0 h1:MCagaq8ObV2tr1kZJcJYgXYbIn8Ai5rp42tyGYw9rls= cloud.google.com/go/speech v1.19.1 h1:z035FMLs98jpnqcP5xZZ6Es+g6utbeVoUH64BaTzTSU= cloud.google.com/go/speech v1.19.1/go.mod h1:WcuaWz/3hOlzPFOVo9DUsblMIHwxP589y6ZMtaG+iAA= cloud.google.com/go/speech v1.20.1 h1:OpJ666ao7XxXewGSAkDUJnW188tJ5hNPoM7pZB+Q730= cloud.google.com/go/speech v1.20.1/go.mod h1:wwolycgONvfz2EDU8rKuHRW3+wc9ILPsAWoikBEWavY= +cloud.google.com/go/speech v1.21.0 h1:qkxNao58oF8ghAHE1Eghen7XepawYEN5zuZXYWaUTA4= +cloud.google.com/go/speech v1.21.0/go.mod h1:wwolycgONvfz2EDU8rKuHRW3+wc9ILPsAWoikBEWavY= cloud.google.com/go/storage v1.14.0 h1:6RRlFMv1omScs6iq2hfE3IvgE+l6RfJPampq8UZc5TU= cloud.google.com/go/storage v1.30.1 h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/oNM= cloud.google.com/go/storagetransfer v1.10.0 h1:+ZLkeXx0K0Pk5XdDmG0MnUVqIR18lllsihU/yq39I8Q= @@ -646,6 +677,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe h1:QQ3GSy+MqSHxm/d8nCtnAiZdYFd45cYZPs8vOOIYKfk= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -665,6 +698,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad h1:E github.com/envoyproxy/go-control-plane v0.11.1 h1:wSUXTlLfiAQRWs2F+p+EKOY9rUyis1MyGqJ2DIk5HpM= github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= +github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/go-fonts/dejavu v0.1.0 h1:JSajPXURYqpr+Cu8U9bt8K+XcACIHWqWrvWCKyeFmVQ= @@ -870,8 +905,12 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= +golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= +golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -879,6 +918,8 @@ golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= @@ -886,6 +927,8 @@ golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= +golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= @@ -895,6 +938,7 @@ google.golang.org/api v0.122.0 h1:zDobeejm3E7pEG1mNHvdxvjs5XJoCMzyNH+CmwL94Es= google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o= google.golang.org/api v0.143.0 h1:o8cekTkqhywkbZT6p1UHJPZ9+9uuCAJs/KYomxZB8fA= google.golang.org/api v0.143.0/go.mod h1:FoX9DO9hT7DLNn97OuoZAGSDuNAXdJRuGK98rSUgurk= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= diff --git a/natsjobs/item.go b/natsjobs/item.go index d1a15ef..c4b27bc 100644 --- a/natsjobs/item.go +++ b/natsjobs/item.go @@ -46,6 +46,7 @@ type Options struct { stream string seq uint64 sub jetstream.Stream + subject string } // DelayDuration returns delay duration in a form of time.Duration. diff --git a/natsjobs/listener.go b/natsjobs/listener.go index 51073e5..fb538e8 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -92,6 +92,8 @@ func (c *Driver) listenerStart() { //nolint:gocognit item.Options.requeueFn = c.requeue // sequence needed for the requeue item.Options.seq = meta.Sequence.Stream + // subject needed to pass it as header + item.Options.subject = m.Subject() // needed only if delete after ack is true if c.deleteAfterAck { @@ -134,6 +136,8 @@ func (c *Driver) listenerStart() { //nolint:gocognit item.headers = make(map[string][]string, 1) } + item.headers["x-nats-subject"] = []string{item.Options.subject} + c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) c.queue.Insert(item) span.End() diff --git a/tests/configs/.rr-nats-message-subject-as-header.yaml b/tests/configs/.rr-nats-message-subject-as-header.yaml new file mode 100644 index 0000000..8253e3f --- /dev/null +++ b/tests/configs/.rr-nats-message-subject-as-header.yaml @@ -0,0 +1,36 @@ +version: '3' + +rpc: + listen: tcp://127.0.0.1:6601 + +server: + command: "php php_test_files/jobs/jobs_ok_pq.php" + relay: "pipes" + +nats: + addr: "nats://127.0.0.1:4222" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 2 + pipeline_size: 100000 + timeout: 100 + pool: + num_workers: 2 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1-pq: + driver: nats + config: + prefetch: 100 + subject: "default-nats-message-subject-as-header.*" + stream: "foo-nats-message-subject-as-header" + delete_after_ack: true + deliver_new: "true" + priority: 1 diff --git a/tests/jobs_nats_test.go b/tests/jobs_nats_test.go index 6ec1f85..e7656b2 100644 --- a/tests/jobs_nats_test.go +++ b/tests/jobs_nats_test.go @@ -1024,6 +1024,132 @@ func TestNATSOTEL(t *testing.T) { }) } +func TestNATSMessageSubjectAsHeader(t *testing.T) { + cont := endure.New(slog.LevelDebug) + + cfg := &config.Plugin{ + Version: "4.6.2", + Path: "configs/.rr-nats-message-subject-as-header.yaml", + Prefix: "rr", + } + + l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) + err := cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + l, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &natsPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + conn, err := nats.Connect("nats://127.0.0.1:4222", + nats.NoEcho(), + nats.Timeout(time.Minute), + nats.MaxReconnects(-1), + nats.PingInterval(time.Second*10), + nats.ReconnectWait(time.Second), + ) + require.NoError(t, err) + + js, err := jetstream.New(conn) + require.NoError(t, err) + + ctx := context.Background() + + stream, _ := js.Stream(ctx, "foo-nats-message-subject-as-header") + si, err := stream.Info(ctx) + if err != nil { + if err.Error() == "nats: stream not found" { + // skip + } else { + t.Fatal(err) + } + } + + if si == nil { + _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "foo-nats-message-subject-as-header", + Subjects: []string{"nats-message-subject-as-header.*"}, + }) + if err != nil { + t.Fatal(err) + } + } + + _, err = js.PublishMsg(ctx, &nats.Msg{ + Data: []byte("foo-barrrrrr-bazzzzz"), + Subject: "default-nats-message-subject-as-header.current-subject", + }) + require.NoError(t, err) + + time.Sleep(time.Second * 10) + helpers.DestroyPipelines("127.0.0.1:6001", "test-raw") + + stopCh <- struct{}{} + wg.Wait() + + assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was started").Len()) + assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) + assert.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len()) + assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + + t.Cleanup(func() { + errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-raw") + t.Log(errc) + }) +} + func declareNATSPipe(address, subj, stream string) func(t *testing.T) { return func(t *testing.T) { conn, err := net.Dial("tcp", address) diff --git a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php new file mode 100644 index 0000000..9173cd6 --- /dev/null +++ b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php @@ -0,0 +1,29 @@ +waitTask()) { + try { + $subject = $task->getHeader('x-nats-subject')[0]; + if ('default-nats-message-subject-as-header.current-subject' !== $task->getQueue()) { + throw new RuntimeException('Subject was not found'); + } + + $task->complete(); + } catch (\Throwable $e) { + $task->error((string)$e); + } +} From 61845effab45d89590c4aa7f54291aab250102cb Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 02:55:10 +0300 Subject: [PATCH 02/16] feat(pass-nessage-subject-as-header): temporary added branch feature/pass-message-subject-as-header to linux.yml workflow --- .github/workflows/linux.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 836ddf9..9d4282a 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -5,6 +5,7 @@ on: branches: - master - stable + - "feature/pass-message-subject-as-header" pull_request: branches: - master From 9b0fe258606149bc9e694ddbb10907b9ada5f6a1 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:06:26 +0300 Subject: [PATCH 03/16] feat(pass-nessage-subject-as-header): fixed stream cleanup --- tests/jobs_nats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs_nats_test.go b/tests/jobs_nats_test.go index e7656b2..1bf6a2e 100644 --- a/tests/jobs_nats_test.go +++ b/tests/jobs_nats_test.go @@ -1145,7 +1145,7 @@ func TestNATSMessageSubjectAsHeader(t *testing.T) { assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len()) t.Cleanup(func() { - errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-raw") + errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-nats-message-subject-as-header") t.Log(errc) }) } From 52b828d9c9fa10f51cfa3efea6db9aa94df58ecd Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:07:46 +0300 Subject: [PATCH 04/16] feat(pass-nessage-subject-as-header): fixed pipeline name --- tests/configs/.rr-nats-message-subject-as-header.yaml | 2 +- tests/jobs_nats_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/configs/.rr-nats-message-subject-as-header.yaml b/tests/configs/.rr-nats-message-subject-as-header.yaml index 8253e3f..f0bff0e 100644 --- a/tests/configs/.rr-nats-message-subject-as-header.yaml +++ b/tests/configs/.rr-nats-message-subject-as-header.yaml @@ -25,7 +25,7 @@ jobs: destroy_timeout: 60s pipelines: - test-1-pq: + test-nats-message-subject-as-header: driver: nats config: prefetch: 100 diff --git a/tests/jobs_nats_test.go b/tests/jobs_nats_test.go index 1bf6a2e..5dd53ba 100644 --- a/tests/jobs_nats_test.go +++ b/tests/jobs_nats_test.go @@ -1134,7 +1134,7 @@ func TestNATSMessageSubjectAsHeader(t *testing.T) { require.NoError(t, err) time.Sleep(time.Second * 10) - helpers.DestroyPipelines("127.0.0.1:6001", "test-raw") + helpers.DestroyPipelines("127.0.0.1:6001", "test-nats-message-subject-as-header") stopCh <- struct{}{} wg.Wait() From 17a2d58e0be956d50d486116eb2e1e70552ca8e6 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:12:48 +0300 Subject: [PATCH 05/16] feat(pass-nessage-subject-as-header): fixed php test mock --- tests/php_test_files/jobs/jobs_ok_with_subject_header.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php index 9173cd6..46e478c 100644 --- a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php +++ b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php @@ -17,8 +17,8 @@ while ($task = $consumer->waitTask()) { try { - $subject = $task->getHeader('x-nats-subject')[0]; - if ('default-nats-message-subject-as-header.current-subject' !== $task->getQueue()) { + $subject = $task->getHeader('x-nats-subject')[0] ?? 'undefined'; + if ('default-nats-message-subject-as-header.current-subject' !== $subject) { throw new RuntimeException('Subject was not found'); } From 54921b34293fd3cd76198387d4e378d563a8e77f Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:16:10 +0300 Subject: [PATCH 06/16] feat(pass-nessage-subject-as-header): fixed typo in command handler file name --- tests/configs/.rr-nats-message-subject-as-header.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/configs/.rr-nats-message-subject-as-header.yaml b/tests/configs/.rr-nats-message-subject-as-header.yaml index f0bff0e..41e02c8 100644 --- a/tests/configs/.rr-nats-message-subject-as-header.yaml +++ b/tests/configs/.rr-nats-message-subject-as-header.yaml @@ -4,7 +4,7 @@ rpc: listen: tcp://127.0.0.1:6601 server: - command: "php php_test_files/jobs/jobs_ok_pq.php" + command: "php php_test_files/jobs/jobs_ok_with_subject_header.php" relay: "pipes" nats: From 72c8187ce23c8954b220c70d6f5b1fa4e7702780 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:32:13 +0300 Subject: [PATCH 07/16] feat(pass-nessage-subject-as-header): changed raw insert in map to Inject. Number of workers reduced to 1 in .rr-nats-message-subject-as-header.yaml --- natsjobs/listener.go | 5 +++-- tests/configs/.rr-nats-message-subject-as-header.yaml | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/natsjobs/listener.go b/natsjobs/listener.go index fb538e8..d3e27d4 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -136,9 +136,10 @@ func (c *Driver) listenerStart() { //nolint:gocognit item.headers = make(map[string][]string, 1) } - item.headers["x-nats-subject"] = []string{item.Options.subject} - c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) + c.prop.Inject(ctx, propagation.HeaderCarrier(map[string][]string{ + "x-nats-subject": {item.Options.subject}, + })) c.queue.Insert(item) span.End() diff --git a/tests/configs/.rr-nats-message-subject-as-header.yaml b/tests/configs/.rr-nats-message-subject-as-header.yaml index 41e02c8..0250e4b 100644 --- a/tests/configs/.rr-nats-message-subject-as-header.yaml +++ b/tests/configs/.rr-nats-message-subject-as-header.yaml @@ -16,9 +16,9 @@ logs: mode: development jobs: - num_pollers: 2 + num_pollers: 1 pipeline_size: 100000 - timeout: 100 + timeout: 1 pool: num_workers: 2 allocate_timeout: 60s From 18f71cd102a6c308909ff2404ba6a058471c8572 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 03:41:48 +0300 Subject: [PATCH 08/16] feat(pass-nessage-subject-as-header): fixed forgotten consume --- tests/configs/.rr-nats-message-subject-as-header.yaml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/configs/.rr-nats-message-subject-as-header.yaml b/tests/configs/.rr-nats-message-subject-as-header.yaml index 0250e4b..d64b3af 100644 --- a/tests/configs/.rr-nats-message-subject-as-header.yaml +++ b/tests/configs/.rr-nats-message-subject-as-header.yaml @@ -1,8 +1,5 @@ version: '3' -rpc: - listen: tcp://127.0.0.1:6601 - server: command: "php php_test_files/jobs/jobs_ok_with_subject_header.php" relay: "pipes" @@ -23,7 +20,8 @@ jobs: num_workers: 2 allocate_timeout: 60s destroy_timeout: 60s - + consume: + - "test-nats-message-subject-as-header" pipelines: test-nats-message-subject-as-header: driver: nats From d4261a58451710c618ad3f968fcc18eb2146d62f Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Thu, 30 Nov 2023 04:11:04 +0300 Subject: [PATCH 09/16] feat(pass-nessage-subject-as-header): fixed error in PHP code --- tests/php_test_files/jobs/jobs_ok_with_subject_header.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php index 46e478c..103b763 100644 --- a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php +++ b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php @@ -24,6 +24,6 @@ $task->complete(); } catch (\Throwable $e) { - $task->error((string)$e); + $task->fail((string)$e); } } From 5aa6df3744cc2145c6a3f81af59a167663983f7e Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:48:31 +0300 Subject: [PATCH 10/16] feat(pass-nessage-subject-as-header): removed dev-time branch subscription github action --- .github/workflows/linux.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 9d4282a..836ddf9 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -5,7 +5,6 @@ on: branches: - master - stable - - "feature/pass-message-subject-as-header" pull_request: branches: - master From fdc8109222a761588f4e2d224694caabded254eb Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Fri, 1 Dec 2023 14:04:19 +0300 Subject: [PATCH 11/16] feat(pass-nessage-subject-as-header): refactored expected subject definition and exception message format --- .../jobs/jobs_ok_with_subject_header.php | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php index 103b763..14cda3d 100644 --- a/tests/php_test_files/jobs/jobs_ok_with_subject_header.php +++ b/tests/php_test_files/jobs/jobs_ok_with_subject_header.php @@ -15,11 +15,17 @@ $consumer = new Spiral\RoadRunner\Jobs\Consumer(); +const EXPECTED_SUBJECT = 'default-nats-message-subject-as-header.current-subject'; + while ($task = $consumer->waitTask()) { try { $subject = $task->getHeader('x-nats-subject')[0] ?? 'undefined'; - if ('default-nats-message-subject-as-header.current-subject' !== $subject) { - throw new RuntimeException('Subject was not found'); + if (EXPECTED_SUBJECT !== $subject) { + throw new RuntimeException(sprintf( + "Expected subject '%s', got '%s'", + EXPECTED_SUBJECT, + $subject + )); } $task->complete(); From 266e4098f043e6a0c90c60136b0331819b18a433 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Fri, 1 Dec 2023 14:27:04 +0300 Subject: [PATCH 12/16] feat(pass-nessage-subject-as-header): refactored code related with nats subject header injection --- natsjobs/listener.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/natsjobs/listener.go b/natsjobs/listener.go index d3e27d4..3febf62 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -10,6 +10,8 @@ import ( "go.uber.org/zap" ) +const NatsSubjectHeaderKey = "x-nats-subject" + // blocking func (c *Driver) listenerInit() error { id := uuid.NewString() @@ -135,11 +137,9 @@ func (c *Driver) listenerStart() { //nolint:gocognit if item.headers == nil { item.headers = make(map[string][]string, 1) } + item.headers[NatsSubjectHeaderKey] = []string{m.Subject()} c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) - c.prop.Inject(ctx, propagation.HeaderCarrier(map[string][]string{ - "x-nats-subject": {item.Options.subject}, - })) c.queue.Insert(item) span.End() From 18b42f9b967d095dad06e2a08d1be070712c3c6a Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Fri, 1 Dec 2023 16:51:56 +0300 Subject: [PATCH 13/16] feat(pass-nessage-subject-as-header): removed redundant subject field in Options Struct --- natsjobs/item.go | 1 - natsjobs/listener.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/natsjobs/item.go b/natsjobs/item.go index c4b27bc..d1a15ef 100644 --- a/natsjobs/item.go +++ b/natsjobs/item.go @@ -46,7 +46,6 @@ type Options struct { stream string seq uint64 sub jetstream.Stream - subject string } // DelayDuration returns delay duration in a form of time.Duration. diff --git a/natsjobs/listener.go b/natsjobs/listener.go index 3febf62..d02deee 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -94,8 +94,6 @@ func (c *Driver) listenerStart() { //nolint:gocognit item.Options.requeueFn = c.requeue // sequence needed for the requeue item.Options.seq = meta.Sequence.Stream - // subject needed to pass it as header - item.Options.subject = m.Subject() // needed only if delete after ack is true if c.deleteAfterAck { From 6947a9a0ac87e8e41f2b466ef415c0135fa7a10e Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Sun, 3 Dec 2023 16:24:57 +0300 Subject: [PATCH 14/16] feat(pass-nessage-subject-as-header): moved NatsSubjectHeaderKey constant to config.go --- natsjobs/config.go | 2 ++ natsjobs/listener.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/natsjobs/config.go b/natsjobs/config.go index 5eb0c47..74abd25 100644 --- a/natsjobs/config.go +++ b/natsjobs/config.go @@ -5,6 +5,8 @@ import ( ) const ( + NatsSubjectHeaderKey string = "x-nats-subject" + pipeSubject string = "subject" pipeStream string = "stream" pipePrefetch string = "prefetch" diff --git a/natsjobs/listener.go b/natsjobs/listener.go index d02deee..a3be9b3 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -10,8 +10,6 @@ import ( "go.uber.org/zap" ) -const NatsSubjectHeaderKey = "x-nats-subject" - // blocking func (c *Driver) listenerInit() error { id := uuid.NewString() From 4d81854e5c475883a07a844597fa8e0ffd99c497 Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Sun, 3 Dec 2023 17:05:30 +0300 Subject: [PATCH 15/16] feat(pass-nessage-subject-as-header): added assert that job completed without any errors --- tests/jobs_nats_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/jobs_nats_test.go b/tests/jobs_nats_test.go index 5dd53ba..5c27b19 100644 --- a/tests/jobs_nats_test.go +++ b/tests/jobs_nats_test.go @@ -1143,6 +1143,7 @@ func TestNATSMessageSubjectAsHeader(t *testing.T) { assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) assert.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len()) assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + assert.Equal(t, 0, oLogger.FilterMessageSnippet("jobs protocol error").Len()) t.Cleanup(func() { errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-nats-message-subject-as-header") From e8b4f55d396fdcb58c280584892bbb5f03d6cc1e Mon Sep 17 00:00:00 2001 From: KernelMrex <24671528+KernelMrex@users.noreply.github.com> Date: Mon, 4 Dec 2023 10:07:15 +0300 Subject: [PATCH 16/16] feat(pass-nessage-subject-as-header): refactored Subject Header Name --- natsjobs/config.go | 2 +- natsjobs/listener.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/natsjobs/config.go b/natsjobs/config.go index 74abd25..0a68f33 100644 --- a/natsjobs/config.go +++ b/natsjobs/config.go @@ -5,7 +5,7 @@ import ( ) const ( - NatsSubjectHeaderKey string = "x-nats-subject" + subjectHeaderKey string = "x-nats-subject" pipeSubject string = "subject" pipeStream string = "stream" diff --git a/natsjobs/listener.go b/natsjobs/listener.go index a3be9b3..eb185ce 100644 --- a/natsjobs/listener.go +++ b/natsjobs/listener.go @@ -133,7 +133,7 @@ func (c *Driver) listenerStart() { //nolint:gocognit if item.headers == nil { item.headers = make(map[string][]string, 1) } - item.headers[NatsSubjectHeaderKey] = []string{m.Subject()} + item.headers[subjectHeaderKey] = []string{m.Subject()} c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) c.queue.Insert(item)