NRP Core  1.4.1
engine_json_nrp_client.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef ENGINE_JSON_NRP_CLIENT_H
23 #define ENGINE_JSON_NRP_CLIENT_H
24 
31 
32 #include <nlohmann/json.hpp>
33 #include <list>
34 
35 #include <chrono>
36 
37 #include <iostream>
38 #include <restclient-cpp/restclient.h>
39 
44 template<class ENGINE, const char* SCHEMA>
46  : public EngineClient<ENGINE, SCHEMA>
47 {
48  public:
55  : EngineClient<ENGINE, SCHEMA>(config, std::move(launcher)),
56  _serverAddress(this->engineConfig().at("ServerAddress"))
57  {
58  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
59 
61  }
62 
/*!
 * \brief Constructor taking an explicit engine server address
 * \param serverAddress Address of the engine server. Stored in _serverAddress and written
 *                      to the "ServerAddress" entry of the engine configuration
 * \param config Engine configuration, forwarded to the EngineClient base class
 * \param launcher Process launcher, moved into the EngineClient base class
 */
69  EngineJSONNRPClient(const std::string &serverAddress, nlohmann::json &config, ProcessLauncherInterface::unique_ptr &&launcher)
70  : EngineClient<ENGINE, SCHEMA>(config, std::move(launcher)),
71  _serverAddress(serverAddress)
72  {
73  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
74 
// Keep the engine configuration in sync with the explicitly supplied address
75  this->engineConfig()["ServerAddress"] = this->_serverAddress;
// NOTE(review): listing line 76 is not visible in this excerpt - confirm against the original header
77  }
78 
79  virtual ~EngineJSONNRPClient() override = default;
80 
81  virtual pid_t launchEngine() override
82  {
83  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
84 
85  this->startRegistrationServer();
86 
87  // Launch engine process.
88  // enginePID == -1 means that the launcher hasn't launched a process
89  auto enginePID = this->EngineClientInterface::launchEngine();
90 
91  // Wait for engine to register itself if process launching has succeeded
92  if(enginePID > 0 && !this->engineConfig().at("RegistrationServerAddress").empty())
93  {
94  const auto serverAddr = this->waitForRegistration(20, 1);
95 
96  if(serverAddr.empty())
97  throw NRPException::logCreate("Error while waiting for engine \"" + this->engineName() + "\" to register its address. Did not receive a reply");
98 
99  this->engineConfig()["ServerAddress"] = serverAddr;
100  this->_serverAddress = serverAddr;
101  }
102 
103  return enginePID;
104  }
105 
106  virtual void sendDataPacksToEngine(const datapacks_set_t & dataPacks) override
107  {
108  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
109 
110  // TODO Early return if no datapacks?
111 
112  // Convert datapacks to JSON format
113  nlohmann::json request;
114  for(auto curDataPack : dataPacks)
115  {
116  assert(curDataPack->engineName() == this->engineName());
117 
118  if(curDataPack->isEmpty())
119  throw NRPException::logCreate("Attempt to send empty datapack " + curDataPack->name() + " to Engine " + this->engineName());
120  else if(curDataPack->type() != JsonDataPack::getType())
121  {
122  throw NRPException::logCreate("Engine \"" +
123  this->engineName() +
124  "\" cannot handle datapack type '" +
125  curDataPack->type() + "'");
126  }
127 
128  const nlohmann::json & data = (dynamic_cast<const JsonDataPack *>(curDataPack.get()))->getData();
129  const auto & name = curDataPack->name();
130 
131  request[name]["engine_name"] = curDataPack->engineName();
132  request[name]["type"] = curDataPack->type();
133  request[name]["data"] = data;
134  }
135 
136  // Send updated datapacks to Engine JSON server
137  EngineJSONNRPClient::sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerSetDataPacksRoute.data(),
138  EngineJSONConfigConst::EngineServerContentType.data(), request.dump(),
139  "Engine server \"" + this->engineName() + "\" failed during datapack handling");
140 
141  // TODO: Check if engine has processed all sent datapacks
142  }
143 
144  virtual const std::vector<std::string> engineProcStartParams() const override
145  {
146  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
147 
148  std::vector<std::string> startParams = this->engineConfig().at("EngineProcStartParams");
149 
150  std::string name = this->engineConfig().at("EngineName");
151  startParams.push_back(std::string("--") + EngineJSONConfigConst::EngineNameArg.data() + "=" + name);
152 
153  // Add JSON Server address (will be used by EngineJSONServer)
154  std::string address = this->engineConfig().at("ServerAddress");
155  startParams.push_back(std::string("--") + EngineJSONConfigConst::EngineServerAddrArg.data() + "=" + address);
156 
157  // Add JSON registration Server address (will be used by EngineJSONServer)
158  std::string reg_address = this->engineConfig().at("RegistrationServerAddress");
159  startParams.push_back(std::string("--") + EngineJSONConfigConst::EngineRegistrationServerAddrArg.data() + "=" + reg_address);
160 
161  return startParams;
162  }
163 
164  virtual datapacks_vector_t getDataPacksFromEngine(const datapack_identifiers_set_t &requestedDataPackIds) override
165  {
166  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
167 
168  nlohmann::json request;
169  for(const auto &id : requestedDataPackIds)
170  {
171  if(this->engineName().compare(id.EngineName) == 0)
172  {
173  request[id.Name] = {{"engine_name", id.EngineName}, {"type", id.Type}};
174  }
175 
176  }
177 
178  // Post request to Engine JSON server
179  const auto resp(EngineJSONNRPClient::sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerGetDataPacksRoute.data(),
180  EngineJSONConfigConst::EngineServerContentType.data(), request.dump(),
181  "Engine server \"" + this->engineName() + "\" failed during datapack retrieval"));
182 
183  return this->getDataPackInterfacesFromJSON(resp);
184  }
185 
186 protected:
193  {
194  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
195 
196  NRPLogger::debug("EngineJSONNRPClient::sendInitCommand [ data: {} ]", data.dump());
197 
198  // Post init request to Engine JSON server
199  return sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerInitializeRoute.data(),
201  "Engine server \"" + this->engineName() + "\" failed during initialization");
202  }
203 
210  {
211  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
212 
213  NRPLogger::debug("EngineJSONNRPClient::sendResetCommand [ data: {} ]", data.dump());
214 
215  // Post reset request to Engine JSON server
216  return sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerResetRoute.data(),
218  "Engine server \"" + this->engineName() + "\" failed during reset");
219  }
220 
227  {
228  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
229 
230  NRPLogger::debug("EngineJSONNRPClient::sendShutdownCommand [ data: {} ]", data.dump());
231 
232  // Post shutdown request to Engine JSON server
233  return sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerShutdownRoute.data(),
235  "Engine server \"" + this->engineName() + "\" failed during shutdown");
236  }
237 
/*!
 * \brief Waits for the engine to report its address to the registration server
 *
 * Requests the engine by name from the registration server and, while no address is
 * available, polls again after each pause until numTries polls have been made.
 *
 * \param numTries Maximum number of additional polls after the initial request
 * \param waitTime Pause between polls, passed to sleep() (presumably seconds - confirm)
 * \return Address registered by the engine, or an empty string if none was received
 */
