Commit | Line | Data |
---|---|---|
db6698ad | 1 | <?php |
a9229942 | 2 | |
db6698ad | 3 | namespace wcf\system\background; |
a9229942 | 4 | |
6b4fb98a | 5 | use wcf\data\user\User; |
db6698ad | 6 | use wcf\system\background\job\AbstractBackgroundJob; |
e4e29d4e | 7 | use wcf\system\background\job\AbstractUniqueBackgroundJob; |
7b9ff46b | 8 | use wcf\system\exception\ParentClassException; |
4205bd62 | 9 | use wcf\system\session\SessionHandler; |
db6698ad TD |
10 | use wcf\system\SingletonFactory; |
11 | use wcf\system\WCF; | |
12 | ||
/**
 * Manages the background queue.
 *
 * Jobs are stored as serialized objects in the `wcf1_background_job` table
 * together with the earliest time at which they may be executed.
 *
 * @author Tim Duesterhus
 * @copyright 2001-2022 WoltLab GmbH
 * @license GNU Lesser General Public License <http://opensource.org/licenses/lgpl-license.php>
 * @since 3.0
 */
final class BackgroundQueueHandler extends SingletonFactory
{
    /**
     * Name of the HTTP header associated with a forced queue check.
     * NOTE(review): the header is not read or written in this class, confirm the consumer.
     */
    public const FORCE_CHECK_HTTP_HEADER_NAME = 'woltlab-background-queue-check';

    /**
     * Value that accompanies FORCE_CHECK_HTTP_HEADER_NAME.
     */
    public const FORCE_CHECK_HTTP_HEADER_VALUE = 'yes';

    /**
     * Becomes true once forceCheck() has been called on this instance.
     */
    private bool $hasPendingCheck = false;

    /**
     * Forces checking whether a background queue item is due.
     * This means that the AJAX request to BackgroundQueuePerformAction is triggered.
     *
     * The request is flagged in three places: a session variable, a template
     * variable and the flag that is exposed through hasPendingCheck().
     */
    public function forceCheck(): void
    {
        WCF::getSession()->register('forceBackgroundQueuePerform', true);

        WCF::getTPL()->assign([
            'forceBackgroundQueuePerform' => true,
        ]);

        $this->hasPendingCheck = true;
    }

    /**
     * Enqueues the given job(s) for execution in the specified number of
     * seconds. Defaults to "as soon as possible" (0 seconds).
     *
     * @param AbstractBackgroundJob|AbstractBackgroundJob[] $jobs
     * @param $time Minimum number of seconds to wait before performing the job.
     * @see \wcf\system\background\BackgroundQueueHandler::enqueueAt()
     * @throws \InvalidArgumentException If the resulting timestamp lies in the past (negative $time).
     */
    public function enqueueIn(AbstractBackgroundJob|array $jobs, int $time = 0): void
    {
        $this->enqueueAt($jobs, TIME_NOW + $time);
    }

    /**
     * Enqueues the given job(s) for execution at the given time.
     * Note: The time is a minimum time. Depending on the size of
     * the queue the job can be performed later as well!
     *
     * Jobs extending AbstractUniqueBackgroundJob are skipped if a row with
     * the same identifier is already stored in the queue.
     *
     * @param AbstractBackgroundJob|AbstractBackgroundJob[] $jobs
     * @param $time Earliest time to consider the job for execution.
     * @throws \InvalidArgumentException If $time is smaller than the current timestamp.
     * @throws ParentClassException If an entry of $jobs is not an AbstractBackgroundJob.
     */
    public function enqueueAt(AbstractBackgroundJob|array $jobs, int $time): void
    {
        if ($time < TIME_NOW) {
            throw new \InvalidArgumentException("You may not schedule a job in the past (" . $time . " is smaller than the current timestamp " . TIME_NOW . ").");
        }
        if (!\is_array($jobs)) {
            $jobs = [$jobs];
        }

        foreach ($jobs as $job) {
            if (!($job instanceof AbstractBackgroundJob)) {
                // get_debug_type() also copes with non-object entries (strings, null, ...),
                // where get_class() would raise a TypeError and hide the real problem.
                throw new ParentClassException(\get_debug_type($job), AbstractBackgroundJob::class);
            }
        }

        if ($jobs === []) {
            // Nothing to store, do not open a transaction for an empty list.
            return;
        }

        // The transaction is started outside of the `try` block: if it cannot be
        // started there is nothing to roll back in the `finally` block.
        WCF::getDB()->beginTransaction();
        $committed = false;
        try {
            $sql = "INSERT INTO wcf1_background_job
                                (job, time,identifier)
                    VALUES      (?, ?, ?)";
            $statement = WCF::getDB()->prepare($sql);
            $sql = "SELECT  jobID
                    FROM    wcf1_background_job
                    WHERE   identifier = ?
                    FOR UPDATE";
            $selectJobStatement = WCF::getDB()->prepare($sql);

            foreach ($jobs as $job) {
                $identifier = null;
                if ($job instanceof AbstractUniqueBackgroundJob) {
                    $identifier = $job->identifier();

                    // Check if the job is already in the queue
                    $selectJobStatement->execute([$identifier]);
                    $jobID = $selectJobStatement->fetchSingleColumn();
                    if ($jobID !== false) {
                        continue;
                    }
                }

                $statement->execute([
                    \serialize($job),
                    $time,
                    $identifier,
                ]);
            }
            WCF::getDB()->commitTransaction();
            $committed = true;
        } finally {
            if (!$committed) {
                WCF::getDB()->rollBackTransaction();
            }
        }
    }

