diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 6052bdf..5a16e09 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -22,12 +22,6 @@ and if required please add new test cases and list them below: - [ ] Test A - [ ] Test B -## Linting - -Please make sure you've run the following and fixed any issues that arise: - -- [ ] `trunk check` has been run - ## Final Checklist: - [ ] My code follows the style guidelines of this project diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml deleted file mode 100644 index cc5c17a..0000000 --- a/.github/workflows/lint.yaml +++ /dev/null @@ -1,39 +0,0 @@ -name: Lint - -on: - pull_request: - branches: - - "*" - -jobs: - lint: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v2 - - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.18 - - - name: Cache Linters/Formatters - uses: actions/cache@v2 - with: - path: ~/.cache/trunk - key: trunk-${{ runner.os }} - - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - name: Install protoc-gen plugin - working-directory: ./protoc-gen-go-frpc - run: go install google.golang.org/protobuf/cmd/protoc-gen-go@latest - - name: Install frpc plugin for protoc-gen - working-directory: ./ - run: go install ./protoc-gen-go-frpc - - name: Run generator - working-directory: ./examples/test - run: protoc --go-frpc_out=../../pkg/generator test.proto - - - name: Trunk Check - uses: trunk-io/trunk-action@v1 diff --git a/.trunk/.gitignore b/.trunk/.gitignore deleted file mode 100644 index 507283d..0000000 --- a/.trunk/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*out -*logs -external diff --git a/.trunk/logs b/.trunk/logs deleted file mode 120000 index 12da1f2..0000000 --- a/.trunk/logs +++ /dev/null @@ -1 +0,0 @@ -/Users/shivanshvij/.cache/trunk/repos/48c9c5d3298f6198ad85aaa16535615b/logs \ No newline at end of file diff --git a/.trunk/out b/.trunk/out deleted file mode 120000 index fd6e5a6..0000000 --- a/.trunk/out +++ /dev/null @@ -1 +0,0 @@ -/Users/shivanshvij/.cache/trunk/repos/48c9c5d3298f6198ad85aaa16535615b/out \ No newline at end of file diff --git a/.trunk/plugins/trunk b/.trunk/plugins/trunk deleted file mode 120000 index 0109ed7..0000000 --- a/.trunk/plugins/trunk +++ /dev/null @@ -1 +0,0 @@ -/Users/shivanshvij/.cache/trunk/plugins/https---github-com-trunk-io-plugins/v0.0.3 \ No newline at end of file diff --git a/.trunk/trunk.yaml b/.trunk/trunk.yaml deleted file mode 100644 index 3886671..0000000 --- a/.trunk/trunk.yaml +++ /dev/null @@ -1,15 +0,0 @@ -version: 0.1 -cli: - version: 0.16.1-beta -lint: - enabled: - - actionlint@1.6.13 - - gitleaks@8.8.7 - - gofmt@1.18.3 - - golangci-lint@1.46.2 - - markdownlint@0.31.1 - - prettier@2.6.2 - ignore: - - linters: [ALL] - paths: - - dist/** diff --git a/CHANGELOG.md b/CHANGELOG.md index 87d8020..da48195 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,13 +7,20 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [v0.7.1] - 2022-12-10 + +### Changes + +- Fixed a bug when generating fRPC with streams where sometimes stream messages would be received out of order. +- Removed the Trunk linter + ## [v0.7.0] - 2022-09-28 ### Features - fRPC now uses the `VarInt` encoding format under the hood (added in [polyglot-go v0.5.0](https://github.com/loopholelabs/polyglot-go)) which should help reduce the number of bytes an RPC call is serialized to -- A new `CloseError` type has been added which, when returned by an RPC call, causes the connection to be closed after the message is written. This can be useful for authentication or connection management. -- Streaming is now available! The API matches gRPC's so it should be a drop-in replacement! +- A new `CloseError` type has been added which, when returned by an RPC call, causes the connection to be closed after the message is written. This can be useful for authentication or connection management. +- Streaming is now available! The API matches gRPC's so it should be a drop-in replacement! ### Changes @@ -40,7 +47,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). > Changelogs for [v0.5.0] and before can be found at https://github.com/loopholelabs/frisbee-go -[unreleased]: https://github.com/loopholelabs/frpc-go/compare/v0.7.0...HEAD +[unreleased]: https://github.com/loopholelabs/frpc-go/compare/v0.7.1...HEAD +[v0.7.1]: https://github.com/loopholelabs/frpc-go/releases/tag/v0.7.1 [v0.7.0]: https://github.com/loopholelabs/frpc-go/releases/tag/v0.7.0 [v0.6.0]: https://github.com/loopholelabs/frpc-go/releases/tag/v0.6.0 [v0.5.1]: https://github.com/loopholelabs/frpc-go/releases/tag/v0.5.1 diff --git a/README.md b/README.md index f18847d..83f3197 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ same is true for selected other new features explicitly marked as Usage instructions and documentation for fRPC is available at [https://frpc.io/](https://frpc.io/). -fRPC is still in very early \*_Alpha_. There may be bug in the library that will be fixed +fRPC is still in very early _Alpha_. There may be bugs in the library that will be fixed as the library matures and usage of fRPC grows. One of the major benefits to fRPC is that reading the generated code is extremely straight forward, making it easy to debug potential issues down the line. @@ -27,7 +27,6 @@ is extremely straight forward, making it easy to debug potential issues down the fRPC currently does not support the following features, though they are actively being worked on: -- Streaming Messages between the client and server - `OneOf` Message Types Example `Proto3` files can be found [here](/examples). diff --git a/dockerfile b/dockerfile index b708ef2..59a356b 100644 --- a/dockerfile +++ b/dockerfile @@ -2,7 +2,7 @@ FROM golang as builder ENV GOOS=linux GOARCH=amd64 CGO_ENABLED=0 -RUN go install github.com/loopholelabs/frpc-go/protoc-gen-go-frpc@v0.5.1 +RUN go install github.com/loopholelabs/frpc-go/protoc-gen-go-frpc@v0.7.1 # Note, the Docker images must be built for amd64. If the host machine architecture is not amd64 # you need to cross-compile the binary and move it into /go/bin. @@ -12,8 +12,8 @@ FROM scratch # Runtime dependencies LABEL "build.buf.plugins.runtime_library_versions.0.name"="github.com/loopholelabs/frpc-go" -LABEL "build.buf.plugins.runtime_library_versions.0.version"="v0.5.1" +LABEL "build.buf.plugins.runtime_library_versions.0.version"="v0.7.1" COPY --from=builder /go/bin / -ENTRYPOINT ["/protoc-gen-go-frpc"] +ENTRYPOINT ["/protoc-gen-go-frpc"] \ No newline at end of file diff --git a/docs/favicon.png b/docs/favicon.png new file mode 100644 index 0000000..d430c6e Binary files /dev/null and b/docs/favicon.png differ diff --git a/docs/getting-started/architecture.mdx b/docs/getting-started/architecture.mdx new file mode 100644 index 0000000..3bc6c0d --- /dev/null +++ b/docs/getting-started/architecture.mdx @@ -0,0 +1,49 @@ +--- +title: Architecture +--- + +The architecture of fRPC is based on the standard Server/Client model that a lot of other RPC frameworks are follow. +The idea is that the `Client` makes a connection with the `Server`, and then sends a structured +request. Based on the request type, the `Server` runs a handler that then returns a response or an error. +The `Server` then forwards that response object (or the error) back to the `Client`. + +From the perspective of the `Client`, they have simply called a function and received a response. The fact +that the request is serialized and transmitted to the `Server` is hidden for simplicity. + +To dig into how the underlying architecture of both the `Server` and `Client` work, it is first +important to understand that the underlying [Frisbee](https://github.com/loopholelabs/frisbee-go) protocol does not have any notion of a request +or response. When Frisbee sends a `Packet` of data, it does not wait for a response. This makes +the protocol suitable for a number of use cases (like real-time streaming), but also means that Request/Reply semantics +need to be implemented in the application logic - in this case, the code that fRPC generates. + +# Server Architecture + +The generated fRPC `Server` is based on the RPC `Services` that are defined in the `proto3` +file that is passed to the `protoc` compiler. Developers are responsible for implementing the generated +`Service` interfaces and passing that into the `Server` constructor. + +The `Server` then takes that implementation and creates a `handler table` that maps the request type to the +accompanying function in the provided `Service` implementation. + +When it receives a request, it looks up the request type in the `handler table` and calls the accompanying +function with the deserialized Request object. The function then returns a Response object that +is serialized and sent back to the `Client`. + +# Client Architecture + +The generated fRPC `Client` is also based on the RPC `Services` that are defined in the +`proto3` file that is passed to the `protoc` compiler. Based on the RPC Calls defined in those services, +fRPC generates a number of `Client` helper functions - one for each possible RPC Call. + +As mentioned before, Frisbee does not have any notion of a request or response - this means that we must implement +the ability to wait for a response from the `Server` in the application logic. We need to also be able to map +those incoming responses to the correct ongoing request. + +To achieve this, fRPC Clients make use of an `in-flight` requests table that maps a request ID to a channel +that can be listened to for a response. When an RPC function is called, it generates a request ID, serializes the request +object and sends it to the `Server`. When a response object is received from the `Server`, it is +deserialized and request ID is looked up in the `in-flight` requests table. + +The response is then pushed into the channel associated with the request ID, where it is read by the RPC function +that made the request in the first place. This response unblocks the RPC caller and the response object (or an error) +is returned. diff --git a/docs/getting-started/concepts.mdx b/docs/getting-started/concepts.mdx new file mode 100644 index 0000000..0d7f965 --- /dev/null +++ b/docs/getting-started/concepts.mdx @@ -0,0 +1,70 @@ +--- +title: Concepts +--- + +fRPC is, at its core, a code generator - one which uses the Frisbee messaging framework as its underlying transport mechanism. It hooks into +the `protoc` compiler and generates an RPC framework that matches the `proto3` spec provided to it. + +Frisbee was designed to allow developers to define their own messaging protocols, while having a library that would handle +all the lower level implementation for them. + +RPC Frameworks +are implementations of the Request/Reply pattern, and so fRPC generates the necessary +Frisbee code to handle that messaging pattern. + +There are three main components to fRPC: + +- The Message Types +- The Client +- The Server + +# Message Types + +One of the challenges with any messaging system is that the messages must be serialized and deserialized into formats that +can be transmitted over the wire. With a code generator like fRPC, that means we need to take your `proto3` +message definitions and generate the accompanying `structs` in Go. We then need to create consistent, performant, +and safe serialization and deserialization functions for those structs. + +To do this, fRPC makes use of the [Polyglot](https://github.com/loopholelabs/polyglot-go) library, which is a high-performance +serialization framework that recycles byte buffers and can serialize and deserialize data with almost no allocations. +This makes serialization and deserialization extremely fast, while also allowing us to minimize the accompanying memory allocations. + +[polyglot-go](https://github.com/loopholelabs/polyglot-go) library type comes with a number of +`encode` and `decode` methods for various types, that fRPC chains together to create the +serialization and deserialization functions for your `proto3` message definitions. + +We're also actively working on a [polyglot-rs](https://github.com/loopholelabs/polyglot-rs) library, which is a Rust +implementation of `Polyglot`, as well as [polyglot-ts](https://github.com/loopholelabs/polyglot-ts) which is a +TypeScript (and Javascript) implementation of `Polyglot`. + +# The Client + +The fRPC Client is a simple wrapper around the `frisbee.Client` type, and contains generated helper +functions for creating and sending requests to an fRPC Server and then returning the accompanying response. + +It's also possible to deviate from those helper functions and access the underlying `frisbee.Client` directly. +This allows you to do things like turn Frisbee off (and thus retrieve the underlying TCP connection). + +# The Server + +The fRPC Server is a simple wrapper around the `frisbee.Server` type, and contains generated helper +functions for handling incoming requests and returning the accompanying response based on the handlers you've passed in +to the constructor. + +Similar to the Client, it's also possible to deviate from those helper functions and access the underlying +`frisbee.Server` directly. This allows you to do things like turn Frisbee off (and thus retrieve the +underlying TCP connection), or write your own middleware functions for incoming or outgoing packets. + +# Accessing Frisbee Directly + +As we've mentioned before, it's possible to access the underlying [Frisbee](https://github.com/loopholelabs/frisbee-go) primitives from both the +client and the server. This is why fRPC is more flexible than other RPC frameworks, and why it's possible to +do things like send a few RPC requests using fRPC and then reuse that underlying TCP connection for something like an +HTTP proxy. + +fRPC generates a `frisbee.HandlerTable` that allows Frisbee to route incoming packets to the correct +handler functions. It's possible to override this table using the `frisbee.Server.SetHandlerTable()` +method (which is exposed in the generated `frpc.Server` type). + +To learn more about how [Frisbee](https://github.com/loopholelabs/frisbee-go) works and how you can leverage it from within the generated fRPC +code, check out the [frisbee-go Github Repository](https://github.com/loopholelabs/frisbee-go). diff --git a/docs/getting-started/overview.mdx b/docs/getting-started/overview.mdx new file mode 100644 index 0000000..e187b81 --- /dev/null +++ b/docs/getting-started/overview.mdx @@ -0,0 +1,49 @@ +--- +title: Overview +--- + +**fRPC** (or **Frisbee RPC**), is an RPC Framework (similar to [gRPC](https://grpc.io) or +[Apache Thrift](https://thrift.apache.org/)) that's designed from the ground up to be lightweight, extensible, and extremely performant. + +We built fRPC because we loved the idea of defining our message types in a standardized +[proto3](https://developers.google.com/protocol-buffers/docs/proto3) format and having the [protobuf](https://github.com/protocolbuffers/protobuf) compiler generate all the necessary +glue code for us, but we didn't like the [overhead](https://github.com/boguslaw-wojcik/encoding-benchmarks) of encoding and decoding +messages in the Protobuf format, and wanted a wire protocol that was lighter and faster +than HTTP\/2. + +fRPC offers a few major improvements over existing +RPC frameworks like gRPC: + +- **Speed** - On average fRPC outperforms other RPC frameworks [by 2-4x in an apples-to-apples comparison](/performance/grpc-benchmarks), and is easily able to handle more than **2 million RPCs/second** on a single server +- **Flexibility** - Not only does fRPC allow developers to deviate from the standard request/reply messaging pattern and implement custom patterns alongside their existing RPCs, but developers also have the ability to turn fRPC off and retrieve the underlying TCP connections so they can be reused for something else +- **Familiarity** - Using fRPC feels very familiar to anyone who's used gRPC before, which means that developers can take advantage of the speed and extensibility that fRPC provides without a steep learning curve + +fRPC works by making use of protobuf plugins, and allows developers to use their existing proto3 files to generate a full +RPC Framework that uses Frisbee under the hood. Our goal is to make fRPC a **drop-in +replacement for gRPC** thanks to its generated interfaces matching gRPC's, however we don't support all of the features that +gRPC does yet, most notable being Streaming and OneOf message types. + +# fRPC vs Frisbee + +It's important to note the distinction between fRPC and Frisbee. fRPC uses proto3 files to generate client and server +implementations that use the Frisbee framework under the hood. This is why fRPC is so performant compared to other RPC +frameworks - the Frisbee messaging framework and wire protocol are lightweight and extremely optimized. + +At its core, **Frisbee** is best described as a `bring-your-own-protocol` messaging framework. Our goal was +to make it possible for developers to define their **own** messaging patterns and protocols, and have the actual +lower-level implementations done for them by the library. + + + A simple way to understand this is to think of fRPC as a Request/Reply + protocol, and Frisbee as the low-level implementation of that protocol. With + Frisbee you can implement any protocol or pattern you'd like, but since + Request/Reply is so common fRPC allows you to implement that specific pattern + very quickly and easily. + + +# Getting started with fRPC + +Over the next few pages we'll walk you through the process of getting started with fRPC, +from defining your message types in a proto3 file, to writing your first server and client. + +We'll also introduce the core concepts around Frisbee as well as how you can use the Framework to build your own custom messaging protocols. diff --git a/docs/getting-started/quick-start.mdx b/docs/getting-started/quick-start.mdx new file mode 100644 index 0000000..57a03c2 --- /dev/null +++ b/docs/getting-started/quick-start.mdx @@ -0,0 +1,317 @@ +--- +title: Quick Start +--- + +In this section we'll be going over how you can quickly get started with fRPC, +from defining your message types in a proto3 file, to writing your first server and client. + +We'll be building a simple echo service that will echo back the message it receives, and later on we'll also show how +you can use the Frisbee framework itself to build a more complex PUB\/SUB service. + +# Installation + +To get started with fRPC, you'll need to make sure you have `Go` +and the `protoc` compiler installed. Then, you'll need to install +the `protoc-gen-go-frpc` protoc plugin +which we will use to generate the server and client code. + +## Prerequisites + +- [Go](https://golang.org) - fRPC works with `Go` version 1.18 or later. For installation instructions see [Go's Getting Started Guide](https://golang.org/doc/install). +- [Protocol Buffer Compiler (protoc)](https://developers.google.com/protocol-buffers) - fRPC works with `protoc` version 3. For installation instructions see the [Protoc Getting Started Guide](https://developers.google.com/protoc/docs/getting_started). + +If you're using MacOS and have [Brew](https://brew.sh/) installed, you can use `brew install go` +to install Golang, and `brew install protoc` to install the protoc compiler. + +## Install the fRPC Plugin + +To install the `protoc-gen-go-frpc` plugin, you'll first need to make sure that your `$GOBIN` environment variable is set and available in +your system path. See the [Go Environment Variables](https://golang.org/doc/code.html#GOPATH) page +for more information, but in general, you can do this by adding the following to +your `~/.bashrc` file: + +```sh .bashrc +export GOBIN=$GOPATH/bin +export PATH=$PATH:$GOBIN +``` + +To install the `protoc-gen-go-frpc` plugin itself, you'll need to run the following command: + +```bash +$ go install github.com/loopholelabs/frpc-go/protoc-gen-go-frpc@latest +``` + +This will install the `protoc-gen-go-frpc` plugin into your `$GOBIN` directory +where it will be available for use by the `protoc` compiler. + +You can check that the plugin is installed and available by running the following command: + +```bash +$ which protoc-gen-go-frpc +/Users//go/bin/protoc-gen-go-frpc # or $GOPATH/bin/protoc-gen-go-frpc +``` + +# Create a Proto3 File + +Now that we have the prerequisites and the `protoc-gen-go-frpc` plugin installed, +we can start writing our echo service. Let's start by creating a directory to house our project: + +```bash +$ mkdir -p ~/frpc +$ cd ~/frpc +``` + +Now we'll create an `echo.proto` file and define our message types: + +```protobuf echo.proto +option go_package = "/echo"; + +message Request { + string Message = 1; +} + +message Response{ + string Message = 1; +} +``` + +You can see that we've defined two message types, one for the `Request` and one for the `Response`. + +Next, we will define a new `EchoService` in our `proto3` file. This tells the compiler that we want to generate a server and client for this service. + +```protobuf echo.proto +option go_package = "/echo"; + +service EchoService { + rpc Echo(Request) returns (Response); +} + +message Request { + string Message = 1; +} + +message Response{ + string Message = 1; +} +``` + +And with that you should be ready. Next we'll start the `protoc` compiler to generate +our fRPC server and client. + +# Generate the Server and Client + +Let's run the following command to generate the server and client code: + +```bash +$ protoc --go-frpc_out=. echo.proto +``` + +This command tells the `protoc` compiler to generate the server and client code for us and +by specifying the `--go-frpc_out` flag, we're implicitly specifying that we want to use the `protoc-gen-go-frpc` plugin. + +If we wanted to be more explicit, we could have run the following command: + +```bash +$ protoc --plugin=protoc-gen-go-frpc=$GOBIN/protoc-gen-go-frpc --go-frpc_out=. echo.proto +``` + +These commands should have generated a new folder at `~/frpc/echo`, which +contains an `echo.frpc.go` file containing the server and client code. Within +that file, you'll find the following interface: + +```go echo.frpc.go +... + +type EchoService interface { + Echo(context.Context, *Request) (*Response, error) +} + +... +``` + +All we have left to do is implement the `EchoService` interface with our server-side logic, +and pass that into the server. The generated library will then be able to handle everything else for us. + +# Setting up the Server + +To set up our server, we simply need to implement the `EchoService` interface and then start +the server. We'll start by creating a new `server/main.go` file in our `~/frpc` directory: + +```go server/main.go +package main + +import ( + "context" + "frpc/echo" +) + +type svc struct{} + +func (s *svc) Echo(_ context.Context, req *echo.Request) (*echo.Response, error) { + res := new(echo.Response) + res.Message = req.Message + return res, nil +} +``` + +As you can see we've created a new struct called `svc` and implemented the `EchoService` interface by +creating a new function called `Echo` which takes a `context.Context` and an `*echo.Request` object. +We aren't really using the context in this example so we just ignore that and instead return an `*echo.Response` object with the +same message as the request. + +Now we can implement the server itself: + +```go server/server.go +package main + +import ( + "context" + "github.com/rs/zerolog" + "frpc/echo" + "log" + "os" + "runtime" + "time" +) + +type svc struct{} + +func (s *svc) Echo(_ context.Context, req *echo.Request) (*echo.Response, error) { + res := new(echo.Response) + res.Message = req.Message + return res, nil +} + +func main() { + frpcServer, err := echo.NewServer(new(svc), nil, nil) + if err != nil { + panic(err) + } + + err = frpcServer.Start(":8080") + if err != nil { + panic(err) + } +} +``` + +This additional `main` function runs when the server starts up, and passes in our `svc` struct to the +generated `echo.NewServer()` function. It then binds the server to port `:8080` and starts listening for connections. + +We're passing in `nil` for both the `*tls.Config` and `logging` parameters in the generated `echo.NewServer()` function because +we don't want to use TLS or logging in this example. + +# Setting up the Client + +To set up our client, we don't need to implement any additional logic, but we do need to create a new `client/main.go` file +in our `~/frpc` directory: + +```go client/main.go +package main + +import ( + "context" + "fmt" + "frpc/echo" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + c, err := echo.NewClient(nil, nil) + if err != nil { + panic(err) + } + + err = c.Connect("127.0.0.1:8080") + if err != nil { + panic(err) + } +} +``` + +Here, we're creating a new echo client using our generated `echo.NewClient()` function. +Then, we're passing in the address of the server we want to connect to. But we're not actually sending any +requests to the server yet. + +To do that, we can write a simple look to send a request to the server every second and then print out the response: + +```go echo/client/client.go +package main + +import ( + "context" + "fmt" + "frpc/echo" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + c, err := echo.NewClient(nil, nil) + if err != nil { + panic(err) + } + + err = c.Connect("127.0.0.1:8080") + if err != nil { + panic(err) + } + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + req := echo.NewRequest() + i := 0 + for { + select { + case <-stop: + err = c.Close() + if err != nil { + panic(err) + } + return + default: + req.Message = fmt.Sprintf("#%d", i) + log.Printf("Sending Request %s\n", req.Message) + res, err := c.EchoService.Echo(context.Background(), req) + if err != nil { + panic(err) + } + log.Printf("Received Response %s\n", res.Message) + time.Sleep(time.Second) + i++ + } + } +} +``` + +The above loop registers a `stop` channel to receive a signal when the user hits `Ctrl+C`, +and then starts sending a request to the server every second. + +And that's it! We've now set up a simple echo client that can send requests to our server and print out the response. + +We were able to use a simple `proto3` file to define our request and response objects, and all we had to do was +implement the `EchoService` interface. Everything else was handled for us by **fRPC**. + +The complete code for this example is available in the [frpc-echo-example](https://github.com/loopholelabs/frpc-echo-example) +repository on [Github](https://github.com/loopholelabs). + +# Next Steps + +Now that we've seen how easy it is to use **fRPC**, we recommend you check out our [benchmarks](/performance/grpc-benchmarks) pages +to learn more about how **fRPC** fares against other RPC frameworks. + +If you want to learn more about how **fRPC** works under the hood, you can check out our [fRPC Concepts](/getting-started/concepts) page, or our [technical reference](/reference/overview). + +Finally, if you'd like to learn how to use [Frisbee](https://github.com/loopholelabs/frisbee-go) (the underlying transport mechanism for fRPC) +to implement your **own messaging protocol** that's fast and performant, you can check out the [frisbee-go Github repository](https://github.com/loopholelabs/frisbee-go). + +If you need any help or have any feedback about Frisbee or fRPC, please to check out our [Discord Community](https://loopholelabs.io/discord)! +Our team would love to hear your thoughts and understand how you're planning on using fRPC, and we're always happy to help! diff --git a/docs/getting-started/roadmap.mdx b/docs/getting-started/roadmap.mdx new file mode 100644 index 0000000..f2a6da3 --- /dev/null +++ b/docs/getting-started/roadmap.mdx @@ -0,0 +1,66 @@ +--- +title: Roadmap +--- + +We're constantly building new features into fRPC to improve its functionality and expand its ease of use. +We'd love to hear your feedback on these as well as any other ideas you have! +Also, if you have any questions, please feel free to join our [Discord Server](https://loopholelabs.io/discord) where +you can ask questions and get help from other fRPC developers. + +## Currently In Development + +These are the features that are actively being developed right now. + +### Load Balancing Support + +One of gRPC's most important features is load balancing. This is a feature that allows you to distribute your requests +across multiple servers, and allows you to scale your application to handle more requests. Currently, fRPC does not +have load balancing built-in, however it is already possible to do application-level load balancing on top of a normal +fRPC client. + +Our goal is to have a built-in solution that Developers can use off the shelf, though, and this feature +is being actively developed. We're expecting to have it available in the coming months. + +### Retry Support + +Sometimes requests fail, whether it's because of a network issue or because of a server issue. Currently, this sort +of failure will result in an fRPC client reporting an error - instead, we'd like to be able to retry the request +a number of times before giving up. + +It's currently possible to do this at the application level, but our goal is to have a built-in solution that Developers \ +can use off the shelf, though, and this feature is being actively developed. We're expecting to have it available in the coming months. + +# Planned Features + +These are the features that we've planned to work on in the coming months. If any of these are important to you please +let us know! We use community feedback to plan our roadmap, and we also encourage contributors to submit their ideas +to the [Discord Server](https://loopholelabs.io/discord) so that we can discuss them with the community. + +### OneOf Message Types + +A recent addition to the proto3 syntax, `OneOf` message types allow developers to specify that +only a single field from a set can be used at a time. + +This feature is also actively being developed and should be available in the coming months. + +### JS/TS Support + +Currently, fRPC is only compatible with `Golang` but we plan to add support for JS and TS in the near future. +We know it's important to have an RPC framework that can be used across language boundaries so this is a big priority for us. + +We've already begun the work required to add support for JS and TS by porting the [polyglot-go](https://github.com/loopholelabs/polyglot-go) +framework to [Typescript](https://github.com/loopholelabs/polyglot-ts). This means we are already able to serialize and +deserialize RPCs in Typescript. + +### Rust Support + +Currently, fRPC is only compatible with `Golang` but we plan to add support for Rust in the near future. + +We've already begun the work required to add support for JS and TS by porting the [polyglot-go](https://github.com/loopholelabs/polyglot-go) +framework to [Rust](https://github.com/loopholelabs/polyglot-rs). This means we are already able to serialize and +deserialize RPCs in Rust. + +### Websocket Support + +We would love for developers to be able to use fRPC over websockets. This would allow fRPC clients to in the browser and +communicate directly with fRPC servers. diff --git a/docs/images/grpc/128kb.png b/docs/images/grpc/128kb.png new file mode 100644 index 0000000..781897a Binary files /dev/null and b/docs/images/grpc/128kb.png differ diff --git a/docs/images/grpc/1mb.png b/docs/images/grpc/1mb.png new file mode 100644 index 0000000..4108ae9 Binary files /dev/null and b/docs/images/grpc/1mb.png differ diff --git a/docs/images/grpc/32byte.png b/docs/images/grpc/32byte.png new file mode 100644 index 0000000..acf6ddb Binary files /dev/null and b/docs/images/grpc/32byte.png differ diff --git a/docs/images/grpc/512byte.png b/docs/images/grpc/512byte.png new file mode 100644 index 0000000..e24a296 Binary files /dev/null and b/docs/images/grpc/512byte.png differ diff --git a/docs/images/grpc/multi.png b/docs/images/grpc/multi.png new file mode 100644 index 0000000..dbf3139 Binary files /dev/null and b/docs/images/grpc/multi.png differ diff --git a/docs/images/grpc/throughput.png b/docs/images/grpc/throughput.png new file mode 100644 index 0000000..834b6bd Binary files /dev/null and b/docs/images/grpc/throughput.png differ diff --git a/docs/images/intro.svg b/docs/images/intro.svg new file mode 100644 index 0000000..57c028e --- /dev/null +++ b/docs/images/intro.svg @@ -0,0 +1,333 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/images/lightbackground.png b/docs/images/lightbackground.png new file mode 100644 index 0000000..b1160a8 Binary files /dev/null and b/docs/images/lightbackground.png differ diff --git a/docs/images/twirp/128kb.png b/docs/images/twirp/128kb.png new file mode 100644 index 0000000..7375fdc Binary files /dev/null and b/docs/images/twirp/128kb.png differ diff --git a/docs/images/twirp/1mb.png b/docs/images/twirp/1mb.png new file mode 100644 index 0000000..befc304 Binary files /dev/null and b/docs/images/twirp/1mb.png differ diff --git a/docs/images/twirp/32byte.png b/docs/images/twirp/32byte.png new file mode 100644 index 0000000..6a7e61f Binary files /dev/null and b/docs/images/twirp/32byte.png differ diff --git a/docs/images/twirp/512byte.png b/docs/images/twirp/512byte.png new file mode 100644 index 0000000..b40b481 Binary files /dev/null and b/docs/images/twirp/512byte.png differ diff --git a/docs/images/twirp/throughput.png b/docs/images/twirp/throughput.png new file mode 100644 index 0000000..983d85b Binary files /dev/null and b/docs/images/twirp/throughput.png differ diff --git a/docs/introduction.mdx b/docs/introduction.mdx new file mode 100644 index 0000000..6005b0e --- /dev/null +++ b/docs/introduction.mdx @@ -0,0 +1,58 @@ +--- +title: "fRPC Documentation" +sidebarTitle: "Introduction" +mode: "wide" +--- + +fRPC is a **proto3-compatible** RPC Framework +that's designed from the ground up to be **lightweight, extensible, and extremely +performant**. On average fRPC outperforms other RPC frameworks [by 2-4x in an apples-to-apples +comparison](/performance/grpc-benchmarks), and is easily able to handle more than +2 million RPCs/second on a single server. + +Welcome to fRPC + + + + Quickly get up and running with fRPC by following our getting started guide. + + + Take a look at some unique fRPC concepts and how it differs from other + frameworks. + + + Check out our planned technical roadmap to see how we'll be improving fRPC + in the future. + + + Take a look at our technical docs to dig into the details of fRPC and how + you can use it. + + + + + + The footer of each page contains an "Edit on Github" link. Please feel free + to contribute by making a pull request! + + + The **#Frisbee** Channel in our Discord server is a great place to get help + with all things Frisbee and fRPC. + + diff --git a/docs/logo/dark.svg b/docs/logo/dark.svg new file mode 100644 index 0000000..8c1afc2 --- /dev/null +++ b/docs/logo/dark.svg @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/docs/logo/light.svg b/docs/logo/light.svg new file mode 100644 index 0000000..42edafa --- /dev/null +++ b/docs/logo/light.svg @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/docs/mint.json b/docs/mint.json new file mode 100644 index 0000000..ba27af3 --- /dev/null +++ b/docs/mint.json @@ -0,0 +1,81 @@ +{ + "name": "fRPC", + "logo": { + "light": "/logo/light.svg", + "dark": "/logo/dark.svg" + }, + "favicon": "/favicon.png", + "colors": { + "primary": "#AB1CFC", + "light": "#C854FF", + "dark": "#9015D9", + "ultraLight": "#F8EEFE", + "ultraDark": "#6A06A5", + "background": { + "dark": "#121212" + }, + "anchors": { + "from": "#7C34FF", + "to": "#FE9195" + } + }, + "topbarCtaButton": { + "type": "github", + "url": "https://github.com/loopholelabs/frpc-go" + }, + "anchors": [ + { + "name": "Discord", + "icon": "discord", + "url": "https://loopholelabs.io/discord" + }, + { + "name": "GitHub", + "icon": "github", + "url": "https://github.com/loopholelabs/frpc-go" + } + ], + "navigation": [ + { + "group": "Welcome", + "pages": ["introduction"] + }, + { + "group": "Getting Started", + "pages": [ + "getting-started/overview", + "getting-started/quick-start", + "getting-started/concepts", + "getting-started/architecture", + "getting-started/roadmap" + ] + }, + { + "group": "Performance", + "pages": [ + "performance/optimizations", + "performance/grpc-benchmarks", + "performance/twirp-benchmarks" + ] + }, + { + "group": "Reference", + "pages": [ + "reference/overview", + "reference/client-methods", + "reference/server-methods" + ] + } + ], + "footerSocials": { + "discord": "https://loopholelabs.io/discord", + "github": "https://github.com/loopholelabs/frpc-go" + }, + "backgroundImage": "/images/lightbackground.png", + "analytics": { + "posthog": { + "apiKey": "phc_WfWunxGOdcI7urnQA4XolSTSisVIh9NI38jTbg2TshT", + "apiHost": "https://data.frpc.io" + } + } +} diff --git a/docs/performance/grpc-benchmarks.mdx b/docs/performance/grpc-benchmarks.mdx new file mode 100644 index 0000000..9446bf5 --- /dev/null +++ b/docs/performance/grpc-benchmarks.mdx @@ -0,0 +1,122 @@ +--- +title: gRPC Benchmarks +--- + +We can't just claim that fRPC is faster than a battle-tested tool like gRPC without backing it up with an apples-to-apples comparison. +These benchmarks are publicly available at [https://github.com/loopholelabs/frpc-go-benchmarks](https://github.com/loopholelabs/frpc-go-benchmarks), and we encourage you to run them for yourselves. + +To make sure our benchmark is fair, we'll be using the exact same proto3 file as the input for both fRPC and gRPC. +Moreover, we'll be using **the exact same service implementation for both the gRPC and fRPC** servers - the generated service interfaces in fRPC are designed to look the same as in gRPC, +so using the same service implementation is extremely simple. + +```protobuf Benchmark Proto3 File +option go_package = "/"; + +service BenchmarkService { + rpc Benchmark(Request) returns (Response); +} + +message Request { + string Message = 1; +} + +message Response{ + string Message = 1; +} +``` + +We'll be running a number of different benchmarks with an increasing number of concurrent clients to show off both the throughput and the scalability of fRPC when compared to gRPC. + +The following benchmarks were performed on a bare metal host running Debian 11, 2x AMD EPYC 7643 48-Core CPUs, and 256GB of DDR4 memory. The benchmarks were performed over a local network to avoid inconsistencies due to network latency. + +# Client Throughput + +For our first set of benchmarks we'll have a number of concurrently connected clients and each client will make RPCs to the fRPC or +gRPC server using a randomly generated fixed-sized message, and then wait for a response before repeating. + +In each of our benchmark runs we're increasing the number of concurrently connected clients and we're measuring the average throughput of each client to see how well fRPC and gRPC scale. We're also +running a number of separate benchmarks, each with an increasing message size. + +## 32-Byte Messages + +![32-byte messages](/images/grpc/32byte.png) + +Starting with **32-byte** messages, it's obvious from the graph above that fRPC consistently outperforms and outscales gRPC - +often by more than 2x. In the case of 8192 connected clients, fRPC's throughput is still 112 RPCs/second while gRPC drops to only 29. + +That means that clients using fRPC get almost 4x more throughput than gRPC using the same services and the same proto3 files. + +With 32-byte messages and 112 RPCs/second for fRPC that means our total throughput is about 3584B/s per client. With 8192 +clients that means our total throughput is about 28MB/s. for gRPC our total throughput is about 7.25MB/s. + +## 512-Byte Messages + +![512-byte messages](/images/grpc/512byte.png) + +Moving to the slightly larger **512-byte** messages, we can see the total throughput seems to drop for each individual client, but +fRPC is still comfortably 2-3x faster than gRPC is. In the case of 8192 connected clients, fRPC's throughput is still 98 RPCs/second while gRPC drops to only 29. + +With 512-byte messages and 98 RPCs/second for fRPC that means our total throughput is about 49KB/s per client. With 8192 +clients that means our total throughput is about 392MB/s. for gRPC our total throughput is about 116MB/s. + +## 128-KB Messages + +![128-KB messages](/images/grpc/128kb.png) + +Now we're moving to the next larger message size, **128-KB**. Total throughput drops as expected for each client, but fRPC is still +easily 3-4x faster than gRPC. In the case of 100 connected clients, fRPC's throughput is 192 RPCs/second while gRPC drops to only 65. + +With 128KB messages and 192 RPCs/second for fRPC that means our total throughput is about 24MB/s per client. +With 100 clients that means our total throughput is about 2.34GB/s. For gRPC our total throughput is only about 0.8GB/s. + +## 1-MB Messages + +![1-MB messages](/images/grpc/1mb.png) + +With the next largest message size, **1MB**, it's clear that we're starting to become bottlenecked by the bare metal host we're using to benchmark. + +Still, fRPC keeps its lead with a 3-4x improvement over gRPC, and in the case of 100 connected clients fRPC's throughput +per client is about 37MB/s. With 100 clients that means our total throughput is about 3.6GB/s. For gRPC our total throughput is only about 1.7GB/s. + +# Server Throughput + +Now let's look at how fRPC servers scale as we increase the number of connected clients. +For this benchmark, we're going to make it so that each client repeatedly sends 10 concurrent RPCs in order to +saturate the underlying TCP connections and the accompanying RPC server. + +![server throughput](/images/grpc/throughput.png) + +As before, we can see that fRPC consistently outperforms gRPC - but as we increase the number of clients it's also +clear that fRPC does not get as slowed down as the gRPC server does. It's able to handle **more than +2,000,000 RPCs/second** and the bottleneck actually seems to be our bare metal host as opposed to fRPC. + +In the case where we have 8192 connected clients, we can see that the gRPC server is able to handle just less than 500,000 RPCs/second - whereas **fRPC can easily handle more than 4x that**. + +# Multi-Threaded Performance + +By default, fRPC creates a new goroutine for each incoming RPC. This is a very similar approach to the one used by gRPC, +and is a good choice for high-throughput applications where handling the RPC can be a blocking operation (like querying a remote +database). + +However, fRPC can also be configured to create a single goroutine to handle all the RPCs from +each incoming connection. This is a good choice for applications that require very low latency and where the +handlers are not blocking operations (such as metrics streaming). + +The benchmarks above were all run with the single-goroutine option enabled, because our +BenchmarkService implementation is a simple `Echo` service that does little to no processing and does +not block. + +It's also important, however, to benchmark an application where the RPCs are blocking operations - and for those we'll go back +to fRPCs default behavior to create a new goroutine to handle each incoming RPC. + +Our blocking operation for the following benchmark is a simple `time.Sleep` call that sleeps for exactly 50 Microseconds. + +![Multi-threaded Throughput](/images/grpc/multi.png) + +The trend above is very similar to the single-threaded benchmarks above - fRPC is still leading gRPC in throughput, but it's also clear that boht gRPC and fRPC have suffered a performance penalty. For gRPC this is likely because the RPCs are now doing "work" and are blocking operations, but +for fRPC it's a mixture of the added computation as well as the multi-threaded behaviour. + +In the case where we have 8192 connected clients, we can see that the performance of fRPC has dropped from 2,000,000 RPCs/second to about 1,600,000 RPCs/second, and gRPC has dropped from 500,000 RPCs/second to about 400,000 RPCs/second. + +These benchmarks show off just a small portion of fRPCs capabilities, and we encourage everyone to run +these for themselves. We also have [benchmarks comparing fRPCs messaging format with protobuf and other serialization frameworks](https://github.com/loopholelabs/polyglot-go-benchmarks). diff --git a/docs/performance/optimizations.mdx b/docs/performance/optimizations.mdx new file mode 100644 index 0000000..3d2da6b --- /dev/null +++ b/docs/performance/optimizations.mdx @@ -0,0 +1,66 @@ +--- +title: Optimizations +--- + +With **Zero Allocations** in the hot-path and an 8-Byte Packet Header, the network overhead of Frisbee is significantly +lower than that of existing protocols like gRPC (performance comparisons available in our [gRPC Benchmarks](/performance/grpc-benchmarks)) +which use HTTP/2 under the hood. + +This, combined with the substantial performance gains over [protobufs](https://github.com/protocolbuffers/protobuf) that come with using +[polyglot](http://github.com/loopholelabs/polyglot-go) for serialization and deserialization, makes fRPC a great choice for high-performance, high-scalability, and high-reliability RPC frameworks. + +We originally designed Frisbee for our own messaging needs at [Loophole Labs](https://loopholelabs.io), where we needed +to send both large and small amounts of data in a latency-sensitive manner. We also needed it to be massively scalable, +able to handle thousands of concurrent connections and able to send millions of messages. + +To fulfill these requirements we spent a lot of time architecting the Frisbee data path to be extremely fast and extremely efficient. + +# Data Path Optimizations + +Our optimizations began with the `Packet` package, which efficiently recycles the byte buffers +that are used throughout Frisbee to hold interstitial data. These make use of the [polyglot](http://github.com/loopholelabs/polyglot-go) library +to implement `Encoding` and `Decoding` packages, which read and write directly from the `Packet.packet` +structure. By recycling the `Packet.packet` structures throughout Frisbee, we can significantly reduce +the number of allocations in the encoding and decoding functions. + +Most of our other optimizations center around our network I/O. Actually reading and writing data from a TCP socket +is extremely slow, and so Frisbee makes an effort to maximize the amount of data that we read or write to a TCP socket +while avoiding any additional latency. + +All these optimizations - as well as Frisbee's architecture, make it feasible to use Frisbee (as well as fRPC) +in both latency-sensitive applications like in real-time streaming, as well as high-throughput applications like PUB/SUB systems. + +# Scheduling Optimizations + +By default, fRPC creates a new goroutine for each incoming RPC. This is a very similar approach to the one used by gRPC, +and is a good choice for high-throughput applications where handling the RPC can be a blocking operation (like querying a remote +database). + +However, fRPC can also be configured to create a single goroutine to handle all the RPCs from +each incoming connection. This is a good choice for applications that require very low latency and where the handlers are not blocking operations (such as metrics streaming). + + + In our benchmarks we've tested both approaches, though it should be noted that + the single-goroutine approach is not as efficient as the multi-goroutine + approach when the blocking time of the RPC handler is high. + + +# Why TCP? + +Many of the recently released wire protocols like [Wireguard](https://www.wireguard.com/) and [QUIC](https://datatracker.ietf.org/doc/html/rfc9000) +use UDP under the hood instead of TCP. Unlike TCP, UDP is an unreliable transport mechanism and provides no guarantees +on packet delivery. + +There are benefits to using UDP and implementing packet delivery mechanisms on top of it - QUIC, for example, uses +UDP to solve the [head-of-line blocking](https://calendar.perfplanet.com/2020/head-of-line-blocking-in-quic-and-http-3-the-details/) +problem. + +For Frisbee, however, we wanted to make use of the existing performance optimizations that networking software and hardware +have for TCP traffic, and we wanted the strong guarantees around packet delivery that TCP already provides. + + + It's important to note that while Frisbee and fRPC were designed to be used + with TCP connections, there's no reason developers can't use other transports. + As long as the transport fulfills the 'net.Conn' interface, it will work as + expected with Frisbee and fRPC. + diff --git a/docs/performance/twirp-benchmarks.mdx b/docs/performance/twirp-benchmarks.mdx new file mode 100644 index 0000000..ce62d6c --- /dev/null +++ b/docs/performance/twirp-benchmarks.mdx @@ -0,0 +1,92 @@ +--- +title: Twirp Benchmarks +--- + +As with our other benchmarks, this will be as close to an apples-to-apples comparison as possible. +These benchmarks are publicly available at [https://github.com/loopholelabs/frpc-go-benchmarks](https://github.com/loopholelabs/frpc-go-benchmarks), and we encourage you to run them for yourselves. + +To make sure our benchmark is fair, we'll be using the exact same proto3 file as the input for both fRPC and Twirp. +Moreover, we'll be using **the exact same service implementation for both the Twirp and fRPC** servers - Twirp uses protobufs for serialization and its interface is very similar to gRPC. Because fRPC was designed +to feel familiar to the gRPC interface, using the same service implementation is extremely simple. + +```protobuf Benchmark Proto3 File +option go_package = "/"; + +service BenchmarkService { + rpc Benchmark(Request) returns (Response); +} + +message Request { + string Message = 1; +} + +message Response{ + string Message = 1; +} +``` + +We'll be running a number of different benchmarks with an increasing number of concurrent clients to show off both the throughput and the scalability of fRPC when compared to Twirp. + +The following benchmarks were performed on a bare metal host running Debian 11, 2x AMD EPYC 7643 48-Core CPUs, and 256GB of DDR4 memory. The benchmarks were performed over a local network to avoid inconsistencies due to network latency. + +# Client Throughput + +For our first set of benchmarks we'll have a number of concurrently connected clients and each client will make RPCs to the fRPC or +Twirp server using a randomly generated fixed-sized message, and then wait for a response before repeating. + +In each of our benchmark runs we're increasing the number of concurrently connected clients and we're measuring the average throughput of each client to see how well fRPC and Twirp scale. We're also +running a number of separate benchmarks, each with an increasing message size. + +## 32-Byte Messages + +![32-byte messages](/images/twirp/32byte.png) + +Starting with **32-byte** messages, the results are very similar to the ones with gRPC. fRPC consistently and +substantially outperforms Twirp - though the performance drop off for Twirp is significantly steeper than what we saw with gRPC. + +In the case of 8192 connected clients, Twirp's performance drops to only 4 RPCs/second per client while fRPC is able to handle 112 RPC/second. +This means fRPC is 28x more performant than Twirp. + +Twirp seems to be relatively capable when there is a small number of connected clients, but quickly falls off as the number of clients increases. + +## 512-Byte Messages + +![512-byte messages](/images/twirp/512byte.png) + +When changing the message size to **512-bytes**, we can see an extremely sharp drop in Twirp's throughput, while fRPC seems to fare +much better. In the case of 8192 connected clients, fRPC's throughput is still 98 RPCs/second while Twirp drops to only 4 - meaning +fRPC performs almost 25x better than Twirp. + +## 128-KB Messages + +![128-KB messages](/images/twirp/128kb.png) + +With larger **128-KB** messages, we continue to see the same pattern as before - throughput for individual clients of both frameworks +drops as more clients are added, but fRPC performs far better than Twirp - in this case between 2-7x better + +## 1-MB Messages + +![1-MB messages](/images/twirp/1mb.png) + +With the largest message size of the benchmark, **1MB**, the pattern from our previous runs continues. In this case, fRPC +seems to perform between 3-6x better than Twirp and we're guessing that our bare metal host is starting the become the bottleneck as +we increase the number of clients. + +# Server Throughput + +Now let's look at how fRPC servers scale compared to Twirp as we increase the number of connected clients. Twirp makes +use of the standard `net/http` server so we're really comparing against that. + +For this benchmark, we're going to make it so that each client repeatedly sends 10 concurrent RPCs in order to +saturate the underlying TCP connections and the accompanying RPC server. + +![server throughput](/images/twirp/throughput.png) + +As before, we can see that fRPC consistently outperforms Twirp - but as we increase the number of clients beyond 1024, +we actually saw the Twirp clients begin to fail. We couldn't get our benchmark to run for more than 1024 clients, which is +why the benchmark reports a 0 for those runs. + +At 1024 clients, though, the fRPC is easily able to handle more than 60x more RPCs/second than Twirp is. + +These benchmarks show off just a small portion of fRPCs capabilities, and we encourage everyone to run +these for themselves. We also have [benchmarks comparing fRPCs messaging format with protobuf and other serialization frameworks](https://github.com/loopholelabs/polyglot-go-benchmarks). diff --git a/docs/reference/client-methods.mdx b/docs/reference/client-methods.mdx new file mode 100644 index 0000000..7e4c6fb --- /dev/null +++ b/docs/reference/client-methods.mdx @@ -0,0 +1,104 @@ +--- +title: Client Methods +--- + +# NewClient + +The `NewClient(tlsConfig *tls.Config, logger *zerolog.Logger) (*Client, error)` method is used to create a new fRPC client. + +It takes two arguments: + +- `tlsConfig`: a `*tls.Config` that will be used to configure TLS for the underlying connection. This can be left as `nil` if no TLS is required. +- `logger`: a `*zerolog.Logger` that will be used to log all events. This can be left as `nil` if no logging is required. + +It returns an fRPC `*Client` on success and an error otherwise. + +# Connect + +The `Connect(addr string) error` method is used to initiate a connection to an fRPC server. +If a `*tls.Config` was provided when the client was created (using `NewClient`) +it will be used to configure TLS for the connection. + +An error will be returned if the connection fails. + + + The Connect function should only be called once. If FromConn was already + called on this client, Connect will return an error. + + +# FromConn + +The `FromConn(conn net.Conn) error` method is used to create a new fRPC client from an existing net.Conn. This is useful if you want to reuse an existing connection, or +if you have a custom transport that you want to use. If a `*tls.Config` was provided when the client was created (using `NewClient`), it will be ignored. + +An error will be returned if the connection fails. + + + The FromConn function should only be called once. If Connect was already + called on this client, FromConn will return an error." + + +# Closed + +The `Closed() bool` method is used to check if the client is closed. This method will return `true` if the client is closed or has not yet been initialized, and `false` otherwise. + +# Error + +The `Error() error` method is used to check if the client has encountered an error. This method will return an `error` if the client has encountered an error, or `nil` otherwise. + +This method is meant to be used to check if the client encountered an error that caused it to close. + +# Close + +The `Close() error` method is used to close the client. It will return an `error` if the client encounters an error while closing (or if it is already closed), and will cancel any pending RPCs. + +# WritePacket + +The `WritePacket(p *packet.Packet) error` method is used to write a raw frisbee packet to the underlying connection. Normal fRPC operations should not use this method, however it is available +when extending fRPC with custom protocols or messaging patterns directly for use with the underlying Frisbee library. + +# Flush + +The `Flush() error` method is used to flush the underlying connection. Normal fRPC operations should not use this method, however it is available +when extending fRPC with custom protocols or messaging patterns directly for use with the underlying Frisbee library. + +# CloseChannel + +The `CloseChannel() <- chan struct{}` method is used to signal to a listener that the Client has been closed. +The returned channel will be closed when the client is closed, and the `Error()` method can be used to check if the connection was closed due to an error. + +# Raw + +The `Raw() (net.Conn, error)` method is used to get the underlying `net.Conn` from the fRPC Client. This is useful if you want to extend fRPC with custom protocols or messaging patterns directly for use with the underlying Frisbee library. + +# Logger + +The `Logger() *zerolog.Logger` method is used to get the logger that was provided when the client was created. This is useful if you want to extend fRPC with custom protocols or messaging patterns directly for use with the underlying Frisbee library. + +# Generated Methods + +When generating the fRPC Client, each service in the `.proto` file also results in a generated `service` Client. +Then, for each RPC in the service, a method is generated on the appropriate the `service` client. +For example, if the `.proto` file contains the following service definition: + +```proto +service MyService { + rpc MyMethod(MyRequest) returns (MyResponse) {} +} +``` + +Then the generated `service` Client method is: + +```go +func (c *MyService) MyMethod(ctx context.Context, req *MyRequest) (res *MyResponse, err error) { + ... +} +``` + +And it's meant to be invoked using: + +```go +res, err := c.MyService.MyMethod(ctx, req) +``` + +Unlike gRPC, where each service requires creating a new RPC Client, fRPC creates a single client for all your services. diff --git a/docs/reference/overview.mdx b/docs/reference/overview.mdx new file mode 100644 index 0000000..cbe23d0 --- /dev/null +++ b/docs/reference/overview.mdx @@ -0,0 +1,13 @@ +--- +title: Overview +--- + +This is the reference documentation for fRPCs generated Client and Server. Since fRPC currently only supports the [Go](https://golang.org) programming language, this reference guide is currently only for [Go](https://golang.org). + +This guide describes the code generated with the `protoc-gen-go-frpc` plugin, when compiling `.proto` files with protoc. + + + Client-side RPC invocations and server-side RPC handlers are meant to be + thread-safe and will be run in concurrent goroutines. You must keep + thread-safety in mind when implementing server-side RPC handlers for fRPC. + diff --git a/docs/reference/server-methods.mdx b/docs/reference/server-methods.mdx new file mode 100644 index 0000000..6716869 --- /dev/null +++ b/docs/reference/server-methods.mdx @@ -0,0 +1,123 @@ +--- +title: Server Methods +--- + +# NewServer + +The `NewServer(... Services, *tls.Config, *zerolog.Logger) (*Server, error)` method is used to create a new fRPC Server. It takes a list of structs that implement +the RPC methods corresponding to the services defined in the `.proto` file. + +For example, if the `.proto` file contains the following service definition: + +```proto +service MyService { + rpc MyMethod(MyRequest) returns (MyResponse) {} +} + +service OtherService { + rpc OtherMethod(MyRequest) returns (MyResponse) {} +} +``` + +Then the generated `service implementation ` interfaces are expected: + +```go +type MyService interface { + MyMethod(context.Context, *MyRequest) (*MyResponse, error) +} + +type OtherService interface { + OtherMethod(context.Context, *MyRequest) (*MyResponse, error) +} +``` + +And the corresponding generated `NewServer` method would be: + +```go +func NewServer(myService MyService, otherService OtherService, tlsConfig *tls.Config, logger *zerolog.Logger) (*Server, error) { + ... +} +``` + +The generated `NewServer` method also takes the additional two arguments: + +- `tlsConfig`: a `*tls.Config` that will be used to configure TLS for fRPC server. This can be left as `nil` if no TLS is required. +- `logger`: a `*zerolog.Logger` that will be used to log all events. This can be left as `nil` if no logging is required. + +It returns an fRPC `*Server` on success and an error otherwise. + + + For long-running RPC handlers it's important to pay attention to the passed on + context.Context - it will be cancelled when the server is shutdown and + handlers are expected to return as soon as that happens. + + +# SetBaseContext + +The `SetBaseContext(f func() context.Context) error` method is used to set the base context for all incoming RPCs. This is useful if you want to set a common context for all incoming RPCs. + +# SetOnClosed + +The `SetOnClosed(f func(*frisbee.Async, error)) error` method is used to set the callback function that will be called when a connection to an fRPC client is closed. This is useful if you want to do any cleanup when a connection to a client is closed. + +# SetHandlerTable + +The `SetHandlerTable(handlerTable frisbee.HandlerTable) error` method is used to set the handler table for the fRPC server. This is useful if you want to set a custom handler table for the fRPC server, and is commonly used to extend the fRPC server with custom handlers for +alternative messaging patterns. In order to avoid breaking the fRPC functionality, it's recommended to first use the `GetHandlerTable` method to retrieve the base handler table, modify it, and then use the `SetHandlerTable` method to set the modified handler table. + +# GetHandlerTable + +The `GetHandlerTable() frisbee.HandlerTable` method is used to retrieve the handler table for the fRPC server. This is useful if you want to retrieve and extend handler table for the fRPC server, and is commonly used with the `SetHandlerTable` method. + +# SetPreWrite + +The `SetPreWrite(f func()) error` method is used to set the pre-write callback function for the fRPC server. This is useful if you want to handle metrics or do some logging before a request is written to the client. + +# SetConcurrency + +The `SetConcurrency(concurrency uint64)` method is used to set the concurrency for the fRPC server. This is useful if you want to set a maximum number of concurrent goroutines that can be spawned by the fRPC server (across all clients). + +Setting this value to `0` will result in the fRPC server spawning an unlimited number of goroutines to handle incoming RPCs, and setting the value to `1` will result in the fRPC server spawning a single goroutine (per fRPC Client) to handle incoming RPCs. + +All other values will result in the fRPC server spawning (at maximum) the specified number of goroutines to handle incoming RPCs. + +# Start + +The `Start(addr string) error` method is used to start the fRPC server. It takes the address to listen on as an argument and will return an error if the server fails to start. + +# ServeConn + +The `ServeConn(conn net.Conn)` method is used to serve a given net.Conn. It takes the connection to serve as an argument and is a non-blocking method - it will return immediately after starting to serve the connection in a separate goroutine, and if it +encounters an error, the `OnClosed` callback function will be called with the error. + +# Logger + +The `Logger() *zerolog.Logger` method is used to retrieve the logger for the fRPC server. This is useful if you want to retrieve and extend the fRPC server. + +# Shutdown + +The `Shutdown() error` method is used to shutdown the fRPC server. It will return an error if the server fails to shutdown, and it will clean up all goroutines spawned by the server before returning. Any active connections will be closed before the server is shutdown, and any active RPCs +will be cancelled. The contexts given to the RPCs will be cancelled as well. + +# Generated Interfaces + +When generating the fRPC Server, each service in the `.proto` file requires a `service implementation` that +fulfills the RPC methods defined for the service. + +For example, if the `.proto` file contains the following service definition: + +```proto +service MyService { + rpc MyMethod(MyRequest) returns (MyResponse) {} +} +``` + +then the generated `service implementation ` interface looks like this: + +```go +type MyService interface { + MyMethod(context.Context, *MyRequest) (*MyResponse, error) +} +``` + +This is a similar function signature to the one gRPC would generate, making it easy to reuse the service implementation from gRPC. diff --git a/go.mod b/go.mod index 1d6ee2f..b8ef354 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,8 @@ module github.com/loopholelabs/frpc-go go 1.18 require ( - github.com/loopholelabs/common v0.4.4 - github.com/loopholelabs/frisbee-go v0.7.0 - github.com/loopholelabs/polyglot-go v0.5.0 + github.com/loopholelabs/frisbee-go v0.7.1 + github.com/loopholelabs/polyglot-go v0.5.1 github.com/loopholelabs/testing v0.2.3 github.com/rs/zerolog v1.28.0 github.com/stretchr/testify v1.8.0 @@ -16,6 +15,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/loopholelabs/common v0.4.4 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 5075225..0dad396 100644 --- a/go.sum +++ b/go.sum @@ -12,10 +12,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/loopholelabs/common v0.4.4 h1:Ge+1v1WiLYgR/4pziOQoJAwUqUm1c9j6nQvnkiFFBsk= github.com/loopholelabs/common v0.4.4/go.mod h1:YKnljczr4jgxkHhhAwIHh3CJXaff89YBd8Vp3pwpG3k= -github.com/loopholelabs/frisbee-go v0.7.0 h1:LGzG/NYQBsRnmojfVoSBTC31PgfV9XutQA39IU9gzf8= -github.com/loopholelabs/frisbee-go v0.7.0/go.mod h1:XfDgwwOkgN/ktzaDAq3Zu0A9Dl0w5/xkZ2qfXAvRbjs= -github.com/loopholelabs/polyglot-go v0.5.0 h1:F65/d+65qgAu2F0GcWzP6UVIwd9897bNEgylNMr8FGk= -github.com/loopholelabs/polyglot-go v0.5.0/go.mod h1:Z0QiNv4KRuWjQWpUerMhmkvRh6ks1pYmEH4SGpG0EHQ= +github.com/loopholelabs/frisbee-go v0.7.1 h1:imAu7k1blav6FH9nLLn2wqi8d3rHJZqk9e20EglMEqo= +github.com/loopholelabs/frisbee-go v0.7.1/go.mod h1:vvW59GSxsw0euO6NtOIWD4lAgXu0jNE9bjFPQGxdOBc= +github.com/loopholelabs/polyglot-go v0.5.1 h1:21QVDELp+EodPUAL+Aw8GNXLyt2BFj9gYQsGvHIFlcc= +github.com/loopholelabs/polyglot-go v0.5.1/go.mod h1:Z0QiNv4KRuWjQWpUerMhmkvRh6ks1pYmEH4SGpG0EHQ= github.com/loopholelabs/testing v0.2.3 h1:4nVuK5ctaE6ua5Z0dYk2l7xTFmcpCYLUeGjRBp8keOA= github.com/loopholelabs/testing v0.2.3/go.mod h1:gqtGY91soYD1fQoKQt/6kP14OYpS7gcbcIgq5mc9m8Q= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= diff --git a/internal/version/version.go b/internal/version/version.go index 0b9768e..318781d 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -17,5 +17,5 @@ package version const ( - Version = "v0.7.0" + Version = "v0.7.1" ) diff --git a/pkg/generator/imports.go b/pkg/generator/imports.go index 561d7b0..7cdbb73 100644 --- a/pkg/generator/imports.go +++ b/pkg/generator/imports.go @@ -20,6 +20,7 @@ var ( requiredImports = []string{ "errors", "github.com/loopholelabs/polyglot-go", + "net", } serviceImports = []string{ @@ -31,7 +32,6 @@ var ( } streamMethodImports = []string{ - "github.com/loopholelabs/common/pkg/queue", "go.uber.org/atomic", "io", } diff --git a/pkg/generator/test/generator_test.go b/pkg/generator/test/generator_test.go index 90aa85d..a0e2b89 100644 --- a/pkg/generator/test/generator_test.go +++ b/pkg/generator/test/generator_test.go @@ -42,31 +42,19 @@ func TestRPC(t *testing.T) { go server.ServeConn(sConn) t.Run("Synchronous Request", func(t *testing.T) { - t.Parallel() - go func() { - testSynchronous(client, t) - }() + testSynchronous(client, t) }) t.Run("Bi-directional Stream", func(t *testing.T) { - t.Parallel() - go func() { - testBidirectional(client, t) - }() + testBidirectional(client, t) }) t.Run("Server Stream", func(t *testing.T) { - t.Parallel() - go func() { - testServerStreaming(client, t) - }() + testServerStreaming(client, t) }) t.Run("Client Stream", func(t *testing.T) { - t.Parallel() - go func() { - testClientStreaming(client, t) - }() + testClientStreaming(client, t) }) } @@ -151,6 +139,7 @@ func testClientStreaming(client *Client, t *testing.T) { err := stream.Send(data) assert.NoError(t, err) } - err = stream.CloseSend() + res, err := stream.CloseAndRecv() assert.NoError(t, err) + assert.Equal(t, "Hello World", res.Message) } diff --git a/pkg/generator/test/server.go b/pkg/generator/test/server.go index 32bb60b..f3995a6 100644 --- a/pkg/generator/test/server.go +++ b/pkg/generator/test/server.go @@ -37,14 +37,7 @@ func (s svc) Echo(ctx context.Context, request *Request) (*Response, error) { } func (s svc) EchoStream(srv *EchoStreamServer) error { - ctx := srv.Context() for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - request, err := srv.Recv() if err == io.EOF { err = srv.CloseSend() @@ -89,9 +82,10 @@ func (s svc) Upload(srv *UploadServer) error { for { res, err := srv.Recv() if err == io.EOF { - assert.Equal(s.t, 10, received) - return srv.CloseSend() + assert.Equal(s.t, 11, received) + return srv.CloseAndSend(&Response{Message: "Hello World", Test: &Data{}}) } + received += 1 assert.NoError(s.t, err) assert.Equal(s.t, "Hello World", res.Message) } diff --git a/pkg/generator/test/test.frpc.go b/pkg/generator/test/test.frpc.go new file mode 100644 index 0000000..9834491 --- /dev/null +++ b/pkg/generator/test/test.frpc.go @@ -0,0 +1,2266 @@ +// Code generated by fRPC Go v0.7.1, DO NOT EDIT. +// source: test.proto + +package test + +import ( + "errors" + "github.com/loopholelabs/polyglot-go" + "net" + + "context" + "crypto/tls" + "github.com/loopholelabs/frisbee-go" + "github.com/loopholelabs/frisbee-go/pkg/packet" + "github.com/rs/zerolog" + + "sync" + + "go.uber.org/atomic" + "io" +) + +var ( + NilDecode = errors.New("cannot decode into a nil root struct") +) + +type Test uint32 + +const ( + Potato = Test(0) + Monkey = Test(1) +) + +type RequestCorpus uint32 + +const ( + RequestUNIVERSAL = RequestCorpus(0) + RequestWEB = RequestCorpus(1) + RequestIMAGES = RequestCorpus(2) + RequestLOCAL = RequestCorpus(3) + RequestNEWS = RequestCorpus(4) + RequestPRODUCTS = RequestCorpus(5) + RequestVIDEO = RequestCorpus(6) +) + +type Request struct { + error error + flags uint8 + + Message string + Corpus RequestCorpus +} + +func NewRequest() *Request { + return &Request{} +} + +func (x *Request) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *Request) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Message).Uint32(uint32(x.Corpus)) + } +} + +func (x *Request) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *Request) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Message, err = d.String() + if err != nil { + return err + } + var CorpusTemp uint32 + CorpusTemp, err = d.Uint32() + x.Corpus = RequestCorpus(CorpusTemp) + if err != nil { + return err + } + return nil +} + +type Response struct { + error error + flags uint8 + + Message string + Test *Data +} + +func NewResponse() *Response { + return &Response{ + Test: NewData(), + } +} + +func (x *Response) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *Response) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Message) + x.Test.Encode(b) + } +} + +func (x *Response) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *Response) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Message, err = d.String() + if err != nil { + return err + } + if x.Test == nil { + x.Test = NewData() + } + err = x.Test.decode(d) + if err != nil { + return err + } + return nil +} + +type Data struct { + error error + flags uint8 + + Message string + Checker Test +} + +func NewData() *Data { + return &Data{} +} + +func (x *Data) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *Data) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Message).Uint32(uint32(x.Checker)) + } +} + +func (x *Data) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *Data) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Message, err = d.String() + if err != nil { + return err + } + var CheckerTemp uint32 + CheckerTemp, err = d.Uint32() + x.Checker = Test(CheckerTemp) + if err != nil { + return err + } + return nil +} + +type MyMessage1EnumAllowingAlias uint32 + +const ( + MyMessage1UNKNOWN = MyMessage1EnumAllowingAlias(0) + MyMessage1STARTED = MyMessage1EnumAllowingAlias(1) + MyMessage1RUNNING = MyMessage1EnumAllowingAlias(2) +) + +type MyMessage1 struct { + error error + flags uint8 +} + +func NewMyMessage1() *MyMessage1 { + return &MyMessage1{} +} + +func (x *MyMessage1) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *MyMessage1) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + } +} + +func (x *MyMessage1) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *MyMessage1) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + return nil +} + +type MyMessage2EnumNotAllowingAlias uint32 + +const ( + MyMessage2UNKNOWN = MyMessage2EnumNotAllowingAlias(0) + MyMessage2STARTED = MyMessage2EnumNotAllowingAlias(1) +) + +type MyMessage2 struct { + error error + flags uint8 +} + +func NewMyMessage2() *MyMessage2 { + return &MyMessage2{} +} + +func (x *MyMessage2) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *MyMessage2) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + } +} + +func (x *MyMessage2) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *MyMessage2) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + return nil +} + +type SearchResponseResult struct { + error error + flags uint8 + + Url string + Title string + Snippets []string +} + +func NewSearchResponseResult() *SearchResponseResult { + return &SearchResponseResult{} +} + +func (x *SearchResponseResult) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *SearchResponseResult) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Url).String(x.Title) + polyglot.Encoder(b).Slice(uint32(len(x.Snippets)), polyglot.StringKind) + for _, v := range x.Snippets { + polyglot.Encoder(b).String(v) + } + } +} + +func (x *SearchResponseResult) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *SearchResponseResult) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Url, err = d.String() + if err != nil { + return err + } + x.Title, err = d.String() + if err != nil { + return err + } + var sliceSize uint32 + sliceSize, err = d.Slice(polyglot.StringKind) + if err != nil { + return err + } + if uint32(len(x.Snippets)) != sliceSize { + x.Snippets = make([]string, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + x.Snippets[i], err = d.String() + if err != nil { + return err + } + } + return nil +} + +type SearchResponse struct { + error error + flags uint8 + + Results []*SearchResponseResult + Results2 []*SearchResponseResult + Snippets []string + Snippets2 []string +} + +func NewSearchResponse() *SearchResponse { + return &SearchResponse{} +} + +func (x *SearchResponse) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *SearchResponse) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + polyglot.Encoder(b).Slice(uint32(len(x.Results)), polyglot.AnyKind) + for _, v := range x.Results { + v.Encode(b) + } + polyglot.Encoder(b).Slice(uint32(len(x.Results2)), polyglot.AnyKind) + for _, v := range x.Results2 { + v.Encode(b) + } + polyglot.Encoder(b).Slice(uint32(len(x.Snippets)), polyglot.StringKind) + for _, v := range x.Snippets { + polyglot.Encoder(b).String(v) + } + polyglot.Encoder(b).Slice(uint32(len(x.Snippets2)), polyglot.StringKind) + for _, v := range x.Snippets2 { + polyglot.Encoder(b).String(v) + } + } +} + +func (x *SearchResponse) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *SearchResponse) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + var sliceSize uint32 + sliceSize, err = d.Slice(polyglot.AnyKind) + if err != nil { + return err + } + if uint32(len(x.Results)) != sliceSize { + x.Results = make([]*SearchResponseResult, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + if x.Results[i] == nil { + x.Results[i] = NewSearchResponseResult() + } + err = x.Results[i].decode(d) + if err != nil { + return err + } + } + sliceSize, err = d.Slice(polyglot.AnyKind) + if err != nil { + return err + } + if uint32(len(x.Results2)) != sliceSize { + x.Results2 = make([]*SearchResponseResult, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + if x.Results2[i] == nil { + x.Results2[i] = NewSearchResponseResult() + } + err = x.Results2[i].decode(d) + if err != nil { + return err + } + } + sliceSize, err = d.Slice(polyglot.StringKind) + if err != nil { + return err + } + if uint32(len(x.Snippets)) != sliceSize { + x.Snippets = make([]string, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + x.Snippets[i], err = d.String() + if err != nil { + return err + } + } + sliceSize, err = d.Slice(polyglot.StringKind) + if err != nil { + return err + } + if uint32(len(x.Snippets2)) != sliceSize { + x.Snippets2 = make([]string, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + x.Snippets2[i], err = d.String() + if err != nil { + return err + } + } + return nil +} + +type Resulting struct { + error error + flags uint8 + + Url string + Title string + Snippets []string +} + +func NewResulting() *Resulting { + return &Resulting{} +} + +func (x *Resulting) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *Resulting) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Url).String(x.Title) + polyglot.Encoder(b).Slice(uint32(len(x.Snippets)), polyglot.StringKind) + for _, v := range x.Snippets { + polyglot.Encoder(b).String(v) + } + } +} + +func (x *Resulting) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *Resulting) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Url, err = d.String() + if err != nil { + return err + } + x.Title, err = d.String() + if err != nil { + return err + } + var sliceSize uint32 + sliceSize, err = d.Slice(polyglot.StringKind) + if err != nil { + return err + } + if uint32(len(x.Snippets)) != sliceSize { + x.Snippets = make([]string, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + x.Snippets[i], err = d.String() + if err != nil { + return err + } + } + return nil +} + +type SomeOtherMessage struct { + error error + flags uint8 + + Result *SearchResponseResult +} + +func NewSomeOtherMessage() *SomeOtherMessage { + return &SomeOtherMessage{ + Result: NewSearchResponseResult(), + } +} + +func (x *SomeOtherMessage) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *SomeOtherMessage) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Result.Encode(b) + } +} + +func (x *SomeOtherMessage) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *SomeOtherMessage) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if x.Result == nil { + x.Result = NewSearchResponseResult() + } + err = x.Result.decode(d) + if err != nil { + return err + } + return nil +} + +type OuterMiddleAAInner struct { + error error + flags uint8 + + Ival int64 + Booly bool +} + +func NewOuterMiddleAAInner() *OuterMiddleAAInner { + return &OuterMiddleAAInner{} +} + +func (x *OuterMiddleAAInner) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *OuterMiddleAAInner) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).Int64(x.Ival).Bool(x.Booly) + } +} + +func (x *OuterMiddleAAInner) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *OuterMiddleAAInner) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Ival, err = d.Int64() + if err != nil { + return err + } + x.Booly, err = d.Bool() + if err != nil { + return err + } + return nil +} + +type OuterMiddleAA struct { + error error + flags uint8 + + Inner *OuterMiddleAAInner +} + +func NewOuterMiddleAA() *OuterMiddleAA { + return &OuterMiddleAA{ + Inner: NewOuterMiddleAAInner(), + } +} + +func (x *OuterMiddleAA) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *OuterMiddleAA) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Inner.Encode(b) + } +} + +func (x *OuterMiddleAA) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *OuterMiddleAA) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if x.Inner == nil { + x.Inner = NewOuterMiddleAAInner() + } + err = x.Inner.decode(d) + if err != nil { + return err + } + return nil +} + +type OuterMiddleBBInner struct { + error error + flags uint8 + + Ival int32 + Booly bool +} + +func NewOuterMiddleBBInner() *OuterMiddleBBInner { + return &OuterMiddleBBInner{} +} + +func (x *OuterMiddleBBInner) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *OuterMiddleBBInner) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).Int32(x.Ival).Bool(x.Booly) + } +} + +func (x *OuterMiddleBBInner) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *OuterMiddleBBInner) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Ival, err = d.Int32() + if err != nil { + return err + } + x.Booly, err = d.Bool() + if err != nil { + return err + } + return nil +} + +type OuterMiddleBB struct { + error error + flags uint8 + + Inner *OuterMiddleBBInner +} + +func NewOuterMiddleBB() *OuterMiddleBB { + return &OuterMiddleBB{ + Inner: NewOuterMiddleBBInner(), + } +} + +func (x *OuterMiddleBB) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *OuterMiddleBB) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Inner.Encode(b) + } +} + +func (x *OuterMiddleBB) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *OuterMiddleBB) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if x.Inner == nil { + x.Inner = NewOuterMiddleBBInner() + } + err = x.Inner.decode(d) + if err != nil { + return err + } + return nil +} + +type Outer struct { + error error + flags uint8 + + A *OuterMiddleAA + B *OuterMiddleBB +} + +func NewOuter() *Outer { + return &Outer{ + A: NewOuterMiddleAA(), + B: NewOuterMiddleBB(), + } +} + +func (x *Outer) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *Outer) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.A.Encode(b) + x.B.Encode(b) + } +} + +func (x *Outer) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *Outer) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if x.A == nil { + x.A = NewOuterMiddleAA() + } + err = x.A.decode(d) + if err != nil { + return err + } + if x.B == nil { + x.B = NewOuterMiddleBB() + } + err = x.B.decode(d) + if err != nil { + return err + } + return nil +} + +type SampleMessage struct { + error error + flags uint8 + + Name string + Potato string +} + +func NewSampleMessage() *SampleMessage { + return &SampleMessage{} +} + +func (x *SampleMessage) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *SampleMessage) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + polyglot.Encoder(b).String(x.Name).String(x.Potato) + } +} + +func (x *SampleMessage) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *SampleMessage) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + x.Name, err = d.String() + if err != nil { + return err + } + x.Potato, err = d.String() + if err != nil { + return err + } + return nil +} + +type TestPotatoPricesMap map[string]Test + +func NewTestPotatoPricesMap(size uint32) map[string]Test { + return make(map[string]Test, size) +} + +func (x TestPotatoPricesMap) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + polyglot.Encoder(b).Map(uint32(len(x)), polyglot.StringKind, polyglot.Uint32Kind) + for k, v := range x { + polyglot.Encoder(b).String(k) + polyglot.Encoder(b).Uint32(uint32(v)) + } + } +} + +func (x TestPotatoPricesMap) decode(d *polyglot.Decoder, size uint32) error { + if size == 0 { + return nil + } + var k string + var v Test + var ValueTemp uint32 + var err error + for i := uint32(0); i < size; i++ { + k, err = d.String() + if err != nil { + return err + } + ValueTemp, err = d.Uint32() + v = Test(ValueTemp) + if err != nil { + return err + } + x[k] = v + } + return nil +} + +type TestPotato struct { + error error + flags uint8 + + Prices TestPotatoPricesMap +} + +func NewTestPotato() *TestPotato { + return &TestPotato{} +} + +func (x *TestPotato) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *TestPotato) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Prices.Encode(b) + } +} + +func (x *TestPotato) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *TestPotato) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if !d.Nil() { + PricesSize, err := d.Map(polyglot.StringKind, polyglot.Uint32Kind) + if err != nil { + return err + } + x.Prices = NewTestPotatoPricesMap(PricesSize) + err = x.Prices.decode(d, PricesSize) + if err != nil { + return err + } + } + return nil +} + +type StockPricesPricesMap map[string]float64 + +func NewStockPricesPricesMap(size uint32) map[string]float64 { + return make(map[string]float64, size) +} + +func (x StockPricesPricesMap) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + polyglot.Encoder(b).Map(uint32(len(x)), polyglot.StringKind, polyglot.Float64Kind) + for k, v := range x { + polyglot.Encoder(b).String(k) + polyglot.Encoder(b).Float64(v) + } + } +} + +func (x StockPricesPricesMap) decode(d *polyglot.Decoder, size uint32) error { + if size == 0 { + return nil + } + var k string + var v float64 + var err error + for i := uint32(0); i < size; i++ { + k, err = d.String() + if err != nil { + return err + } + v, err = d.Float64() + if err != nil { + return err + } + x[k] = v + } + return nil +} + +type StockPrices struct { + error error + flags uint8 + + Prices StockPricesPricesMap +} + +func NewStockPrices() *StockPrices { + return &StockPrices{} +} + +func (x *StockPrices) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *StockPrices) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Prices.Encode(b) + } +} + +func (x *StockPrices) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *StockPrices) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if !d.Nil() { + PricesSize, err := d.Map(polyglot.StringKind, polyglot.Float64Kind) + if err != nil { + return err + } + x.Prices = NewStockPricesPricesMap(PricesSize) + err = x.Prices.decode(d, PricesSize) + if err != nil { + return err + } + } + return nil +} + +type StockPricesWrapper struct { + error error + flags uint8 + + SPrices []*StockPrices +} + +func NewStockPricesWrapper() *StockPricesWrapper { + return &StockPricesWrapper{} +} + +func (x *StockPricesWrapper) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *StockPricesWrapper) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + polyglot.Encoder(b).Slice(uint32(len(x.SPrices)), polyglot.AnyKind) + for _, v := range x.SPrices { + v.Encode(b) + } + } +} + +func (x *StockPricesWrapper) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *StockPricesWrapper) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + var sliceSize uint32 + sliceSize, err = d.Slice(polyglot.AnyKind) + if err != nil { + return err + } + if uint32(len(x.SPrices)) != sliceSize { + x.SPrices = make([]*StockPrices, sliceSize) + } + for i := uint32(0); i < sliceSize; i++ { + if x.SPrices[i] == nil { + x.SPrices[i] = NewStockPrices() + } + err = x.SPrices[i].decode(d) + if err != nil { + return err + } + } + return nil +} + +type StockPricesSuperWrapPricesMap map[string]*StockPricesWrapper + +func NewStockPricesSuperWrapPricesMap(size uint32) map[string]*StockPricesWrapper { + return make(map[string]*StockPricesWrapper, size) +} + +func (x StockPricesSuperWrapPricesMap) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + polyglot.Encoder(b).Map(uint32(len(x)), polyglot.StringKind, polyglot.AnyKind) + for k, v := range x { + polyglot.Encoder(b).String(k) + v.Encode(b) + } + } +} + +func (x StockPricesSuperWrapPricesMap) decode(d *polyglot.Decoder, size uint32) error { + if size == 0 { + return nil + } + var k string + var v *StockPricesWrapper + var err error + for i := uint32(0); i < size; i++ { + k, err = d.String() + if err != nil { + return err + } + v = NewStockPricesWrapper() + err = v.decode(d) + if err != nil { + return err + } + x[k] = v + } + return nil +} + +type StockPricesSuperWrap struct { + error error + flags uint8 + + Prices StockPricesSuperWrapPricesMap +} + +func NewStockPricesSuperWrap() *StockPricesSuperWrap { + return &StockPricesSuperWrap{} +} + +func (x *StockPricesSuperWrap) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *StockPricesSuperWrap) Encode(b *polyglot.Buffer) { + if x == nil { + polyglot.Encoder(b).Nil() + } else { + if x.error != nil { + polyglot.Encoder(b).Error(x.error) + return + } + polyglot.Encoder(b).Uint8(x.flags) + + x.Prices.Encode(b) + } +} + +func (x *StockPricesSuperWrap) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *StockPricesSuperWrap) decode(d *polyglot.Decoder) error { + if d.Nil() { + return nil + } + + var err error + x.error, err = d.Error() + if err == nil { + return nil + } + x.flags, err = d.Uint8() + if err != nil { + return err + } + if !d.Nil() { + PricesSize, err := d.Map(polyglot.StringKind, polyglot.AnyKind) + if err != nil { + return err + } + x.Prices = NewStockPricesSuperWrapPricesMap(PricesSize) + err = x.Prices.decode(d, PricesSize) + if err != nil { + return err + } + } + return nil +} + +type EchoService interface { + Echo(context.Context, *Request) (*Response, error) + + EchoStream(srv *EchoStreamServer) error + Testy(context.Context, *SearchResponse) (*StockPricesWrapper, error) + + Search(req *SearchResponse, srv *SearchServer) error + + Upload(srv *UploadServer) error +} + +const connectionContextKey int = 1000 + +func SetErrorFlag(flags uint8, error bool) uint8 { + return flags | 0x2 +} +func HasErrorFlag(flags uint8) bool { + return flags&(1<<1) == 1 +} + +type RPCStreamOpen struct { + operation uint16 +} + +func (x *RPCStreamOpen) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *RPCStreamOpen) Encode(b *polyglot.Buffer) { + polyglot.Encoder(b).Uint16(x.operation) +} + +func (x *RPCStreamOpen) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) +} + +func (x *RPCStreamOpen) decode(d *polyglot.Decoder) error { + var err error + x.operation, err = d.Uint16() + return err +} + +type Server struct { + *frisbee.Server + onClosed func(*frisbee.Async, error) +} + +func NewServer(echoService EchoService, tlsConfig *tls.Config, logger *zerolog.Logger) (*Server, error) { + var s *Server + table := make(frisbee.HandlerTable) + + table[10] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { + req := NewRequest() + err := req.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + if err == nil { + var res *Response + outgoing = incoming + outgoing.Content.Reset() + res, err = echoService.Echo(ctx, req) + if err != nil { + if _, ok := err.(CloseError); ok { + action = frisbee.CLOSE + } + res.Error(outgoing.Content, err) + } else { + res.Encode(outgoing.Content) + } + outgoing.Metadata.ContentLength = uint32(len(*outgoing.Content)) + } + return + } + table[12] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { + req := NewSearchResponse() + err := req.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + if err == nil { + var res *StockPricesWrapper + outgoing = incoming + outgoing.Content.Reset() + res, err = echoService.Testy(ctx, req) + if err != nil { + if _, ok := err.(CloseError); ok { + action = frisbee.CLOSE + } + res.Error(outgoing.Content, err) + } else { + res.Encode(outgoing.Content) + } + outgoing.Metadata.ContentLength = uint32(len(*outgoing.Content)) + } + return + } + var fsrv *frisbee.Server + var err error + if tlsConfig != nil { + fsrv, err = frisbee.NewServer(table, frisbee.WithTLS(tlsConfig), frisbee.WithLogger(logger)) + if err != nil { + return nil, err + } + } else { + fsrv, err = frisbee.NewServer(table, frisbee.WithLogger(logger)) + if err != nil { + return nil, err + } + } + + fsrv.SetStreamHandler(func(conn *frisbee.Async, stream *frisbee.Stream) { + p, err := stream.ReadPacket() + if err != nil { + return + } + open := &RPCStreamOpen{} + err = open.Decode(*p.Content) + if err != nil { + stream.Close() + return + } + switch open.operation { + case 11: + s.createEchoStreamServer(echoService, stream) + case 13: + s.createSearchServer(echoService, stream) + case 14: + s.createUploadServer(echoService, stream) + } + }) + + fsrv.ConnContext = func(ctx context.Context, conn *frisbee.Async) context.Context { + return context.WithValue(ctx, connectionContextKey, conn) + } + s, err = &Server{ + Server: fsrv, + }, nil + + fsrv.SetOnClosed(func(async *frisbee.Async, err error) { + if s.onClosed != nil { + s.onClosed(async, err) + } + }) + return s, err +} + +func (s *Server) SetOnClosed(f func(*frisbee.Async, error)) error { + if f == nil { + return frisbee.OnClosedNil + } + s.onClosed = f + return nil +} + +type EchoStreamServer struct { + recv func() (*Request, error) + send func(*Response) error + + stream *frisbee.Stream + closed *atomic.Bool +} + +func (s *Server) createEchoStreamServer(echoService EchoService, stream *frisbee.Stream) { + srv := &EchoStreamServer{ + closed: atomic.NewBool(false), + stream: stream, + } + + srv.recv = func() (*Request, error) { + p, err := srv.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := NewRequest() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + srv.send = func(m *Response) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return srv.stream.WritePacket(p) + } + + go func() { + err := echoService.EchoStream(srv) + if err != nil { + res := Response{error: err} + res.flags = SetErrorFlag(res.flags, true) + srv.CloseAndSend(&res) + } else { + srv.CloseSend() + } + }() +} + +func (x *EchoStreamServer) Recv() (*Request, error) { + return x.recv() +} + +func (x *EchoStreamServer) close() { + x.stream.Close() +} +func (x *EchoStreamServer) Send(m *Response) error { + return x.send(m) +} +func (x *EchoStreamServer) CloseSend() error { + return x.send(&Response{error: io.EOF}) +} + +func (x *EchoStreamServer) CloseAndSend(m *Response) error { + err := x.send(m) + if err != nil { + return err + } + return x.CloseSend() +} + +type SearchServer struct { + recv func() (*SearchResponse, error) + send func(*Response) error + + stream *frisbee.Stream + closed *atomic.Bool +} + +func (s *Server) createSearchServer(echoService EchoService, stream *frisbee.Stream) { + srv := &SearchServer{ + closed: atomic.NewBool(false), + stream: stream, + } + + srv.send = func(m *Response) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return srv.stream.WritePacket(p) + } + + incoming, err := stream.ReadPacket() + if err != nil { + return + } + req := NewSearchResponse() + err = req.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + go func() { + + err := echoService.Search(req, srv) + if err != nil { + res := Response{error: err} + res.flags = SetErrorFlag(res.flags, true) + srv.CloseAndSend(&res) + } else { + srv.CloseSend() + } + }() +} + +func (x *SearchServer) Send(m *Response) error { + return x.send(m) +} +func (x *SearchServer) CloseSend() error { + return x.send(&Response{error: io.EOF}) +} + +func (x *SearchServer) CloseAndSend(m *Response) error { + err := x.send(m) + if err != nil { + return err + } + return x.CloseSend() +} + +type UploadServer struct { + recv func() (*Data, error) + send func(*Response) error + + stream *frisbee.Stream + closed *atomic.Bool +} + +func (s *Server) createUploadServer(echoService EchoService, stream *frisbee.Stream) { + srv := &UploadServer{ + closed: atomic.NewBool(false), + stream: stream, + } + + srv.recv = func() (*Data, error) { + p, err := srv.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := NewData() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + srv.send = func(m *Response) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return srv.stream.WritePacket(p) + } + + go func() { + err := echoService.Upload(srv) + if err != nil { + res := Response{error: err} + res.flags = SetErrorFlag(res.flags, true) + srv.CloseAndSend(&res) + } else { + srv.CloseSend() + } + }() +} + +func (x *UploadServer) Recv() (*Data, error) { + return x.recv() +} + +func (x *UploadServer) close() { + x.stream.Close() +} +func (x *UploadServer) CloseSend() error { + return x.send(&Response{error: io.EOF}) +} + +func (x *UploadServer) CloseAndSend(m *Response) error { + err := x.send(m) + if err != nil { + return err + } + return x.CloseSend() +} + +type subEchoServiceClient struct { + client *frisbee.Client + nextEcho uint16 + nextEchoMu sync.RWMutex + inflightEcho map[uint16]chan *Response + inflightEchoMu sync.RWMutex + nextTesty uint16 + nextTestyMu sync.RWMutex + inflightTesty map[uint16]chan *StockPricesWrapper + inflightTestyMu sync.RWMutex + nextStreamingID uint16 + nextStreamingIDMu sync.RWMutex +} +type Client struct { + *frisbee.Client + EchoService *subEchoServiceClient +} + +func NewClient(tlsConfig *tls.Config, logger *zerolog.Logger) (*Client, error) { + c := new(Client) + table := make(frisbee.HandlerTable) + + table[10] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { + c.EchoService.inflightEchoMu.RLock() + if ch, ok := c.EchoService.inflightEcho[incoming.Metadata.Id]; ok { + c.EchoService.inflightEchoMu.RUnlock() + res := NewResponse() + res.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + ch <- res + } else { + c.EchoService.inflightEchoMu.RUnlock() + } + return + } + table[12] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { + c.EchoService.inflightTestyMu.RLock() + if ch, ok := c.EchoService.inflightTesty[incoming.Metadata.Id]; ok { + c.EchoService.inflightTestyMu.RUnlock() + res := NewStockPricesWrapper() + res.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + ch <- res + } else { + c.EchoService.inflightTestyMu.RUnlock() + } + return + } + var err error + if tlsConfig != nil { + c.Client, err = frisbee.NewClient(table, context.Background(), frisbee.WithTLS(tlsConfig), frisbee.WithLogger(logger)) + if err != nil { + return nil, err + } + } else { + c.Client, err = frisbee.NewClient(table, context.Background(), frisbee.WithLogger(logger)) + if err != nil { + return nil, err + } + } + + c.EchoService = new(subEchoServiceClient) + c.EchoService.client = c.Client + c.EchoService.nextEchoMu.Lock() + c.EchoService.nextEcho = 0 + c.EchoService.nextEchoMu.Unlock() + c.EchoService.inflightEcho = make(map[uint16]chan *Response) + c.EchoService.nextTestyMu.Lock() + c.EchoService.nextTesty = 0 + c.EchoService.nextTestyMu.Unlock() + c.EchoService.inflightTesty = make(map[uint16]chan *StockPricesWrapper) + return c, nil +} + +func (c *Client) Connect(addr string, streamHandler ...frisbee.NewStreamHandler) error { + return c.Client.Connect(addr, func(stream *frisbee.Stream) {}) +} + +func (c *Client) FromConn(conn net.Conn, streamHandler ...frisbee.NewStreamHandler) error { + return c.Client.FromConn(conn, func(stream *frisbee.Stream) {}) +} + +func (c *subEchoServiceClient) Echo(ctx context.Context, req *Request) (res *Response, err error) { + ch := make(chan *Response, 1) + p := packet.Get() + p.Metadata.Operation = 10 + + c.nextEchoMu.Lock() + c.nextEcho += 1 + id := c.nextEcho + c.nextEchoMu.Unlock() + p.Metadata.Id = id + + req.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + c.inflightEchoMu.Lock() + c.inflightEcho[id] = ch + c.inflightEchoMu.Unlock() + err = c.client.WritePacket(p) + if err != nil { + packet.Put(p) + return + } + select { + case res = <-ch: + err = res.error + case <-ctx.Done(): + err = ctx.Err() + } + c.inflightEchoMu.Lock() + delete(c.inflightEcho, id) + c.inflightEchoMu.Unlock() + packet.Put(p) + return +} + +func (c *subEchoServiceClient) EchoStream(ctx context.Context, req *Request) (*EchoStreamClient, error) { + p := packet.Get() + + c.nextStreamingIDMu.Lock() + c.nextStreamingID += 1 + id := c.nextStreamingID + c.nextStreamingIDMu.Unlock() + + open := &RPCStreamOpen{operation: 11} + + open.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + + fStream := c.client.Stream(id) + fStream.WritePacket(p) + + if req != nil { + p2 := packet.Get() + req.Encode(p2.Content) + p2.Metadata.ContentLength = uint32(len(*p2.Content)) + fStream.WritePacket(p2) + } + + stream := EchoStreamClient{ + context: ctx, + stream: fStream, + closed: atomic.NewBool(false), + } + + stream.recv = func() (*Response, error) { + p, err := stream.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := NewResponse() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + + stream.close = func() { + stream.stream.Close() + } + stream.send = func(m *Request) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return stream.stream.WritePacket(p) + } + return &stream, nil +} + +type EchoStreamClient struct { + context context.Context + recv func() (*Response, error) + close func() + closed *atomic.Bool + + stream *frisbee.Stream + send func(*Request) error +} + +func (x *EchoStreamClient) Recv() (*Response, error) { + return x.recv() +} +func (x *EchoStreamClient) Send(m *Request) error { + return x.send(m) +} + +func (x *EchoStreamClient) CloseSend() error { + return x.send(&Request{error: io.EOF}) +} + +func (x *EchoStreamClient) CloseAndRecv() (*Response, error) { + err := x.send(&Request{error: io.EOF}) + if err != nil { + return nil, err + } + return x.recv() +} + +func (c *subEchoServiceClient) Testy(ctx context.Context, req *SearchResponse) (res *StockPricesWrapper, err error) { + ch := make(chan *StockPricesWrapper, 1) + p := packet.Get() + p.Metadata.Operation = 12 + + c.nextTestyMu.Lock() + c.nextTesty += 1 + id := c.nextTesty + c.nextTestyMu.Unlock() + p.Metadata.Id = id + + req.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + c.inflightTestyMu.Lock() + c.inflightTesty[id] = ch + c.inflightTestyMu.Unlock() + err = c.client.WritePacket(p) + if err != nil { + packet.Put(p) + return + } + select { + case res = <-ch: + err = res.error + case <-ctx.Done(): + err = ctx.Err() + } + c.inflightTestyMu.Lock() + delete(c.inflightTesty, id) + c.inflightTestyMu.Unlock() + packet.Put(p) + return +} + +func (c *subEchoServiceClient) Search(ctx context.Context, req *SearchResponse) (*SearchClient, error) { + p := packet.Get() + + c.nextStreamingIDMu.Lock() + c.nextStreamingID += 1 + id := c.nextStreamingID + c.nextStreamingIDMu.Unlock() + + open := &RPCStreamOpen{operation: 13} + + open.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + + fStream := c.client.Stream(id) + fStream.WritePacket(p) + + if req != nil { + p2 := packet.Get() + req.Encode(p2.Content) + p2.Metadata.ContentLength = uint32(len(*p2.Content)) + fStream.WritePacket(p2) + } + + stream := SearchClient{ + context: ctx, + stream: fStream, + closed: atomic.NewBool(false), + } + + stream.recv = func() (*Response, error) { + p, err := stream.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := NewResponse() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + + stream.close = func() { + stream.stream.Close() + } + stream.send = func(m *SearchResponse) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return stream.stream.WritePacket(p) + } + return &stream, nil +} + +type SearchClient struct { + context context.Context + recv func() (*Response, error) + close func() + closed *atomic.Bool + + stream *frisbee.Stream + send func(*SearchResponse) error +} + +func (x *SearchClient) Recv() (*Response, error) { + return x.recv() +} + +func (c *subEchoServiceClient) Upload(ctx context.Context, req *Data) (*UploadClient, error) { + p := packet.Get() + + c.nextStreamingIDMu.Lock() + c.nextStreamingID += 1 + id := c.nextStreamingID + c.nextStreamingIDMu.Unlock() + + open := &RPCStreamOpen{operation: 14} + + open.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + + fStream := c.client.Stream(id) + fStream.WritePacket(p) + + if req != nil { + p2 := packet.Get() + req.Encode(p2.Content) + p2.Metadata.ContentLength = uint32(len(*p2.Content)) + fStream.WritePacket(p2) + } + + stream := UploadClient{ + context: ctx, + stream: fStream, + closed: atomic.NewBool(false), + } + + stream.recv = func() (*Response, error) { + p, err := stream.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := NewResponse() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + + stream.close = func() { + stream.stream.Close() + } + stream.send = func(m *Data) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return stream.stream.WritePacket(p) + } + return &stream, nil +} + +type UploadClient struct { + context context.Context + recv func() (*Response, error) + close func() + closed *atomic.Bool + + stream *frisbee.Stream + send func(*Data) error +} + +func (x *UploadClient) Send(m *Data) error { + return x.send(m) +} + +func (x *UploadClient) CloseSend() error { + return x.send(&Data{error: io.EOF}) +} + +func (x *UploadClient) CloseAndRecv() (*Response, error) { + err := x.send(&Data{error: io.EOF}) + if err != nil { + return nil, err + } + return x.recv() +} + +type CloseError struct { + err error +} + +func NewCloseError(err error) CloseError { + return CloseError{err: err} +} + +func (e CloseError) Error() string { + return e.err.Error() +} diff --git a/templates/client.templ b/templates/client.templ index b3fcdbe..d0a8133 100644 --- a/templates/client.templ +++ b/templates/client.templ @@ -5,15 +5,15 @@ client *frisbee.Client {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} {{ $method := $service.Methods.Get $i -}} + {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} next{{ CamelCaseName $method.Name }} uint16 next{{ CamelCaseName $method.Name }}Mu sync.RWMutex - {{ if $method.IsStreamingServer -}} - inflight{{ CamelCaseName $method.Name }} map[uint16]*{{ CamelCaseName $method.Name }}Client - {{ else -}} inflight{{ CamelCaseName $method.Name }} map[uint16]chan *{{ CamelCase $method.Output.FullName }} - {{ end -}} inflight{{ CamelCaseName $method.Name }}Mu sync.RWMutex + {{ end -}} {{end -}} + nextStreamingID uint16 + nextStreamingIDMu sync.RWMutex } {{end -}} @@ -50,12 +50,10 @@ func NewClient (tlsConfig *tls.Config, logger *zerolog.Logger) (*Client, error) c.{{ CamelCaseName $service.Name }}.client = c.Client {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} {{ $method := $service.Methods.Get $i -}} + {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} c.{{ CamelCaseName $service.Name }}.next{{ CamelCaseName $method.Name }}Mu.Lock() c.{{ CamelCaseName $service.Name }}.next{{ CamelCaseName $method.Name }} = 0 c.{{ CamelCaseName $service.Name }}.next{{ CamelCaseName $method.Name }}Mu.Unlock() - {{ if $method.IsStreamingServer -}} - c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }} = make(map[uint16]*{{ CamelCaseName $method.Name }}Client) - {{ else -}} c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }} = make(map[uint16]chan *{{ CamelCase $method.Output.FullName }}) {{ end -}} {{end -}} @@ -63,6 +61,14 @@ func NewClient (tlsConfig *tls.Config, logger *zerolog.Logger) (*Client, error) return c, nil } +func (c *Client) Connect(addr string, streamHandler ...frisbee.NewStreamHandler) error { + return c.Client.Connect(addr, func (stream *frisbee.Stream) { }) +} + +func (c *Client) FromConn(conn net.Conn, streamHandler ...frisbee.NewStreamHandler) error { + return c.Client.FromConn(conn, func (stream *frisbee.Stream) { }) +} + {{template "clientmethods" .services }} type CloseError struct { @@ -85,26 +91,9 @@ func (e CloseError) Error() string { {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} {{ $method := $service.Methods.Get $i -}} {{ $count := call $counter -}} + + {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} table[{{ $count }}] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { - {{ if $method.IsStreamingServer -}} - res := New{{ CamelCase $method.Output.FullName }}() - err := res.Decode(*incoming.Content) - if err == nil { - c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RLock() - if stream, ok := c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}[incoming.Metadata.Id]; ok { - c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RUnlock() - err := stream.received.Push(res) - if HasCloseFlag(res.flags) { - stream.close() - } - if err != nil { - return nil, 0 - } - } else { - c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RUnlock() - } - } - {{ else -}} c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RLock() if ch, ok := c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}[incoming.Metadata.Id]; ok { c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RUnlock() @@ -114,9 +103,9 @@ func (e CloseError) Error() string { } else { c.{{ CamelCaseName $service.Name }}.inflight{{ CamelCaseName $method.Name }}Mu.RUnlock() } - {{ end -}} return } + {{ end -}} {{end -}} {{end -}} {{ end -}} @@ -130,169 +119,96 @@ func (e CloseError) Error() string { {{ $opIndex := call $counter -}} {{if or $method.IsStreamingClient $method.IsStreamingServer -}} func (c *sub{{ CamelCaseName $service.Name }}Client) {{ CamelCaseName $method.Name }}(ctx context.Context, req *{{ CamelCase $method.Input.FullName }}) (*{{ CamelCaseName $method.Name }}Client, error) { - p := packet.Get() - p.Metadata.Operation = {{ $opIndex }} + p := packet.Get() - c.next{{ CamelCaseName $method.Name }}Mu.Lock() - c.next{{ CamelCaseName $method.Name }} += 1 - id := c.next{{ CamelCaseName $method.Name }} - c.next{{ CamelCaseName $method.Name }}Mu.Unlock() - p.Metadata.Id = id + c.nextStreamingIDMu.Lock() + c.nextStreamingID += 1 + id := c.nextStreamingID + c.nextStreamingIDMu.Unlock() - req.Encode(p.Content) - p.Metadata.ContentLength = uint32(len(*p.Content)) - err := c.client.WritePacket(p) + open := &RPCStreamOpen{operation: {{ $opIndex}}}; - {{ if $method.IsStreamingServer -}} - q := queue.NewCircular[{{ CamelCase $method.Output.FullName }}, *{{ CamelCase $method.Output.FullName }}](100) - stale := make([]*{{ CamelCase $method.Output.FullName }}, 0) - staleMu := sync.Mutex{} - {{ end -}} + open.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) - if err != nil { - packet.Put(p) - return nil, err - } - packet.Put(p) + fStream := c.client.Stream(id) + fStream.WritePacket(p) - stream := {{ CamelCaseName $method.Name }}Client{ - context: ctx, - closed: atomic.NewBool(false), - {{ if $method.IsStreamingServer -}} - received: q, - {{ end -}} - } - {{ if $method.IsStreamingServer -}} - c.inflight{{ CamelCaseName $method.Name }}Mu.Lock() - c.inflight{{ CamelCaseName $method.Name }}[id] = &stream - c.inflight{{ CamelCaseName $method.Name }}Mu.Unlock() - - stream.recv = func() (*{{ CamelCase $method.Output.FullName }}, error) { - if stream.closed.Load() { - staleMu.Lock() - if len(stale) > 0 { - var r *{{ $method.Output.FullName }} - r, stale = stale[0], stale[1:] - staleMu.Unlock() - return r, nil - } - staleMu.Unlock() - return nil, io.EOF - } else if c.client.Closed() { - stream.close() + if req != nil { + p2 := packet.Get() + req.Encode(p2.Content) + p2.Metadata.ContentLength = uint32(len(*p2.Content)) + fStream.WritePacket(p2) } - readPacket, err := q.Pop() - if err != nil { - if stream.closed.Load() { - staleMu.Lock() - if len(stale) > 0 { - var r *{{ $method.Output.FullName }} - r, stale = stale[0], stale[1:] - staleMu.Unlock() - if errors.Is(r.error, io.EOF) { - return nil, io.EOF + stream := {{ CamelCaseName $method.Name }}Client{ + context: ctx, + stream: fStream, + closed: atomic.NewBool(false), } - return r, nil - } - staleMu.Unlock() - } - return nil, io.EOF - } - if errors.Is(readPacket.error, io.EOF) { - return nil, io.EOF - } - return readPacket, nil + + stream.recv = func () (*{{ CamelCase $method.Output.FullName }}, error) { + p, err := stream.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := New{{ CamelCase $method.Output.FullName }}() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil } + stream.close = func () { - if stream.closed.Load() { - return - } - staleMu.Lock() - stream.closed.Store(true) - stale = q.Drain() - staleMu.Unlock() - q.Close() + stream.stream.Close() } - {{ else if $method.IsStreamingClient }} - ch := make(chan *{{ CamelCase $method.Output.FullName }}, 1) - c.inflight{{ CamelCaseName $method.Name }}Mu.Lock() - c.inflight{{ CamelCaseName $method.Name }}[id] = ch - c.inflight{{ CamelCaseName $method.Name }}Mu.Unlock() - - stream.recv = func () (res *{{ CamelCase $method.Output.FullName }}, err error) { - select { - case res = <- ch: - err = res.error - case <- ctx.Done(): - err = ctx.Err() - } - c.inflight{{ CamelCaseName $method.Name }}Mu.Lock() - delete(c.inflight{{ CamelCaseName $method.Name }}, id) - c.inflight{{ CamelCaseName $method.Name }}Mu.Unlock() - return - } - {{ end -}} - - {{ if $method.IsStreamingClient -}} stream.send = func (m *{{ CamelCase $method.Input.FullName }}) error { - p := packet.Get() - p.Metadata.Operation = {{ $opIndex }} + p := packet.Get() - p.Metadata.Id = id - - m.Encode(p.Content) - p.Metadata.ContentLength = uint32(len(*p.Content)) - err := c.client.WritePacket(p) - if err != nil { - packet.Put(p) - return err - } - packet.Put(p) - return nil + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return stream.stream.WritePacket(p) } - {{ end -}} - go func () { - <-ctx.Done() - c.inflight{{ CamelCaseName $method.Name }}Mu.Lock() - delete(c.inflight{{ CamelCaseName $method.Name }}, id) - c.inflight{{ CamelCaseName $method.Name }}Mu.Unlock() - }() - return &stream, nil + return &stream, nil } type {{ CamelCaseName $method.Name }}Client struct { - context context.Context - recv func() (*{{ CamelCase $method.Output.FullName }}, error) - close func() - closed *atomic.Bool + context context.Context + recv func() (*{{ CamelCase $method.Output.FullName }}, error) + close func() + closed *atomic.Bool - {{ if $method.IsStreamingClient -}} + stream *frisbee.Stream send func (*{{ CamelCase $method.Input.FullName }}) error - {{ end -}} - received *queue.Circular[{{ CamelCase $method.Output.FullName }}, *{{ CamelCase $method.Output.FullName }}] } {{ if $method.IsStreamingServer -}} - func (x *{{ CamelCaseName $method.Name }}Client) Recv() (*{{ CamelCase $method.Output.FullName }}, error) { + func (x *{{ CamelCaseName $method.Name }}Client) Recv() (*{{ CamelCase $method.Output.FullName }}, error) { return x.recv() - } + } {{ end -}} {{ if $method.IsStreamingClient -}} func (x *{{ CamelCaseName $method.Name }}Client) Send(m *{{ CamelCase $method.Input.FullName }}) error { - return x.send(m) + return x.send(m) } func (x *{{ CamelCaseName $method.Name }}Client) CloseSend() error { - r := {{ CamelCase $method.Input.FullName }}{error: io.EOF, flags: SetCloseFlag(0, true) } - return x.Send(&r) + return x.send(&{{ CamelCase $method.Input.FullName }}{error: io.EOF}) } func (x *{{ CamelCaseName $method.Name }}Client) CloseAndRecv() (*{{ CamelCase $method.Output.FullName }}, error) { - r := {{ CamelCase $method.Input.FullName }}{error: io.EOF, flags: SetCloseFlag(0, true) } - x.Send(&r) - return x.recv() + err := x.send(&{{ CamelCase $method.Input.FullName }}{error: io.EOF}) + if err != nil { + return nil, err + } + return x.recv() } {{ end -}} {{else -}} diff --git a/templates/constants.templ b/templates/constants.templ index 9e14ff7..051778e 100644 --- a/templates/constants.templ +++ b/templates/constants.templ @@ -1,16 +1,10 @@ {{define "constants"}} const connectionContextKey int = 1000 - func SetCloseFlag(flags uint8, close bool) uint8 { - return flags | 0x1 - } - func HasCloseFlag(flags uint8) bool { - return flags & 1 == 1 - } func SetErrorFlag(flags uint8, error bool) uint8 { - return flags | 0x2 + return flags | 0x2 } func HasErrorFlag(flags uint8) bool { - return flags & (1 << 1) == 1 + return flags & (1 << 1) == 1 } {{end}} \ No newline at end of file diff --git a/templates/server.templ b/templates/server.templ index e0217ec..89ece7a 100644 --- a/templates/server.templ +++ b/templates/server.templ @@ -1,31 +1,37 @@ {{define "server"}} -type ServerMap[T any] struct { - servers map[uint16]*T - closed bool - mu sync.Mutex + +{{ if .numStreamMethods -}} +type RPCStreamOpen struct { + operation uint16 +} + +func (x *RPCStreamOpen) Error(b *polyglot.Buffer, err error) { + polyglot.Encoder(b).Error(err) +} + +func (x *RPCStreamOpen) Encode(b *polyglot.Buffer) { + polyglot.Encoder(b).Uint16(x.operation) } -func NewServerMap[T any]() *ServerMap[T] { - return &ServerMap[T]{servers: make(map[uint16]*T)} +func (x *RPCStreamOpen) Decode(b []byte) error { + if x == nil { + return NilDecode + } + d := polyglot.GetDecoder(b) + defer d.Return() + return x.decode(d) } +func (x *RPCStreamOpen) decode(d *polyglot.Decoder) error { + var err error + x.operation, err = d.Uint16() + return err +} +{{ end -}} + type Server struct { *frisbee.Server onClosed func(*frisbee.Async, error) - - {{ range $i, $v := (MakeIterable .services.Len) -}} - {{ $service := $.services.Get $i -}} - {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} - {{ $method := $service.Methods.Get $i -}} - {{ if $method.IsStreamingClient}} - next{{ CamelCaseName $method.Name }} uint16 - next{{ CamelCaseName $method.Name }}Mu sync.RWMutex - - streams{{ CamelCaseName $method.Name }} map[string]*ServerMap[{{ CamelCaseName $method.Name }}Server] - streams{{ CamelCaseName $method.Name }}Mu sync.RWMutex - {{end -}} - {{end -}} - {{end -}} } func NewServer({{ GetServerFields .services }}, tlsConfig *tls.Config, logger *zerolog.Logger) (*Server, error) { @@ -46,6 +52,34 @@ func NewServer({{ GetServerFields .services }}, tlsConfig *tls.Config, logger *z } } + + fsrv.SetStreamHandler(func(conn *frisbee.Async, stream *frisbee.Stream) { + p, err := stream.ReadPacket() + if err != nil { + return + } + open := &RPCStreamOpen{} + err = open.Decode(*p.Content) + if err != nil { + stream.Close() + return + } + switch open.operation { + {{ $counter := Counter 9 -}} + {{ range $i, $v := (MakeIterable .services.Len) -}} + {{ $service := $.services.Get $i -}} + {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} + {{ $method := $service.Methods.Get $i -}} + {{ $opIndex := call $counter -}} + {{ if or $method.IsStreamingClient $method.IsStreamingServer -}} + case {{ $opIndex }}: + s.create{{ CamelCaseName $method.Name }}Server({{ FirstLowerCase (CamelCaseName $service.Name) }}, stream) + {{end -}} + {{end -}} + {{end -}} + } + }) + fsrv.ConnContext = func (ctx context.Context, conn *frisbee.Async) context.Context { return context.WithValue(ctx, connectionContextKey, conn) } @@ -54,38 +88,10 @@ func NewServer({{ GetServerFields .services }}, tlsConfig *tls.Config, logger *z }, nil fsrv.SetOnClosed(func(async *frisbee.Async, err error) { - {{ range $i, $v := (MakeIterable .services.Len) -}} - {{ $service := $.services.Get $i -}} - {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} - {{ $method := $service.Methods.Get $i -}} - {{ if $method.IsStreamingClient -}} - s.streams{{ CamelCaseName $method.Name }}Mu.RLock() - if streamMap, ok := s.streams{{ CamelCaseName $method.Name }}[async.RemoteAddr().String()]; ok { - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - for _, stream := range streamMap.servers { - stream.close() - } - return - } - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - {{end -}} - {{end -}} - {{end -}} if s.onClosed != nil { s.onClosed(async, err) } }) - - {{ range $i, $v := (MakeIterable .services.Len) -}} - {{ $service := $.services.Get $i -}} - {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} - {{ $method := $service.Methods.Get $i -}} - {{ if $method.IsStreamingClient -}} - s.next{{ CamelCaseName $method.Name }} = 0 - s.streams{{ CamelCaseName $method.Name }} = make(map[string]*ServerMap[{{ CamelCaseName $method.Name }}Server]) - {{end -}} - {{end -}} - {{end -}} return s, err } @@ -109,51 +115,114 @@ func (s *Server) SetOnClosed(f func(*frisbee.Async, error)) error { {{ $opIndex := call $counter -}} {{if or $method.IsStreamingClient $method.IsStreamingServer -}} type {{ CamelCaseName $method.Name }}Server struct { - context context.Context - recv func() (*{{ CamelCase $method.Input.FullName }}, error) - send func (*{{ CamelCase $method.Output.FullName }}) error + recv func() (*{{ CamelCase $method.Input.FullName }}, error) + send func (*{{ CamelCase $method.Output.FullName }}) error - {{ if $method.IsStreamingClient -}} - received *queue.Circular[{{ CamelCase $method.Input.FullName }}, *{{ CamelCase $method.Input.FullName }}] - stale []*{{ CamelCase $method.Input.FullName }} - staleMu sync.Mutex - {{ end -}} - closed *atomic.Bool + stream *frisbee.Stream + closed *atomic.Bool } - func (x *{{ CamelCaseName $method.Name }}Server) Context() context.Context { - return x.context + func (s *Server) create{{ CamelCaseName $method.Name}}Server ({{ FirstLowerCase (CamelCaseName $service.Name) }} {{ CamelCaseName $service.Name }}, stream *frisbee.Stream) { + srv := &{{ CamelCaseName $method.Name }}Server{ + closed: atomic.NewBool(false), + stream: stream, + } + + {{ if $method.IsStreamingClient -}} + srv.recv = func() (*{{ CamelCase $method.Input.FullName }}, error) { + p, err := srv.stream.ReadPacket() + if err != nil { + return nil, err + } + + res := New{{ CamelCase $method.Input.FullName }}() + err = res.Decode(*p.Content) + if err != nil { + return nil, err + } + if errors.Is(res.error, io.EOF) { + return nil, io.EOF + } + + return res, nil + } + {{ end -}} + + srv.send = func (m *{{ CamelCase $method.Output.FullName }}) error { + p := packet.Get() + + m.Encode(p.Content) + p.Metadata.ContentLength = uint32(len(*p.Content)) + return srv.stream.WritePacket(p) + } + + {{ if not $method.IsStreamingClient -}} + incoming, err := stream.ReadPacket() + if err != nil { + return + } + req := New{{ CamelCase $method.Input.FullName }}() + err = req.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) + {{ end -}} + + go func() { + {{ if $method.IsStreamingClient -}} + err := {{ FirstLowerCaseName $service.Name }}.{{ CamelCaseName $method.Name }}(srv) + {{ else }} + err := {{ FirstLowerCaseName $service.Name }}.{{ CamelCaseName $method.Name }}(req, srv) + {{ end -}} + if err != nil { + res := {{ CamelCase $method.Output.FullName }}{error: err} + res.flags = SetErrorFlag(res.flags, true) + srv.CloseAndSend(&res) + } else { + srv.CloseSend() + } + }() + {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} + var res *{{ CamelCase $method.Output.FullName }} + outgoing = incoming + outgoing.Content.Reset() + res, err = {{ FirstLowerCase (CamelCaseName $service.Name) }}.{{ CamelCaseName $method.Name }}(ctx, req) + if err != nil { + if _, ok := err.(CloseError); ok { + action = frisbee.CLOSE + } + res.Error(outgoing.Content, err) + } else { + res.Encode(outgoing.Content) + } + outgoing.Metadata.ContentLength = uint32(len(*outgoing.Content)) + {{end -}} } {{ if $method.IsStreamingClient -}} func (x *{{ CamelCaseName $method.Name }}Server) Recv() (*{{ CamelCase $method.Input.FullName }}, error) { - return x.recv() + return x.recv() } func (x *{{ CamelCaseName $method.Name }}Server) close() { - x.staleMu.Lock() - x.closed.Store(true) - x.stale = x.received.Drain() - x.staleMu.Unlock() - x.received.Close() + x.stream.Close() } {{ end -}} {{ if $method.IsStreamingServer -}} func (x *{{ CamelCaseName $method.Name }}Server) Send(m *{{ CamelCase $method.Output.FullName }}) error { - return x.send(m) + return x.send(m) } {{ end -}} func (x *{{ CamelCaseName $method.Name }}Server) CloseSend() error { - r := {{ CamelCase $method.Output.FullName }}{error: io.EOF, flags: SetCloseFlag(0, true) } - return x.send(&r) + return x.send(&{{ CamelCase $method.Output.FullName }}{error: io.EOF}) } {{ if or $method.IsStreamingClient $method.IsStreamingServer -}} func (x *{{ CamelCaseName $method.Name }}Server) CloseAndSend(m *{{ CamelCase $method.Output.FullName }}) error { - m.flags = SetCloseFlag(m.flags, true) - return x.send(m) + err := x.send(m) + if err != nil { + return err + } + return x.CloseSend() } {{ end -}} {{end -}} @@ -168,145 +237,11 @@ func (s *Server) SetOnClosed(f func(*frisbee.Async, error)) error { {{ range $i, $v := (MakeIterable $service.Methods.Len) -}} {{ $method := $service.Methods.Get $i -}} {{ $count := call $counter -}} + {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} table[{{ $count }}] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) { req := New{{ CamelCase $method.Input.FullName }}() err := req.Decode((*incoming.Content)[:incoming.Metadata.ContentLength]) if err == nil { - {{ if or $method.IsStreamingClient $method.IsStreamingServer -}} - id := incoming.Metadata.Id - conn := ctx.Value(connectionContextKey).(*frisbee.Async) - - {{ if $method.IsStreamingClient -}} - s.streams{{ CamelCaseName $method.Name }}Mu.RLock() - if smap, ok := s.streams{{ CamelCaseName $method.Name }}[conn.RemoteAddr().String()]; ok { - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - smap.mu.Lock() - if srv, ok := smap.servers[id]; ok { - smap.mu.Unlock() - srv.received.Push(req) - if HasCloseFlag(req.flags) { - srv.close() - } - if _, ok := req.error.(CloseError); ok { - action = frisbee.CLOSE - } - return - } else { - smap.mu.Unlock() - } - } else { - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - } - q := queue.NewCircular[{{ CamelCase $method.Input.FullName }}, *{{ CamelCase $method.Input.FullName }}](100) - q.Push(req) - {{ end -}} - - srv := &{{ CamelCaseName $method.Name }}Server{ - context: ctx, - {{ if $method.IsStreamingClient -}} - received: q, - stale: make([]*{{ CamelCase $method.Input.FullName }}, 0), - {{ end -}} - closed: atomic.NewBool(false), - } - - {{ if $method.IsStreamingClient -}} - s.streams{{ CamelCaseName $method.Name }}Mu.Lock() - if serverMap, ok := s.streams{{ CamelCaseName $method.Name }}[conn.RemoteAddr().String()]; ok { - s.streams{{ CamelCaseName $method.Name }}Mu.Unlock() - serverMap.mu.Lock() - serverMap.servers[id] = srv - serverMap.mu.Unlock() - } else { - s.streams{{ CamelCaseName $method.Name }}Mu.Unlock() - serverMap = NewServerMap[{{ CamelCaseName $method.Name }}Server]() - serverMap.servers[id] = srv - s.streams{{ CamelCaseName $method.Name }}Mu.Lock() - s.streams{{ CamelCaseName $method.Name }}[conn.RemoteAddr().String()] = serverMap - s.streams{{ CamelCaseName $method.Name }}Mu.Unlock() - } - - srv.recv = func() (*{{ CamelCase $method.Input.FullName }}, error) { - if srv.closed.Load() { - srv.staleMu.Lock() - if len(srv.stale) > 0 { - var r *{{ CamelCase $method.Input.FullName }} - r, srv.stale = srv.stale[0], srv.stale[1:] - srv.staleMu.Unlock() - if errors.Is(r.error, io.EOF) { - return nil, io.EOF - } - return r, nil - } - srv.staleMu.Unlock() - return nil, io.EOF - } - - readPacket, err := srv.received.Pop() - if err != nil { - if srv.closed.Load() { - srv.staleMu.Lock() - if len(srv.stale) > 0 { - var r *{{ CamelCase $method.Input.FullName }} - r, srv.stale = srv.stale[0], srv.stale[1:] - srv.staleMu.Unlock() - return r, nil - } - srv.staleMu.Unlock() - } - return nil, io.EOF - } - if errors.Is(readPacket.error, io.EOF) { - return nil, io.EOF - } - return readPacket, nil - } - {{ end -}} - - srv.send = func (m *{{ CamelCase $method.Output.FullName }}) error { - p := packet.Get() - p.Metadata.Operation = {{ $count }} - - p.Metadata.Id = id - - m.Encode(p.Content) - p.Metadata.ContentLength = uint32(len(*p.Content)) - err := conn.WritePacket(p) - if err != nil { - packet.Put(p) - return err - } - packet.Put(p) - return nil - } - - go func() { - {{ if $method.IsStreamingClient -}} - err := {{ FirstLowerCaseName $service.Name }}.{{ CamelCaseName $method.Name }}(srv) - {{ else }} - err := {{ FirstLowerCaseName $service.Name }}.{{ CamelCaseName $method.Name }}(req, srv) - {{ end -}} - if err != nil { - res := {{ CamelCase $method.Output.FullName }}{error: err} - res.flags = SetErrorFlag(res.flags, true) - srv.CloseAndSend(&res) - } else { - srv.CloseSend() - } - {{ if $method.IsStreamingClient -}} - s.streams{{ CamelCaseName $method.Name }}Mu.RLock() - if smap, ok := s.streams{{ CamelCaseName $method.Name }}[conn.RemoteAddr().String()]; ok { - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - smap.mu.Lock() - delete(smap.servers, incoming.Metadata.Id) - smap.mu.Unlock() - } else { - s.streams{{ CamelCaseName $method.Name }}Mu.RUnlock() - } - {{ end -}} - }() - {{ end -}} - {{ if not (or $method.IsStreamingServer $method.IsStreamingClient) -}} var res *{{ CamelCase $method.Output.FullName }} outgoing = incoming outgoing.Content.Reset() @@ -320,10 +255,10 @@ func (s *Server) SetOnClosed(f func(*frisbee.Async, error)) error { res.Encode(outgoing.Content) } outgoing.Metadata.ContentLength = uint32(len(*outgoing.Content)) - {{end -}} } return } + {{end -}} {{end -}} {{end -}} {{end}} \ No newline at end of file