Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor queue module #314

Open
wants to merge 103 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 78 commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
043f128
refactor: queue module
devhaozi Oct 7, 2023
22621b8
chore: update mocks
devhaozi Oct 7, 2023
33a9c54
feat: update
devhaozi Oct 8, 2023
28b9344
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi Oct 8, 2023
ca16d6a
chore: update mocks
devhaozi Oct 8, 2023
745bc27
fix: type
devhaozi Oct 8, 2023
071cccf
chore: update mocks
devhaozi Oct 8, 2023
150f302
feat: update database job
devhaozi Oct 9, 2023
d6e6599
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi Oct 9, 2023
45598dc
chore: update mocks
devhaozi Oct 9, 2023
e5d5b6c
feat: update facades
devhaozi Oct 9, 2023
ea6ac5a
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi Oct 9, 2023
cb3214c
fix: import cycle
devhaozi Oct 9, 2023
870781f
fix: lint
devhaozi Oct 9, 2023
9a7904f
feat: optimize config
devhaozi Oct 9, 2023
657663d
feat: optimize migrations
devhaozi Oct 9, 2023
854fc60
feat: sync and async
devhaozi Oct 9, 2023
03d489a
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Oct 29, 2023
e40811b
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Nov 9, 2023
c7dbcec
feat: sync master
devhaozi Nov 9, 2023
21578e7
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Dec 3, 2023
fcdf373
chore: merge master and update
devhaozi Dec 3, 2023
433d0b8
fix: sync databases
devhaozi Dec 3, 2023
4c34582
feat: update database migrations
devhaozi Dec 3, 2023
6527ab8
feat: update async and database
devhaozi Dec 3, 2023
924ceba
refactor: use Payload instead Arg
devhaozi Dec 4, 2023
f5f057e
feat: update
devhaozi Dec 4, 2023
c6c226f
feat: update
devhaozi Dec 4, 2023
3bb00eb
feat: redis
devhaozi Dec 4, 2023
2b159b9
refactor: redis driver
devhaozi Dec 5, 2023
760c48f
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Dec 5, 2023
fa03dc5
chore: merge master
devhaozi Dec 5, 2023
5c13b36
chore: update mocks
devhaozi Dec 5, 2023
19a97d2
fix: event error
devhaozi Dec 5, 2023
96f5ea1
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi Dec 5, 2023
1590461
fix: tests
devhaozi Dec 5, 2023
31cf3b6
fix: lint
devhaozi Dec 5, 2023
6481f4e
fix: email orm mock
devhaozi Dec 5, 2023
34d9850
fix: email orm mock
devhaozi Dec 5, 2023
d706baa
refactor: use reflect to handle job
devhaozi Dec 5, 2023
8e8c003
chore: update mocks
devhaozi Dec 5, 2023
44fe027
feat: add test case for argsToValues
devhaozi Dec 5, 2023
4309dc4
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi Dec 5, 2023
4b66470
fix: convert error
devhaozi Dec 5, 2023
0069909
fix: optimize SQL
devhaozi Dec 10, 2023
36f8b21
feat: some optimize
devhaozi Dec 17, 2023
26db41f
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Dec 17, 2023
04aff6d
feat: merge master
devhaozi Dec 17, 2023
97bab31
chore: update mocks
devhaozi Dec 17, 2023
87ecd1a
feat: add test cases
devhaozi Dec 17, 2023
1aec5ef
fix: nil
devhaozi Dec 17, 2023
f8c9b63
fix: mock
devhaozi Dec 17, 2023
093d250
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi May 26, 2024
51aa085
chore: merge master
devhaozi May 26, 2024
8a1e5d0
chore: update mocks
devhaozi May 26, 2024
efa0b5e
refactor: use new method
devhaozi May 26, 2024
3b51863
chore: update mocks
devhaozi May 26, 2024
f15a41c
refactor: remove unused methods
devhaozi May 26, 2024
f414760
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi May 26, 2024
54a0d05
chore: update mocks
devhaozi May 26, 2024
119bb4c
fix: lint
devhaozi May 26, 2024
7ac2501
Merge remote-tracking branch 'origin/haozi/queue' into haozi/queue
devhaozi May 26, 2024
8e9f953
fix: email test
devhaozi May 26, 2024
7a607fb
fix: email test
devhaozi May 26, 2024
cb49814
refactor: use any type
devhaozi May 26, 2024
e35cca2
chore: update mocks
devhaozi May 26, 2024
c027db0
fix: lint
devhaozi May 26, 2024
fcbe1a0
fix: test
devhaozi May 26, 2024
807bee4
fix: test
devhaozi May 26, 2024
f09f9b3
fix: test
devhaozi May 26, 2024
4345964
fix: test
devhaozi May 26, 2024
4f570bd
feat: support custom
devhaozi May 26, 2024
ae5ac8b
feat: refactor job struct
devhaozi May 26, 2024
ce8577b
fix: test
devhaozi May 26, 2024
9158c13
fix: test
devhaozi May 26, 2024
4fcadbb
feat: add migrate make
devhaozi May 26, 2024
94593ba
feat: add method to get job
devhaozi May 26, 2024
98edd33
chore: update mocks
devhaozi May 26, 2024
5408563
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi May 29, 2024
b573ced
chore: merge master
devhaozi May 29, 2024
443e5ef
chore: update github.com/charmbracelet/huh
devhaozi May 29, 2024
bddcb8b
fix: remove signal listen
devhaozi May 29, 2024
530af3e
Merge remote-tracking branch 'refs/remotes/origin/master' into haozi/…
devhaozi Jun 8, 2024
72e9e36
chore: merge master
devhaozi Jun 8, 2024
13e3e57
Merge branch 'refs/heads/master' into haozi/queue
devhaozi Jun 18, 2024
fed0b61
chore: merge master
devhaozi Jun 18, 2024
731b8d9
fix: lint
devhaozi Jun 18, 2024
6906b86
Merge branch 'refs/heads/master' into haozi/queue
devhaozi Jun 27, 2024
91c3c46
chore: merge master
devhaozi Jun 27, 2024
25e1b1b
Merge remote-tracking branch 'refs/remotes/origin/master' into haozi/…
devhaozi Jul 10, 2024
ad7e517
chore: merge master
devhaozi Jul 10, 2024
4069a4e
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Jul 10, 2024
5067a9a
fix: add missing Lang method
devhaozi Jul 10, 2024
410aac7
Merge branch 'refs/heads/haozi/lang' into haozi/queue
devhaozi Jul 10, 2024
ebb185a
feat: optimize code
devhaozi Jul 10, 2024
f2fb5aa
fix: lint
devhaozi Jul 10, 2024
d704469
Merge branch 'refs/heads/haozi/lang' into haozi/queue
devhaozi Jul 10, 2024
202ac79
Merge remote-tracking branch 'refs/remotes/origin/master' into haozi/…
devhaozi Jul 12, 2024
8fee275
chore: merge master
devhaozi Jul 12, 2024
00d3fef
Merge remote-tracking branch 'origin/master' into haozi/queue
devhaozi Jul 12, 2024
f1b118b
feat: optimize test
devhaozi Jul 12, 2024
78d1d98
Merge branch 'refs/heads/master' into haozi/queue
devhaozi Jul 28, 2024
b71cfeb
chore: merge master
devhaozi Jul 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,5 @@ jobs:
uses: golangci/golangci-lint-action@v6
with:
skip-cache: true
skip-pkg-cache: true
skip-build-cache: true
version: latest
args: --timeout=30m ./...
6 changes: 5 additions & 1 deletion .github/workflows/pr-check-title.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
name: PR Check Title
on:
pull_request:
pull_request_target:
types:
- opened
- edited
- synchronize
jobs:
pr-check-title:
runs-on: ubuntu-latest
Expand Down
16 changes: 16 additions & 0 deletions contracts/queue/driver.go
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package queue

