Merge branch 'oban-overuse' into 'develop'

Expired activities as scheduled jobs in Oban

See merge request pleroma/pleroma!2916
This commit is contained in:
rinpatch 2020-09-10 19:17:15 +00:00
commit 6316350918
31 changed files with 481 additions and 433 deletions

View file

@ -12,8 +12,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Removed
- **Breaking:** Removed `Pleroma.Workers.Cron.StatsWorker` setting from Oban `:crontab`.
- **Breaking:** Removed `Pleroma.Workers.Cron.ClearOauthTokenWorker` setting from Oban `:crontab` config.
- **Breaking:** `Pleroma.Workers.Cron.StatsWorker` setting from Oban `:crontab` (moved to a simpler implementation).
- **Breaking:** `Pleroma.Workers.Cron.ClearOauthTokenWorker` setting from Oban `:crontab` (moved to scheduled jobs).
- **Breaking:** `Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker` setting from Oban `:crontab` (moved to scheduled jobs).
### Changed
- Minimum lifetime for ephmeral activities changed to 10 minutes and made configurable (`:min_lifetime` option).
## [2.1.1] - 2020-09-08

View file

@ -544,7 +544,6 @@
],
plugins: [Oban.Plugins.Pruner],
crontab: [
{"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]
@ -655,7 +654,7 @@
account_confirmation_resend: {8_640_000, 5},
ap_routes: {60_000, 15}
config :pleroma, Pleroma.ActivityExpiration, enabled: true
config :pleroma, Pleroma.Workers.PurgeExpiredActivity, enabled: true, min_lifetime: 600
config :pleroma, Pleroma.Plugs.RemoteIp, enabled: true

View file

@ -2290,7 +2290,6 @@
type: {:list, :tuple},
description: "Settings for cron background jobs",
suggestions: [
{"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]
@ -2473,14 +2472,20 @@
},
%{
group: :pleroma,
key: Pleroma.ActivityExpiration,
key: Pleroma.Workers.PurgeExpiredActivity,
type: :group,
description: "Expired activity settings",
description: "Expired activities settings",
children: [
%{
key: :enabled,
type: :boolean,
description: "Whether expired activities will be sent to the job queue to be deleted"
description: "Enables expired activities addition & deletion"
},
%{
key: :min_lifetime,
type: :integer,
description: "Minimum lifetime for ephemeral activity (in seconds)",
suggestions: [600]
}
]
},

View file

@ -1091,3 +1091,10 @@ config :pleroma, :frontends,
```
This would serve the frontend from the the folder at `$instance_static/frontends/pleroma/stable`. You have to copy the frontend into this folder yourself. You can choose the name and ref any way you like, but they will be used by mix tasks to automate installation in the future, the name referring to the project and the ref referring to a commit.
## Ephemeral activities (Pleroma.Workers.PurgeExpiredActivity)
Settings to enable and configure expiration for ephemeral activities
* `:enabled` - enables ephemeral activities creation
* `:min_lifetime` - minimum lifetime for ephemeral activities (in seconds). Default: 10 minutes.

View file

@ -133,8 +133,7 @@ def run(["ensure_expiration"]) do
days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
Pleroma.Activity
|> join(:left, [a], u in assoc(a, :expiration))
|> join(:inner, [a, _u], o in Object,
|> join(:inner, [a], o in Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
@ -144,14 +143,20 @@ def run(["ensure_expiration"]) do
)
)
|> where(local: true)
|> where([a, u], is_nil(u))
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> where([_a, _u, o], fragment("?->>'type' = 'Note'", o.data))
|> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
|> Pleroma.RepoStreamer.chunk_stream(100)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity ->
expires_at = Timex.shift(activity.inserted_at, days: days)
Pleroma.ActivityExpiration.create(activity, expires_at, false)
expires_at =
activity.inserted_at
|> DateTime.from_naive!("Etc/UTC")
|> Timex.shift(days: days)
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: activity.id,
expires_at: expires_at
})
end)
end)
|> Stream.run()

View file

@ -7,7 +7,6 @@ defmodule Pleroma.Activity do
alias Pleroma.Activity
alias Pleroma.Activity.Queries
alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark
alias Pleroma.Notification
alias Pleroma.Object
@ -60,8 +59,6 @@ defmodule Pleroma.Activity do
# typical case.
has_one(:object, Object, on_delete: :nothing, foreign_key: :id)
has_one(:expiration, ActivityExpiration, on_delete: :delete_all)
timestamps()
end
@ -304,14 +301,14 @@ def all_by_actor_and_id(actor, status_ids) do
|> Repo.all()
end
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
def follow_requests_for_actor(%User{ap_id: ap_id}) do
ap_id
|> Queries.by_object_id()
|> Queries.by_type("Follow")
|> where([a], fragment("? ->> 'state' = 'pending'", a.data))
end
def following_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
def following_requests_for_actor(%User{ap_id: ap_id}) do
Queries.by_type("Follow")
|> where([a], fragment("?->>'state' = 'pending'", a.data))
|> where([a], a.actor == ^ap_id)

View file

@ -1,74 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ActivityExpiration do
use Ecto.Schema
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Repo
import Ecto.Changeset
import Ecto.Query
@type t :: %__MODULE__{}
@min_activity_lifetime :timer.hours(1)
schema "activity_expirations" do
belongs_to(:activity, Activity, type: FlakeId.Ecto.CompatType)
field(:scheduled_at, :naive_datetime)
end
def changeset(%ActivityExpiration{} = expiration, attrs, validate_scheduled_at) do
expiration
|> cast(attrs, [:scheduled_at])
|> validate_required([:scheduled_at])
|> validate_scheduled_at(validate_scheduled_at)
end
def get_by_activity_id(activity_id) do
ActivityExpiration
|> where([exp], exp.activity_id == ^activity_id)
|> Repo.one()
end
def create(%Activity{} = activity, scheduled_at, validate_scheduled_at \\ true) do
%ActivityExpiration{activity_id: activity.id}
|> changeset(%{scheduled_at: scheduled_at}, validate_scheduled_at)
|> Repo.insert()
end
def due_expirations(offset \\ 0) do
naive_datetime =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(offset, :millisecond)
ActivityExpiration
|> where([exp], exp.scheduled_at < ^naive_datetime)
|> limit(50)
|> preload(:activity)
|> Repo.all()
|> Enum.reject(fn %{activity: activity} ->
Activity.pinned_by_actor?(activity)
end)
end
def validate_scheduled_at(changeset, false), do: changeset
def validate_scheduled_at(changeset, true) do
validate_change(changeset, :scheduled_at, fn _, scheduled_at ->
if not expires_late_enough?(scheduled_at) do
[scheduled_at: "an ephemeral activity must live for at least one hour"]
else
[]
end
end)
end
def expires_late_enough?(scheduled_at) do
now = NaiveDateTime.utc_now()
diff = NaiveDateTime.diff(scheduled_at, now, :millisecond)
diff > @min_activity_lifetime
end
end

View file

@ -8,7 +8,7 @@ defmodule Pleroma.Config.DeprecationWarnings do
require Logger
alias Pleroma.Config
@type config_namespace() :: [atom()]
@type config_namespace() :: atom() | [atom()]
@type config_map() :: {config_namespace(), config_namespace(), String.t()}
@mrf_config_map [
@ -57,6 +57,7 @@ def warn do
check_media_proxy_whitelist_config()
check_welcome_message_config()
check_gun_pool_options()
check_activity_expiration_config()
end
def check_welcome_message_config do
@ -158,4 +159,20 @@ def check_gun_pool_options do
Config.put(:pools, updated_config)
end
end
@spec check_activity_expiration_config() :: :ok | nil
def check_activity_expiration_config do
warning_preface = """
!!!DEPRECATION WARNING!!!
Your config is using old namespace for activity expiration configuration. Setting should work for now, but you are advised to change to new namespace to prevent possible issues later:
"""
move_namespace_and_warn(
[
{Pleroma.ActivityExpiration, Pleroma.Workers.PurgeExpiredActivity,
"\n* `config :pleroma, Pleroma.ActivityExpiration` is now `config :pleroma, Pleroma.Workers.PurgeExpiredActivity`"}
],
warning_preface
)
end
end

View file

@ -5,7 +5,11 @@ def warn do
oban_config = Pleroma.Config.get(Oban)
crontab =
[Pleroma.Workers.Cron.StatsWorker, Pleroma.Workers.Cron.ClearOauthTokenWorker]
[
Pleroma.Workers.Cron.StatsWorker,
Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker,
Pleroma.Workers.Cron.ClearOauthTokenWorker
]
|> Enum.reduce(oban_config[:crontab], fn removed_worker, acc ->
with acc when is_list(acc) <- acc,
setting when is_tuple(setting) <-

View file

@ -2315,6 +2315,11 @@ def add_pinnned_activity(user, %Pleroma.Activity{id: id}) do
max_pinned_statuses = Config.get([:instance, :max_pinned_statuses], 0)
params = %{pinned_activities: user.pinned_activities ++ [id]}
# if pinned activity was scheduled for deletion, we remove job
if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(id) do
Oban.cancel_job(expiration.id)
end
user
|> cast(params, [:pinned_activities])
|> validate_length(:pinned_activities,
@ -2327,9 +2332,19 @@ def add_pinnned_activity(user, %Pleroma.Activity{id: id}) do
|> update_and_set_cache()
end
def remove_pinnned_activity(user, %Pleroma.Activity{id: id}) do
def remove_pinnned_activity(user, %Pleroma.Activity{id: id, data: data}) do
params = %{pinned_activities: List.delete(user.pinned_activities, id)}
# if pinned activity was scheduled for deletion, we reschedule it for deletion
if data["expires_at"] do
{:ok, expires_at, _} = DateTime.from_iso8601(data["expires_at"])
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: id,
expires_at: expires_at
})
end
user
|> cast(params, [:pinned_activities])
|> update_and_set_cache()

View file

@ -5,7 +5,6 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Constants
alias Pleroma.Conversation
@ -102,7 +101,9 @@ def persist(object, meta) do
local: local,
recipients: recipients,
actor: object["actor"]
}) do
}),
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
{:ok, _} <- maybe_create_activity_expiration(activity) do
{:ok, activity, meta}
end
end
@ -111,23 +112,14 @@ def persist(object, meta) do
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
with nil <- Activity.normalize(map),
map <- lazy_put_activity_defaults(map, fake),
true <- bypass_actor_check || check_actor_is_active(map["actor"]),
{_, true} <- {:remote_limit_error, check_remote_limit(map)},
{_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
{_, true} <- {:remote_limit_pass, check_remote_limit(map)},
{:ok, map} <- MRF.filter(map),
{recipients, _, _} = get_recipients(map),
{:fake, false, map, recipients} <- {:fake, fake, map, recipients},
{:containment, :ok} <- {:containment, Containment.contain_child(map)},
{:ok, map, object} <- insert_full_object(map) do
{:ok, activity} =
%Activity{
data: map,
local: local,
actor: map["actor"],
recipients: recipients
}
|> Repo.insert()
|> maybe_create_activity_expiration()
{:ok, map, object} <- insert_full_object(map),
{:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
# Splice in the child object if we have one.
activity = Maps.put_if_present(activity, :object, object)
@ -138,6 +130,15 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
%Activity{} = activity ->
{:ok, activity}
{:actor_check, _} ->
{:error, false}
{:containment, _} = error ->
error
{:error, _} = error ->
error
{:fake, true, map, recipients} ->
activity = %Activity{
data: map,
@ -150,8 +151,24 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
{:ok, activity}
error ->
{:error, error}
{:remote_limit_pass, _} ->
{:error, :remote_limit}
{:reject, reason} ->
{:error, reason}
end
end
defp insert_activity_with_expiration(data, local, recipients) do
struct = %Activity{
data: data,
local: local,
actor: data["actor"],
recipients: recipients
}
with {:ok, activity} <- Repo.insert(struct) do
maybe_create_activity_expiration(activity)
end
end
@ -164,13 +181,19 @@ def notify_and_stream(activity) do
stream_out_participations(participations)
end
defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
defp maybe_create_activity_expiration(
%{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
) do
with {:ok, _job} <-
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: activity.id,
expires_at: expires_at
}) do
{:ok, activity}
end
end
defp maybe_create_activity_expiration(result), do: result
defp maybe_create_activity_expiration(activity), do: {:ok, activity}
defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity),

View file

@ -31,10 +31,10 @@ defp note?(activity) do
defp maybe_add_expiration(activity) do
days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
expires_at = NaiveDateTime.utc_now() |> Timex.shift(days: days)
expires_at = DateTime.utc_now() |> Timex.shift(days: days)
with %{"expires_at" => existing_expires_at} <- activity,
:lt <- NaiveDateTime.compare(existing_expires_at, expires_at) do
:lt <- DateTime.compare(existing_expires_at, expires_at) do
activity
else
_ -> Map.put(activity, "expires_at", expires_at)

View file

@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
"""
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.ActivityExpiration
alias Pleroma.Chat
alias Pleroma.Chat.MessageReference
alias Pleroma.FollowingRelationship
@ -188,10 +187,6 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do
Object.increase_replies_count(in_reply_to)
end
if expires_at = activity.data["expires_at"] do
ActivityExpiration.create(activity, expires_at)
end
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
meta =

View file

@ -202,7 +202,7 @@ defp changes(draft) do
additional =
case draft.expires_at do
%NaiveDateTime{} = expires_at -> Map.put(additional, "expires_at", expires_at)
%DateTime{} = expires_at -> Map.put(additional, "expires_at", expires_at)
_ -> additional
end

View file

@ -4,7 +4,6 @@
defmodule Pleroma.Web.CommonAPI do
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Conversation.Participation
alias Pleroma.Formatter
alias Pleroma.Object
@ -381,9 +380,9 @@ def get_replied_to_visibility(activity) do
def check_expiry_date({:ok, nil} = res), do: res
def check_expiry_date({:ok, in_seconds}) do
expiry = NaiveDateTime.utc_now() |> NaiveDateTime.add(in_seconds)
expiry = DateTime.add(DateTime.utc_now(), in_seconds)
if ActivityExpiration.expires_late_enough?(expiry) do
if Pleroma.Workers.PurgeExpiredActivity.expires_late_enough?(expiry) do
{:ok, expiry}
else
{:error, "Expiry date is too soon"}

View file

@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.HTML
alias Pleroma.Object
alias Pleroma.Repo
@ -245,8 +244,8 @@ def render("show.json", %{activity: %{data: %{"object" => _object}} = activity}
expires_at =
with true <- client_posted_this_activity,
%ActivityExpiration{scheduled_at: scheduled_at} <-
ActivityExpiration.get_by_activity_id(activity.id) do
%Oban.Job{scheduled_at: scheduled_at} <-
Pleroma.Workers.PurgeExpiredActivity.get_expiration(activity.id) do
scheduled_at
else
_ -> nil

View file

@ -1,48 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
@moduledoc """
The worker to purge expired activities.
"""
use Oban.Worker, queue: "activity_expiration"
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.User
alias Pleroma.Web.CommonAPI
require Logger
@interval :timer.minutes(1)
@impl Oban.Worker
def perform(_job) do
if Config.get([ActivityExpiration, :enabled]) do
Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
end
after
:ok
end
def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
with {:activity, %Activity{} = activity} <-
{:activity, Activity.get_by_id_with_object(activity_id)},
{:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
CommonAPI.delete(activity.id, user)
else
{:activity, _} ->
Logger.error(
"#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
)
{:user, _} ->
Logger.error(
"#{__MODULE__} Couldn't delete expired activity: not found actor of ##{activity_id}"
)
end
end
end

View file

@ -0,0 +1,72 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.PurgeExpiredActivity do
@moduledoc """
Worker which purges expired activity.
"""
use Oban.Worker, queue: :activity_expiration, max_attempts: 1
import Ecto.Query
alias Pleroma.Activity
@spec enqueue(map()) ::
{:ok, Oban.Job.t()}
| {:error, :expired_activities_disabled}
| {:error, :expiration_too_close}
def enqueue(args) do
with true <- enabled?() do
{scheduled_at, args} = Map.pop(args, :expires_at)
args
|> new(scheduled_at: scheduled_at)
|> Oban.insert()
end
end
@impl true
def perform(%Oban.Job{args: %{"activity_id" => id}}) do
with %Activity{} = activity <- find_activity(id),
%Pleroma.User{} = user <- find_user(activity.object.data["actor"]) do
Pleroma.Web.CommonAPI.delete(activity.id, user)
end
end
defp enabled? do
with false <- Pleroma.Config.get([__MODULE__, :enabled], false) do
{:error, :expired_activities_disabled}
end
end
defp find_activity(id) do
with nil <- Activity.get_by_id_with_object(id) do
{:error, :activity_not_found}
end
end
defp find_user(ap_id) do
with nil <- Pleroma.User.get_by_ap_id(ap_id) do
{:error, :user_not_found}
end
end
def get_expiration(id) do
from(j in Oban.Job,
where: j.state == "scheduled",
where: j.queue == "activity_expiration",
where: fragment("?->>'activity_id' = ?", j.args, ^id)
)
|> Pleroma.Repo.one()
end
@spec expires_late_enough?(DateTime.t()) :: boolean()
def expires_late_enough?(scheduled_at) do
now = DateTime.utc_now()
diff = DateTime.diff(scheduled_at, now, :millisecond)
min_lifetime = Pleroma.Config.get([__MODULE__, :min_lifetime], 600)
diff > :timer.seconds(min_lifetime)
end
end

View file

@ -0,0 +1,13 @@
defmodule Pleroma.Repo.Migrations.RenameActivityExpirationSetting do
use Ecto.Migration
def change do
config = Pleroma.ConfigDB.get_by_params(%{group: :pleroma, key: Pleroma.ActivityExpiration})
if config do
config
|> Ecto.Changeset.change(key: Pleroma.Workers.PurgeExpiredActivity)
|> Pleroma.Repo.update()
end
end
end

View file

@ -0,0 +1,26 @@
defmodule Pleroma.Repo.Migrations.MoveActivityExpirationsToOban do
use Ecto.Migration
import Ecto.Query, only: [from: 2]
def change do
Supervisor.start_link([{Oban, Pleroma.Config.get(Oban)}],
strategy: :one_for_one,
name: Pleroma.Supervisor
)
from(e in "activity_expirations",
select: %{id: e.id, activity_id: e.activity_id, scheduled_at: e.scheduled_at}
)
|> Pleroma.Repo.stream()
|> Stream.each(fn expiration ->
with {:ok, expires_at} <- DateTime.from_naive(expiration.scheduled_at, "Etc/UTC") do
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: FlakeId.to_string(expiration.activity_id),
expires_at: expires_at
})
end
end)
|> Stream.run()
end
end

View file

@ -0,0 +1,7 @@
defmodule Pleroma.Repo.Migrations.DropActivityExpirationsTable do
use Ecto.Migration
def change do
drop(table("activity_expirations"))
end
end

View file

@ -1,55 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ActivityExpirationTest do
use Pleroma.DataCase
alias Pleroma.ActivityExpiration
import Pleroma.Factory
setup do: clear_config([ActivityExpiration, :enabled])
test "finds activities due to be deleted only" do
activity = insert(:note_activity)
expiration_due =
insert(:expiration_in_the_past, %{activity_id: activity.id}) |> Repo.preload(:activity)
activity2 = insert(:note_activity)
insert(:expiration_in_the_future, %{activity_id: activity2.id})
expirations = ActivityExpiration.due_expirations()
assert length(expirations) == 1
assert hd(expirations) == expiration_due
end
test "denies expirations that don't live long enough" do
activity = insert(:note_activity)
now = NaiveDateTime.utc_now()
assert {:error, _} = ActivityExpiration.create(activity, now)
end
test "deletes an expiration activity" do
Pleroma.Config.put([ActivityExpiration, :enabled], true)
activity = insert(:note_activity)
naive_datetime =
NaiveDateTime.add(
NaiveDateTime.utc_now(),
-:timer.minutes(2),
:millisecond
)
expiration =
insert(
:expiration_in_the_past,
%{activity_id: activity.id, scheduled_at: naive_datetime}
)
Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker.perform(%Oban.Job{})
refute Pleroma.Repo.get(Pleroma.Activity, activity.id)
refute Pleroma.Repo.get(Pleroma.ActivityExpiration, expiration.id)
end
end

View file

@ -185,15 +185,6 @@ test "find all statuses for unauthenticated users when `limit_to_local_content`
end
end
test "add an activity with an expiration" do
activity = insert(:note_activity)
insert(:expiration_in_the_future, %{activity_id: activity.id})
Pleroma.ActivityExpiration
|> where([a], a.activity_id == ^activity.id)
|> Repo.one!()
end
test "all_by_ids_with_object/1" do
%{id: id1} = insert(:note_activity)
%{id: id2} = insert(:note_activity)

View file

@ -200,25 +200,6 @@ def note_activity_factory(attrs \\ %{}) do
|> Map.merge(attrs)
end
defp expiration_offset_by_minutes(attrs, minutes) do
scheduled_at =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.minutes(minutes), :millisecond)
|> NaiveDateTime.truncate(:second)
%Pleroma.ActivityExpiration{}
|> Map.merge(attrs)
|> Map.put(:scheduled_at, scheduled_at)
end
def expiration_in_the_past_factory(attrs \\ %{}) do
expiration_offset_by_minutes(attrs, -60)
end
def expiration_in_the_future_factory(attrs \\ %{}) do
expiration_offset_by_minutes(attrs, 61)
end
def article_activity_factory do
article = insert(:article)

View file

@ -3,14 +3,15 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.DatabaseTest do
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Activity
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
use Pleroma.DataCase
import Pleroma.Factory
setup_all do
@ -130,40 +131,45 @@ test "it turns OrderedCollection likes into empty arrays" do
describe "ensure_expiration" do
test "it adds to expiration old statuses" do
%{id: activity_id1} = insert(:note_activity)
activity1 = insert(:note_activity)
%{id: activity_id2} =
insert(:note_activity, %{inserted_at: NaiveDateTime.from_iso8601!("2015-01-23 23:50:07")})
{:ok, inserted_at, 0} = DateTime.from_iso8601("2015-01-23T23:50:07Z")
activity2 = insert(:note_activity, %{inserted_at: inserted_at})
%{id: activity_id3} = activity3 = insert(:note_activity)
%{id: activity_id3} = insert(:note_activity)
expires_at =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(60 * 61, :second)
|> NaiveDateTime.truncate(:second)
expires_at = DateTime.add(DateTime.utc_now(), 60 * 61)
Pleroma.ActivityExpiration.create(activity3, expires_at)
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: activity_id3,
expires_at: expires_at
})
Mix.Tasks.Pleroma.Database.run(["ensure_expiration"])
expirations =
Pleroma.ActivityExpiration
|> order_by(:activity_id)
|> Repo.all()
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity1.id},
scheduled_at:
activity1.inserted_at
|> DateTime.from_naive!("Etc/UTC")
|> Timex.shift(days: 365)
)
assert [
%Pleroma.ActivityExpiration{
activity_id: ^activity_id1
},
%Pleroma.ActivityExpiration{
activity_id: ^activity_id2,
scheduled_at: ~N[2016-01-23 23:50:07]
},
%Pleroma.ActivityExpiration{
activity_id: ^activity_id3,
scheduled_at: ^expires_at
}
] = expirations
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity2.id},
scheduled_at:
activity2.inserted_at
|> DateTime.from_naive!("Etc/UTC")
|> Timex.shift(days: 365)
)
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity_id3},
scheduled_at: expires_at
)
end
end
end

View file

@ -239,7 +239,7 @@ test "drops activities beyond a certain limit" do
}
}
assert {:error, {:remote_limit_error, _}} = ActivityPub.insert(data)
assert {:error, :remote_limit} = ActivityPub.insert(data)
end
test "doesn't drop activities with content being null" do
@ -386,9 +386,11 @@ test "can be fetched into a timeline" do
end
describe "create activities" do
test "it reverts create" do
user = insert(:user)
setup do
[user: insert(:user)]
end
test "it reverts create", %{user: user} do
with_mock(Utils, [:passthrough], maybe_federate: fn _ -> {:error, :reverted} end) do
assert {:error, :reverted} =
ActivityPub.create(%{
@ -407,9 +409,47 @@ test "it reverts create" do
assert Repo.aggregate(Object, :count, :id) == 0
end
test "removes doubled 'to' recipients" do
user = insert(:user)
test "creates activity if expiration is not configured and expires_at is not passed", %{
user: user
} do
clear_config([Pleroma.Workers.PurgeExpiredActivity, :enabled], false)
assert {:ok, _} =
ActivityPub.create(%{
to: ["user1", "user2"],
actor: user,
context: "",
object: %{
"to" => ["user1", "user2"],
"type" => "Note",
"content" => "testing"
}
})
end
test "rejects activity if expires_at present but expiration is not configured", %{user: user} do
clear_config([Pleroma.Workers.PurgeExpiredActivity, :enabled], false)
assert {:error, :expired_activities_disabled} =
ActivityPub.create(%{
to: ["user1", "user2"],
actor: user,
context: "",
object: %{
"to" => ["user1", "user2"],
"type" => "Note",
"content" => "testing"
},
additional: %{
"expires_at" => DateTime.utc_now()
}
})
assert Repo.aggregate(Activity, :count, :id) == 0
assert Repo.aggregate(Object, :count, :id) == 0
end
test "removes doubled 'to' recipients", %{user: user} do
{:ok, activity} =
ActivityPub.create(%{
to: ["user1", "user1", "user2"],
@ -427,9 +467,7 @@ test "removes doubled 'to' recipients" do
assert activity.recipients == ["user1", "user2", user.ap_id]
end
test "increases user note count only for public activities" do
user = insert(:user)
test "increases user note count only for public activities", %{user: user} do
{:ok, _} =
CommonAPI.post(User.get_cached_by_id(user.id), %{
status: "1",
@ -458,8 +496,7 @@ test "increases user note count only for public activities" do
assert user.note_count == 2
end
test "increases replies count" do
user = insert(:user)
test "increases replies count", %{user: user} do
user2 = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "1", visibility: "public"})
@ -2069,18 +2106,25 @@ test "it just returns the input if the user has no following/follower addresses"
end
describe "global activity expiration" do
setup do: clear_config([:mrf, :policies])
test "creates an activity expiration for local Create activities" do
Pleroma.Config.put(
[:mrf, :policies],
Pleroma.Web.ActivityPub.MRF.ActivityExpirationPolicy
clear_config([:mrf, :policies], Pleroma.Web.ActivityPub.MRF.ActivityExpirationPolicy)
{:ok, activity} = ActivityBuilder.insert(%{"type" => "Create", "context" => "3hu"})
{:ok, follow} = ActivityBuilder.insert(%{"type" => "Follow", "context" => "3hu"})
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity.id},
scheduled_at:
activity.inserted_at
|> DateTime.from_naive!("Etc/UTC")
|> Timex.shift(days: 365)
)
{:ok, %{id: id_create}} = ActivityBuilder.insert(%{"type" => "Create", "context" => "3hu"})
{:ok, _follow} = ActivityBuilder.insert(%{"type" => "Follow", "context" => "3hu"})
assert [%{activity_id: ^id_create}] = Pleroma.ActivityExpiration |> Repo.all()
refute_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: follow.id}
)
end
end

View file

@ -18,11 +18,11 @@ test "adds `expires_at` property" do
"object" => %{"type" => "Note"}
})
assert Timex.diff(expires_at, NaiveDateTime.utc_now(), :days) == 364
assert Timex.diff(expires_at, DateTime.utc_now(), :days) == 364
end
test "keeps existing `expires_at` if it less than the config setting" do
expires_at = NaiveDateTime.utc_now() |> Timex.shift(days: 1)
expires_at = DateTime.utc_now() |> Timex.shift(days: 1)
assert {:ok, %{"type" => "Create", "expires_at" => ^expires_at}} =
ActivityExpirationPolicy.filter(%{
@ -35,7 +35,7 @@ test "keeps existing `expires_at` if it less than the config setting" do
end
test "overwrites existing `expires_at` if it greater than the config setting" do
too_distant_future = NaiveDateTime.utc_now() |> Timex.shift(years: 2)
too_distant_future = DateTime.utc_now() |> Timex.shift(years: 2)
assert {:ok, %{"type" => "Create", "expires_at" => expires_at}} =
ActivityExpirationPolicy.filter(%{
@ -46,7 +46,7 @@ test "overwrites existing `expires_at` if it greater than the config setting" do
"object" => %{"type" => "Note"}
})
assert Timex.diff(expires_at, NaiveDateTime.utc_now(), :days) == 364
assert Timex.diff(expires_at, DateTime.utc_now(), :days) == 364
end
test "ignores remote activities" do

View file

@ -4,6 +4,8 @@
defmodule Pleroma.Web.CommonAPITest do
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Activity
alias Pleroma.Chat
alias Pleroma.Conversation.Participation
@ -598,15 +600,15 @@ test "it validates character limits are correctly enforced" do
test "it can handle activities that expire" do
user = insert(:user)
expires_at =
NaiveDateTime.utc_now()
|> NaiveDateTime.truncate(:second)
|> NaiveDateTime.add(1_000_000, :second)
expires_at = DateTime.add(DateTime.utc_now(), 1_000_000)
assert {:ok, activity} = CommonAPI.post(user, %{status: "chai", expires_in: 1_000_000})
assert expiration = Pleroma.ActivityExpiration.get_by_activity_id(activity.id)
assert expiration.scheduled_at == expires_at
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity.id},
scheduled_at: expires_at
)
end
end

View file

@ -4,9 +4,9 @@
defmodule Pleroma.Web.MastodonAPI.StatusControllerTest do
use Pleroma.Web.ConnCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Object
@ -29,8 +29,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusControllerTest do
setup do: oauth_access(["write:statuses"])
test "posting a status does not increment reblog_count when relaying", %{conn: conn} do
Pleroma.Config.put([:instance, :federating], true)
Pleroma.Config.get([:instance, :allow_relay], true)
Config.put([:instance, :federating], true)
Config.get([:instance, :allow_relay], true)
response =
conn
@ -103,7 +103,9 @@ test "posting a status", %{conn: conn} do
# An activity that will expire:
# 2 hours
expires_in = 120 * 60
expires_in = 2 * 60 * 60
expires_at = DateTime.add(DateTime.utc_now(), expires_in)
conn_four =
conn
@ -113,29 +115,22 @@ test "posting a status", %{conn: conn} do
"expires_in" => expires_in
})
assert fourth_response =
%{"id" => fourth_id} = json_response_and_validate_schema(conn_four, 200)
assert %{"id" => fourth_id} = json_response_and_validate_schema(conn_four, 200)
assert activity = Activity.get_by_id(fourth_id)
assert expiration = ActivityExpiration.get_by_activity_id(fourth_id)
assert Activity.get_by_id(fourth_id)
estimated_expires_at =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(expires_in)
|> NaiveDateTime.truncate(:second)
# This assert will fail if the test takes longer than a minute. I sure hope it never does:
assert abs(NaiveDateTime.diff(expiration.scheduled_at, estimated_expires_at, :second)) < 60
assert fourth_response["pleroma"]["expires_at"] ==
NaiveDateTime.to_iso8601(expiration.scheduled_at)
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: fourth_id},
scheduled_at: expires_at
)
end
test "it fails to create a status if `expires_in` is less or equal than an hour", %{
conn: conn
} do
# 1 hour
expires_in = 60 * 60
# 1 minute
expires_in = 1 * 60
assert %{"error" => "Expiry date is too soon"} =
conn
@ -146,8 +141,8 @@ test "it fails to create a status if `expires_in` is less or equal than an hour"
})
|> json_response_and_validate_schema(422)
# 30 minutes
expires_in = 30 * 60
# 5 minutes
expires_in = 5 * 60
assert %{"error" => "Expiry date is too soon"} =
conn
@ -160,8 +155,8 @@ test "it fails to create a status if `expires_in` is less or equal than an hour"
end
test "Get MRF reason when posting a status is rejected by one", %{conn: conn} do
Pleroma.Config.put([:mrf_keyword, :reject], ["GNO"])
Pleroma.Config.put([:mrf, :policies], [Pleroma.Web.ActivityPub.MRF.KeywordPolicy])
Config.put([:mrf_keyword, :reject], ["GNO"])
Config.put([:mrf, :policies], [Pleroma.Web.ActivityPub.MRF.KeywordPolicy])
assert %{"error" => "[KeywordPolicy] Matches with rejected keyword"} =
conn
@ -1146,6 +1141,52 @@ test "max pinned statuses", %{conn: conn, user: user, activity: activity_one} do
|> post("/api/v1/statuses/#{activity_two.id}/pin")
|> json_response_and_validate_schema(400)
end
test "on pin removes deletion job, on unpin reschedule deletion" do
%{conn: conn} = oauth_access(["write:accounts", "write:statuses"])
expires_in = 2 * 60 * 60
expires_at = DateTime.add(DateTime.utc_now(), expires_in)
assert %{"id" => id} =
conn
|> put_req_header("content-type", "application/json")
|> post("api/v1/statuses", %{
"status" => "oolong",
"expires_in" => expires_in
})
|> json_response_and_validate_schema(200)
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: id},
scheduled_at: expires_at
)
assert %{"id" => ^id, "pinned" => true} =
conn
|> put_req_header("content-type", "application/json")
|> post("/api/v1/statuses/#{id}/pin")
|> json_response_and_validate_schema(200)
refute_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: id},
scheduled_at: expires_at
)
assert %{"id" => ^id, "pinned" => false} =
conn
|> put_req_header("content-type", "application/json")
|> post("/api/v1/statuses/#{id}/unpin")
|> json_response_and_validate_schema(200)
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: id},
scheduled_at: expires_at
)
end
end
describe "cards" do
@ -1681,19 +1722,17 @@ test "returns the favorites of a user" do
test "expires_at is nil for another user" do
%{conn: conn, user: user} = oauth_access(["read:statuses"])
expires_at = DateTime.add(DateTime.utc_now(), 1_000_000)
{:ok, activity} = CommonAPI.post(user, %{status: "foobar", expires_in: 1_000_000})
expires_at =
activity.id
|> ActivityExpiration.get_by_activity_id()
|> Map.get(:scheduled_at)
|> NaiveDateTime.to_iso8601()
assert %{"pleroma" => %{"expires_at" => ^expires_at}} =
assert %{"pleroma" => %{"expires_at" => a_expires_at}} =
conn
|> get("/api/v1/statuses/#{activity.id}")
|> json_response_and_validate_schema(:ok)
{:ok, a_expires_at, 0} = DateTime.from_iso8601(a_expires_at)
assert DateTime.diff(expires_at, a_expires_at) == 0
%{conn: conn} = oauth_access(["read:statuses"])
assert %{"pleroma" => %{"expires_at" => nil}} =

View file

@ -1,84 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorkerTest do
use Pleroma.DataCase
alias Pleroma.ActivityExpiration
alias Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker
import Pleroma.Factory
import ExUnit.CaptureLog
setup do
clear_config([ActivityExpiration, :enabled])
end
test "deletes an expiration activity" do
Pleroma.Config.put([ActivityExpiration, :enabled], true)
activity = insert(:note_activity)
naive_datetime =
NaiveDateTime.add(
NaiveDateTime.utc_now(),
-:timer.minutes(2),
:millisecond
)
expiration =
insert(
:expiration_in_the_past,
%{activity_id: activity.id, scheduled_at: naive_datetime}
)
Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker.perform(%Oban.Job{})
refute Pleroma.Repo.get(Pleroma.Activity, activity.id)
refute Pleroma.Repo.get(Pleroma.ActivityExpiration, expiration.id)
end
test "works with ActivityExpirationPolicy" do
Pleroma.Config.put([ActivityExpiration, :enabled], true)
clear_config([:mrf, :policies], Pleroma.Web.ActivityPub.MRF.ActivityExpirationPolicy)
user = insert(:user)
days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
{:ok, %{id: id} = activity} = Pleroma.Web.CommonAPI.post(user, %{status: "cofe"})
past_date =
NaiveDateTime.utc_now() |> Timex.shift(days: -days) |> NaiveDateTime.truncate(:second)
activity
|> Repo.preload(:expiration)
|> Map.get(:expiration)
|> Ecto.Changeset.change(%{scheduled_at: past_date})
|> Repo.update!()
Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker.perform(%Oban.Job{})
assert [%{data: %{"type" => "Delete", "deleted_activity_id" => ^id}}] =
Pleroma.Repo.all(Pleroma.Activity)
end
describe "delete_activity/1" do
test "adds log message if activity isn't find" do
assert capture_log([level: :error], fn ->
PurgeExpiredActivitiesWorker.delete_activity(%ActivityExpiration{
activity_id: "test-activity"
})
end) =~ "Couldn't delete expired activity: not found activity"
end
test "adds log message if actor isn't find" do
assert capture_log([level: :error], fn ->
PurgeExpiredActivitiesWorker.delete_activity(%ActivityExpiration{
activity_id: "test-activity"
})
end) =~ "Couldn't delete expired activity: not found activity"
end
end
end

View file

@ -0,0 +1,59 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.PurgeExpiredActivityTest do
use Pleroma.DataCase, async: true
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
alias Pleroma.Workers.PurgeExpiredActivity
test "enqueue job" do
activity = insert(:note_activity)
assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{
activity_id: activity.id,
expires_at: DateTime.add(DateTime.utc_now(), 3601)
})
assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity,
args: %{activity_id: activity.id}
)
assert {:ok, _} =
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: activity.id})
assert %Oban.Job{} = Pleroma.Workers.PurgeExpiredActivity.get_expiration(activity.id)
end
test "error if user was not found" do
activity = insert(:note_activity)
assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{
activity_id: activity.id,
expires_at: DateTime.add(DateTime.utc_now(), 3601)
})
user = Pleroma.User.get_by_ap_id(activity.actor)
Pleroma.Repo.delete(user)
assert {:error, :user_not_found} =
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: activity.id})
end
test "error if actiivity was not found" do
assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{
activity_id: "some_id",
expires_at: DateTime.add(DateTime.utc_now(), 3601)
})
assert {:error, :activity_not_found} =
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})
end
end