Compare commits

...

1 Commits

Author SHA1 Message Date
Sergei Zimmerman
b1c0f416cb libutil: Make computeClosure async
This makes computeFSClosure much faster in practice, because the graph is now traversed in an async BFS manner, while
previously it serialised on each single path info query. For something like

rm ~/.cache/nix/binary-cache-v7.sqlite && nix path-info --store https://cache.nixos.org --recursive /nix/store/7zz3zmv2a0ssmgqlfhy4rsb6ii6z475a-stdenv-linux.drv

The difference is huge:

(Before)

Command being timed: "nix path-info --store https://cache.nixos.org --recursive /nix/store/7zz3zmv2a0ssmgqlfhy4rsb6ii6z475a-stdenv-linux.drv"
        User time (seconds): 0.07
        System time (seconds): 0.06
        Percent of CPU this job got: 1%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:08.88

(After)

Command being timed: "build/src/nix/nix path-info --store https://cache.nixos.org --recursive /nix/store/7zz3zmv2a0ssmgqlfhy4rsb6ii6z475a-stdenv-linux.drv"
        User time (seconds): 0.07
        System time (seconds): 0.04
        Percent of CPU this job got: 22%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:00.49

Basically 20x speedup with just processing stuff in an async manner.

Note that for now we have a rather ad-hoc event loop just for
computeClosure. This seems perfectly fine for now, but in the future
we could extend it and possibly have a global event loop with multiple
threads handling it.

For now we have basic rate limiting (<= 64 coroutines in flight), which is
smaller than CURLMOPT_MAX_CONCURRENT_STREAMS (100 by default).
2025-12-19 07:28:44 +03:00
5 changed files with 228 additions and 105 deletions

View File

@@ -23,9 +23,9 @@ void Store::computeFSClosure(
bool includeOutputs,
bool includeDerivers)
{
std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
std::function<asio::awaitable<StorePathSet>(const StorePath & path)> queryDeps;
if (flipDirection)
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
StorePathSet res;
StorePathSet referrers;
queryReferrers(path, referrers);
@@ -41,12 +41,14 @@ void Store::computeFSClosure(
for (auto & [_, maybeOutPath] : queryPartialDerivationOutputMap(path))
if (maybeOutPath && isValidPath(*maybeOutPath))
res.insert(*maybeOutPath);
return res;
co_return res;
};
else
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
StorePathSet res;
auto info = fut.get();
auto info = co_await callbackToAwaitable<ref<const ValidPathInfo>>(
[this, path](Callback<ref<const ValidPathInfo>> cb) { queryPathInfo(path, std::move(cb)); });
for (auto & ref : info->references)
if (ref != path)
res.insert(ref);
@@ -58,25 +60,9 @@ void Store::computeFSClosure(
if (includeDerivers && info->deriver && isValidPath(*info->deriver))
res.insert(*info->deriver);
return res;
co_return res;
};
computeClosure<StorePath>(
startPaths,
paths_,
[&](const StorePath & path, std::function<void(std::promise<std::set<StorePath>> &)> processEdges) {
std::promise<std::set<StorePath>> promise;
std::function<void(std::future<ref<const ValidPathInfo>>)> getDependencies =
[&](std::future<ref<const ValidPathInfo>> fut) {
try {
promise.set_value(queryDeps(path, fut));
} catch (...) {
promise.set_exception(std::current_exception());
}
};
queryPathInfo(path, getDependencies);
processEdges(promise);
});
computeClosure<StorePath>(startPaths, paths_, queryDeps);
}
void Store::computeFSClosure(

View File

@@ -20,10 +20,8 @@ TEST(closure, correctClosure)
set<string> aClosure;
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
computeClosure<string>(
{"A"}, aClosure, [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promisedNodes;
promisedNodes.set_value(testGraph[currentNode]);
processEdges(promisedNodes);
{"A"}, aClosure, [&](const std::string & currentNode) -> asio::awaitable<std::set<std::string>> {
co_return testGraph[currentNode];
});
ASSERT_EQ(aClosure, expectedClosure);
@@ -37,31 +35,7 @@ TEST(closure, properlyHandlesDirectExceptions)
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) { throw TestExn(); }),
TestExn);
}
TEST(closure, properlyHandlesExceptionsInPromise)
{
struct TestExn
{};
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promise;
try {
throw TestExn();
} catch (...) {
promise.set_exception(std::current_exception());
}
processEdges(promise);
}),
{"A"}, aClosure, [&](const std::string &) -> asio::awaitable<std::set<std::string>> { throw TestExn(); }),
TestExn);
}

View File

