NRP Core  1.4.1
input_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef INPUT_ROS_NODE_H
23 #define INPUT_ROS_NODE_H
24 
25 #include <boost/python.hpp>
26 #include <boost/shared_ptr.hpp>
27 #include <boost/function.hpp>
28 #include <mutex>
29 
32 
34 
40 template<class MSG_TYPE>
41 class InputROSNode : public InputNode<MSG_TYPE> {
42 public:
43 
47  InputROSNode(const std::string &id) :
48  InputNode<MSG_TYPE>(id)
49  {}
50 
51  std::string typeStr() const override
52  { return "RosSubscriber"; }
53 
54 protected:
55 
56  void configure() override
57  {
58  // creates ROS subscriber
59  using boost::placeholders::_1;
60  boost::function<void (const boost::shared_ptr<MSG_TYPE const>&)> callback = boost::bind(&InputROSNode::topic_callback, this, _1);
61  NRPROSProxy* rosProxy = &(NRPROSProxy::getInstance());
62  if(rosProxy)
63  rosProxy->subscribe(this->id(), callback);
64  else
65  NRPLogger::warn("From InputROSNode \"" + this->id() +
66  "\". NRPCoreSim is not connected to ROS and this node can't subscribe to topics. Add \"ROSNode\" parameter to your experiment configuration");
67 
68  // reserves memory space for storing incoming msgs
69  _msgTemp.reserve(InputNode<MSG_TYPE>::_queueSize);
70  _msgStore.reserve(InputNode<MSG_TYPE>::_queueSize);
71  }
72 
76  void topic_callback(const boost::shared_ptr<MSG_TYPE const>& msg)
77  {
78  std::lock_guard<std::mutex> lock(_msgMutex);
79 
80  // store msg pointer
81  if(_msgTemp.size() < _msgTemp.capacity())
82  _msgTemp.push_back(std::move(msg));
83  else
84  NRPLogger::debug("'"+this->id()+"' node capacity is full. Dropping message...");
85  }
86 
87  bool updatePortData(const std::string& id) override
88  {
89  std::lock_guard<std::mutex> lock(_msgMutex);
90 
91  // TODO: check that 'id' is equal to the topic address this node subscribes to?
92 
93  if(_msgTemp.size()) {
94  _msgStore.clear();
95  InputNode<MSG_TYPE>::_portMap.at(id).clear();
96 
97  for(auto & msg : _msgTemp) {
98  _msgStore.push_back(std::move(msg));
99  InputNode<MSG_TYPE>::_portMap.at(id).addMsg(_msgStore.back().get());
100  }
101 
102  _msgTemp.clear();
103 
104  return true;
105  }
106 
107  return false;
108  }
109 
110 private:
111 
113  std::mutex _msgMutex;
115  std::vector<boost::shared_ptr<MSG_TYPE const>> _msgTemp;
117  std::vector<boost::shared_ptr<MSG_TYPE const>> _msgStore;
118 
119 
120 };
121 
122 
123 template<class MSG_TYPE>
124 class InputROSEdge : public SimpleInputEdge<MSG_TYPE, InputROSNode<MSG_TYPE>> {
125 
126 public:
127 
128  InputROSEdge(const std::string& keyword, const std::string& address,
129  InputNodePolicies::MsgPublishPolicy msgPublishPolicy,
130  InputNodePolicies::MsgCachePolicy msgCachePolicy) :
131  SimpleInputEdge<MSG_TYPE, InputROSNode<MSG_TYPE>>(keyword, address, address, msgPublishPolicy, msgCachePolicy)
132  {}
133 
134 protected:
135 
137  { return new InputROSNode<MSG_TYPE>(this->_id); }
138 };
139 
140 
141 #endif //INPUT_ROS_NODE_H
InputNodePolicies::MsgCachePolicy
MsgCachePolicy
Defines input node message cache behavior.
Definition: computational_node_policies.h:27
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
input_node.h
InputROSNode::typeStr
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: input_node.h:51
InputROSNode
Input node used to connect a ROS subscriber to the computational graph.
Definition: input_node.h:41
NRPROSProxy
Definition: nrp_ros_proxy.h:30
nrp_ros_proxy.h
InputNode
Implementation of an input node in the computation graph.
Definition: input_node.h:129
InputROSNode::configure
void configure() override
Configures the node making it ready to execute 'compute'.
Definition: input_node.h:56
InputROSNode::topic_callback
void topic_callback(const boost::shared_ptr< MSG_TYPE const > &msg)
callback function used in the ROS subscriber
Definition: input_node.h:76
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
NRPROSProxy::getInstance
static NRPROSProxy & getInstance()
Get singleton instance of NRPROSProxy.
Definition: nrp_ros_proxy.cpp:26
InputROSEdge
Definition: input_node.h:124
InputEdge::_id
std::string _id
Definition: input_edge.h:151
NRPROSProxy::subscribe
void subscribe(const std::string &address, const boost::function< void(const boost::shared_ptr< MSG_TYPE const > &)> &callback, size_t queueSize=10)
Subscribe to ROS topic 'address' with callback function 'callback'.
Definition: nrp_ros_proxy.h:55
input_edge.h
InputROSEdge::InputROSEdge
InputROSEdge(const std::string &keyword, const std::string &address, InputNodePolicies::MsgPublishPolicy msgPublishPolicy, InputNodePolicies::MsgCachePolicy msgCachePolicy)
Definition: input_node.h:128
InputNodePolicies::MsgPublishPolicy
MsgPublishPolicy
Defines how an input node publish stored msgs.
Definition: computational_node_policies.h:33
InputROSNode::updatePortData
bool updatePortData(const std::string &id) override
Updates pointers stored in _portMap for port 'id'.
Definition: input_node.h:87
InputEdge
Helper template class used to implement Python input edge decorators.
Definition: input_edge.h:38
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
InputROSNode::InputROSNode
InputROSNode(const std::string &id)
Constructor.
Definition: input_node.h:47
InputROSEdge::makeNewNode
InputROSNode< MSG_TYPE > * makeNewNode() override
Definition: input_node.h:136