Merge branch 'oban/simpler-publish' into 'develop'

Publisher job simplification

See merge request pleroma/pleroma!4194
This commit is contained in:
feld 2024-07-30 15:23:28 +00:00
commit 18469f3b1d
7 changed files with 149 additions and 88 deletions

View file

@ -0,0 +1 @@
Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery.

View file

@ -80,13 +80,26 @@ def representable?(%Activity{} = activity) do
parameters set: parameters set:
* `inbox`: the inbox to publish to * `inbox`: the inbox to publish to
* `json`: the JSON message body representing the ActivityPub message * `activity_id`: the internal activity id
* `actor`: the actor which is signing the message * `cc`: the cc recipients relevant to this inbox (optional)
* `id`: the ActivityStreams URI of the message
""" """
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
Logger.debug("Federating #{id} to #{inbox}") activity = Activity.get_by_id_with_user_actor(activity_id)
actor = activity.user_actor
ap_id = activity.data["id"]
Logger.debug("Federating #{ap_id} to #{inbox}")
uri = %{path: path} = URI.parse(inbox) uri = %{path: path} = URI.parse(inbox)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
cc = Map.get(params, :cc)
json =
data
|> Map.put("cc", cc)
|> Jason.encode!()
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
date = Pleroma.Signature.signed_date() date = Pleroma.Signature.signed_date()
@ -119,7 +132,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
else else
{_post_result, %{status: code} = response} = e -> {_post_result, %{status: code} = response} = e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox) unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: id, inbox: inbox, status: code) Logger.metadata(activity: activity_id, inbox: inbox, status: code)
Logger.error("Publisher failed to inbox #{inbox} with status #{code}") Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
case response do case response do
@ -136,21 +149,12 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
e -> e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox) unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: id, inbox: inbox) Logger.metadata(activity: activity_id, inbox: inbox)
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}") Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
{:error, e} {:error, e}
end end
end end
def publish_one(%{actor_id: actor_id} = params) do
actor = User.get_cached_by_id(actor_id)
params
|> Map.delete(:actor_id)
|> Map.put(:actor, actor)
|> publish_one()
end
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
if port == URI.default_port(scheme) do if port == URI.default_port(scheme) do
host host
@ -251,7 +255,6 @@ def determine_inbox(
def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
when is_list(bcc) and bcc != [] do when is_list(bcc) and bcc != [] do
public = public?(activity) public = public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
[priority_recipients, recipients] = recipients(actor, activity) [priority_recipients, recipients] = recipients(actor, activity)
@ -276,16 +279,10 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
# instance would only accept a first message for the first recipient and ignore the rest. # instance would only accept a first message for the first recipient and ignore the rest.
cc = get_cc_ap_ids(ap_id, recipients) cc = get_cc_ap_ids(ap_id, recipients)
json =
data
|> Map.put("cc", cc)
|> Jason.encode!()
__MODULE__.enqueue_one(%{ __MODULE__.enqueue_one(%{
inbox: inbox, inbox: inbox,
json: json, cc: cc,
actor_id: actor.id, activity_id: activity.id,
id: activity.data["id"],
unreachable_since: unreachable_since unreachable_since: unreachable_since
}) })
end) end)
@ -302,9 +299,6 @@ def publish(%User{} = actor, %Activity{} = activity) do
Relay.publish(activity) Relay.publish(activity)
end end
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
[priority_inboxes, inboxes] = [priority_inboxes, inboxes] =
recipients(actor, activity) recipients(actor, activity)
|> Enum.map(fn recipients -> |> Enum.map(fn recipients ->
@ -326,9 +320,7 @@ def publish(%User{} = actor, %Activity{} = activity) do
__MODULE__.enqueue_one( __MODULE__.enqueue_one(
%{ %{
inbox: inbox, inbox: inbox,
json: json, activity_id: activity.id,
actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since unreachable_since: unreachable_since
}, },
priority: priority priority: priority

View file

@ -714,11 +714,11 @@ def get_user(ap_id, fake_record_fallback \\ true) do
end end
end end
defp maybe_cancel_jobs(%Activity{data: %{"id" => ap_id}}) do defp maybe_cancel_jobs(%Activity{id: activity_id}) do
Oban.Job Oban.Job
|> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")
|> where([j], j.args["op"] == "publish_one") |> where([j], j.args["op"] == "publish_one")
|> where([j], j.args["params"]["id"] == ^ap_id) |> where([j], j.args["params"]["activity_id"] == ^activity_id)
|> Oban.cancel_all_jobs() |> Oban.cancel_all_jobs()
end end

View file

@ -0,0 +1,27 @@
defmodule Pleroma.Repo.Migrations.PublisherJobChange do
use Ecto.Migration
alias Pleroma.Activity
alias Pleroma.Repo
import Ecto.Query
def up do
query =
from(j in Oban.Job,
where: j.worker == "Pleroma.Workers.PublisherWorker",
where: j.state in ["available", "retryable"]
)
jobs = Repo.all(query)
Enum.each(jobs, fn job ->
args = job.args
activity = Activity.get_by_ap_id(args["id"])
updated_args = Map.put(args, "activity_id", activity.id)
Pleroma.Workers.PublisherWorker.new(updated_args)
|> Oban.insert()
end)
end
end

View file

@ -0,0 +1,43 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo.Migrations.PublisherMigrationChangeTest do
use Oban.Testing, repo: Pleroma.Repo
use Pleroma.DataCase
import Pleroma.Factory
import Pleroma.Tests.Helpers
alias Pleroma.Activity
alias Pleroma.Workers.PublisherWorker
setup_all do: require_migration("20240729163838_publisher_job_change")
describe "up/0" do
test "migrates publisher jobs to new format", %{migration: migration} do
user = insert(:user)
%Activity{id: activity_id, data: %{"id" => ap_id}} =
insert(:note_activity, user: user)
{:ok, %{id: job_id}} =
PublisherWorker.new(%{
"actor_id" => user.id,
"json" => "{}",
"id" => ap_id,
"inbox" => "https://example.com/inbox",
"unreachable_since" => nil
})
|> Oban.insert()
assert [%{id: ^job_id, args: %{"id" => ^ap_id}}] = all_enqueued(worker: PublisherWorker)
assert migration.up() == :ok
assert_enqueued(
worker: PublisherWorker,
args: %{"id" => ap_id, "activity_id" => activity_id}
)
end
end
end

View file

@ -137,6 +137,7 @@ test "it returns inbox for messages involving single recipients in total" do
test "publish to url with with different ports" do test "publish to url with with different ports" do
inbox80 = "http://42.site/users/nick1/inbox" inbox80 = "http://42.site/users/nick1/inbox"
inbox42 = "http://42.site:42/users/nick1/inbox" inbox42 = "http://42.site:42/users/nick1/inbox"
activity = insert(:note_activity)
mock(fn mock(fn
%{method: :post, url: "http://42.site:42/users/nick1/inbox"} -> %{method: :post, url: "http://42.site:42/users/nick1/inbox"} ->
@ -146,23 +147,19 @@ test "publish to url with with different ports" do
{:ok, %Tesla.Env{status: 200, body: "port 80"}} {:ok, %Tesla.Env{status: 200, body: "port 80"}}
end) end)
actor = insert(:user) _actor = insert(:user)
assert {:ok, %{body: "port 42"}} = assert {:ok, %{body: "port 42"}} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox42, inbox: inbox42,
json: "{}", activity_id: activity.id,
actor: actor,
id: 1,
unreachable_since: true unreachable_since: true
}) })
assert {:ok, %{body: "port 80"}} = assert {:ok, %{body: "port 80"}} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox80, inbox: inbox80,
json: "{}", activity_id: activity.id,
actor: actor,
id: 1,
unreachable_since: true unreachable_since: true
}) })
end end
@ -171,10 +168,13 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} =
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1})
assert called(Instances.set_reachable(inbox)) assert called(Instances.set_reachable(inbox))
end end
@ -182,15 +182,14 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}", activity_id: activity.id,
actor: actor,
id: 1,
unreachable_since: NaiveDateTime.utc_now() unreachable_since: NaiveDateTime.utc_now()
}) })
@ -201,15 +200,14 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}", activity_id: activity.id,
actor: actor,
id: 1,
unreachable_since: nil unreachable_since: nil
}) })
@ -220,11 +218,12 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://404.site/users/nick1/inbox" inbox = "http://404.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:cancel, _} = assert {:cancel, _} =
Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
assert called(Instances.set_unreachable(inbox)) assert called(Instances.set_unreachable(inbox))
end end
@ -233,12 +232,16 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://connrefused.site/users/nick1/inbox" inbox = "http://connrefused.site/users/nick1/inbox"
activity = insert(:note_activity)
assert capture_log(fn -> assert capture_log(fn ->
assert {:error, _} = assert {:error, _} =
Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) Publisher.publish_one(%{
inbox: inbox,
activity_id: activity.id
})
end) =~ "connrefused" end) =~ "connrefused"
assert called(Instances.set_unreachable(inbox)) assert called(Instances.set_unreachable(inbox))
@ -248,10 +251,12 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://200.site/users/nick1/inbox" inbox = "http://200.site/users/nick1/inbox"
activity = insert(:note_activity)
assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) assert {:ok, _} =
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
refute called(Instances.set_unreachable(inbox)) refute called(Instances.set_unreachable(inbox))
end end
@ -260,16 +265,15 @@ test "publish to url with with different ports" do
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
actor = insert(:user) _actor = insert(:user)
inbox = "http://connrefused.site/users/nick1/inbox" inbox = "http://connrefused.site/users/nick1/inbox"
activity = insert(:note_activity)
assert capture_log(fn -> assert capture_log(fn ->
assert {:error, _} = assert {:error, _} =
Publisher.publish_one(%{ Publisher.publish_one(%{
inbox: inbox, inbox: inbox,
json: "{}", activity_id: activity.id,
actor: actor,
id: 1,
unreachable_since: NaiveDateTime.utc_now() unreachable_since: NaiveDateTime.utc_now()
}) })
end) =~ "connrefused" end) =~ "connrefused"
@ -309,8 +313,7 @@ test "publish to url with with different ports" do
assert not called( assert not called(
Publisher.enqueue_one(%{ Publisher.enqueue_one(%{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor_id: actor.id, activity_id: note_activity.id
id: note_activity.data["id"]
}) })
) )
end end
@ -346,8 +349,7 @@ test "publish to url with with different ports" do
Publisher.enqueue_one( Publisher.enqueue_one(
%{ %{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor_id: actor.id, activity_id: note_activity.id
id: note_activity.data["id"]
}, },
priority: 1 priority: 1
) )
@ -370,8 +372,7 @@ test "publish to url with with different ports" do
Publisher.enqueue_one( Publisher.enqueue_one(
%{ %{
inbox: :_, inbox: :_,
actor_id: actor.id, activity_id: note_activity.id
id: note_activity.data["id"]
}, },
priority: 0 priority: 0
) )
@ -405,8 +406,7 @@ test "publish to url with with different ports" do
assert called( assert called(
Publisher.enqueue_one(%{ Publisher.enqueue_one(%{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor_id: actor.id, activity_id: note_activity.id
id: note_activity.data["id"]
}) })
) )
end end
@ -456,8 +456,7 @@ test "publish to url with with different ports" do
Publisher.enqueue_one( Publisher.enqueue_one(
%{ %{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor_id: actor.id, activity_id: delete.id
id: delete.data["id"]
}, },
priority: 1 priority: 1
) )
@ -467,8 +466,7 @@ test "publish to url with with different ports" do
Publisher.enqueue_one( Publisher.enqueue_one(
%{ %{
inbox: "https://domain2.com/users/nick1/inbox", inbox: "https://domain2.com/users/nick1/inbox",
actor_id: actor.id, activity_id: delete.id
id: delete.data["id"]
}, },
priority: 1 priority: 1
) )

