Skip to content

Commit

Permalink
refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
ignavan39 committed Sep 20, 2023
1 parent a6fa6ef commit 7ea0d6f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 40 deletions.
6 changes: 3 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {

go func() {
for i := 0; i <= 20; i++ {
wq.WriteChannel <- piper.Message{
wq.Write() <- piper.Message{
Payload: i,
UID: "uid",
}
Expand All @@ -37,9 +37,9 @@ func main() {
if err != nil {
panic(err)
}
for message := range c.MessagesChannel {
for message := range c.Read() {
fmt.Println(message)
c.Report.Report <- piper.Report{
c.Report() <- piper.Report{
Done: &piper.DoneReport{
Status: 1,
},
Expand Down
56 changes: 32 additions & 24 deletions read_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (

type ReadQueueReport struct {
channel *amqp.Channel
Report chan Report
report chan Report
exchangeName string
routingKey string
}

type ReadQueue struct {
conn *amqp.Connection
channel *amqp.Channel
queue string
done chan error
routines int
MessagesChannel chan Message
Report *ReadQueueReport
conn *amqp.Connection
channel *amqp.Channel
queue string
done chan error
routines int
read chan Message
report *ReadQueueReport
}

func NewReadQueue(
Expand Down Expand Up @@ -68,12 +68,12 @@ func NewReadQueue(
channel.Qos(routines, 0, false)

return &ReadQueue{
channel: channel,
conn: conn,
queue: queue,
done: make(chan error),
routines: routines,
MessagesChannel: make(chan Message),
channel: channel,
conn: conn,
queue: queue,
done: make(chan error),
routines: routines,
read: make(chan Message),
}, nil
}

Expand All @@ -96,17 +96,25 @@ func (rq *ReadQueue) WithReport(reportExchangeName string, reportRoutingKey stri
return nil, fmt.Errorf("[Rq][%s][createReportChannel]: %s", rq.queue, err)
}
report := make(chan Report)
rq.Report = &ReadQueueReport{
rq.report = &ReadQueueReport{
exchangeName: reportExchangeName,
channel: reportChannel,
Report: report,
report: report,
routingKey: reportRoutingKey,
}
return rq, nil
}

func (rq *ReadQueue) Report() chan Report {
return rq.report.report
}

func (rq *ReadQueue) Read() <-chan Message {
return rq.read
}

func (rq *ReadQueue) Run() error {
defer close(rq.MessagesChannel)
defer close(rq.read)
var wg sync.WaitGroup
deliveries, err := rq.channel.Consume(
rq.queue,
Expand Down Expand Up @@ -148,28 +156,28 @@ func (rq *ReadQueue) Run() error {
continue
}
fmt.Println(idx)
rq.MessagesChannel <- *message
rq.read <- *message
}
}
}(i)
}
}()

if rq.Report != nil {
if rq.report != nil {
wg.Add(1)
go func() {
defer close(rq.Report.Report)
defer close(rq.report.report)
defer wg.Done()
for report := range rq.Report.Report {
for report := range rq.report.report {
buffer, err := json.Marshal(report)
if err != nil {
fmt.Printf("[Rq][%s][Run][report] - failed marshal: %s", rq.queue, err)
continue
}

rq.Report.channel.Publish(
rq.Report.exchangeName,
rq.Report.routingKey,
rq.report.channel.Publish(
rq.report.exchangeName,
rq.report.routingKey,
false,
false,
amqp.Publishing{
Expand Down
30 changes: 17 additions & 13 deletions write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

type WriteQueue struct {
WriteChannel chan Message
conn *amqp.Connection
channel *amqp.Channel
exchange string
routingKey string
done chan error
write chan Message
conn *amqp.Connection
channel *amqp.Channel
exchange string
routingKey string
done chan error
}

func NewWriteQueue(
Expand All @@ -41,17 +41,21 @@ func NewWriteQueue(
}

wq := &WriteQueue{
WriteChannel: make(chan Message),
conn: conn,
channel: channel,
exchange: exchange,
routingKey: routingKey,
write: make(chan Message),
conn: conn,
channel: channel,
exchange: exchange,
routingKey: routingKey,
}
return wq, nil
}

func (wq *WriteQueue) Write() chan Message {
return wq.write
}

func (wq *WriteQueue) Run() {
defer close(wq.WriteChannel)
defer close(wq.write)
var wg sync.WaitGroup

wg.Add(1)
Expand All @@ -60,7 +64,7 @@ func (wq *WriteQueue) Run() {
defer wg.Done()
for {
select {
case payload, ok := <-wq.WriteChannel:
case payload, ok := <-wq.write:
fmt.Println(payload)
if !ok {
fmt.Printf("[Wq][%s-%s][Run][channel closed]\n", wq.exchange, wq.routingKey)
Expand Down

0 comments on commit 7ea0d6f

Please sign in to comment.