RichMedia backfill processing through Oban

This commit is contained in:
Mark Felder 2024-06-19 23:18:38 -04:00
parent 4dfa50f256
commit 17d04ccc8b
10 changed files with 59 additions and 60 deletions

View file

@ -0,0 +1 @@
Rich Media backfilling is now an Oban job

View file

@ -180,8 +180,6 @@
config :pleroma, Pleroma.Web.Streaming, sync_streaming: true
config :pleroma, Pleroma.Web.MastodonAPI.StatusView, sync_fetching: true
config :pleroma, Pleroma.Uploaders.Uploader, timeout: 1_000
config :pleroma, Pleroma.Emoji.Loader, test_emoji: true

View file

@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.HTML
alias Pleroma.Maps
alias Pleroma.Object
@ -31,13 +30,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
# pagination is restricted to 40 activities at a time
defp fetch_rich_media_for_activities(activities) do
Enum.each(activities, fn activity ->
fun = fn -> Card.get_by_activity(activity) end
if Config.get([__MODULE__, :sync_fetching], false) do
fun.()
else
spawn(fun)
end
Card.get_by_activity(activity)
end)
end

View file

@ -10,31 +10,21 @@ defmodule Pleroma.Web.RichMedia.Backfill do
require Logger
@backfiller Pleroma.Config.get([__MODULE__, :provider], Pleroma.Web.RichMedia.Backfill.Task)
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@max_attempts 3
@retry 5_000
def start(%{url: url} = args) when is_binary(url) do
@spec run(map()) ::
:ok | {:error, {:invalid_metadata, any()} | :body_too_large | {:content, any()} | any()}
def run(%{"url" => url} = args) do
url_hash = Card.url_to_hash(url)
args =
args
|> Map.put(:attempt, 1)
|> Map.put(:url_hash, url_hash)
@backfiller.run(args)
end
def run(%{url: url, url_hash: url_hash, attempt: attempt} = args)
when attempt <= @max_attempts do
case Parser.parse(url) do
{:ok, fields} ->
{:ok, card} = Card.create(url, fields)
maybe_schedule_expiration(url, fields)
if Map.has_key?(args, :activity_id) do
with %{"activity_id" => activity_id} <- args,
false <- is_nil(activity_id) do
stream_update(args)
end
@ -54,19 +44,10 @@ def run(%{url: url, url_hash: url_hash, attempt: attempt} = args)
e ->
Logger.debug("Rich media error for #{url}: #{inspect(e)}")
:timer.sleep(@retry * attempt)
run(%{args | attempt: attempt + 1})
{:error, e}
end
end
def run(%{url: url, url_hash: url_hash}) do
Logger.debug("Rich media failure for #{url}")
negative_cache(url_hash, :timer.minutes(15))
end
defp maybe_schedule_expiration(url, fields) do
case TTL.process(fields, url) do
{:ok, ttl} when is_number(ttl) ->
@ -80,22 +61,14 @@ defp maybe_schedule_expiration(url, fields) do
end
end
defp stream_update(%{activity_id: activity_id}) do
defp stream_update(%{"activity_id" => activity_id}) do
Pleroma.Activity.get_by_id(activity_id)
|> Pleroma.Activity.normalize()
|> Pleroma.Web.ActivityPub.ActivityPub.stream_out()
end
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
defp negative_cache(key, ttl \\ nil), do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
end
defmodule Pleroma.Web.RichMedia.Backfill.Task do
alias Pleroma.Web.RichMedia.Backfill
def run(args) do
Task.Supervisor.start_child(Pleroma.TaskSupervisor, Backfill, :run, [args],
name: {:global, {:rich_media, args.url_hash}}
)
end
defp negative_cache(key, ttl \\ :timer.minutes(15)),
do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
end

View file

