diff --git a/changelog.d/rich_media_backfill.change b/changelog.d/rich_media_backfill.change new file mode 100644 index 000000000..d746ac8ce --- /dev/null +++ b/changelog.d/rich_media_backfill.change @@ -0,0 +1 @@ +Rich Media backfilling is now an Oban job diff --git a/config/test.exs b/config/test.exs index 9afb2763a..1e8168548 100644 --- a/config/test.exs +++ b/config/test.exs @@ -180,8 +180,6 @@ config :pleroma, Pleroma.Web.Streaming, sync_streaming: true -config :pleroma, Pleroma.Web.MastodonAPI.StatusView, sync_fetching: true - config :pleroma, Pleroma.Uploaders.Uploader, timeout: 1_000 config :pleroma, Pleroma.Emoji.Loader, test_emoji: true diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex index 963658b1e..d9d7e516a 100644 --- a/lib/pleroma/web/mastodon_api/views/status_view.ex +++ b/lib/pleroma/web/mastodon_api/views/status_view.ex @@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do require Pleroma.Constants alias Pleroma.Activity - alias Pleroma.Config alias Pleroma.HTML alias Pleroma.Maps alias Pleroma.Object @@ -31,13 +30,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do # pagination is restricted to 40 activities at a time defp fetch_rich_media_for_activities(activities) do Enum.each(activities, fn activity -> - fun = fn -> Card.get_by_activity(activity) end - - if Config.get([__MODULE__, :sync_fetching], false) do - fun.() - else - spawn(fun) - end + Card.get_by_activity(activity) end) end diff --git a/lib/pleroma/web/rich_media/backfill.ex b/lib/pleroma/web/rich_media/backfill.ex index 46b879434..1d8cc87d4 100644 --- a/lib/pleroma/web/rich_media/backfill.ex +++ b/lib/pleroma/web/rich_media/backfill.ex @@ -10,31 +10,21 @@ defmodule Pleroma.Web.RichMedia.Backfill do require Logger - @backfiller Pleroma.Config.get([__MODULE__, :provider], Pleroma.Web.RichMedia.Backfill.Task) @cachex Pleroma.Config.get([:cachex, :provider], Cachex) - @max_attempts 3 - @retry 5_000 - def start(%{url: url} = args) when is_binary(url) do + @spec run(map()) :: + :ok | {:error, {:invalid_metadata, any()} | :body_too_large | {:content, any()} | any()} + def run(%{"url" => url} = args) do url_hash = Card.url_to_hash(url) - args = - args - |> Map.put(:attempt, 1) - |> Map.put(:url_hash, url_hash) - - @backfiller.run(args) - end - - def run(%{url: url, url_hash: url_hash, attempt: attempt} = args) - when attempt <= @max_attempts do case Parser.parse(url) do {:ok, fields} -> {:ok, card} = Card.create(url, fields) maybe_schedule_expiration(url, fields) - if Map.has_key?(args, :activity_id) do + with %{"activity_id" => activity_id} <- args, + false <- is_nil(activity_id) do stream_update(args) end @@ -54,19 +44,10 @@ def run(%{url: url, url_hash: url_hash, attempt: attempt} = args) e -> Logger.debug("Rich media error for #{url}: #{inspect(e)}") - - :timer.sleep(@retry * attempt) - - run(%{args | attempt: attempt + 1}) + {:error, e} end end - def run(%{url: url, url_hash: url_hash}) do - Logger.debug("Rich media failure for #{url}") - - negative_cache(url_hash, :timer.minutes(15)) - end - defp maybe_schedule_expiration(url, fields) do case TTL.process(fields, url) do {:ok, ttl} when is_number(ttl) -> @@ -80,22 +61,14 @@ defp maybe_schedule_expiration(url, fields) do end end - defp stream_update(%{activity_id: activity_id}) do + defp stream_update(%{"activity_id" => activity_id}) do Pleroma.Activity.get_by_id(activity_id) |> Pleroma.Activity.normalize() |> Pleroma.Web.ActivityPub.ActivityPub.stream_out() end defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val) - defp negative_cache(key, ttl \\ nil), do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl) -end -defmodule Pleroma.Web.RichMedia.Backfill.Task do - alias Pleroma.Web.RichMedia.Backfill - - def run(args) do - Task.Supervisor.start_child(Pleroma.TaskSupervisor, Backfill, :run, [args], - name: {:global, {:rich_media, args.url_hash}} - ) - end + defp negative_cache(key, ttl \\ :timer.minutes(15)), + do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl) end diff --git a/lib/pleroma/web/rich_media/card.ex b/lib/pleroma/web/rich_media/card.ex index 040066f36..72ff5e791 100644 --- a/lib/pleroma/web/rich_media/card.ex +++ b/lib/pleroma/web/rich_media/card.ex @@ -7,8 +7,8 @@ defmodule Pleroma.Web.RichMedia.Card do alias Pleroma.HTML alias Pleroma.Object alias Pleroma.Repo - alias Pleroma.Web.RichMedia.Backfill alias Pleroma.Web.RichMedia.Parser + alias Pleroma.Workers.RichMediaWorker @cachex Pleroma.Config.get([:cachex, :provider], Cachex) @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) @@ -75,17 +75,18 @@ def get_by_url(url) when is_binary(url) do def get_by_url(nil), do: nil - @spec get_or_backfill_by_url(String.t(), map()) :: t() | nil - def get_or_backfill_by_url(url, backfill_opts \\ %{}) do + @spec get_or_backfill_by_url(String.t(), keyword()) :: t() | nil + def get_or_backfill_by_url(url, opts \\ []) do if @config_impl.get([:rich_media, :enabled]) do case get_by_url(url) do %__MODULE__{} = card -> card nil -> - backfill_opts = Map.put(backfill_opts, :url, url) + activity_id = Keyword.get(opts, :activity, nil) - Backfill.start(backfill_opts) + RichMediaWorker.new(%{"op" => "backfill", "url" => url, "activity_id" => activity_id}) + |> Oban.insert() nil @@ -137,7 +138,7 @@ def get_by_activity(activity) do nil else {:cached, url} -> - get_or_backfill_by_url(url, %{activity_id: activity.id}) + get_or_backfill_by_url(url, activity_id: activity.id) _ -> :error diff --git a/lib/pleroma/workers/rich_media_worker.ex b/lib/pleroma/workers/rich_media_worker.ex index 968395c64..f18ac658a 100644 --- a/lib/pleroma/workers/rich_media_worker.ex +++ b/lib/pleroma/workers/rich_media_worker.ex @@ -3,13 +3,17 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.RichMediaWorker do + alias Pleroma.Web.RichMedia.Backfill alias Pleroma.Web.RichMedia.Card - use Oban.Worker, - queue: :background + use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300] @impl Oban.Worker def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do Card.delete(url) end + + def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do + Backfill.run(args) + end end diff --git a/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs b/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs index f17add774..c78c03aba 100644 --- a/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs +++ b/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs @@ -9,6 +9,7 @@ defmodule Pleroma.Web.PleromaAPI.ChatMessageReferenceViewTest do alias Pleroma.Chat alias Pleroma.Chat.MessageReference alias Pleroma.Object + alias Pleroma.Tests.ObanHelpers alias Pleroma.UnstubbedConfigMock, as: ConfigMock alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.CommonAPI @@ -70,6 +71,8 @@ test "it displays a chat message" do media_id: upload.id ) + ObanHelpers.perform_all() + object = Object.normalize(activity, fetch: false) cm_ref = MessageReference.for_chat_and_object(chat, object) diff --git a/test/pleroma/web/rich_media/card_test.exs b/test/pleroma/web/rich_media/card_test.exs index 516ac9951..c76df99e2 100644 --- a/test/pleroma/web/rich_media/card_test.exs +++ b/test/pleroma/web/rich_media/card_test.exs @@ -5,6 +5,7 @@ defmodule Pleroma.Web.RichMedia.CardTest do use Pleroma.DataCase, async: true + alias Pleroma.Tests.ObanHelpers alias Pleroma.UnstubbedConfigMock, as: ConfigMock alias Pleroma.Web.CommonAPI alias Pleroma.Web.RichMedia.Card @@ -36,6 +37,8 @@ test "crawls URL in activity" do content_type: "text/markdown" }) + ObanHelpers.perform_all() + assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity) end @@ -50,6 +53,7 @@ test "recrawls URLs on status edits/updates" do # Force a backfill Card.get_by_activity(activity) + ObanHelpers.perform_all() assert match?( %Card{url_hash: ^original_url_hash, fields: _}, @@ -62,6 +66,7 @@ test "recrawls URLs on status edits/updates" do # Force a backfill Card.get_by_activity(activity) + ObanHelpers.perform_all() assert match?( %Card{url_hash: ^updated_url_hash, fields: _}, diff --git a/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs b/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs index 8fd9e6a5f..e02dd437a 100644 --- a/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs +++ b/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs @@ -4,10 +4,11 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do use Pleroma.DataCase, async: false - use Oban.Testing, repo: Pleroma.Repo + use Oban.Testing, repo: Pleroma.Repo, testing: :inline import Mox + alias Pleroma.Tests.ObanHelpers alias Pleroma.UnstubbedConfigMock, as: ConfigMock alias Pleroma.Web.RichMedia.Card alias Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrl @@ -74,12 +75,19 @@ test "s3 signed url is parsed and correct ttl is set for rich media" do Card.get_or_backfill_by_url(url) - assert_enqueued( - worker: Pleroma.Workers.RichMediaWorker, - args: %{"op" => "expire", "url" => url} - ) + # Find the backfill job + expected_job = + [ + worker: "Pleroma.Workers.RichMediaWorker", + args: %{"op" => "backfill", "url" => url} + ] - [%Oban.Job{scheduled_at: scheduled_at}] = all_enqueued() + assert_enqueued(expected_job) + + # Run it manually + ObanHelpers.perform_all() + + [%Oban.Job{scheduled_at: scheduled_at} | _] = all_enqueued() timestamp_dt = Timex.parse!(timestamp, "{ISO:Basic:Z}") diff --git a/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs b/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs index e275ee523..6805e786d 100644 --- a/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs +++ b/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs @@ -8,6 +8,7 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.OpengraphTest do import Mox + alias Pleroma.Tests.ObanHelpers alias Pleroma.UnstubbedConfigMock, as: ConfigMock alias Pleroma.Web.RichMedia.Card @@ -36,6 +37,18 @@ test "OpenGraph TTL value is honored" do Card.get_or_backfill_by_url(url) + # Find the backfill job + expected_job = + [ + worker: "Pleroma.Workers.RichMediaWorker", + args: %{"op" => "backfill", "url" => url} + ] + + assert_enqueued(expected_job) + + # Run it manually + ObanHelpers.perform_all() + assert_enqueued( worker: Pleroma.Workers.RichMediaWorker, args: %{"op" => "expire", "url" => url}