Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-8227: Error handling in datasources #338

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 78 additions & 35 deletions pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"

@@ -318,6 +319,7 @@ type SubscriptionDataSource interface {
}

type Resolver struct {
sync.Mutex
ctx context.Context
dataLoaderEnabled bool
resultSetPool sync.Pool
@@ -329,6 +331,7 @@ type Resolver struct {
hash64Pool sync.Pool
dataloaderFactory *dataLoaderFactory
fetcher *Fetcher
fetchErrorMap map[int]error
}

type inflightFetch struct {
@@ -388,23 +391,24 @@ func New(ctx context.Context, fetcher *Fetcher, enableDataLoader bool) *Resolver
dataloaderFactory: newDataloaderFactory(fetcher),
fetcher: fetcher,
dataLoaderEnabled: enableDataLoader,
fetchErrorMap: make(map[int]error),
}
}

func (r *Resolver) resolveNode(ctx *Context, node Node, data []byte, bufPair *BufPair) (err error) {
func (r *Resolver) resolveNode(ctx *Context, node Node, data []byte, fetchError error, bufPair *BufPair) (err error) {
switch n := node.(type) {
case *Object:
return r.resolveObject(ctx, n, data, bufPair)
case *Array:
return r.resolveArray(ctx, n, data, bufPair)
return r.resolveArray(ctx, n, data, fetchError, bufPair)
case *Null:
if n.Defer.Enabled {
r.preparePatch(ctx, n.Defer.PatchIndex, nil, data)
}
r.resolveNull(bufPair.Data)
return
case *String:
return r.resolveString(ctx, n, data, bufPair)
return r.resolveString(ctx, n, data, fetchError, bufPair)
case *Boolean:
return r.resolveBoolean(ctx, n, data, bufPair)
case *Integer:
@@ -496,7 +500,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
}

ignoreData := false
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), buf)
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), nil, buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
@@ -648,21 +652,22 @@ func (r *Resolver) ResolveGraphQLResponsePatch(ctx *Context, patch *GraphQLRespo

ctx.pathPrefix = append(path, extraPath...)

var fetchError error
if patch.Fetch != nil {
set := r.getResultSet()
defer r.freeResultSet(set)
err = r.resolveFetch(ctx, patch.Fetch, data, set)
if err != nil {
return err
}
fetchError = r.resolveFetch(ctx, patch.Fetch, data, set)
//if err != nil {
// return err
//}
_, ok := set.buffers[0]
if ok {
r.MergeBufPairErrors(set.buffers[0], buf)
data = set.buffers[0].Data.Bytes()
}
}

err = r.resolveNode(ctx, patch.Value, data, buf)
err = r.resolveNode(ctx, patch.Value, data, fetchError, buf)
if err != nil {
return
}
@@ -716,7 +721,7 @@ func (r *Resolver) resolveEmptyObject(b *fastbuffer.FastBuffer) {
b.WriteBytes(rBrace)
}

func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, fetchError error, arrayBuf *BufPair) (err error) {
if len(array.Path) != 0 {
data, _, _, _ = jsonparser.Get(data, array.Path...)
}
@@ -753,12 +758,12 @@ func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, arrayBu
defer func() { ctx.removeResponseArrayLastElements(array.Path) }()

if array.ResolveAsynchronous && !array.Stream.Enabled && !r.dataLoaderEnabled {
return r.resolveArrayAsynchronous(ctx, array, arrayItems, arrayBuf)
return r.resolveArrayAsynchronous(ctx, array, arrayItems, fetchError, arrayBuf)
}
return r.resolveArraySynchronous(ctx, array, arrayItems, arrayBuf)
return r.resolveArraySynchronous(ctx, array, arrayItems, fetchError, arrayBuf)
}

func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItems *[][]byte, fetchError error, arrayBuf *BufPair) (err error) {

itemBuf := r.getBufPair()
defer r.freeBufPair(itemBuf)
@@ -780,9 +785,12 @@ func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItem
}

