Skip to content

Commit

Permalink
revamp listener (#1525)
Browse files Browse the repository at this point in the history
* migration files

* go files for blocks dir

* write queries

* implement observed-block controller and route

* implement unprocessed block apis

* integrate go apis to listener api.ts

* implement get observed block api and integrate

* change insertUnprocessedBlock api to batch insert array of blocks

* revamp listener.ts

* revamp state.ts

* rearrange execution sequence in state.add()

* generalize block payload validation

* define separate logPrefix var in latestJob

* refactor latestJob creating unprocessed blocks logic

* update upsert observed_block query to disallow decreasing observedBlock value

* return void in delete api

* fix logPrefix on exception in latestJob

* fallthrough clear option in state.add()

* throw error if processing event fails

* move checking observedBlock existence in db to controller

* replace template literal with simple string
  • Loading branch information
Intizar-T authored May 28, 2024
1 parent d45d6e8 commit 7676923
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 89 deletions.
117 changes: 117 additions & 0 deletions api/blocks/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package blocks

import (
"bisonai.com/orakl/api/utils"
"github.com/go-playground/validator/v10"
"github.com/gofiber/fiber/v2"
)

type BlockModel struct {
Service string `db:"service" json:"service" validate:"required"`
BlockNumber int64 `db:"block_number" json:"blockNumber" validate:"isZeroOrPositive"`
}

type BlocksModel struct {
Service string `db:"service" json:"service" validate:"required"`
Blocks []int64 `db:"blocks" json:"blocks" validate:"dive,isZeroOrPositive"`
}

var validate *validator.Validate

func init() {
validate = validator.New()
validate.RegisterValidation("isZeroOrPositive", func(fl validator.FieldLevel) bool {

Check failure on line 23 in api/blocks/controller.go

View workflow job for this annotation

GitHub Actions / Build

Error return value of `validate.RegisterValidation` is not checked (errcheck)
return fl.Field().Int() >= 0
})
}

func validateBlockPayload(payload interface{}) error {
return validate.Struct(payload)
}

func getObservedBlock(c *fiber.Ctx) error {
service := c.Query("service")
if service == "" {
return fiber.NewError(fiber.StatusBadRequest, "service is required")
}
result, err := utils.QueryRow[BlockModel](c, GetObservedBlock, map[string]any{
"service": service,
})
if err != nil {
return err
}
if result.Service == "" {
return c.JSON(nil)
}

return c.JSON(result)
}

func upsertObservedBlock(c *fiber.Ctx) error {
payload := new(BlockModel)
if err := c.BodyParser(payload); err != nil {
return err
}

if err := validateBlockPayload(payload); err != nil {
return err
}

result, err := utils.QueryRow[BlockModel](c, UpsertObservedBlock, map[string]any{
"service": payload.Service,
"block_number": payload.BlockNumber,
})
if err != nil {
return err
}

return c.JSON(result)
}

func getUnprocessedBlocks(c *fiber.Ctx) error {
service := c.Query("service")
if service == "" {
return fiber.NewError(fiber.StatusBadRequest, "service is required")
}
result, err := utils.QueryRows[BlockModel](c, GetUnprocessedBlocks, map[string]any{
"service": service,
})
if err != nil {
return err
}

return c.JSON(result)
}

func insertUnprocessedBlocks(c *fiber.Ctx) error {
payload := new(BlocksModel)
if err := c.BodyParser(payload); err != nil {
return err
}

if err := validateBlockPayload(payload); err != nil {
return err
}

result, err := utils.QueryRows[BlocksModel](c, GenerateInsertBlocksQuery(payload.Blocks, payload.Service), map[string]any{})
if err != nil {
return err
}

return c.JSON(result)
}

func deleteUnprocessedBlock(c *fiber.Ctx) error {
service := c.Params("service")
blockNumber := c.Params("blockNumber")

result, err := utils.QueryRow[BlockModel](c, DeleteUnprocessedBlock, map[string]any{
"service": service,
"block_number": blockNumber,
})
if err != nil {
return err
}

return c.JSON(result)
}
51 changes: 51 additions & 0 deletions api/blocks/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package blocks

import (
"fmt"
"strings"
)

const (
// get observedBlock given service
GetObservedBlock = `
SELECT * FROM observed_blocks
WHERE service = @service;
`

// upsert to observed_blocks given service and block_number
UpsertObservedBlock = `
INSERT INTO observed_blocks (service, block_number)
VALUES (@service, @block_number)
ON CONFLICT (service) DO UPDATE SET block_number = GREATEST(observed_blocks.block_number, EXCLUDED.block_number)
RETURNING *;
`

// get all unprocessed blocks given service
GetUnprocessedBlocks = `
SELECT * FROM unprocessed_blocks
WHERE service = @service;
`

// delete unprocessed block given service and block_number
DeleteUnprocessedBlock = `
DELETE FROM unprocessed_blocks
WHERE service = @service AND block_number = @block_number
RETURNING *;
`
)

func GenerateInsertBlocksQuery(blocks []int64, service string) string {
baseQuery := `
INSERT INTO unprocessed_blocks (service, block_number)
VALUES
`
onConflict := `
ON CONFLICT (service, block_number) DO NOTHING;
`
values := make([]string, 0, len(blocks))
for _, block := range blocks {
values = append(values, fmt.Sprintf("('%s', %d)", service, block))
}

return baseQuery + strings.Join(values, ",") + onConflict
}
15 changes: 15 additions & 0 deletions api/blocks/route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package blocks

import (
"github.com/gofiber/fiber/v2"
)

func Routes(router fiber.Router) {
blocks := router.Group("/blocks")

blocks.Get("/observed", getObservedBlock)
blocks.Post("/observed", upsertObservedBlock)
blocks.Post("/unprocessed", insertUnprocessedBlocks)
blocks.Get("/unprocessed", getUnprocessedBlocks)
blocks.Delete("/unprocessed/:service/:blockNumber", deleteUnprocessedBlock)
}
2 changes: 2 additions & 0 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bisonai.com/orakl/api/aggregate"
"bisonai.com/orakl/api/aggregator"
"bisonai.com/orakl/api/apierr"
"bisonai.com/orakl/api/blocks"
"bisonai.com/orakl/api/chain"
"bisonai.com/orakl/api/data"
"bisonai.com/orakl/api/feed"
Expand Down Expand Up @@ -80,4 +81,5 @@ func SetRouter(_router fiber.Router) {
reporter.Routes(_router)
service.Routes(_router)
vrf.Routes(_router)
blocks.Routes(_router)
}
2 changes: 2 additions & 0 deletions api/migrations/000002_blocks.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS "observed_blocks";
DROP TABLE IF EXISTS "unprocessed_blocks";
10 changes: 10 additions & 0 deletions api/migrations/000002_blocks.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS "observed_blocks" (
service TEXT NOT NULL UNIQUE,
block_number BIGINT NOT NULL
);

CREATE TABLE IF NOT EXISTS "unprocessed_blocks" (
service TEXT NOT NULL,
block_number BIGINT NOT NULL,
UNIQUE (service, block_number)
);
7 changes: 6 additions & 1 deletion core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,10 @@ export enum OraklErrorCode {
AxiosCanceledByUser,
AxiosNotSupported,
AxiosInvalidUrl,
FailedToConnectAPI
FailedToConnectAPI,
FailedToGetUnprocessedBlocks,
FailedToGetObservedBlock,
FailedUpsertObservedBlock,
FailedInsertUnprocessedBlock,
FailedDeleteUnprocessedBlock
}
92 changes: 92 additions & 0 deletions core/src/listener/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OraklError, OraklErrorCode } from '../errors'
import { ORAKL_NETWORK_API_URL } from '../settings'
import { IListenerRawConfig } from '../types'
import { buildUrl } from '../utils'
import { IBlock } from './types'

