Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox: Improve batch handling #1286

Open
wants to merge 21 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions includes/class-activitypub.php
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,17 @@ private static function register_post_types() {
)
);

\register_post_meta(
Outbox::POST_TYPE,
'_activitypub_object_id',
array(
'type' => 'string',
'single' => true,
'description' => 'The ID (ActivityPub URI) of the object that the outbox item is about.',
'sanitize_callback' => 'sanitize_url',
)
);

\register_post_meta(
Outbox::POST_TYPE,
'activitypub_content_visibility',
Expand Down
18 changes: 9 additions & 9 deletions includes/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public static function process_outbox( $id ) {
'activitypub_async_batch',
array(
self::$callback,
$actor->get__id(),
$outbox_item->ID,
self::$batch_size,
\get_post_meta( $outbox_item->ID, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
Expand All @@ -104,26 +103,27 @@ public static function process_outbox( $id ) {
} else {
// No followers to process for this update. We're done.
\wp_publish_post( $outbox_item );
\delete_post_meta( $outbox_item->ID, '_activitypub_outbox_offset' );
}
}

/**
* Asynchronously runs batch processing routines.
*
* @param int $actor_id The actor ID.
* @param int $outbox_item_id The Outbox item ID.
* @param int $batch_size Optional. The batch size. Default 50.
* @param int $offset Optional. The offset. Default 0.
*
* @return array|void The next batch of followers to process, or void if done.
*/
public static function send_to_followers( $actor_id, $outbox_item_id, $batch_size = 50, $offset = 0 ) {
public static function send_to_followers( $outbox_item_id, $batch_size = 50, $offset = 0 ) {
$activity = self::get_activity( $outbox_item_id );
$actor = self::get_actor( \get_post( $outbox_item_id ) );
$json = $activity->to_json();
$inboxes = Followers::get_inboxes_for_activity( $json, $actor_id, $batch_size, $offset );
$inboxes = Followers::get_inboxes_for_activity( $json, $actor->get__id(), $batch_size, $offset );

foreach ( $inboxes as $inbox ) {
$result = safe_remote_post( $inbox, $json, $actor_id );
$result = safe_remote_post( $inbox, $json, $actor->get__id() );

/**
* Fires after an Activity has been sent to an inbox.
Expand All @@ -134,7 +134,7 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $actor_id The actor ID.
* @param int $outbox_item_id The Outbox item ID.
*/
\do_action( 'activitypub_sent_to_inbox', $result, $inbox, $json, $actor_id, $outbox_item_id );
\do_action( 'activitypub_sent_to_inbox', $result, $inbox, $json, $actor->get__id(), $outbox_item_id );
}

if ( is_countable( $inboxes ) && count( $inboxes ) < self::$batch_size ) {
Expand All @@ -150,7 +150,7 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $batch_size The batch size.
* @param int $offset The offset.
*/
\do_action( 'activitypub_outbox_processing_complete', $inboxes, $json, $actor_id, $outbox_item_id, $batch_size, $offset );
\do_action( 'activitypub_outbox_processing_complete', $inboxes, $json, $actor->get__id(), $outbox_item_id, $batch_size, $offset );

// No more followers to process for this update.
\wp_publish_post( $outbox_item_id );
Expand All @@ -167,9 +167,9 @@ public static function send_to_followers( $actor_id, $outbox_item_id, $batch_siz
* @param int $batch_size The batch size.
* @param int $offset The offset.
*/
\do_action( 'activitypub_outbox_processing_batch_complete', $inboxes, $json, $actor_id, $outbox_item_id, $batch_size, $offset );
\do_action( 'activitypub_outbox_processing_batch_complete', $inboxes, $json, $actor->get__id(), $outbox_item_id, $batch_size, $offset );

return array( $actor_id, $outbox_item_id, $batch_size, $offset + $batch_size );
return array( $outbox_item_id, $batch_size, $offset + $batch_size );
}
}

Expand Down
54 changes: 53 additions & 1 deletion includes/collection/class-outbox.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ public static function add( $activity_object, $activity_type, $user_id, $content

$outbox_item = array(
'post_type' => self::POST_TYPE,
'post_title' => $activity_object->get_id(),
'post_title' => sprintf(
/* translators: 1. Activity type, 2. Object type, 3. Object Title or Excerpt */
__( '[%1$s] %2$s: %3$s', 'activitypub' ),
$activity_type,
$activity_object->get_type(),
\wp_trim_words( $activity_object->get_name() ?? $activity_object->get_content(), 5 )
),
'post_content' => wp_slash( $activity_object->to_json() ),
// ensure that user ID is not below 0.
'post_author' => \max( $user_id, 0 ),
'post_status' => 'pending',
'meta_input' => array(
'_activitypub_object_id' => $activity_object->get_id(),
'_activitypub_activity_type' => $activity_type,
'_activitypub_activity_actor' => $actor_type,
'activitypub_content_visibility' => $content_visibility,
Expand All @@ -72,6 +79,51 @@ public static function add( $activity_object, $activity_type, $user_id, $content
return false;
}

self::invalidate_existing_items( $activity_object->get_id(), $activity_type, $id );

return $id;
}

/**
* Invalidate existing outbox items with the same activity type and object ID
* by setting their status to 'publish'.
*
* @param string $object_id The ID of the activity object.
* @param string $activity_type The type of the activity.
* @param int $current_id The ID of the current outbox item to exclude.
*
* @return void
*/
private static function invalidate_existing_items( $object_id, $activity_type, $current_id ) {
$meta_query = array(
array(
'key' => '_activitypub_object_id',
'value' => $object_id,
),
);

// For non-Delete activities, only invalidate items of the same type.
if ( 'Delete' !== $activity_type ) {
$meta_query[] = array(
'key' => '_activitypub_activity_type',
'value' => $activity_type,
);
}

$existing_items = get_posts(
array(
'post_type' => self::POST_TYPE,
'post_status' => 'pending',
'exclude' => array( $current_id ),
// phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query
'meta_query' => $meta_query,
'fields' => 'ids',
)
);

foreach ( $existing_items as $existing_item_id ) {
\wp_publish_post( $existing_item_id );
\delete_post_meta( $existing_item_id, '_activitypub_outbox_offset' );
}
}
}
1 change: 0 additions & 1 deletion tests/includes/class-test-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public function test_process_outbox() {
'activitypub_async_batch',
array(
Dispatcher::$callback,
self::$user_id,
$outbox_item->ID,
Dispatcher::$batch_size,
0,
Expand Down
14 changes: 10 additions & 4 deletions tests/includes/class-test-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public function test_reprocess_outbox() {
);
}

$pending_ids[] = Outbox::add(
$activity_object,
'Update',
self::$user_id,
ACTIVITYPUB_CONTENT_VISIBILITY_PUBLIC
);

// Track scheduled events.
$scheduled_events = array();
add_filter(
Expand All @@ -84,10 +91,9 @@ function ( $event ) use ( &$scheduled_events ) {
Scheduler::reprocess_outbox();

// Verify each pending activity was scheduled.
$this->assertCount( 3, $scheduled_events, 'Should schedule 3 activities for processing' );
foreach ( $pending_ids as $id ) {
$this->assertContains( $id, $scheduled_events, "Activity $id should be scheduled" );
}
$this->assertCount( 2, $scheduled_events, 'Should schedule 2 activities for processing' );
$this->assertNotContains( $pending_ids[0], $scheduled_events, "Activity $pending_ids[0] should be scheduled" );
$this->assertContains( $pending_ids[3], $scheduled_events, "Activity $pending_ids[3] should be scheduled" );

// Test with published activities (should not be scheduled).
$published_id = Outbox::add(
Expand Down
83 changes: 83 additions & 0 deletions tests/includes/collection/class-test-outbox.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,87 @@ public function author_object_provider() {
array( ACTIVITYPUB_ACTOR_MODE, 90210, false ),
);
}

/**
* Test invalidating existing outbox items.
*/
public function test_invalidate_existing_items() {
$object = $this->get_dummy_activity_object();
$activity_type = 'Create';

// Create first outbox item.
$first_id = \Activitypub\add_to_outbox( $object, $activity_type, 1 );
$this->assertNotFalse( $first_id );
$this->assertEquals( 'pending', get_post_status( $first_id ) );

// Create second outbox item with same object_id and activity_type.
$second_id = \Activitypub\add_to_outbox( $object, $activity_type, 1 );
$this->assertNotFalse( $second_id );

// First item should now be published (invalidated).
$this->assertEquals( 'publish', get_post_status( $first_id ) );
// New item should still be pending.
$this->assertEquals( 'pending', get_post_status( $second_id ) );
}

/**
* Test that only items with matching object_id and activity_type are invalidated.
*/
public function test_selective_invalidation() {
$object1 = $this->get_dummy_activity_object();
$object2 = $this->get_dummy_activity_object();
$object2->set_id( 'https://example.com/different-object' );

// Create items with different combinations.
$item1 = \Activitypub\add_to_outbox( $object1, 'Create', 1 ); // Should be invalidated.
$item2 = \Activitypub\add_to_outbox( $object2, 'Create', 1 ); // Should stay pending (different object).
$item3 = \Activitypub\add_to_outbox( $object1, 'Update', 1 ); // Should stay pending (different activity).

// Add new item that should trigger invalidation of item1.
$new_item = \Activitypub\add_to_outbox( $object1, 'Create', 1 );

$this->assertEquals( 'publish', get_post_status( $item1 ) );
$this->assertEquals( 'pending', get_post_status( $item2 ) );
$this->assertEquals( 'pending', get_post_status( $item3 ) );
$this->assertEquals( 'pending', get_post_status( $new_item ) );
}

/**
* Test that Delete activities invalidate all existing items for the object.
*/
public function test_delete_invalidates_all_activities() {
$object = $this->get_dummy_activity_object();

// Create items with different activity types.
$create_id = \Activitypub\add_to_outbox( $object, 'Create', 1 );
$update_id = \Activitypub\add_to_outbox( $object, 'Update', 1 );
$like_id = \Activitypub\add_to_outbox( $object, 'Like', 1 );

$this->assertEquals( 'pending', get_post_status( $create_id ) );
$this->assertEquals( 'pending', get_post_status( $update_id ) );
$this->assertEquals( 'pending', get_post_status( $like_id ) );

// Add Delete activity.
$delete_id = \Activitypub\add_to_outbox( $object, 'Delete', 1 );

// All previous activities should be published (invalidated).
$this->assertEquals( 'publish', get_post_status( $create_id ) );
$this->assertEquals( 'publish', get_post_status( $update_id ) );
$this->assertEquals( 'publish', get_post_status( $like_id ) );
// Delete activity should still be pending.
$this->assertEquals( 'pending', get_post_status( $delete_id ) );
}

/**
* Helper method to create a dummy activity object for testing.
*
* @return \Activitypub\Activity\Base_Object
*/
private function get_dummy_activity_object() {
$object = new \Activitypub\Activity\Base_Object();
$object->set_id( 'https://example.com/test-object' );
$object->set_type( 'Note' );
$object->set_content( 'Test content' );
return $object;
}
}
15 changes: 10 additions & 5 deletions tests/includes/scheduler/class-test-actor.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public function test_user_meta_update( $meta_key ) {

$activitpub_id = Actors::get_by_id( self::$user_id )->get_id();
$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );
}

/**
Expand All @@ -80,7 +81,8 @@ public function test_user_update() {

$activitpub_id = Actors::get_by_id( self::$user_id )->get_id();
$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );
}

/**
Expand All @@ -95,7 +97,8 @@ public function test_blog_user_update() {

$activitpub_id = Actors::get_by_id( Actors::BLOG_USER_ID )->get_id();
$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );
$this->assertSame( $test_value, $result );
}

Expand Down Expand Up @@ -229,7 +232,8 @@ public function test_schedule_post_activity_extra_fields() {
$activitpub_id = Actors::get_by_id( self::$user_id )->get_id();

$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );

\wp_delete_post( $post_id, true );
}
Expand All @@ -245,7 +249,8 @@ public function test_schedule_post_activity_extra_field_blog() {
$activitpub_id = Actors::get_by_id( Actors::BLOG_USER_ID )->get_id();

$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );

// Clean up.
\wp_delete_post( $blog_post_id, true );
Expand Down
6 changes: 4 additions & 2 deletions tests/includes/scheduler/class-test-comment.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public function test_schedule_comment_activity_on_approval() {
wp_set_comment_status( $comment_id, 'approve' );

$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );

wp_delete_comment( $comment_id, true );
}
Expand All @@ -65,7 +66,8 @@ public function test_schedule_comment_activity_on_insert() {
$activitpub_id = \Activitypub\Comment::generate_id( $comment_id );

$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );

wp_delete_comment( $comment_id, true );
}
Expand Down
3 changes: 2 additions & 1 deletion tests/includes/scheduler/class-test-post.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public function test_schedule_post_activity_regular_post() {
$activitpub_id = \add_query_arg( 'p', $post_id, \home_url( '/' ) );

$post = $this->get_latest_outbox_item( $activitpub_id );
$this->assertSame( $activitpub_id, $post->post_title );
$id = \get_post_meta( $post->ID, '_activitypub_object_id', true );
$this->assertSame( $activitpub_id, $id );

\wp_delete_post( $post_id, true );
}
Expand Down
Loading