Skip to content

Commit

Permalink
feat: Add job to download captions
Browse files Browse the repository at this point in the history
  • Loading branch information
Betree committed Jun 18, 2024
1 parent af7d1df commit fb8586a
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 51 deletions.
19 changes: 0 additions & 19 deletions apps/cf/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,6 @@ config :cf,
soft_limitations_period: 15 * 60,
hard_limitations_period: 3 * 60 * 60

# Configure scheduler
config :cf, CF.Scheduler,
# Run only one instance across cluster
global: true,
debug_logging: false,
jobs: [
# credo:disable-for-lines:10
# Actions analysers
# Every minute
{"*/1 * * * *", {CF.Jobs.Reputation, :update, []}},
# Every day
{"@daily", {CF.Jobs.Reputation, :reset_daily_limits, []}},
# Every minute
{"*/1 * * * *", {CF.Jobs.Flags, :update, []}},
# Various updaters
# Every 5 minutes
{"*/5 * * * *", {CF.Jobs.Moderation, :update, []}}
]

# Configure mailer
config :cf, CF.Mailer, adapter: Bamboo.MailgunAdapter

Expand Down
3 changes: 0 additions & 3 deletions apps/cf/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ config :cf,
# Print only warnings and errors during test
config :logger, level: :warn

# Disable CRON tasks on test
config :cf, CF.Scheduler, jobs: []

# Mails
config :cf, CF.Mailer, adapter: Bamboo.TestAdapter

