diff --git a/src/Parallel/Pool/Swoole/Process.php b/src/Parallel/Pool/Swoole/Process.php index 65c529e..bb8a50f 100644 --- a/src/Parallel/Pool/Swoole/Process.php +++ b/src/Parallel/Pool/Swoole/Process.php @@ -350,21 +350,36 @@ public function shutdown(): void if (\is_int($pid) && isset($pidsToWait[$pid])) { unset($pidsToWait[$pid]); } - } else { - if (\time() - $startTime > $maxWaitTime) { - foreach ($this->workers as $worker) { - if (isset($pidsToWait[$worker->pid])) { - SwooleProcess::kill($worker->pid, SIGKILL); - } - } - break; + continue; + } + + // No child was ready to reap. Drop any workers that have already + // exited and been reaped elsewhere so we neither wait on nor signal + // a dead PID. kill($pid, 0) only probes for existence (no signal is + // sent) and, unlike SIGKILL, does not warn when the PID is gone. + foreach (\array_keys($pidsToWait) as $pid) { + if (!SwooleProcess::kill($pid, 0)) { + unset($pidsToWait[$pid]); } + } - if (SwooleCoroutine::getCid() > 0) { - SwooleCoroutine::sleep(0.001); // 1ms - } else { - \usleep(1000); + if (empty($pidsToWait)) { + break; + } + + if (\time() - $startTime > $maxWaitTime) { + foreach (\array_keys($pidsToWait) as $pid) { + if (SwooleProcess::kill($pid, 0)) { + SwooleProcess::kill($pid, SIGKILL); + } } + break; + } + + if (SwooleCoroutine::getCid() > 0) { + SwooleCoroutine::sleep(0.001); // 1ms + } else { + \usleep(1000); } } diff --git a/src/Promise/Adapter/Swoole/Coroutine.php b/src/Promise/Adapter/Swoole/Coroutine.php index 2d1decf..452d11a 100644 --- a/src/Promise/Adapter/Swoole/Coroutine.php +++ b/src/Promise/Adapter/Swoole/Coroutine.php @@ -124,10 +124,10 @@ public static function all(array $promises): static $channel->push(true); return $value; }, function ($err) use ($channel, &$error) { - $channel->push(true); if ($error === null) { $error = $err; } + $channel->push(true); }); $key++; } @@ -159,7 +159,7 @@ public static function race(array $promises): static { return self::create(function (callable $resolve, callable $reject) use ($promises) { if (empty($promises)) { - $reject(new PromiseException('Cannot race with an empty array of promises')); + $reject(new Promise('Cannot race with an empty array of promises')); return; } diff --git a/src/Timer/Adapter.php b/src/Timer/Adapter.php index d5fb398..b5dcbdd 100644 --- a/src/Timer/Adapter.php +++ b/src/Timer/Adapter.php @@ -14,6 +14,7 @@ * Static methods are provided for API consistency with other facades. * * @internal Use Utopia\Async\Timer facade instead + * @phpstan-consistent-constructor * @package Utopia\Async\Timer */ abstract class Adapter diff --git a/src/Timer/Adapter/Sync.php b/src/Timer/Adapter/Sync.php index a7018cb..446cd6f 100644 --- a/src/Timer/Adapter/Sync.php +++ b/src/Timer/Adapter/Sync.php @@ -18,13 +18,6 @@ */ class Sync extends Adapter { - /** - * Track whether tick timers should continue running - * - * @var array - */ - private array $running = []; - /** * Sync adapter is always supported as it has no dependencies. * @@ -81,15 +74,10 @@ protected function doTick(int $milliseconds, callable $callback): int 'interval' => $milliseconds, 'type' => 'tick', ]; - $this->running[$timerId] = true; - while ($this->running[$timerId] ?? false) { + while ($this->doExists($timerId)) { \usleep($milliseconds * 1000); - if (isset($this->timers[$timerId]) && ($this->running[$timerId] ?? false)) { - $callback($timerId); - } else { - break; - } + $callback($timerId); } return $timerId; @@ -108,7 +96,6 @@ protected function doClear(int $timerId): bool } unset($this->timers[$timerId]); - unset($this->running[$timerId]); return true; } @@ -120,7 +107,6 @@ protected function doClear(int $timerId): bool */ protected function doClearAll(): void { - $this->running = []; $this->timers = []; } } diff --git a/tests/E2e/Promise/Swoole/CoroutineTest.php b/tests/E2e/Promise/Swoole/CoroutineTest.php index c95cdfd..802cfd5 100644 --- a/tests/E2e/Promise/Swoole/CoroutineTest.php +++ b/tests/E2e/Promise/Swoole/CoroutineTest.php @@ -176,6 +176,34 @@ public function testAllWithRejection(): void }); } + public function testAllRejectsWhenABranchRejectsAfterYielding(): void + { + // Regression: the rejection handler in all() must record the error + // before signalling the channel. If it pushes first, the awaiting + // coroutine can drain the channel and read a still-null error, causing + // all() to resolve instead of reject. Async branches that yield before + // rejecting (the realistic case) are what expose the ordering. + SwooleCoroutine\run(function () { + $promises = [ + Coroutine::async(function () { + SwooleCoroutine::sleep(0.01); + throw new \Exception('delayed failure'); + }), + Coroutine::async(function () { + SwooleCoroutine::sleep(0.01); + return 'ok'; + }), + ]; + + try { + Coroutine::all($promises)->await(); + $this->fail('Expected rejection to propagate from all()'); + } catch (\Exception $e) { + $this->assertEquals('delayed failure', $e->getMessage()); + } + }); + } + public function testRace(): void { SwooleCoroutine\run(function () { diff --git a/tests/E2e/Timer/Swoole/CoroutineTest.php b/tests/E2e/Timer/Swoole/CoroutineTest.php index 7dcc914..0eb8ec2 100644 --- a/tests/E2e/Timer/Swoole/CoroutineTest.php +++ b/tests/E2e/Timer/Swoole/CoroutineTest.php @@ -148,7 +148,6 @@ public function testAfterReturnsTimerId(): void SwooleCoroutine\run(function () { $timerId = Coroutine::after(100, function () {}); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); Coroutine::clear($timerId); @@ -160,7 +159,6 @@ public function testTickReturnsTimerId(): void SwooleCoroutine\run(function () { $timerId = Coroutine::tick(100, function () {}); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); Coroutine::clear($timerId); diff --git a/tests/E2e/Timer/SyncTest.php b/tests/E2e/Timer/SyncTest.php index d9e6d72..b12e3df 100644 --- a/tests/E2e/Timer/SyncTest.php +++ b/tests/E2e/Timer/SyncTest.php @@ -104,21 +104,15 @@ public function testTickWithExternalClear(): void public function testAfterReturnsTimerId(): void { $timerId = Sync::after(1, function () {}); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); } public function testTickReturnsTimerId(): void { - $count = 0; - $timerId = Sync::tick(1, function (int $id) use (&$count) { - $count++; - if ($count >= 1) { - Sync::clear($id); - } + $timerId = Sync::tick(1, function (int $id) { + Sync::clear($id); }); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); } @@ -136,14 +130,10 @@ public function testTimerIdsAreUnique(): void public function testCallbackReceivesTimerId(): void { $receivedId = null; - $count = 0; - Sync::tick(1, function (int $id) use (&$receivedId, &$count) { + Sync::tick(1, function (int $id) use (&$receivedId) { $receivedId = $id; - $count++; - if ($count >= 1) { - Sync::clear($id); - } + Sync::clear($id); }); $this->assertIsInt($receivedId); diff --git a/tests/Unit/TimerTest.php b/tests/Unit/TimerTest.php index db07b8b..84a5716 100644 --- a/tests/Unit/TimerTest.php +++ b/tests/Unit/TimerTest.php @@ -130,7 +130,6 @@ public function testFacadeAutoDetectsAdapter(): void $timerId = Timer::after(1, function () {}); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); } @@ -140,7 +139,6 @@ public function testAfterReturnsTimerId(): void $timerId = Timer::after(1, function () {}); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); } @@ -148,15 +146,10 @@ public function testTickReturnsTimerId(): void { Timer::setAdapter(Sync::class); - $count = 0; - $timerId = Timer::tick(1, function (int $id) use (&$count) { - $count++; - if ($count >= 1) { - Timer::clear($id); - } + $timerId = Timer::tick(1, function (int $id) { + Timer::clear($id); }); - $this->assertIsInt($timerId); $this->assertGreaterThan(0, $timerId); } }