const FILE_NAME = import.meta.url

Expand Down Expand Up @@ -58,3 +59,94 @@ export async function getListener({
throw new OraklError(OraklErrorCode.GetListenerRequestFailed)
}
}

/**
* @param {string} service
* @return {Promise<IBlock>}
* @exception {FailedToGetObservedBlock}
*/
export async function getObservedBlock({ service }: { service: string }): Promise<IBlock | null> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `blocks/observed?service=${service}`)
return (await axios.get(endpoint))?.data
} catch (e) {
throw new OraklError(OraklErrorCode.FailedToGetObservedBlock)
}
}

/**
* @param {string} service
* @return {Promise<IBlocks[]>}
* @exception {FailedToGetUnprocessedBlocks}
*/
export async function getUnprocessedBlocks({ service }: { service: string }): Promise<IBlock[]> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `blocks/unprocessed?service=${service}`)
return (await axios.get(endpoint))?.data
} catch (e) {
throw new OraklError(OraklErrorCode.FailedToGetUnprocessedBlocks)
}
}

/**
* @param {string} service
* @param {number} blockNumber
* @return {Promise<void>}
* @exception {FailedInsertUnprocessedBlock}
*/
export async function insertUnprocessedBlocks({
blocks,
service
}: {
service: string
blocks: number[]
}): Promise<void> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, 'blocks/unprocessed')
await axios.post(endpoint, { service, blocks })
} catch (e) {
throw new OraklError(OraklErrorCode.FailedInsertUnprocessedBlock)
}
}

/**
* @param {string} service
* @param {number} blockNumber
* @return {Promise<void>}
* @exception {FailedDeleteUnprocessedBlock}
*/
export async function deleteUnprocessedBlock({
blockNumber,
service
}: {
blockNumber: number
service: string
}): Promise<void> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `blocks/unprocessed/${service}/${blockNumber}`)
await axios.delete(endpoint)
} catch (e) {
throw new OraklError(OraklErrorCode.FailedDeleteUnprocessedBlock)
}
}

/**
* @param {string} service
* @param {number} blockNumber
* @return {Promise<IBlock>}
* @exception {FailedUpsertObservedBlock}
*/
export async function upsertObservedBlock({
blockNumber,
service
}: {
service: string
blockNumber: number
}): Promise<IBlock> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, 'blocks/observed')
return (await axios.post(endpoint, { service, blockNumber }))?.data
} catch (e) {
throw new OraklError(OraklErrorCode.FailedUpsertObservedBlock)
}
}
Loading

0 comments on commit 7676923

Please sign in to comment.