Expand Down
3 changes: 2 additions & 1 deletion apps/cf/lib/videos/captions_fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ defmodule CF.Videos.CaptionsFetcher do
Fetch captions for videos.
"""

@callback fetch(DB.Schema.Video.t()) :: {:ok, DB.Schema.VideoCaption.t()} | {:error, binary()}
@callback fetch(DB.Schema.Video.t()) ::
{:ok, %{content: String.t(), format: String.t()}} | {:error, term()}
end
2 changes: 1 addition & 1 deletion apps/cf/lib/videos/captions_fetcher_test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule CF.Videos.CaptionsFetcherTest do

@impl true
def fetch(_video) do
captions = %DB.Schema.VideoCaption{
captions = %{
content: "__TEST-CONTENT__",
format: "xml"
}
Expand Down
129 changes: 112 additions & 17 deletions apps/cf/lib/videos/captions_fetcher_youtube.ex
Original file line number Diff line number Diff line change
@@ -1,38 +1,133 @@
defmodule CF.Videos.CaptionsFetcherYoutube do
@moduledoc """
A captions fetcher for YouTube.
Based upon https://github.com/Valian/youtube-captions, but adapted to use HTTPoison.
"""

@behaviour CF.Videos.CaptionsFetcher

require Logger

@impl true
def fetch(%{youtube_id: youtube_id, language: language}) do
with {:ok, content} <- fetch_captions_content(youtube_id, language) do
captions = %DB.Schema.VideoCaption{
content: content,
format: "xml"
}

{:ok, captions}
with {:ok, data} <- fetch_youtube_data(youtube_id),
{:ok, caption_tracks} <- parse_caption_tracks(data),
{:ok, transcript_url} <- find_transcript_url(caption_tracks, language),
{:ok, transcript_data} <- fetch_transcript(transcript_url) do
{:ok,
%{
raw: transcript_data,
parsed: process_transcript(transcript_data),
format: "xml"
}}
end
end

defp fetch_captions_content(video_id, locale) do
case HTTPoison.get("http://video.google.com/timedtext?lang=#{locale}&v=#{video_id}") do
{:ok, %HTTPoison.Response{status_code: 200, body: ""}} ->
{:error, :not_found}
defp fetch_youtube_data(video_id) do
url = "https://www.youtube.com/watch?v=#{video_id}"

{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
case HTTPoison.get(url, []) do
{:ok, %HTTPoison.Response{body: body}} ->
{:ok, body}

{:ok, %HTTPoison.Response{status_code: 404}} ->
{:error, :not_found}
{:error, %HTTPoison.Error{reason: reason}} ->
{:error, "Failed to fetch YouTube video #{url}: #{inspect(reason)}"}
end
end

{:ok, %HTTPoison.Response{status_code: _}} ->
{:error, :unknown}
# Extracts the "captionTracks" JSON array embedded in a YouTube watch page.
#
# `data` is the raw page body. Returns `{:ok, tracks}` with the decoded list,
# or `{:error, message}` when the array is missing or cannot be decoded.
defp parse_caption_tracks(data) do
  captions_regex = ~r/"captionTracks":(?<data>\[.*?\])/

  case Regex.named_captures(captions_regex, data) do
    %{"data" => json} ->
      # The non-greedy capture stops at the first `]`, so the captured text may
      # be truncated JSON. Decode without raising so callers always get a
      # tagged tuple instead of a crash.
      case Jason.decode(json) do
        {:ok, caption_tracks} -> {:ok, caption_tracks}
        {:error, _reason} -> {:error, "Could not parse captions data for video"}
      end

    _ ->
      {:error, "Could not find captions for video"}
  end
end

# Picks the first caption track whose "vssId" contains ".<lang>" and returns
# its "baseUrl".
#
# Returns `{:ok, url}`, or `{:error, message}` when no track matches or the
# matching track carries no "baseUrl".
defp find_transcript_url(caption_tracks, lang) do
  # Compile once, outside the loop. The dot is matched literally and `lang` is
  # escaped so that it cannot inject regex syntax or make compilation fail.
  lang_pattern = Regex.compile!("\\." <> Regex.escape(to_string(lang)))

  case Enum.find(caption_tracks, &track_matches?(&1, lang_pattern)) do
    nil ->
      {:error, "Unable to find transcript for language #{lang}"}

    %{"baseUrl" => base_url} ->
      {:ok, base_url}

    _data ->
      {:error, "Unable to find transcript URL for language #{lang}"}
  end
end

# A track without a string "vssId" can never match (and must not raise).
defp track_matches?(%{"vssId" => vss_id}, lang_pattern) when is_binary(vss_id) do
  Regex.match?(lang_pattern, vss_id)
end

defp track_matches?(_track, _lang_pattern), do: false

defp fetch_transcript(base_url) do
case HTTPoison.get(base_url, []) do
{:ok, %HTTPoison.Response{body: body}} ->
{:ok, body}

{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
{:error, "Failed to fetch transcript: #{inspect(reason)}"}
end
end

# Turns the raw transcript XML into a list of parsed caption lines.
#
# The XML prolog / opening <transcript> tag and the closing </transcript> tag
# are removed, the remainder is cut at every "</text>", and each non-blank
# chunk is handed to process_line/1.
defp process_transcript(transcript) do
  without_wrapper =
    transcript
    |> String.replace(~r/^<\?xml version="1.0" encoding="utf-8"\?><transcript>/, "")
    |> String.replace("</transcript>", "")

  for chunk <- String.split(without_wrapper, "</text>"), String.trim(chunk) != "" do
    process_line(chunk)
  end
end

# Parses one "<text ...>...": reads the `start` and `dur` attributes and
# cleans the text content.
#
# Returns `%{start: float, duration: float, text: binary}`. Raises a
# MatchError when either attribute is absent from `line`.
defp process_line(line) do
  %{"start" => start_attr} = Regex.named_captures(~r/start="(?<start>[\d.]+)"/, line)
  %{"dur" => dur_attr} = Regex.named_captures(~r/dur="(?<dur>[\d.]+)"/, line)

  # "&amp;" is unescaped first, then the opening <text ...> tag and any other
  # markup are stripped, and finally the remaining entities are decoded.
  unescaped = String.replace(line, "&amp;", "&")
  without_open_tag = String.replace(unescaped, ~r/<text.+>/, "")
  without_markup = String.replace(without_open_tag, ~r"</?[^>]+(>|$)", "")
  text = without_markup |> HtmlEntities.decode() |> String.trim()

  %{start: parse_float(start_attr), duration: parse_float(dur_attr), text: text}
end

# Reads the leading float from the binary `val`, ignoring any trailing text.
# Raises a MatchError when `val` does not start with a number.
defp parse_float(val) do
  {number, _remainder} = Float.parse(val)
  number
end

# defp fetch_captions_content_with_official_api(video_id, locale) do
# # TODO: Continue dev here. See https://www.perplexity.ai/search/Can-you-show-jioyCtw.S4yrL8mlIBdqGg
# {:ok, token} = Goth.Token.for_scope("https://www.googleapis.com/auth/youtube.force-ssl")
# conn = YouTubeConnection.new(token.token)

# {:ok, captions} = GoogleApi.YouTube.V3.Api.Captions.youtube_captions_list(conn, ["snippet"], video_id, [])
# # {
# # "kind": "youtube#captionListResponse",
# # "etag": "kMTAKpyU_VGu7GxgEnxXHqcuEXM",
# # "items": [
# # {
# # "kind": "youtube#caption",
# # "etag": "tWo68CIcRRFZA0oXPt8HGxCYia4",
# # "id": "AUieDaZJxYug0L5YNAw_31GbXz73b0CPXCDFlsPNSNe7KQvuv1g",
# # "snippet": {
# # "videoId": "v2IoEhuho2k",
# # "lastUpdated": "2024-06-16T18:45:12.56697Z",
# # "trackKind": "asr",
# # "language": "fr",
# # "name": "",
# # "audioTrackType": "unknown",
# # "isCC": false,
# # "isLarge": false,
# # "isEasyReader": false,
# # "isDraft": false,
# # "isAutoSynced": false,
# # "status": "serving"
# # }
# # }
# # ]
# # }

# caption_id = List.first(captions.items).id # TODO inspect to pick the right caption
# {:ok, caption} = GoogleApi.YouTube.V3.Api.Captions.youtube_captions_download(conn, caption_id, [])
# end
end
37 changes: 32 additions & 5 deletions apps/cf/lib/videos/videos.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule CF.Videos do
The boundary for the Videos system.
"""

