Attempt to terminate workers cooperatively
This change is designed to address pthread flakiness around
process termination. For example see #19683 and #15014.
In some cases I believe that workers are being terminated while they
have postMessage messages in their queue, so messages are getting lost.
In other cases we are seeing messages arrive after the call to
`terminate()`.
This change avoids using `terminate()` where possible, instead attempting
to cooperatively shut down the worker. We only use `terminate()` if the
thread doesn't respond in a certain amount of time.
diff --git a/src/library_pthread.js b/src/library_pthread.js
index 5c01f4e..dbb39a3 100644
--- a/src/library_pthread.js
+++ b/src/library_pthread.js
@@ -55,6 +55,7 @@
// the reverse mapping, each worker has a `pthread_ptr` when its running a
// pthread.
pthreads: {},
+ shuttingDown: false,
#if ASSERTIONS
nextWorkerID: 1,
debugInit: function() {
@@ -176,12 +177,15 @@
terminateAllThreads__deps: ['$terminateWorker'],
terminateAllThreads: function() {
+ if (PThread.shuttingDown) return Promise.resolve();
#if ASSERTIONS
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! terminateAllThreads() can only ever be called from main application thread!');
#endif
-#if PTHREADS_DEBUG
+#if RUNTIME_DEBUG
dbg('terminateAllThreads');
#endif
+ PThread.shuttingDown = true;
+ var promises = [];
// Attempt to kill all workers. Sadly (at least on the web) there is no
// way to terminate a worker synchronously, or to be notified when a
// worker in actually terminated. This means there is some risk that
@@ -189,14 +193,19 @@
// returned. For this reason, we don't call `returnWorkerToPool` here or
// free the underlying pthread data structures.
for (var worker of PThread.runningWorkers) {
- terminateWorker(worker);
+ promises.push(terminateWorker(worker));
}
for (var worker of PThread.unusedWorkers) {
- terminateWorker(worker);
+ promises.push(terminateWorker(worker));
}
- PThread.unusedWorkers = [];
- PThread.runningWorkers = [];
- PThread.pthreads = [];
+ return Promise.all(promises).then(() => {
+#if RUNTIME_DEBUG
+ dbg('terminateAllThreads done');
+#endif
+ PThread.unusedWorkers = [];
+ PThread.runningWorkers = [];
+ PThread.pthreads = [];
+ });
},
returnWorkerToPool: function(worker) {
// We don't want to run main thread queued calls here, since we are doing
@@ -274,6 +283,12 @@
checkMailbox();
} else if (cmd === 'spawnThread') {
spawnThread(d);
+ } else if (cmd === 'doneShutdown') {
+#if RUNTIME_DEBUG
+ dbg('doneShutdown');
+#endif
+ clearTimeout(worker.shutdownTimer);
+ worker.shutdownComplete();
} else if (cmd === 'cleanupThread') {
cleanupThread(d['thread']);
#if MAIN_MODULE
@@ -523,18 +538,29 @@
#if PTHREADS_DEBUG
dbg('terminateWorker: ' + worker.workerID);
#endif
- worker.terminate();
- // terminate() can be asynchronous, so in theory the worker can continue
- // to run for some amount of time after termination. However from our POV
- // the worker now dead and we don't want to hear from it again, so we stub
- // out its message handler here. This avoids having to check in each of
- // the onmessage handlers if the message was coming from valid worker.
- worker.onmessage = (e) => {
-#if ASSERTIONS
- var cmd = e['data']['cmd'];
- err('received "' + cmd + '" command from terminated worker: ' + worker.workerID);
-#endif
- };
+ // Returns a promise that resolves once the worker has been shut down.
+ return new Promise((resolve) => {
+ // Give the worker 200ms to voluntarily shut down, and if it fails to
+ // respond we `terminate()` it with prejudice.
+ worker.shutdownComplete = () => resolve();
+ worker.shutdownTimer = setTimeout(() => {
+ err('worker failed to respond for cooperative shutdown: ' + worker.workerID);
+ worker.terminate();
+ // terminate() can be asynchronous, so in theory the worker can continue
+ // to run for some amount of time after termination. However from our POV
+ // the worker is now dead and we don't want to hear from it again, so we stub
+ // out its message handler here. This avoids having to check in each of
+ // the onmessage handlers if the message was coming from a valid worker.
+ worker.onmessage = (e) => {
+ #if ASSERTIONS
+ var cmd = e['data']['cmd'];
+ err('received "' + cmd + '" command from terminated worker: ' + worker.workerID);
+ #endif
+ }
+ worker.shutdownComplete();
+ }, 200);
+ worker.postMessage({ 'cmd': 'shutdown' });
+ });
},
$killThread__deps: ['_emscripten_thread_free_data', '$terminateWorker'],
@@ -547,13 +573,15 @@
assert(pthread_ptr, 'Internal Error! Null pthread_ptr in killThread!');
#endif
var worker = PThread.pthreads[pthread_ptr];
- delete PThread.pthreads[pthread_ptr];
- terminateWorker(worker);
- __emscripten_thread_free_data(pthread_ptr);
- // The worker was completely nuked (not just the pthread execution it was hosting), so remove it from running workers
- // but don't put it back to the pool.
- PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1); // Not a running Worker anymore.
- worker.pthread_ptr = 0;
+ terminateWorker(worker).then(() => {
+ delete PThread.pthreads[pthread_ptr];
+ __emscripten_thread_free_data(pthread_ptr);
+ // The worker was completely nuked (not just the pthread execution it was
+ // hosting), so remove it from running workers but don't put it back to the
+ // pool.
+ PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1); // Not a running Worker anymore.
+ worker.pthread_ptr = 0;
+ });
},
__emscripten_thread_cleanup: function(thread) {
diff --git a/src/library_wasi.js b/src/library_wasi.js
index 42b96c1..07c8743 100644
--- a/src/library_wasi.js
+++ b/src/library_wasi.js
@@ -25,13 +25,17 @@
#endif
EXITSTATUS = code;
if (!keepRuntimeAlive()) {
-#if PTHREADS
- PThread.terminateAllThreads();
-#endif
+ var doneExit = () => {
#if expectToReceiveOnModule('onExit')
- if (Module['onExit']) Module['onExit'](code);
+ if (Module['onExit']) Module['onExit'](code);
#endif
- ABORT = true;
+ ABORT = true;
+ }
+#if PTHREADS
+ PThread.terminateAllThreads().then(doneExit);
+#else
+ doneExit();
+#endif
}
quit_(code, new ExitStatus(code));
#endif // MINIMAL_RUNTIME
diff --git a/src/postamble_minimal.js b/src/postamble_minimal.js
index 89b1dbe..f7a9b69 100644
--- a/src/postamble_minimal.js
+++ b/src/postamble_minimal.js
@@ -30,10 +30,6 @@
PThread.terminateAllThreads();
#endif
-#endif
-
-#if EXIT_RUNTIME
-
#if ASSERTIONS
runtimeExited = true;
#endif
diff --git a/src/preamble.js b/src/preamble.js
index f386199..8f4bd7c 100644
--- a/src/preamble.js
+++ b/src/preamble.js
@@ -280,9 +280,10 @@
callRuntimeCallbacks(__ATEXIT__);
<<< ATEXITS >>>
#if PTHREADS
- PThread.terminateAllThreads();
-#endif
+ PThread.terminateAllThreads().then(() => { runtimeExited = true });
+#else
runtimeExited = true;
+#endif
}
#endif
diff --git a/src/worker.js b/src/worker.js
index 3e4760c..5ffc692 100644
--- a/src/worker.js
+++ b/src/worker.js
@@ -36,6 +36,7 @@
Worker: nodeWorkerThreads.Worker,
importScripts: (f) => (0, eval)(fs.readFileSync(f, 'utf8') + '//# sourceURL=' + f),
postMessage: (msg) => parentPort.postMessage(msg),
+ close: () => parentPort.close(),
performance: global.performance || { now: Date.now },
});
}
@@ -251,6 +252,9 @@
dbg(`Pthread 0x${Module['_pthread_self']().toString(16)} completed its main entry point with an 'unwind', keeping the worker alive for asynchronous operation.`);
#endif
}
+ } else if (e.data.cmd === 'shutdown') {
+ postMessage({ 'cmd': 'doneShutdown' });
+ close();
} else if (e.data.cmd === 'cancel') { // Main thread is asking for a pthread_cancel() on this thread.
if (Module['_pthread_self']()) {
Module['__emscripten_thread_exit']({{{ cDefs.PTHREAD_CANCELED }}});
diff --git a/test/other/metadce/test_metadce_minimal_pthreads.jssize b/test/other/metadce/test_metadce_minimal_pthreads.jssize
index cc1f5e5..e43efaa 100644
--- a/test/other/metadce/test_metadce_minimal_pthreads.jssize
+++ b/test/other/metadce/test_metadce_minimal_pthreads.jssize
@@ -1 +1 @@
-15069
+15510