Compare commits
1 Commits
getflake-p
...
async-comp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1c0f416cb |
@@ -23,9 +23,9 @@ void Store::computeFSClosure(
|
||||
bool includeOutputs,
|
||||
bool includeDerivers)
|
||||
{
|
||||
std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
|
||||
std::function<asio::awaitable<StorePathSet>(const StorePath & path)> queryDeps;
|
||||
if (flipDirection)
|
||||
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
|
||||
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
|
||||
StorePathSet res;
|
||||
StorePathSet referrers;
|
||||
queryReferrers(path, referrers);
|
||||
@@ -41,12 +41,14 @@ void Store::computeFSClosure(
|
||||
for (auto & [_, maybeOutPath] : queryPartialDerivationOutputMap(path))
|
||||
if (maybeOutPath && isValidPath(*maybeOutPath))
|
||||
res.insert(*maybeOutPath);
|
||||
return res;
|
||||
co_return res;
|
||||
};
|
||||
else
|
||||
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
|
||||
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
|
||||
StorePathSet res;
|
||||
auto info = fut.get();
|
||||
auto info = co_await callbackToAwaitable<ref<const ValidPathInfo>>(
|
||||
[this, path](Callback<ref<const ValidPathInfo>> cb) { queryPathInfo(path, std::move(cb)); });
|
||||
|
||||
for (auto & ref : info->references)
|
||||
if (ref != path)
|
||||
res.insert(ref);
|
||||
@@ -58,25 +60,9 @@ void Store::computeFSClosure(
|
||||
|
||||
if (includeDerivers && info->deriver && isValidPath(*info->deriver))
|
||||
res.insert(*info->deriver);
|
||||
return res;
|
||||
co_return res;
|
||||
};
|
||||
|
||||
computeClosure<StorePath>(
|
||||
startPaths,
|
||||
paths_,
|
||||
[&](const StorePath & path, std::function<void(std::promise<std::set<StorePath>> &)> processEdges) {
|
||||
std::promise<std::set<StorePath>> promise;
|
||||
std::function<void(std::future<ref<const ValidPathInfo>>)> getDependencies =
|
||||
[&](std::future<ref<const ValidPathInfo>> fut) {
|
||||
try {
|
||||
promise.set_value(queryDeps(path, fut));
|
||||
} catch (...) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
};
|
||||
queryPathInfo(path, getDependencies);
|
||||
processEdges(promise);
|
||||
});
|
||||
computeClosure<StorePath>(startPaths, paths_, queryDeps);
|
||||
}
|
||||
|
||||
void Store::computeFSClosure(
|
||||
|
||||
@@ -20,10 +20,8 @@ TEST(closure, correctClosure)
|
||||
set<string> aClosure;
|
||||
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
|
||||
computeClosure<string>(
|
||||
{"A"}, aClosure, [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
|
||||
promise<set<string>> promisedNodes;
|
||||
promisedNodes.set_value(testGraph[currentNode]);
|
||||
processEdges(promisedNodes);
|
||||
{"A"}, aClosure, [&](const std::string & currentNode) -> asio::awaitable<std::set<std::string>> {
|
||||
co_return testGraph[currentNode];
|
||||
});
|
||||
|
||||
ASSERT_EQ(aClosure, expectedClosure);
|
||||
@@ -37,31 +35,7 @@ TEST(closure, properlyHandlesDirectExceptions)
|
||||
set<string> aClosure;
|
||||
EXPECT_THROW(
|
||||
computeClosure<string>(
|
||||
{"A"},
|
||||
aClosure,
|
||||
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) { throw TestExn(); }),
|
||||
TestExn);
|
||||
}
|
||||
|
||||
TEST(closure, properlyHandlesExceptionsInPromise)
|
||||
{
|
||||
struct TestExn
|
||||
{};
|
||||
|
||||
set<string> aClosure;
|
||||
EXPECT_THROW(
|
||||
computeClosure<string>(
|
||||
{"A"},
|
||||
aClosure,
|
||||
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
|
||||
promise<set<string>> promise;
|
||||
try {
|
||||
throw TestExn();
|
||||
} catch (...) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
processEdges(promise);
|
||||
}),
|
||||
{"A"}, aClosure, [&](const std::string &) -> asio::awaitable<std::set<std::string>> { throw TestExn(); }),
|
||||
TestExn);
|
||||
}
|
||||
|
||||
|
||||
68
src/libutil/include/nix/util/async.hh
Normal file
68
src/libutil/include/nix/util/async.hh
Normal file
@@ -0,0 +1,68 @@
|
||||
#pragma once
|
||||
///@file
|
||||
|
||||
#include "nix/util/callback.hh"
|
||||
#include "nix/util/ref.hh"
|
||||
#include "nix/util/signals.hh"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/awaitable.hpp>
|
||||
#include <boost/asio/use_awaitable.hpp>
|
||||
#include <boost/asio/associated_cancellation_slot.hpp>
|
||||
|
||||
#include <concepts>
|
||||
|
||||
namespace nix {
|
||||
|
||||
namespace asio = boost::asio;
|
||||
|
||||
template<typename T, std::invocable<Callback<T>> F, typename CompletionToken>
|
||||
auto callbackToAwaitable(F && initiate, CompletionToken && token)
|
||||
{
|
||||
return asio::async_initiate<CompletionToken, void(std::future<T>)>(
|
||||
[initiate = std::forward<F>(initiate)](auto handler) mutable {
|
||||
auto executor = asio::get_associated_executor(handler);
|
||||
auto done = std::make_shared<std::atomic<bool>>(false);
|
||||
auto h = std::make_shared<decltype(handler)>(std::move(handler));
|
||||
|
||||
if (auto slot = asio::get_associated_cancellation_slot(*h); slot.is_connected()) {
|
||||
std::weak_ptr wh = h; /* To handle the cyclic ownership. */
|
||||
std::weak_ptr wdone = done;
|
||||
slot.assign([executor, wh, wdone](asio::cancellation_type /*don't care*/) {
|
||||
auto h = wh.lock();
|
||||
auto done = wdone.lock();
|
||||
if (!h || !done || done->exchange(true))
|
||||
return; /* Gracefully die. */
|
||||
/* Doesn't need to be kept alive for get_future() since it shares the ownership. */
|
||||
std::promise<T> p;
|
||||
p.set_exception(std::make_exception_ptr(Interrupted("interrupted by user")));
|
||||
asio::post(executor, [h, fut = p.get_future()]() mutable { std::move (*h)(std::move(fut)); });
|
||||
});
|
||||
}
|
||||
|
||||
initiate(Callback<T>([executor, done, h](std::future<T> fut) mutable {
|
||||
if (done->exchange(true))
|
||||
/* Early return for cooperative cancellation. The callback has been caller
|
||||
later than we've been cancelled. In practice we'll get an error, the handler
|
||||
has already been posted by the cancellation handler. */
|
||||
return;
|
||||
asio::post(executor, [h, fut = std::move(fut)]() mutable { std::move (*h)(std::move(fut)); });
|
||||
}));
|
||||
},
|
||||
std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a completion handler callback into a stackless coroutine. The
|
||||
* callback can be invoked on any thread and the completion handler will be
|
||||
* marshalled to the coroutines executer.
|
||||
*/
|
||||
template<typename T, std::invocable<Callback<T>> F>
|
||||
asio::awaitable<T> callbackToAwaitable(F && initiate)
|
||||
{
|
||||
auto fut = co_await callbackToAwaitable<T>(std::forward<F>(initiate), asio::use_awaitable);
|
||||
co_return fut.get();
|
||||
}
|
||||
|
||||
} // namespace nix
|
||||
@@ -1,73 +1,167 @@
|
||||
#pragma once
|
||||
///@file
|
||||
|
||||
#include <set>
|
||||
#include <future>
|
||||
#include "nix/util/sync.hh"
|
||||
#include "nix/util/async.hh"
|
||||
#include "nix/util/ref.hh"
|
||||
|
||||
using std::set;
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/awaitable.hpp>
|
||||
#include <boost/asio/co_spawn.hpp>
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
|
||||
#include <queue>
|
||||
#include <set>
|
||||
|
||||
namespace nix {
|
||||
|
||||
template<typename T>
|
||||
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
|
||||
using GetEdgesAsync = std::function<asio::awaitable<std::set<T>>(const T & elt)>;
|
||||
|
||||
template<typename T>
|
||||
void computeClosure(const set<T> startElts, set<T> & res, GetEdgesAsync<T> getEdgesAsync)
|
||||
template<typename T, typename CompletionToken>
|
||||
auto computeClosure(std::set<T> startElts, std::set<T> & res, GetEdgesAsync<T> getEdges, CompletionToken && token)
|
||||
{
|
||||
struct State
|
||||
{
|
||||
size_t pending;
|
||||
set<T> & res;
|
||||
std::exception_ptr exc;
|
||||
};
|
||||
auto initiator = [&res, startElts = std::move(startElts), getEdges = std::move(getEdges)](auto handler) {
|
||||
auto executor = asio::make_strand(asio::get_associated_executor(handler));
|
||||
|
||||
Sync<State> state_(State{0, res, 0});
|
||||
|
||||
std::condition_variable done;
|
||||
|
||||
auto enqueue = [&](this auto & enqueue, const T & current) -> void {
|
||||
/* Hand-rolled dynamic async graph traversal. ASIO/Cobalt and standard
|
||||
* C++ will get channels and task groups at some point, but this will
|
||||
* have to suffice for now. */
|
||||
struct State : std::enable_shared_from_this<State>
|
||||
{
|
||||
auto state(state_.lock());
|
||||
if (state->exc)
|
||||
return;
|
||||
if (!state->res.insert(current).second)
|
||||
return;
|
||||
state->pending++;
|
||||
decltype(executor) executor;
|
||||
decltype(getEdges) getEdges;
|
||||
decltype(handler) handler;
|
||||
std::set<T> & res;
|
||||
/**
|
||||
* Needed to keep the ctx.run() alive because actual work might be happening on another thread
|
||||
* (like with FileTransfer case).
|
||||
*/
|
||||
asio::executor_work_guard<decltype(State::executor)> workGuard;
|
||||
std::size_t pending = 0;
|
||||
/**
|
||||
* Whether the completion handler has been called.
|
||||
*/
|
||||
bool done = false;
|
||||
/**
|
||||
* Amount of coroutines currently in flight.
|
||||
*/
|
||||
std::size_t inFlight = 0;
|
||||
/**
|
||||
* Maximum number of concurrent coroutines. Implements primitive rate limiting.
|
||||
*/
|
||||
std::size_t maxConcurrent = 64;
|
||||
/**
|
||||
* Nodes to handle next.
|
||||
*/
|
||||
std::queue<T> todo;
|
||||
|
||||
State(
|
||||
decltype(executor) executor_, decltype(getEdges) getEdges, decltype(handler) handler, std::set<T> & res)
|
||||
: executor(executor_)
|
||||
, getEdges(std::move(getEdges))
|
||||
, handler(std::move(handler))
|
||||
, res(res)
|
||||
, workGuard(asio::make_work_guard(executor_))
|
||||
{
|
||||
}
|
||||
|
||||
void complete(std::exception_ptr ex)
|
||||
{
|
||||
if (std::exchange(done, true))
|
||||
return;
|
||||
workGuard.reset(); /* We are done and we can release the lock. */
|
||||
asio::post(executor, [state = this->shared_from_this(), ex] { state->handler(ex); });
|
||||
}
|
||||
|
||||
void enqueue(const std::set<T> & elts)
|
||||
{
|
||||
for (const auto & elt : elts)
|
||||
enqueue(elt);
|
||||
}
|
||||
|
||||
void spawnWorker(const T & elt)
|
||||
{
|
||||
++inFlight;
|
||||
auto state = this->shared_from_this();
|
||||
asio::post(executor, [state = this->shared_from_this(), elt = std::move(elt)] {
|
||||
asio::co_spawn(
|
||||
state->executor,
|
||||
[state, elt]() -> asio::awaitable<void> {
|
||||
try {
|
||||
state->enqueue(co_await state->getEdges(elt));
|
||||
} catch (...) {
|
||||
state->complete(std::current_exception());
|
||||
}
|
||||
state->onWorkDone();
|
||||
},
|
||||
asio::detached);
|
||||
});
|
||||
}
|
||||
|
||||
void onWorkDone()
|
||||
{
|
||||
--inFlight;
|
||||
--pending;
|
||||
|
||||
if (!todo.empty()) {
|
||||
auto next = std::move(todo.front());
|
||||
todo.pop();
|
||||
asio::post(executor, [state = this->shared_from_this(), next = std::move(next)]() mutable {
|
||||
state->spawnWorker(std::move(next));
|
||||
});
|
||||
} else if (pending == 0) {
|
||||
complete(std::exception_ptr{});
|
||||
}
|
||||
}
|
||||
|
||||
void enqueue(const T & elt)
|
||||
{
|
||||
if (done)
|
||||
return;
|
||||
|
||||
if (!res.insert(elt).second)
|
||||
return;
|
||||
|
||||
++pending;
|
||||
|
||||
if (inFlight < maxConcurrent) {
|
||||
spawnWorker(elt);
|
||||
} else {
|
||||
todo.push(elt);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto state = make_ref<State>(executor, std::move(getEdges), std::move(handler), res);
|
||||
if (startElts.empty()) {
|
||||
/* No work to do. */
|
||||
state->complete(std::exception_ptr{});
|
||||
return;
|
||||
}
|
||||
|
||||
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
|
||||
try {
|
||||
auto children = prom.get_future().get();
|
||||
for (auto & child : children)
|
||||
enqueue(child);
|
||||
{
|
||||
auto state(state_.lock());
|
||||
assert(state->pending);
|
||||
if (!--state->pending)
|
||||
done.notify_one();
|
||||
}
|
||||
} catch (...) {
|
||||
auto state(state_.lock());
|
||||
if (!state->exc)
|
||||
state->exc = std::current_exception();
|
||||
assert(state->pending);
|
||||
if (!--state->pending)
|
||||
done.notify_one();
|
||||
};
|
||||
});
|
||||
asio::post(executor, [state, startElts = std::move(startElts)] { state->enqueue(startElts); });
|
||||
};
|
||||
|
||||
for (auto & startElt : startElts)
|
||||
enqueue(startElt);
|
||||
return asio::async_initiate<CompletionToken, void(std::exception_ptr)>(std::move(initiator), token);
|
||||
}
|
||||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
while (state->pending)
|
||||
state.wait(done);
|
||||
if (state->exc)
|
||||
std::rethrow_exception(state->exc);
|
||||
}
|
||||
template<typename T>
|
||||
void computeClosure(std::set<T> startElts, std::set<T> & res, GetEdgesAsync<T> getEdges)
|
||||
{
|
||||
asio::io_context ctx;
|
||||
std::exception_ptr ex = nullptr;
|
||||
computeClosure(
|
||||
std::move(startElts),
|
||||
res,
|
||||
std::move(getEdges),
|
||||
asio::bind_executor(ctx.get_executor(), [&](std::exception_ptr ex2) { ex = ex2; }));
|
||||
ctx.run();
|
||||
if (ex)
|
||||
std::rethrow_exception(ex);
|
||||
}
|
||||
|
||||
} // namespace nix
|
||||
|
||||
@@ -10,6 +10,7 @@ headers = files(
|
||||
'args.hh',
|
||||
'args/root.hh',
|
||||
'array-from-string-literal.hh',
|
||||
'async.hh',
|
||||
'base-n.hh',
|
||||
'base-nix-32.hh',
|
||||
'callback.hh',
|
||||
|
||||
Reference in New Issue
Block a user