Go to the documentation of this file.
22 #ifndef INPUT_MQTT_NODE_H
23 #define INPUT_MQTT_NODE_H
27 #include "nlohmann/json.hpp"
28 #include <google/protobuf/message.h>
29 #include "nrp_protobuf/engine_grpc.pb.h"
42 template<
class MSG_TYPE>
55 {
return "MQTTSubscriber"; }
66 using std::placeholders::_1;
71 "\". NRPCoreSim is not connected to MQTT and this node can't subscribe to topics. Check your experiment configuration");
83 std::lock_guard<std::mutex> lock(_msgMutex);
86 if(_msgTemp.size() < _msgTemp.capacity())
89 NRPLogger::debug(
"'"+this->
id()+
"' node capacity is full. Dropping message...");
94 std::lock_guard<std::mutex> lock(_msgMutex);
102 for(
auto & msg : _msgTemp) {
103 _msgStore.push_back(std::move(msg));
123 if constexpr (std::is_same_v<MSG_TYPE, std::string>)
125 else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>)
128 return nlohmann::json::parse(s);
130 catch(
const std::exception &e) {
131 std::stringstream error_msg;
132 error_msg <<
"Error in InputMQTTNode while converting incoming msg into json format: " << e.what();
136 else if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>)
139 if(m.ParseFromString(s))
145 throw NRPException::logCreate(
"InputMQTTNode \""+ this->
id() +
"\" was instantiated with a non supported type");
154 std::mutex _msgMutex;
156 std::vector<MSG_TYPE> _msgTemp;
158 std::vector<MSG_TYPE> _msgStore;
160 std::string _address;
164 template<
class MSG_TYPE>
185 template <
class MSG_TYPE>
199 if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>) {
201 EngineGrpc::DataPackMessage m;
202 if (!m.ParseFromString(s))
204 "Error in DPInputMQTTNode while converting incoming msg into EngineGrpc::DataPackMessage protobuf message");
205 else if (!m.has_data())
207 else if (!m.has_datapackid())
209 else if (!m.data().Is<MSG_TYPE>())
211 "Error in DPInputMQTTNode while converting incoming msg into DataPack. Wrong msg type");
213 MSG_TYPE *data =
new MSG_TYPE();
214 m.data().UnpackTo(data);
215 return DataPack<MSG_TYPE>(m.datapackid().datapackname(), m.datapackid().enginename(), data);
218 else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>)
221 auto datapack = nlohmann::json::parse(s);
222 if(!datapack.contains(
"name") || !datapack.contains(
"engine_name"))
224 "\". It must contain keys name and engine_name");
225 else if(!datapack.contains(
"data"))
229 datapack[
"name"].get<std::string>(),
230 datapack[
"engine_name"].get<std::string>(),
233 catch(
const std::exception &e) {
234 std::stringstream error_msg;
235 error_msg <<
"Error in InputMQTTNode while converting incoming msg into json datapack format: " << e.what();
244 template<
class MSG_TYPE>
262 #endif //INPUT_MQTT_NODE_H
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
static NRPMQTTProxy & getInstance()
Get singleton instance of NRPMQTTProxy.
Definition: nrp_mqtt_proxy.cpp:26
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
Definition: nrp_mqtt_proxy.h:27
void subscribe(const std::string &address, const std::function< void(const std::string &)> &callback)
Subscribe to MQTT topic 'address' with callback function 'callback'.
Definition: nrp_mqtt_proxy.cpp:45
Base datapack class.
Definition: datapack.h:36
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
nlohmann::json json
Definition: engine_json_server.cpp:31