Skip to content

Streaming JSON Objects from a Remote File

Viktor Nikolaiev edited this page Jun 19, 2024 · 1 revision

This code demonstrates how to transform a JSON array streamed from a remote URL into a rill stream of objects. This approach allows the JSON to be processed concurrently and on the fly, without first loading all of the objects into memory.

The generic, reusable StreamJSONArray function takes an io.ReadCloser (which can be obtained from an HTTP response body) and a variadic list of keys to navigate to the desired JSON array. It returns a stream of objects of type T.

Here's an example of how to use StreamJSONArray to stream products from https://dummyjson.com/products:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"github.com/destel/rill"
)

// StreamJSONArray decodes the elements of a JSON array read from r, one at a time,
// and delivers them on the returned channel as values of type T.
//
// keys is an optional path of nested object keys that leads to the array; with no
// keys, the array is expected at the top level of the document.
//
// Decoding happens in a separate goroutine. When it finishes, the channel is closed
// and then r is closed. If anything fails, the last item sent on the channel carries
// the error together with the zero value of T, and no further items follow.
func StreamJSONArray[T any](r io.ReadCloser, keys ...string) <-chan rill.Try[T] {
	results := make(chan rill.Try[T])

	go func() {
		defer r.Close()
		defer close(results)

		decoder := json.NewDecoder(r)

		// emitAll walks the array and sends every element downstream. Returning the
		// first failure lets the error be reported to the consumer in a single place.
		emitAll := func() error {
			// Move to the value under the requested key path, if any.
			if err := jsonNavigateToKey(decoder, keys...); err != nil {
				return err
			}

			// The value must open with '['.
			if err := jsonExpectToken(decoder, json.Delim('[')); err != nil {
				return err
			}

			// Decode and forward the elements one by one.
			for decoder.More() {
				var elem T
				if err := decoder.Decode(&elem); err != nil {
					return err
				}
				results <- rill.Wrap(elem, nil)
			}

			// The array must be terminated by ']'.
			return jsonExpectToken(decoder, json.Delim(']'))
		}

		if err := emitAll(); err != nil {
			var zero T
			results <- rill.Wrap(zero, err)
		}
	}()

	return results
}

// jsonNavigateToKey fast-forwards the decoder to the value stored under a path of
// nested object keys. For example, with keys ["a", "b", "c"] and the input
// {"a": {"b": {"c": ...}}}, the next token read from dec after a successful call is
// the first token of the value of "c". With no keys, the decoder is left untouched.
//
// An error is returned if reading from the decoder fails, if a value along the path
// is not a JSON object, or if an object on the path does not contain the requested key.
func jsonNavigateToKey(dec *json.Decoder, keys ...string) error {
	for _, key := range keys {
		// Every step of the path must descend into an object.
		if err := jsonExpectToken(dec, json.Delim('{')); err != nil {
			return err
		}

		found := false
		for dec.More() {
			tok, err := dec.Token()
			if err != nil {
				return err
			}
			if tok == key {
				found = true
				break
			}

			// Not our key: consume its value so the next token is the following key.
			if err := dec.Decode(&json.RawMessage{}); err != nil {
				return err
			}
		}

		if !found {
			// More also reports false when the input cannot be read (e.g. it is
			// truncated). Reading the next token surfaces that error instead of
			// mislabelling it as a missing key.
			if _, err := dec.Token(); err != nil {
				return err
			}
			return fmt.Errorf("key %q not found", key)
		}
	}

	return nil
}

// jsonExpectToken consumes exactly one token from dec and verifies that it equals
// expected. It returns the decoder's error if the token cannot be read, and a
// descriptive error naming both tokens if they differ.
func jsonExpectToken(dec *json.Decoder, expected json.Token) error {
	switch got, err := dec.Token(); {
	case err != nil:
		return err
	case got != expected:
		return fmt.Errorf("expected %v, got %v", expected, got)
	default:
		return nil
	}
}

// Product holds the subset of a product record that this example decodes.
// Each field is mapped to a JSON property through its struct tag.
type Product struct {
	// ID is decoded from the "id" property.
	ID int `json:"id"`
	// Title is decoded from the "title" property.
	Title string `json:"title"`
	// Category is decoded from the "category" property.
	Category string `json:"category"`
}

// main fetches the product list from the dummyjson API, streams the products out of
// the response body with StreamJSONArray and prints each one.
func main() {
	resp, err := http.Get("https://dummyjson.com/products")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	// http.Get does not treat a non-2xx status as an error. Such a response usually
	// carries a body that is not the expected JSON, so reject it here with a clear
	// message instead of letting the decoder fail on it.
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		fmt.Println("Error: unexpected HTTP status:", resp.Status)
		return
	}

	// The API returns an array of products located at the key "products", like {"products": [...]}.
	// StreamJSONArray closes resp.Body once streaming ends, so it is not closed here.
	products := StreamJSONArray[*Product](resp.Body, "products")

	// Consume the stream, printing each product as it arrives.
	err = rill.ForEach(products, 1, func(product *Product) error {
		fmt.Printf("%+v\n", product)
		return nil
	})

	if err != nil {
		fmt.Println("Error:", err)
		return
	}
}

For an even simpler case where the JSON is newline-delimited (i.e., each line is a separate JSON object), you can create a function that directly decodes each object without the need for navigating keys or handling arrays.

Clone this wiki locally