Skip to content

Commit

Permalink
Merge pull request #4 from schoeppi5/initial
Browse files Browse the repository at this point in the history
Initial code push
  • Loading branch information
schoeppi5 authored Feb 6, 2021
2 parents 6bd780c + b078cb2 commit 6d010a7
Show file tree
Hide file tree
Showing 45 changed files with 4,417 additions and 0 deletions.
17 changes: 17 additions & 0 deletions communication/communication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package communication_test

import (
"fmt"
"testing"
)

// Test helper

func LogTestError(have, want interface{}, t *testing.T, extraInfo ...string) {
msg := fmt.Sprintf("%s: Test %s failed\n\tHave: %+v\n\tWant: %+v\n", "core_test", t.Name(), have, want)
if len(extraInfo) > 0 {
msg += "\n"
msg += fmt.Sprintln(extraInfo)
}
t.Errorf(msg)
}
68 changes: 68 additions & 0 deletions communication/eventstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package communication

import (
"sync"

"github.com/schoeppi5/libts"
)

// EventStore is used to manage events across a subscriber and the event loop (communication.SortEvents)
// it is basically a thread-safe map
type EventStore struct {
store map[string]*libts.Event
lock sync.Locker
}

// NewEventStore retuns a thread safe EventStore
func NewEventStore() *EventStore {
return &EventStore{
store: make(map[string]*libts.Event),
lock: &sync.Mutex{},
}
}

// Add adds/updates name event pairs to/in the store
func (es *EventStore) Add(name string, event *libts.Event) {
es.lock.Lock()
defer es.lock.Unlock()
es.store[name] = event
}

// Del deletes name event pairs from the store
func (es *EventStore) Del(name string) {
es.lock.Lock()
defer es.lock.Unlock()
delete(es.store, name)
}

// Get returns the event to a name
func (es *EventStore) Get(name string) (*libts.Event, bool) {
es.lock.Lock()
defer es.lock.Unlock()
e, ok := es.store[name]
return e, ok
}

// Clean the whole store
func (es *EventStore) Clean() {
es.lock.Lock()
defer es.lock.Unlock()
es.store = make(map[string]*libts.Event)
}

// Len returns the number of events in the store
func (es *EventStore) Len() int {
es.lock.Lock()
defer es.lock.Unlock()
return len(es.store)
}

// Keys all keys in store
func (es *EventStore) Keys() (keys []string) {
es.lock.Lock()
defer es.lock.Unlock()
for key := range es.store {
keys = append(keys, key)
}
return
}
128 changes: 128 additions & 0 deletions communication/eventstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package communication

import (
"fmt"
"testing"

"github.com/schoeppi5/libts"
)

func TestAdd(t *testing.T) {
// given
es := NewEventStore()
key := "test"
value := &libts.Event{
C: nil,
Template: "string",
}

// when
es.Add(key, value)

// then
if es.store[key] != value {
t.Errorf("Test %s failed!\n\tHave: %v\n\tWant: %v", t.Name(), es.store[key], value)
}
}

func TestDel(t *testing.T) {
// given
es := NewEventStore()
key := "test"
value := &libts.Event{
C: nil,
Template: nil,
}
es.Add(key, value)

// when
es.Del(key)

// then
if _, ok := es.store[key]; ok {
t.Errorf("Test %s failed!\n\tKey %s still present in store", t.Name(), key)
}
}

func TestGetPresent(t *testing.T) {
// given
es := NewEventStore()
key := "test"
value := &libts.Event{
C: nil,
Template: nil,
}
es.Add(key, value)

// when
v, ok := es.Get(key)

// then
if !ok {
t.Errorf("Test %s failed!\n\tKey %s not present in store", t.Name(), key)
}
if v != value {
t.Errorf("Test %s failed!\n\tHave: %v\n\tWant: %v", t.Name(), v, value)
}
}

func TestGetNotPresent(t *testing.T) {
// given
es := NewEventStore()
key := "test"

// when
v, ok := es.Get(key)

// then
if ok {
t.Errorf("Test %s failed!\n\tFound value for key %s: %v", t.Name(), key, v)
}
}