@@ -0,0 +1,68 @@
#pragma once
///@file
#include "nix/util/callback.hh"
#include "nix/util/ref.hh"
#include "nix/util/signals.hh"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <concepts>
namespace nix {
namespace asio = boost::asio;
/**
 * Adapt a `Callback<T>`-based asynchronous operation to an ASIO completion
 * token.
 *
 * `initiate` is called with a `Callback<T>`. The `std::future<T>` that the
 * callback receives is posted, together with the completion handler, to the
 * handler's associated executor, so the callback itself may fire on any
 * thread.
 *
 * If the handler has a connected cancellation slot, a cancellation request
 * completes the operation with a future holding an `Interrupted` exception.
 * The shared `done` flag guarantees that only one of the two paths
 * (cancellation or callback) ever invokes the completion handler.
 *
 * @tparam T Value type carried by the `Callback` / `std::future`.
 * @param initiate Callable that starts the operation when given a `Callback<T>`.
 * @param token ASIO completion token for the signature `void(std::future<T>)`.
 * @return Whatever `asio::async_initiate` yields for `token`.
 */
template<typename T, std::invocable<Callback<T>> F, typename CompletionToken>
auto callbackToAwaitable(F && initiate, CompletionToken && token)
{
    return asio::async_initiate<CompletionToken, void(std::future<T>)>(
        [initiate = std::forward<F>(initiate)](auto handler) mutable {
            auto executor = asio::get_associated_executor(handler);
            auto done = std::make_shared<std::atomic<bool>>(false);
            auto h = std::make_shared<decltype(handler)>(std::move(handler));

            if (auto slot = asio::get_associated_cancellation_slot(*h); slot.is_connected()) {
                std::weak_ptr wh = h; /* To handle the cyclic ownership. */
                std::weak_ptr wdone = done;
                slot.assign([executor, wh, wdone](asio::cancellation_type /*don't care*/) {
                    auto h = wh.lock();
                    auto done = wdone.lock();
                    if (!h || !done || done->exchange(true))
                        return; /* Gracefully die. */
                    /* The promise does not have to outlive this scope: the future
                       obtained from it shares ownership of the result state. */
                    std::promise<T> p;
                    p.set_exception(std::make_exception_ptr(Interrupted("interrupted by user")));
                    asio::post(executor, [h, fut = p.get_future()]() mutable { std::move (*h)(std::move(fut)); });
                });
            }

            initiate(Callback<T>([executor, done, h](std::future<T> fut) mutable {
                if (done->exchange(true))
                    /* Early return for cooperative cancellation. The callback has been
                       called after we were cancelled; the cancellation handler has
                       already posted the completion handler with an error. */
                    return;
                asio::post(executor, [h, fut = std::move(fut)]() mutable { std::move (*h)(std::move(fut)); });
            }));
        },
        /* Passed as an lvalue on purpose: async_initiate takes the token by
           reference and restores its value category from the explicit
           CompletionToken template argument. */
        token);
}
/**
 * Turn a callback-style asynchronous operation into a stackless coroutine.
 *
 * `initiate` receives the `Callback<T>` to complete the operation with. That
 * callback may be invoked on any thread; the completion handler is marshalled
 * back to the coroutine's executor. The coroutine then yields the value held
 * by the resulting future via `std::future::get()`.
 */
template<typename T, std::invocable<Callback<T>> F>
asio::awaitable<T> callbackToAwaitable(F && initiate)
{
    std::future<T> outcome = co_await callbackToAwaitable<T>(std::forward<F>(initiate), asio::use_awaitable);
    co_return outcome.get();
}
} // namespace nix

View File