type Driver interface {
// Connection returns the connection name for the driver.
Connection() string
// Driver returns the driver name for the driver.
Driver() string
// Push pushes the job onto the queue.
Push(job Job, args []any, queue string) error
// Bulk pushes a slice of jobs onto the queue.
Bulk(jobs []Jobs, queue string) error
// Later pushes the job onto the queue after a delay.
Later(delay uint, job Job, args []any, queue string) error
// Pop pops the next job off of the queue.
Pop(queue string) (Job, []any, error)
}
5 changes: 3 additions & 2 deletions contracts/queue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Job interface {
}

type Jobs struct {
Job Job
Args []Arg
Job Job
Args []any
Delay uint
}
14 changes: 6 additions & 8 deletions contracts/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package queue

type Queue interface {
Worker(args *Args) Worker
Worker(payloads ...*Args) Worker
// Register register jobs
Register(jobs []Job)
Register(jobs []Job) error
// GetJobs get all jobs
GetJobs() []Job
// GetJob get job by signature
GetJob(signature string) (Job, error)
// Job add a job to queue
Job(job Job, args []Arg) Task
Job(job Job, args []any) Task
// Chain creates a chain of jobs to be processed one by one, passing
Chain(jobs []Jobs) Task
}

type Worker interface {
Run() error
Shutdown() error
}

