NRP Core  1.4.1
output_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef OUTPUT_MQTT_NODE_H
23 #define OUTPUT_MQTT_NODE_H
24 
25 #include <mutex>
26 #include <functional>
27 #include "nlohmann/json.hpp"
28 #include <google/protobuf/message.h>
29 #include "nrp_protobuf/engine_grpc.pb.h"
31 
34 
36 
42 template<class MSG_TYPE>
43 class OutputMQTTNode : public OutputNode<MSG_TYPE> {
44 public:
45 
49  OutputMQTTNode(const std::string &id, const std::string &address,
50  bool publishFromCache = false,
51  unsigned int computePeriod = 1) :
53  _address(address)
54  { }
55 
56  std::string typeStr() const override
57  { return "MQTTPublisher"; }
58 
59 protected:
60 
61  void sendSingleMsg(const std::string& /*id*/, const MSG_TYPE* data) override
62  {
63  // TODO: check that 'id' is equal to the topic address this node publishes to?
64  publishMqttMsg<MSG_TYPE>(data);
65  }
66 
67  void sendBatchMsg(const std::string& /*id*/, const std::vector<const MSG_TYPE*>& /*data*/) override
68  {
69  throw NRPException::logCreate("BATCH update policy is not supported by OutputMQTTNode");
70  }
71 
72  template<class PUB_TYPE>
73  void publishMqttMsg(const PUB_TYPE* data)
74  {
75  NRPMQTTProxy* mqttProxy = &(NRPMQTTProxy::getInstance());
76  if(mqttProxy) {
77  if constexpr (std::is_same_v<PUB_TYPE, std::string>)
78  mqttProxy->publish(_address, *data);
79  else if constexpr (std::is_same_v<PUB_TYPE, nlohmann::json>)
80  mqttProxy->publish(_address, data->dump());
81  else if constexpr (std::is_base_of_v<google::protobuf::Message, PUB_TYPE>) {
82  if(data->SerializeToString(&_msg))
83  mqttProxy->publish(_address, _msg);
84  else
85  NRPLogger::warn("From OutputMQTTNode \""+ this->id() +"\". Failed to serialize protobuf message");
86  }
87  else {
88  // This is required to avoid an irrelevant unused-but-set-parameter compilation error
89  dummyCall(data);
91  "OutputMQTTNode \"" + this->id() + "\" was instantiated with a non supported type");
92  }
93  }
94  else
95  NRPLogger::warn("From OutputMQTTNode \"" + this->id() +
96  "\". NRPCoreSim is not connected to MQTT and this node can't publish. Check your experiment configuration");
97  }
98 
99 private:
100 
101  void dummyCall(const MSG_TYPE* /*data*/)
102  { }
103 
105  std::string _address;
107  std::string _msg;
108 
109 };
110 
111 
112 template<class MSG_TYPE>
113 class OutputMQTTEdge : public SimpleOutputEdge<MSG_TYPE, OutputMQTTNode<MSG_TYPE>> {
114 
115 public:
116 
117  OutputMQTTEdge(const std::string &keyword, const std::string &address,
118  bool publishFromCache = false,
119  unsigned int computePeriod = 1) :
120  SimpleOutputEdge<MSG_TYPE, OutputMQTTNode<MSG_TYPE>>(keyword, address+"_output", address,
121  publishFromCache, computePeriod),
122  _address(address)
123  {}
124 
125 protected:
126 
128  { return new OutputMQTTNode<MSG_TYPE>(this->_id, _address, this->_publishFromCache, this->_computePeriod); }
129 
130  std::string _address;
131 };
132 
134 template <class MSG_TYPE>
135 class DPOutputMQTTNode : public OutputMQTTNode<DataPack<MSG_TYPE>*> {
136 
137 public:
138 
140 
141  DPOutputMQTTNode(const std::string &id, const std::string &address,
142  bool publishFromCache = false,
143  unsigned int computePeriod = 1) :
144  OutputMQTTNode<DataPack<MSG_TYPE>*>(id, address, publishFromCache, computePeriod)
145  { }
146 
147 protected:
148 
149  void sendSingleMsg(const std::string& /*id*/, const DataPackPtr* data) override
150  {
151  if(!data || (*data)->isEmpty()) {
152  NRPLogger::warn("From OutputMQTTNode \""+ this->id() +"\". Received null or empty datapack.");
153  return;
154  }
155 
156  if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>) {
157  EngineGrpc::DataPackMessage protoDataPack;
158  protoDataPack.mutable_datapackid()->set_datapackname((*data)->name());
159  protoDataPack.mutable_datapackid()->set_datapacktype((*data)->type());
160  protoDataPack.mutable_datapackid()->set_enginename((*data)->engineName());
161  protoDataPack.mutable_data()->PackFrom((*data)->getData());
162 
163  this->template publishMqttMsg<EngineGrpc::DataPackMessage>(&protoDataPack);
164  }
165  else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>) {
166  nlohmann::json jsonDataPack;
167  jsonDataPack["name"] = (*data)->name();
168  jsonDataPack["engine_name"] = (*data)->engineName();
169  jsonDataPack["type"] = (*data)->type();
170  jsonDataPack["data"] = (*data)->getData();
171 
172  this->template publishMqttMsg<nlohmann::json>(&jsonDataPack);
173  }
174  else
176  "DPOutputMQTTNode \"" + this->id() + "\" was instantiated with a non supported datapack type");
177  }
178 };
179 
180 template<class MSG_TYPE>
182 
183 public:
184 
185  DPOutputMQTTEdge(const std::string &keyword, const std::string &address,
186  bool publishFromCache = false,
187  unsigned int computePeriod = 1) :
188  OutputMQTTEdge<DataPack<MSG_TYPE>*>(keyword, address, publishFromCache, computePeriod)
189  {}
190 
191 protected:
192 
194  { return new DPOutputMQTTNode<MSG_TYPE>(this->_id, this->_address, this->_publishFromCache, this->_computePeriod); }
195 
196 };
197 
198 
199 #endif //OUTPUT_MQTT_NODE_H
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
OutputEdge::_computePeriod
unsigned int _computePeriod
Definition: output_edge.h:173
OutputEdge::_publishFromCache
bool _publishFromCache
Definition: output_edge.h:172
OutputMQTTNode::publishMqttMsg
void publishMqttMsg(const PUB_TYPE *data)
Definition: output_node.h:73
NRPMQTTProxy::getInstance
static NRPMQTTProxy & getInstance()
Get singleton instance of NRPMQTTProxy.
Definition: nrp_mqtt_proxy.cpp:26
OutputEdge::_id
std::string _id
Definition: output_edge.h:170
OutputNodePolicies
Definition: computational_node_policies.h:47
OutputMQTTNode
Output node used to connect an MQTT publisher to the computational graph.
Definition: output_node.h:43
OutputMQTTNode::sendSingleMsg
void sendSingleMsg(const std::string &, const MSG_TYPE *data) override
Sends out a single msg, to be implemented by derived classes.
Definition: output_node.h:61
DPOutputMQTTNode
Definition: output_node.h:135
OutputNode< MSG_TYPE >::publishFromCache
bool publishFromCache()
Definition: output_node.h:109
output_node.h
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
nrp_mqtt_proxy.h
OutputMQTTEdge::_address
std::string _address
Definition: output_node.h:130
OutputEdge
Helper class used to implement Python output edge decorators.
Definition: output_edge.h:36
OutputMQTTNode::OutputMQTTNode
OutputMQTTNode(const std::string &id, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Constructor.
Definition: output_node.h:49
OutputNode
Implementation of an output node in the computation graph.
Definition: output_node.h:38
DPOutputMQTTNode::DPOutputMQTTNode
DPOutputMQTTNode(const std::string &id, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:141
DPOutputMQTTEdge::DPOutputMQTTEdge
DPOutputMQTTEdge(const std::string &keyword, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:185
DataPackInterface::isEmpty
bool isEmpty() const
Indicates if the datapack contains any data aside from datapack ID.
Definition: datapack_interface.cpp:74
OutputMQTTEdge::OutputMQTTEdge
OutputMQTTEdge(const std::string &keyword, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:117
output_edge.h
OutputMQTTNode::typeStr
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: output_node.h:56
OutputNodePolicies::SERIES
@ SERIES
Definition: computational_node_policies.h:51
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
NRPMQTTProxy
Definition: nrp_mqtt_proxy.h:27
datapack.h
OutputMQTTEdge
Definition: output_node.h:113
NRPMQTTProxy::publish
void publish(const std::string &address, const std::string &msg, bool retained=false)
Publishes 'msg' to MQTT topic 'address'.
Definition: nrp_mqtt_proxy.cpp:37
DPOutputMQTTEdge::makeNewNode
OutputMQTTNode< DataPack< MSG_TYPE > * > * makeNewNode() override
Definition: output_node.h:193
DataPack
Base datapack class.
Definition: datapack.h:36
DPOutputMQTTNode::sendSingleMsg
void sendSingleMsg(const std::string &, const DataPackPtr *data) override
Definition: output_node.h:149
OutputMQTTEdge::makeNewNode
OutputMQTTNode< MSG_TYPE > * makeNewNode() override
Definition: output_node.h:127
OutputMQTTNode::sendBatchMsg
void sendBatchMsg(const std::string &, const std::vector< const MSG_TYPE * > &) override
Sends out a vector of msg as a single batch, to be implemented by derived classes.
Definition: output_node.h:67
OutputNodePolicies::PublishFormatPolicy
PublishFormatPolicy
Defines how output nodes send stored msgs.
Definition: computational_node_policies.h:50
DPOutputMQTTEdge
Definition: output_node.h:181
json
nlohmann::json json
Definition: engine_json_server.cpp:31