21 #include <ModuleUtils.h> 24 #include <boost/graph/graphviz.hpp> 25 #include <boost/graph/topological_sort.hpp> 37 typedef boost::graph_traits<Graph>::out_edge_iterator out_edge_iterator_t;
38 typedef boost::graph_traits<Graph>::in_edge_iterator in_edge_iterator_t;
41 using std::runtime_error::runtime_error;
45 using std::runtime_error::runtime_error;
// Searches for `to` by following the out-edges of `vertex` and recursing into
// each edge target.
// NOTE(review): the statements between the visible lines (presumably the direct
// `target == to` test and the return statements) are not visible in this listing.
55 bool isConnectedToByOut(Graph& g, vertex_t vertex, vertex_t to) {
56 out_edge_iterator_t o, o_end;
57 std::tie(o, o_end) = boost::out_edges(vertex, g);
59 for (; o != o_end; ++o) {
60 vertex_t target = boost::target(*o, g);
// Recursive step: continue the search from the vertex this edge points to.
64 if (isConnectedToByOut(g, target, to))
// Searches for `to` by following the in-edges of `vertex` and recursing into
// each edge source (the mirror of the out-edge search).
// NOTE(review): the direct comparison and the return statements are not visible
// in this listing.
78 bool isConnectedToByIn(Graph& g, vertex_t vertex, vertex_t to) {
79 in_edge_iterator_t i, i_end;
80 std::tie(i, i_end) = boost::in_edges(vertex, g);
82 for (; i != i_end; ++i) {
83 vertex_t source = boost::source(*i, g);
// Recursive step: continue the search from the vertex this edge comes from.
87 if (isConnectedToByIn(g, source, to))
// Checks whether `vertex` and `to` are linked in either direction: the out-edge
// search is tried first, and the in-edge search gives the result otherwise.
// NOTE(review): the statement guarded by the first test is not visible here
// (presumably an early `return true`).
101 bool isConnectedTo(Graph& g, vertex_t vertex, vertex_t to) {
102 if (isConnectedToByOut(g, vertex, to))
105 return isConnectedToByIn(g, vertex, to);
// Inspects only the out-edges of `from` (no recursion is visible), reading the
// target of each edge.
// NOTE(review): the comparison against `to` and the return statements are not
// visible in this listing.
115 bool isConnectedDirectlyTo(Graph& g, vertex_t from, vertex_t to) {
116 out_edge_iterator_t o, o_end;
117 std::tie(o, o_end) = boost::out_edges(from, g);
119 for (; o != o_end; ++o) {
120 vertex_t target = boost::target(*o, g);
// Looks for an element of `looper_path.elements` equal to `module_name`.
// NOTE(review): the enclosing function header and the definition of
// `looper_path` are not visible in this listing.
141 auto it = std::find_if(looper_path.elements.begin(), looper_path.elements.end(),
142 [&module_name](
const std::string& m) {
143 return m == module_name;
// True when a matching element was found.
146 return it != looper_path.elements.end();
// Finds the entry of `available_modules` whose `name` equals `module_type`.
// A failed lookup is only guarded by the assert below.
// NOTE(review): the return statement is not visible in this listing
// (presumably the found entry) - confirm.
149 momemta::ModuleList::value_type get_module_def(
const std::string& module_type,
150 const momemta::ModuleList& available_modules) {
152 auto it = std::find_if(available_modules.begin(), available_modules.end(),
153 [&module_type](
const momemta::ModuleList::value_type& m) {
154 return m.name == module_type;
157 assert(it != available_modules.end());
// Appends `decl` to the declaration list kept for `path` in `module_decls`.
// NOTE(review): the function header is not visible in this listing.
164 auto& storage = module_decls;
167 auto map_it = storage.find(path);
168 if (map_it == storage.end()) {
// Path seen for the first time: also record it in `sorted_execution_paths`.
170 sorted_execution_paths.emplace_back(path);
173 storage[path].emplace_back(decl);
// NOTE(review): presumably the branch for an already known path; the `else`
// line is not visible here.
175 map_it->second.emplace_back(decl);
// Returns a const reference to `sorted_execution_paths`, the list of execution
// path ids filled when declarations are added.
179 const std::vector<uuid>& ComputationGraph::getPaths()
const {
180 return sorted_execution_paths;
// Returns the module declarations stored for `path`.
// The lookup uses `at()`, so an unknown path is not inserted; presumably it is
// reported through an exception (std::out_of_range for standard maps) - the
// type of `module_decls` is not visible here.
183 const std::vector<Configuration::ModuleDecl>& ComputationGraph::getDecls(
const uuid& path)
const {
184 return module_decls.at(path);
// Creates the module instances of every execution path through ModuleFactory
// and keeps the instances of DEFAULT_EXECUTION_PATH in `modules`.
187 void ComputationGraph::initialize(PoolPtr pool) {
188 const auto& execution_paths = sorted_execution_paths;
// Instances created so far, grouped by execution path id.
191 std::map<uuid, std::vector<ModulePtr>> module_instances;
// Paths are walked in reverse order. A Looper reads the instances of its own
// path from `module_instances` (see the `at()` call below), so those must
// already exist when the Looper is created.
199 for (
auto it = execution_paths.rbegin(); it != execution_paths.rend(); ++it) {
201 const auto& modules = getDecls(*it);
202 for (
auto module_decl_it = modules.begin(); module_decl_it != modules.end(); ++module_decl_it) {
// Work on a clone of the declared parameters: the `path` entry set below goes
// into this copy.
204 std::unique_ptr<ParameterSet> params(module_decl_it->parameters->clone());
206 if (module_decl_it->type ==
"Looper") {
// NOTE(review): the definition of `config_path_id` is not visible in this
// listing; presumably it is the id of the path configured for this Looper.
212 params->raw_set(
"path",
Path(module_instances.at(config_path_id)));
216 module_instances[*it].push_back(ModuleFactory::get().create(module_decl_it->type, pool, *params));
// Error path: log which module failed, then rethrow the current exception.
// NOTE(review): the surrounding try/catch lines are not visible here.
218 LOG(fatal) <<
"Exception while trying to create module " << module_decl_it->type
219 <<
"::" << module_decl_it->name
220 <<
". See message above for a (possible) more detailed description of the error.";
221 std::rethrow_exception(std::current_exception());
// Only the modules of the default path are kept as members of the graph.
226 modules = module_instances[DEFAULT_EXECUTION_PATH];
// Iterates over every module in `modules`.
// NOTE(review): the per-module statement is not visible in this listing
// (presumably a call to the module configure hook) - confirm.
229 void ComputationGraph::configure() {
230 for (
auto& module: modules)
// Iterates over every module in `modules`.
// NOTE(review): the per-module statement is not visible in this listing
// (presumably a call to the module finish hook) - confirm.
234 void ComputationGraph::finish() {
235 for (
auto& module: modules)
// Forwards the begin-of-integration notification: calls beginIntegration() on
// every module in `modules`, in container order.
239 void ComputationGraph::beginIntegration() {
240 for (
auto& module: modules)
241 module->beginIntegration();
// Forwards the end-of-integration notification: calls endIntegration() on
// every module in `modules`, in container order.
244 void ComputationGraph::endIntegration() {
245 for (
auto& module: modules)
246 module->endIntegration();
// Runs the modules once: beginPoint() is called on every module, then work()
// is called on each module in turn while its elapsed time is accumulated.
// Returns NEXT or ABORT as soon as a module reports that status, OK otherwise.
249 Module::Status ComputationGraph::execute() {
250 for (
auto& module: modules)
251 module->beginPoint();
253 for (
auto& module: modules) {
// Time each work() call; totals are kept per module in `module_timings`.
255 auto start = high_resolution_clock::now();
257 auto status = module->work();
259 module_timings[module.get()] += high_resolution_clock::now() - start;
// A NEXT or ABORT status stops the loop immediately and is handed back to the
// caller; the remaining modules are not run.
262 if (status == Module::Status::NEXT) {
264 return Module::Status::NEXT;
265 }
else if (status == Module::Status::ABORT) {
267 return Module::Status::ABORT;
// NOTE(review): the body of this final loop is not visible in this listing
// (presumably the end-of-point hook matching beginPoint above) - confirm.
271 for (
auto& module: modules)
274 return Module::Status::OK;
// Logs, at info level, one line per entry of `module_timings`: the module name
// followed by its accumulated time converted to seconds (as a double).
278 void ComputationGraph::logTimings()
const {
279 LOG(info) <<
"Time spent evaluating modules (more details for loopers below):";
280 for (
auto it: module_timings) {
281 LOG(info) <<
" " << it.first->name() <<
": " << duration_cast<duration<double>>(it.second).count() <<
"s";
// Setter taking the number of dimensions `n`.
// NOTE(review): the body is not visible in this listing; presumably it stores
// `n` for later retrieval by getNDimensions() - confirm.
286 void ComputationGraph::setNDimensions(
size_t n) {
// Const getter returning a size_t number of dimensions.
// NOTE(review): the body is not visible in this listing; presumably it returns
// the value given to setNDimensions() - confirm.
290 size_t ComputationGraph::getNDimensions()
const {
// Initializes the `available_modules` and `configuration` members from the
// constructor arguments; the constructor body is empty.
// NOTE(review): the line declaring the `configuration` parameter is not visible
// in this listing.
294 ComputationGraphBuilder::ComputationGraphBuilder(
const momemta::ModuleList& available_modules,
296 available_modules(available_modules), configuration(configuration) { }
// Builds the computation graph from the configuration.
// NOTE(review): the function header and a number of interior lines (variable
// declarations, `continue` statements, closing braces, calls made between the
// visible steps) are not visible in this listing; the step comments below
// describe only the statements that are shown.
//
// Step 1: create one graph vertex per module requested by the configuration
// and remember it by module name in `vertices`.
301 const auto& requested_modules = configuration.
getModules();
304 for (
const auto& module: requested_modules) {
305 vertex_t v = boost::add_vertex(g);
309 vertex.name = module.name;
310 vertex.type = module.type;
// Attach the module definition found in the list of available modules.
313 vertex.def = get_module_def(module.type, available_modules);
316 vertex.decl = module;
318 vertices.emplace(module.name, v);
// Flag tested before exporting the graph.
321 graph_exportable =
true;
// Step 2: connect the modules. For each output of each vertex, the inputs of
// the other vertices are scanned; an edge is added from the producer to the
// consumer whenever an input tag names that module and that output.
324 typename boost::graph_traits<Graph>::vertex_iterator vtx_it, vtx_it_end;
325 for (std::tie(vtx_it, vtx_it_end) = boost::vertices(g); vtx_it != vtx_it_end; vtx_it++) {
327 const auto& vertex = g[*vtx_it];
330 for (
const auto& output: vertex.def.outputs) {
333 typename boost::graph_traits<Graph>::vertex_iterator test_vtx_it, test_vtx_it_end;
334 for (std::tie(test_vtx_it, test_vtx_it_end) = boost::vertices(g); test_vtx_it != test_vtx_it_end;
337 const auto& test_module = g[*test_vtx_it];
// Same module on both sides; presumably skipped (the guarded statement is not
// visible here).
340 if (test_module.name == vertex.name)
344 const auto& test_module_def = test_module.def;
345 for (
const auto& input: test_module_def.inputs) {
348 momemta::getInputTagsForInput(input, *test_module.decl.parameters);
354 for (
const auto& inputTag: *inputTags) {
355 if (vertex.name == inputTag.module && output.name == inputTag.parameter) {
361 std::tie(e, inserted) = boost::add_edge(*vtx_it, vertices.at(test_module.name), g);
// The edge description is the parameter name, plus `[index]` for indexed tags.
366 edge.description = inputTag.parameter;
367 if (inputTag.isIndexed()) {
368 edge.description +=
"[" + std::to_string(inputTag.index) +
"]";
// Step 3: handle Looper modules. Only vertices of type Looper are processed
// below (the statement guarded by the type test is not visible here).
379 for (
const auto& vertex: vertices) {
380 if (g[vertex.second].type !=
"Looper")
383 const auto& looper_vtx = vertex.second;
384 const auto& looper_decl = g[looper_vtx].decl;
// Every module listed in the Looper execution path gets an edge from the
// Looper, unless a direct edge already exists. Unknown module names only
// produce a warning.
// NOTE(review): the definition of `looper_path` is not visible in this listing.
390 for (
const auto& m: looper_path.elements) {
392 auto module_it = vertices.find(m);
393 if (module_it == vertices.end()) {
394 LOG(warning) <<
"Module '" << m <<
"' present in Looper '" << looper_decl.
name 395 <<
"' execution path does not exists";
399 auto module_vertex = module_it->second;
400 if (!isConnectedDirectlyTo(g, looper_vtx, module_vertex)) {
403 std::tie(e, inserted) = boost::add_edge(looper_vtx, module_vertex, g);
404 g[e].description =
"virtual link (module in path)";
// For each module the Looper points to, look at where that module gets its
// other inputs from: a source that is not yet connected to the Looper (in
// either direction) gets an edge towards the Looper.
409 out_edge_iterator_t e, e_end;
410 std::tie(e, e_end) = boost::out_edges(looper_vtx, g);
413 for (; e != e_end; ++e) {
414 auto target = boost::target(*e, g);
417 in_edge_iterator_t i, i_end;
418 std::tie(i, i_end) = boost::in_edges(target, g);
420 for (; i != i_end; ++i) {
421 auto source = boost::source(*i, g);
// The Looper itself is not a candidate source (guarded statement not visible).
423 if (source == looper_vtx)
427 if (!isConnectedTo(g, source, looper_vtx)) {
430 std::tie(e, inserted) = boost::add_edge(source, looper_vtx, g);
431 g[e].description =
"virtual link";
// Step 4: collect the distinct `ps_points` indices carried by the out-edges of
// the `cuba` vertex; their count is used as the number of dimensions.
451 out_edge_iterator_t o, o_end;
452 std::tie(o, o_end) = boost::out_edges(vertices.at(
"cuba"), g);
454 std::set<size_t> unique_cuba_indices;
455 for (; o != o_end; ++o) {
456 const auto& inputTag = g[*o].tag;
// Edges for other cuba outputs are not counted (guarded statement not visible).
459 if (inputTag.parameter !=
"ps_points")
462 assert(inputTag.isIndexed());
463 unique_cuba_indices.emplace(inputTag.index);
466 size_t n_dimensions = unique_cuba_indices.size();
// Step 5: remap the `cuba::ps_points` indices used by the modules fed by cuba.
// Each distinct original index is mapped once through `new_indices_mapping` and
// the rewritten tags are stored back into the module parameters.
// NOTE(review): the statement advancing `current_index` and the use of
// `update_decl` are not visible in this listing.
473 std::unordered_map<size_t, size_t> new_indices_mapping;
474 size_t current_index = 0;
475 std::tie(o, o_end) = boost::out_edges(vertices.at(
"cuba"), g);
476 for (; o != o_end; ++o) {
477 const auto& module_vertex = g[boost::target(*o, g)];
479 for (
const auto& input: module_vertex.def.inputs) {
483 momemta::getInputTagsForInput(input, *module_vertex.decl.parameters);
489 bool update_decl =
false;
490 std::vector<InputTag> updatedInputTags;
491 for (
const auto& inputTag: *inputTags) {
// Start from a copy so that tags unrelated to cuba are passed through as is.
493 auto updatedInputTag = inputTag;
496 if (inputTag.module ==
"cuba" && inputTag.parameter ==
"ps_points") {
499 auto it = new_indices_mapping.find(inputTag.index);
500 if (it == new_indices_mapping.end()) {
501 new_indices_mapping.emplace(inputTag.index, current_index);
502 updatedInputTag.index = current_index;
505 updatedInputTag.index = it->second;
507 updatedInputTag.update();
510 updatedInputTags.push_back(updatedInputTag);
515 momemta::setInputTagsForInput(input, *module_vertex.decl.parameters, updatedInputTags);
// Step 6: fill the ComputationGraph. Each vertex of `sorted_vertices` is
// registered under the first execution path listing its name, or under
// DEFAULT_EXECUTION_PATH when no path lists it.
// NOTE(review): the creation of `computationGraph` is not visible in this listing.
522 const auto& execution_paths = configuration.
getPaths();
525 computationGraph->setNDimensions(n_dimensions);
527 for (
auto vertex: sorted_vertices) {
// Predicate: does execution path `p` contain the name of the current vertex?
530 auto find_module_in_path = [&vertex,
this](std::shared_ptr<ExecutionPath> p) ->
bool {
532 auto it = std::find_if(p->elements.begin(), p->elements.end(),
533 [&vertex,
this](
const std::string& element) ->
bool {
534 return g[vertex].name == element;
538 return it != p->elements.end();
541 auto execution_path_it = std::find_if(execution_paths.begin(), execution_paths.end(), find_module_in_path);
542 auto execution_path = execution_path_it == execution_paths.end() ? DEFAULT_EXECUTION_PATH :
543 (*execution_path_it)->id;
// The very first declaration added must belong to the default execution path.
546 assert(!computationGraph->getPaths().empty() || execution_path == DEFAULT_EXECUTION_PATH);
548 computationGraph->addDecl(execution_path, g[vertex].decl);
551 return computationGraph;
// Removes from the graph the modules whose output is used by no other module
// (out-degree 0), then assigns ids to the remaining vertices.
554 void ComputationGraphBuilder::prune_graph() {
// The iterator is advanced inside the loop body because entries can be erased
// while iterating.
557 for (
auto it = vertices.begin(), ite = vertices.end(); it != ite;) {
558 if (boost::out_degree(it->second, g) == 0) {
560 auto& vertex = g[it->second];
// Modules flagged internal or sticky get special treatment.
// NOTE(review): the guarded statements are not visible in this listing;
// presumably such modules are kept - confirm.
563 if (vertex.def.internal || vertex.def.sticky) {
569 LOG(info) <<
"Module '" << it->first <<
"' output is not used by any other module. Removing it from the configuration.";
// Drop all edges of the vertex first, then the vertex, then its map entry.
570 boost::clear_vertex(it->second, g);
571 boost::remove_vertex(it->second, g);
573 it = vertices.erase(it);
// Give each remaining vertex an increasing id.
// NOTE(review): the declaration and initial value of `id` are not visible here.
580 typename boost::graph_traits<Graph>::vertex_iterator vtx_it, vtx_it_end;
581 for (std::tie(vtx_it, vtx_it_end) = boost::vertices(g); vtx_it != vtx_it_end; vtx_it++) {
582 g[*vtx_it].id =
id++;
// Topologically sorts the graph into `sorted_vertices`, then removes the
// modules flagged internal from the sorted list.
586 void ComputationGraphBuilder::sort_graph() {
587 const std::vector<std::shared_ptr<ExecutionPath>>& execution_paths = configuration.
getPaths();
// Results are inserted at the front of `sorted_vertices`; `Vertex::id` is used
// as the vertex index map for the algorithm.
590 boost::topological_sort(g, std::front_inserter(sorted_vertices),
591 boost::vertex_index_map(boost::get(&Vertex::id, g)));
// Error path of the sort.
// NOTE(review): the try/catch lines and the code writing graph.debug are not
// visible in this listing.
594 LOG(fatal) <<
"Exception while sorting the graph. Graphviz representation saved as graph.debug";
// Erase-remove: internal modules are dropped from the sorted list.
599 sorted_vertices.erase(std::remove_if(sorted_vertices.begin(), sorted_vertices.end(),
600 [
this](
const vertex_t& vertex) ->
bool {
601 return g[vertex].def.internal;
602 }), sorted_vertices.end());
// Validates the graph in two passes:
//  1. every input tag of every non-internal module must name an existing
//     module and one of that module outputs, otherwise `unresolved_input` is
//     thrown;
//  2. modules receiving an edge from a Looper are checked against that Looper
//     execution path, and a fatal message is logged for an offender.
// NOTE(review): several interior lines (continue statements, else keywords,
// closing braces, anything following the final log) are not visible here.
605 void ComputationGraphBuilder::validate() {
// Helper: log the problem, then throw unresolved_input with the same text.
607 auto log_and_throw_unresolved_input = [](
const std::string& module_name,
const InputTag& input) {
608 LOG(fatal) <<
"Module '" << module_name <<
"' requested a non-existing input (" << input.toString() <<
")";
609 throw unresolved_input(
"Module '" + module_name +
"' requested a non-existing input (" + input.toString() +
")");
// Pass 1: resolve every input tag.
613 for (
auto it = vertices.begin(), ite = vertices.end(); it != ite; it++) {
615 const auto& vertex = g[it->second];
// Internal modules are tested here (the guarded statement is not visible;
// presumably they are skipped).
617 if (vertex.def.internal)
620 const auto& inputs = vertex.def.inputs;
622 for (
const auto& input_def: inputs) {
626 momemta::getInputTagsForInput(input_def, *vertex.decl.parameters);
632 for (
const auto& inputTag: *inputTags) {
633 auto target_it = vertices.find(inputTag.module);
// The referenced module does not exist.
635 if (target_it == vertices.end()) {
637 log_and_throw_unresolved_input(it->first, inputTag);
// The module exists: check that it declares an output with the requested name.
642 const auto& target_module_outputs = g[target_it->second].def.outputs;
643 auto output_it = std::find_if(target_module_outputs.begin(),
644 target_module_outputs.end(),
645 [&inputTag](
const ArgDef& output) {
646 return inputTag.parameter == output.name;
649 if (output_it == target_module_outputs.end()) {
651 log_and_throw_unresolved_input(it->first, inputTag);
// Pass 2: map each module fed by a Looper to the Loopers whose execution path
// does not contain it.
658 std::map<vertex_t, std::vector<vertex_t>> modules_not_in_path;
659 for (
const auto& vertex: sorted_vertices) {
660 if (g[vertex].type ==
"Looper") {
662 const auto& decl = g[vertex].decl;
664 out_edge_iterator_t e, e_end;
665 std::tie(e, e_end) = boost::out_edges(vertex, g);
668 for (; e != e_end; ++e) {
669 auto target = boost::target(*e, g);
// Record the Looper once per target when the target is not in its path.
672 if (! checkInPath(decl, g[target].name)) {
673 auto& loopers = modules_not_in_path[target];
674 auto it = std::find(loopers.begin(), loopers.end(), vertex);
675 if (it == loopers.end())
676 loopers.push_back(vertex);
// NOTE(review): presumably the else-branch (target found in the path), which
// clears any earlier record for that target; the `else` line is not visible.
678 modules_not_in_path.erase(target);
// Only the first offending module is reported.
684 if (modules_not_in_path.size() != 0) {
686 auto it = modules_not_in_path.begin();
688 auto target = g[it->first];
// Build the quoted list of Looper names for the message.
// NOTE(review): the separator written between names is not visible here.
690 std::stringstream loopers;
691 for (
size_t i = 0; i < it->second.size(); i++) {
692 loopers <<
"'" << g[it->second[i]].name <<
"'";
693 if (i != it->second.size() - 1)
// Wording helpers so the message reads correctly for one or several Loopers.
697 std::string plural = it->second.size() > 1?
"s" :
"";
698 std::string one_of_the = it->second.size() > 1 ?
"one of the" :
"the";
700 LOG(fatal) <<
"Module '" << target.name <<
"' is configured to use Looper " << loopers.str()
701 <<
" output" << plural <<
", but is not actually part of the Looper" << plural <<
" execution path. This will lead to undefined " 702 <<
"behavior. You can fix the issue by adding the module '" 704 <<
"' to " << one_of_the <<
" Looper" << plural <<
" execution path";
// Constructor tail: initializes the `graph` and `paths` members from the
// arguments `g` and `paths`; the body is empty.
// NOTE(review): the first line of the constructor is not visible in this listing.
718 const std::vector<std::shared_ptr<ExecutionPath>>& paths):
719 graph(g), paths(paths) {}
// Writes the Graphviz attribute list of vertex `v` to `out`: shape, color,
// style and a label holding the module name, plus optional extra attributes.
// NOTE(review): the statements adjusting the defaults for internal modules and
// Loopers, and the end of the attribute list, are not visible in this listing.
723 void writeVertex(std::ostream& out,
const vertex_t& v)
const {
// Default look of a vertex.
724 std::string shape =
"ellipse";
725 std::string color =
"black";
726 std::string style =
"solid";
727 std::string extra =
"";
729 if (graph[v].def.internal) {
735 if (graph[v].type ==
"Looper") {
740 out <<
"[shape=\"" << shape <<
"\",color=\"" << color <<
"\",style=\"" << style
741 <<
"\",label=\"" << graph[v].name <<
"\"";
743 if (!extra.empty()) {
// Writes the Graphviz attribute list of edge `e` to `out`: color, style and a
// label holding the edge description, plus optional extra attributes.
// NOTE(review): the condition under which `extra` is set, and the end of the
// attribute list, are not visible in this listing.
751 void writeEdge(std::ostream& out,
const edge_t& e)
const {
// Default look of an edge.
752 std::string color =
"black";
753 std::string style =
"solid";
754 std::string extra =
"";
758 extra =
"constraint=false";
761 out <<
"[color=\"" << color <<
"\",style=\"" << style
762 <<
"\",label=\"" << graph[e].description <<
"\"";
764 if (!extra.empty()) {
// Writes the graph-level Graphviz statements: one `subgraph cluster_<index>`
// block per execution path. Each cluster is filled with a color taken from
// `colors` (also recorded in `path_colors` under the path id), is labelled with
// the name of the Looper owning the path when one is found, and lists the ids
// of the path elements that exist as vertices.
// NOTE(review): the declaration of `index`, the statements advancing `index`
// and `color_index`, and the statement guarded by the final bounds test on
// `color_index` are not visible in this listing.
772 void writeGraph(std::ostream& out)
const {
775 size_t color_index = 0;
776 for (
const auto& path: paths) {
778 out <<
"subgraph cluster_" << index <<
" {" << std::endl;
780 out << R
"(style=filled; fillcolor=")" << colors[color_index] << R"(";)" << std::endl; 781 path_colors.emplace(path->id, colors[color_index]); 783 auto looper_vertex = find_looper(path->id);
785 if (looper_vertex != boost::graph_traits<Graph>::null_vertex())
786 out <<
" label=\"" << graph[looper_vertex].name <<
" execution path\";" << std::endl;
789 for (
const auto& e: path->elements) {
790 auto vertex = find_vertex(e);
791 if (vertex != boost::graph_traits<Graph>::null_vertex())
792 out << graph[vertex].id <<
"; ";
796 out <<
"}" << std::endl;
800 if (color_index >= colors.size())
// Scans all vertices of `graph` for one whose name equals `name`.
// Returns null_vertex() when the scan finds nothing.
// NOTE(review): the statement executed on a match is not visible in this
// listing (presumably it returns the matching vertex).
806 vertex_t find_vertex(
const std::string& name)
const {
807 typename boost::graph_traits<Graph>::vertex_iterator vtx_it, vtx_it_end;
809 for(boost::tie(vtx_it, vtx_it_end) = boost::vertices(graph); vtx_it != vtx_it_end; ++vtx_it) {
810 if (graph[*vtx_it].name == name)
814 return boost::graph_traits<Graph>::null_vertex();
// Scans all vertices of `graph` for a Looper whose `path` parameter has the id
// `path`. Returns null_vertex() when the scan finds nothing.
// NOTE(review): the statement executed on a match is not visible in this
// listing (presumably it returns the matching vertex).
817 vertex_t find_looper(
const uuid& path)
const {
818 typename boost::graph_traits<Graph>::vertex_iterator vtx_it, vtx_it_end;
820 for(boost::tie(vtx_it, vtx_it_end) = boost::vertices(graph); vtx_it != vtx_it_end; ++vtx_it) {
821 if (graph[*vtx_it].type ==
"Looper") {
// The execution path of a Looper is read from its declared parameters.
822 const auto& looper_path = graph[*vtx_it].decl.parameters->get<
ExecutionPath>(
"path");
823 if (looper_path.id == path)
828 return boost::graph_traits<Graph>::null_vertex();
// Execution paths, held by value (a copy of the vector of shared pointers).
832 const std::vector<std::shared_ptr<ExecutionPath>> paths;
// Color recorded for each path id; mutable because it is filled from the const
// member function writeGraph().
834 mutable std::unordered_map<uuid, std::string,
835 boost::hash<uuid>> path_colors;
// Palette of five colors indexed by writeGraph().
// NOTE(review): the initializer values are not visible in this listing.
837 const std::array<std::string, 5> colors = {{
// Vertex overload: forwards to writeVertex() of the shared writer.
851 void operator()(std::ostream& out,
const vertex_t& v)
const {
852 writer->writeVertex(out, v);
// Edge overload: forwards to writeEdge() of the shared writer.
855 void operator()(std::ostream& out,
const edge_t& e)
const {
856 writer->writeEdge(out, e);
// Graph overload: forwards to writeGraph() of the shared writer.
859 void operator()(std::ostream& out)
const {
860 writer->writeGraph(out);
864 std::shared_ptr<graph_writer> writer;
// Opens `filename` for writing and creates a shared graph_writer for graph `g`
// and the execution paths `paths`.
// NOTE(review): the statement that actually writes the graph to the stream is
// not visible in this listing.
867 void graphviz_export(
const Graph& g,
868 const std::vector<std::shared_ptr<ExecutionPath>>& paths,
869 const std::string& filename) {
871 std::ofstream f(filename.c_str());
872 auto writer = std::make_shared<graph_writer>(g, paths);
// Exports the graph and the configured execution paths to the file `output`.
// The `graph_exportable` flag is tested first.
// NOTE(review): the function header and the statement guarded by the flag test
// are not visible in this listing.
878 if (! graph_exportable)
881 graphviz_export(g, configuration.getPaths(), output);
size_t getNDimensions() const
std::string name
Name of the module (user-defined from the configuration file)
std::shared_ptr< ComputationGraph > build()
Build the computation graph.
std::shared_ptr< ParameterSet > parameters
Module's parameters, as parsed from the configuration file.
std::vector< std::shared_ptr< ExecutionPath > > getPaths() const
Defines an input / output.
void exportGraph(const std::string &output) const
Export a GraphViz representation of the computation graph to a file.
A frozen snapshot of the configuration file.
A module declaration, defined from the configuration file.
const std::vector< ModuleDecl > & getModules() const