22 #ifndef ENGINE_GRPC_CLIENT_H
23 #define ENGINE_GRPC_CLIENT_H
27 #include <grpcpp/grpcpp.h>
28 #include <grpcpp/support/time.h>
29 #include <nlohmann/json.hpp>
32 #include "nrp_general_library/config/cmake_constants.h"
35 #include "nrp_protobuf/engine_grpc.grpc.pb.h"
36 #include "nrp_protobuf/config/cmake_constants.h"
39 #include <pistache/router.h>
// Template parameter list: a type parameter ENGINE and a non-type `const char*`
// parameter SCHEMA. Both are forwarded to EngineClient<ENGINE, SCHEMA> in the
// constructor's initializer list further down in this file.
// NOTE(review): the declaration introduced by this parameter list is not visible
// in this excerpt.
44 template<
class ENGINE, const
char* SCHEMA>
// Applies the configured RPC timeout to a gRPC client context before a call is issued.
// @param context  Client context of the RPC that is about to be made.
// NOTE(review): the lines between the signature and the check below are not visible
// in this excerpt, so this comment only covers the deadline handling.
48 void prepareRpcContext(grpc::ClientContext * context)
// Only a strictly positive _rpcTimeout results in a deadline being set.
57 if(this->_rpcTimeout > SimulationTime::zero())
// The deadline is passed as an absolute time point: current system time plus the timeout.
59 context->set_deadline(std::chrono::system_clock::now() + this->_rpcTimeout);
// Constructor fragment (the signature line is not visible in this excerpt).
// Visible steps: forward `config` and `launcher` to the EngineClient base, set default
// values for the protobuf-related configuration keys, derive the RPC timeout, create the
// gRPC channel and stub for the configured server address, and walk the configured
// protobuf packages.
66 :
EngineClient<ENGINE, SCHEMA>(config, std::move(launcher))
// Default for "ProtobufPackages" is an empty list.
// (presumably setDefaultProperty only takes effect when the key is absent - TODO confirm)
71 this->
template setDefaultProperty<std::vector<std::string>>(
"ProtobufPackages", std::vector<std::string>());
// Default for "ProtobufPluginsPath" is the build-time constant NRP_PLUGIN_INSTALL_DIR.
72 this->
template setDefaultProperty<std::string>(
"ProtobufPluginsPath", NRP_PLUGIN_INSTALL_DIR);
// NOTE(review): the definition of `timeout` is on a line not visible in this excerpt.
78 if(timeout != SimulationTime::zero())
// Any non-zero timeout is raised to at least one millisecond.
// NOTE(review): because the guard is `!= zero`, a negative timeout also ends up as 1 ms.
80 this->_rpcTimeout = (timeout > std::chrono::milliseconds(1)) ? timeout : std::chrono::milliseconds(1);
// Presumably the else branch (the keyword is on a line not visible here): a zero
// _rpcTimeout, for which prepareRpcContext() sets no deadline.
84 this->_rpcTimeout = SimulationTime::zero();
// Cache the configured server address and open a channel to it using
// grpc::InsecureChannelCredentials(), then build the service stub on that channel.
89 this->_serverAddress = this->
engineConfig().at(
"ServerAddress");
90 _channel = grpc::CreateChannel(_serverAddress, grpc::InsecureChannelCredentials());
91 _stub = EngineGrpc::EngineGrpcService::NewStub(_channel);
// Iterate over the package names listed under "ProtobufPackages".
94 for(
const auto& packageName : this->
engineConfig().at(
"ProtobufPackages")) {
95 auto packageNameStr = packageName.template get<std::string>();
// Accumulate a comma-separated list of package names; it is quoted in the
// (de)serialization error messages further down in this file.
96 _protoOpsStr += packageNameStr +
",";
// Shared-library file name: "lib" + NRP_PROTO_OPS_LIB_PREFIX + <package> +
// NRP_PROTO_OPS_LIB_SUFIX + ".so".
97 std::stringstream pluginLibName;
98 pluginLibName <<
"lib" << NRP_PROTO_OPS_LIB_PREFIX << packageNameStr <<
99 NRP_PROTO_OPS_LIB_SUFIX <<
".so";
// NOTE(review): the creation of `pluginLib` is on lines not visible in this excerpt.
// NOTE(review): `.template` is not followed by a template argument list here; recent
// compilers may diagnose this, and plain `_protoOps.emplace_back(...)` would suffice.
102 _protoOps.template emplace_back(std::move(pluginLib));
// Drop the trailing comma left by the accumulation above (if anything was added).
107 if(!_protoOpsStr.empty())
108 _protoOpsStr.pop_back();
// Waits for the gRPC channel to report a connection, with a deadline of 20 seconds from
// the current GPR_CLOCK_REALTIME time. The condition is true when the wait fails.
// NOTE(review): the enclosing function and the failure handling are not visible in this excerpt.
128 if(!_channel->WaitForConnected(gpr_time_add(
129 gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(20, GPR_TIMESPAN))))
// Body fragment of the engine-initialization command (signature not visible in this excerpt).
// Sends `data`, serialized as a JSON string, through the `initialize` RPC.
139 EngineGrpc::InitializeRequest request;
140 EngineGrpc::InitializeReply reply;
141 grpc::ClientContext context;
// Apply the configured RPC timeout (if any) to this call.
143 prepareRpcContext(&context);
// `data` is dumped to its JSON text form and carried in the request's json field.
145 request.set_json(data.dump());
148 grpc::Status status = _stub->initialize(&context, request, &reply);
// Error path: the message combines the gRPC error text and numeric error code and is
// thrown as std::runtime_error.
// NOTE(review): the condition guarding this path is on lines not visible here
// (presumably a check of `status`).
152 const auto errMsg =
"Engine server initialization failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
153 throw std::runtime_error(errMsg);
// Body fragment of the engine-reset command (signature not visible in this excerpt).
// Issues the `reset` RPC with an otherwise unpopulated request.
160 EngineGrpc::ResetRequest request;
161 EngineGrpc::ResetReply reply;
162 grpc::ClientContext context;
// Apply the configured RPC timeout (if any) to this call.
164 prepareRpcContext(&context);
167 grpc::Status status = _stub->reset(&context, request, &reply);
// Error path: gRPC error text and numeric code are reported via std::runtime_error.
// NOTE(review): the guarding condition is on lines not visible here.
171 const auto errMsg =
"Engine server reset failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
172 throw std::runtime_error(errMsg);
// Body fragment of the engine-shutdown command (signature not visible in this excerpt).
// Sends `data`, serialized as a JSON string, through the `shutdown` RPC.
182 EngineGrpc::ShutdownRequest request;
183 EngineGrpc::ShutdownReply reply;
184 grpc::ClientContext context;
// Apply the configured RPC timeout (if any) to this call.
186 prepareRpcContext(&context);
188 request.set_json(data.dump());
191 grpc::Status status = _stub->shutdown(&context, request, &reply);
// Error path: gRPC error text and numeric code are reported via std::runtime_error.
// NOTE(review): the guarding condition is on lines not visible here.
195 const auto errMsg =
"Engine server shutdown failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
196 throw std::runtime_error(errMsg);
// Body fragment of the run-loop-step command (signature not visible in this excerpt).
// Requests one step of length `timeStep` via the `runLoopStep` RPC, then validates the
// engine time and remembers it for the next call.
204 EngineGrpc::RunLoopStepRequest request;
205 EngineGrpc::RunLoopStepReply reply;
206 grpc::ClientContext context;
// Apply the configured RPC timeout (if any) to this call.
208 prepareRpcContext(&context);
// The step is transmitted as the raw tick count of `timeStep`.
210 request.set_timestep(timeStep.count());
212 grpc::Status status = _stub->runLoopStep(&context, request, &reply);
// Error path: gRPC error text and numeric code are reported via std::runtime_error.
// NOTE(review): the guarding condition is on lines not visible here.
216 const auto errMsg =
"Engine server runLoopStep failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
217 throw std::runtime_error(errMsg);
// NOTE(review): the definition of `engineTime` is on a line not visible in this excerpt
// (presumably taken from `reply`).
// Negative engine times are rejected.
// NOTE(review): the message says "greater than 0", but the check accepts exactly zero.
222 if(engineTime < SimulationTime::zero())
224 const auto errMsg =
"Invalid engine time (should be greater than 0): " + std::to_string(engineTime.count());
225 throw std::runtime_error(errMsg);
// Engine time must not go backwards relative to the previously stored value.
// NOTE(review): the message says "greater than previous time", but an equal time is accepted.
228 if(engineTime < this->_prevEngineTime)
230 const auto errMsg =
"Invalid engine time (should be greater than previous time): "
231 + std::to_string(engineTime.count())
233 + std::to_string(this->_prevEngineTime.count());
235 throw std::runtime_error(errMsg);
// Remember the accepted time as the lower bound for the next check.
238 this->_prevEngineTime = engineTime;
// Body fragment of the datapack-retrieval command (signature not visible in this excerpt).
// Requests the datapacks in `requestedDataPackIds` that belong to this engine via the
// `getDataPacks` RPC and converts each returned message with the loaded protobuf plugins.
247 EngineGrpc::GetDataPacksRequest request;
248 EngineGrpc::GetDataPacksReply reply;
249 grpc::ClientContext context;
// NOTE(review): unlike the other RPC fragments in this file, no prepareRpcContext(&context)
// call is visible here, so the configured timeout would not apply to this call - confirm
// whether that is intentional.
251 for(
const auto & requestedId: requestedDataPackIds)
// Only identifiers whose EngineName equals this engine's name are put into the request.
253 if(this->
engineName().compare(requestedId.EngineName) == 0)
255 auto dataPackId = request.add_datapackids();
257 dataPackId->set_datapackname(requestedId.Name);
258 dataPackId->set_datapacktype(requestedId.Type);
259 dataPackId->set_enginename(requestedId.EngineName);
263 grpc::Status status = _stub->getDataPacks(&context, request, &reply);
// Error path: engine name, gRPC error text and numeric code are reported via
// std::runtime_error.
// NOTE(review): the guarding condition is on lines not visible here.
267 const auto errMsg =
"In Engine \"" + this->
engineName() +
"\", getDataPacksFromEngine failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
268 throw std::runtime_error(errMsg);
// Walk the datapack messages contained in the reply.
272 for(
int i = 0; i < reply.datapacks_size(); i++) {
// NOTE(review): `auto` deduces a value type here, so each message is copied;
// `const auto&` would presumably avoid the copy - confirm against the generated accessor.
273 auto datapackData = reply.datapacks(i);
// Offer the message to each loaded plugin in turn; a non-null result means the plugin
// produced a datapack. NOTE(review): the declaration of `datapack` and what follows the
// null check are on lines not visible in this excerpt.
276 for(
auto& mod : _protoOps) {
277 datapack = mod->getDataPackInterfaceFromMessage(this->
engineName(), datapackData);
279 if(datapack !=
nullptr)
284 interfaces.push_back(datapack);
// Tail of an error message that names the datapack and lists the configured protobuf
// packages; the start of the statement is on a line not visible in this excerpt.
287 datapackData.datapackid().datapackname() +
"\" using any of the NRP-Core Protobuf plugins specified in the"
288 " engine configuration: [" + _protoOpsStr +
"]. Ensure that the parameter "
289 "\"ProtobufPackages\" is properly set in the Engine configuration");
// Body fragment of the datapack-sending command (signature not visible in this excerpt).
// Converts every entry of `dataPacks` into a protobuf message using the loaded plugins and
// sends them in one `setDataPacks` RPC.
299 EngineGrpc::SetDataPacksRequest request;
300 EngineGrpc::SetDataPacksReply reply;
301 grpc::ClientContext context;
// Apply the configured RPC timeout (if any) to this call.
303 prepareRpcContext(&context);
305 for(
const auto &datapack : dataPacks)
// Debug-build check: every datapack passed in is expected to target this engine.
307 assert(datapack->engineName() == this->engineName());
// Empty datapacks are rejected with an NRPException naming the datapack and the engine.
309 if(datapack->isEmpty())
310 throw NRPException::logCreate(
"Attempt to send empty datapack " + datapack->name() +
" to Engine " + this->engineName());
// New message slot in the request, to be filled by one of the plugins below.
312 auto protoDataPack = request.add_datapacks();
// NOTE(review): the handling around this call (how success or failure of a plugin is
// detected) is on lines not visible in this excerpt.
315 for(
auto& mod : _protoOps) {
317 mod->setDataPackMessageFromInterface(*datapack, protoDataPack);
// Tail of an error message that names the datapack and lists the configured protobuf
// packages; the start of the statement is on a line not visible in this excerpt.
327 datapack->name() +
"\" using any of the NRP-Core Protobuf plugins specified in the"
328 " engine configuration: [" + _protoOpsStr +
"]. Ensure that the parameter "
329 "\"ProtobufPackages\" is properly set in the Engine configuration");
333 grpc::Status status = _stub->setDataPacks(&context, request, &reply);
// Error path: engine name, gRPC error text and numeric code are reported via
// std::runtime_error.
// NOTE(review): the guarding condition is on lines not visible here.
337 const auto errMsg =
"In Engine \"" + this->
engineName() +
"\", sendDataPacksToEngine failed: " + status.error_message() +
" (" + std::to_string(status.error_code()) +
")";
338 throw std::runtime_error(errMsg);
// Fragment that reads process start-up settings from the engine configuration
// (the enclosing function's signature is not visible in this excerpt).
// NOTE(review): how `name`, `address` and the plugin path are combined with `startParams`
// is on lines not visible here.
// Start parameters as configured under "EngineProcStartParams".
346 std::vector<std::string> startParams = this->
engineConfig().at(
"EngineProcStartParams");
// Engine name as configured under "EngineName".
348 std::string name = this->
engineConfig().at(
"EngineName");
// Server address as configured under "ServerAddress".
352 std::string address = this->
engineConfig().at(
"ServerAddress");
// Plugin directory as configured under "ProtobufPluginsPath", read as a string.
357 this->
engineConfig().at(
"ProtobufPluginsPath").
template get<std::string>());
// Returns an address string derived from `address`.
// @param address  Address string, parsed with Pistache::Address.
// @return In the visible fallback path: "<host>:<port>", where the port comes from
//         getFreePort() for the parsed host.
// NOTE(review): the body of the try block and the non-fallback return are on lines not
// visible in this excerpt.
366 std::string
tryBind(
const std::string & address)
368 Pistache::Address addressParse(address);
// Fallback when the (not visible) attempt throws a std::exception: keep the host and ask
// getFreePort() for a port on it.
374 catch(
const std::exception & e)
377 const uint16_t newPort =
getFreePort(addressParse.host());
380 return addressParse.host() +
":" + std::to_string(newPort);
// Fragment that reconciles the configured server address with the one returned by
// tryBind() (the enclosing function's signature is not visible in this excerpt).
391 const std::string ServerAddressConfig = this->
engineConfig().at(
"ServerAddress");
392 const std::string ServerAddressActual =
tryBind(ServerAddressConfig);
// If tryBind() returned a different address, log the substitution at info level.
394 if(ServerAddressActual != ServerAddressConfig)
396 NRPLogger::info(
"Engine {} could not bind to the address specified in the engine configuration '{}'. Using '{}' instead.", this->
engineName(), ServerAddressConfig, ServerAddressActual);
// Write the address returned by tryBind() back into the engine configuration.
// NOTE(review): whether this assignment sits inside the `if` above cannot be told from
// this excerpt.
400 this->
engineConfig().at(
"ServerAddress") = ServerAddressActual;
// Returns the server address cached in _serverAddress (assigned from the "ServerAddress"
// configuration entry in the constructor).
// NOTE(review): the enclosing accessor's signature is not visible in this excerpt.
409 return this->_serverAddress;
// Resets the stored previous engine time to zero, so that the monotonicity check done
// after runLoopStep starts again from zero.
// NOTE(review): the enclosing function's signature is not visible in this excerpt.
417 this->_prevEngineTime = SimulationTime::zero();
// gRPC channel created in the constructor for _serverAddress.
422 std::shared_ptr<grpc::Channel> _channel;
// Service stub built on _channel; all RPCs in this file go through it.
423 std::unique_ptr<EngineGrpc::EngineGrpcService::Stub> _stub;
// Server address read from the "ServerAddress" configuration entry.
424 std::string _serverAddress;
// Protobuf plugin objects, one added per configured package; tried in order when
// converting datapacks to and from protobuf messages.
429 std::vector<std::unique_ptr<protobuf_ops::NRPProtobufOpsIface>> _protoOps;
// Comma-separated list of the configured protobuf package names, used in error messages.
430 std::string _protoOpsStr =
"";
434 #endif // ENGINE_GRPC_CLIENT_H