activitypub-1.0.x-dev/src/Services/ActivityPubProcessClient.php

src/Services/ActivityPubProcessClient.php
<?php

namespace Drupal\activitypub\Services;

use Drupal\activitypub\Entity\ActivityPubActivityInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\RequestException;
use Psr\Log\LoggerInterface;

/**
 * Processes the ActivityPub queues.
 *
 * Handles the outbox queue, the outbox send queue and the inbox queue,
 * creates queue items for activities and removes old inbox activities.
 */
class ActivityPubProcessClient implements ActivityPubProcessClientInterface {

  /**
   * The entity type manager.
   *
   * Used to get the 'activitypub_activity' and 'activitypub_actor' storages.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The ActivityPub Utility service.
   *
   * Provides the server used to look up remote actors and the list of
   * timeline types used when removing old activities.
   *
   * @var \Drupal\activitypub\Services\ActivityPubUtilityInterface
   */
  protected $activityPubUtility;

  /**
   * The ActivityPub signature service.
   *
   * Creates the digest and the signature for outgoing requests.
   *
   * @var \Drupal\activitypub\Services\ActivityPubSignatureInterface
   */
  protected $activityPubSignature;

  /**
   * The http client.
   *
   * Used to POST activities to remote inboxes.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * The config factory.
   *
   * Used to read 'activitypub.settings'.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * A logger instance.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * Whether in debug mode or not.
   *
   * When TRUE, debug() prints the values it receives.
   *
   * @var bool
   */
  protected $debug = FALSE;

