From 3344c93a2a219384563c18d979eab312ae6d9109 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tim=20D=C3=BCsterhus?= Date: Fri, 14 Aug 2020 11:39:29 +0200 Subject: [PATCH] Add proper error handling to WorkerCLICommand with --threads > 1 Fixes #3507 --- .../cli/command/WorkerCLICommand.class.php | 271 +++++++++++------- 1 file changed, 166 insertions(+), 105 deletions(-) diff --git a/wcfsetup/install/files/lib/system/cli/command/WorkerCLICommand.class.php b/wcfsetup/install/files/lib/system/cli/command/WorkerCLICommand.class.php index 45df1ebb6e..5601f48c47 100644 --- a/wcfsetup/install/files/lib/system/cli/command/WorkerCLICommand.class.php +++ b/wcfsetup/install/files/lib/system/cli/command/WorkerCLICommand.class.php @@ -7,6 +7,7 @@ use wcf\system\io\File; use wcf\system\Regex; use wcf\util\CLIUtil; use wcf\util\DirectoryUtil; +use wcf\util\JSON; use wcf\util\StringUtil; use Zend\Console\Exception\RuntimeException as ArgvException; use Zend\Console\Getopt as ArgvParser; @@ -144,23 +145,39 @@ class WorkerCLICommand implements IArgumentedCLICommand { if (!empty($_ENV['WCF_WORKER_STATUS_FD'])) { $output = new File("php://fd/".$_ENV['WCF_WORKER_STATUS_FD'], "w"); } - for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) { - $worker->setLoopCount($i); - $worker->validate(); - - // execute worker - $worker->execute(); - $worker->finalize(); - - // update progress - $progress = $worker->getProgress(); - $progressbar->update($progress); + try { + for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) { + $worker->setLoopCount($i); + $worker->validate(); + + // execute worker + $worker->execute(); + $worker->finalize(); + + // update progress + $progress = $worker->getProgress(); + $progressbar->update($progress); + if ($output) { + $output->write(JSON::encode([ + 'iteration' => $i, + 'progress' => $progress, + ])."\n"); + } + } + } + catch (\Exception $e) { if ($output) { - $output->write("$i,$progress\n"); + $output->write(JSON::encode([ + 'error' => (string) $e, + ])); } + throw $e; } if ($output) { - $output->write("done,$progress"); + $output->write(JSON::encode([ + 'finished' => true, + 'progress' => $progress, + ])."\n"); $output->close(); } $progressbar->update($progress); @@ -192,8 +209,10 @@ class WorkerCLICommand implements IArgumentedCLICommand { // Invoke the worker processes with the same command line ... $arguments = $_SERVER['argv']; - // ... with the quiet argument. + // ... with the quiet argument ... $arguments[] = '-qqqqq'; + // ... and requesting an unclean exit of an Exception is thrown. + $arguments[] = '--exitOnFail'; $commandLine = PHP_BINARY.' '.implode(' ', array_map('escapeshellarg', $arguments)); Log::debug('Using "'.$commandLine.'" as the worker command line.'); @@ -209,117 +228,159 @@ class WorkerCLICommand implements IArgumentedCLICommand { 'WCF_SESSION_ID' => CLIWCF::getSession()->sessionID, ]); - // 1) Spawn the processes. - $processes = []; - for ($threadId = 0; $threadId < $threads; $threadId++) { - $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null); - - $consoleBar = new ConsoleProgressBar([ - 'width' => CLIWCF::getTerminal()->getWidth() - ]); - $consoleBar->setElements([ - ConsoleProgressBar::ELEMENT_PERCENT, - ConsoleProgressBar::ELEMENT_BAR, - ConsoleProgressBar::ELEMENT_ETA, - ConsoleProgressBar::ELEMENT_TEXT, - ]); - $consoleBar->setTextWidth(30); - $progressbar = new ProgressBar($consoleBar); - $progressbar->update(0, 'T'.$threadId.': spawned'); - echo "\n"; - - $processes[] = [ - 'threadId' => $threadId, - 'pipes' => $pipes, - 'process' => $process, - 'progressbar' => $progressbar, - ]; - } - - // 2) Start processing in all processes. - foreach ($processes as $processData) { - $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\""); - fwrite($processData['pipes'][0], $workerCommand."\n"); - fclose($processData['pipes'][0]); - } - - // 3) Handle their status output. - while (true) { - // 3.1) Check which processes' status FD is readable. - $read = array_filter(array_map(function ($processData) { - return $processData['pipes'][3]; - }, $processes), function ($handle) { - return $handle !== false; - }); - $write = null; - $except = null; - - // 3.2) Exit if all status FDs have been closed. - if (empty($read)) { - break; + try { + // 1) Spawn the processes. + $processes = []; + for ($threadId = 0; $threadId < $threads; $threadId++) { + $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null); + + $consoleBar = new ConsoleProgressBar([ + 'width' => CLIWCF::getTerminal()->getWidth() + ]); + $consoleBar->setElements([ + ConsoleProgressBar::ELEMENT_PERCENT, + ConsoleProgressBar::ELEMENT_BAR, + ConsoleProgressBar::ELEMENT_ETA, + ConsoleProgressBar::ELEMENT_TEXT, + ]); + $consoleBar->setTextWidth(30); + $progressbar = new ProgressBar($consoleBar); + $progressbar->update(0, 'T'.$threadId.': spawned'); + echo "\n"; + + $processes[] = [ + 'threadId' => $threadId, + 'pipes' => $pipes, + 'process' => $process, + 'progressbar' => $progressbar, + ]; } - stream_select($read, $write, $except, 2, null); + // 2) Start processing in all processes. + foreach ($processes as $processData) { + $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\""); + fwrite($processData['pipes'][0], $workerCommand."\n"); + fclose($processData['pipes'][0]); + } - // 3.3) Rerender the progressbars with the updated status information. - echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar. - foreach ($processes as &$processData) { - $status = proc_get_status($processData['process']); - if ($status === false) { - throw new \LogicException('Unreachable'); + // 3) Handle their status output. + while (true) { + // 3.1) Check which processes' status FD is readable. + $read = array_filter(array_map(function ($processData) { + return $processData['pipes'][3]; + }, $processes), function ($handle) { + return $handle !== false; + }); + $write = null; + $except = null; + + // 3.2) Exit if all status FDs have been closed. + if (empty($read)) { + break; } - $line = false; - if ($processData['pipes'][3] !== false) { - if (in_array($processData['pipes'][3], $read)) { - $line = fgets($processData['pipes'][3]); + stream_select($read, $write, $except, 2, null); + + // 3.3) Rerender the progressbars with the updated status information. + echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar. + $cursorOffset = -$threads; + foreach ($processes as &$processData) { + $status = proc_get_status($processData['process']); + if ($status === false) { + throw new \LogicException('Unreachable'); } - if (feof($processData['pipes'][3])) { - fclose($processData['pipes'][3]); - $processData['pipes'][3] = false; + $line = false; + if ($processData['pipes'][3] !== false) { + if (in_array($processData['pipes'][3], $read)) { + $line = fgets($processData['pipes'][3]); + } + + if (feof($processData['pipes'][3])) { + fclose($processData['pipes'][3]); + $processData['pipes'][3] = false; + } } - } - - $statusPrefix = 'T'.$processData['threadId'].': '; - if ($line) { - // If a line could be read we update the progressbar with the data sent. - list($iteration, $progress) = explode(',', StringUtil::trim($line)); - if ($iteration === 'done') { - $processData['progressbar']->update($progress, $statusPrefix.'finished'); + + $statusPrefix = 'T'.$processData['threadId'].': '; + if ($line) { + // If a line could be read we update the progressbar with the data sent. + $parsedLine = JSON::decode(StringUtil::trim($line)); + + if (!empty($parsedLine['error'])) { + fwrite(STDERR, str_repeat("\n", -$cursorOffset + 1)); + fwrite(STDERR, str_repeat("=", 20)."\n"); + fwrite(STDERR, "Error in thread ".$processData['threadId']."\n"); + fwrite(STDERR, $parsedLine['error']."\n"); + fwrite(STDERR, str_repeat("=", 20)."\n"); + fwrite(STDERR, str_repeat("\n", $threads + $cursorOffset + 1)); + } + else if (isset($parsedLine['finished'])) { + $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'finished'); + } + else { + $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'loop#'.$parsedLine['iteration']); + } + } + else if (!$status['running']) { + // If the process exited we update the text status to indicate so. + $processData['progressbar']->update(null, $statusPrefix.'exited'); } else { - $processData['progressbar']->update($progress, $statusPrefix.'loop#'.$iteration); + // Otherwise just rerender the bar without changing anything. + $processData['progressbar']->update(); + } + echo "\n"; + $cursorOffset++; + + // Check the exit code after processing the status line, to allow for dumping the error message. + if (!$status['running'] && $status['exitcode'] != -1) { + if ($status['exitcode']) { + throw new \Exception('Unclean exit of thread '.$processData['threadId'].' detected. Exiting.'); + } } } - else if (!$status['running']) { - // If the process exited we update the text status to indicate so. - $processData['progressbar']->update(null, $statusPrefix.'exited'); + unset($processData); + } + + // 4) Make sure to update the text status to 'exited' for every process to not confuse the user. + echo "\033[".$threads."A"; + foreach ($processes as $processData) { + $status = proc_get_status($processData['process']); + if ($status === false) { + throw new \LogicException('Unreachable'); + } + + if (!$status['running']) { + $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited'); } else { - // Otherwise just rerender the bar without changing anything. - $processData['progressbar']->update(); + throw new \LogicException('Unreachable'); } echo "\n"; } - unset($processData); } - - // 4) Make sure to update the text status to 'exited' for every process to not confuse the user. - echo "\033[".$threads."A"; - foreach ($processes as &$processData) { - $status = proc_get_status($processData['process']); - if ($status === false) { - throw new \LogicException('Unreachable'); - } - - if (!$status['running']) { - $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited'); - } - else { - throw new \LogicException('Unreachable'); + finally { + if ($cursorOffset) { + // Move out of the progress bar area. + echo str_repeat("\n", -$cursorOffset); } echo "\n"; + + // Kill all remaining processes and check their status to not leave zombies. + // There should not be any running processes at this point, except in case of + // an unclean exit of one process + foreach ($processes as $processData) { + if (proc_get_status($processData['process'])['running']) { + echo "Killing thread ".$processData['threadId']."\n"; + proc_terminate($processData['process'], 9); + $i = 0; + while (proc_get_status($processData['process'])['running']) { + usleep(100000); + if ($i++ > 3) echo "Waiting for exit\n"; + } + } + } } } -- 2.20.1