func TestLen(t *testing.T) {
// given
es := NewEventStore()
keyCount := 10
for i := 0; i < keyCount; i++ {
es.Add(fmt.Sprintf("%d", i), nil)
}

// when
l := es.Len()

if l != keyCount {
t.Errorf("Test %s failed!\n\tHave: %d\n\tWant: %d", t.Name(), l, keyCount)
}
}

func TestClean(t *testing.T) {
// given
es := NewEventStore()
es.Add("test", nil)

// when
es.Clean()

// then
if len(es.store) != 0 {
t.Errorf("Test %s failed!\n\tHave: %d\n\tWant: %d", t.Name(), len(es.store), 0)
}
}

func TestKeys(t *testing.T) {
// given
es := NewEventStore()
es.Add("test", nil)

// when
keys := es.Keys()

// then
if len(keys) != 1 {
t.Errorf("Test %s failed!\n\tHave: %d\n\tWant: %d", t.Name(), len(keys), 1)
}
if keys[0] != "test" {
t.Errorf("Test %s failed!\n\tHave: %s\n\tWant: %s", t.Name(), keys[0], "test")
}
}
3 changes: 3 additions & 0 deletions communication/mocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package communication_test

// TODO: write query mock
119 changes: 119 additions & 0 deletions communication/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package communication

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"time"
)

// Shared codebase for queries

// QueryError is returned when the query answered with an error
type QueryError struct {
ID int `mapstructure:"id" json:"code"`
Message string `mapstructure:"msg" json:"message"`
ExtraMessage string `json:"extra_message"` // only set on webquery
}

func (qe QueryError) Error() string {
s := fmt.Sprintf("Query error(%d): %s", qe.ID, qe.Message)
if qe.ExtraMessage != "" {
s += fmt.Sprintf(" (Extra message: %s)", qe.ExtraMessage)
}
return s
}

// KeepAlive sends " \n" every t to conn
func KeepAlive(conn io.Writer, t time.Duration) {
ticker := time.NewTicker(t)
for {
<-ticker.C
conn.Write([]byte(" \n"))
}
}

// ReadHeader slurps the header from the io.Reader
func ReadHeader(r <-chan []byte) error {
// header
header, open := <-r
if !open {
return errors.New("unable to read header: connection closed")
}
if string(header) != "TS3" {
return errors.New("wrong header")
}
// banner
_, open = <-r
if !open {
return errors.New("unable to read banner: connection closed")
}
return nil
}

// Run writes r to in and reads until it encounters an error. If error has id 0, the read data is returned
func Run(in <-chan []byte, out io.Writer, r []byte) ([]byte, error) {
if !bytes.HasSuffix(r, []byte("\n")) { // a command must be suffixed by \n
r = append(r, byte('\n'))
}
_, err := out.Write(r)
if err != nil {
return nil, err
}
var data []byte
for {
d, open := <-in
if !open {
return nil, errors.New("unable to read response: connection closed")
}
if err = IsError(d); err != nil {
if e, ok := err.(QueryError); ok {
if e.ID == 0 {
return data, nil
}
return nil, err
}
return nil, err
}
if len(data) != 0 {
data = append(data, byte('|'))
}
data = append(data, d...)
}
}

// Split the input from c
// notifications (prefixed with notify.*) are send to notify, everything else is send to out
// Stops when c is closed or it encounters an error while reading from c
// If notify is nil, notifications are discarded
// If out is nil, Split returns
func Split(c io.Reader, out chan<- []byte, notify chan<- []byte) {
if out == nil {
return
}
reader := bufio.NewReader(c)
for {
data, err := reader.ReadBytes('\n')
if err != nil {
close(out)
if notify != nil {
close(notify)
}
return
}
data = bytes.TrimRight(bytes.TrimLeft(data, "\r"), "\n") // normalize data
if bytes.HasPrefix(data, []byte("notify")) {
if notify == nil {
continue
}
select { // non blocking write to notify
case notify <- data:
default:
}
continue
}
out <- data // block on write to out
}
}
Loading

0 comments on commit 6d010a7

Please sign in to comment.