fix(fediverse): add "processing" and "failed" statuses to better manage broadcast load

fixes #511
This commit is contained in:
Yassine Doghri 2024-08-16 15:28:28 +00:00
parent d30c49cdff
commit 1d7583d738
4 changed files with 71 additions and 20 deletions

View File

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Modules\Fediverse\Commands; namespace Modules\Fediverse\Commands;
use CodeIgniter\CLI\BaseCommand; use CodeIgniter\CLI\BaseCommand;
use Exception;
use Override; use Override;
class Broadcast extends BaseCommand class Broadcast extends BaseCommand
@ -22,32 +23,49 @@ class Broadcast extends BaseCommand
// retrieve scheduled activities from database // retrieve scheduled activities from database
$scheduledActivities = model('ActivityModel', false) $scheduledActivities = model('ActivityModel', false)
->getScheduledActivities(); ->getScheduledActivities(10);
foreach ($scheduledActivities as $scheduledActivity) {
// set activity post to processing
model('ActivityModel', false)
->update($scheduledActivity->id, [
'status' => 'processing',
]);
}
// Send activity to all followers // Send activity to all followers
foreach ($scheduledActivities as $scheduledActivity) { foreach ($scheduledActivities as $scheduledActivity) {
if ($scheduledActivity->target_actor_id !== null) { try {
if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) { if ($scheduledActivity->target_actor_id !== null) {
// send activity to targeted actor if ($scheduledActivity->actor_id !== $scheduledActivity->target_actor_id) {
send_activity_to_actor( // send activity to targeted actor
send_activity_to_actor(
$scheduledActivity->actor,
$scheduledActivity->targetActor,
json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
);
}
} else {
// send activity to all actor followers
send_activity_to_followers(
$scheduledActivity->actor, $scheduledActivity->actor,
$scheduledActivity->targetActor, json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR)
); );
} }
} else {
// send activity to all actor followers
send_activity_to_followers(
$scheduledActivity->actor,
json_encode($scheduledActivity->payload, JSON_THROW_ON_ERROR),
);
}
// set activity post to delivered // set activity post to delivered
model('ActivityModel', false) model('ActivityModel', false)
->update($scheduledActivity->id, [ ->update($scheduledActivity->id, [
'status' => 'delivered', 'status' => 'delivered',
]); ]);
} catch (Exception) {
// set activity post to delivered
model('ActivityModel', false)
->update($scheduledActivity->id, [
'status' => 'failed',
]);
}
} }
} }
} }

View File

@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
/**
* @copyright 2024 Ad Aures
* @license https://www.gnu.org/licenses/agpl-3.0.en.html AGPL3
* @link https://castopod.org/
*/
namespace Modules\Fediverse\Migrations;
use App\Database\Migrations\BaseMigration;
use Override;
class UpdateActivitiesStatus extends BaseMigration
{
#[Override]
public function up(): void
{
$fields = [
'status' => [
'type' => 'ENUM',
'constraint' => ['queued', 'processing', 'delivered', 'failed'],
'null' => true,
],
];
$this->forge->modifyColumn('fediverse_activities', $fields);
}
}

View File

@ -131,6 +131,7 @@ if (! function_exists('send_activity_to_followers')) {
*/ */
function send_activity_to_followers(Actor $actor, string $activityPayload): void function send_activity_to_followers(Actor $actor, string $activityPayload): void
{ {
// TODO: send activities in parallel with https://www.php.net/manual/en/function.curl-multi-init.php
foreach ($actor->followers as $follower) { foreach ($actor->followers as $follower) {
send_activity_to_actor($actor, $follower, $activityPayload); send_activity_to_actor($actor, $follower, $activityPayload);
} }

View File

@ -123,11 +123,12 @@ class ActivityModel extends UuidModel
/** /**
* @return Activity[] * @return Activity[]
*/ */
public function getScheduledActivities(): array public function getScheduledActivities(int $limit = 10): array
{ {
return $this->where('`scheduled_at` <= UTC_TIMESTAMP()', null, false) return $this->where('`scheduled_at` <= UTC_TIMESTAMP()', null, false)
->where('status', 'queued') ->where('status', 'queued')
->orderBy('scheduled_at', 'ASC') ->orderBy('scheduled_at', 'ASC')
->limit($limit)
->findAll(); ->findAll();
} }