Skip to content

Commit

Permalink
feat: socket updates + virtual states
Browse files Browse the repository at this point in the history
  • Loading branch information
amalcaraz committed Dec 11, 2023
1 parent 2bda710 commit 7a94094
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 235 deletions.
2 changes: 1 addition & 1 deletion src/components/common/NodeDetailHeader/styles.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export const StyledContainer = styled.div<StyledContainerProps>`
: `linear-gradient(90deg, ${c1}cc 0%, ${c2}cc 100%)`
return css`
${tw`relative flex flex-col h-[12.5rem] justify-end overflow-hidden`}
${tw`relative flex flex-col h-[12.5rem] justify-end overflow-hidden z-0`}
background-image: ${bg};
background-position: center;
background-size: cover;
Expand Down
5 changes: 5 additions & 0 deletions src/components/common/Viewport/cmp.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import { memo } from 'react'
import { useRequestCCNs } from '@/hooks/common/useRequestEntity/useRequestCCNs'
import { useRequestCRNs } from '@/hooks/common/useRequestEntity/useRequestCRNs'
import { useRequestRewards } from '@/hooks/common/useRequestEntity/useRequestRewards'
import { useRequestCCNsFeed } from '@/hooks/common/useRequestEntity/useRequestCCNsFeed'
import { useRequestRewardsFeed } from '@/hooks/common/useRequestEntity/useRequestRewardsFeed'

export const Viewport = memo(({ children }: ViewportProps) => {
useRequestCCNs({})
useRequestCRNs({})
useRequestRewards({})

useRequestCCNsFeed()
useRequestRewardsFeed()

return <StyledViewport>{children}</StyledViewport>
})
Viewport.displayName = 'Viewport'
Expand Down
62 changes: 56 additions & 6 deletions src/domain/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {
apiServer,
channel,
defaultAccountChannel,
monitorAddress,
postType,
scoringAddress,
tags,
wsServer,
} from '@/helpers/constants'
import { Account } from 'aleph-sdk-ts/dist/accounts/account'
import { messages } from 'aleph-sdk-ts'
Expand All @@ -13,14 +15,15 @@ import {
getLatestReleases,
stripExtraTagDescription,
} from '@/helpers/utils'
import { ItemType } from 'aleph-sdk-ts/dist/messages/types'
import { AggregateMessage, ItemType } from 'aleph-sdk-ts/dist/messages/types'
import {
newCCNSchema,
newCRNSchema,
updateCCNSchema,
updateCRNSchema,
} from '@/helpers/schemas'
import { FileManager } from './file'
import { subscribeSocketFeed } from '@/helpers/socket'

const { post } = messages

Expand Down Expand Up @@ -53,6 +56,8 @@ export type BaseNode = {

// --------- CCN fields ?
registration_url?: string

virtual?: number
}