ctx.addIntegerPathElement(i)
err = r.resolveNode(ctx, array.Item, (*arrayItems)[i], itemBuf)
err = r.resolveNode(ctx, array.Item, (*arrayItems)[i], fetchError, itemBuf)
ctx.removeLastPathElement()
if err != nil {
if itemBuf.HasErrors() {
r.MergeBufPairErrors(itemBuf, arrayBuf)
}
if errors.Is(err, errNonNullableFieldValueIsNull) && array.Nullable {
arrayBuf.Data.Reset()
r.resolveNull(arrayBuf.Data)
@@ -805,7 +813,7 @@ func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItem
return
}

func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {
func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayItems *[][]byte, fetchError error, arrayBuf *BufPair) (err error) {

arrayBuf.Data.WriteBytes(lBrack)

@@ -827,7 +835,7 @@ func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayIte
cloned := ctx.Clone()
go func(ctx Context, i int) {
ctx.addPathElement([]byte(strconv.Itoa(i)))
if e := r.resolveNode(&ctx, array.Item, itemData, itemBuf); e != nil && !errors.Is(e, errTypeNameSkipped) {
if e := r.resolveNode(&ctx, array.Item, itemData, fetchError, itemBuf); e != nil && !errors.Is(e, errTypeNameSkipped) {
select {
case errCh <- e:
default:
@@ -922,14 +930,20 @@ func (r *Resolver) resolveBoolean(ctx *Context, boolean *Boolean, data []byte, b
return nil
}

func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, stringBuf *BufPair) error {
func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, fetchError error, stringBuf *BufPair) error {
var (
value []byte
valueType jsonparser.ValueType
err error
)

// TODO clean this up
value, valueType, _, err = jsonparser.Get(data, str.Path...)
if value == nil && fetchError != nil && str.Nullable {
r.resolveNull(stringBuf.Data)
r.addError(ctx, fetchError, stringBuf)
return nil
}
if err != nil || valueType != jsonparser.String {
if err == nil && str.UnescapeResponseJson {
switch valueType {
@@ -942,6 +956,9 @@ func (r *Resolver) resolveString(ctx *Context, str *String, data []byte, stringB
return fmt.Errorf("invalid value type '%s' for path %s, expecting string, got: %v. You can fix this by configuring this field as Int/Float/JSON Scalar", valueType, string(ctx.path()), string(value))
}
if !str.Nullable {
if fetchError != nil {
r.addError(ctx, fetchError, stringBuf)
}
return errNonNullableFieldValueIsNull
}
r.resolveNull(stringBuf.Data)
@@ -1001,7 +1018,17 @@ func (r *Resolver) resolveNull(b *fastbuffer.FastBuffer) {
b.WriteBytes(null)
}

func (r *Resolver) addError(ctx *Context, err error, objectBuf *BufPair) {
r.writeError(ctx, err.Error(), objectBuf)
}

func (r *Resolver) addResolveError(ctx *Context, objectBuf *BufPair) {
r.writeError(ctx, string(unableToResolveMsg), objectBuf)
}

func (r *Resolver) writeError(ctx *Context, msg string, objectBuf *BufPair) {
msg = strings.ReplaceAll(msg, `"`, `\"`)
msg = fmt.Sprintf("error resolving: %s", msg)
locations, path := pool.BytesBuffer.Get(), pool.BytesBuffer.Get()
defer pool.BytesBuffer.Put(locations)
defer pool.BytesBuffer.Put(path)
@@ -1034,7 +1061,7 @@ func (r *Resolver) addResolveError(ctx *Context, objectBuf *BufPair) {
pathBytes = path.Bytes()
}

objectBuf.WriteErr(unableToResolveMsg, locations.Bytes(), pathBytes, nil)
objectBuf.WriteErr([]byte(msg), locations.Bytes(), pathBytes, nil)
}

func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, objectBuf *BufPair) (err error) {
@@ -1066,9 +1093,9 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
if object.Fetch != nil {
set = r.getResultSet()
defer r.freeResultSet(set)
err = r.resolveFetch(ctx, object.Fetch, data, set)
err := r.resolveFetch(ctx, object.Fetch, data, set)
if err != nil {
return
// TODO figure out what to do with error
}
for i := range set.buffers {
r.MergeBufPairErrors(set.buffers[i], objectBuf)
@@ -1085,7 +1112,6 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
first := true
skipCount := 0
for i := range object.Fields {

if object.Fields[i].SkipDirectiveDefined {
skip, err := jsonparser.GetBoolean(ctx.Variables, object.Fields[i].SkipVariableName)
if err == nil && skip {
@@ -1103,6 +1129,13 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
}

var fieldData []byte
var fetchError error
if object.Fields[i].HasBuffer {
fErr, ok := r.fetchErrorMap[object.Fields[i].BufferID]
if ok && fErr != nil {
fetchError = fErr
}
}
if set != nil && object.Fields[i].HasBuffer {
buffer, ok := set.buffers[object.Fields[i].BufferID]
if ok {
@@ -1152,7 +1185,7 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
objectBuf.Data.WriteBytes(colon)
ctx.addPathElement(object.Fields[i].Name)
ctx.setPosition(object.Fields[i].Position)
err = r.resolveNode(ctx, object.Fields[i].Value, fieldData, fieldBuf)
err = r.resolveNode(ctx, object.Fields[i].Value, fieldData, fetchError, fieldBuf)
ctx.removeLastPathElement()
ctx.responseElements = responseElements
ctx.lastFetchID = lastFetchID
@@ -1164,15 +1197,16 @@ func (r *Resolver) resolveObject(ctx *Context, object *Object, data []byte, obje
}
if errors.Is(err, errNonNullableFieldValueIsNull) {
objectBuf.Data.Reset()
fieldBufHasErrors := fieldBuf.HasErrors()
r.MergeBufPairErrors(fieldBuf, objectBuf)

if object.Nullable {
r.resolveNull(objectBuf.Data)
return nil
}

// if fied is of object type than we should not add resolve error here
if _, ok := object.Fields[i].Value.(*Object); !ok {
// if field is of object type than we should not add resolve error here
if _, ok := object.Fields[i].Value.(*Object); !ok && !fieldBufHasErrors {
r.addResolveError(ctx, objectBuf)
}
}
@@ -1266,7 +1300,7 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
preparedInputs := r.getBufPairSlice()
defer r.freeBufPairSlice(preparedInputs)

resolvers := make([]func() error, 0, len(fetch.Fetches))
resolvers := make(map[int]func() error)

wg := r.getWaitGroup()
defer r.freeWaitGroup(wg)
@@ -1282,9 +1316,9 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
}
*preparedInputs = append(*preparedInputs, preparedInput)
buf := set.buffers[f.BufferId]
resolvers = append(resolvers, func() error {
resolvers[f.BufferId] = func() error {
return r.resolveSingleFetch(ctx, f, preparedInput.Data, buf)
})
}
case *BatchFetch:
preparedInput := r.getBufPair()
err = r.prepareSingleFetch(ctx, f.Fetch, data, set, preparedInput.Data)
@@ -1293,17 +1327,21 @@ func (r *Resolver) resolveParallelFetch(ctx *Context, fetch *ParallelFetch, data
}
*preparedInputs = append(*preparedInputs, preparedInput)
buf := set.buffers[f.Fetch.BufferId]
resolvers = append(resolvers, func() error {
resolvers[f.Fetch.BufferId] = func() error {
return r.resolveBatchFetch(ctx, f, preparedInput.Data, buf)
})
}
}
}

for _, resolver := range resolvers {
go func(r func() error) {
_ = r()
for bufferID, resolver := range resolvers {
bufferID := bufferID
go func(r func() error, res *Resolver) {
err := r()
res.Lock()
defer res.Unlock()
res.fetchErrorMap[bufferID] = err
wg.Done()
}(resolver)
}(resolver, r)
}

wg.Wait()
@@ -1331,10 +1369,15 @@ func (r *Resolver) resolveBatchFetch(ctx *Context, fetch *BatchFetch, preparedIn
}

func (r *Resolver) resolveSingleFetch(ctx *Context, fetch *SingleFetch, preparedInput *fastbuffer.FastBuffer, buf *BufPair) error {
var err error
if r.dataLoaderEnabled && !fetch.DisableDataLoader {
return ctx.dataLoader.Load(ctx, fetch, buf)
err = ctx.dataLoader.Load(ctx, fetch, buf)
}
return r.fetcher.Fetch(ctx, fetch, preparedInput, buf)
err = r.fetcher.Fetch(ctx, fetch, preparedInput, buf)
r.Lock()
defer r.Unlock()
r.fetchErrorMap[fetch.BufferId] = err
return err
}

type Object struct {