NRP Core  1.4.1
computational_graph_handle.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef COMPUTATION_GRAPH_HANDLE_H
23 #define COMPUTATION_GRAPH_HANDLE_H
24 
25 #ifdef ROS_ON
26 #include "ros/ros.h"
27 #endif
28 
33 
35 
37 
39 
44 
45  ComputationalGraphHandle(SimulationDataManager * simulationDataManager, bool slaveMode = false, bool spinROS = false) :
46  DataPackProcessor(simulationDataManager),
47  _slaveMode(slaveMode),
48  _spinROS(spinROS)
49  {}
50 
51  bool _slaveMode;
52  bool _spinROS;
53  PyGILState_STATE _pyGILState;
54 
58  std::map<std::string, InputEngineNode*> _inputs;
59 
63  std::map<std::string, OutputEngineNode*> _outputs;
64 
66  InputClockNode* _clock = nullptr;
67 
70 
71 
72  void init(const jsonSharedPtr &simConfig, const engine_interfaces_t &engines) override
73  {
74  // Load Computation Graph
75  if(!_slaveMode) {
76  boost::python::dict globalDict;
77  // When controlling the graph set output driven mode to optimize on graph node execution
78  createPythonGraphFromConfig(simConfig->at("ComputationalGraph"),
79  ComputationalGraph::ExecMode::OUTPUT_DRIVEN, globalDict);
80  }
81 
83 
84  // Find Engine nodes in graph
85  for(const auto &engine : engines) {
86  std::string input_id = engine->engineName()+"_input";
87  std::string output_id = engine->engineName()+"_output";
88 
89  if(gm.getNode(input_id) && dynamic_cast<InputEngineNode *>(gm.getNode(input_id)))
90  _inputs[engine->engineName()] = dynamic_cast<InputEngineNode *>(gm.getNode(input_id));
91 
92  if(gm.getNode(output_id) && dynamic_cast<OutputEngineNode *>(gm.getNode(output_id))) {
93  auto o_node = dynamic_cast<OutputEngineNode *>(gm.getNode(output_id));
94  // setting computePeriod to 0 so node is only executed when linked Engine is synced
95  if(!_slaveMode)
96  o_node->setComputePeriod(0);
97  _outputs[engine->engineName()] = o_node;
98  }
99  }
100 
101  // Find Clock and Iteration nodes
102  std::tie(_clock, _iteration) = findTimeNodes();
103  }
104 
105  void updateDataPacksFromEngines(const std::vector<EngineClientInterfaceSharedPtr> &engines) override
106  {
107  if(_slaveMode)
108  _pyGILState = PyGILState_Ensure();
109 
110  try
111  {
112  for(auto &engine : engines)
113  if(_inputs.count(engine->engineName())) {
114  _inputs[engine->engineName()]->setDataPacks(
115  engine->getDataPacksFromEngine(_inputs[engine->engineName()]->requestedDataPacks()));
116  }
117  }
118  catch(std::exception &)
119  {
120  // TODO: Handle failure on datapack retrieval
121  throw;
122  }
123 
124  if(_slaveMode)
125  PyGILState_Release(_pyGILState);
126  }
127 
128  void compute(const std::vector<EngineClientInterfaceSharedPtr> & engines) override
129  {
130  if(!_slaveMode) {
131 #ifdef ROS_ON
132  if(_spinROS)
133  ros::spinOnce();
134 #endif
135  for(auto &engine : engines)
136  if(_outputs.count(engine->engineName()))
137  _outputs[engine->engineName()]->setDoCompute(true);
138 
139  if(_clock)
141 
142  if(_iteration)
144 
146  }
147  }
148 
149  void sendDataPacksToEngines(const std::vector<EngineClientInterfaceSharedPtr> &engines) override
150  {
151  if(_slaveMode)
152  _pyGILState = PyGILState_Ensure();
153 
154  for(const auto &engine : engines)
155  if(_outputs.count(engine->engineName())) {
156  try {
157  auto devs = _outputs[engine->engineName()]->getDataPacks();
158  engine->sendDataPacksToEngine(devs);
159  }
160  catch (std::exception &e) {
161  throw NRPException::logCreate(e,
162  "Failed to send datapacks to engine \"" + engine->engineName() + "\"");
163  }
164  }
165 
166  if(_slaveMode)
167  PyGILState_Release(_pyGILState);
168  }
169 };
170 
171 #endif // COMPUTATION_GRAPH_HANDLE_H
ComputationalGraphHandle
Uses a Computation Graph to execute datapack transformation operations.
Definition: computational_graph_handle.h:43
ComputationalGraphHandle::compute
void compute(const std::vector< EngineClientInterfaceSharedPtr > &engines) override
Perform computations on datapacks.
Definition: computational_graph_handle.h:128
ComputationalGraphHandle::_outputs
std::map< std::string, OutputEngineNode * > _outputs
Map containing all OutputEngineNodes associated with this simulation.
Definition: computational_graph_handle.h:63
createPythonGraphFromConfig
void createPythonGraphFromConfig(const nlohmann::json &config, const ComputationalGraph::ExecMode &execMode, const boost::python::dict &globalDict)
Definition: graph_utils.h:32
InputClockNode
Specialization of InputTimeBaseNode which stores and sends system clock in milliseconds.
Definition: input_time.h:77
input_node.h
ComputationalGraphManager::compute
void compute()
Executes ComputationalGraph.
Definition: computational_graph_manager.h:126
InputEngineNode
Input node used to connect an EngineClient with the computational graph.
Definition: input_node.h:40
ComputationalGraphHandle::_inputs
std::map< std::string, InputEngineNode * > _inputs
Map containing all InputEngineNodes associated with this simulation.
Definition: computational_graph_handle.h:58
ComputationalGraphHandle::init
void init(const jsonSharedPtr &simConfig, const engine_interfaces_t &engines) override
Initializes the handler.
Definition: computational_graph_handle.h:72
ComputationalGraphHandle::_iteration
InputIterationNode * _iteration
Pointer to the iteration_node of the graph.
Definition: computational_graph_handle.h:69
InputClockNode::updateClock
void updateClock(SimulationTime newTime)
Sets Node clock in milliseconds.
Definition: input_time.h:86
python_interpreter_state.h
ComputationalGraphHandle::sendDataPacksToEngines
void sendDataPacksToEngines(const std::vector< EngineClientInterfaceSharedPtr > &engines) override
Send datapacks to engines.
Definition: computational_graph_handle.h:149
jsonSharedPtr
std::shared_ptr< nlohmann::json > jsonSharedPtr
Definition: json_schema_utils.h:35
DataPackProcessor
Helper class for FTILoop encapsulating the datapack operations between Engines in a simulation loop.
Definition: datapack_handle.h:34
ComputationalGraphManager::getInstance
static ComputationalGraphManager & getInstance()
Get singleton instance of ComputationalGraphManager.
Definition: computational_graph_manager.cpp:31
graph_utils.h
DataPackProcessor::_simulationIteration
unsigned long _simulationIteration
Definition: datapack_handle.h:131
OutputNode::setComputePeriod
void setComputePeriod(unsigned int computePeriod)
Definition: output_node.h:98
ComputationalGraphManager::getNode
ComputationalNode * getNode(const std::string &id)
Retrieve a node from the graph as a pointer.
Definition: computational_graph_manager.h:95
InputIterationNode::updateIteration
void updateIteration(ulong newIteration)
Sets Node iteration number.
Definition: input_time.h:118
ComputationalGraphHandle::_spinROS
bool _spinROS
Definition: computational_graph_handle.h:52
computational_graph_manager.h
output_node.h
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
ComputationalGraphHandle::ComputationalGraphHandle
ComputationalGraphHandle(SimulationDataManager *simulationDataManager, bool slaveMode=false, bool spinROS=false)
Definition: computational_graph_handle.h:45
SimulationDataManager
Manages all simulation data.
Definition: simulation_data_manager.h:53
ComputationalGraphHandle::_pyGILState
PyGILState_STATE _pyGILState
Definition: computational_graph_handle.h:53
findTimeNodes
std::pair< InputClockNode *, InputIterationNode * > findTimeNodes()
Definition: graph_utils.h:61
ComputationalGraphHandle::_slaveMode
bool _slaveMode
Definition: computational_graph_handle.h:51
datapack_handle.h
InputIterationNode
Specialization of InputTimeBaseNode which stores and sends system iteration number.
Definition: input_time.h:109
input_time.h
ComputationalGraphHandle::updateDataPacksFromEngines
void updateDataPacksFromEngines(const std::vector< EngineClientInterfaceSharedPtr > &engines) override
Request datapacks from engines.
Definition: computational_graph_handle.h:105
ComputationalGraphHandle::_clock
InputClockNode * _clock
Pointer to the clock_node of the graph.
Definition: computational_graph_handle.h:66
ComputationalGraphManager
Singleton class managing a computational graph.
Definition: computational_graph_manager.h:45
DataPackProcessor::_simulationTime
SimulationTime _simulationTime
Definition: datapack_handle.h:130
OutputEngineNode
Output node used to connect the computational graph with an EngineClient.
Definition: output_node.h:37
DataPackProcessor::engine_interfaces_t
std::vector< EngineClientInterfaceSharedPtr > engine_interfaces_t
Definition: datapack_handle.h:38