Skip to content

Commit

Permalink
feat(database): setup relations
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 20, 2024
1 parent 5e47911 commit 94bad05
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 61 deletions.
18 changes: 5 additions & 13 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export enum SyncStatus {

type LocateResult = [Span, MessageLike]

interface CollectResult {
temp?: Universal.TwoWayList<Universal.Message>
span?: Span
}

export class SyncChannel {
public _spans: Span[] = []
public _query: { platform: string; 'channel.id': string }
Expand Down Expand Up @@ -129,7 +124,7 @@ export class SyncChannel {
return result
}

collect(result: Universal.TwoWayList<Universal.Message>, dir: Span.Direction, data: Message[], index: number): CollectResult {
collect(result: Universal.TwoWayList<Universal.Message>, dir: Span.Direction, data: Message[], index: number) {
const w = Span.words[dir]
for (let i = index + w.unit; i >= 0 && i < result.data.length; i += w.unit) {
const span = this._spans.find(span => span[w.back][1] === result.data[i].id)
Expand All @@ -138,11 +133,10 @@ export class SyncChannel {
if (data.length) {
span[w.temp] = { [w.next]: result[w.next], data }
}
return { span }
return span
}
data[w.push](Message.from(result.data[i], this.bot.platform, undefined, dir, data.at(w.last)?.sid))
}
return { temp: { data: [], [w.next]: result[w.next] } }
}

private async locate(id?: string, dir: Universal.Direction = 'before', limit?: number): Promise<LocateResult | undefined> {
Expand Down Expand Up @@ -186,8 +180,8 @@ export class SyncChannel {
index = -1
}

const { span: prev, temp: prevTemp } = this.collect(result, 'before', data, index)
const { span: next, temp: nextTemp } = this.collect(result, 'after', data, index)
const prev = this.collect(result, 'before', data, index)
const next = this.collect(result, 'after', data, index)

if (data.length || prev && next) {
span = this.insert(data, { prev, next })
Expand All @@ -198,8 +192,6 @@ export class SyncChannel {
return
}

span.prevTemp = prevTemp
span.nextTemp = nextTemp
if (dir === 'before') {
message = { sid: span.front[0] }
} else if (dir === 'after') {
Expand All @@ -222,7 +214,7 @@ export class SyncChannel {

let result = span[w.temp]
if (result) {
let i = dir === 'before' ? result.data.length - 1 : 0
let i = w.start(result.data.length)
for (; i >= 0 && i < result.data.length; i += w.unit) {
if (!data.some(item => item.id === result!.data[i].id)) break
}
Expand Down
165 changes: 125 additions & 40 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {} from 'minato'
import { Bot, Context, Dict, Schema, Service, Universal } from '@satorijs/core'
import { SyncChannel } from './channel'
import { SyncGuild } from './guild'
import { Login } from './types'

export * from './types'

Expand All @@ -15,6 +16,10 @@ declare module '@satorijs/core' {
interface Satori {
database: SatoriDatabase
}

interface Bot {
sync: Login['sync']
}
}

class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
Expand All @@ -25,27 +30,89 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {

stopped = false

private _botTasks = new WeakMap<Bot, Promise<void>>()

constructor(ctx: Context, public config: SatoriDatabase.Config) {
super(ctx, 'satori.database', true)

const self = this

// ctx.accessor('bot.getGuildList', {
// get: () => async function (this: Bot) {
// const data = await ctx.database.get('satori.guild', {
// logins: {
// $some: {
// platform: this.platform,
// 'user.id': this.user.id,
// },
// },
// })
// if (data.length) return data
// return data
// },
// })
ctx.accessor('bot.getGuildList', {
get: () => async function (this: Bot) {
if (this.sync.guildListAt >= this.sync.onlineAt) {
const data = await ctx.database.get('satori.guild', {
syncs: {
$some: {
login: {
platform: this.platform,
'user.id': this.user.id,
},
},
},
})
return { data }
}
const data: Universal.Guild[] = []
for await (const guild of this.self.getGuildIter()) {
data.push(guild)
}
await ctx.database.set('satori.login', {
platform: this.platform,
'user.id': this.user.id,
}, {
sync: {
guildListAt: this.timestamp,
},
guildSyncs: {
// ?
},
})
return { data }
},
})

ctx.accessor('bot.getChannelList', {
get: () => async function (this: Bot, guildId: string) {
// FIXME sync maybe undefined
const [sync] = await ctx.database.get('satori.guild.sync', {
login: {
platform: this.platform,
'user.id': this.user.id,
},
guild: {
id: guildId,
},
})
if (sync!.channelListAt >= this.sync.onlineAt) {
const data = await ctx.database.get('satori.channel', {
guild: {
id: guildId,
},
syncs: {
$some: {
login: {
platform: this.platform,
'user.id': this.user.id,
},
},
},
})
return { data }
}
const data: Universal.Channel[] = []
for await (const channel of this.self.getChannelIter(guildId)) {
data.push(channel)
}
await ctx.database.set('satori.guild.sync', {
login: {
platform: this.platform,
'user.id': this.user.id,
},
}, {
channelListAt: this.timestamp,
// ?
})
return { data }
},
})

ctx.accessor('bot.getMessageList', {
get: () => async function (this: Bot, channelId: string, id: string, dir?: Universal.Direction, limit?: number, order?: Universal.Order) {
Expand All @@ -70,6 +137,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
'flag': 'unsigned(1)',
'deleted': 'boolean',
'edited': 'boolean',
'syncAt': 'unsigned(8)',
}, {
primary: 'uid',
autoInc: true,
Expand All @@ -85,6 +153,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
'name': 'char(255)',
'nick': 'char(255)',
'avatar': 'char(255)',
'syncAt': 'unsigned(8)',
}, {
primary: ['id', 'platform'],
})
Expand All @@ -102,21 +171,46 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
'id': 'char(255)',
'platform': 'char(255)',
'name': 'char(255)',
'syncAt': 'unsigned(8)',
}, {
primary: ['id', 'platform'],
})

ctx.model.extend('satori.login', {
'platform': 'char(255)',
'user.id': 'char(255)',
'guilds': {
type: 'manyToMany',
table: 'satori.guild',
target: 'logins',
},
'sync.guildListAt': 'unsigned(8)',
}, {
primary: ['platform', 'user.id'],
})

ctx.model.extend('satori.guild.sync', {
'guild': {
type: 'manyToOne',
table: 'satori.guild',
target: 'syncs',
},
'login': {
type: 'manyToOne',
table: 'satori.login',
target: 'syncs',
},
'channelListAt': 'unsigned(8)',
'memberListAt': 'unsigned(8)',
})

ctx.model.extend('satori.channel.sync', {
'channel': {
type: 'manyToOne',
table: 'satori.channel',
target: 'syncs',
},
'login': {
type: 'manyToOne',
table: 'satori.login',
target: 'syncs',
},
})
}

async start() {
Expand Down Expand Up @@ -169,31 +263,22 @@ class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
if (bot.hidden) return
if (!await bot.supports('message.list') || !await bot.supports('guild.list')) return
if (bot.status !== Universal.Status.ONLINE) {
this._botTasks.delete(bot)
for (const channel of Object.values(this._channels)) {
if (channel.bot !== bot) continue
channel.hasLatest = false
}
const query = {
platform: bot.platform,
'user.id': bot.user.id,
}
const [login] = await this.ctx.database.get('satori.login', query)
bot.sync = login?.sync || { onlineAt: Date.now() }
await this.ctx.database.upsert('satori.login', [{
...query,
sync: bot.sync,
}])
return
}
this._botTasks.has(bot) || this._botTasks.set(bot, (async () => {
for await (const guild of bot.getGuildIter()) {
const key = bot.platform + '/' + guild.id
this._guilds[key] ||= new SyncGuild(bot, guild)
}
})())
// const tasks: Promise<any>[] = []
// for await (const guild of bot.getGuildIter()) {
// const key = bot.platform + '/' + guild.id
// this._guilds[key] ||= new SyncGuild(bot, guild)
// tasks.push((async () => {
// for await (const channel of bot.getChannelIter(guild.id)) {
// const key = bot.platform + '/' + guild.id + '/' + channel.id
// this._channels[key] ||= new SyncChannel(this.ctx, bot, guild.id, channel)
// }
// })())
// }
// await Promise.all(tasks)
}
}

Expand Down
11 changes: 5 additions & 6 deletions packages/database/src/span.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ export class Span {

async flush() {
if (this.type !== Span.Type.LOCAL) throw new Error('expect local span')
console.log('flush', !!this.prev, !!this.prevTemp, !!this.next, !!this.nextTemp)
if (!this.prev && this.prevTemp) return
if (!this.next && this.nextTemp) return
console.log('flush', !!this.prevTemp, !!this.nextTemp)
if (this.prevTemp || this.nextTemp) return
await Promise.all([this.prev?.syncTask, this.next?.syncTask])
if (!this.channel._spans.includes(this)) return
return this.syncTask ||= this.sync()
Expand Down Expand Up @@ -73,7 +72,6 @@ export class Span {
} else {
(data.at(0)!.flag as number) |= Message.Flag.BACK
}
console.log(data.at(0), data.at(-1))
return data
}, ['sid', 'channel.id', 'platform'])
this.type = Span.Type.REMOTE
Expand Down Expand Up @@ -118,13 +116,12 @@ export class Span {
console.log('raw:', result.data.length)
}
const data: Message[] = []
const { span, temp } = this.channel.collect(result, dir, data, dir === 'after' ? -1 : result.data.length)
const span = this.channel.collect(result, dir, data, w.start(result.data.length) - w.unit)
if (!span && dir === 'before' && !result[w.next]) this.channel.hasEarliest = true
if (data.length || span) {
return this.channel.insert(data, {
[w.prev]: this,
[w.next]: span,
[w.temp]: temp,
})
}
}
Expand Down Expand Up @@ -162,6 +159,7 @@ export namespace Span {
unit: -1,
last: 0,
slice: <T>(arr: T[], index: number) => arr.slice(0, index + 1),
start: (length: number) => length - 1,
},
after: {
prev: 'prev',
Expand All @@ -179,6 +177,7 @@ export namespace Span {
unit: 1,
last: -1,
slice: <T>(arr: T[], index: number) => arr.slice(index),
start: () => 0,
},
} as const
}
Loading

0 comments on commit 94bad05

Please sign in to comment.