-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathMigrationsProvider.tsx
182 lines (164 loc) · 6.02 KB
/
MigrationsProvider.tsx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
'use client'
import React, { createContext, useContext, ReactNode, useState, useEffect } from 'react'
import { Migration, MigrationConfiguration, MigrationID, Progress } from '@/lib/migrations/api'
import { useW3 } from '@w3ui/react'
import * as Migrations from '@/lib/migrations'
import { MigrationsStorage } from '@/lib/migrations/store'
import { serviceConnection } from './services'
import { logAndCaptureError } from '@/sentry'
const MAX_LOG_LINES = 1000
export interface ContextState {
migrations: Migration[]
logs: Record<MigrationID, string[]>
}
export interface ContextActions {
createMigration: (config: MigrationConfiguration) => ({ id: MigrationID })
removeMigration: (id: MigrationID) => void
startMigration: (id: MigrationID) => void
stopMigration: (id: MigrationID) => void
}
export type ContextValue = [
state: ContextState,
actions: ContextActions
]
export const MigrationContextDefaultValue: ContextValue = [
{
migrations: [],
logs: {}
},
{
createMigration: () => { throw new Error('missing provider') },
removeMigration: () => { throw new Error('missing provider') },
startMigration: () => { throw new Error('missing provider') },
stopMigration: () => { throw new Error('missing provider') }
}
]
export const Context = createContext<ContextValue>(
MigrationContextDefaultValue
)
export interface ProviderProps {
children?: ReactNode
}
const runningMigrations: Record<string, AbortController> = {}
const migrationsStore = new MigrationsStorage()
export function Provider ({ children }: ProviderProps): ReactNode {
const [{ client }] = useW3()
const [migrations, setMigrations] = useState(migrationsStore.load())
const [logs, setLogs] = useState<Record<MigrationID, string[]>>({})
const log = (id: MigrationID, msg: string) => {
setLogs(logs => ({ ...logs, [id]: [...(logs[id] ?? []), msg].slice(-MAX_LOG_LINES) }))
}
const createMigration = (config: MigrationConfiguration) => {
const { id } = migrationsStore.create(config)
log(id, 'created migration')
setMigrations(() => migrationsStore.load())
return { id }
}
const removeMigration = (id: MigrationID) => {
log(id, 'removing migration')
stopMigration(id)
migrationsStore.delete(id)
setMigrations(() => migrationsStore.load())
}
const startMigration = (id: MigrationID) => {
const migration = migrationsStore.read(id)
if (runningMigrations[id]) return console.warn(`already started: ${id}`)
if (!client) return console.warn('missing client')
log(id, migration.progress?.pending ? 'resuming migration...' : 'starting migration...')
const controller = new AbortController()
runningMigrations[id] = controller
const uploads = Migrations.createReader(migration.source, {
token: migration.token,
cursor: migration.progress?.cursor
})
const initProgress = async () => ({
pending: await uploads.count(),
succeeded: 0,
failed: []
}) as Progress
Migrations.migrate({
signal: controller.signal,
uploads,
issuer: client.agent.issuer,
space: migration.space,
proofs: client.proofs([
{ can: 'store/add', with: migration.space },
{ can: 'upload/add', with: migration.space }
]) ?? [],
// @ts-expect-error
connection: serviceConnection,
onStoreAdd: async (upload, shard) => {
log(id, `stored shard: ${shard.link}`)
const migration = migrationsStore.read(id)
migration.progress = migration.progress ?? await initProgress()
migration.progress = {
...migration.progress,
current: upload.root
}
migrationsStore.update(migration)
setMigrations(() => migrationsStore.load())
},
onUploadAdd: async (upload) => {
log(id, `upload migrated: ${upload.root}`)
const migration = migrationsStore.read(id)
migration.progress = migration.progress ?? await initProgress()
migration.progress = {
...migration.progress,
cursor: upload.root.toString(),
pending: migration.progress.pending - 1,
succeeded: migration.progress.succeeded + 1
}
migrationsStore.update(migration)
setMigrations(() => migrationsStore.load())
},
onError: async (err, upload, shard) => {
logAndCaptureError(err)
log(id, `failed migration ${upload.root}${shard ? ` (shard: ${shard.link})` : ''}: ${err.stack}`)
const migration = migrationsStore.read(id)
migration.progress = migration.progress ?? await initProgress()
migration.progress.failed.push(upload.root)
migration.progress = {
...migration.progress,
cursor: upload.root.toString(),
pending: migration.progress.pending - 1
}
migrationsStore.update(migration)
setMigrations(() => migrationsStore.load())
},
onComplete: async () => {
log(id, 'finished migration')
const migration = migrationsStore.read(id)
// there will be no progress if there are 0 items to migrate
if (!migration.progress) {
migration.progress = migration.progress ?? await initProgress()
migrationsStore.update(migration)
setMigrations(() => migrationsStore.load())
}
}
})
}
const stopMigration = (id: MigrationID) => {
log(id, 'stopping migration')
if (!runningMigrations[id]) throw new Error('missing migration controller')
runningMigrations[id].abort()
delete runningMigrations[id]
}
// resume existing migrations
useEffect(() => {
if (!client) return
for (const migration of migrations) {
if (migration.progress && migration.progress.pending <= 0) {
continue // do not resume if completed :)
}
startMigration(migration.id)
}
}, [client])
return (
<Context.Provider value={[{ migrations, logs }, { createMigration, removeMigration, startMigration, stopMigration }]}>
{children}
</Context.Provider>
)
}
export function useMigrations (): ContextValue {
return useContext(Context)
}