drupalorg_migrate-1.0.x-dev/src/Plugin/migrate/source/DrupalOrgUser.php
src/Plugin/migrate/source/DrupalOrgUser.php
<?php
namespace Drupal\drupalorg_migrate\Plugin\migrate\source;
use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\migrate\source\SourcePluginBase;
use Drupal\migrate\Row;
use Drupal\user\Plugin\migrate\source\d7\User;
/**
* Drupal 7 users from database.
*
* @MigrateSource(
* id = "d7_drupalorg_user",
* source_module = "user"
* )
*/
class DrupalOrgUser extends User {
/**
* {@inheritdoc}
*/
protected function initializeIterator() {
// Initialize the batch size.
if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
// Valid batch sizes are integers >= 0.
if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
$this->batchSize = $this->configuration['batch_size'];
}
else {
throw new MigrateException("batch_size must be greater than or equal to zero");
}
}
// If a batch has run the query is already setup.
if ($this->batch == 0) {
$this->prepareQuery();
// Removed all joinable + high-watermark code from the base plugin
// as we do not need it here. Only iterate on new/changed records.
// If we want to run a full migration:
// phpcs:ignore
// drush state:set drupalorg_migrate.drupalorg_migrate_users_last_record 0 --input-format=integer
$last_record = \Drupal::state()->get('drupalorg_migrate.drupalorg_migrate_users_last_record');
if (!is_null($last_record)) {
$this->query->condition('u.changed', $last_record, '>=');
$this->query->orderBy('u.changed');
}
// Set the time when the query was made.
\Drupal::state()->set('drupalorg_migrate.drupalorg_migrate_users_last_query', \Drupal::time()->getRequestTime());
}
// Download data in batches for performance.
if (($this->batchSize > 0)) {
$this->query->range($this->batch * $this->batchSize, $this->batchSize);
}
$statement = $this->query->execute();
$statement->setFetchMode(\PDO::FETCH_ASSOC);
return new \IteratorIterator($statement);
}
/**
* {@inheritdoc}
*/
#[\ReturnTypeWillChange]
public function next(): void {
// Timestamps from the last time the query was triggered and row processed.
$last_record = \Drupal::state()->get('drupalorg_migrate.drupalorg_migrate_users_last_record', 0);
$last_query = \Drupal::state()->get('drupalorg_migrate.drupalorg_migrate_users_last_query', 0);
// Do the full "next" calculation first. It will only come out of here with
// a "currentRow" if there is one to process, or with null, if there is
// none.
$this->currentSourceIds = NULL;
$this->currentRow = NULL;
// In order to find the next row we want to process, we ask the source
// plugin for the next possible row.
while (!isset($this->currentRow) && $this->getIterator()->valid()) {
$row_data = $this->getIterator()->current() + $this->configuration;
$this->fetchNextRow();
$row = new Row($row_data, $this->getIds());
// Populate the source key for this row.
$this->currentSourceIds = $row->getSourceIdValues();
// Pick up the existing map row, if any, unless fetchNextRow() did it.
if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
$row->setIdMap($id_map);
}
// Clear any previous messages for this row before potentially adding
// new ones.
if (!empty($this->currentSourceIds)) {
$this->idMap->delete($this->currentSourceIds, TRUE);
}
// Preparing the row gives source plugins the chance to skip.
$this->prepareRow($row);
// Check whether the row needs processing.
// 1. This row has not been imported yet.
// 2. Explicitly set to update.
// 3. The row is newer than the last processed row.
// 4. If no such property exists then try by checking the hash of the row.
if (!$row->getIdMap() || $row->needsUpdate() || ($row->getSourceProperty('changed') >= $last_record) || $this->rowChanged($row)) {
$this->currentRow = $row->freezeSource();
}
}
// By now, we've discarded all the rows that didn't need updating/creating.
if (!is_null($this->currentRow)) {
// Set the new timestamp only when the record is going to be processed.
$row_changed = $this->currentRow->getSourceProperty('changed');
if ($row_changed > $last_record) {
\Drupal::state()->set('drupalorg_migrate.drupalorg_migrate_users_last_record', $row_changed);
}
}
// If we could not find a row, then let's bring the timestamp up to the
// time of the initial query as there were no records created/changed.
elseif ($last_record < $last_query) {
\Drupal::state()->set('drupalorg_migrate.drupalorg_migrate_users_last_record', $last_query);
}
}
/**
* {@inheritdoc}
*/
public function query() {
// Query by UID earlier to speed up queries.
return $this->select('users', 'u')
->fields('u', ['uid'])
->condition('u.uid', 0, '>');
}
/**
* {@inheritdoc}
*/
public function prepareRow(Row $row) {
// Try to determine early if this row needs to be skipped.
$prepare_row = SourcePluginBase::prepareRow($row);
if ($prepare_row) {
$uid = $row->getSourceProperty('uid');
// Set all properties here as we only queried by UID earlier.
$row_data = $this->select('users', 'u')
->fields('u')
->condition('u.uid', $uid)
->execute()
->fetchAssoc();
foreach ($row_data as $field => $value) {
$row->setSourceProperty($field, $value);
}
return parent::prepareRow($row);
}
return FALSE;
}
}