  /**
   * Constructs an ActivityPubProcessClient object.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager, used to reach the activity and actor storages.
   * @param \Drupal\activitypub\Services\ActivityPubUtilityInterface $activitypub_utility
   *   The ActivityPub utility service.
   * @param \Drupal\activitypub\Services\ActivityPubSignatureInterface $activitypub_signature
   *   The ActivityPub signature service, used to sign outgoing requests.
   * @param \GuzzleHttp\ClientInterface $http_client
   *   The HTTP client used to deliver activities.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger that receives processing results and errors.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager, ActivityPubUtilityInterface $activitypub_utility, ActivityPubSignatureInterface $activitypub_signature, ClientInterface $http_client, ConfigFactoryInterface $config_factory, LoggerInterface $logger) {
    // Drupal core services.
    $this->entityTypeManager = $entity_type_manager;
    $this->configFactory = $config_factory;
    $this->logger = $logger;
    $this->httpClient = $http_client;

    // ActivityPub module services.
    $this->activityPubUtility = $activitypub_utility;
    $this->activityPubSignature = $activitypub_signature;
  }

  /**
   * {@inheritdoc}
   */
  public function prepareOutboxQueue($create_send_queue_item = TRUE, $time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $end = time() + $time_limit;
    $release_items = [];
    $this->setDebug($debug);

    $queue = \Drupal::queue(ACTIVITYPUB_OUTBOX_QUEUE);
    $storage = $this->entityTypeManager->getStorage('activitypub_activity');

    // Claim items until the queue is empty or the time limit is reached.
    while (time() < $end && ($item = $queue->claimItem())) {
      $data = $item->data;

      // An item without an activity id can never be processed: drop it.
      if (empty($data['activity'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
        continue;
      }

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $storage->load($data['activity']);
      if (!empty($activity)) {
        try {

          // Check the queue value: activities which are not flagged as queued
          // anymore must not be sent.
          if (!$activity->isQueued()) {
            $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
            continue;
          }

          // Build activity.
          $build = $activity->buildActivity();

          // Replace type: a 'Create' is turned into an 'Update' when the queue
          // item asks for it.
          if (!empty($data['replaceType']) && $data['replaceType'] == 'Update' && isset($build['type']) && $build['type'] == 'Create') {
            $build['type'] = $data['replaceType'];
          }

          // Send to: collect the explicit recipients. The public URL and the
          // followers collection are not deliverable targets themselves. The
          // cast keeps a single scalar 'to' value from breaking the loop.
          $targets = [];
          $followers_url = $activity->getActor() . '/followers';
          if (!empty($build['to'])) {
            foreach ((array) $build['to'] as $recipient) {
              if ($recipient != ActivityPubActivityInterface::PUBLIC_URL && $recipient != $followers_url) {
                $targets[] = $recipient;
              }
            }
          }

          $this->debug($build);

          // Add followers.
          if ($this->notifyFollowers($build, $followers_url)) {

            // Get followers: the actors of the enabled 'Follow' activities
            // which have this actor as object.
            $conditions = ['type' => 'Follow', 'object' => $activity->getActor(), 'status' => 1];
            /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface[] $followers */
            $followers = $storage->loadByProperties($conditions);
            foreach ($followers as $follower) {
              $targets[] = $follower->getActor();
            }
          }

          // Create inboxes based on host. Non-string values are ignored and
          // duplicates (e.g. a follower that is also addressed directly) are
          // collapsed so a target is queued only once.
          $inboxes = [];
          foreach (array_unique(array_filter($targets, 'is_string')) as $target) {
            $host = parse_url($target, PHP_URL_HOST);
            if (empty($host)) {
              // parse_url() returns NULL or FALSE when there is no host; such a
              // target can not be delivered to.
              $this->debug('Skipping target without host: ' . $target);
              $this->logger->notice('Outbox prepare: skipping target without host for @id: @target', ['@id' => $activity->id(), '@target' => $target]);
              continue;
            }
            $inboxes[$host][] = $target;
          }

          $this->debug($inboxes);
          if (!empty($inboxes) && $create_send_queue_item) {
            // One send queue item per host.
            foreach ($inboxes as $host_targets) {
              $this->createSendQueueItem($activity, $build, $host_targets);
            }
          }
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Outbox prepare general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Remove or release the queue item.
      if ($remove_item_from_queue) {
        if (!empty($activity)) {
          try {
            $activity->set('processed', TRUE);
            $activity->set('queued', FALSE);
            $activity->save();
          }
          catch (\Exception $e) {
            // Best effort: the queue item is removed anyway, but leave a trace
            // instead of failing silently.
            $this->logger->error('Outbox prepare: could not update activity @id: @message', ['@id' => $activity->id(), '@message' => $e->getMessage()]);
          }
        }
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
      }
      else {
        // Released after the loop so the same item is not claimed again during
        // this run.
        $release_items[] = $item;
      }
    }

    foreach ($release_items as $item) {
      $this->releaseItemFromQueue($item, ACTIVITYPUB_OUTBOX_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function handleOutboxQueue($send = TRUE, $time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $end = time() + $time_limit;
    $release_items = [];
    $this->setDebug($debug);
    $server = $this->activityPubUtility->getServer();

    /** @var \Drupal\activitypub\Entity\Storage\ActivityPubActorStorage $actorStorage */
    $actorStorage = $this->entityTypeManager->getStorage('activitypub_actor');

    // Claim items until the queue is empty or the time limit is reached.
    while (time() < $end && ($item = \Drupal::queue(ACTIVITYPUB_OUTBOX_SEND_QUEUE)->claimItem())) {
      $data = $item->data;

      // An item without activity id, build or targets can never be delivered:
      // drop it.
      if (empty($data['activity']) || empty($data['build']) || !is_array($data['build']) || empty($data['targets']) || !is_array($data['targets'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
        continue;
      }

      $inboxes = [];
      $build = $data['build'];
      $build['@context'] = ActivityPubActivityInterface::CONTEXT_URL;
      $targets = $data['targets'];

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $this->entityTypeManager->getStorage('activitypub_activity')->load($data['activity']);
      if (!empty($activity)) {
        try {

          $actor = $actorStorage->loadActorByEntityIdAndType($activity->getOwnerId(), 'person');
          if (empty($actor)) {
            // Without an actor the request can not be signed. Throw an
            // exception so it is logged below, instead of a fatal error on
            // $actor->getName() which would not be caught.
            throw new \RuntimeException('No actor found for the owner of the activity');
          }
          $this->debug($build);

          // Get inboxes.
          foreach ($targets as $target) {

            // Ignore 'https://example.com/user/random' target which is used in
            // the follow test, we don't care about it that the call is going
            // out.
            if ($target == ACTIVITYPUB_TEST_USER) {
              continue;
            }

            // A target whose actor can not be fetched is skipped.
            $target_actor = NULL;
            try {
              $target_actor = $server->actor($target);
            }
            catch (\Exception $ignored) {}
            if ($target_actor) {
              $inbox = $target_actor->get('inbox');
              if ($activity->canUseSharedInbox() && ($endpoints = $target_actor->get('endpoints')) && !empty($endpoints['sharedInbox'])) {
                $inbox = $endpoints['sharedInbox'];
              }
              if (is_string($inbox)) {
                // Keyed by URL so every inbox is only posted to once.
                $inboxes[$inbox] = $inbox;
              }
            }
          }

          if (!empty($inboxes)) {
            $this->debug($inboxes);
            $keyId = $activity->getActor();

            // Create digest.
            $json = json_encode($build);
            if ($json === FALSE) {
              throw new \RuntimeException('Could not encode the activity: ' . json_last_error_msg());
            }
            $digest = $this->activityPubSignature->createdigest($json);

            foreach ($inboxes as $inbox) {

              $parsed = parse_url($inbox);
              if (empty($parsed['host'])) {
                $this->logger->error('Outbox: invalid inbox @inbox for @id', ['@inbox' => $inbox, '@id' => $activity->id()]);
                continue;
              }
              $host = $parsed['host'];
              // An inbox URL without path is requested as '/'.
              $path = !empty($parsed['path']) ? $parsed['path'] : '/';
              $date = gmdate('D, d M Y H:i:s T', time());
              $this->debug($parsed);

              // Create signature.
              $signature = $this->activityPubSignature->createSignature($actor->getName(), $host, $path, $digest, $date);

              $headers = [
                'Content-Type' => 'application/activity+json',
                'host' => $host,
                'date' => $date,
                'digest' => $digest,
                'Signature' => 'keyId="' . $keyId . '#main-key",headers="(request-target) host date digest",signature="' . base64_encode($signature) . '",algorithm="rsa-sha256"',
              ];

              $this->debug($headers);

              if ($send) {
                try {
                  $response = $this->httpClient->post($inbox, ['body' => $json, 'headers' => $headers]);
                  $this->logger->notice('Outbox response to @inbox for @id: code: @code -  @response', ['@inbox' => $inbox, '@id' => $activity->id(), '@code' => $response->getStatusCode(), '@response' => (string) $response->getBody()]);
                }
                catch (\Exception $e) {
                  // Catch every exception here, not only RequestException:
                  // depending on the Guzzle version, connection failures are
                  // not RequestException instances, and a single unreachable
                  // server must not stop delivery to the remaining inboxes.
                  $this->logger->error('Outbox exception to @inbox for @id: @message', ['@inbox' => $inbox, '@message' => $e->getMessage(), '@id' => $activity->id()]);
                }
              }
            }
          }
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Outbox general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Remove or release the queue item.
      if ($remove_item_from_queue) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
      }
      else {
        // Released after the loop so the same item is not claimed again during
        // this run.
        $release_items[] = $item;
      }
    }

    foreach ($release_items as $item) {
      $this->releaseItemFromQueue($item, ACTIVITYPUB_OUTBOX_SEND_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function handleInboxQueue($time_limit = 30, $remove_item_from_queue = TRUE, $debug = FALSE) {
    $this->setDebug($debug);
    $deadline = time() + $time_limit;
    $to_release = [];

    // Claim items until the queue is empty or the time limit is reached.
    while (time() < $deadline && ($item = \Drupal::queue(ACTIVITYPUB_INBOX_QUEUE)->claimItem())) {
      $payload = $item->data;

      // Nothing to process without an activity id.
      if (empty($payload['activity'])) {
        $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
        continue;
      }

      // The item is only kept (released) when processing succeeded and the
      // caller asked not to remove items; in every other case it is removed.
      $keep_item = FALSE;

      /** @var \Drupal\activitypub\Entity\ActivityPubActivityInterface $activity */
      $activity = $this->entityTypeManager->getStorage('activitypub_activity')->load($payload['activity']);
      if (!empty($activity)) {
        try {

          // Activities which are not flagged as queued are not processed.
          if (!$activity->isQueued()) {
            $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
            continue;
          }

          // Pass along the config id when the queue item carries one.
          $activity->doInboxProcess(!empty($payload['config_id']) ? $payload['config_id'] : NULL);

          $keep_item = !$remove_item_from_queue;
        }
        catch (\Exception $e) {
          $this->debug($e->getMessage());
          $this->logger->error('Inbox general exception in @file line @line for @id: @message', ['@message' => $e->getMessage(), '@file' => $e->getFile(), '@line' => $e->getLine(), '@id' => $activity->id()]);
        }
      }

      // Release later on: releasing right away would allow the item to be
      // claimed again in this run.
      if ($keep_item) {
        $to_release[] = $item;
        continue;
      }

      // Flag the activity as processed and remove the queue item.
      if ($activity) {
        try {
          $activity->set('processed', TRUE);
          $activity->set('queued', FALSE);
          $activity->save();
        }
        catch (\Exception $ignored) {}
      }
      $this->deleteItemFromQueue($item, ACTIVITYPUB_INBOX_QUEUE);
    }

    foreach ($to_release as $kept) {
      $this->releaseItemFromQueue($kept, ACTIVITYPUB_INBOX_QUEUE);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createQueueItem(ActivityPubActivityInterface $activityPubActivity, $queue = NULL, $extra_data = []) {

    // The activity id always wins over an 'activity' key in the extra data.
    $data = ['activity' => $activityPubActivity->id()] + $extra_data;

    try {
      // Default to the queue matching the collection of the activity.
      if (!isset($queue)) {
        $queue = $activityPubActivity->getCollection() == ActivityPubActivityInterface::OUTBOX ? ACTIVITYPUB_OUTBOX_QUEUE : ACTIVITYPUB_INBOX_QUEUE;
      }

      // createItem() returns FALSE when the item could not be created. Do not
      // flag the activity as queued in that case, as no queue item exists
      // which would ever reset the flag.
      if (\Drupal::queue($queue)->createItem($data) === FALSE) {
        $this->logger->error('Error creating queue item: the @queue queue did not accept the item for @id', ['@queue' => $queue, '@id' => $activityPubActivity->id()]);
        return;
      }

      $activityPubActivity->set('queued', TRUE)->save();
    }
    catch (\Exception $e) {
      $this->logger->notice('Error creating queue item: @message', ['@message' => $e->getMessage()]);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createSendQueueItem(ActivityPubActivityInterface $activityPubActivity, $build, $targets) {
    // Payload of the send queue item: the activity id, the built activity and
    // the targets to deliver it to.
    $payload = ['activity' => $activityPubActivity->id()];
    $payload['build'] = $build;
    $payload['targets'] = $targets;

    try {
      $send_queue = \Drupal::queue(ACTIVITYPUB_OUTBOX_SEND_QUEUE);
      $send_queue->createItem($payload);
    }
    catch (\Exception $exception) {
      // A failure is logged, it is not propagated to the caller.
      $this->logger->notice('Error creating send queue item: @message', ['@message' => $exception->getMessage()]);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function removeOldActivities() {
    $settings = $this->configFactory->get('activitypub.settings');

    // Nothing to do unless a positive number of days is configured.
    $days = $settings->get('inbox_remove_x_days');
    if (!($days > 0)) {
      return;
    }

    // Select the public inbox activities of the timeline types which were
    // created before the threshold; 'entity_id' is passed as 'isNull'.
    $threshold = strtotime("today - $days days");
    $conditions = [
      '<created' => $threshold,
      'collection' => ActivityPubActivityInterface::INBOX,
      'type' => $this->activityPubUtility->getTimelineTypes(),
      'visibility' => ActivityPubActivityInterface::VISIBILITY_PUBLIC,
      'entity_id' => 'isNull',
    ];

    // Optionally restrict the selection to activities which have been read.
    $keep_unread = $settings->get('inbox_remove_x_days_keep_unread');
    if ($keep_unread) {
      $conditions['is_read'] = 1;
    }

    /** @var \Drupal\activitypub\Entity\Storage\ActivityPubActivityStorageInterface $storage */
    $storage = $this->entityTypeManager->getStorage('activitypub_activity');
    $activities = $storage->getActivities($conditions);
    if (empty($activities)) {
      return;
    }

    $storage->delete($activities);
    $this->logger->notice('Removed @count old activities', ['@count' => count($activities)]);
  }

  /**
   * Checks whether the followers have to be notified.
   *
   * @param array $build
   *   The built activity. Its 'to' and 'cc' values are inspected; a single
   *   scalar value is treated as a list with one entry.
   * @param string $followers_url
   *   The URL of the followers collection of the actor.
   *
   * @return bool
   *   TRUE when the followers URL is listed in 'to' or 'cc', FALSE otherwise.
   */
  protected function notifyFollowers($build, $followers_url) {
    foreach (['to', 'cc'] as $field) {
      // Strict comparison: a non-string entry must never match the URL.
      if (!empty($build[$field]) && in_array($followers_url, (array) $build[$field], TRUE)) {
        return TRUE;
      }
    }

    return FALSE;
  }

  /**
   * Deletes an item from a queue.
   *
   * @param object $item
   *   The queue item, as returned by claimItem().
   * @param string $queue
   *   The name of the queue the item belongs to.
   */
  protected function deleteItemFromQueue($item, $queue) {
    $backend = \Drupal::queue($queue);
    $backend->deleteItem($item);
  }

  /**
   * Releases a claimed item back to a queue.
   *
   * @param object $item
   *   The queue item, as returned by claimItem().
   * @param string $queue
   *   The name of the queue the item belongs to.
   */
  protected function releaseItemFromQueue($item, $queue) {
    $backend = \Drupal::queue($queue);
    $backend->releaseItem($item);
  }

  /**
   * Prints a value followed by a separator when debug mode is on.
   *
   * @param mixed $var
   *   The value to print with print_r().
   */
  protected function debug($var) {
    // Stay silent outside of debug mode.
    if (!$this->debug) {
      return;
    }

    print_r($var);
    echo "\n--------\n";
  }

  /**
   * Sets the debug property.
   *
   * The value is stored as is and checked by debug() before it prints.
   *
   * @param bool $debug
   *   Whether debug mode is on or not.
   */
  protected function setDebug($debug) {
    $this->debug = $debug;
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc