Skip to content

Commit

Permalink
Merge pull request #5 from mundipagg/feature/improve-serialize
Browse files Browse the repository at this point in the history
improve serialize
  • Loading branch information
paulolimarb authored Nov 18, 2019
2 parents 6a56a97 + 3784cf9 commit 013b93d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
25 changes: 18 additions & 7 deletions buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type buffer struct {
cap int
size int
expiration time.Duration
chunks chan []interface{}
chunks chan entry
items []interface{}
backoff time.Duration
}
Expand All @@ -43,7 +43,10 @@ func (b *buffer) clear() {
b.size = 0
b.items = make([]interface{}, b.cap)
go func() {
b.chunks <- events
b.chunks <- entry{
items: events,
retries: cap(b.chunks),
}
}()
}
}
Expand Down Expand Up @@ -71,6 +74,11 @@ type Config struct {
OnOverflow func([]interface{}) error
}

type entry struct {
items []interface{}
retries int
}

func New(c Config) Buffer {
if c.Cap == 0 {
c.Cap = DefaultCapacity
Expand All @@ -90,7 +98,7 @@ func New(c Config) Buffer {
size: 0,
cap: c.Cap,
expiration: c.Expiration,
chunks: make(chan []interface{}, c.OnWait),
chunks: make(chan entry, c.OnWait),
items: make([]interface{}, c.Cap),
backoff: c.BackOff,
}
Expand All @@ -107,11 +115,14 @@ func (b *buffer) consumer(c Config) {
}
}()
for events := range b.chunks {
err := c.OnOverflow(events)
err := c.OnOverflow(events.items)
if err != nil {
go func(events []interface{}) {
time.Sleep(b.backoff)
b.chunks <- events
go func(events entry) {
events.retries--
if events.retries >= 0 {
time.Sleep(b.backoff)
b.chunks <- events
}
}(events)
}
}
Expand Down
6 changes: 3 additions & 3 deletions buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestBuffer_Write(t *testing.T) {
size: 0,
cap: 10,
items: make([]interface{}, 10),
chunks: make(chan []interface{}, 10),
chunks: make(chan entry, 10),
}
subject.Write("something")
is.Equal(1, subject.size, "it should increment the size of the buffer in one unit")
Expand All @@ -34,15 +34,15 @@ func TestBuffer_Write(t *testing.T) {
size: 0,
cap: 1,
items: make([]interface{}, 1),
chunks: make(chan []interface{}, 10),
chunks: make(chan entry, 10),
}
subject.Write("something")
is.Equal(0, subject.size, "it should remain zero")
is.Equal([]interface{}{nil}, subject.items, "it should clean the buffer's inner slice")
timeout := time.NewTimer(10 * time.Millisecond)
select {
case actual := <-subject.chunks:
is.Equal([]interface{}{"something"}, actual, "it should read the expected slice")
is.Equal(entry{items:[]interface{}{"something"}, retries: 10} , actual, "it should read the expected slice")
case <-timeout.C:
is.Fail("nothing was published")
}
Expand Down
33 changes: 27 additions & 6 deletions json/encoder/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ func (changer *Struct) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteObjectStart()
numFields := v.NumField()
if numFields > 0 {
lastWasWrote := false
for i := 0; i < numFields; i++ {
var i int
for i = 0; i < numFields; i++ {
fv := v.Field(i)
ft := changer.Type.Field(i)
lastWasWrote = changer.writeField(ft, fv, stream, lastWasWrote)
if changer.writeField(ft, fv, stream, false) {
break
}
}
i++
for ; i < numFields; i++ {
fv := v.Field(i)
ft := changer.Type.Field(i)
changer.writeField(ft, fv, stream, true)
}
}
stream.WriteObjectEnd()
Expand All @@ -56,15 +64,19 @@ func (changer *Struct) writeField(structField reflect.StructField, value reflect
if !value.CanInterface() {
return false
}
if needsComma {
stream.WriteMore()
}

tag := strings.TrimSpace(structField.Tag.Get("json"))
if len(tag) == 0 {
if needsComma {
stream.WriteMore()
}
stream.WriteObjectField(changer.Strategy(structField.Name))
stream.WriteVal(value.Interface())

} else {

pieces := strings.Split(tag, ",")

if len(pieces) > 1 {
if pieces[1] == "omitempty" {
isZero := func() (isZero bool) {
Expand All @@ -88,8 +100,17 @@ func (changer *Struct) writeField(structField reflect.StructField, value reflect
}
}
}

if pieces[0] == "-" {
return false
}

if needsComma {
stream.WriteMore()
}
stream.WriteObjectField(changer.Strategy(pieces[0]))
stream.WriteVal(value.Interface())

}
return true
}

0 comments on commit 013b93d

Please sign in to comment.