diff --git a/src/Queue/Admin/DataExtension.php b/src/Queue/Admin/DataExtension.php new file mode 100644 index 00000000..ef16da95 --- /dev/null +++ b/src/Queue/Admin/DataExtension.php @@ -0,0 +1,70 @@ +owner; + + $fields->addFieldsToTab('Root.JobData', [ + $jobDataPreview = TextareaField::create('SavedJobDataPreview', 'Job Data'), + ]); + + if (strlen($owner->getMessagesRaw()) > 0) { + $fields->addFieldToTab( + 'Root.MessagesRaw', + $messagesRaw = LiteralField::create('MessagesRaw', $owner->getMessagesRaw()) + ); + } + + $jobDataPreview->setReadonly(true); + } + + /** + * @return string|null + */ + public function getSavedJobDataPreview(): ?string + { + return $this->owner->SavedJobData; + } + + /** + * @return string|null + */ + public function getMessagesRaw(): ?string + { + return $this->owner->SavedJobMessages; + } + + /** + * @return string + */ + public function getImplementationSummary(): string + { + $segments = explode('\\', $this->owner->Implementation); + + while (count($segments) > 2) { + array_shift($segments); + } + + return implode('\\', $segments); + } +} diff --git a/src/Queue/Admin/Extension.php b/src/Queue/Admin/Extension.php new file mode 100644 index 00000000..1db3c5d8 --- /dev/null +++ b/src/Queue/Admin/Extension.php @@ -0,0 +1,222 @@ +Fields(); + + // there are multiple fields that need to be updated + $fieldNames = [ + 'QueuedJobDescriptor', + $this->encodeClassName(QueuedJobDescriptor::class), + ]; + + foreach ($fieldNames as $fieldName) { + /** @var GridField $gridField */ + $gridField = $fields->fieldByName($fieldName); + + if (!$gridField) { + continue; + } + + $config = $gridField->getConfig(); + + // apply custom filters + $this->customiseFilters($config); + } + } + + /** + * Customise queued jobs filters UI + * + * @param GridFieldConfig $config + */ + private function customiseFilters(GridFieldConfig $config): void + { + /** @var GridFieldDataColumns $gridFieldColumns */ + $gridFieldColumns = $config->getComponentByType(GridFieldDataColumns::class); + + $gridFieldColumns->setDisplayFields([ + 'getImplementationSummary' => 'Type', + 'JobTypeString' => 'Queue', + 'JobStatus' => 'Status', + 'JobTitle' => 'Description', + 'Created' => 'Added', + 'StartAfter' => 'Scheduled', + 'JobFinished' => 'Finished', + ]); + + $config->removeComponentsByType(GridFieldFilterHeader::class); + + $filter = new RichFilterHeader(); + $filter + ->setFilterConfig([ + 'getImplementationSummary' => 'Implementation', + 'Description' => 'JobTitle', + 'Status' => [ + 'title' => 'JobStatus', + 'filter' => 'ExactMatchFilter', + ], + 'JobTypeString' => [ + 'title' => 'JobType', + 'filter' => 'ExactMatchFilter', + ], + 'Created' => 'Added', + 'StartAfter' => 'Scheduled', + ]) + ->setFilterFields([ + 'JobType' => $queueType = DropdownField::create( + '', + '', + $this->getQueueTypes() + ), + 'JobStatus' => $jobStatus = DropdownField::create( + '', + '', + $this->getJobStatuses() + ), + 'Added' => $added = DropdownField::create( + '', + '', + $this->getAddedDates() + ), + 'Scheduled' => $scheduled = DropdownField::create( + '', + '', + [ + self::SCHEDULED_FILTER_FUTURE => self::SCHEDULED_FILTER_FUTURE, + self::SCHEDULED_FILTER_PAST => self::SCHEDULED_FILTER_PAST, + ] + ), + ]) + ->setFilterMethods([ + 'Added' => static function (DataList $list, $name, $value): DataList { + if ($value) { + $added = DBDatetime::now()->modify($value); + + return $list->filter(['Created:LessThanOrEqual' => $added->Rfc2822()]); + } + + return $list; + }, + 'Scheduled' => static function (DataList $list, $name, $value): DataList { + if ($value === static::SCHEDULED_FILTER_FUTURE) { + return $list->filter([ + 'StartAfter:GreaterThan' => DBDatetime::now()->Rfc2822(), + ]); + } + + if ($value === static::SCHEDULED_FILTER_PAST) { + return $list->filter([ + 'StartAfter:LessThanOrEqual' => DBDatetime::now()->Rfc2822(), + ]); + } + + return $list; + }, + ]); + + foreach ([$jobStatus, $queueType, $added, $scheduled] as $dropDownField) { + /** @var DropdownField $dropDownField */ + $dropDownField->setEmptyString('-- select --'); + } + + $config->addComponent($filter, GridFieldPaginator::class); + } + + /** + * Queue types options for drop down field + * + * @return array + */ + private function getQueueTypes(): array + { + /** @var QueuedJobDescriptor $job */ + $job = QueuedJobDescriptor::singleton(); + $map = $job->getJobTypeValues(); + $values = array_values($map); + $keys = []; + + foreach (array_keys($map) as $key) { + $keys[] = (int) $key; + } + + return array_combine($keys, $values); + } + + /** + * All possible job statuses (this list is not exposed by the module) + * intended to be used in a drop down field + * + * @return array + */ + private function getJobStatuses(): array + { + /** @var QueuedJobDescriptor $job */ + $job = QueuedJobDescriptor::singleton(); + $statuses = $job->getJobStatusValues(); + + sort($statuses, SORT_STRING); + + $statuses = array_combine($statuses, $statuses); + + return $statuses; + } + + /** + * Encode class name to match the matching CMS field name + * + * @param string $className + * @return string + */ + private function encodeClassName(string $className): string + { + return str_replace('\\', '-', $className); + } + + /** + * Date options for added dates drop down field + * + * @return array + */ + private function getAddedDates(): array + { + return [ + '-1 day' => '1 day or older', + '-3 day' => '3 days or older', + '-7 day' => '7 days or older', + '-14 day' => '14 days or older', + '-1 month' => '1 month or older', + ]; + } +} diff --git a/src/Queue/Cleanup/Task.php b/src/Queue/Cleanup/Task.php new file mode 100644 index 00000000..0981392f --- /dev/null +++ b/src/Queue/Cleanup/Task.php @@ -0,0 +1,63 @@ +isMaintenanceLockActive()) { + return; + } + + $table = QueuedJobDescriptor::config()->get('table_name'); + + // determine expiry + $expired = DBDatetime::now()->modify(sprintf('-%s hours', self::EXPIRY_HOURS))->Rfc2822(); + + // Format query + $query = sprintf( + "DELETE FROM `%s` WHERE `JobStatus` = '%s' AND (`JobFinished` <= '%s' OR `JobFinished` IS NULL) LIMIT %d", + $table, + QueuedJob::STATUS_COMPLETE, + $expired, + self::EXPIRY_LIMIT + ); + + DB::query($query); + + echo sprintf('%d job descriptors deleted.', (int) DB::affected_rows()); + } +} diff --git a/src/Queue/Dev/Job.php b/src/Queue/Dev/Job.php new file mode 100644 index 00000000..b15883f5 --- /dev/null +++ b/src/Queue/Dev/Job.php @@ -0,0 +1,62 @@ +type = $type; + $this->randomID = $randomID; + $this->items = [1, 2, 3, 4, 5]; + } + + /** + * @return string + */ + public function getTitle(): string + { + return 'Test job'; + } + + /** + * @return int|null + */ + public function getJobType(): int + { + return (int) $this->type; + } + + public function getRunAsMemberID(): ?int + { + return 0; + } + + /** + * @param mixed $item + */ + public function processItem($item): void + { + $this->addMessage(sprintf('Step %d at %s', $item, DBDatetime::now()->Rfc2822())); + sleep(1); + } +} diff --git a/src/Queue/Dev/Task.php b/src/Queue/Dev/Task.php new file mode 100644 index 00000000..11b234d4 --- /dev/null +++ b/src/Queue/Dev/Task.php @@ -0,0 +1,57 @@ +Pass GET param ?total=x to create x jobs.
'; + echo 'Pass GET param ?type=(2|3) to create jobs in medium|large queues respectively' + . ' (defaults to large).
'; + + $total = $request->getVar('total') ?: 0; + $type = $request->getVar('type') ?: QueuedJob::LARGE; + $service = QueuedJobService::singleton(); + + for ($i = 1; $i <= $total; $i += 1) { + $randomId = $i . DBDatetime::now()->getTimestamp(); + $job = new Job(); + $job->hydrate((int) $type, (int) $randomId); + $service->queueJob($job); + } + } +} diff --git a/src/Queue/ExecutionTime.php b/src/Queue/ExecutionTime.php new file mode 100644 index 00000000..b3807b12 --- /dev/null +++ b/src/Queue/ExecutionTime.php @@ -0,0 +1,46 @@ +getMaxExecution(); + + try { + $this->setMaxExecution($executionTime); + + return $callback(); + } finally { + $this->setMaxExecution($originalTime); + } + } +} diff --git a/src/Queue/Extension.php b/src/Queue/Extension.php new file mode 100644 index 00000000..80e8fd59 --- /dev/null +++ b/src/Queue/Extension.php @@ -0,0 +1,37 @@ +addMessage(sprintf('%s : %s', ClassInfo::shortName($e), $e->getMessage())); + } +} diff --git a/src/Queue/Factory/Job.php b/src/Queue/Factory/Job.php new file mode 100644 index 00000000..0d4d9628 --- /dev/null +++ b/src/Queue/Factory/Job.php @@ -0,0 +1,46 @@ +jobClass = $jobClass; + $this->items = $items; + } + + public function getTitle(): string + { + return 'Factory job'; + } + + /** + * @param mixed $item + * @throws ValidationException + */ + protected function processItem($item): void + { + if (!is_array($item) || count($item) === 0) { + return; + } + + $job = Injector::inst()->create($this->jobClass); + $job->hydrate(array_values($item)); + QueuedJobService::singleton()->queueJob($job); + } +} diff --git a/src/Queue/Factory/Task.php b/src/Queue/Factory/Task.php new file mode 100644 index 00000000..1b9a728f --- /dev/null +++ b/src/Queue/Factory/Task.php @@ -0,0 +1,146 @@ +getVar('limit'); + $offset = (int) $request->getVar('offset'); + + if ($limit > 0) { + $list = $list->limit($limit, $offset); + } + + $ids = $list->columnUnique('ID'); + $this->queueJobsFromIds($request, $ids, $jobClass, $size); + } + + /** + * @param HTTPRequest $request + * @param array $ids + * @param string $jobClass + * @param int $size + * @throws ValidationException + */ + protected function queueJobsFromIds(HTTPRequest $request, array $ids, string $jobClass, int $size): void + { + $ids = $this->formatIds($ids); + $this->queueJobsFromData($request, $ids, $jobClass, $size); + } + + /** + * @param HTTPRequest $request + * @param array $data + * @param string $jobClass + * @param int $size + * @throws ValidationException + */ + protected function queueJobsFromData(HTTPRequest $request, array $data, string $jobClass, int $size): void + { + if (count($data) === 0) { + return; + } + + $jobs = $request->getVar('jobs') ?? self::FACTORY_JOBS_BATCH_SIZE; + $jobs = (int) $jobs; + + $chunkSize = (int) $request->getVar('size'); + $chunkSize = $chunkSize > 0 + ? $chunkSize + : $size; + + $chunks = array_chunk($data, $chunkSize); + + if ($jobs > 0) { + $this->createFactoryJobs($chunks, $jobClass, $jobs); + + return; + } + + $this->createSpecifiedJobs($chunks, $jobClass); + } + + /** + * @param array $chunks + * @param string $jobClass + * @throws ValidationException + */ + private function createSpecifiedJobs(array $chunks, string $jobClass): void + { + $service = QueuedJobService::singleton(); + + foreach ($chunks as $chunk) { + $job = Injector::inst()->create($jobClass); + $job->hydrate(array_values($chunk)); + $service->queueJob($job); + } + } + + /** + * @param array $chunks + * @param string $jobClass + * @param int $chunkSize + * @throws ValidationException + */ + private function createFactoryJobs(array $chunks, string $jobClass, int $chunkSize): void + { + $service = QueuedJobService::singleton(); + $chunks = array_chunk($chunks, $chunkSize); + + foreach ($chunks as $chunk) { + $job = new Job(); + $job->hydrate($jobClass, array_values($chunk)); + $service->queueJob($job); + } + } + + /** + * Cast all IDs to int so we don't end up with type errors + * + * @param array $ids + * @return array + */ + private function formatIds(array $ids): array + { + $formatted = []; + + foreach ($ids as $id) { + $formatted[] = (int) $id; + } + + return $formatted; + } +} diff --git a/src/Queue/Job.php b/src/Queue/Job.php new file mode 100644 index 00000000..f943fea1 --- /dev/null +++ b/src/Queue/Job.php @@ -0,0 +1,64 @@ +remaining = $this->items; + $this->totalSteps = count($this->items); + } + + public function process(): void + { + $remaining = $this->remaining; + + // check for trivial case + if (count($remaining) === 0) { + $this->isComplete = true; + + return; + } + + $item = array_shift($remaining); + + $this->processItem($item); + + // update job progress + $this->remaining = $remaining; + $this->currentStep += 1; + + // check for job completion + if (count($remaining) > 0) { + return; + } + + $this->isComplete = true; + } + + /** + * @param mixed $item + */ + abstract protected function processItem($item): void; +} diff --git a/src/Queue/Logger.php b/src/Queue/Logger.php new file mode 100644 index 00000000..d0649f46 --- /dev/null +++ b/src/Queue/Logger.php @@ -0,0 +1,83 @@ +job = $job; + } + + public function debug($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function critical($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function alert($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function log($level, $message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function emergency($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function warning($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function error($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function notice($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + public function info($message, array $context = []): void // phpcs:ignore SlevomatCodingStandard.TypeHints + { + $this->logJobMessage($message); + } + + private function logJobMessage(string $message): void + { + $job = $this->job; + + if (!$job instanceof QueuedJob) { + return; + } + + $job->addMessage($message); + } +} diff --git a/src/Queue/Management/Report.php b/src/Queue/Management/Report.php new file mode 100644 index 00000000..213ad440 --- /dev/null +++ b/src/Queue/Management/Report.php @@ -0,0 +1,279 @@ +Rfc2822(); + $service = QueuedJobService::singleton(); + $queueState = []; + + if ($service->isMaintenanceLockActive()) { + $queueState[] = 'Paused'; + } + + if ($service->isAtMaxJobs()) { + $queueState[] = 'Maximum init jobs'; + } + + $queueState = $queueState + ? implode(' ', $queueState) + : 'Running'; + + // job states + $query = SQLSelect::create( + '`JobStatus`, COUNT(`JobStatus`) as `count`', + 'QueuedJobDescriptor', + ['StartAfter IS NULL OR StartAfter <= ?' => $now], + ['count' => 'DESC'], + ['JobStatus'] + ); + + $results = $query->execute(); + $totalJobs = 0; + + $jobsData = []; + + while ($result = $results->next()) { + $status = $result['JobStatus']; + $count = $result['count']; + $jobsData[$status] = $count; + $totalJobs+= $count; + } + + $brokenJobs = array_key_exists(QueuedJob::STATUS_BROKEN, $jobsData) + ? $jobsData[QueuedJob::STATUS_BROKEN] + : 0; + $newsJobs = array_key_exists(QueuedJob::STATUS_NEW, $jobsData) + ? $jobsData[QueuedJob::STATUS_NEW] + : 0; + $initJobs = array_key_exists(QueuedJob::STATUS_INIT, $jobsData) + ? $jobsData[QueuedJob::STATUS_INIT] + : 0; + $runningJobs = array_key_exists(QueuedJob::STATUS_RUN, $jobsData) + ? $jobsData[QueuedJob::STATUS_RUN] + : 0; + $completedJobs = array_key_exists(QueuedJob::STATUS_COMPLETE, $jobsData) + ? $jobsData[QueuedJob::STATUS_COMPLETE] + : 0; + $jobsInProgress = $newsJobs + $initJobs + $runningJobs; + + $queueState = $queueState === 'Running' && $jobsInProgress === 0 + ? 'Idle' + : $queueState; + + // progress bar + echo sprintf( + '%d - %s
', $count, $status); + } + + echo sprintf('%d - Total
', $totalJobs); + + // first and last completed job + $query = SQLSelect::create( + 'MAX(`JobFinished`) as `last_job`, MIN(`JobStarted`) as `first_job`', + 'QueuedJobDescriptor', + [['JobStatus' => QueuedJob::STATUS_COMPLETE]] + ); + + $results = $query->execute(); + $result = $results->first(); + $firstJob = $result['first_job'] ?? ''; + $lastJob = $result['last_job'] ?? ''; + + // total job duration + $query = SQLSelect::create( + sprintf( + '`JobTitle`, SUM(UNIX_TIMESTAMP(`JobFinished`) - UNIX_TIMESTAMP(%s) as `duration`, COUNT(*) as `count`', + 'COALESCE(`JobRestarted`, `JobStarted`))' + ), + 'QueuedJobDescriptor', + [['JobStatus' => QueuedJob::STATUS_COMPLETE]], + ['duration' => 'DESC'], + ['JobTitle'] + ); + + $results = $query->execute(); + + $totalDuration = 0; + $jobDurations = []; + $jobTypesCompleted = []; + $jobQueueTypeCompleted = []; + + while ($result = $results->next()) { + $jobType = $result['JobTitle']; + $duration = $result['duration']; + $totalDuration += $duration; + + $jobDurations[$jobType] = $duration; + + $count = $result['count']; + $jobTypesCompleted[$jobType] = $count; + } + + // total job duration + $query = SQLSelect::create( + 'JobType, COUNT(*) as `count`', + 'QueuedJobDescriptor', + [['JobStatus' => QueuedJob::STATUS_COMPLETE]], + [], + ['JobType'] + ); + + $results = $query->execute(); + + while ($result = $results->next()) { + $jobType = $result['JobType']; + $count = $result['count']; + + $jobQueueTypeCompleted[$jobType] = $count; + } + + $elapsed = 0; + + if ($totalDuration > 0) { + echo sprintf('%d s - total job duration
', $totalDuration); + echo sprintf('%0.4f s - average job duration
', $totalDuration / $completedJobs); + echo sprintf('%s - first job
', $firstJob); + echo sprintf('%s - last job
', $lastJob); + + $elapsed = strtotime($lastJob) - strtotime($firstJob); + echo sprintf('%s - elapsed time (s)
', $elapsed); + } else { + echo 'No completed jobs found
'; + } + + echo '%d s - %s
', $duration, $jobType); + } + + echo '%d jobs - %s
', $completed, $jobType); + } + + echo '%d - %s
', $completed, $queueTypes[(string) $jobType]); + } + + echo '%f s/job - %s
', ($duration / $completed), $jobType); + } + + echo '%f jobs/elapsed second - %s
', ($completed / $elapsed), $jobType); + } + } + + // job type breakdown + $query = SQLSelect::create( + '`JobTitle`, COUNT(`JobTitle`) as `count`', + 'QueuedJobDescriptor', + ['StartAfter IS NULL OR StartAfter <= ?' => $now], + ['count' => 'DESC'], + ['JobTitle'] + ); + + $results = $query->execute(); + echo '%d - %s
', $count, $result['JobTitle']); + } + } +} diff --git a/src/Queue/Management/Task.php b/src/Queue/Management/Task.php new file mode 100644 index 00000000..da678ce9 --- /dev/null +++ b/src/Queue/Management/Task.php @@ -0,0 +1,176 @@ +getJobStatusValues(); + sort($statuses, SORT_STRING); + + $currentStatuses = array_diff($statuses, [ + QueuedJob::STATUS_COMPLETE, + ]); + + // job implementations + $query = SQLSelect::create( + 'DISTINCT `Implementation`', + 'QueuedJobDescriptor', + ['`JobStatus` != ?' => QueuedJob::STATUS_COMPLETE], + ['Implementation' => 'ASC'] + ); + + $results = $query->execute(); + + $implementations = []; + + // Add job types + while ($result = $results->next()) { + $implementation = $result['Implementation']; + + if (!$implementation) { + continue; + } + + $implementations[] = $result['Implementation']; + } + + if (count($implementations) === 0) { + echo 'No job implementations found.'; + + return; + } + + $implementation = $request->postVar('implementation'); + $currentStatus = $request->postVar('currentStatus'); + $status = $request->postVar('status'); + + if ($implementation + && $status + && ($implementation === 'all' || in_array($implementation, $implementations)) + && ($currentStatus === 'any' || in_array($currentStatus, $currentStatuses)) + && in_array($status, $statuses) + ) { + $where = [ + ['`JobStatus` != ?' => QueuedJob::STATUS_COMPLETE], + ]; + + // Filter by implementation + $where[] = $implementation === 'all' + ? '`Implementation` IN ' . sprintf( + "('%s')", + str_replace('\\', '\\\\', implode("','", $implementations)) + ) + : ['`Implementation`' => $implementation]; + + // Filter by status + if ($currentStatus !== 'any') { + $where[] = ['`JobStatus`' => $currentStatus]; + } + + // Assemble query + $query = SQLUpdate::create( + 'QueuedJobDescriptor', + [ + 'JobStatus' => $status, + // make sure to reset all data which is related to job management + // job lock + 'Worker' => null, + 'Expiry' => null, + // resume / pause + 'ResumeCounts' => 0, + // broken job notification + 'NotifiedBroken' => 0, + ], + $where + ); + + $query->execute(); + + echo sprintf('Job status updated (%d rows affected).', DB::affected_rows()); + + return; + } + + echo ''; + } +} diff --git a/src/Queue/Manager.php b/src/Queue/Manager.php new file mode 100644 index 00000000..5753321c --- /dev/null +++ b/src/Queue/Manager.php @@ -0,0 +1,36 @@ + =>