1
0
mirror of https://github.com/roytam1/UXP.git synced 2026-05-26 13:58:49 +00:00

Implement Atomics.waitAsync

This commit is contained in:
Basilisk-Dev
2026-05-15 19:16:34 -04:00
committed by roytam1
parent 2e51dc9f09
commit 22cb023133
3 changed files with 339 additions and 32 deletions
+301 -32
View File
@@ -51,6 +51,7 @@
#include "mozilla/Maybe.h"
#include "mozilla/Unused.h"
#include "builtin/Promise.h"
#include "jsapi.h"
#include "jsfriendapi.h"
#include "jsnum.h"
@@ -59,6 +60,7 @@
#include "jit/InlinableNatives.h"
#include "js/Class.h"
#include "vm/GlobalObject.h"
#include "vm/HelperThreads.h"
#include "vm/Time.h"
#include "vm/TypedArrayObject.h"
#include "wasm/WasmInstance.h"
@@ -693,11 +695,13 @@ js::atomics_cmpxchg_asm_callout(wasm::Instance* instance, int32_t vt, int32_t of
namespace js {
class AtomicsWaitAsyncTask;
// Represents one waiting worker.
//
// The type is declared opaque in SharedArrayObject.h. Instances of
// js::FutexWaiter are stack-allocated and linked onto a list across a
// call to FutexRuntime::wait().
// js::FutexWaiter are linked onto a list across a call to FutexRuntime::wait()
// or an async wait task.
//
// The 'waiters' field of the SharedArrayRawBuffer points to the highest
// priority waiter in the list, and lower priority nodes are linked through
@@ -711,14 +715,34 @@ class FutexWaiter
public:
FutexWaiter(uint32_t offset, JSRuntime* rt)
: offset(offset),
kind(Sync),
rt(rt),
asyncTask(nullptr),
lower_pri(nullptr),
back(nullptr)
{
}
FutexWaiter(uint32_t offset, AtomicsWaitAsyncTask* asyncTask)
: offset(offset),
kind(Async),
rt(nullptr),
asyncTask(asyncTask),
lower_pri(nullptr),
back(nullptr)
{
}
bool isWaiting() const;
void notify();
uint32_t offset; // int32 element index within the SharedArrayBuffer
enum WaiterKind {
Sync,
Async
} kind;
JSRuntime* rt; // The runtime of the waiter
AtomicsWaitAsyncTask* asyncTask; // The async waiter task, if any
FutexWaiter* lower_pri; // Lower priority nodes in circular doubly-linked list of waiters
FutexWaiter* back; // Other direction
};
@@ -742,8 +766,195 @@ class AutoLockFutexAPI
js::UniqueLock<js::Mutex>& unique() { return *unique_; }
};
static void
AddWaiter(SharedArrayRawBuffer* sarb, FutexWaiter* waiter)
{
if (FutexWaiter* waiters = sarb->waiters()) {
waiter->lower_pri = waiters;
waiter->back = waiters->back;
waiters->back->lower_pri = waiter;
waiters->back = waiter;
} else {
waiter->lower_pri = waiter->back = waiter;
sarb->setWaiters(waiter);
}
}
static void
RemoveWaiter(SharedArrayRawBuffer* sarb, FutexWaiter* waiter)
{
if (waiter->lower_pri == waiter) {
sarb->setWaiters(nullptr);
} else {
waiter->lower_pri->back = waiter->back;
waiter->back->lower_pri = waiter->lower_pri;
if (sarb->waiters() == waiter)
sarb->setWaiters(waiter->lower_pri);
}
waiter->lower_pri = nullptr;
waiter->back = nullptr;
}
class AtomicsWaitAsyncTask : public PromiseTask
{
enum class State {
Waiting,
Notified,
TimedOut
};
SharedArrayRawBuffer* sarb_;
FutexWaiter waiter_;
mozilla::Maybe<mozilla::TimeDuration> timeout_;
ConditionVariable cond_;
State state_;
bool isInWaiterList_;
public:
AtomicsWaitAsyncTask(JSContext* cx, Handle<PromiseObject*> promise,
SharedArrayRawBuffer* sarb, uint32_t offset,
mozilla::Maybe<mozilla::TimeDuration>& timeout)
: PromiseTask(cx, promise),
sarb_(sarb),
waiter_(offset, this),
timeout_(timeout),
state_(State::Waiting),
isInWaiterList_(false)
{
}
~AtomicsWaitAsyncTask() {
if (isInWaiterList_) {
AutoLockFutexAPI lock;
if (isInWaiterList_)
removeFromWaiterList();
}
sarb_->dropReference();
}
FutexWaiter* waiter() {
return &waiter_;
}
void setInWaiterList() {
isInWaiterList_ = true;
}
bool isWaiting() const {
return state_ == State::Waiting;
}
void notify() {
MOZ_ASSERT(isWaiting());
state_ = State::Notified;
cond_.notify_all();
}
void execute() override {
AutoLockFutexAPI lock;
const bool isTimed = timeout_.isSome();
auto finalEnd = timeout_.map([](mozilla::TimeDuration& timeout) {
return mozilla::TimeStamp::Now() + timeout;
});
auto maxSlice = mozilla::TimeDuration::FromSeconds(4000.0);
while (state_ == State::Waiting) {
if (isTimed) {
auto sliceEnd = finalEnd.map([&](mozilla::TimeStamp& finalEnd) {
auto sliceEnd = mozilla::TimeStamp::Now() + maxSlice;
if (finalEnd < sliceEnd)
sliceEnd = finalEnd;
return sliceEnd;
});
mozilla::Unused << cond_.wait_until(lock.unique(), *sliceEnd);
if (state_ == State::Waiting && mozilla::TimeStamp::Now() >= *finalEnd) {
state_ = State::TimedOut;
break;
}
} else {
cond_.wait(lock.unique());
}
}
if (isInWaiterList_)
removeFromWaiterList();
}
private:
void removeFromWaiterList() {
MOZ_ASSERT(isInWaiterList_);
RemoveWaiter(sarb_, &waiter_);
isInWaiterList_ = false;
}
bool finishPromise(JSContext* cx, Handle<PromiseObject*> promise) override {
RootedValue result(cx);
if (state_ == State::TimedOut)
result.setString(cx->names().futexTimedOut);
else
result.setString(cx->names().futexOK);
return PromiseObject::resolve(cx, promise, result);
}
};
bool
FutexWaiter::isWaiting() const
{
if (kind == Sync)
return rt->fx.isWaiting();
return asyncTask->isWaiting();
}
void
FutexWaiter::notify()
{
if (kind == Sync) {
rt->fx.notify(FutexRuntime::NotifyExplicit);
return;
}
asyncTask->notify();
}
} // namespace js
static bool
GetWaitTimeout(JSContext* cx, HandleValue timeoutv,
mozilla::Maybe<mozilla::TimeDuration>* timeout)
{
timeout->reset();
if (!timeoutv.isUndefined()) {
double timeout_ms;
if (!ToNumber(cx, timeoutv, &timeout_ms))
return false;
if (!mozilla::IsNaN(timeout_ms)) {
if (timeout_ms < 0)
*timeout = mozilla::Some(mozilla::TimeDuration::FromSeconds(0.0));
else if (!mozilla::IsInfinite(timeout_ms))
*timeout = mozilla::Some(mozilla::TimeDuration::FromMilliseconds(timeout_ms));
}
}
return true;
}
static bool
CreateWaitAsyncResult(JSContext* cx, bool isAsync, HandleValue value, MutableHandleValue rval)
{
RootedPlainObject obj(cx, NewBuiltinClassInstance<PlainObject>(cx));
if (!obj)
return false;
RootedValue asyncValue(cx, BooleanValue(isAsync));
if (!NativeDefineDataProperty(cx, obj, cx->names().async, asyncValue, JSPROP_ENUMERATE))
return false;
if (!NativeDefineDataProperty(cx, obj, cx->names().value, value, JSPROP_ENUMERATE))
return false;
rval.setObject(*obj);
return true;
}
bool
js::atomics_wait(JSContext* cx, unsigned argc, Value* vp)
{
@@ -768,17 +979,8 @@ js::atomics_wait(JSContext* cx, unsigned argc, Value* vp)
if (!ToInt32(cx, valv, &value))
return false;
mozilla::Maybe<mozilla::TimeDuration> timeout;
if (!timeoutv.isUndefined()) {
double timeout_ms;
if (!ToNumber(cx, timeoutv, &timeout_ms))
return false;
if (!mozilla::IsNaN(timeout_ms)) {
if (timeout_ms < 0)
timeout = mozilla::Some(mozilla::TimeDuration::FromSeconds(0.0));
else if (!mozilla::IsInfinite(timeout_ms))
timeout = mozilla::Some(mozilla::TimeDuration::FromMilliseconds(timeout_ms));
}
}
if (!GetWaitTimeout(cx, timeoutv, &timeout))
return false;
if (!rt->fx.canWait())
return ReportCannotWait(cx);
@@ -797,15 +999,7 @@ js::atomics_wait(JSContext* cx, unsigned argc, Value* vp)
SharedArrayRawBuffer* sarb = sab->rawBufferObject();
FutexWaiter w(offset, rt);
if (FutexWaiter* waiters = sarb->waiters()) {
w.lower_pri = waiters;
w.back = waiters->back;
waiters->back->lower_pri = &w;
waiters->back = &w;
} else {
w.lower_pri = w.back = &w;
sarb->setWaiters(&w);
}
AddWaiter(sarb, &w);
FutexRuntime::WaitResult result = FutexRuntime::FutexOK;
bool retval = rt->fx.wait(cx, lock.unique(), timeout, &result);
@@ -820,17 +1014,91 @@ js::atomics_wait(JSContext* cx, unsigned argc, Value* vp)
}
}
if (w.lower_pri == &w) {
sarb->setWaiters(nullptr);
} else {
w.lower_pri->back = w.back;
w.back->lower_pri = w.lower_pri;
if (sarb->waiters() == &w)
sarb->setWaiters(w.lower_pri);
}
RemoveWaiter(sarb, &w);
return retval;
}
bool
js::atomics_waitAsync(JSContext* cx, unsigned argc, Value* vp)
{
CallArgs args = CallArgsFromVp(argc, vp);
HandleValue objv = args.get(0);
HandleValue idxv = args.get(1);
HandleValue valv = args.get(2);
HandleValue timeoutv = args.get(3);
MutableHandleValue r = args.rval();
Rooted<TypedArrayObject*> view(cx, nullptr);
if (!GetSharedTypedArray(cx, objv, &view))
return false;
if (view->type() != Scalar::Int32)
return ReportBadArrayType(cx);
uint32_t offset;
if (!GetTypedArrayIndex(cx, idxv, view, &offset))
return false;
int32_t value;
if (!ToInt32(cx, valv, &value))
return false;
mozilla::Maybe<mozilla::TimeDuration> timeout;
if (!GetWaitTimeout(cx, timeoutv, &timeout))
return false;
Rooted<PromiseObject*> promise(cx, PromiseObject::createSkippingExecutor(cx));
if (!promise)
return false;
RootedValue promiseValue(cx, ObjectValue(*promise));
RootedValue result(cx);
if (!CreateWaitAsyncResult(cx, true, promiseValue, &result))
return false;
Rooted<SharedArrayBufferObject*> sab(cx, view->bufferShared());
SharedArrayRawBuffer* sarb = sab->rawBufferObject();
if (!sarb->addReference()) {
JS_ReportErrorASCII(cx, "Reference count overflow on SharedArrayBuffer");
return false;
}
auto task = cx->make_unique<AtomicsWaitAsyncTask>(cx, promise, sarb, offset, timeout);
if (!task) {
sarb->dropReference();
return false;
}
bool isAsync = false;
RootedValue immediateResult(cx);
{
AutoLockFutexAPI lock;
SharedMem<int32_t*> addr = view->viewDataShared().cast<int32_t*>() + offset;
if (jit::AtomicOperations::loadSafeWhenRacy(addr) != value) {
immediateResult.setString(cx->names().futexNotEqual);
} else if (timeout.isSome() && timeout->ToMilliseconds() == 0.0) {
immediateResult.setString(cx->names().futexTimedOut);
} else {
if (!cx->startAsyncTaskCallback || !cx->finishAsyncTaskCallback ||
!CanUseExtraThreads())
{
JS_ReportErrorASCII(cx, "Atomics.waitAsync not supported in this runtime.");
return false;
}
AddWaiter(sarb, task->waiter());
task->setInWaiterList();
isAsync = true;
}
}
if (!isAsync)
return CreateWaitAsyncResult(cx, false, immediateResult, r);
if (!StartPromiseTask(cx, Move(task)))
return false;
r.set(result);
return true;
}
bool
js::atomics_notify(JSContext* cx, unsigned argc, Value* vp)
{
@@ -870,9 +1138,9 @@ js::atomics_notify(JSContext* cx, unsigned argc, Value* vp)
do {
FutexWaiter* c = iter;
iter = iter->lower_pri;
if (c->offset != offset || !c->rt->fx.isWaiting())
if (c->offset != offset || !c->isWaiting())
continue;
c->rt->fx.notify(FutexRuntime::NotifyExplicit);
c->notify();
++woken;
--count;
} while (count > 0 && iter != waiters);
@@ -1106,6 +1374,7 @@ const JSFunctionSpec AtomicsMethods[] = {
JS_INLINABLE_FN("xor", atomics_xor, 3,0, AtomicsXor),
JS_INLINABLE_FN("isLockFree", atomics_isLockFree, 1,0, AtomicsIsLockFree),
JS_FN("wait", atomics_wait, 4,0),
JS_FN("waitAsync", atomics_waitAsync, 4,0),
JS_FN("notify", atomics_notify, 3,0),
JS_FN("wake", atomics_notify, 3,0), //Legacy name
JS_FS_END
+1
View File
@@ -35,6 +35,7 @@ MOZ_MUST_USE bool atomics_or(JSContext* cx, unsigned argc, Value* vp);
MOZ_MUST_USE bool atomics_xor(JSContext* cx, unsigned argc, Value* vp);
MOZ_MUST_USE bool atomics_isLockFree(JSContext* cx, unsigned argc, Value* vp);
MOZ_MUST_USE bool atomics_wait(JSContext* cx, unsigned argc, Value* vp);
MOZ_MUST_USE bool atomics_waitAsync(JSContext* cx, unsigned argc, Value* vp);
MOZ_MUST_USE bool atomics_notify(JSContext* cx, unsigned argc, Value* vp);
/* asm.js callouts */
+37
View File
@@ -0,0 +1,37 @@
// |reftest| skip-if(!this.SharedArrayBuffer || !this.Atomics || !this.drainJobQueue)
if (typeof SharedArrayBuffer === "function" && typeof Atomics === "object" &&
typeof drainJobQueue === "function") {
const sab = new SharedArrayBuffer(4);
const i32 = new Int32Array(sab);
let result = Atomics.waitAsync(i32, 0, 1, 10);
assertEq(result.async, false);
assertEq(result.value, "not-equal");
result = Atomics.waitAsync(i32, 0, 0, 0);
assertEq(result.async, false);
assertEq(result.value, "timed-out");
result = Atomics.waitAsync(i32, 0, 0);
assertEq(result.async, true);
let notified;
result.value.then(value => {
notified = value;
});
assertEq(Atomics.notify(i32, 0, 1), 1);
drainJobQueue();
assertEq(notified, "ok");
result = Atomics.waitAsync(i32, 0, 0, 1);
assertEq(result.async, true);
let timedOut;
result.value.then(value => {
timedOut = value;
});
drainJobQueue();
assertEq(timedOut, "timed-out");
}
if (typeof reportCompare === "function")
reportCompare(true, true);