NRP Core  1.4.1
functional_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef PYTHON_FUNCTIONAL_NODE_H
23 #define PYTHON_FUNCTIONAL_NODE_H
24 
25 #include <boost/python.hpp>
26 #include <boost/python/stl_iterator.hpp>
27 
30 
32 
34 
35 namespace bpy = boost::python;
36 
37 // Helper struct to define a tuple type with N elements of type T
38 template <typename T, size_t N>
39 struct tuple_array {
40  typedef decltype(std::tuple_cat(std::tuple<T>(), typename tuple_array<T, N-1>::type())) type;
41 };
42 
43 template <typename T>
44 struct tuple_array<T, 0> {
45  typedef decltype(std::tuple<>()) type;
46 };
47 
53 class PythonFunctionalNode : public FunctionalNode<typename tuple_array<bpy::object, 10>::type, typename tuple_array<bpy::object, 10>::type> {
54 
55  // TODO: publicly inheriting from FunctionalNode allows to bypass the safety checks when registering port in PythonFunctionalNode
56 
57  // TODO: PythonFunctionalNode should be a template with parameter size_t N = number of inputs and outputs. When doing this
58  // FunctionalNode template is not inferred properly. After figuring this out, instances of '3' should be replaced with N
59 
60  const static size_t input_s = 10;
61  const static size_t output_s = 10;
62 
63 public:
64 
69  FunctionalNode(id, [] (params_t&) { return true; }, policy)
70  {
71  bpy::stl_input_iterator<std::string> begin(o_ports), end;
72  _oPortIds.insert(_oPortIds.begin(), begin, end);
73  }
74 
84  boost::python::object pySetup(boost::python::object obj)
85  {
86  // Create heap allocated shared pointer from 'this' and set it up
87  std::shared_ptr<PythonFunctionalNode> self = this->moveToSharedPtr();
88  self->_pythonF = obj;
89  self->_function = std::bind(&PythonFunctionalNode::pythonCallback, self, std::placeholders::_1 );
90  self->setInputPtrs(std::make_index_sequence<input_s>());
91  self->setOutputPtrs(std::make_index_sequence<output_s>());
92 
93  // Collect function arguments
94  auto inspect = bpy::import("inspect");
95  bpy::stl_input_iterator<std::string> begin(inspect.attr("getfullargspec")(obj).attr("args")), end;
96  self->_iPortIds.insert(self->_iPortIds.begin(), begin, end);
97 
98  // Setup ports
99  if(self->_oPortIds.size() > output_s || self->_iPortIds.size() > output_s)
100  throw NRPException::logCreate("PythonFunctionalNode has not enough input or output ports to wrap the "
101  "requested python object");
102 
103  for(const auto& p : self->_oPortIds)
104  self->registerOutput(p);
105 
106  // Register node
107  std::shared_ptr<ComputationalNode> self_base = std::dynamic_pointer_cast<ComputationalNode>(self);
109 
110  return boost::python::object(self);
111  }
112 
116  template<typename T_IN, size_t N = 0>
118  {
119  int idx = -1;
120  for(size_t n = 0; n < _iPortIds.size(); ++n)
121  if(_iPortIds[n] == id) {
122  idx = n;
123  break;
124  }
125 
126  if(idx == -1)
127  throw NRPException::logCreate("\"" + id + "\" does not match any of the declared input arguments in node \"" + this->id() + "\"");
128 
129  if constexpr (N < input_s) {
130  Port* port = getInputByIndex(N);
131  if(port) {
132  if(port->id() == id) {
133  if(dynamic_cast< InputPort<T_IN, bpy::object>* >(port))
134  return dynamic_cast< InputPort<T_IN, bpy::object>* >(port);
135  else
136  throw NRPException::logCreate("Input \"" + id + "\" has been registered with a different type in node \"" + this->id() + "\"");
137  }
138  else
139  return getOrRegisterInput<T_IN, N+1>(id);
140  }
141  else {
142  _iPortIdsMap.emplace(id, N);
143  return registerInput<N, T_IN, bpy::object>(id);
144  }
145  }
146 
147  throw NRPException::logCreate("There is no input port with name" + id + "registered to this node and no additional ports can be registered");
148  }
149 
153  template<size_t N = 0>
154  OutputPort<bpy::object>* registerOutput(const std::string& id)
155  {
156  if constexpr (N < output_s) {
157  if(!getOutputByIndex<N>())
158  return FunctionalNode::registerOutput<N, bpy::object>(id);
159  else
160  return registerOutput<N+1>(id);
161  }
162 
163  throw NRPException::logCreate("Output can't be registered. All available ports are bound.");
164  }
165 
169  template<size_t N = 0>
170  OutputPort<bpy::object>* getOutput(const std::string& id)
171  { return dynamic_cast<OutputPort<bpy::object>*>(getOutputByIdTuple<N>(id)); }
172 
173 protected:
174 
178  void createEdge(const std::string& port_id, Port* out_port) override
179  {
180  // Get output port
181  OutputPort<bpy::object>* o_port = dynamic_cast<OutputPort<bpy::object>*>(out_port);
182  if(!o_port) {
183  std::stringstream error_msg;
184  error_msg << "In Functional node '" << this->id() << "'. Error While creating edge to port '" << port_id << "'. ";
185  error_msg << "Attempt to connect to port '" << out_port->id() << "', but they are of different types.";
186  throw NRPException::logCreate(error_msg.str());
187  }
188 
189  // Get input port
190  // NOTE: part of the code duplicated here and in FunctionalNode implementation of this function is due to
191  // the different way of accessing input ports
192  InputPort<bpy::object, bpy::object>* i_port = this->getOrRegisterInput<bpy::object>(port_id);
193 
194  // Register edge
195  ComputationalGraphManager::getInstance().registerEdge<bpy::object, bpy::object>(o_port, i_port);
196  }
197 
201  void configure() override
202  {
203  // check unbound inputs and outputs and print warning
204  for(size_t i=0; i < _iPortIds.size(); ++i)
205  if(!getInputByIndex(i)) {
206  std::stringstream s;
207  s << "In python functional node \"" << this->id() << "\". Input argument \"" << _iPortIds[i] <<
208  "\" is not connected" << std::endl;
209  NRPLogger::warn(s.str());
210  }
211 
212  for(const auto& pId : _oPortIds) {
213  if(!getOutput(pId)->subscriptionsSize()) {
214  std::stringstream s;
215  s << "In python functional node \"" << this->id() << "\". Output \"" << pId <<
216  "\" is not connected" << std::endl;
217  NRPLogger::warn(s.str());
218  }
219  }
220  }
221 
222  friend class ComputationalGraphPythonNodes_PYTHON_FUNCTIONAL_NODE_Test;
223 
224 private:
225 
229  bool pythonCallback(params_t&)
230  {
231  boost::python::tuple args;
232  boost::python::dict kwargs;
233  boost::python::object o_output;
234 
235  for(size_t i=0; i < _iPortIds.size(); ++i) {
236  const bpy::object* in = nullptr;
237  if(_iPortIdsMap.find(_iPortIds[i]) != _iPortIdsMap.end())
238  in = *_inputs[_iPortIdsMap.at(_iPortIds[i])];
239  kwargs[_iPortIds[i]] = in != nullptr ? *in : bpy::object();
240  }
241 
242  try {
243  o_output = _pythonF(*args, **kwargs);
244 
245  // Check None case
246  if (o_output.is_none())
247  return false;
248  }
249  catch (const boost::python::error_already_set&) {
250  std::string error_msg = "An error occurred while executing Functional Node with id \"" + this->id() + "\"";
251  NRPLogger::error(error_msg);
252  PyErr_Print();
253  throw NRPException::logCreate(error_msg);
254  }
255 
256  try {
257  // Otherwise a list is expected
258  boost::python::list l_output = boost::python::extract<boost::python::list>(o_output);
259  if(_oPortIds.size() != (size_t)boost::python::len(l_output)) {
260  std::stringstream error_msg;
261  error_msg << "Functional Node with id \"" << this->id() << "\" has " << _oPortIds.size() <<
262  " declared outputs, but returns " << boost::python::len(l_output) << " elements.";
263  throw NRPException::logCreate(error_msg.str());
264  }
265 
266 
267  for(int i=0; i < boost::python::len(l_output); ++i)
268  *_outputs[i] = l_output[i];
269  }
270  catch (const boost::python::error_already_set&) {
271  std::string error_msg = "An error occurred while executing Functional Node with id \"" + this->id() + "\". It is expected to return an object of type list or None";
272  PyErr_Print();
273  throw NRPException::logCreate(error_msg);
274  }
275 
276  return true;
277  }
278 
282  template<std::size_t... Ints>
283  void setInputPtrs(std::index_sequence<Ints...>)
284  { ((_inputs[Ints] = &std::get<Ints>(_params)),...); }
285 
289  template<std::size_t... Ints>
290  void setOutputPtrs(std::index_sequence<Ints...>)
291  { ((_outputs[Ints] = &std::get<input_s+Ints>(_params)),...); }
292 
294  std::shared_ptr<PythonFunctionalNode> moveToSharedPtr()
295  { return std::shared_ptr<PythonFunctionalNode>(new PythonFunctionalNode(std::move(static_cast<PythonFunctionalNode&>(*this)))); }
296 
298  std::vector<std::string> _iPortIds;
300  std::map<std::string, size_t> _iPortIdsMap;
302  std::vector<std::string> _oPortIds;
304  boost::python::object _pythonF;
306  std::array< const bpy::object**, input_s > _inputs;
308  std::array< bpy::object*, output_s > _outputs;
309 };
310 
311 
312 
313 #endif //PYTHON_FUNCTIONAL_NODE_H
ComputationalGraphManager::registerEdge
void registerEdge(OutputPort< T_IN > *source, InputPort< T_IN, T_OUT > *target)
Connects an InputPort to an Output port and registers an edge in the graph between their parent nodes...
Definition: computational_graph_manager.h:107
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
PythonFunctionalNode::getOutput
OutputPort< bpy::object > * getOutput(const std::string &id)
Safely get an output port.
Definition: functional_node.h:170
PythonFunctionalNode::getOrRegisterInput
InputPort< T_IN, bpy::object > * getOrRegisterInput(const std::string &id)
Safely get or register an input port.
Definition: functional_node.h:117
FunctionalNodePolicies::ExecutionPolicy
ExecutionPolicy
Possible execution policies for this node.
Definition: computational_node_policies.h:41
Port
Base class implementing a port in the computational graph.
Definition: port.h:30
PythonFunctionalNode::PythonFunctionalNode
PythonFunctionalNode(const std::string &id, const boost::python::list &o_ports, FunctionalNodePolicies::ExecutionPolicy policy=FunctionalNodePolicies::ExecutionPolicy::ON_NEW_INPUT)
Constructor.
Definition: functional_node.h:68
python_error_handler.h
Port::id
const std::string & id()
Returns the port 'id'.
Definition: port.h:47
nrp_exceptions.h
PythonFunctionalNode::registerOutput
OutputPort< bpy::object > * registerOutput(const std::string &id)
Safely registers an output port.
Definition: functional_node.h:154
ComputationalGraphManager::registerNode
void registerNode(std::shared_ptr< ComputationalNode > &obj)
Register a node in the graph.
Definition: computational_graph_manager.h:75
python_json_engine.args
Namespace args
Definition: python_json_engine.py:196
FunctionalNode
Definition: functional_node.h:179
PythonFunctionalNode
Specialization of FunctionalNode in which _function is a python callable object.
Definition: functional_node.h:53
ComputationalGraphManager::getInstance
static ComputationalGraphManager & getInstance()
Get singleton instance of ComputationalGraphManager.
Definition: computational_graph_manager.cpp:31
functional_node.h
FunctionalNodePolicies::ON_NEW_INPUT
@ ON_NEW_INPUT
Definition: computational_node_policies.h:43
tuple_array
Definition: functional_node.h:39
tuple_array::type
decltype(std::tuple_cat(std::tuple< T >(), typename tuple_array< T, N-1 >::type())) typedef type
Definition: functional_node.h:40
computational_graph_manager.h
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
OutputPort
Implementation of an output port in the computation graph.
Definition: output_port.h:36
InputPort
Implementation of an input port in the computation graph.
Definition: input_port.h:42
python_json_engine.port
port
Definition: python_json_engine.py:197
PythonFunctionalNode::configure
void configure() override
Configure this node.
Definition: functional_node.h:201
NRPLogger::error
static void error(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for error level.
Definition: nrp_logger.h:160
PythonFunctionalNode::pySetup
boost::python::object pySetup(boost::python::object obj)
Setup this node with a python callable object and registers it to ComputationalGraphManager.
Definition: functional_node.h:84
PythonFunctionalNode::createEdge
void createEdge(const std::string &port_id, Port *out_port) override
Create an edge in the graph between this node 'port_id' input port and o_port.
Definition: functional_node.h:178