NRP Core  1.4.1
engine_grpc_client.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef ENGINE_GRPC_CLIENT_H
23 #define ENGINE_GRPC_CLIENT_H
24 
25 #include <future>
26 
27 #include <grpcpp/grpcpp.h>
28 #include <grpcpp/support/time.h>
29 #include <nlohmann/json.hpp>
30 
32 #include "nrp_general_library/config/cmake_constants.h"
35 #include "nrp_protobuf/engine_grpc.grpc.pb.h"
36 #include "nrp_protobuf/config/cmake_constants.h"
39 #include <pistache/router.h>
40 
43 
44 template<class ENGINE, const char* SCHEMA>
46  : public EngineClient<ENGINE, SCHEMA>
47 {
48  void prepareRpcContext(grpc::ClientContext * context)
49  {
50  // Let client wait for server ready
51  // TODO It happens that gRPC call is performed before the server is fully initialized.
52  // This line was supposed to fix it, but it's breaking some of the tests. The issue will be addressed in NRRPLT-8187.
53  // context->set_wait_for_ready(true);
54 
55  // Set RPC timeout (in absolute time), if it has been specified by the user
56 
57  if(this->_rpcTimeout > SimulationTime::zero())
58  {
59  context->set_deadline(std::chrono::system_clock::now() + this->_rpcTimeout);
60  }
61  }
62 
63  public:
64 
66  : EngineClient<ENGINE, SCHEMA>(config, std::move(launcher))
67  {
68 
69  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
70 
71  this->template setDefaultProperty<std::vector<std::string>>("ProtobufPackages", std::vector<std::string>());
72  this->template setDefaultProperty<std::string>("ProtobufPluginsPath", NRP_PLUGIN_INSTALL_DIR);
73 
74  // Timeouts of less than 1ms will be rounded up to 1ms
75 
76  SimulationTime timeout = toSimulationTime<float, std::ratio<1>>(this->engineConfig().at("EngineCommandTimeout"));
77 
78  if(timeout != SimulationTime::zero())
79  {
80  this->_rpcTimeout = (timeout > std::chrono::milliseconds(1)) ? timeout : std::chrono::milliseconds(1);
81  }
82  else
83  {
84  this->_rpcTimeout = SimulationTime::zero();
85  }
86 
87  this->validateServerAddress();
88 
89  this->_serverAddress = this->engineConfig().at("ServerAddress");
90  _channel = grpc::CreateChannel(_serverAddress, grpc::InsecureChannelCredentials());
91  _stub = EngineGrpc::EngineGrpcService::NewStub(_channel);
92 
93  ProtoOpsManager::getInstance().addPluginPath(this->engineConfig().at("ProtobufPluginsPath"));
94  for(const auto& packageName : this->engineConfig().at("ProtobufPackages")) {
95  auto packageNameStr = packageName.template get<std::string>();
96  _protoOpsStr += packageNameStr + ",";
97  std::stringstream pluginLibName;
98  pluginLibName << "lib" << NRP_PROTO_OPS_LIB_PREFIX << packageNameStr <<
99  NRP_PROTO_OPS_LIB_SUFIX << ".so";
100  auto pluginLib = ProtoOpsManager::getInstance().loadProtobufPlugin(pluginLibName.str());
101  if(pluginLib)
102  _protoOps.template emplace_back(std::move(pluginLib));
103  else
104  throw NRPException::logCreate("Failed to load ProtobufPackage \""+packageNameStr+"\"");
105  }
106 
107  if(!_protoOpsStr.empty())
108  _protoOpsStr.pop_back();
109  }
110 
111  virtual pid_t launchEngine() override
112  {
113  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
114 
115  // Launch engine process.
116  // enginePID == -1 means that the launcher hasn't launched a process
117  auto enginePID = this->EngineClientInterface::launchEngine();
118 
119  this->connectToServer();
120 
121  return enginePID;
122  }
123 
125  {
126  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
127 
128  if(!_channel->WaitForConnected(gpr_time_add(
129  gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(20, GPR_TIMESPAN))))
130  {
131  throw NRPException::logCreate("Timeout while connecting to engine server (" + this->engineName() + ")");
132  }
133  }
134 
136  {
137  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
138 
139  EngineGrpc::InitializeRequest request;
140  EngineGrpc::InitializeReply reply;
141  grpc::ClientContext context;
142 
143  prepareRpcContext(&context);
144 
145  request.set_json(data.dump());
146 
147  NRPLogger::debug("Sending init command to server [ {} ]", this->engineName());
148  grpc::Status status = _stub->initialize(&context, request, &reply);
149 
150  if(!status.ok())
151  {
152  const auto errMsg = "Engine server initialization failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
153  throw std::runtime_error(errMsg);
154  }
155  }
156 
158  {
159  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
160  EngineGrpc::ResetRequest request;
161  EngineGrpc::ResetReply reply;
162  grpc::ClientContext context;
163 
164  prepareRpcContext(&context);
165 
166  NRPLogger::debug("Sending reset command to server [ {} ]", this->engineName());
167  grpc::Status status = _stub->reset(&context, request, &reply);
168 
169  if(!status.ok())
170  {
171  const auto errMsg = "Engine server reset failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
172  throw std::runtime_error(errMsg);
173  }
174 
175  resetEngineTime();
176  }
177 
179  {
180  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
181 
182  EngineGrpc::ShutdownRequest request;
183  EngineGrpc::ShutdownReply reply;
184  grpc::ClientContext context;
185 
186  prepareRpcContext(&context);
187 
188  request.set_json(data.dump());
189 
190  NRPLogger::debug("Sending shutdown command to server [ {} ]", this->engineName());
191  grpc::Status status = _stub->shutdown(&context, request, &reply);
192 
193  if(!status.ok())
194  {
195  const auto errMsg = "Engine server shutdown failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
196  throw std::runtime_error(errMsg);
197  }
198  }
199 
201  {
202  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
203 
204  EngineGrpc::RunLoopStepRequest request;
205  EngineGrpc::RunLoopStepReply reply;
206  grpc::ClientContext context;
207 
208  prepareRpcContext(&context);
209 
210  request.set_timestep(timeStep.count());
211 
212  grpc::Status status = _stub->runLoopStep(&context, request, &reply);
213 
214  if(!status.ok())
215  {
216  const auto errMsg = "Engine server runLoopStep failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
217  throw std::runtime_error(errMsg);
218  }
219 
220  const SimulationTime engineTime(reply.enginetime());
221 
222  if(engineTime < SimulationTime::zero())
223  {
224  const auto errMsg = "Invalid engine time (should be greater than 0): " + std::to_string(engineTime.count());
225  throw std::runtime_error(errMsg);
226  }
227 
228  if(engineTime < this->_prevEngineTime)
229  {
230  const auto errMsg = "Invalid engine time (should be greater than previous time): "
231  + std::to_string(engineTime.count())
232  + ", previous: "
233  + std::to_string(this->_prevEngineTime.count());
234 
235  throw std::runtime_error(errMsg);
236  }
237 
238  this->_prevEngineTime = engineTime;
239 
240  return engineTime;
241  }
242 
243  virtual datapacks_vector_t getDataPacksFromEngine(const datapack_identifiers_set_t &requestedDataPackIds) override
244  {
245  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
246 
247  EngineGrpc::GetDataPacksRequest request;
248  EngineGrpc::GetDataPacksReply reply;
249  grpc::ClientContext context;
250 
251  for(const auto & requestedId: requestedDataPackIds)
252  {
253  if(this->engineName().compare(requestedId.EngineName) == 0)
254  {
255  auto dataPackId = request.add_datapackids();
256 
257  dataPackId->set_datapackname(requestedId.Name);
258  dataPackId->set_datapacktype(requestedId.Type);
259  dataPackId->set_enginename(requestedId.EngineName);
260  }
261  }
262 
263  grpc::Status status = _stub->getDataPacks(&context, request, &reply);
264 
265  if(!status.ok())
266  {
267  const auto errMsg = "In Engine \"" + this->engineName() + "\", getDataPacksFromEngine failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
268  throw std::runtime_error(errMsg);
269  }
270 
271  datapacks_vector_t interfaces;
272  for(int i = 0; i < reply.datapacks_size(); i++) {
273  auto datapackData = reply.datapacks(i);
275 
276  for(auto& mod : _protoOps) {
277  datapack = mod->getDataPackInterfaceFromMessage(this->engineName(), datapackData);
278 
279  if(datapack != nullptr)
280  break;
281  }
282 
283  if(datapack)
284  interfaces.push_back(datapack);
285  else
286  throw NRPException::logCreate("In Engine \"" + this->engineName() + "\", unable to deserialize datapack \"" +
287  datapackData.datapackid().datapackname() + "\" using any of the NRP-Core Protobuf plugins specified in the"
288  " engine configuration: [" + _protoOpsStr + "]. Ensure that the parameter "
289  "\"ProtobufPackages\" is properly set in the Engine configuration");
290  }
291 
292  return interfaces;
293  }
294 
295  virtual void sendDataPacksToEngine(const datapacks_set_t &dataPacks) override
296  {
297  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
298 
299  EngineGrpc::SetDataPacksRequest request;
300  EngineGrpc::SetDataPacksReply reply;
301  grpc::ClientContext context;
302 
303  prepareRpcContext(&context);
304 
305  for(const auto &datapack : dataPacks)
306  {
307  assert(datapack->engineName() == this->engineName());
308 
309  if(datapack->isEmpty())
310  throw NRPException::logCreate("Attempt to send empty datapack " + datapack->name() + " to Engine " + this->engineName());
311  else {
312  auto protoDataPack = request.add_datapacks();
313 
314  bool isSet = false;
315  for(auto& mod : _protoOps) {
316  try {
317  mod->setDataPackMessageFromInterface(*datapack, protoDataPack);
318  isSet = true;
319  }
320  catch (NRPException &) {
321  // this just means that the module couldn't process the request, try with the next one
322  }
323  }
324 
325  if(!isSet)
326  throw NRPException::logCreate("In Engine \"" + this->engineName() + "\", unable to serialize datapack \"" +
327  datapack->name() + "\" using any of the NRP-Core Protobuf plugins specified in the"
328  " engine configuration: [" + _protoOpsStr + "]. Ensure that the parameter "
329  "\"ProtobufPackages\" is properly set in the Engine configuration");
330  }
331  }
332 
333  grpc::Status status = _stub->setDataPacks(&context, request, &reply);
334 
335  if(!status.ok())
336  {
337  const auto errMsg = "In Engine \"" + this->engineName() + "\", sendDataPacksToEngine failed: " + status.error_message() + " (" + std::to_string(status.error_code()) + ")";
338  throw std::runtime_error(errMsg);
339  }
340  }
341 
342  virtual const std::vector<std::string> engineProcStartParams() const override
343  {
344  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
345 
346  std::vector<std::string> startParams = this->engineConfig().at("EngineProcStartParams");
347 
348  std::string name = this->engineConfig().at("EngineName");
349  startParams.push_back(std::string("--") + EngineGRPCConfigConst::EngineNameArg.data() + "=" + name);
350 
351  // Add Server address (will be used by EngineGrpcServer)
352  std::string address = this->engineConfig().at("ServerAddress");
353  startParams.push_back(std::string("--") + EngineGRPCConfigConst::EngineServerAddrArg.data() + "=" + address);
354 
355  // Information needed to load protobuf plugins
356  startParams.push_back(std::string("--") + EngineGRPCConfigConst::ProtobufPluginsPathArg.data() + "=" +
357  this->engineConfig().at("ProtobufPluginsPath").template get<std::string>());
358 
359  startParams.push_back(std::string("--") + EngineGRPCConfigConst::ProtobufPluginsArg.data() + "=" +
360  this->engineConfig().at("ProtobufPackages").dump());
361 
362 
363  return startParams;
364  }
365 
366  std::string tryBind(const std::string & address)
367  {
368  Pistache::Address addressParse(address);
369 
370  try
371  {
372  bindToAddress(addressParse.host(), addressParse.port());
373  }
374  catch(const std::exception & e)
375  {
376  // can't bind, ask the OS for a (free) port
377  const uint16_t newPort = getFreePort(addressParse.host());
378 
379  // Returns the address using the new port and the previous host
380  return addressParse.host() + ":" + std::to_string(newPort);
381  }
382 
383  return address;
384  }
385 
390  {
391  const std::string ServerAddressConfig = this->engineConfig().at("ServerAddress");
392  const std::string ServerAddressActual = tryBind(ServerAddressConfig);
393 
394  if(ServerAddressActual != ServerAddressConfig)
395  {
396  NRPLogger::info("Engine {} could not bind to the address specified in the engine configuration '{}'. Using '{}' instead.", this->engineName(), ServerAddressConfig, ServerAddressActual);
397 
398  // Update the address in the config
399  // This value will be passed to the Engine launcher
400  this->engineConfig().at("ServerAddress") = ServerAddressActual;
401  }
402  }
403 
407  const std::string serverAddress() const
408  {
409  return this->_serverAddress;
410  }
411 
412  protected:
413 
414  void resetEngineTime() override
415  {
417  this->_prevEngineTime = SimulationTime::zero();
418  }
419 
420  private:
421 
422  std::shared_ptr<grpc::Channel> _channel;
423  std::unique_ptr<EngineGrpc::EngineGrpcService::Stub> _stub;
424  std::string _serverAddress;
425 
426  SimulationTime _prevEngineTime = SimulationTime::zero();
427  SimulationTime _rpcTimeout = SimulationTime::zero();
428 
429  std::vector<std::unique_ptr<protobuf_ops::NRPProtobufOpsIface>> _protoOps;
430  std::string _protoOpsStr = "";
431 };
432 
433 
434 #endif // ENGINE_GRPC_CLIENT_H
435 
436 // EOF
NRPException
Base NRPException class.
Definition: nrp_exceptions.h:36
EngineGrpcClient::engineProcStartParams
virtual const std::vector< std::string > engineProcStartParams() const override
Get all Engine Process Startup parameters.
Definition: engine_grpc_client.h:342
datapacks_vector_t
std::vector< std::shared_ptr< const DataPackInterface > > datapacks_vector_t
Definition: datapack_interface.h:220
EngineClientInterface::launchEngine
virtual pid_t launchEngine()
Launch the engine.
Definition: engine_client_interface.cpp:31
EngineGrpcClient
Definition: engine_grpc_client.h:45
datapacks_set_t
std::set< std::shared_ptr< const DataPackInterface >, DataPackPointerComparator > datapacks_set_t
Definition: datapack_interface.h:219
EngineGrpcClient::launchEngine
virtual pid_t launchEngine() override
Launch the engine.
Definition: engine_grpc_client.h:111
PtrTemplates< ProcessLauncherInterface >::unique_ptr
std::unique_ptr< ProcessLauncherInterface > unique_ptr
Definition: ptr_templates.h:34
EngineGrpcClient::resetEngineTime
void resetEngineTime() override
Definition: engine_grpc_client.h:414
DataPackInterfaceConstSharedPtr
DataPackInterface::const_shared_ptr DataPackInterfaceConstSharedPtr
Definition: datapack_interface.h:180
EngineGrpcClient::getDataPacksFromEngine
virtual datapacks_vector_t getDataPacksFromEngine(const datapack_identifiers_set_t &requestedDataPackIds) override
Gets requested datapacks from engine.
Definition: engine_grpc_client.h:243
engine_grpc_config.h
EngineClient::engineName
const std::string engineName() const override final
Get Engine Name.
Definition: engine_client_interface.h:266
EngineGrpcClient::sendShutdownCommand
void sendShutdownCommand(const nlohmann::json &data)
Definition: engine_grpc_client.h:178
EngineGrpcClient::EngineGrpcClient
EngineGrpcClient(nlohmann::json &config, ProcessLauncherInterface::unique_ptr &&launcher)
Definition: engine_grpc_client.h:65
EngineGrpcClient::sendInitializeCommand
void sendInitializeCommand(const nlohmann::json &data)
Definition: engine_grpc_client.h:135
EngineClient
Base class for all Engines.
Definition: engine_client_interface.h:191
engine_client_interface.h
EngineClient::engineConfig
const nlohmann::json & engineConfig() const override final
Get Engine Configuration.
Definition: engine_client_interface.h:278
NRPLogger::info
static void info(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for info level.
Definition: nrp_logger.h:138
utils.h
getFreePort
int getFreePort(std::string hostIpv4)
Returns a free port number.
Definition: utils.h:94
EngineGRPCConfigConst::ProtobufPluginsArg
static constexpr std::string_view ProtobufPluginsArg
Parameter name that is used to pass the list of protobuf plugins.
Definition: engine_grpc_config.h:47
datapack_identifiers_set_t
std::set< DataPackIdentifier > datapack_identifiers_set_t
Definition: datapack_interface.h:221
ProtoOpsManager::loadProtobufPlugin
std::unique_ptr< protobuf_ops::NRPProtobufOpsIface > loadProtobufPlugin(const std::string &pluginLibFile)
Load a Protobuf conversion plugin from a given library.
Definition: proto_ops_manager.cpp:35
protobuf_ops.h
PluginManager::addPluginPath
void addPluginPath(const std::string &pluginPath)
Adds search path under which to look for plugins.
Definition: plugin_manager.cpp:88
EngineGRPCConfigConst::EngineNameArg
static constexpr std::string_view EngineNameArg
Parameter name that is used to pass along the engine name.
Definition: engine_grpc_config.h:37
EngineClient::resetEngineTime
virtual void resetEngineTime()
Definition: engine_client_interface.h:359
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
datapack.h
EngineGRPCConfigConst::EngineServerAddrArg
static constexpr std::string_view EngineServerAddrArg
Parameter name that is used to pass along the server address.
Definition: engine_grpc_config.h:32
json_schema_utils.h
bindToAddress
void bindToAddress(std::string hostIpv4, int port)
Attempts to bind to a given address.
Definition: utils.h:84
EngineGrpcClient::runLoopStepCallback
SimulationTime runLoopStepCallback(const SimulationTime timeStep) override
Executes a single loop step.
Definition: engine_grpc_client.h:200
EngineGrpcClient::sendDataPacksToEngine
virtual void sendDataPacksToEngine(const datapacks_set_t &dataPacks) override
Sends datapacks to engine.
Definition: engine_grpc_client.h:295
EngineGRPCConfigConst::ProtobufPluginsPathArg
static constexpr std::string_view ProtobufPluginsPathArg
Parameter name that is used to pass the protobuf plugins path.
Definition: engine_grpc_config.h:42
EngineGrpcClient::serverAddress
const std::string serverAddress() const
Returns the address used by the gRPC server.
Definition: engine_grpc_client.h:407
ProtoOpsManager::getInstance
static ProtoOpsManager & getInstance()
Get singleton instance of ProtoOpsManager.
Definition: proto_ops_manager.cpp:55
EngineGrpcClient::tryBind
std::string tryBind(const std::string &address)
Definition: engine_grpc_client.h:366
EngineGrpcClient::validateServerAddress
void validateServerAddress()
Validates if server address is already in use.
Definition: engine_grpc_client.h:389
EngineGrpcClient::connectToServer
void connectToServer()
Definition: engine_grpc_client.h:124
EngineGrpcClient::sendResetCommand
void sendResetCommand()
Definition: engine_grpc_client.h:157
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
SimulationTime
std::chrono::nanoseconds SimulationTime
Definition: time_utils.h:31
proto_ops_manager.h
NRP_LOGGER_TRACE
#define NRP_LOGGER_TRACE(...)
trace log macro. It is voided by defining \PRODUCTION_RELEASE
Definition: nrp_logger.h:39
json
nlohmann::json json
Definition: engine_json_server.cpp:31