export type CCN = BaseNode & {
Expand Down Expand Up @@ -182,6 +187,8 @@ export type UpdateCRN = BaseUpdateNode & {

export type UpdateAlephNode = UpdateCCN | UpdateCRN

export type NodesResponse = { ccns: CCN[]; crns: CRN[]; timestamp: number }

export class NodeManager {
static newCCNSchema = newCCNSchema
static newCRNSchema = newCRNSchema
Expand Down Expand Up @@ -218,8 +225,11 @@ export class NodeManager {
return crns
}

async getAllNodes(): Promise<{ ccns: CCN[]; crns: CRN[] }> {
let { ccns, crns } = await this.fetchAllNodes()
async getAllNodes(): Promise<NodesResponse> {
const response = await this.fetchAllNodes()

const { timestamp } = response
let { ccns, crns } = response

ccns = this.parseResourceNodes(ccns, crns)
ccns = await this.parseScores(ccns, false)
Expand All @@ -229,7 +239,45 @@ export class NodeManager {
crns = await this.parseScores(crns, true)
crns = await this.parseMetrics(crns, true)

return { ccns, crns }
return { ccns, crns, timestamp }
}

async *subscribeNodesFeed(
abort: Promise<void>,
): AsyncGenerator<NodesResponse> {
const feed = subscribeSocketFeed<AggregateMessage<any>>(
`${wsServer}/api/ws0/messages?msgType=AGGREGATE&history=1&addresses=${monitorAddress}`,
abort,
)

for await (const data of feed) {
if (!data.content) return
if (!data.content.content) return

const { content, address, key, time } = data.content || {}
const { nodes, resource_nodes } = content

if (
address === monitorAddress &&
key === 'corechannel' &&
(nodes !== undefined || resource_nodes !== undefined)
) {
let crns: CRN[] = resource_nodes
let ccns: CCN[] = nodes

ccns = this.parseResourceNodes(ccns, crns)
ccns = await this.parseScores(ccns, false)
ccns = await this.parseMetrics(ccns, false)

crns = this.parseParentNodes(crns, ccns)
crns = await this.parseScores(crns, true)
crns = await this.parseMetrics(crns, true)

const timestamp = Math.trunc(time * 1000)

yield { ccns, crns, timestamp }
}
}
}

async getLatestVersion(node: AlephNode): Promise<NodeLastVersions> {
Expand Down Expand Up @@ -373,7 +421,7 @@ export class NodeManager {
})
}

protected async fetchAllNodes(): Promise<{ ccns: CCN[]; crns: CRN[] }> {
protected async fetchAllNodes(): Promise<NodesResponse> {
return fetchAndCache(
`${apiServer}/api/v0/aggregates/0xa1B3bb7d2332383D96b7796B908fB7f7F3c2Be10.json?keys=corechannel&limit=100`,
'nodes',
Expand All @@ -383,7 +431,9 @@ export class NodeManager {
const crns: CRN[] = content?.data?.corechannel?.resource_nodes
const ccns: CCN[] = content?.data?.corechannel?.nodes

return { ccns, crns }
const timestamp = 0

return { ccns, crns, timestamp }
},
)
}
Expand Down
106 changes: 16 additions & 90 deletions src/domain/stake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import {
wsServer,
} from '@/helpers/constants'
import { AlephNode, CCN, CRN } from './node'
import { Future, normalizeValue } from '@/helpers/utils'
import { normalizeValue } from '@/helpers/utils'
import { post } from 'aleph-sdk-ts/dist/messages'
import { ItemType } from 'aleph-sdk-ts/dist/messages/types'
import { ItemType, PostMessage } from 'aleph-sdk-ts/dist/messages/types'
import { subscribeSocketFeed } from '@/helpers/socket'

export type RewardsResponse = {
type: 'calculation' | 'distribution'
Expand Down Expand Up @@ -69,109 +70,34 @@ export class StakeManager {
}
}

async *subscribeRewardsFeed(): AsyncGenerator<RewardsResponse> {
let socket: WebSocket | undefined

const values: RewardsResponse[] = []
const futures: Future<RewardsResponse>[] = []

function deliver() {
while (true) {
if (values.length === 0 || futures.length === 0) return

const nextValue = values.shift() as RewardsResponse
const nextFuture = futures.shift() as Future<RewardsResponse>

nextFuture?.resolve(nextValue)
}
}

const connect = () => {
socket = new WebSocket(
// `${wsServer}/api/ws0/messages?msgType=POST&history=1&contentTypes=staking-rewards-distribution&addresses=${senderAddress},${monitorAddress}`,
`${wsServer}/api/ws0/messages?msgType=POST&history=1&contentTypes=staking-rewards-distribution&addresses=${senderAddress}`,
)

socket.addEventListener('message', handleMessage)
socket.addEventListener('close', handleClose)
socket.addEventListener('error', handleError)

console.log('Oppening Socket', socket.readyState)
}

const close = (e?: CloseEvent, reconnect = true) => {
const ws = socket

socket?.removeEventListener('message', handleMessage)
socket?.removeEventListener('close', handleClose)
socket?.removeEventListener('error', handleError)

socket?.close()
socket = undefined

console.log('Closing Socket', e?.reason, ws?.readyState)

if (reconnect) {
console.log('Reconnecting Socket in 1 second')
setTimeout(connect, 1000)
}
}

const push = (value: RewardsResponse) => {
values.push(value)
deliver()
}

const handleMessage = (event: MessageEvent) => {
const data = JSON.parse(event.data)
async *subscribeRewardsFeed(
abort: Promise<void>,
): AsyncGenerator<RewardsResponse> {
const feed = subscribeSocketFeed<PostMessage<any>>(
// `${wsServer}/api/ws0/messages?msgType=POST&history=1&contentTypes=staking-rewards-distribution&addresses=${senderAddress},${monitorAddress}`,
`${wsServer}/api/ws0/messages?msgType=POST&history=1&contentTypes=staking-rewards-distribution&addresses=${senderAddress}`,
abort,
)

for await (const data of feed) {
if (!data.content) return
if (!data.content.content) return

const { content, time } = data.content
const { content, time } = data.content || {}
const { status: type, rewards, end_height: lastHeight } = content

if (
type === 'calculation' ||
(type === 'distribution' &&
data.content.content.targets.every(({ success }: any) => success))
data.content.content.targets.some(({ success }: any) => success))
) {
push({
yield {
type,
rewards,
lastHeight,
timestamp: Math.trunc(time * 1000),
})
}
}

const handleClose = (e: CloseEvent) => {
close(e, true)
}

const handleError = (err: any) => {
console.error(
'Socket encountered error: ',
err?.message,
'Closing socket',
)
close(undefined, false)
}

connect()

try {
while (true) {
const future = new Future<RewardsResponse>()
futures.push(future)

deliver()

yield await future.promise
}
}
} finally {
// @note. close socket on desubs
close()
}
}

Expand Down
Loading

0 comments on commit 7a94094

Please sign in to comment.