@ -7,8 +7,8 @@ defmodule Pleroma.Web.RichMedia.Card do
alias Pleroma.HTML
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.Web.RichMedia.Backfill
alias Pleroma.Web.RichMedia.Parser
alias Pleroma.Workers.RichMediaWorker
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
@ -75,17 +75,18 @@ def get_by_url(url) when is_binary(url) do
def get_by_url(nil), do: nil
@spec get_or_backfill_by_url(String.t(), map()) :: t() | nil
def get_or_backfill_by_url(url, backfill_opts \\ %{}) do
@spec get_or_backfill_by_url(String.t(), keyword()) :: t() | nil
def get_or_backfill_by_url(url, opts \\ []) do
if @config_impl.get([:rich_media, :enabled]) do
case get_by_url(url) do
%__MODULE__{} = card ->
card
nil ->
backfill_opts = Map.put(backfill_opts, :url, url)
activity_id = Keyword.get(opts, :activity, nil)
Backfill.start(backfill_opts)
RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id})
|> Oban.insert()
nil
@ -137,7 +138,7 @@ def get_by_activity(activity) do
nil
else
{:cached, url} ->
get_or_backfill_by_url(url, %{activity_id: activity.id})
get_or_backfill_by_url(url, activity_id: activity.id)
_ ->
:error

View file

@ -3,13 +3,17 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.RichMediaWorker do
alias Pleroma.Web.RichMedia.Backfill
alias Pleroma.Web.RichMedia.Card
use Oban.Worker,
queue: :background
use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300]
@impl Oban.Worker
def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do
Card.delete(url)
end
def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do
Backfill.run(args)
end
end

View file

@ -9,6 +9,7 @@ defmodule Pleroma.Web.PleromaAPI.ChatMessageReferenceViewTest do
alias Pleroma.Chat
alias Pleroma.Chat.MessageReference
alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI
@ -70,6 +71,8 @@ test "it displays a chat message" do
media_id: upload.id
)
ObanHelpers.perform_all()
object = Object.normalize(activity, fetch: false)
cm_ref = MessageReference.for_chat_and_object(chat, object)

View file

@ -5,6 +5,7 @@
defmodule Pleroma.Web.RichMedia.CardTest do
use Pleroma.DataCase, async: true
alias Pleroma.Tests.ObanHelpers
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.RichMedia.Card
@ -36,6 +37,8 @@ test "crawls URL in activity" do
content_type: "text/markdown"
})
ObanHelpers.perform_all()
assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity)
end
@ -50,6 +53,7 @@ test "recrawls URLs on status edits/updates" do
# Force a backfill
Card.get_by_activity(activity)
ObanHelpers.perform_all()
assert match?(
%Card{url_hash: ^original_url_hash, fields: _},
@ -62,6 +66,7 @@ test "recrawls URLs on status edits/updates" do
# Force a backfill
Card.get_by_activity(activity)
ObanHelpers.perform_all()
assert match?(
%Card{url_hash: ^updated_url_hash, fields: _},

View file

@ -4,10 +4,11 @@
defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do
use Pleroma.DataCase, async: false
use Oban.Testing, repo: Pleroma.Repo
use Oban.Testing, repo: Pleroma.Repo, testing: :inline
import Mox
alias Pleroma.Tests.ObanHelpers
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
alias Pleroma.Web.RichMedia.Card
alias Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrl
@ -74,12 +75,19 @@ test "s3 signed url is parsed and correct ttl is set for rich media" do
Card.get_or_backfill_by_url(url)
assert_enqueued(
worker: Pleroma.Workers.RichMediaWorker,
args: %{"op" => "expire", "url" => url}
)
# Find the backfill job
expected_job =
[
worker: "Pleroma.Workers.RichMediaWorker",
args: %{"op" => "backfill", "url" => url}
]
[%Oban.Job{scheduled_at: scheduled_at}] = all_enqueued()
assert_enqueued(expected_job)
# Run it manually
ObanHelpers.perform_all()
[%Oban.Job{scheduled_at: scheduled_at} | _] = all_enqueued()
timestamp_dt = Timex.parse!(timestamp, "{ISO:Basic:Z}")

View file

@ -8,6 +8,7 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.OpengraphTest do
import Mox
alias Pleroma.Tests.ObanHelpers
alias Pleroma.UnstubbedConfigMock, as: ConfigMock
alias Pleroma.Web.RichMedia.Card
@ -36,6 +37,18 @@ test "OpenGraph TTL value is honored" do
Card.get_or_backfill_by_url(url)
# Find the backfill job
expected_job =
[
worker: "Pleroma.Workers.RichMediaWorker",
args: %{"op" => "backfill", "url" => url}
]
assert_enqueued(expected_job)
# Run it manually
ObanHelpers.perform_all()
assert_enqueued(
worker: Pleroma.Workers.RichMediaWorker,
args: %{"op" => "expire", "url" => url}