Go to the documentation of this file.
22 #ifndef OUTPUT_MQTT_NODE_H
23 #define OUTPUT_MQTT_NODE_H
27 #include "nlohmann/json.hpp"
28 #include <google/protobuf/message.h>
29 #include "nrp_protobuf/engine_grpc.pb.h"
42 template<
class MSG_TYPE>
51 unsigned int computePeriod = 1) :
57 {
return "MQTTPublisher"; }
64 publishMqttMsg<MSG_TYPE>(data);
67 void sendBatchMsg(
const std::string& ,
const std::vector<const MSG_TYPE*>& )
override
72 template<
class PUB_TYPE>
77 if constexpr (std::is_same_v<PUB_TYPE, std::string>)
78 mqttProxy->
publish(_address, *data);
79 else if constexpr (std::is_same_v<PUB_TYPE, nlohmann::json>)
80 mqttProxy->
publish(_address, data->dump());
81 else if constexpr (std::is_base_of_v<google::protobuf::Message, PUB_TYPE>) {
82 if(data->SerializeToString(&_msg))
83 mqttProxy->
publish(_address, _msg);
85 NRPLogger::warn(
"From OutputMQTTNode \""+ this->
id() +
"\". Failed to serialize protobuf message");
91 "OutputMQTTNode \"" + this->
id() +
"\" was instantiated with a non supported type");
96 "\". NRPCoreSim is not connected to MQTT and this node can't publish. Check your experiment configuration");
101 void dummyCall(
const MSG_TYPE* )
105 std::string _address;
112 template<
class MSG_TYPE>
118 bool publishFromCache =
false,
119 unsigned int computePeriod = 1) :
121 publishFromCache, computePeriod),
134 template <
class MSG_TYPE>
143 unsigned int computePeriod = 1) :
151 if(!data || (*data)->
isEmpty()) {
152 NRPLogger::warn(
"From OutputMQTTNode \""+ this->
id() +
"\". Received null or empty datapack.");
156 if constexpr (std::is_base_of_v<google::protobuf::Message, MSG_TYPE>) {
157 EngineGrpc::DataPackMessage protoDataPack;
158 protoDataPack.mutable_datapackid()->set_datapackname((*data)->name());
159 protoDataPack.mutable_datapackid()->set_datapacktype((*data)->type());
160 protoDataPack.mutable_datapackid()->set_enginename((*data)->engineName());
161 protoDataPack.mutable_data()->PackFrom((*data)->getData());
163 this->
template publishMqttMsg<EngineGrpc::DataPackMessage>(&protoDataPack);
165 else if constexpr (std::is_same_v<MSG_TYPE, nlohmann::json>) {
167 jsonDataPack[
"name"] = (*data)->name();
168 jsonDataPack[
"engine_name"] = (*data)->engineName();
169 jsonDataPack[
"type"] = (*data)->type();
170 jsonDataPack[
"data"] = (*data)->getData();
172 this->
template publishMqttMsg<nlohmann::json>(&jsonDataPack);
176 "DPOutputMQTTNode \"" + this->
id() +
"\" was instantiated with a non supported datapack type");
180 template<
class MSG_TYPE>
186 bool publishFromCache =
false,
187 unsigned int computePeriod = 1) :
199 #endif //OUTPUT_MQTT_NODE_H
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
unsigned int _computePeriod
Definition: output_edge.h:173
bool _publishFromCache
Definition: output_edge.h:172
void publishMqttMsg(const PUB_TYPE *data)
Definition: output_node.h:73
static NRPMQTTProxy & getInstance()
Get singleton instance of NRPMQTTProxy.
Definition: nrp_mqtt_proxy.cpp:26
std::string _id
Definition: output_edge.h:170
Definition: computational_node_policies.h:47
Output node used to connect an MQTT publisher to the computational graph.
Definition: output_node.h:43
void sendSingleMsg(const std::string &, const MSG_TYPE *data) override
Sends out a single message; to be implemented by derived classes.
Definition: output_node.h:61
Definition: output_node.h:135
bool publishFromCache()
Definition: output_node.h:109
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
std::string _address
Definition: output_node.h:130
Helper class used to implement Python output edge decorators.
Definition: output_edge.h:36
OutputMQTTNode(const std::string &id, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Constructor.
Definition: output_node.h:49
Implementation of an output node in the computation graph.
Definition: output_node.h:38
DPOutputMQTTNode(const std::string &id, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:141
DPOutputMQTTEdge(const std::string &keyword, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:185
bool isEmpty() const
Indicates whether the datapack contains any data aside from the datapack ID.
Definition: datapack_interface.cpp:74
OutputMQTTEdge(const std::string &keyword, const std::string &address, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:117
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: output_node.h:56
@ SERIES
Definition: computational_node_policies.h:51
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
Definition: nrp_mqtt_proxy.h:27
Definition: output_node.h:113
void publish(const std::string &address, const std::string &msg, bool retained=false)
Publishes 'msg' to MQTT topic 'address'.
Definition: nrp_mqtt_proxy.cpp:37
OutputMQTTNode< DataPack< MSG_TYPE > * > * makeNewNode() override
Definition: output_node.h:193
Base datapack class.
Definition: datapack.h:36
void sendSingleMsg(const std::string &, const DataPackPtr *data) override
Definition: output_node.h:149
OutputMQTTNode< MSG_TYPE > * makeNewNode() override
Definition: output_node.h:127
void sendBatchMsg(const std::string &, const std::vector< const MSG_TYPE * > &) override
Sends out a vector of messages as a single batch; to be implemented by derived classes.
Definition: output_node.h:67
PublishFormatPolicy
Defines how output nodes send stored msgs.
Definition: computational_node_policies.h:50
Definition: output_node.h:181
nlohmann::json json
Definition: engine_json_server.cpp:31