View file

@ -1957,7 +1957,7 @@ test "when deleting posts", %{
{:ok, _, _} = Pleroma.User.follow(remote_one, local_user) {:ok, _, _} = Pleroma.User.follow(remote_one, local_user)
{:ok, _, _} = Pleroma.User.follow(remote_two, local_user) {:ok, _, _} = Pleroma.User.follow(remote_two, local_user)
{:ok, %{data: %{"id" => ap_id}} = activity} = {:ok, %{id: activity_id} = _activity} =
CommonAPI.post(local_user, %{status: "Happy Friday everyone!"}) CommonAPI.post(local_user, %{status: "Happy Friday everyone!"})
# Generate the publish_one jobs # Generate the publish_one jobs
@ -1971,7 +1971,7 @@ test "when deleting posts", %{
state: "available", state: "available",
queue: "federator_outgoing", queue: "federator_outgoing",
worker: "Pleroma.Workers.PublisherWorker", worker: "Pleroma.Workers.PublisherWorker",
args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} args: %{"op" => "publish_one", "params" => %{"activity_id" => ^activity_id}}
}, },
job job
) )
@ -1980,7 +1980,7 @@ test "when deleting posts", %{
assert length(publish_one_jobs) == 2 assert length(publish_one_jobs) == 2
# The delete should have triggered cancelling the publish_one jobs # The delete should have triggered cancelling the publish_one jobs
assert {:ok, _delete} = CommonAPI.delete(activity.id, local_user) assert {:ok, _delete} = CommonAPI.delete(activity_id, local_user)
# all_enqueued/1 will not return cancelled jobs # all_enqueued/1 will not return cancelled jobs
cancelled_jobs = cancelled_jobs =
@ -1988,7 +1988,7 @@ test "when deleting posts", %{
|> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")
|> where([j], j.state == "cancelled") |> where([j], j.state == "cancelled")
|> where([j], j.args["op"] == "publish_one") |> where([j], j.args["op"] == "publish_one")
|> where([j], j.args["params"]["id"] == ^ap_id) |> where([j], j.args["params"]["activity_id"] == ^activity_id)
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
assert length(cancelled_jobs) == 2 assert length(cancelled_jobs) == 2
@ -2001,7 +2001,7 @@ test "when unfavoriting posts", %{
{:ok, activity} = {:ok, activity} =
CommonAPI.post(remote_user, %{status: "I like turtles!"}) CommonAPI.post(remote_user, %{status: "I like turtles!"})
{:ok, %{data: %{"id" => ap_id}} = _favorite} = {:ok, %{id: favorite_id} = _favorite} =
CommonAPI.favorite(activity.id, local_user) CommonAPI.favorite(activity.id, local_user)
# Generate the publish_one jobs # Generate the publish_one jobs
@ -2015,7 +2015,7 @@ test "when unfavoriting posts", %{
state: "available", state: "available",
queue: "federator_outgoing", queue: "federator_outgoing",
worker: "Pleroma.Workers.PublisherWorker", worker: "Pleroma.Workers.PublisherWorker",
args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} args: %{"op" => "publish_one", "params" => %{"activity_id" => ^favorite_id}}
}, },
job job
) )
@ -2032,7 +2032,7 @@ test "when unfavoriting posts", %{
|> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")
|> where([j], j.state == "cancelled") |> where([j], j.state == "cancelled")
|> where([j], j.args["op"] == "publish_one") |> where([j], j.args["op"] == "publish_one")
|> where([j], j.args["params"]["id"] == ^ap_id) |> where([j], j.args["params"]["activity_id"] == ^favorite_id)
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
assert length(cancelled_jobs) == 1 assert length(cancelled_jobs) == 1
@ -2049,7 +2049,7 @@ test "when unboosting posts", %{
{:ok, activity} = {:ok, activity} =
CommonAPI.post(remote_one, %{status: "This is an unpleasant post"}) CommonAPI.post(remote_one, %{status: "This is an unpleasant post"})
{:ok, %{data: %{"id" => ap_id}} = _repeat} = {:ok, %{id: repeat_id} = _repeat} =
CommonAPI.repeat(activity.id, local_user) CommonAPI.repeat(activity.id, local_user)
# Generate the publish_one jobs # Generate the publish_one jobs
@ -2063,7 +2063,7 @@ test "when unboosting posts", %{
state: "available", state: "available",
queue: "federator_outgoing", queue: "federator_outgoing",
worker: "Pleroma.Workers.PublisherWorker", worker: "Pleroma.Workers.PublisherWorker",
args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} args: %{"op" => "publish_one", "params" => %{"activity_id" => ^repeat_id}}
}, },
job job
) )
@ -2080,7 +2080,7 @@ test "when unboosting posts", %{
|> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")
|> where([j], j.state == "cancelled") |> where([j], j.state == "cancelled")
|> where([j], j.args["op"] == "publish_one") |> where([j], j.args["op"] == "publish_one")
|> where([j], j.args["params"]["id"] == ^ap_id) |> where([j], j.args["params"]["activity_id"] == ^repeat_id)
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
assert length(cancelled_jobs) == 2 assert length(cancelled_jobs) == 2
@ -2094,11 +2094,11 @@ test "when unreacting to posts", %{
{:ok, _, _} = Pleroma.User.follow(remote_one, local_user) {:ok, _, _} = Pleroma.User.follow(remote_one, local_user)
{:ok, _, _} = Pleroma.User.follow(remote_two, local_user) {:ok, _, _} = Pleroma.User.follow(remote_two, local_user)
{:ok, activity} = {:ok, %{id: activity_id}} =
CommonAPI.post(remote_one, %{status: "Gang gang!!!!"}) CommonAPI.post(remote_one, %{status: "Gang gang!!!!"})
{:ok, %{data: %{"id" => ap_id}} = _react} = {:ok, %{id: react_id} = _react} =
CommonAPI.react_with_emoji(activity.id, local_user, "👍") CommonAPI.react_with_emoji(activity_id, local_user, "👍")
# Generate the publish_one jobs # Generate the publish_one jobs
ObanHelpers.perform_all() ObanHelpers.perform_all()
@ -2111,7 +2111,7 @@ test "when unreacting to posts", %{
state: "available", state: "available",
queue: "federator_outgoing", queue: "federator_outgoing",
worker: "Pleroma.Workers.PublisherWorker", worker: "Pleroma.Workers.PublisherWorker",
args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} args: %{"op" => "publish_one", "params" => %{"activity_id" => ^react_id}}
}, },
job job
) )
@ -2120,7 +2120,7 @@ test "when unreacting to posts", %{
assert length(publish_one_jobs) == 2 assert length(publish_one_jobs) == 2
# The unreact should have triggered cancelling the publish_one jobs # The unreact should have triggered cancelling the publish_one jobs
assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity.id, local_user, "👍") assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity_id, local_user, "👍")
# all_enqueued/1 will not return cancelled jobs # all_enqueued/1 will not return cancelled jobs
cancelled_jobs = cancelled_jobs =
@ -2128,7 +2128,7 @@ test "when unreacting to posts", %{
|> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.worker == "Pleroma.Workers.PublisherWorker")
|> where([j], j.state == "cancelled") |> where([j], j.state == "cancelled")
|> where([j], j.args["op"] == "publish_one") |> where([j], j.args["op"] == "publish_one")
|> where([j], j.args["params"]["id"] == ^ap_id) |> where([j], j.args["params"]["activity_id"] == ^react_id)
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
assert length(cancelled_jobs) == 2 assert length(cancelled_jobs) == 2