-
Notifications
You must be signed in to change notification settings - Fork 20
Streaming JSON Objects from a Remote File
This code demonstrates how to transform a JSON array streamed from a remote URL into a rill stream of objects. Items are decoded one at a time as they arrive, so they can be processed concurrently and on the fly without holding the whole array in memory.
The generic, reusable `StreamJSONArray` function takes an `io.ReadCloser` (for example, an HTTP response body) and an optional variadic list of keys that lead to the desired JSON array. It returns a stream of objects of type `T` and closes the reader once the stream ends.
Here's an example of how to use `StreamJSONArray` to stream products from https://dummyjson.com/products:
package main
import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"

	"github.com/destel/rill"
)
// StreamJSONArray streams objects of type T from a JSON array read from r.
//
// keys optionally names a path of object keys leading to the array: with keys
// ["data", "items"] the array in {"data": {"items": [...]}} is streamed. With no
// keys, the top-level JSON value itself must be the array.
//
// Items are decoded one at a time as the returned channel is read, so the full
// array is never held in memory. The first navigation, parse, or decode error
// is sent as a single failed item and ends the stream. r is closed when the
// stream ends. The channel is unbuffered, so the consumer must keep reading
// until it is closed; otherwise the producing goroutine stays blocked on a send
// and r is never closed.
func StreamJSONArray[T any](r io.ReadCloser, keys ...string) <-chan rill.Try[T] {
	out := make(chan rill.Try[T])

	go func() {
		// Deferred in this order so out is closed first, then r.
		defer r.Close()
		defer close(out)

		err := decodeJSONArray(json.NewDecoder(r), keys, func(item T) {
			out <- rill.Wrap(item, nil)
		})
		if err != nil {
			var zero T
			out <- rill.Wrap(zero, err)
		}
	}()

	return out
}

// decodeJSONArray moves dec to the array found at keys, calls emit for every
// decoded element in order, and returns the first error, annotated with where
// in the input it happened.
func decodeJSONArray[T any](dec *json.Decoder, keys []string, emit func(T)) error {
	if err := jsonNavigateToKey(dec, keys...); err != nil {
		return fmt.Errorf("locating array at keys %q: %w", keys, err)
	}

	if err := jsonExpectToken(dec, json.Delim('[')); err != nil {
		return fmt.Errorf("reading array start: %w", err)
	}

	// dec.More reports false once the next token is the closing ']'.
	for i := 0; dec.More(); i++ {
		var item T
		if err := dec.Decode(&item); err != nil {
			return fmt.Errorf("decoding array item %d: %w", i, err)
		}
		emit(item)
	}

	// Consume the closing ']' so malformed input after the last item is reported.
	if err := jsonExpectToken(dec, json.Delim(']')); err != nil {
		return fmt.Errorf("reading array end: %w", err)
	}
	return nil
}
// jsonNavigateToKey fast-forwards dec through nested objects so that the next
// value it yields is the one stored under the key path keys. For example, with
// keys ["a", "b", "c"] and input {"a": {"b": {"c": ...}}}, dec is left right
// before the value of "c". Sibling keys met along the way are skipped together
// with their values. With no keys it does nothing and returns nil.
//
// It returns an error if the value at some level is not an object, if a key
// is missing from its object, or if the decoder fails to read the input.
func jsonNavigateToKey(dec *json.Decoder, keys ...string) error {
	// Reused for every skipped value; RawMessage.UnmarshalJSON appends into the
	// existing buffer instead of allocating a new one each time.
	var skipped json.RawMessage

	for _, key := range keys {
		if err := jsonExpectToken(dec, json.Delim('{')); err != nil {
			return fmt.Errorf("navigating to key %q: %w", key, err)
		}

		found := false
		for dec.More() {
			tok, err := dec.Token()
			if err != nil {
				return err
			}
			if tok == key {
				found = true
				break
			}
			// Not our key: consume its value so the next Token call yields the next key.
			if err := dec.Decode(&skipped); err != nil {
				return err
			}
		}
		if !found {
			// dec.More reported the end of the object without a matching key.
			return fmt.Errorf("key %q not found", key)
		}
	}
	return nil
}
// jsonExpectToken reads the next token from dec and returns an error unless it
// equals expected (for example json.Delim('[')).
//
// A read failure is wrapped with the token that was expected, so an empty or
// truncated input reports more than a bare "EOF"; the underlying error stays
// reachable through errors.Is / errors.As.
func jsonExpectToken(dec *json.Decoder, expected json.Token) error {
	tok, err := dec.Token()
	if err != nil {
		return fmt.Errorf("expected %v: %w", expected, err)
	}
	if tok != expected {
		return fmt.Errorf("expected %v, got %v", expected, tok)
	}
	return nil
}
// Product holds the few product attributes this example decodes from the
// dummyjson.com API. Fields present in the JSON but not declared here are
// ignored by encoding/json.
type Product struct {
	ID       int    `json:"id"`
	Title    string `json:"title"`
	Category string `json:"category"`
}
// main streams the dummyjson.com product list and prints each product as it is
// decoded. Any failure is reported on stderr with a non-zero exit status.
func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, "Error:", err)
		os.Exit(1)
	}
}

// run fetches the product list, checks the HTTP status, and prints every
// product from the "products" array, returning the first error encountered.
func run() error {
	// http.Get uses a client with no timeout and could hang forever. Note that
	// Client.Timeout also bounds reading the streamed body; raise it for very
	// large inputs.
	client := &http.Client{Timeout: 30 * time.Second}

	resp, err := client.Get("https://dummyjson.com/products")
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		// The body will not be streamed, so release the connection here.
		resp.Body.Close()
		return fmt.Errorf("unexpected HTTP status: %s", resp.Status)
	}

	// The API returns an array of products located at the key "products", like {"products": [...]}.
	// StreamJSONArray takes ownership of resp.Body and closes it when the stream ends.
	products := StreamJSONArray[*Product](resp.Body, "products")

	return rill.ForEach(products, 1, func(product *Product) error {
		fmt.Printf("%+v\n", product)
		return nil
	})
}
For an even simpler case where the JSON is newline-delimited (i.e., each line is a separate JSON object), you can write a function that calls `Decode` on a `json.Decoder` in a loop until it returns `io.EOF`, with no need to navigate keys or handle array delimiters.