244  std::string waitForRegistration(unsigned int numTries, unsigned int waitTime) const
245  {
246  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
247 
248  auto *pRegistrationServer = EngineJSONRegistrationServer::getInstance();
249 
// The registration server is expected to have been started before this call
250  assert(pRegistrationServer != nullptr);
251  assert(pRegistrationServer->isRunning());
252 
253  // Try to retrieve engine address
254  auto engineAddr = pRegistrationServer->requestEngine(this->engineName());
255 
256  while(engineAddr.empty() && numTries > 0)
257  {
258  // Pause for waitTime, then poll again; total wait is bounded by numTries * waitTime
259  sleep(waitTime);
260  engineAddr = pRegistrationServer->retrieveEngineAddress(this->engineName());
261  --numTries;
262  }
263 
264  // Close server if no additional clients are waiting for their engines to register
265  if(pRegistrationServer->getNumWaitingEngines() == 0)
266  {
// NOTE(review): the statement on listing line 267 is not visible in this excerpt - confirm against the original header
268  }
269 
270  return engineAddr;
271  }
272 
273  private:
274 
// Retry count handed to EngineJSONRegistrationServer::tryInstantiate() when starting the registration server
275  const unsigned NUM_RETRIES_REG_SERVER = 2;
276 
// Address of the engine server; prefix of every request URL built by this client
280  std::string _serverAddress;
281 
290  static nlohmann::json sendRequest(const std::string &serverName, const std::string &contentType, const std::string &request, const std::string_view &exceptionMessage)
291  {
292  // Post request to Engine JSON server
293  try
294  {
295  auto resp = RestClient::post(serverName, contentType, request);
296  if(resp.code != 200)
297  {
298  throw std::domain_error("Request failed with code " + std::to_string(resp.code) +
299  " and message:" + "\n" + resp.body +
300  "Client context message: " + exceptionMessage.data());
301  }
302 
303  return nlohmann::json::parse(resp.body);
304  }
305  catch(std::exception &e)
306  {
307  throw NRPException::logCreate(e, "Communication with engine server failed");
308  }
309  }
310 
316  SimulationTime runLoopStepCallback(SimulationTime timeStep) override
317  {
318  nlohmann::json request;
319  request[EngineJSONConfigConst::EngineTimeStepName.data()] = timeStep.count();
320 
321  // Post run loop request to Engine JSON server
322  nlohmann::json resp(EngineJSONNRPClient::sendRequest(this->_serverAddress + "/" + EngineJSONConfigConst::EngineServerRunLoopStepRoute.data(),
323  EngineJSONConfigConst::EngineServerContentType.data(), request.dump(),
324  "Engine Server failed during loop execution"));
325 
326  // Get engine time from response
327  SimulationTime engineTime;
328  try
329  {
330  engineTime = SimulationTime(resp[EngineJSONConfigConst::EngineTimeName.data()]);
331  }
332  catch(std::exception &e)
333  {
334  throw NRPException::logCreate(e, "Error while parsing the return value of the run_step of \"" + this->engineName() + "\"");
335  }
336 
337  if(engineTime < SimulationTime::zero())
338  throw NRPException::logCreate("Error during execution of engine \"" + this->engineName() + "\"");
339 
340  return engineTime;
341  }
342 
348  datapacks_vector_t getDataPackInterfacesFromJSON(const nlohmann::json &datapacks) const
349  {
350  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
351 
352  datapacks_vector_t interfaces;
353 
354  for(auto curDataPackIterator = datapacks.begin(); curDataPackIterator != datapacks.end(); ++curDataPackIterator)
355  {
356  try
357  {
358  DataPackIdentifier datapackID(curDataPackIterator.key(),
359  (*curDataPackIterator)["engine_name"],
360  (*curDataPackIterator)["type"]);
361 
362  datapackID.EngineName = this->engineName();
363  interfaces.push_back(this->getSingleDataPackInterfaceFromJSON(curDataPackIterator, datapackID));
364  }
365  catch(std::exception &e)
366  {
367  // TODO: Handle json datapack parsing error
368  throw NRPException::logCreate(e, "Failed to parse JSON DataPack Interface");
369  }
370  }
371 
372  return interfaces;
373  }
374 
384  inline DataPackInterfaceConstSharedPtr getSingleDataPackInterfaceFromJSON(const nlohmann::json::const_iterator &datapackData, DataPackIdentifier &datapackID) const
385  {
386  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
387 
388  if(datapackID.Type == JsonDataPack::getType())
389  {
390  // Check whether the requested datapack has new data
391  // A datapack that has no data will contain two JSON objects with "engine_name" and "type" keys (parts of datapack ID)
392 
393  if(datapackData->size() == 2)
394  {
395  // There's no meaningful data in the datapack, so create an empty datapack with datapack ID only
396 
397  return DataPackInterfaceSharedPtr(new DataPackInterface(std::move(datapackID)));
398  }
399 
400  nlohmann::json * data = new nlohmann::json(std::move((*datapackData)["data"]));
401 
402  DataPackInterfaceSharedPtr newDataPack(new JsonDataPack(datapackID.Name, datapackID.EngineName, data));
403  newDataPack->setEngineName(this->engineName());
404 
405  return newDataPack;
406  }
407  else
408  {
409  throw NRPException::logCreate("DataPack type \"" + datapackID.Type + "\" cannot be handled by the \"" + this->engineName() + "\" engine");
410  }
411  }
412 
416  void startRegistrationServer()
417  {
418  const std::string regServerAddressConfig = this->engineConfig().at("RegistrationServerAddress");
419  const std::string regServerAddressActual = EngineJSONRegistrationServer::tryInstantiate(regServerAddressConfig, NUM_RETRIES_REG_SERVER);
420 
421  if(regServerAddressActual != regServerAddressConfig)
422  {
423  NRPLogger::warn("Registration server failed to bind to the specified address '{}'. Using '{}' instead.", regServerAddressConfig, regServerAddressActual);
424 
425  // Update the address in the config
426  // This value will be passed to the forked process
427 
428  this->engineConfig()["RegistrationServerAddress"] = regServerAddressActual;
429  }
430  }
431 };
432 
433 
434 #endif //ENGINE_JSON_NRP_CLIENT_H
engine_json_config.h
datapacks_vector_t
std::vector< std::shared_ptr< const DataPackInterface > > datapacks_vector_t
Definition: datapack_interface.h:220
EngineClientInterface::launchEngine
virtual pid_t launchEngine()
Launch the engine.
Definition: engine_client_interface.cpp:31
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
datapacks_set_t
std::set< std::shared_ptr< const DataPackInterface >, DataPackPointerComparator > datapacks_set_t
Definition: datapack_interface.h:219
PtrTemplates< ProcessLauncherInterface >::unique_ptr
std::unique_ptr< ProcessLauncherInterface > unique_ptr
Definition: ptr_templates.h:34
EngineJSONNRPClient::waitForRegistration
std::string waitForRegistration(unsigned int numTries, unsigned int waitTime) const
Wait for the engine registration server to receive a call from the engine.
Definition: engine_json_nrp_client.h:244
EngineJSONConfigConst::EngineServerContentType
static constexpr std::string_view EngineServerContentType
Content Type passed between server and client.
Definition: engine_json_config.h:92
DataPackIdentifier::Type
std::string Type
DataPack Type.
Definition: datapack_interface.h:54
EngineJSONConfigConst::EngineTimeName
static constexpr std::string_view EngineTimeName
JSON name under which the runLoopStep engine time is sent.
Definition: engine_json_config.h:87
DataPackInterfaceConstSharedPtr
DataPackInterface::const_shared_ptr DataPackInterfaceConstSharedPtr
Definition: datapack_interface.h:180
JsonDataPack
DataPack< nlohmann::json > JsonDataPack
Definition: json_datapack.h:29
EngineClient::engineName
const std::string engineName() const override final
Get Engine Name.
Definition: engine_client_interface.h:266
EngineJSONRegistrationServer::tryInstantiate
static std::string tryInstantiate(const std::string &initialAddress, const unsigned numRetries)
Attempts to instantiate and start the registration server.
Definition: engine_json_registration_server.cpp:81
EngineJSONNRPClient
NRP - Gazebo Communicator on the NRP side. Converts DataPackInterface classes from/to JSON objects.
Definition: engine_json_nrp_client.h:45
EngineJSONNRPClient::sendResetCommand
nlohmann::json sendResetCommand(const nlohmann::json &data)
Send a reset command.
Definition: engine_json_nrp_client.h:209
nrp_exceptions.h
EngineJSONNRPClient::sendInitCommand
nlohmann::json sendInitCommand(const nlohmann::json &data)
Send an initialization command.
Definition: engine_json_nrp_client.h:192
EngineClient
Base class for all Engines.
Definition: engine_client_interface.h:191
EngineJSONConfigConst::EngineServerResetRoute
static constexpr std::string_view EngineServerResetRoute
REST Server Route for engine reset.
Definition: engine_json_config.h:72
EngineJSONConfigConst::EngineServerInitializeRoute
static constexpr std::string_view EngineServerInitializeRoute
REST Server Route for engine initialization.
Definition: engine_json_config.h:67
DataPack::getType
static std::string getType()
Returns type of the datapack class.
Definition: datapack.h:61
EngineJSONRegistrationServer::clearInstance
static void clearInstance()
Delete Instance.
Definition: engine_json_registration_server.cpp:74
engine_client_interface.h
EngineJSONRegistrationServer::getInstance
static EngineJSONRegistrationServer * getInstance()
Get Instance of EngineJSONRegistrationServer.
Definition: engine_json_registration_server.cpp:61
EngineClient::engineConfig
const nlohmann::json & engineConfig() const override final
Get Engine Configuration.
Definition: engine_client_interface.h:278
DataPackIdentifier::Name
std::string Name
DataPack Name. Used by simulator to identify source/sink of datapack.
Definition: datapack_interface.h:44
DataPackInterfaceSharedPtr
DataPackInterface::shared_ptr DataPackInterfaceSharedPtr
Definition: datapack_interface.h:179
datapack_identifiers_set_t
std::set< DataPackIdentifier > datapack_identifiers_set_t
Definition: datapack_interface.h:221
EngineJSONConfigConst::EngineServerSetDataPacksRoute
static constexpr std::string_view EngineServerSetDataPacksRoute
REST Server Route to which to send datapack changes.
Definition: engine_json_config.h:57
EngineJSONConfigConst::EngineRegistrationServerAddrArg
static constexpr std::string_view EngineRegistrationServerAddrArg
Parameter name that is used to pass along the registration server address.
Definition: engine_json_config.h:42
EngineJSONConfigConst::EngineTimeStepName
static constexpr std::string_view EngineTimeStepName
JSON name under which the runLoopStep timeStep is saved.
Definition: engine_json_config.h:82
json_datapack.h
EngineJSONNRPClient::~EngineJSONNRPClient
virtual ~EngineJSONNRPClient() override=default
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
EngineJSONNRPClient::EngineJSONNRPClient
EngineJSONNRPClient(const std::string &serverAddress, nlohmann::json &config, ProcessLauncherInterface::unique_ptr &&launcher)
Constructor.
Definition: engine_json_nrp_client.h:69
EngineJSONConfigConst::EngineServerAddrArg
static constexpr std::string_view EngineServerAddrArg
Parameter name that is used to pass along the server address.
Definition: engine_json_config.h:37
EngineJSONConfigConst::EngineServerRunLoopStepRoute
static constexpr std::string_view EngineServerRunLoopStepRoute
REST Server Route to execute a single loop.
Definition: engine_json_config.h:62
EngineJSONNRPClient::sendShutdownCommand
nlohmann::json sendShutdownCommand(const nlohmann::json &data)
Send a shutdown command.
Definition: engine_json_nrp_client.h:226
restclient_setup.h
DataPackIdentifier
Identifies a single datapack.
Definition: datapack_interface.h:38
EngineJSONConfigConst::EngineServerShutdownRoute
static constexpr std::string_view EngineServerShutdownRoute
REST Server Route for engine shutdown.
Definition: engine_json_config.h:77
DataPack
Base datapack class.
Definition: datapack.h:36
DataPackInterface
Interface to datapacks.
Definition: datapack_interface.h:89
EngineJSONConfigConst::EngineServerGetDataPacksRoute
static constexpr std::string_view EngineServerGetDataPacksRoute
REST Server Route from which to get datapack information.
Definition: engine_json_config.h:52
EngineJSONConfigConst::EngineNameArg
static constexpr std::string_view EngineNameArg
Parameter name that is used to pass along the engine name.
Definition: engine_json_config.h:47
EngineJSONNRPClient::EngineJSONNRPClient
EngineJSONNRPClient(nlohmann::json &config, ProcessLauncherInterface::unique_ptr &&launcher)
Constructor.
Definition: engine_json_nrp_client.h:54
EngineJSONNRPClient::getDataPacksFromEngine
virtual datapacks_vector_t getDataPacksFromEngine(const datapack_identifiers_set_t &requestedDataPackIds) override
Gets requested datapacks from engine.
Definition: engine_json_nrp_client.h:164
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
SimulationTime
std::chrono::nanoseconds SimulationTime
Definition: time_utils.h:31
engine_json_registration_server.h
DataPackIdentifier::EngineName
std::string EngineName
Corresponding engine.
Definition: datapack_interface.h:49
EngineJSONNRPClient::launchEngine
virtual pid_t launchEngine() override
Launch the engine.
Definition: engine_json_nrp_client.h:81
EngineJSONNRPClient::engineProcStartParams
virtual const std::vector< std::string > engineProcStartParams() const override
Get all Engine Process Startup parameters.
Definition: engine_json_nrp_client.h:144
RestClientSetup::ensureInstance
static RestClientSetup * ensureInstance()
Ensure that RestClientSetup has been initialized.
Definition: restclient_setup.cpp:39
NRP_LOGGER_TRACE
#define NRP_LOGGER_TRACE(...)
Trace log macro. It is voided by defining PRODUCTION_RELEASE.
Definition: nrp_logger.h:39
json
nlohmann::json json
Definition: engine_json_server.cpp:31
EngineJSONNRPClient::sendDataPacksToEngine
virtual void sendDataPacksToEngine(const datapacks_set_t &dataPacks) override
Sends datapacks to engine.
Definition: engine_json_nrp_client.h:106