require Logger

import Ecto.Query, warn: false
import CF.Videos.MetadataFetcher
import CF.Videos.CaptionsFetcher
Expand Down Expand Up @@ -167,12 +169,37 @@ defmodule CF.Videos do
iex> download_captions(video)
"""
def download_captions(video = %Video{}) do
with {:ok, captions} <- @captions_fetcher.fetch(video) do
captions
|> VideoCaption.changeset(%{video_id: video.id})
|> Repo.insert()
# Try to fetch new captions
case @captions_fetcher.fetch(video) do
{:ok, captions} ->
get_captions_base(video)
|> VideoCaption.changeset(Map.merge(captions, %{video_id: video.id}))
|> Repo.insert_or_update()
|> Kaur.Result.ok()

# If no Youtube caption found, insert a dummy entry in DB to prevent retrying for 30 days
{:error, :not_found} ->
# TODO
# unless existing_captions do
# Repo.insert(%DB.Schema.VideoCaption{video_id: video.id, content: "", format: "xml"})
# end

{:error, :not_found}

result ->
result
end
end

{:ok, captions}
# Returns the most recently inserted VideoCaption for `video`, or an empty
# %VideoCaption{} when the video has none.
defp get_captions_base(video) do
  latest_captions_query =
    from(vc in VideoCaption,
      where: vc.video_id == ^video.id,
      order_by: [desc: vc.inserted_at],
      limit: 1
    )

  Repo.one(latest_captions_query) || %VideoCaption{}
end

Expand Down
11 changes: 11 additions & 0 deletions apps/cf_jobs/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ config :cf_jobs, CF.Jobs.Scheduler,
jobs: [
# Reputation
update_reputations: [
# every 20 minutes
schedule: {:extended, "*/20"},
task: {CF.Jobs.Reputation, :update, []},
overlap: false
Expand All @@ -19,21 +20,31 @@ config :cf_jobs, CF.Jobs.Scheduler,
],
# Moderation
update_moderation: [
# every 5 minutes
schedule: "*/5 * * * *",
task: {CF.Jobs.Moderation, :update, []},
overlap: false
],
# Flags
update_flags: [
# every minute
schedule: "*/1 * * * *",
task: {CF.Jobs.Flags, :update, []},
overlap: false
],
# Notifications
create_notifications: [
# every 5 seconds
schedule: {:extended, "*/5"},
task: {CF.Jobs.CreateNotifications, :update, []},
overlap: false
],
# Captions
download_captions: [
# every 10 minutes
schedule: "*/10 * * * *",
task: {CF.Jobs.DownloadCaptions, :update, []},
overlap: false
]
]

Expand Down
4 changes: 3 additions & 1 deletion apps/cf_jobs/lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ defmodule CF.Jobs.Application do
:timer.sleep(1000)

env = Application.get_env(:cf, :env)

# Define workers and child supervisors to be supervised
children = [
# Jobs
worker(CF.Jobs.Reputation, []),
worker(CF.Jobs.Flags, []),
worker(CF.Jobs.Moderation, []),
worker(CF.Jobs.CreateNotifications, [])
worker(CF.Jobs.CreateNotifications, []),
worker(CF.Jobs.DownloadCaptions, [])
]

# Do not start scheduler in tests
Expand Down
66 changes: 66 additions & 0 deletions apps/cf_jobs/lib/jobs/download_captions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule CF.Jobs.DownloadCaptions do
  @moduledoc """
  Job that downloads captions for videos that have none, or whose captions
  are older than the refresh period.

  The job runs inside a named GenServer; `update/0` triggers one run and
  blocks until that run is finished (or the call times out).
  """

  @behaviour CF.Jobs.Job

  use GenServer

  require Logger
  import Ecto.Query

  alias DB.Repo
  alias DB.Schema.Video
  alias DB.Schema.VideoCaption

  @name :download_captions

  # Maximum time a single run may take: 2 minutes
  @timeout 120_000

  # Maximum number of videos handled per run
  @batch_size 15

  # Captions older than this are fetched again: 30 days, in seconds
  @refresh_after 30 * 24 * 60 * 60

  # --- Client API ---

  def name, do: @name

  def start_link() do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(args) do
    {:ok, args}
  end

  def update() do
    GenServer.call(__MODULE__, :download_captions, @timeout)
  end

  # --- Server callbacks ---

  def handle_call(:download_captions, _from, _state) do
    # Run for side effects only; results are not collected.
    Enum.each(get_videos(), &download_for_video/1)

    {:reply, :ok, :ok}
  end

  # Downloads captions for one video and logs the failure reason, if any,
  # instead of silently dropping it.
  defp download_for_video(video) do
    Logger.info("Downloading captions for video #{video.id}")

    case CF.Videos.download_captions(video) do
      {:error, reason} ->
        Logger.info("Could not download captions for video #{video.id}: #{inspect(reason)}")

      _ ->
        :ok
    end
  end

  # Get the videos that need new captions, newest videos first, limited to
  # @batch_size. We fetch new captions:
  # - For any video that doesn't have any captions yet
  # - For videos that have captions inserted more than 30 days ago
  #
  # NOTE(review): the filter uses `inserted_at`. If captions are refreshed by
  # updating the existing row, `inserted_at` presumably never changes and the
  # same videos would be selected again on every run. Confirm whether
  # `updated_at` should be used instead.
  defp get_videos() do
    refresh_threshold = DateTime.add(DateTime.utc_now(), -@refresh_after, :second)

    Repo.all(
      from(v in Video,
        limit: @batch_size,
        left_join: captions in VideoCaption,
        on: captions.video_id == v.id,
        where: is_nil(captions.id) or captions.inserted_at < ^refresh_threshold,
        group_by: v.id,
        order_by: [desc: v.inserted_at]
      )
    )
  end
end
1 change: 1 addition & 0 deletions apps/db/lib/db_schema/users_actions_report.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule DB.Schema.UsersActionsReport do
def analyser_id(:achievements), do: 3
def analyser_id(:votes), do: 4
def analyser_id(:create_notifications), do: 5
def analyser_id(:download_captions), do: 6

def status(:pending), do: 1
def status(:running), do: 2
Expand Down
Loading

0 comments on commit fb8586a

Please sign in to comment.