apigee_edge-8.x-1.17/src/JobExecutor.php

src/JobExecutor.php
<?php

/**
 * Copyright 2018 Google Inc.
 *
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License version 2 as published by the
 * Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
 * License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

namespace Drupal\apigee_edge;

use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Utility\Error;
use Drupal\apigee_edge\Job\Job;

/**
 * Job executor service.
 */
class JobExecutor implements JobExecutorInterface {

  /**
   * Database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Time service.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * The 'apigee_edge_job' queue.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * JobExecutor constructor.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   Database connection.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time interface.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   Queue factory.
   */
  public function __construct(Connection $connection, TimeInterface $time, QueueFactory $queue_factory) {
    $this->connection = $connection;
    $this->time = $time;
    $this->queue = $queue_factory->get('apigee_edge_job');
  }

  /**
   * Ensures that a job exists with a given status.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   Job object.
   * @param int $status
   *   Job status.
   */
  protected function ensure(Job $job, int $status) {
    if ($job->getStatus() !== $status) {
      $job->setStatus($status);
      $this->save($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function save(Job $job) {
    $now = $this->time->getCurrentTime();
    $jobdata = serialize($job);
    $fields = [
      'status' => $job->getStatus(),
      'job' => $jobdata,
      'updated' => $now,
      'tag' => $job->getTag(),
    ];
    $this->connection->merge('apigee_edge_job')
      ->key('id', $job->getId())
      ->insertFields([
        'id' => $job->getId(),
        'created' => $now,
      ] + $fields)
      ->updateFields($fields)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function load(string $id): ?Job {
    $query = $this->connection->select('apigee_edge_job', 'j')
      ->fields('j', ['job']);
    $query->condition('id', $id);
    $jobdata = $query->execute()->fetchField();

    return $jobdata ? unserialize($jobdata) : NULL;
  }

  /**
   * {@inheritdoc}
   */
  public function select(?string $tag = NULL): ?Job {
    // @todo handle race conditions.
    $query = $this->connection->select('apigee_edge_job', 'j')
      ->fields('j', ['job'])
      ->orderBy('updated')
      ->range(0, 1);
    $query->condition('status', [Job::IDLE, Job::RESCHEDULED], 'IN');
    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }
    $jobdata = $query->execute()->fetchField();

    if ($jobdata) {
      /** @var \Drupal\apigee_edge\Job\Job $job */
      $job = unserialize($jobdata);
      $this->ensure($job, Job::SELECTED);

      return $job;
    }

    return NULL;
  }

  /**
   * {@inheritdoc}
   */
  public function call(Job $job, bool $update = TRUE) {
    $this->ensure($job, Job::RUNNING);
    try {
      $result = $job->execute();
      $job->setStatus($result ? Job::IDLE : Job::FINISHED);
    }
    catch (\Exception $ex) {
      $logger = \Drupal::logger('apigee_edge_job');
      Error::logException($logger, $ex);
      $job->recordException($ex);
      $job->setStatus($job->shouldRetry($ex) && $job->consumeRetry() ? Job::RESCHEDULED : Job::FAILED);
    }
    finally {
      if ($update) {
        $this->save($job);
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function cast(Job $job) {
    $this->save($job);
    $this->queue->createItem(['tag' => $job->getTag()]);
  }

  /**
   * {@inheritdoc}
   */
  public function countJobs(?string $tag = NULL, ?array $statuses = NULL): int {
    $query = $this->connection->select('apigee_edge_job', 'j');

    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }

    if ($statuses !== NULL) {
      $query->condition('status', $statuses, 'IN');
    }

    return (int) $query
      ->countQuery()
      ->execute()
      ->fetchField();
  }

  /**
   * {@inheritdoc}
   */
  public function cleanup(string $tag): void {
    $query = $this->connection->delete('apigee_edge_job')
      ->condition('status', Job::FINISHED)
      ->condition('tag', $tag);
    $query->execute();
  }

}

Главная | Обратная связь

drupal hosting | друпал хостинг | it patrol .inc