use wcf\system\Regex;
use wcf\util\CLIUtil;
use wcf\util\DirectoryUtil;
+use wcf\util\JSON;
use wcf\util\StringUtil;
use Zend\Console\Exception\RuntimeException as ArgvException;
use Zend\Console\Getopt as ArgvParser;
if (!empty($_ENV['WCF_WORKER_STATUS_FD'])) {
$output = new File("php://fd/".$_ENV['WCF_WORKER_STATUS_FD'], "w");
}
- for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) {
- $worker->setLoopCount($i);
- $worker->validate();
-
- // execute worker
- $worker->execute();
- $worker->finalize();
-
- // update progress
- $progress = $worker->getProgress();
- $progressbar->update($progress);
+ try {
+ for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) {
+ $worker->setLoopCount($i);
+ $worker->validate();
+
+ // execute worker
+ $worker->execute();
+ $worker->finalize();
+
+ // update progress
+ $progress = $worker->getProgress();
+ $progressbar->update($progress);
+ if ($output) {
+ $output->write(JSON::encode([
+ 'iteration' => $i,
+ 'progress' => $progress,
+ ])."\n");
+ }
+ }
+ }
+ catch (\Exception $e) {
if ($output) {
- $output->write("$i,$progress\n");
+ $output->write(JSON::encode([
+ 'error' => (string) $e,
+ ]));
}
+ throw $e;
}
if ($output) {
- $output->write("done,$progress");
+ $output->write(JSON::encode([
+ 'finished' => true,
+ 'progress' => $progress,
+ ])."\n");
$output->close();
}
$progressbar->update($progress);
// Invoke the worker processes with the same command line ...
$arguments = $_SERVER['argv'];
- // ... with the quiet argument.
+ // ... with the quiet argument ...
$arguments[] = '-qqqqq';
+ // ... and requesting an unclean exit of an Exception is thrown.
+ $arguments[] = '--exitOnFail';
$commandLine = PHP_BINARY.' '.implode(' ', array_map('escapeshellarg', $arguments));
Log::debug('Using "'.$commandLine.'" as the worker command line.');
'WCF_SESSION_ID' => CLIWCF::getSession()->sessionID,
]);
- // 1) Spawn the processes.
- $processes = [];
- for ($threadId = 0; $threadId < $threads; $threadId++) {
- $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null);
-
- $consoleBar = new ConsoleProgressBar([
- 'width' => CLIWCF::getTerminal()->getWidth()
- ]);
- $consoleBar->setElements([
- ConsoleProgressBar::ELEMENT_PERCENT,
- ConsoleProgressBar::ELEMENT_BAR,
- ConsoleProgressBar::ELEMENT_ETA,
- ConsoleProgressBar::ELEMENT_TEXT,
- ]);
- $consoleBar->setTextWidth(30);
- $progressbar = new ProgressBar($consoleBar);
- $progressbar->update(0, 'T'.$threadId.': spawned');
- echo "\n";
-
- $processes[] = [
- 'threadId' => $threadId,
- 'pipes' => $pipes,
- 'process' => $process,
- 'progressbar' => $progressbar,
- ];
- }
-
- // 2) Start processing in all processes.
- foreach ($processes as $processData) {
- $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\"");
- fwrite($processData['pipes'][0], $workerCommand."\n");
- fclose($processData['pipes'][0]);
- }
-
- // 3) Handle their status output.
- while (true) {
- // 3.1) Check which processes' status FD is readable.
- $read = array_filter(array_map(function ($processData) {
- return $processData['pipes'][3];
- }, $processes), function ($handle) {
- return $handle !== false;
- });
- $write = null;
- $except = null;
-
- // 3.2) Exit if all status FDs have been closed.
- if (empty($read)) {
- break;
+ try {
+ // 1) Spawn the processes.
+ $processes = [];
+ for ($threadId = 0; $threadId < $threads; $threadId++) {
+ $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null);
+
+ $consoleBar = new ConsoleProgressBar([
+ 'width' => CLIWCF::getTerminal()->getWidth()
+ ]);
+ $consoleBar->setElements([
+ ConsoleProgressBar::ELEMENT_PERCENT,
+ ConsoleProgressBar::ELEMENT_BAR,
+ ConsoleProgressBar::ELEMENT_ETA,
+ ConsoleProgressBar::ELEMENT_TEXT,
+ ]);
+ $consoleBar->setTextWidth(30);
+ $progressbar = new ProgressBar($consoleBar);
+ $progressbar->update(0, 'T'.$threadId.': spawned');
+ echo "\n";
+
+ $processes[] = [
+ 'threadId' => $threadId,
+ 'pipes' => $pipes,
+ 'process' => $process,
+ 'progressbar' => $progressbar,
+ ];
}
- stream_select($read, $write, $except, 2, null);
+ // 2) Start processing in all processes.
+ foreach ($processes as $processData) {
+ $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\"");
+ fwrite($processData['pipes'][0], $workerCommand."\n");
+ fclose($processData['pipes'][0]);
+ }
- // 3.3) Rerender the progressbars with the updated status information.
- echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar.
- foreach ($processes as &$processData) {
- $status = proc_get_status($processData['process']);
- if ($status === false) {
- throw new \LogicException('Unreachable');
+ // 3) Handle their status output.
+ while (true) {
+ // 3.1) Check which processes' status FD is readable.
+ $read = array_filter(array_map(function ($processData) {
+ return $processData['pipes'][3];
+ }, $processes), function ($handle) {
+ return $handle !== false;
+ });
+ $write = null;
+ $except = null;
+
+ // 3.2) Exit if all status FDs have been closed.
+ if (empty($read)) {
+ break;
}
- $line = false;
- if ($processData['pipes'][3] !== false) {
- if (in_array($processData['pipes'][3], $read)) {
- $line = fgets($processData['pipes'][3]);
+ stream_select($read, $write, $except, 2, null);
+
+ // 3.3) Rerender the progressbars with the updated status information.
+ echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar.
+ $cursorOffset = -$threads;
+ foreach ($processes as &$processData) {
+ $status = proc_get_status($processData['process']);
+ if ($status === false) {
+ throw new \LogicException('Unreachable');
}
- if (feof($processData['pipes'][3])) {
- fclose($processData['pipes'][3]);
- $processData['pipes'][3] = false;
+ $line = false;
+ if ($processData['pipes'][3] !== false) {
+ if (in_array($processData['pipes'][3], $read)) {
+ $line = fgets($processData['pipes'][3]);
+ }
+
+ if (feof($processData['pipes'][3])) {
+ fclose($processData['pipes'][3]);
+ $processData['pipes'][3] = false;
+ }
}
- }
-
- $statusPrefix = 'T'.$processData['threadId'].': ';
- if ($line) {
- // If a line could be read we update the progressbar with the data sent.
- list($iteration, $progress) = explode(',', StringUtil::trim($line));
- if ($iteration === 'done') {
- $processData['progressbar']->update($progress, $statusPrefix.'finished');
+
+ $statusPrefix = 'T'.$processData['threadId'].': ';
+ if ($line) {
+ // If a line could be read we update the progressbar with the data sent.
+ $parsedLine = JSON::decode(StringUtil::trim($line));
+
+ if (!empty($parsedLine['error'])) {
+ fwrite(STDERR, str_repeat("\n", -$cursorOffset + 1));
+ fwrite(STDERR, str_repeat("=", 20)."\n");
+ fwrite(STDERR, "Error in thread ".$processData['threadId']."\n");
+ fwrite(STDERR, $parsedLine['error']."\n");
+ fwrite(STDERR, str_repeat("=", 20)."\n");
+ fwrite(STDERR, str_repeat("\n", $threads + $cursorOffset + 1));
+ }
+ else if (isset($parsedLine['finished'])) {
+ $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'finished');
+ }
+ else {
+ $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'loop#'.$parsedLine['iteration']);
+ }
+ }
+ else if (!$status['running']) {
+ // If the process exited we update the text status to indicate so.
+ $processData['progressbar']->update(null, $statusPrefix.'exited');
}
else {
- $processData['progressbar']->update($progress, $statusPrefix.'loop#'.$iteration);
+ // Otherwise just rerender the bar without changing anything.
+ $processData['progressbar']->update();
+ }
+ echo "\n";
+ $cursorOffset++;
+
+ // Check the exit code after processing the status line, to allow for dumping the error message.
+ if (!$status['running'] && $status['exitcode'] != -1) {
+ if ($status['exitcode']) {
+ throw new \Exception('Unclean exit of thread '.$processData['threadId'].' detected. Exiting.');
+ }
}
}
- else if (!$status['running']) {
- // If the process exited we update the text status to indicate so.
- $processData['progressbar']->update(null, $statusPrefix.'exited');
+ unset($processData);
+ }
+
+ // 4) Make sure to update the text status to 'exited' for every process to not confuse the user.
+ echo "\033[".$threads."A";
+ foreach ($processes as $processData) {
+ $status = proc_get_status($processData['process']);
+ if ($status === false) {
+ throw new \LogicException('Unreachable');
+ }
+
+ if (!$status['running']) {
+ $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited');
}
else {
- // Otherwise just rerender the bar without changing anything.
- $processData['progressbar']->update();
+ throw new \LogicException('Unreachable');
}
echo "\n";
}
- unset($processData);
}
-
- // 4) Make sure to update the text status to 'exited' for every process to not confuse the user.
- echo "\033[".$threads."A";
- foreach ($processes as &$processData) {
- $status = proc_get_status($processData['process']);
- if ($status === false) {
- throw new \LogicException('Unreachable');
- }
-
- if (!$status['running']) {
- $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited');
- }
- else {
- throw new \LogicException('Unreachable');
+ finally {
+ if ($cursorOffset) {
+ // Move out of the progress bar area.
+ echo str_repeat("\n", -$cursorOffset);
}
echo "\n";
+
+ // Kill all remaining processes and check their status to not leave zombies.
+ // There should not be any running processes at this point, except in case of
+ // an unclean exit of one process
+ foreach ($processes as $processData) {
+ if (proc_get_status($processData['process'])['running']) {
+ echo "Killing thread ".$processData['threadId']."\n";
+ proc_terminate($processData['process'], 9);
+ $i = 0;
+ while (proc_get_status($processData['process'])['running']) {
+ usleep(100000);
+ if ($i++ > 3) echo "Waiting for exit\n";
+ }
+ }
+ }
}
}