type Args struct {
Expand All @@ -24,8 +27,3 @@ type Args struct {
// Concurrent num
Concurrent int
}

type Arg struct {
Type string
Value any
}
8 changes: 2 additions & 6 deletions contracts/queue/task.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package queue

import (
"time"
)

type Task interface {
// Dispatch dispatches the task.
Dispatch() error
// DispatchSync dispatches the task synchronously.
DispatchSync() error
// Delay dispatches the task after the given delay.
Delay(time time.Time) Task
// Delay dispatches the task after the given time.
Delay(time uint) Task
// OnConnection sets the connection of the task.
OnConnection(connection string) Task
// OnQueue sets the queue of the task.
Expand Down
53 changes: 29 additions & 24 deletions database/console/migrate_stubs.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Haven't implementing this feature for now, right?

Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package console

type MysqlStubs struct {
}
type MysqlStubs struct{}

// CreateUp Create up migration content.
func (receiver MysqlStubs) CreateUp() string {
return `CREATE TABLE DummyTable (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
created_at datetime(3) NOT NULL,
updated_at datetime(3) NOT NULL,
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
created_at DATETIME(3) NOT NULL,
updated_at DATETIME(3) NOT NULL,
PRIMARY KEY (id),
KEY idx_DummyTable_created_at (created_at),
KEY idx_DummyTable_updated_at (updated_at)
Expand All @@ -24,7 +23,7 @@ func (receiver MysqlStubs) CreateDown() string {

// UpdateUp Update up migration content.
func (receiver MysqlStubs) UpdateUp() string {
return `ALTER TABLE DummyTable ADD column varchar(255) COMMENT '';
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
return `ALTER TABLE DummyTable ADD column VARCHAR(255);
`
}

Expand All @@ -34,16 +33,18 @@ func (receiver MysqlStubs) UpdateDown() string {
`
}

type PostgresqlStubs struct {
}
type PostgresqlStubs struct{}

// CreateUp Create up migration content.
func (receiver PostgresqlStubs) CreateUp() string {
return `CREATE TABLE DummyTable (
id SERIAL PRIMARY KEY NOT NULL,
created_at timestamp NOT NULL,
updated_at timestamp NOT NULL
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_DummyTable_created_at ON DummyTable (created_at);
CREATE INDEX idx_DummyTable_updated_at ON DummyTable (updated_at);
`
}

Expand All @@ -55,7 +56,7 @@ func (receiver PostgresqlStubs) CreateDown() string {

// UpdateUp Update up migration content.
func (receiver PostgresqlStubs) UpdateUp() string {
return `ALTER TABLE DummyTable ADD column varchar(255) NOT NULL;
return `ALTER TABLE DummyTable ADD column TEXT NOT NULL;
`
}

Expand All @@ -65,16 +66,18 @@ func (receiver PostgresqlStubs) UpdateDown() string {
`
}

type SqliteStubs struct {
}
type SqliteStubs struct{}

// CreateUp Create up migration content.
func (receiver SqliteStubs) CreateUp() string {
return `CREATE TABLE DummyTable (
id integer PRIMARY KEY AUTOINCREMENT NOT NULL,
created_at datetime NOT NULL,
updated_at datetime NOT NULL
id BIGINT PRIMARY KEY AUTOINCREMENT NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);

CREATE INDEX idx_DummyTable_created_at ON DummyTable (created_at);
CREATE INDEX idx_DummyTable_updated_at ON DummyTable (updated_at);
`
}

Expand All @@ -86,7 +89,7 @@ func (receiver SqliteStubs) CreateDown() string {

// UpdateUp Update up migration content.
func (receiver SqliteStubs) UpdateUp() string {
return `ALTER TABLE DummyTable ADD column text;
return `ALTER TABLE DummyTable ADD column TEXT;
`
}

Expand All @@ -96,17 +99,19 @@ func (receiver SqliteStubs) UpdateDown() string {
`
}

type SqlserverStubs struct {
}
type SqlserverStubs struct{}

// CreateUp Create up migration content.
func (receiver SqlserverStubs) CreateUp() string {
return `CREATE TABLE DummyTable (
id bigint NOT NULL IDENTITY(1,1),
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
id BIGINT NOT NULL IDENTITY(1,1),
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
PRIMARY KEY (id)
);

CREATE INDEX idx_DummyTable_created_at ON DummyTable (created_at);
CREATE INDEX idx_DummyTable_updated_at ON DummyTable (updated_at);
`
}

Expand All @@ -118,7 +123,7 @@ func (receiver SqlserverStubs) CreateDown() string {

// UpdateUp Update up migration content.
func (receiver SqlserverStubs) UpdateUp() string {
return `ALTER TABLE DummyTable ADD column varchar(255);
return `ALTER TABLE DummyTable ADD column VARCHAR(255);
`
}

Expand Down
5 changes: 4 additions & 1 deletion event/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (app *Application) Register(events map[event.Event][]event.Listener) {
}
}

app.queue.Register(jobs)
err := app.queue.Register(jobs)
if err != nil {
panic(err.Error())
}
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
}

func (app *Application) GetEvents() map[event.Event][]event.Listener {
Expand Down
6 changes: 3 additions & 3 deletions event/service_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package event
import (
"github.com/goravel/framework/contracts/console"
"github.com/goravel/framework/contracts/foundation"
eventConsole "github.com/goravel/framework/event/console"
eventconsole "github.com/goravel/framework/event/console"
)

const Binding = "goravel.event"
Expand All @@ -23,7 +23,7 @@ func (receiver *ServiceProvider) Boot(app foundation.Application) {

func (receiver *ServiceProvider) registerCommands(app foundation.Application) {
app.MakeArtisan().Register([]console.Command{
&eventConsole.EventMakeCommand{},
&eventConsole.ListenerMakeCommand{},
&eventconsole.EventMakeCommand{},
&eventconsole.ListenerMakeCommand{},
})
}
9 changes: 3 additions & 6 deletions event/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,10 @@ func (receiver *Task) Dispatch() error {
return nil
}

func eventArgsToQueueArgs(args []event.Arg) []queuecontract.Arg {
var queueArgs []queuecontract.Arg
func eventArgsToQueueArgs(args []event.Arg) []any {
var queueArgs []any
for _, arg := range args {
queueArgs = append(queueArgs, queuecontract.Arg{
Type: arg.Type,
Value: arg.Value,
})
queueArgs = append(queueArgs, arg.Value)
}

return queueArgs
Expand Down
9 changes: 2 additions & 7 deletions event/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/goravel/framework/contracts/event"
queuecontract "github.com/goravel/framework/contracts/queue"
queuemock "github.com/goravel/framework/mocks/queue"
)

Expand All @@ -32,9 +31,7 @@ func TestDispatch(t *testing.T) {
listener := &TestListener{}
mockTask := &queuemock.Task{}

mockQueue.On("Job", listener, []queuecontract.Arg{
{Type: "string", Value: "test"},
}).Return(mockTask).Once()
mockQueue.On("Job", listener, []any{"test"}).Return(mockTask).Once()
mockTask.On("DispatchSync").Return(nil).Once()

task = NewTask(mockQueue, []event.Arg{
Expand All @@ -51,9 +48,7 @@ func TestDispatch(t *testing.T) {
listener := &TestListenerHandleError{}
mockTask := &queuemock.Task{}

mockQueue.On("Job", listener, []queuecontract.Arg{
{Type: "string", Value: "test"},
}).Return(mockTask).Once()
mockQueue.On("Job", listener, []any{"test"}).Return(mockTask).Once()
mockTask.On("DispatchSync").Return(errors.New("error")).Once()

task = NewTask(mockQueue, []event.Arg{
Expand Down
Loading