Skip to content

Commit

Permalink
Merge pull request #116 from mbret/develop
Browse files Browse the repository at this point in the history
release
  • Loading branch information
mbret authored Mar 17, 2024
2 parents 12b9c28 + 2328a04 commit 5f3becf
Show file tree
Hide file tree
Showing 18 changed files with 287 additions and 184 deletions.
2 changes: 1 addition & 1 deletion packages/api/serverless.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ const serverlessConfiguration: AWS & any = {
// this is needed to read from ssm and retrieve secrets
{
Effect: `Allow`,
Action: [`ssm:GetParameter`],
Action: [`ssm:GetParameter`, `ssm:GetParameters`],
Resource: [
{
"Fn::Join": [
Expand Down
24 changes: 20 additions & 4 deletions packages/api/src/functions/refreshMetadata/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import schema from "./schema"
import { InvokeCommand } from "@aws-sdk/client-lambda"
import { STAGE } from "src/constants"
import { lock } from "@libs/supabase/lock"
import { Logger } from "@libs/logger"

const LOCK_MAX_DURATION_MN = 5

const logger = Logger.child({ module: "handler" })

const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
event
) => {
Expand All @@ -25,12 +28,25 @@ const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
})
})

const lockId = `metadata_${event.body.bookId}`
logger.info(`invoke for ${event.body.bookId}`)

try {
const lockId = `metadata_${event.body.bookId}`

const { alreadyLocked } = await lock(lockId, LOCK_MAX_DURATION_MN)

if (!alreadyLocked) {
const response = await client.send(command)

const { alreadyLocked } = await lock(lockId, LOCK_MAX_DURATION_MN)
logger.info(`${event.body.bookId}: command sent with success ${response.$metadata.requestId}`)
logger.info(response)
} else {
logger.info(`${event.body.bookId} is already locked, ignoring!`)
}
} catch (error) {
logger.error(error)

if (!alreadyLocked) {
await client.send(command)
throw error
}

return {
Expand Down
26 changes: 22 additions & 4 deletions packages/api/src/functions/refreshMetadataCollection/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { InvokeCommand } from "@aws-sdk/client-lambda"
import { STAGE } from "src/constants"
import { COLLECTION_METADATA_LOCK_MN } from "@oboku/shared"
import { lock } from "@libs/supabase/lock"
import { Logger } from "@libs/logger"

const logger = Logger.child({ module: "handler" })

const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
event
Expand All @@ -25,12 +28,27 @@ const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
})
})

const lockId = `metadata-collection_${event.body.collectionId}`
logger.info(`invoke for ${event.body.collectionId}`)