@@ -1,73 +1,167 @@
#pragma once
///@file
#include <set>
#include <future>
#include "nix/util/sync.hh"
#include "nix/util/async.hh"
#include "nix/util/ref.hh"
using std::set;
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <queue>
#include <set>
namespace nix {
template<typename T>
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
using GetEdgesAsync = std::function<asio::awaitable<std::set<T>>(const T & elt)>;
template<typename T>
void computeClosure(const set<T> startElts, set<T> & res, GetEdgesAsync<T> getEdgesAsync)
template<typename T, typename CompletionToken>
auto computeClosure(std::set<T> startElts, std::set<T> & res, GetEdgesAsync<T> getEdges, CompletionToken && token)
{
struct State
{
size_t pending;
set<T> & res;
std::exception_ptr exc;
};
auto initiator = [&res, startElts = std::move(startElts), getEdges = std::move(getEdges)](auto handler) {
auto executor = asio::make_strand(asio::get_associated_executor(handler));
Sync<State> state_(State{0, res, 0});
std::condition_variable done;
auto enqueue = [&](this auto & enqueue, const T & current) -> void {
/* Hand-rolled dynamic async graph traversal. ASIO/Cobalt and standard
* C++ will get channels and task groups at some point, but this will
* have to suffice for now. */
struct State : std::enable_shared_from_this<State>
{
auto state(state_.lock());
if (state->exc)
return;
if (!state->res.insert(current).second)
return;
state->pending++;
decltype(executor) executor;
decltype(getEdges) getEdges;
decltype(handler) handler;
std::set<T> & res;
/**
* Needed to keep the ctx.run() alive because actual work might be happening on another thread
* (like with FileTransfer case).
*/
asio::executor_work_guard<decltype(State::executor)> workGuard;
std::size_t pending = 0;
/**
* Whether the completion handler has been called.
*/
bool done = false;
/**
* Amount of coroutines currently in flight.
*/
std::size_t inFlight = 0;
/**
* Maximum number of concurrent coroutines. Implements primitive rate limiting.
*/
std::size_t maxConcurrent = 64;
/**
* Nodes to handle next.
*/
std::queue<T> todo;
/**
 * Store the executor, edge query, completion handler and result set, and
 * create a work guard on the executor.
 */
State(
    decltype(executor) executor_,
    decltype(getEdges) getEdges_,
    decltype(handler) handler_,
    std::set<T> & res_)
    : executor(executor_)
    , getEdges(std::move(getEdges_))
    , handler(std::move(handler_))
    , res(res_)
    , workGuard(asio::make_work_guard(executor_))
{
}
void complete(std::exception_ptr ex)
{
if (std::exchange(done, true))
return;
workGuard.reset(); /* We are done and we can release the lock. */
asio::post(executor, [state = this->shared_from_this(), ex] { state->handler(ex); });
}
void enqueue(const std::set<T> & elts)
{
for (const auto & elt : elts)
enqueue(elt);
}
void spawnWorker(const T & elt)
{
++inFlight;
auto state = this->shared_from_this();
asio::post(executor, [state = this->shared_from_this(), elt = std::move(elt)] {
asio::co_spawn(
state->executor,
[state, elt]() -> asio::awaitable<void> {
try {
state->enqueue(co_await state->getEdges(elt));
} catch (...) {
state->complete(std::current_exception());
}
state->onWorkDone();
},
asio::detached);
});
}
void onWorkDone()
{
--inFlight;
--pending;
if (!todo.empty()) {
auto next = std::move(todo.front());
todo.pop();
asio::post(executor, [state = this->shared_from_this(), next = std::move(next)]() mutable {
state->spawnWorker(std::move(next));
});
} else if (pending == 0) {
complete(std::exception_ptr{});
}
}
/**
 * Register `elt` for processing unless the traversal has already completed
 * or the node is already in the result set. The node is started right away
 * while fewer than `maxConcurrent` workers are in flight, and is queued in
 * `todo` otherwise.
 */
void enqueue(const T & elt)
{
    if (done)
        return;

    const bool newlyInserted = res.insert(elt).second;
    if (!newlyInserted)
        return;

    ++pending;

    if (inFlight >= maxConcurrent) {
        todo.push(elt);
        return;
    }
    spawnWorker(elt);
}
};
auto state = make_ref<State>(executor, std::move(getEdges), std::move(handler), res);
if (startElts.empty()) {
/* No work to do. */
state->complete(std::exception_ptr{});
return;
}
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
try {
auto children = prom.get_future().get();
for (auto & child : children)
enqueue(child);
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending)
done.notify_one();
}
} catch (...) {
auto state(state_.lock());
if (!state->exc)
state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending)
done.notify_one();
};
});
asio::post(executor, [state, startElts = std::move(startElts)] { state->enqueue(startElts); });
};
for (auto & startElt : startElts)
enqueue(startElt);
return asio::async_initiate<CompletionToken, void(std::exception_ptr)>(std::move(initiator), token);
}
{
auto state(state_.lock());
while (state->pending)
state.wait(done);
if (state->exc)
std::rethrow_exception(state->exc);
}
template<typename T>
void computeClosure(std::set<T> startElts, std::set<T> & res, GetEdgesAsync<T> getEdges)
{
asio::io_context ctx;
std::exception_ptr ex = nullptr;
computeClosure(
std::move(startElts),
res,
std::move(getEdges),
asio::bind_executor(ctx.get_executor(), [&](std::exception_ptr ex2) { ex = ex2; }));
ctx.run();
if (ex)
std::rethrow_exception(ex);
}
} // namespace nix

View File

@@ -10,6 +10,7 @@ headers = files(
'args.hh',
'args/root.hh',
'array-from-string-literal.hh',
'async.hh',
'base-n.hh',
'base-nix-32.hh',
'callback.hh',