NRP Core  1.4.1
input_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef INPUT_MQTT_NODE_H
23 #define INPUT_MQTT_NODE_H
24 
25 #include <mutex>
26 #include <functional>
27 #include "nlohmann/json.hpp"
28 #include <google/protobuf/message.h>
29 #include "nrp_protobuf/engine_grpc.pb.h"
31 
34 
36 
42 template<class MSG_TYPE>
43 class InputMQTTNode : public InputNode<MSG_TYPE> {
44 public:
45 
49  InputMQTTNode(const std::string &id, const std::string &address) :
50  InputNode<MSG_TYPE>(id),
51  _address(address)
52  {}
53 
54  std::string typeStr() const override
55  { return "MQTTSubscriber"; }
56 
57 protected:
58 
59  void configure() override
60  {
62 
63  // Subscribes to mqtt topic
64  NRPMQTTProxy* mqttProxy = &(NRPMQTTProxy::getInstance());
65  if(mqttProxy) {
66  using std::placeholders::_1;
67  mqttProxy->subscribe(_address, std::bind(&InputMQTTNode::topic_callback, this, _1));
68  }
69  else
70  NRPLogger::warn("From InputMQTTNode \"" + this->id() +
71  "\". NRPCoreSim is not connected to MQTT and this node can't subscribe to topics. Check your experiment configuration");
72 
73  // reserves memory space for storing incoming msgs
74  _msgTemp.reserve(InputNode<MSG_TYPE>::_queueSize);
75  _msgStore.reserve(InputNode<MSG_TYPE>::_queueSize);
76  }
77 
81  void topic_callback(const std::string& msg)
82  {
83  std::lock_guard<std::mutex> lock(_msgMutex);
84 
85  // store msg pointer
86  if(_msgTemp.size() < _msgTemp.capacity())
87  _msgTemp.push_back(_msgFromString(msg));
88  else
89  NRPLogger::debug("'"+this->id()+"' node capacity is full. Dropping message...");
90  }
91 
92  bool updatePortData(const std::string& id) override
93  {
94  std::lock_guard<std::mutex> lock(_msgMutex);
95 
96  // TODO: check that 'id' is equal to the topic address this node subscribes to?
97 
98  if(_msgTemp.size()) {
99  _msgStore.clear();
100  InputNode<MSG_TYPE>::_portMap.at(id).clear();
101 
102  for(auto & msg : _msgTemp) {
103  _msgStore.push_back(std::move(msg));
104  InputNode<MSG_TYPE>::_portMap.at(id).addMsg(&_msgStore.back());
105  }
106 
107  _msgTemp.clear();
108 
109  return true;
110  }
111 
112  return false;
113  }
114 
115 protected:
116 
120  virtual void setMsgFromString()
121  {
122  // Set msg conversion function
123  if constexpr (std::is_same_v<MSG_TYPE, std::string>)
124  _msgFromString = [] (const std::string& s) { return s; };
125  else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>)
126  _msgFromString = [] (const std::string& s) {
127  try {
128  return nlohmann::json::parse(s);
129  }
130  catch(const std::exception &e) {
131  std::stringstream error_msg;
132  error_msg << "Error in InputMQTTNode while converting incoming msg into json format: " << e.what();
133  throw NRPException::logCreate(error_msg.str());
134  }
135  };
136  else if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>)
137  _msgFromString = [] (const std::string& s) {
138  MSG_TYPE m;
139  if(m.ParseFromString(s))
140  return m;
141  else
142  throw NRPException::logCreate("Error in InputMQTTNode while converting incoming msg into protobuf format");
143  };
144  else
145  throw NRPException::logCreate("InputMQTTNode \""+ this->id() +"\" was instantiated with a non supported type");
146  }
147 
149  std::function<MSG_TYPE (const std::string&)> _msgFromString;
150 
151 private:
152 
154  std::mutex _msgMutex;
156  std::vector<MSG_TYPE> _msgTemp;
158  std::vector<MSG_TYPE> _msgStore;
160  std::string _address;
161 };
162 
163 
164 template<class MSG_TYPE>
165 class InputMQTTEdge : public SimpleInputEdge<MSG_TYPE, InputMQTTNode<MSG_TYPE>> {
166 
167 public:
168 
169  InputMQTTEdge(const std::string& keyword, const std::string& address,
170  InputNodePolicies::MsgPublishPolicy msgPublishPolicy,
171  InputNodePolicies::MsgCachePolicy msgCachePolicy) :
172  SimpleInputEdge<MSG_TYPE, InputMQTTNode<MSG_TYPE>>(keyword, address+"_input", address, msgPublishPolicy, msgCachePolicy),
173  _address(address)
174  {}
175 
176 protected:
177 
179  { return new InputMQTTNode<MSG_TYPE>(this->_id, _address); }
180 
181  std::string _address;
182 };
183 
185 template <class MSG_TYPE>
186 class DPInputMQTTNode : public InputMQTTNode<DataPack<MSG_TYPE>> {
187 public:
191  DPInputMQTTNode(const std::string &id, const std::string &address) :
192  InputMQTTNode<DataPack<MSG_TYPE>>(id, address)
193  { }
194 
195 protected:
196 
197  void setMsgFromString() override
198  {
199  if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>) {
200  this->_msgFromString = [](const std::string &s) {
201  EngineGrpc::DataPackMessage m;
202  if (!m.ParseFromString(s))
204  "Error in DPInputMQTTNode while converting incoming msg into EngineGrpc::DataPackMessage protobuf message");
205  else if (!m.has_data())
206  throw NRPException::logCreate("Error in DPInputMQTTNode. Received empty DataPack");
207  else if (!m.has_datapackid())
208  throw NRPException::logCreate("Error in DPInputMQTTNode. Received DataPack without id");
209  else if (!m.data().Is<MSG_TYPE>())
211  "Error in DPInputMQTTNode while converting incoming msg into DataPack. Wrong msg type");
212 
213  MSG_TYPE *data = new MSG_TYPE();
214  m.data().UnpackTo(data);
215  return DataPack<MSG_TYPE>(m.datapackid().datapackname(), m.datapackid().enginename(), data);
216  };
217  }
218  else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>)
219  this->_msgFromString = [] (const std::string& s) {
220  try {
221  auto datapack = nlohmann::json::parse(s);
222  if(!datapack.contains("name") || !datapack.contains("engine_name"))
223  throw NRPException::logCreate("malformed json datapack received: \"" + s +
224  "\". It must contain keys name and engine_name");
225  else if(!datapack.contains("data"))
226  throw NRPException::logCreate("empty datapack received. This is not allowed");
227  else
229  datapack["name"].get<std::string>(),
230  datapack["engine_name"].get<std::string>(),
231  new nlohmann::json(std::move(datapack["data"])));
232  }
233  catch(const std::exception &e) {
234  std::stringstream error_msg;
235  error_msg << "Error in InputMQTTNode while converting incoming msg into json datapack format: " << e.what();
236  throw NRPException::logCreate(error_msg.str());
237  }
238  };
239  }
240 
241 };
242 
243 
244 template<class MSG_TYPE>
245 class DPInputMQTTEdge : public InputMQTTEdge<DataPack<MSG_TYPE>> {
246 
247 public:
248 
249  DPInputMQTTEdge(const std::string& keyword, const std::string& address,
250  InputNodePolicies::MsgPublishPolicy msgPublishPolicy,
251  InputNodePolicies::MsgCachePolicy msgCachePolicy) :
252  InputMQTTEdge<DataPack<MSG_TYPE>>(keyword, address, msgPublishPolicy, msgCachePolicy)
253  {}
254 
255 protected:
256 
258  { return new DPInputMQTTNode<MSG_TYPE>(this->_id, this->_address); }
259 
260 };
261 
262 #endif //INPUT_MQTT_NODE_H
InputNodePolicies::MsgCachePolicy
MsgCachePolicy
Defines input node message cache behavior.
Definition: computational_node_policies.h:27
DPInputMQTTNode
Definition: input_node.h:186
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
DPInputMQTTNode::DPInputMQTTNode
DPInputMQTTNode(const std::string &id, const std::string &address)
Constructor.
Definition: input_node.h:191
InputMQTTNode::setMsgFromString
virtual void setMsgFromString()
set the msg conversion function depending on template argument MSG_TYPE
Definition: input_node.h:120
input_node.h
InputMQTTNode::_msgFromString
std::function< MSG_TYPE(const std::string &)> _msgFromString
converts msgs in string format to MSG_TYPE
Definition: input_node.h:149
InputNode
Implementation of an input node in the computation graph.
Definition: input_node.h:129
InputMQTTNode::configure
void configure() override
Configures the node making it ready to execute 'compute'.
Definition: input_node.h:59
NRPMQTTProxy::getInstance
static NRPMQTTProxy & getInstance()
Get singleton instance of NRPMQTTProxy.
Definition: nrp_mqtt_proxy.cpp:26
InputMQTTEdge
Definition: input_node.h:165
DPInputMQTTEdge
Definition: input_node.h:245
InputMQTTNode
Input node used to connect a ROS subscriber to the computational graph.
Definition: input_node.h:43
InputMQTTNode::topic_callback
void topic_callback(const std::string &msg)
callback function used in the MQTT subscriber
Definition: input_node.h:81
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
InputMQTTEdge::makeNewNode
InputMQTTNode< MSG_TYPE > * makeNewNode() override
Definition: input_node.h:178
nrp_mqtt_proxy.h
InputMQTTEdge::_address
std::string _address
Definition: input_node.h:181
InputEdge::_id
std::string _id
Definition: input_edge.h:151
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
NRPMQTTProxy
Definition: nrp_mqtt_proxy.h:27
datapack.h
DPInputMQTTEdge::DPInputMQTTEdge
DPInputMQTTEdge(const std::string &keyword, const std::string &address, InputNodePolicies::MsgPublishPolicy msgPublishPolicy, InputNodePolicies::MsgCachePolicy msgCachePolicy)
Definition: input_node.h:249
DPInputMQTTEdge::makeNewNode
InputMQTTNode< DataPack< MSG_TYPE > > * makeNewNode() override
Definition: input_node.h:257
InputMQTTNode::InputMQTTNode
InputMQTTNode(const std::string &id, const std::string &address)
Constructor.
Definition: input_node.h:49
NRPMQTTProxy::subscribe
void subscribe(const std::string &address, const std::function< void(const std::string &)> &callback)
Subscribe to MQTT topic 'address' with callback function 'callback'.
Definition: nrp_mqtt_proxy.cpp:45
InputMQTTEdge::InputMQTTEdge
InputMQTTEdge(const std::string &keyword, const std::string &address, InputNodePolicies::MsgPublishPolicy msgPublishPolicy, InputNodePolicies::MsgCachePolicy msgCachePolicy)
Definition: input_node.h:169
input_edge.h
DataPack
Base datapack class.
Definition: datapack.h:36
InputNodePolicies::MsgPublishPolicy
MsgPublishPolicy
Defines how an input node publish stored msgs.
Definition: computational_node_policies.h:33
InputMQTTNode::updatePortData
bool updatePortData(const std::string &id) override
Updates pointers stored in _portMap for port 'id'.
Definition: input_node.h:92
InputEdge
Helper template class used to implement Python input edge decorators.
Definition: input_edge.h:38
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
DPInputMQTTNode::setMsgFromString
void setMsgFromString() override
set the msg conversion function depending on template argument MSG_TYPE
Definition: input_node.h:197
InputMQTTNode::typeStr
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: input_node.h:54
json
nlohmann::json json
Definition: engine_json_server.cpp:31