activity_stream-1.0.x-dev/src/ActivityAggregationFactory.php
src/ActivityAggregationFactory.php
<?php
namespace Drupal\activity_stream;
use Drupal\activity_stream\Entity\Activity;
use Drupal\activity_stream\Plugin\ActivityDestinationManager;
use Drupal\Component\Render\FormattableMarkup;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Database\Connection;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Language\LanguageManagerInterface;
use Drupal\Core\Render\BubbleableMetadata;
use Drupal\Core\Utility\Token;
use Drupal\language\ConfigurableLanguageManagerInterface;
use Drupal\message\Entity\Message;
use Drupal\message\MessageInterface;
use Drupal\activity_stream\ActivityConfigInterface;
/**
* Class ActivityAggregationFactory to create Activity items based on ActivityLogs.
*
* @package Drupal\activity_stream
*/
class ActivityAggregationFactory extends ControllerBase {
/**
* Activity destination manager.
*
* Queried via getListByProperties() for the 'isCommon' and 'isAggregatable'
* destination lists while aggregating activities.
*
* @var \Drupal\activity_stream\Plugin\ActivityDestinationManager
*/
protected $activityDestinationManager;
/**
* The entity type manager.
*
* Used to obtain the 'message', 'activity_config', 'comment' and activity
* entity storages.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* The connection to the database.
*
* Used for the distinct comment author count query on 'comment_field_data'.
*
* @var \Drupal\Core\Database\Connection
*/
protected $database;
/**
* The language manager.
*
* Assigned in the constructor; not read by any method of this class.
*
* @var \Drupal\Core\Language\LanguageManagerInterface
*/
protected $languageManager;
/**
* The token replacement instance.
*
* Used by processTokens() to replace tokens in message output.
*
* @var \Drupal\Core\Utility\Token
*/
protected $token;
/**
* The module handler.
*
* Used to invoke the 'activity_stream_related_entity_object' alter hook.
*
* @var \Drupal\Core\Extension\ModuleHandlerInterface
*/
protected $moduleHandler;
/**
 * Constructs an ActivityAggregationFactory.
 *
 * Stores every injected service on the instance; no other work is done here.
 *
 * @param \Drupal\activity_stream\Plugin\ActivityDestinationManager $activityDestinationManager
 *   The activity destination plugin manager.
 * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
 *   The entity type manager.
 * @param \Drupal\Core\Database\Connection $database
 *   The database connection.
 * @param \Drupal\Core\Language\LanguageManagerInterface $language_manager
 *   The language manager.
 * @param \Drupal\Core\Utility\Token $token
 *   The token replacement service.
 * @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
 *   The module handler.
 */
public function __construct(
  ActivityDestinationManager $activityDestinationManager,
  EntityTypeManagerInterface $entity_type_manager,
  Connection $database,
  LanguageManagerInterface $language_manager,
  Token $token,
  ModuleHandlerInterface $module_handler
) {
  // Storage and persistence services.
  $this->entityTypeManager = $entity_type_manager;
  $this->database = $database;
  // Plugin, hook and language services.
  $this->activityDestinationManager = $activityDestinationManager;
  $this->moduleHandler = $module_handler;
  $this->languageManager = $language_manager;
  // Text processing.
  $this->token = $token;
}
/**
 * Public entry point for creating activities from a data array.
 *
 * Delegates directly to buildActivity() and returns its result unchanged.
 *
 * @param array $data
 *   An array of data to create activity from.
 * @param array $activity_fields
 *   Field values handed through to buildActivity().
 *
 * @return array
 *   Whatever buildActivity() returns for the given data.
 */
public function createActivity(array $data, array $activity_fields) {
  return $this->buildActivity($data, $activity_fields);
}
/**
 * Build the activities based on a data array.
 *
 * Loads the activity config referenced by $data['activity_config'], assembles
 * the base field values of a new activity from $data and then hands over to
 * either the aggregated or the per-recipient creation path, depending on the
 * config's 'activity_aggregate' value.
 *
 * @param array $data
 *   An array of data to create activity from. Keys read directly here are
 *   'activity_config', 'mid' and 'activity_date'; the field helper methods
 *   read further keys.
 * @param array $activity_fields
 *   Not used: the field values are always rebuilt from $data. The parameter
 *   is kept so existing callers keep working.
 *
 * @return array
 *   An array of created activities, or an empty array when no activity config
 *   could be loaded.
 *
 * @throws \Drupal\Core\Entity\EntityStorageException
 */
protected function buildActivity(array $data, array $activity_fields) {
  // Without an activity config id there is nothing to build from.
  if (!isset($data['activity_config'])) {
    return [];
  }
  $activity_config = $this->entityTypeManager
    ->getStorage('activity_config')
    ->load($data['activity_config']);
  // The activity config id is not needed in the data array any more.
  unset($data['activity_config']);
  if (!$activity_config instanceof ActivityConfigInterface) {
    // Return an empty list rather than an undefined variable.
    return [];
  }

  // Load the message only when a message id is available.
  $message = NULL;
  if (!empty($data['mid'])) {
    $message = $this->entityTypeManager
      ->getStorage('message')
      ->load($data['mid']);
  }

  // Use the supplied activity date only when it is an integer timestamp,
  // otherwise fall back to the current time.
  $has_date = isset($data['activity_date']) && is_int($data['activity_date']);

  // Initialize fields for the new activity entity. The user recipients are
  // added later by createActivitiesFromRecipients().
  $activity_fields = [
    'field_activity_destinations' => $this->getFieldDestinations($data),
    'field_activity_entity' => $this->getFieldEntity($data),
    'field_activity_recipient_group' => $this->getFieldRecipientGroup($data),
    'user_id' => $this->getActor($data),
    'field_activity_date' => $has_date ? $data['activity_date'] : time(),
  ];
  // Attach the message only when it could actually be loaded.
  if ($message instanceof MessageInterface) {
    $activity_fields['field_activity_message'] = $this->getFieldMessage($data);
  }

  // Check if aggregation is enabled for this activity type.
  // @todo Consider if we should put aggregation to separate service.
  if ($activity_config->get('activity_aggregate')) {
    return $this->buildAggregatedActivites($data, $activity_fields);
  }
  return $this->createActivitiesFromRecipients($data, $activity_fields);
}
/**
* Placeholder for a check for already existing activities.
*
* The body is currently empty: the method takes no arguments, performs no
* work and returns nothing. It is not called from within this class.
*/
protected function checkForExistingActivities() {
}
/**
 * Create activities based on recipients.
 *
 * Recipients are resolved through getFieldRecipientUser(). Every recipient
 * whose target type is not 'user' gets its own activity; all 'user'
 * recipients are combined on a single activity. When there are no recipients
 * at all, one activity without recipients is created.
 *
 * @param array $data
 *   The activity data array.
 * @param array $activity_fields
 *   Base field values for the activities to create.
 *
 * @return array
 *   Empty array or array of created (saved) activity entities.
 *
 * @throws \Drupal\Core\Entity\EntityStorageException
 */
protected function createActivitiesFromRecipients(array $data, array $activity_fields): array {
  $storage = $this->entityTypeManager->getStorage('activity_stream_activity');
  $recipients = $this->getFieldRecipientUser($data);

  if (empty($recipients)) {
    $activity = $storage->create($activity_fields);
    // save() returns a status code, so collect the entity itself.
    $activity->save();
    return [$activity];
  }

  $activities = [];
  $user_recipients = [];
  // Iterate the recipients directly: array_column() re-indexes its result
  // and drops rows without the column, so its keys cannot be trusted to
  // point back into $recipients.
  foreach ($recipients as $recipient) {
    if (!is_array($recipient) || !isset($recipient['target_type'])) {
      continue;
    }
    if ($recipient['target_type'] === 'user') {
      // Collected and stored together on one activity below.
      $user_recipients[] = $recipient;
      continue;
    }
    // For all one to one target entity types we create an activity.
    $activity_fields['field_activity_recipient_user'] = $recipient;
    $activity = $storage->create($activity_fields);
    $activity->save();
    $activities[] = $activity;
  }

  // Create one activity with multiple user recipients.
  if (!empty($user_recipients)) {
    $activity_fields['field_activity_recipient_user'] = $user_recipients;
    $activity = $storage->create($activity_fields);
    $activity->save();
    $activities[] = $activity;
  }

  return $activities;
}
/**
 * Check whether a value is usable as an integer timestamp.
 *
 * Accepts native integers and strings that are the canonical decimal form of
 * an integer (no leading zeros, whitespace, sign prefix '+' or fraction).
 * A string that survives the int round trip is by definition inside the
 * platform integer range, so no separate range comparison is needed.
 *
 * @param mixed $timestamp
 *   The value to check.
 *
 * @return bool
 *   TRUE for an integer or a canonical integer string, FALSE otherwise.
 */
protected function isValidTimestamp($timestamp) {
  if (is_int($timestamp)) {
    return TRUE;
  }
  // Guard with is_string() so arrays and objects are rejected without
  // triggering conversion warnings.
  return is_string($timestamp) && (string) (int) $timestamp === $timestamp;
}
/**
 * Get field value for 'destination' field from data array.
 *
 * @param array $data
 *   The activity data array; only the 'destination' key is read.
 * @param array $allowed_destinations
 *   Optional list of destination values to keep. When empty, no filtering is
 *   applied.
 *
 * @return array|null
 *   The destination items (original keys preserved after filtering), or NULL
 *   when $data has no 'destination' entry.
 */
protected function getFieldDestinations(array $data, $allowed_destinations = []) {
  if (!isset($data['destination'])) {
    return NULL;
  }

  $destinations = $data['destination'];
  if (empty($allowed_destinations)) {
    return $destinations;
  }

  // Drop every item whose value is not on the allow list.
  foreach ($destinations as $delta => $item) {
    if (in_array($item['value'], $allowed_destinations)) {
      continue;
    }
    unset($destinations[$delta]);
  }

  return $destinations;
}
/**
 * Get field value for 'entity' field from data array.
 *
 * @param array $data
 *   The activity data array.
 *
 * @return mixed
 *   The value stored under 'related_object', or NULL when it is not set.
 */
protected function getFieldEntity($data) {
  return $data['related_object'] ?? NULL;
}
/**
 * Get field value for 'message' field from data array.
 *
 * @param array $data
 *   The activity data array; only the 'mid' key is read.
 *
 * @return array|null
 *   A single-item list holding the message id as 'target_id', or NULL when
 *   no message id is set.
 */
protected function getFieldMessage($data) {
  if (!isset($data['mid'])) {
    return NULL;
  }

  return [
    ['target_id' => $data['mid']],
  ];
}
/**
 * Get the value for a 'created' field from a message.
 *
 * @param \Drupal\message\Entity\Message $message
 *   The message to read the creation time from.
 *
 * @return mixed
 *   Whatever the message's getCreatedTime() returns.
 */
protected function getCreated(Message $message) {
  $created = $message->getCreatedTime();
  return $created;
}
/**
 * Build the aggregated activities based on a data array.
 *
 * Related activities that are owned by the current actor, or that live in a
 * common destination, are deleted and then replaced by freshly created
 * activities. Previously the method only deleted and always returned an
 * empty array, so aggregated activity types never produced an activity.
 *
 * @param array $data
 *   The activity data array.
 * @param array $activity_fields
 *   Field values for the replacement activities.
 *
 * @return array
 *   The activities returned by createActivitiesFromRecipients().
 *
 * @throws \Drupal\Core\Entity\EntityStorageException
 */
protected function buildAggregatedActivites($data, $activity_fields) {
  $common_destinations = $this->activityDestinationManager->getListByProperties('isCommon', TRUE);
  // The actor does not change while looping, so resolve it once.
  $actor = $this->getActor($data);

  foreach ($this->getAggregationRelatedActivities($data) as $related_activity) {
    $destination = $related_activity->field_activity_destinations->value;
    // If user already have related activity we remove it and create new.
    // And we also remove related activities from common streams.
    if ($related_activity->getOwnerId() == $actor || in_array($destination, $common_destinations)) {
      // @todo Consider if need to delete or unpublish old activites.
      $related_activity->delete();
    }
  }

  // Create the activity that replaces the removed ones.
  // NOTE(review): this reuses the non-aggregated creation path; confirm that
  // no per-destination split is wanted for aggregated activities.
  return $this->createActivitiesFromRecipients($data, $activity_fields);
}
/**
 * Get related activities for activity aggregation.
 *
 * Only comment related objects are aggregated: for a comment, all activities
 * that reference any comment on the same commented entity and that sit in an
 * aggregatable destination are returned.
 *
 * @param array $data
 *   The activity data array. 'related_object' may be either a single
 *   reference (['target_type' => ..., 'target_id' => ...]) or a list whose
 *   first item is such a reference, matching how the other methods of this
 *   class read it.
 *
 * @return array
 *   The related activity entities keyed by id, or an empty array.
 */
protected function getAggregationRelatedActivities($data) {
  // Accept both the list shape and the flat shape of 'related_object'.
  $related_object = $data['related_object'][0] ?? $data['related_object'] ?? [];
  if (!is_array($related_object)
    || empty($related_object['target_id'])
    || empty($related_object['target_type'])
    || $related_object['target_type'] !== 'comment') {
    return [];
  }

  $comment_storage = $this->entityTypeManager->getStorage('comment');
  $comment = $comment_storage->load($related_object['target_id']);
  // This can happen if the comment was removed before the activity was
  // processed.
  if ($comment === NULL) {
    return [];
  }
  // The commented entity itself may be gone as well.
  $commented_entity = $comment->getCommentedEntity();
  if ($commented_entity === NULL) {
    return [];
  }

  // Get all comments of commented entity.
  $comment_ids = $comment_storage->getQuery()
    ->condition('entity_id', $commented_entity->id(), '=')
    ->condition('entity_type', $commented_entity->getEntityTypeId(), '=')
    ->accessCheck()
    ->execute();
  if (empty($comment_ids)) {
    return [];
  }

  // We exclude activities with email, platform_email and notifications
  // destinations from aggregation. An empty list cannot be used in an IN
  // condition and means there is nothing to aggregate.
  $aggregatable_destinations = $this->activityDestinationManager->getListByProperties('isAggregatable', TRUE);
  if (empty($aggregatable_destinations)) {
    return [];
  }

  // Get all activities provided by comments of commented entity.
  // NOTE(review): the storage id now matches the 'activity_stream_activity'
  // id used when activities are created in this class (it was 'activity');
  // confirm against the Activity entity type definition.
  $activity_storage = $this->entityTypeManager->getStorage('activity_stream_activity');
  $activity_ids = $activity_storage->getQuery()
    ->condition('field_activity_entity.target_id', $comment_ids, 'IN')
    ->condition('field_activity_entity.target_type', $related_object['target_type'], '=')
    ->condition('field_activity_destinations.value', $aggregatable_destinations, 'IN')
    ->accessCheck()
    ->execute();

  // Query and load through the same storage.
  return empty($activity_ids) ? [] : $activity_storage->loadMultiple($activity_ids);
}
/**
 * Get related entity for activity aggregation.
 *
 * For 'create_comment_reply' messages the parent comment becomes the related
 * object; for any other comment reference the commented entity does. All
 * other references are returned as they are. The result is passed through
 * the 'activity_stream_related_entity_object' alter hook before returning.
 *
 * @param array $data
 *   The activity data array. Reads 'related_object' (first list item, or the
 *   flat reference when no list item exists) and 'message_template'.
 *
 * @return array|null
 *   A reference array with 'target_type' and 'target_id', or NULL when $data
 *   holds no related object and no alter hook supplied one.
 */
public function getActivityRelatedEntity($data) {
  $related_object = $data['related_object'][0] ?? NULL;
  // Fall back to a flat reference when 'related_object' is not a list.
  if ($related_object === NULL && isset($data['related_object']['target_type'])) {
    $related_object = $data['related_object'];
  }

  $target_type = $related_object['target_type'] ?? NULL;
  $target_id = $related_object['target_id'] ?? NULL;
  $is_reply = ($data['message_template'] ?? NULL) === 'create_comment_reply';

  // Only comment references are rewritten, and only when there is an id to
  // load the comment with.
  if ($target_id !== NULL && ($is_reply || $target_type === 'comment')) {
    // @todo Check if comment published?
    $comment = $this->entityTypeManager->getStorage('comment')->load($target_id);
    if ($comment) {
      // We return parent comment as related object for create_comment_reply
      // messages and the commented entity for all other comments.
      $replacement = $is_reply ? $comment->getParentComment() : $comment->getCommentedEntity();
      if (!empty($replacement)) {
        $related_object = [
          'target_type' => $replacement->getEntityTypeId(),
          'target_id' => $replacement->id(),
        ];
      }
    }
  }

  $this->moduleHandler->alter('activity_stream_related_entity_object', $related_object, $data);
  return $related_object;
}
/**
 * Get unique authors number for activity aggregation.
 *
 * Counts the distinct authors of published comments (status 1) on the entity
 * returned by getActivityRelatedEntity(). Only comment related objects are
 * counted; everything else yields 0.
 *
 * @param array $data
 *   The activity data array.
 *
 * @return int
 *   The number of distinct comment authors, 0 when not applicable.
 */
protected function getAggregationAuthorsCount(array $data) {
  $related_object = $data['related_object'][0] ?? NULL;
  // Fall back to a flat reference when 'related_object' is not a list.
  if ($related_object === NULL && isset($data['related_object']['target_type'])) {
    $related_object = $data['related_object'];
  }
  if (($related_object['target_type'] ?? NULL) !== 'comment') {
    return 0;
  }

  // Get related entity.
  $related_entity = $this->getActivityRelatedEntity($data);
  if (empty($related_entity['target_id']) || empty($related_entity['target_type'])) {
    return 0;
  }

  $query = $this->database->select('comment_field_data', 'cfd');
  $query->addExpression('COUNT(DISTINCT cfd.uid)');
  $query->condition('cfd.status', 1);
  $query->condition('cfd.entity_type', $related_entity['target_type']);
  $query->condition('cfd.entity_id', $related_entity['target_id']);

  // fetchField() hands back a string (or FALSE); normalise to an integer so
  // the return type matches the 0 default.
  return (int) $query->execute()->fetchField();
}
/**
 * Get field value for 'recipient_group' field from data array.
 *
 * @param array $data
 *   The activity data array; only the 'recipient' key is read.
 *
 * @return array|null
 *   The recipient wrapped in a list when its target type is 'group', NULL in
 *   every other case.
 */
protected function getFieldRecipientGroup($data) {
  $recipient_type = $data['recipient']['target_type'] ?? NULL;

  // Should be in an array for the field.
  return $recipient_type === 'group' ? [$data['recipient']] : NULL;
}
/**
 * Get field value for 'recipient_user' field from data array.
 *
 * Asks the activity context plugin named in $data['context'] for the
 * recipients. No plugin is instantiated when the context is missing or has
 * no plugin definition.
 *
 * @param array $data
 *   The activity data array. Reads 'context' and, when present, 'last_uid'.
 *
 * @return array
 *   The recipients reported by the context plugin, or an empty array.
 */
protected function getFieldRecipientUser($data) {
  // A missing context cannot match any plugin definition.
  if (!isset($data['context'])) {
    return [];
  }

  $context_plugin_manager = \Drupal::service('plugin.manager.activity_context.processor');
  if (!$context_plugin_manager->hasDefinition($data['context'])) {
    return [];
  }

  $plugin = $context_plugin_manager->createInstance($data['context']);
  // 'last_uid' is optional; pass NULL rather than reading an undefined key.
  return $plugin->getRecipients($data, $data['last_uid'] ?? NULL, 0);
}
/**
 * Return the actor uid.
 *
 * @param array $data
 *   Array of data; only the 'actor' key is read.
 *
 * @return int
 *   The value stored under 'actor', or 0 when it is not set.
 */
protected function getActor(array $data) {
  return $data['actor'] ?? 0;
}
/**
 * Process the message given the arguments saved with it.
 *
 * Arguments that are arrays with a callable 'callback' are first resolved by
 * invoking that callback; afterwards every output item is wrapped in a
 * FormattableMarkup together with the resolved arguments.
 *
 * @param array $arguments
 *   Array with the arguments.
 * @param array $output
 *   Array with the templated text saved in the message template.
 * @param \Drupal\message\Entity\Message $message
 *   Message object.
 *
 * @return array
 *   The templated text, with the placeholders replaced with the actual value,
 *   if there are indeed arguments.
 */
protected function processArguments(array $arguments, array $output, Message $message) {
  // Check if we have arguments saved along with the message.
  if (empty($arguments)) {
    return $output;
  }

  foreach ($arguments as $key => $value) {
    if (is_array($value) && !empty($value['callback']) && is_callable($value['callback'])) {
      // A replacement via callback function. Default 'arguments' to an empty
      // list so a callback definition without arguments does not hand NULL to
      // call_user_func_array().
      $value += ['pass message' => FALSE, 'arguments' => []];
      if ($value['pass message']) {
        // Pass the message object as-well.
        $value['arguments']['message'] = $message;
      }
      $arguments[$key] = call_user_func_array($value['callback'], $value['arguments']);
    }
  }

  foreach ($output as $key => $value) {
    $output[$key] = new FormattableMarkup($value, $arguments);
  }

  return $output;
}
/**
 * Replace placeholders with tokens.
 *
 * String items are run through token replacement directly; non-string items
 * are replaced by the token-replaced content of their 'value' entry when one
 * is set, and left untouched otherwise. The bubbleable metadata collected
 * during replacement is applied to the output before it is returned.
 *
 * @param array $output
 *   The templated text to be replaced.
 * @param array $options
 *   A keyed array of settings and flags to control the token
 *   replacement process. Supported options are:
 *   - langcode: A language code to be used when generating locale-sensitive
 *     tokens.
 *   - callback: A callback function that will be used to post-process the
 *     array of token replacements after they are generated.
 *   - clear: A boolean flag indicating that tokens should be removed from
 *     the final text if no replacement value can be generated.
 *   This will be passed to \Drupal\Core\Utility\Token::replace().
 * @param \Drupal\message\Entity\Message $message
 *   Message object.
 *
 * @return array
 *   The output with placeholders replaced with the token value,
 *   if there are indeed tokens.
 */
protected function processTokens(array $output, array $options, Message $message) {
  $metadata = new BubbleableMetadata();
  $token_data = ['message' => $message];

  foreach ($output as $delta => $text) {
    if (is_string($text)) {
      $output[$delta] = $this->token->replace($text, $token_data, $options, $metadata);
      continue;
    }
    if (isset($text['value'])) {
      $output[$delta] = $this->token->replace($text['value'], $token_data, $options, $metadata);
    }
  }

  $metadata->applyTo($output);
  return $output;
}
}
