activitypub-1.0.x-dev/src/Services/ActivityPubProcessClient.php
src/Services/ActivityPubProcessClient.php
<?php
namespace Drupal\activitypub\Services;
use Drupal\activitypub\Entity\ActivityPubActivityInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\RequestException;
use Psr\Log\LoggerInterface;
/**
 * Processes the ActivityPub queues.
 *
 * Three queues are handled here:
 * - the outbox queue, which resolves the recipients of an outgoing activity
 *   and creates one "send" queue item per remote host;
 * - the outbox send queue, which signs the activity and posts it to the
 *   inbox of every recipient;
 * - the inbox queue, which runs the inbox processing of incoming activities.
 */
class ActivityPubProcessClient implements ActivityPubProcessClientInterface {

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The ActivityPub Utility service.
   *
   * @var \Drupal\activitypub\Services\ActivityPubUtilityInterface
   */
  protected $activityPubUtility;

  /**
   * The ActivityPub signature service.
   *
   * @var \Drupal\activitypub\Services\ActivityPubSignatureInterface
   */
  protected $activityPubSignature;

  /**
   * The http client.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * The config factory.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * A logger instance.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * Whether in debug mode or not.
   *
   * @var bool
   */
  protected $debug = FALSE;

  /**
   * ActivityPubProcessClient constructor.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\activitypub\Services\ActivityPubUtilityInterface $activitypub_utility
   *   The activitypub utility.
   * @param \Drupal\activitypub\Services\ActivityPubSignatureInterface $activitypub_signature
   *   The ActivityPub Signature service.
   * @param \GuzzleHttp\ClientInterface $http_client
   *   The HTTP client.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager, ActivityPubUtilityInterface $activitypub_utility, ActivityPubSignatureInterface $activitypub_signature, ClientInterface $http_client, ConfigFactoryInterface $config_factory, LoggerInterface $logger) {
    $this->entityTypeManager = $entity_type_manager;
    $this->activityPubUtility = $activitypub_utility;
    $this->activityPubSignature = $activitypub_signature;
    $this->httpClient = $http_client;
    $this->configFactory = $config_factory;
    $this->logger = $logger;
  }

  /**
   * {@inheritdoc}
   */
  public function prepareOutboxQueue($create_send_queue_item = TRUE, $time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $end = time() + $time_limit;
    $release_items = [];
    $this->setDebug($debug);

    while (time() < $end && ($item = \Drupal::queue(ACTIVITYPUB_OUTBOX_QUEUE)->claimItem())) {
      $data = $item->data;

      // Items without an activity id can never be processed: drop them.
      if (empty($data['activity'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
        continue;
      }

      $remove_queue_item = (bool) $remove_item_from_queue;

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $this->entityTypeManager->getStorage('activitypub_activity')->load($data['activity']);
      if (!empty($activity)) {
        try {

          // Check the queue value.
          if (!$activity->isQueued()) {
            $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
            continue;
          }

          // Build activity.
          $build = $activity->buildActivity();

          // Replace type.
          if (!empty($data['replaceType']) && $data['replaceType'] == 'Update' && $build['type'] == 'Create') {
            $build['type'] = $data['replaceType'];
          }

          // Send to: every explicit recipient, except the public collection
          // and the followers collection, which is expanded further down.
          // ActivityStreams allows 'to' to be a single string, hence the cast.
          $targets = [];
          $followers_url = $activity->getActor() . '/followers';
          if (!empty($build['to'])) {
            foreach ((array) $build['to'] as $t) {
              if ($t != ActivityPubActivityInterface::PUBLIC_URL && $t != $followers_url) {
                $targets[] = $t;
              }
            }
          }

          $this->debug($build);

          // Add followers.
          if ($this->notifyFollowers($build, $followers_url)) {
            // Get followers.
            $conditions = ['type' => 'Follow', 'object' => $activity->getActor(), 'status' => 1];
            /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface[] $followers */
            $followers = $this->entityTypeManager->getStorage('activitypub_activity')->loadByProperties($conditions);
            foreach ($followers as $follower) {
              $targets[] = $follower->getActor();
            }
          }

          // Create inboxes based on host.
          $inboxes = [];
          foreach ($targets as $target) {
            // parse_url() only accepts strings; anything else can not be a
            // recipient URL.
            if (!is_string($target)) {
              continue;
            }
            // parse_url() returns NULL when there is no host and FALSE on a
            // malformed URL; such targets are grouped under an empty host.
            $host = parse_url($target, PHP_URL_HOST);
            $inboxes[is_string($host) ? $host : ''][] = $target;
          }

          $this->debug($inboxes);

          if (!empty($inboxes) && $create_send_queue_item) {
            foreach ($inboxes as $host_targets) {
              $this->createSendQueueItem($activity, $build, $host_targets);
            }
          }
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Outbox prepare general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Remove or release the queue item.
      if ($remove_queue_item) {
        if (isset($activity)) {
          try {
            $activity->set('processed', TRUE);
            $activity->set('queued', FALSE);
            $activity->save();
          }
          catch (\Exception $e) {
            // Best effort: the queue item is removed regardless, but leave a
            // trace so a stuck 'queued' flag can be diagnosed.
            $this->logger->warning('Unable to mark outbox activity @id as processed: @message', ['@id' => $activity->id(), '@message' => $e->getMessage()]);
          }
        }
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
      }
      else {
        $release_items[] = $item;
      }
    }

    // Release only after the loop, otherwise claimItem() would hand out the
    // same items again during this run.
    foreach ($release_items as $item) {
      $this->releaseItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function handleOutboxQueue($send = TRUE, $time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $end = time() + $time_limit;
    $release_items = [];
    $this->setDebug($debug);
    $server = $this->activityPubUtility->getServer();

    /** @var \Drupal\activitypub\Entity\Storage\ActivityPubActorStorage $actorStorage */
    $actorStorage = $this->entityTypeManager->getStorage('activitypub_actor');

    while (time() < $end && ($item = \Drupal::queue(ACTIVITYPUB_OUTBOX_SEND_QUEUE)->claimItem())) {
      $data = $item->data;

      // Items without an activity id can never be processed: drop them.
      if (empty($data['activity'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
        continue;
      }

      $inboxes = [];
      $build = isset($data['build']) ? $data['build'] : [];
      $build['@context'] = ActivityPubActivityInterface::CONTEXT_URL;
      $targets = isset($data['targets']) ? (array) $data['targets'] : [];
      $remove_queue_item = (bool) $remove_item_from_queue;

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $this->entityTypeManager->getStorage('activitypub_activity')->load($data['activity']);
      if (!empty($activity)) {
        try {
          $actor = $actorStorage->loadActorByEntityIdAndType($activity->getOwnerId(), 'person');

          $this->debug($build);

          // Get inboxes.
          foreach ($targets as $target) {

            // Ignore 'https://example.com/user/random' target which is used in
            // the follow test, we don't care about it that the call is going
            // out.
            if ($target == ACTIVITYPUB_TEST_USER) {
              continue;
            }

            // A target that can not be resolved is skipped on purpose, so the
            // other recipients still get the activity.
            $target_actor = NULL;
            try {
              $target_actor = $server->actor($target);
            }
            catch (\Exception $ignored) {
              $this->debug($ignored->getMessage());
            }

            if ($target_actor) {
              $inbox = $target_actor->get('inbox');
              if ($activity->canUseSharedInbox() && ($endpoints = $target_actor->get('endpoints')) && !empty($endpoints['sharedInbox'])) {
                $inbox = $endpoints['sharedInbox'];
              }
              // Keyed by URL so a shared inbox is only posted to once.
              if (is_string($inbox)) {
                $inboxes[$inbox] = $inbox;
              }
            }
          }

          if (!empty($inboxes)) {
            $this->debug($inboxes);

            // Without a local actor nothing can be signed. Calling a method
            // on NULL raises an \Error, which the catch below would not
            // intercept, so fail with a regular exception instead.
            if (empty($actor)) {
              throw new \RuntimeException('No local actor found to sign the activity.');
            }

            $keyId = $activity->getActor();

            // Create digest.
            $json = json_encode($build);
            if ($json === FALSE) {
              throw new \RuntimeException('Unable to encode the activity as JSON: ' . json_last_error_msg());
            }
            $digest = $this->activityPubSignature->createdigest($json);

            foreach ($inboxes as $inbox) {
              $parsed = parse_url($inbox);
              $this->debug($parsed);

              // An inbox without a host can not be signed for or posted to.
              if (empty($parsed['host'])) {
                $this->logger->error('Outbox skipped invalid inbox @inbox for @id', ['@inbox' => $inbox, '@id' => $activity->id()]);
                continue;
              }

              $host = $parsed['host'];
              // A URL without a path is requested as '/', so that is what
              // has to be signed as request target.
              $path = !empty($parsed['path']) ? $parsed['path'] : '/';
              $date = gmdate('D, d M Y H:i:s T', time());

              // Create signature.
              $signature = $this->activityPubSignature->createSignature($actor->getName(), $host, $path, $digest, $date);

              $headers = [
                'Content-Type' => 'application/activity+json',
                'host' => $host,
                'date' => $date,
                'digest' => $digest,
                'Signature' => 'keyId="' . $keyId . '#main-key",headers="(request-target) host date digest",signature="' . base64_encode($signature) . '",algorithm="rsa-sha256"',
              ];

              $this->debug($headers);

              if ($send) {
                try {
                  $response = $this->httpClient->post($inbox, ['body' => $json, 'headers' => $headers]);
                  $this->logger->notice('Outbox response to @inbox for @id: code: @code - @response', ['@inbox' => $inbox, '@id' => $activity->id(), '@code' => $response->getStatusCode(), '@response' => (string) $response->getBody()]);
                }
                catch (\Exception $e) {
                  // Catch every failure per inbox (connection errors are not
                  // RequestExceptions in newer Guzzle versions), so that one
                  // unreachable server does not abort the remaining inboxes.
                  $this->logger->error('Outbox exception to @inbox for @id to @inbox: @message', ['@inbox' => $inbox, '@message' => $e->getMessage(), '@id' => $activity->id()]);
                }
              }
            }
          }
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Outbox general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Remove or release the queue item.
      if ($remove_queue_item) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
      }
      else {
        $release_items[] = $item;
      }
    }

    // Release only after the loop, otherwise claimItem() would hand out the
    // same items again during this run.
    foreach ($release_items as $item) {
      $this->releaseItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function handleInboxQueue($time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $end = time() + $time_limit;
    $release_items = [];
    $this->setDebug($debug);

    while (time() < $end && ($item = \Drupal::queue(ACTIVITYPUB_INBOX_QUEUE)->claimItem())) {
      $data = $item->data;

      // Items without an activity id can never be processed: drop them.
      if (empty($data['activity'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
        continue;
      }

      $remove_queue_item = TRUE;

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $this->entityTypeManager->getStorage('activitypub_activity')->load($data['activity']);
      if (!empty($activity)) {
        try {

          // Check the queue value.
          if (!$activity->isQueued()) {
            $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
            continue;
          }

          $type = NULL;
          if (!empty($data['config_id'])) {
            $type = $data['config_id'];
          }

          $activity->doInboxProcess($type);

          // NOTE(review): the item is only kept when processing succeeded; a
          // failing item is removed even if $remove_item_from_queue is FALSE.
          // This differs from the outbox methods - confirm it is intended.
          if (!$remove_item_from_queue) {
            $remove_queue_item = FALSE;
          }
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Inbox general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Remove or release the queue item.
      if ($remove_queue_item) {
        if ($activity) {
          try {
            $activity->set('processed', TRUE);
            $activity->set('queued', FALSE);
            $activity->save();
          }
          catch (\Exception $e) {
            // Best effort: the queue item is removed regardless, but leave a
            // trace so a stuck 'queued' flag can be diagnosed.
            $this->logger->warning('Unable to mark inbox activity @id as processed: @message', ['@id' => $activity->id(), '@message' => $e->getMessage()]);
          }
        }
        $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
      }
      else {
        $release_items[] = $item;
      }
    }

    // Release only after the loop, otherwise claimItem() would hand out the
    // same items again during this run.
    foreach ($release_items as $item) {
      $this->releaseItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createQueueItem(ActivityPubActivityInterface $activityPubActivity, $queue = NULL, $extra_data = []) {
    // The activity id always wins over an 'activity' key in the extra data.
    $data = ['activity' => $activityPubActivity->id()] + $extra_data;

    try {
      // Pick the queue from the collection of the activity when none is given.
      if (!isset($queue)) {
        $queue = $activityPubActivity->getCollection() == ActivityPubActivityInterface::OUTBOX ? ACTIVITYPUB_OUTBOX_QUEUE : ACTIVITYPUB_INBOX_QUEUE;
      }
      \Drupal::queue($queue)->createItem($data);
      $activityPubActivity->set('queued', TRUE)->save();
    }
    catch (\Exception $e) {
      $this->logger->notice('Error creating queue item: @message', ['@message' => $e->getMessage()]);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createSendQueueItem(ActivityPubActivityInterface $activityPubActivity, $build, $targets) {
    $data = [
      'activity' => $activityPubActivity->id(),
      'build' => $build,
      'targets' => $targets,
    ];

    try {
      \Drupal::queue(ACTIVITYPUB_OUTBOX_SEND_QUEUE)->createItem($data);
    }
    catch (\Exception $e) {
      $this->logger->notice('Error creating send queue item: @message', ['@message' => $e->getMessage()]);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function removeOldActivities() {
    $settings = $this->configFactory->get('activitypub.settings');
    $days = $settings->get('inbox_remove_x_days');

    // A value of 0 (or no value) disables the clean up.
    if ($days > 0) {
      $conditions = [
        '<created' => strtotime("today - $days days"),
        'collection' => ActivityPubActivityInterface::INBOX,
        'type' => $this->activityPubUtility->getTimelineTypes(),
        'visibility' => ActivityPubActivityInterface::VISIBILITY_PUBLIC,
        'entity_id' => 'isNull',
      ];

      // Optionally keep unread activities by only selecting the read ones.
      if ($settings->get('inbox_remove_x_days_keep_unread')) {
        $conditions['is_read'] = 1;
      }

      /** @var \Drupal\activitypub\Entity\Storage\ActivityPubActivityStorageInterface $storage */
      $storage = $this->entityTypeManager->getStorage('activitypub_activity');
      $activities = $storage->getActivities($conditions);
      if (!empty($activities)) {
        $storage->delete($activities);
        $this->logger->notice('Removed @count old activities', ['@count' => count($activities)]);
      }
    }
  }

  /**
   * Checks whether the followers collection is addressed by the activity.
   *
   * @param array $build
   *   The built activity. The 'to' and 'cc' keys are inspected; each may be a
   *   list of URLs or a single URL string.
   * @param string $followers_url
   *   The URL of the followers collection of the sending actor.
   *
   * @return bool
   *   TRUE when the followers URL is listed in 'to' or 'cc', FALSE otherwise.
   */
  protected function notifyFollowers($build, $followers_url) {
    foreach (['to', 'cc'] as $field) {
      if (isset($build[$field]) && in_array($followers_url, (array) $build[$field], TRUE)) {
        return TRUE;
      }
    }
    return FALSE;
  }

  /**
   * Delete item from queue.
   *
   * @param object $item
   *   The queue item, as returned by claimItem().
   * @param string $queue
   *   The name of the queue.
   */
  protected function deleteItemFromQueue($item, $queue) {
    \Drupal::queue($queue)->deleteItem($item);
  }

  /**
   * Release item from queue.
   *
   * @param object $item
   *   The queue item, as returned by claimItem().
   * @param string $queue
   *   The name of the queue.
   */
  protected function releaseItemFromQueue($item, $queue) {
    \Drupal::queue($queue)->releaseItem($item);
  }

  /**
   * Debug helper function.
   *
   * Prints the variable followed by a separator, but only in debug mode.
   *
   * @param mixed $var
   *   The value to print.
   */
  protected function debug($var) {
    if ($this->debug) {
      print_r($var);
      echo "\n--------\n";
    }
  }

  /**
   * Set debug property.
   *
   * @param bool $debug
   *   Whether debug output should be printed.
   */
  protected function setDebug($debug) {
    $this->debug = $debug;
  }

}
