Publisher job simplification

Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery.
This commit is contained in:
Mark Felder 2024-07-28 20:41:21 -04:00
parent 6876761837
commit 59309a9eff
3 changed files with 48 additions and 34 deletions

View file

@ -0,0 +1 @@
Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery.

View file

@ -80,13 +80,26 @@ def representable?(%Activity{} = activity) do
parameters set: parameters set:
* `inbox`: the inbox to publish to * `inbox`: the inbox to publish to
* `json`: the JSON message body representing the ActivityPub message
* `actor`: the actor which is signing the message * `actor`: the actor which is signing the message
* `id`: the ActivityStreams URI of the message * `activity_id`: the internal activity id
* `cc`: the cc recipients relevant to this inbox (optional)
""" """
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do def publish_one(%{inbox: inbox, actor: %User{} = actor, activity_id: activity_id} = params) do
Logger.debug("Federating #{id} to #{inbox}") activity = Activity.get_by_id(activity_id)
ap_id = activity.data["id"]
Logger.debug("Federating #{ap_id} to #{inbox}")
uri = %{path: path} = URI.parse(inbox) uri = %{path: path} = URI.parse(inbox)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
cc = Map.get(params, :cc)
json =
data
|> Map.put("cc", cc)
|> Jason.encode!()
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
date = Pleroma.Signature.signed_date() date = Pleroma.Signature.signed_date()
@ -119,7 +132,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
else else
{_post_result, %{status: code} = response} = e -> {_post_result, %{status: code} = response} = e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox) unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: id, inbox: inbox, status: code) Logger.metadata(id: activity_id, inbox: inbox, status: code)
Logger.error("Publisher failed to inbox #{inbox} with status #{code}") Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
case response do case response do
@ -136,7 +149,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
e -> e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox) unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: id, inbox: inbox) Logger.metadata(activity: activity_id, inbox: inbox)
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}") Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
{:error, e} {:error, e}
end end
@ -251,7 +264,6 @@ def determine_inbox(
def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
when is_list(bcc) and bcc != [] do when is_list(bcc) and bcc != [] do
public = public?(activity) public = public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
[priority_recipients, recipients] = recipients(actor, activity) [priority_recipients, recipients] = recipients(actor, activity)
@ -276,16 +288,11 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
# instance would only accept a first message for the first recipient and ignore the rest. # instance would only accept a first message for the first recipient and ignore the rest.
cc = get_cc_ap_ids(ap_id, recipients) cc = get_cc_ap_ids(ap_id, recipients)
json =
data
|> Map.put("cc", cc)
|> Jason.encode!()
__MODULE__.enqueue_one(%{ __MODULE__.enqueue_one(%{
inbox: inbox, inbox: inbox,
json: json, cc: cc,
actor_id: actor.id, actor_id: actor.id,
id: activity.data["id"], activity_id: activity.id,
unreachable_since: unreachable_since unreachable_since: unreachable_since
}) })
end) end)
@ -302,9 +309,6 @@ def publish(%User{} = actor, %Activity{} = activity) do
Relay.publish(activity) Relay.publish(activity)
end end
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
[priority_inboxes, inboxes] = [priority_inboxes, inboxes] =
recipients(actor, activity) recipients(actor, activity)
|> Enum.map(fn recipients -> |> Enum.map(fn recipients ->
@ -326,9 +330,8 @@ def publish(%User{} = actor, %Activity{} = activity) do
__MODULE__.enqueue_one( __MODULE__.enqueue_one(
%{ %{
inbox: inbox, inbox: inbox,
json: json,
actor_id: actor.id, actor_id: actor.id,
id: activity.data["id"], activity_id: activity.id,
unreachable_since: unreachable_since unreachable_since: unreachable_since
}, },
priority: priority priority: priority

View file

@ -137,6 +137,7 @@ test "it returns inbox for messages involving single recipients in total" do
test "publish to url with with different ports" do test "publish to url with with different ports" do
inbox80 = "http://42.site/users/nick1/inbox" inbox80 = "http://42.site/users/nick1/inbox"
inbox42 = "http://42.site:42/users/nick1/inbox" inbox42 = "http://42.site:42/users/nick1/inbox"
activity = insert(:note_activity)
mock(fn mock(fn
%{method: :post, url: "http://42.site:42/users/nick1/inbox"} -> %{method: :post, url: "http://42.site:42/users/nick1/inbox"} ->
@ -151,18 +152,16 @@ test "publish to url with with different ports" do
assert {:ok, %{body: "port 42"}} = assert {:ok, %{body: "port 42"}} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox42, inbox: inbox42,
json: "{}",
actor: actor, actor: actor,
id: 1, activity_id: activity.id,
unreachable_since: true unreachable_since: true
}) })
assert {:ok, %{body: "port 80"}} = assert {:ok, %{body: "port 80"}} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox80, inbox: inbox80,
json: "{}",
actor: actor, actor: actor,
id: 1, activity_id: activity.id,
unreachable_since: true unreachable_since: true
}) })
end end
@ -173,8 +172,11 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} =
Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id})
assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
assert called(Instances.set_reachable(inbox)) assert called(Instances.set_reachable(inbox))
end end
@ -184,13 +186,13 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}",
actor: actor, actor: actor,
id: 1, activity_id: activity.id,
unreachable_since: NaiveDateTime.utc_now() unreachable_since: NaiveDateTime.utc_now()
}) })
@ -203,13 +205,13 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}",
actor: actor, actor: actor,
id: 1, activity_id: activity.id,
unreachable_since: nil unreachable_since: nil
}) })
@ -222,9 +224,10 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://404.site/users/nick1/inbox" inbox = "http://404.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:cancel, _} = assert {:cancel, _} =
Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id})
assert called(Instances.set_unreachable(inbox)) assert called(Instances.set_unreachable(inbox))
end end
@ -235,10 +238,15 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://connrefused.site/users/nick1/inbox" inbox = "http://connrefused.site/users/nick1/inbox"
activity = insert(:note_activity)
assert capture_log(fn -> assert capture_log(fn ->
assert {:error, _} = assert {:error, _} =
Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) Publisher.publish_one(%{
inbox: inbox,
actor: actor,
activity_id: activity.id
})
end) =~ "connrefused" end) =~ "connrefused"
assert called(Instances.set_unreachable(inbox)) assert called(Instances.set_unreachable(inbox))
@ -250,8 +258,10 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) assert {:ok, _} =
Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id})
refute called(Instances.set_unreachable(inbox)) refute called(Instances.set_unreachable(inbox))
end end
@ -262,14 +272,14 @@ test "publish to url with with different ports" do
[] do [] do
actor = insert(:user) actor = insert(:user)
inbox = "http://connrefused.site/users/nick1/inbox" inbox = "http://connrefused.site/users/nick1/inbox"
activity = insert(:note_activity)
assert capture_log(fn -> assert capture_log(fn ->
assert {:error, _} = assert {:error, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}",
actor: actor, actor: actor,
id: 1, activity_id: activity.id,
unreachable_since: NaiveDateTime.utc_now() unreachable_since: NaiveDateTime.utc_now()
}) })
end) =~ "connrefused" end) =~ "connrefused"
@ -406,7 +416,7 @@ test "publish to url with with different ports" do
Publisher.enqueue_one(%{ Publisher.enqueue_one(%{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor_id: actor.id, actor_id: actor.id,
id: note_activity.data["id"] activity_id: note_activity.id
}) })
) )
end end