    /**
     * Immediately performs the given job.
     * This method automatically handles requeuing in case of failure.
     *
     * This method is used internally by performNextJob(), but it can
     * be useful if you wish immediate execution of a certain job, but
     * don't want to miss the automated error handling mechanism of the
     * queue.
     *
     * While the job runs the session user is replaced by a User created
     * with a null ID (presumably a guest) and restored afterwards. Unless
     * debug mode is enabled, output of the job is buffered and discarded.
     *
     * @param $debugSynchronousExecution Disables fail-safe mechanisms, errors will no longer be suppressed.
     * @throws \Throwable
     */
    public function performJob(AbstractBackgroundJob $job, bool $debugSynchronousExecution = false): void
    {
        $user = WCF::getUser();

        // Remembers whether this call opened an output buffer, so that the
        // `finally` block never discards a buffer that belongs to the caller.
        $outputBufferStarted = false;

        try {
            SessionHandler::getInstance()->changeUser(new User(null), true);
            if (!WCF::debugModeIsEnabled()) {
                $outputBufferStarted = \ob_start();
            }
            $job->perform();
        } catch (\Throwable $e) {
            // do not suppress exceptions for debugging purposes, see https://github.com/WoltLab/WCF/issues/2501
            if ($debugSynchronousExecution) {
                throw $e;
            }

            $job->fail();

            if ($job->getFailures() <= $job::MAX_FAILURES) {
                $this->enqueueIn($job, $job->retryAfter());

                // Failures that will be retried are only logged in debug mode.
                if (WCF::debugModeIsEnabled()) {
                    \wcf\functions\exception\logThrowable($e);
                }
            } else {
                $job->onFinalFailure();

                // job failed too often: log
                \wcf\functions\exception\logThrowable($e);
            }
        } finally {
            if ($outputBufferStarted) {
                \ob_end_clean();
            }
            SessionHandler::getInstance()->changeUser($user, true);
        }
    }

    /**
     * Performs the (single) job that is due next.
     * This method automatically handles requeuing in case of failure.
     *
     * @return bool true if this call attempted to execute a job regardless of its result
     */
    public function performNextJob(): bool
    {
        WCF::getDB()->beginTransaction();
        $committed = false;
        try {
            // Select the oldest job that is ready; the second argument to prepare()
            // is presumably a row limit of 1 (verify against the database API).
            $sql = "SELECT      jobID, job
                    FROM        wcf1_background_job
                    WHERE       status = ?
                            AND time <= ?
                    ORDER BY    time ASC, jobID ASC
                    FOR UPDATE";
            $statement = WCF::getDB()->prepare($sql, 1);
            $statement->execute([
                'ready',
                TIME_NOW,
            ]);
            $row = $statement->fetchSingleRow();
            if (!$row) {
                // nothing to do here
                return false;
            }

            // lock job
            $sql = "UPDATE  wcf1_background_job
                    SET     status = ?,
                            time = ?
                    WHERE   jobID = ?
                        AND status = ?";
            $statement = WCF::getDB()->prepare($sql);
            $statement->execute([
                'processing',
                TIME_NOW,
                $row['jobID'],
                'ready',
            ]);
            if ($statement->getAffectedRows() != 1) {
                // somebody stole the job
                // this cannot happen unless MySQL violates its contract to lock the row
                // -> silently ignore, there will be plenty of other opportunities to perform a job
                return true;
            }
            WCF::getDB()->commitTransaction();
            $committed = true;
        } finally {
            // Also reached by the early returns above, which leave the transaction uncommitted.
            if (!$committed) {
                WCF::getDB()->rollBackTransaction();
            }
        }

        $job = null;
        try {
            // no shut up operator, exception will be caught
            $job = \unserialize($row['job']);
            if ($job) {
                $this->performJob($job);
            }
        } catch (\Throwable $e) {
            // job is completely broken: log
            \wcf\functions\exception\logThrowable($e);
        } finally {
            // remove entry of processed job
            $sql = "DELETE FROM wcf1_background_job
                    WHERE       jobID = ?";
            $statement = WCF::getDB()->prepare($sql);
            $statement->execute([$row['jobID']]);
        }

        // Unique jobs may request another run; this happens after the processed
        // row was deleted, so the identifier is free again.
        if ($job instanceof AbstractUniqueBackgroundJob && $job->queueAgain()) {
            $this->enqueueIn($job->newInstance(), $job->retryAfter());
        }

        return true;
    }

    /**
     * Returns how many items are due.
     *
     * Note: Do not rely on the return value being correct, some other process may
     * have modified the queue contents, before this method returns. Think of it as an
     * approximation to know whether you should spend some time to clear the queue.
     */
    public function getRunnableCount(): int
    {
        $sql = "SELECT  COUNT(*)
                FROM    wcf1_background_job
                WHERE   status = ?
                    AND time <= ?";
        $statement = WCF::getDB()->prepare($sql);
        $statement->execute(['ready', TIME_NOW]);

        // Cast explicitly instead of relying on implicit coercion of the
        // fetched column to the declared return type.
        return (int)$statement->fetchSingleColumn();
    }

    /**
     * Indicates that the client should trigger a check for
     * pending jobs in the background queue.
     *
     * Returns true after forceCheck() has been called on this instance.
     *
     * @since 6.0
     */
    public function hasPendingCheck(): bool
    {
        return $this->hasPendingCheck;
    }
}