NRP Core  1.4.1
input_edge.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef INPUT_EDGE_H
23 #define INPUT_EDGE_H
24 
25 #include <boost/python.hpp>
26 
30 
31 template<class T, class T_IN>
32 concept INPUT_C = std::is_base_of_v<InputNode<T_IN>, T>;
33 
37 template <class T_IN, class T_OUT, INPUT_C<T_IN> INPUT_CLASS>
38 class InputEdge {
39 
40 public:
41 
42  InputEdge() = delete;
43 
47  InputEdge(std::string keyword, std::string id, std::string port, InputNodePolicies::MsgPublishPolicy msgPublishPolicy,
48  InputNodePolicies::MsgCachePolicy msgCachePolicy) :
49  _keyword(std::move(keyword)), _id(std::move(id)), _port(std::move(port)), _msgPublishPolicy(std::move(msgPublishPolicy)),
50  _msgCachePolicy(std::move(msgCachePolicy))
51  {}
52 
59  boost::python::object pySetup(const boost::python::object& obj)
60  {
61  // Register input node
62  std::shared_ptr<ComputationalNode> node(makeNewNode());
64  INPUT_CLASS* iNode = dynamic_cast<INPUT_CLASS*>(node.get());
65  if(!iNode)
66  throw NRPException::logCreate("Error in creating Input Node: a node with the same name (\""+node->id()+"\") is already registered with a different type");
67 
68  iNode->setMsgPublishPolicy(_msgPublishPolicy);
69  iNode->setMsgCachePolicy(_msgCachePolicy);
70  iNode->registerOutput(_port);
71 
72  // Register edge
73  if(boost::python::extract<std::shared_ptr<PythonFunctionalNode>>(obj).check()) {
74  // PythonFunctionalNode case
75  std::shared_ptr<PythonFunctionalNode> pyFn = boost::python::extract<std::shared_ptr<PythonFunctionalNode>>(obj);
76  registerEdgePythonFN(iNode, pyFn);
77  }
78  else if(boost::python::extract<std::shared_ptr<FunctionalNodeBase>>(obj).check()) {
79  // FunctionalNodeBase case
80  std::shared_ptr<FunctionalNodeBase> pyFn = boost::python::extract<std::shared_ptr<FunctionalNodeBase>>(obj);
81  registerEdgeFNBase(iNode, pyFn);
82  }
83  else
84  throw NRPException::logCreate("InputEdge \""+node->id()+"\" was called with the wrong argument type. Argument must be a FunctionalNode object");
85 
86  // Returns FunctionalNode
87  return obj;
88  }
89 
90 protected:
91 
95  void registerEdgePythonFN(INPUT_CLASS* iNode, std::shared_ptr<PythonFunctionalNode>& pyFn)
96  {
97  try {
99  if(iNode->msgPublishPolicy() == InputNodePolicies::MsgPublishPolicy::LAST)
100  cgm.registerEdge<T_IN, boost::python::object>(
101  iNode->getSinglePort(_port), pyFn->getOrRegisterInput<T_IN>(_keyword));
102  else
103  cgm.registerEdge<std::vector<const T_IN*>, boost::python::object>(
104  iNode->getListPort(_port), pyFn->getOrRegisterInput<std::vector<const T_IN*>>(_keyword));
105  }
106  catch (const NRPException&) {
107  throw NRPException::logCreate("An error occurred while creating graph edge to input node \""+iNode->id()+"\"");
108  }
109  }
110 
114  void registerEdgeFNBase(INPUT_CLASS* iNode, std::shared_ptr<FunctionalNodeBase>& pyFn)
115  {
116  try {
118  std::string errorMsg = "Port \"" + _keyword + "\" of Node \"" + pyFn->id() +
119  "\" can't be connected to port \"" + _port + "\" of Node " + iNode->id() +
120  "\". Probably there is a mismatch between the port types. Review your CG configuration.";
121  if(iNode->msgPublishPolicy() == InputNodePolicies::MsgPublishPolicy::LAST) {
122  auto iPort = dynamic_cast<InputPort<T_IN, T_OUT>*>(pyFn->getInputById(_keyword));
123  if(!iPort)
124  throw NRPException::logCreate(errorMsg);
125 
126  cgm.registerEdge<T_IN, T_OUT>(
127  iNode->getSinglePort(_port), iPort);
128  }
129  else {
130  auto iPort = dynamic_cast<InputPort<std::vector<const T_IN *>, std::vector<const T_OUT *>>*>(pyFn->getInputById(_keyword));
131  if(!iPort)
132  throw NRPException::logCreate(errorMsg);
133 
134  cgm.registerEdge<std::vector<const T_IN *>, std::vector<const T_OUT *>>(
135  iNode->getListPort(_port), iPort);
136  }
137  }
138  catch (const NRPException&) {
139  throw NRPException::logCreate("An error occurred while creating graph edge to input node \""+iNode->id()+"\"");
140  }
141  }
142 
148  virtual INPUT_CLASS* makeNewNode() = 0;
149 
150  std::string _keyword;
151  std::string _id;
152  std::string _port;
153 
156 };
157 
158 template <class T_IN, INPUT_C<T_IN> INPUT_CLASS>
160 
161 #endif //INPUT_EDGE_H
InputNodePolicies::MsgCachePolicy
MsgCachePolicy
Defines input node message cache behavior.
Definition: computational_node_policies.h:27
NRPException
Base NRPException class.
Definition: nrp_exceptions.h:36
InputEdge::_msgPublishPolicy
InputNodePolicies::MsgPublishPolicy _msgPublishPolicy
Definition: input_edge.h:154
input_node.h
InputEdge::_msgCachePolicy
InputNodePolicies::MsgCachePolicy _msgCachePolicy
Definition: input_edge.h:155
INPUT_C
concept INPUT_C
Definition: input_edge.h:32
ComputationalGraphManager::registerNode
void registerNode(std::shared_ptr< ComputationalNode > &obj)
Register a node in the graph.
Definition: computational_graph_manager.h:75
InputEdge::_port
std::string _port
Definition: input_edge.h:152
InputEdge::makeNewNode
virtual INPUT_CLASS * makeNewNode()=0
InputEdge::InputEdge
InputEdge(std::string keyword, std::string id, std::string port, InputNodePolicies::MsgPublishPolicy msgPublishPolicy, InputNodePolicies::MsgCachePolicy msgCachePolicy)
Constructor.
Definition: input_edge.h:47
InputEdge::registerEdgeFNBase
void registerEdgeFNBase(INPUT_CLASS *iNode, std::shared_ptr< FunctionalNodeBase > &pyFn)
registers an edge between iNode and pyFn. FunctionalNodeBase case
Definition: input_edge.h:114
ComputationalGraphManager::getInstance
static ComputationalGraphManager & getInstance()
Get singleton instance of ComputationalGraphManager.
Definition: computational_graph_manager.cpp:31
InputEdge::_id
std::string _id
Definition: input_edge.h:151
InputEdge::registerEdgePythonFN
void registerEdgePythonFN(INPUT_CLASS *iNode, std::shared_ptr< PythonFunctionalNode > &pyFn)
registers an edge between iNode and pyFn. PythonFunctionalNode case
Definition: input_edge.h:95
computational_graph_manager.h
InputEdge::_keyword
std::string _keyword
Definition: input_edge.h:150
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
InputPort
Implementation of an input port in the computation graph.
Definition: input_port.h:42
python_json_engine.port
port
Definition: python_json_engine.py:197
InputNodePolicies::LAST
@ LAST
Definition: computational_node_policies.h:34
InputNodePolicies::MsgPublishPolicy
MsgPublishPolicy
Defines how an input node publish stored msgs.
Definition: computational_node_policies.h:33
InputEdge::pySetup
boost::python::object pySetup(const boost::python::object &obj)
call function in associated Python decorator
Definition: input_edge.h:59
functional_node.h
InputEdge::InputEdge
InputEdge()=delete
InputEdge
Helper template class used to implement Python input edge decorators.
Definition: input_edge.h:38