Nix 2.26.3
Nix, the purely functional package manager; unstable internal interfaces
 
Loading...
Searching...
No Matches
thread-pool.hh
Go to the documentation of this file.
1#pragma once
3
4#include "error.hh"
5#include "sync.hh"
6
7#include <queue>
8#include <functional>
9#include <thread>
10#include <map>
11#include <atomic>
12
13namespace nix {
14
15MakeError(ThreadPoolShutDown, Error);
16
21class ThreadPool
22{
23public:
24
25 ThreadPool(size_t maxThreads = 0);
26
27 ~ThreadPool();
28
34 typedef std::function<void()> work_t;
35
39 void enqueue(const work_t & t);
40
53 void process();
54
55private:
56
57 size_t maxThreads;
58
59 struct State
60 {
61 std::queue<work_t> pending;
62 size_t active = 0;
63 std::exception_ptr exception;
64 std::vector<std::thread> workers;
65 bool draining = false;
66 };
67
68 std::atomic_bool quit{false};
69
70 Sync<State> state_;
71
72 std::condition_variable work;
73
74 void doWork(bool mainThread);
75
76 void shutdown();
77};
78
84template<typename T>
86 const std::set<T> & nodes,
87 std::function<std::set<T>(const T &)> getEdges,
88 std::function<void(const T &)> processNode)
89{
90 struct Graph {
91 std::set<T> left;
92 std::map<T, std::set<T>> refs, rrefs;
93 };
94
95 Sync<Graph> graph_(Graph{nodes, {}, {}});
96
97 std::function<void(const T &)> worker;
98
99 /* Create pool last to ensure threads are stopped before other destructors
100 * run */
101 ThreadPool pool;
102
103 worker = [&](const T & node) {
104
105 {
106 auto graph(graph_.lock());
107 auto i = graph->refs.find(node);
108 if (i == graph->refs.end())
109 goto getRefs;
110 goto doWork;
111 }
112
113 getRefs:
114 {
115 auto refs = getEdges(node);
116 refs.erase(node);
117
118 {
119 auto graph(graph_.lock());
120 for (auto & ref : refs)
121 if (graph->left.count(ref)) {
122 graph->refs[node].insert(ref);
123 graph->rrefs[ref].insert(node);
124 }
125 if (graph->refs[node].empty())
126 goto doWork;
127 }
128 }
129
130 return;
131
132 doWork:
133 processNode(node);
134
135 /* Enqueue work for all nodes that were waiting on this one
136 and have no unprocessed dependencies. */
137 {
138 auto graph(graph_.lock());
139 for (auto & rref : graph->rrefs[node]) {
140 auto & refs(graph->refs[rref]);
141 auto i = refs.find(node);
142 assert(i != refs.end());
143 refs.erase(i);
144 if (refs.empty())
145 pool.enqueue(std::bind(worker, rref));
146 }
147 graph->left.erase(node);
148 graph->refs.erase(node);
149 graph->rrefs.erase(node);
150 }
151 };
152
153 for (auto & node : nodes) {
154 try {
155 pool.enqueue(std::bind(worker, std::ref(node)));
156 } catch (ThreadPoolShutDown &) {
157 /* Stop if the thread pool is shutting down. It means a
158 previous work item threw an exception, so process()
159 below will rethrow it. */
160 break;
161 }
162 }
163
164 pool.process();
165
166 if (!graph_.lock()->left.empty())
167 throw Error("graph processing incomplete (cyclic reference?)");
168}
169
170}
Definition thread-pool.hh:22
void process()
Definition thread-pool.cc:54
std::function< void()> work_t
Definition thread-pool.hh:34
void enqueue(const work_t &t)
Definition thread-pool.cc:42
Definition ref.hh:15
This file defines two main structs/classes used in nix error handling.
auto i
Definition lexer.l:2745
T t
Definition lexer.l:154
void processGraph(const std::set< T > &nodes, std::function< std::set< T >(const T &)> getEdges, std::function< void(const T &)> processNode)
Definition thread-pool.hh:85