try {
const lockId = `metadata-collection_${event.body.collectionId}`

const { alreadyLocked } = await lock(lockId, COLLECTION_METADATA_LOCK_MN)

if (!alreadyLocked) {
const response = await client.send(command)

const { alreadyLocked } = await lock(lockId, COLLECTION_METADATA_LOCK_MN)
logger.info(
`${event.body.collectionId}: command sent with success ${response.$metadata.requestId}`
)
logger.info(response)
} else {
logger.info(`${event.body.collectionId} is already locked, ignoring!`)
}
} catch (error) {
logger.error(error)

if (!alreadyLocked) {
await client.send(command)
throw error
}

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { withToken } from "@libs/auth"
import { configure as configureGoogleDataSource } from "@libs/plugins/google"
import schema from "./schema"
import { findOne, getNanoDbForUser } from "@libs/couch/dbHelpers"
import { getParameterValue } from "@libs/ssm"
import { getParametersValue } from "@libs/ssm"
import { deleteLock } from "@libs/supabase/deleteLock"
import { supabase } from "@libs/supabase/client"
import { Logger } from "@libs/logger"
Expand All @@ -13,34 +13,41 @@ import { refreshMetadata } from "@libs/collections/refreshMetadata"
const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
event
) => {
configureGoogleDataSource({
client_id:
(await getParameterValue({
Name: `GOOGLE_CLIENT_ID`,
WithDecryption: true
})) ?? ``,
client_secret:
(await getParameterValue({
Name: `GOOGLE_CLIENT_SECRET`,
WithDecryption: true
})) ?? ``
const [
client_id = ``,
client_secret = ``,
googleApiKey = ``,
jwtPrivateKey = ``,
comicVineApiKey = ``
] = await getParametersValue({
Names: [
"GOOGLE_CLIENT_ID",
"GOOGLE_CLIENT_SECRET",
"GOOGLE_API_KEY",
"jwt-private-key",
"COMiCVINE_API_KEY"
],
WithDecryption: true
})

const googleApiKey = await getParameterValue({
Name: `GOOGLE_API_KEY`,
WithDecryption: true
configureGoogleDataSource({
client_id,
client_secret
})

const soft = event.body.soft === true
const authorization = event.body.authorization ?? ``
const rawCredentials = event.body.credentials ?? JSON.stringify({})
const credentials = JSON.parse(rawCredentials)

const { name: userName } = await withToken({
headers: {
authorization
}
})
const { name: userName } = await withToken(
{
headers: {
authorization
}
},
jwtPrivateKey
)

const collectionId: string | undefined = event.body.collectionId

Expand All @@ -50,7 +57,7 @@ const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (

const lockId = `metadata-collection_${collectionId}`

const db = await getNanoDbForUser(userName)
const db = await getNanoDbForUser(userName, jwtPrivateKey)

const collection = await findOne(db, "obokucollection", {
selector: { _id: collectionId }
Expand All @@ -63,7 +70,8 @@ const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
googleApiKey,
db,
credentials,
soft
soft,
comicVineApiKey
})
} catch (e) {
await deleteLock(supabase, lockId)
Expand Down
170 changes: 90 additions & 80 deletions packages/api/src/functions/refreshMetadataLongProcess/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,112 +9,122 @@ import schema from "./schema"
import { atomicUpdate, findOne, getNanoDbForUser } from "@libs/couch/dbHelpers"
import { PromiseReturnType } from "@libs/types"
import { retrieveMetadataAndSaveCover } from "@libs/books/retrieveMetadataAndSaveCover"
import { getParameterValue } from "@libs/ssm"
import { getParametersValue } from "@libs/ssm"
import { deleteLock } from "@libs/supabase/deleteLock"
import { supabase } from "@libs/supabase/client"
import { Logger } from "@libs/logger"

const lambda: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (
event
) => {
configureGoogleDataSource({
client_id:
(await getParameterValue({
Name: `GOOGLE_CLIENT_ID`,
WithDecryption: true
})) ?? ``,
client_secret:
(await getParameterValue({
Name: `GOOGLE_CLIENT_SECRET`,
WithDecryption: true
})) ?? ``
})

const googleApiKey = await getParameterValue({
Name: `GOOGLE_API_KEY`,
WithDecryption: true
})

if (!OFFLINE) {
const files = await fs.promises.readdir(TMP_DIR)

await Promise.all(
files.map((file) => {
return fs.promises.unlink(path.join(TMP_DIR, file))
})
)
}

const bookId = event.body.bookId
const lockId = `metadata_${bookId}`
const authorization = event.body.authorization ?? ``
const rawCredentials = event.body.credentials ?? JSON.stringify({})
const credentials = JSON.parse(rawCredentials)

const { name: userName } = await withToken({
headers: {
authorization
try {
const [
client_id = ``,
client_secret = ``,
googleApiKey = ``,
jwtPrivateKey = ``
] = await getParametersValue({
Names: [
"GOOGLE_CLIENT_ID",
"GOOGLE_CLIENT_SECRET",
"GOOGLE_API_KEY",
"jwt-private-key"
],
WithDecryption: true
})

configureGoogleDataSource({
client_id,
client_secret
})

if (!OFFLINE) {
const files = await fs.promises.readdir(TMP_DIR)

await Promise.all(
files.map((file) => {
return fs.promises.unlink(path.join(TMP_DIR, file))
})
)
}
})
const userNameHex = Buffer.from(userName).toString("hex")
const bookId: string | undefined = event.body.bookId

if (!bookId) {
throw new Error(`Unable to parse event.body -> ${event.body}`)
}
const credentials = JSON.parse(rawCredentials)

const lockId = `metadata_${event.body.bookId}`
const { name: userName } = await withToken(
{
headers: {
authorization
}
},
jwtPrivateKey
)
const userNameHex = Buffer.from(userName).toString("hex")
const bookId: string | undefined = event.body.bookId

const db = await getNanoDbForUser(userName)
if (!bookId) {
throw new Error(`Unable to parse event.body -> ${event.body}`)
}

const book = await findOne(db, "book", { selector: { _id: bookId } })
const db = await getNanoDbForUser(userName, jwtPrivateKey)

if (!book) throw new Error(`Unable to find book ${bookId}`)
const book = await findOne(db, "book", { selector: { _id: bookId } })

if (book.metadataUpdateStatus !== "fetching") {
await atomicUpdate(db, "book", book._id, (old) => ({
...old,
metadataUpdateStatus: "fetching" as const
}))
}
if (!book) throw new Error(`Unable to find book ${bookId}`)

const firstLinkId = (book.links || [])[0] || "-1"
if (book.metadataUpdateStatus !== "fetching") {
await atomicUpdate(db, "book", book._id, (old) => ({
...old,
metadataUpdateStatus: "fetching" as const
}))
}

const link = await findOne(db, "link", { selector: { _id: firstLinkId } })
const firstLinkId = (book.links || [])[0] || "-1"

if (!link) throw new Error(`Unable to find link ${firstLinkId}`)
const link = await findOne(db, "link", { selector: { _id: firstLinkId } })

let data: PromiseReturnType<typeof retrieveMetadataAndSaveCover>
if (!link) throw new Error(`Unable to find link ${firstLinkId}`)

try {
data = await retrieveMetadataAndSaveCover({
userName,
userNameHex,
credentials,
book,
link,
googleApiKey,
db
})
} catch (e) {
await atomicUpdate(db, "book", book._id, (old) => ({
...old,
metadataUpdateStatus: null,
lastMetadataUpdateError: "unknown"
}))
let data: PromiseReturnType<typeof retrieveMetadataAndSaveCover>

await deleteLock(supabase, lockId)
try {
data = await retrieveMetadataAndSaveCover({
userName,
userNameHex,
credentials,
book,
link,
googleApiKey,
db
})
} catch (e) {
await atomicUpdate(db, "book", book._id, (old) => ({
...old,
metadataUpdateStatus: null,
lastMetadataUpdateError: "unknown"
}))

throw e
}

throw e
}
await Promise.all([
atomicUpdate(db, "link", link._id, (old) => ({
...old,
contentLength: data.link.contentLength
})),
deleteLock(supabase, lockId)
])

await Promise.all([
atomicUpdate(db, "link", link._id, (old) => ({
...old,
contentLength: data.link.contentLength
})),
deleteLock(supabase, lockId)
])
Logger.info(`lambda executed with success for ${book._id}`)
} catch (error) {
await deleteLock(supabase, lockId)

Logger.info(`lambda executed with success for ${book._id}`)
throw error
}

return {
statusCode: 200,
Expand Down
Loading

0 comments on commit 5f3becf

Please sign in to comment.