NRP Core  1.4.1
input_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef INPUT_NODE_H
23 #define INPUT_NODE_H
24 
25 #include <memory>
26 #include <map>
27 #include <iostream>
28 
30 
34 
38 template<class DATA>
40 
41  DataPortHandle() = delete;
42 
46  DataPortHandle(const std::string& id, ComputationalNode* parent, size_t queue_size) :
47  singlePort(new OutputPort<DATA>(id, parent)),
48  listPort(new OutputPort<std::vector<const DATA*>>(id, parent))
49  { _data.reserve(queue_size); }
50 
54  void publishLast()
55  {
56  if(_data.size())
57  singlePort->publish(_data.back());
58  }
59 
63  void publishAll()
64  {
65  if(_data.size())
66  listPort->publish(&_data);
67  }
68 
73  {
74  singlePort->publish(nullptr);
75  listPort->publish(nullptr);
76  _data.clear();
77  }
78 
82  bool addMsg(const DATA* msg)
83  {
84  if(_data.size() < _data.capacity()) {
85  _data.push_back(msg);
86  return true;
87  }
88  else {
89  std::string s = "Port handle '" + singlePort->id() + "' reached max size. Added msg will be dropped.";
90  NRPLogger::info(s);
91  return false;
92  }
93  }
94 
98  void clear()
99  { _data.clear(); }
100 
104  size_t size()
105  { return _data.size(); }
106 
108  std::shared_ptr<OutputPort<DATA>> singlePort;
110  std::shared_ptr<OutputPort<std::vector<const DATA*>>> listPort;
111 
112 private:
113 
115  std::vector<const DATA*> _data;
116 };
117 
128 template <class DATA>
129 class InputNode : public ComputationalNode {
130 public:
131 
139  _queueSize(queue_size)
140  { }
141 
145  void registerOutput(const std::string& id)
146  {
147  if(!_portMap.count(id))
148  _portMap.emplace(id, DataPortHandle<DATA>(id, this, _queueSize));
149  else {
150  std::string s = "A port with id '" + id + "' is already registered with node '" + this->id() +
151  "', ignoring this request.";
152  NRPLogger::info(s);
153  }
154  }
155 
159  OutputPort<DATA>* getSinglePort(const std::string& id)
160  { return _portMap.count(id) ? _portMap.at(id).singlePort.get() : nullptr; }
161 
166  { return _portMap.count(id) ? _portMap.at(id).listPort.get() : nullptr; }
167 
169  { return _msgPublishPolicy; }
170 
172  { return _msgCachePolicy; }
173 
176 
179 
180 protected:
181 
185  void compute() override final
186  {
187  for(auto& [id, port]: _portMap) {
188  auto hasNewMsgs = this->updatePortData(id);
189 
190  if(hasNewMsgs) {
192  port.publishLast();
193  else
194  port.publishAll();
195  }
197  port.publishNullandClear();
198  }
199  }
200 
212  virtual bool updatePortData(const std::string& id) = 0;
213 
219  std::map<std::string, DataPortHandle<DATA>> _portMap;
221  size_t _queueSize;
222 
225 };
226 
227 
228 #endif //INPUT_NODE_H
InputNodePolicies::MsgCachePolicy
MsgCachePolicy
Defines input node message cache behavior.
Definition: computational_node_policies.h:27
nrp_logger.h
InputNode::getListPort
OutputPort< std::vector< const DATA * > > * getListPort(const std::string &id)
Returns a pointer to list output port if the port is registered, nullptr otherwise.
Definition: input_node.h:165
DataPortHandle::singlePort
std::shared_ptr< OutputPort< DATA > > singlePort
Port used to send a single msg.
Definition: input_node.h:108
InputNode::InputNode
InputNode(const std::string &id, InputNodePolicies::MsgPublishPolicy msgPublishPolicy=InputNodePolicies::MsgPublishPolicy::LAST, InputNodePolicies::MsgCachePolicy msgCachePolicy=InputNodePolicies::MsgCachePolicy::KEEP_CACHE, size_t queue_size=10)
Constructor.
Definition: input_node.h:135
InputNodePolicies::KEEP_CACHE
@ KEEP_CACHE
Definition: computational_node_policies.h:29
InputNode
Implementation of an input node in the computation graph.
Definition: input_node.h:129
InputNode::_portMap
std::map< std::string, DataPortHandle< DATA > > _portMap
Map containing data to handle topics. Data is guaranteed to be unchanged between 'compute' calls
Definition: input_node.h:219
DataPortHandle::size
size_t size()
Return the size of stored data.
Definition: input_node.h:104
InputNode::registerOutput
void registerOutput(const std::string &id)
Registers an Output port with id 'id' with this node.
Definition: input_node.h:145
InputNode::setMsgCachePolicy
void setMsgCachePolicy(InputNodePolicies::MsgCachePolicy msgCachePolicy)
Definition: input_node.h:177
InputNode::_queueSize
size_t _queueSize
Maximum number of msgs that the node can store per port.
Definition: input_node.h:221
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
DataPortHandle::DataPortHandle
DataPortHandle()=delete
NRPLogger::info
static void info(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for info level.
Definition: nrp_logger.h:138
InputNode::getSinglePort
OutputPort< DATA > * getSinglePort(const std::string &id)
Returns a pointer to single output port if the port is registered, nullptr otherwise.
Definition: input_node.h:159
InputNode::msgPublishPolicy
InputNodePolicies::MsgPublishPolicy msgPublishPolicy()
Definition: input_node.h:168
InputNode::msgCachePolicy
InputNodePolicies::MsgCachePolicy msgCachePolicy()
Definition: input_node.h:171
DataPortHandle::publishAll
void publishAll()
Publish all items in data.
Definition: input_node.h:63
computational_node.h
InputNode::compute
void compute() override final
Compute. Updates and sends stored msgs.
Definition: input_node.h:185
computational_node_policies.h
InputNode::_msgCachePolicy
InputNodePolicies::MsgCachePolicy _msgCachePolicy
Msg cache policy used by this node.
Definition: input_node.h:217
output_port.h
OutputPort
Implementation of an output port in the computation graph.
Definition: output_port.h:36
InputNode::updatePortData
virtual bool updatePortData(const std::string &id)=0
Updates pointers stored in _portMap for port 'id'.
InputNodePolicies::CLEAR_CACHE
@ CLEAR_CACHE
Definition: computational_node_policies.h:28
python_json_engine.port
port
Definition: python_json_engine.py:197
InputNode::_msgPublishPolicy
InputNodePolicies::MsgPublishPolicy _msgPublishPolicy
Send policy used by this node.
Definition: input_node.h:215
InputNode::ComputationalNodes_INPUT_NODE_UPDATE_POLICY_WITH_KEEP_CACHE_Test
friend class ComputationalNodes_INPUT_NODE_UPDATE_POLICY_WITH_KEEP_CACHE_Test
Definition: input_node.h:223
InputNodePolicies::LAST
@ LAST
Definition: computational_node_policies.h:34
DataPortHandle::DataPortHandle
DataPortHandle(const std::string &id, ComputationalNode *parent, size_t queue_size)
Constructor.
Definition: input_node.h:46
DataPortHandle::addMsg
bool addMsg(const DATA *msg)
Add a new message to the stored data.
Definition: input_node.h:82
InputNodePolicies::MsgPublishPolicy
MsgPublishPolicy
Defines how an input node publishes stored msgs.
Definition: computational_node_policies.h:33
DataPortHandle
Helper structure managing data and ports associated with a port id.
Definition: input_node.h:39
DataPortHandle::publishNullandClear
void publishNullandClear()
Publish a null pointer.
Definition: input_node.h:72
DataPortHandle::publishLast
void publishLast()
Publish last item in data.
Definition: input_node.h:54
DataPortHandle::clear
void clear()
Clear data.
Definition: input_node.h:98
DataPortHandle::listPort
std::shared_ptr< OutputPort< std::vector< const DATA * > > > listPort
Port used to send a list of msgs.
Definition: input_node.h:110
InputNode::setMsgPublishPolicy
void setMsgPublishPolicy(InputNodePolicies::MsgPublishPolicy msgPublishPolicy)
Definition: input_node.h:174
ComputationalNode::Input
@ Input
Definition: computational_node.h:36
ComputationalNode
Base class implementing a node in the computational graph.
Definition: computational_node.h:31
InputNode::ComputationalNodes_INPUT_NODE_UPDATE_POLICY_WITH_CLEAR_CACHE_Test
friend class ComputationalNodes_INPUT_NODE_UPDATE_POLICY_WITH_CLEAR_CACHE_Test
Definition: input_node.h:224