NRP Core  1.4.1
engine_grpc_server.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef ENGINE_GRPC_SERVER_H
23 #define ENGINE_GRPC_SERVER_H
24 
25 #include <string>
26 #include <map>
27 #include <type_traits>
28 
29 #include <grpcpp/grpcpp.h>
30 #include <grpcpp/health_check_service_interface.h>
31 #include <nlohmann/json.hpp>
32 
33 #include "nrp_protobuf/engine_grpc.grpc.pb.h"
36 #include "nrp_protobuf/config/cmake_constants.h"
40 
42 
43 
44 using EngineGrpc::EngineGrpcService;
45 
54 class EngineGrpcServer : public EngineGrpcService::Service
55 {
56  public:
57 
58  using mutex_t = std::timed_mutex;
59  using lock_t = std::unique_lock<EngineGrpcServer::mutex_t>;
60 
64  EngineGrpcServer() = delete;
65 
72  EngineGrpcServer(const std::string serverAddress, EngineProtoWrapper* engineWrapper)
73  : _engineWrapper(engineWrapper)
74  {
75  this->_serverAddress = serverAddress;
76  this->_isServerRunning = false;
77 
78  grpc::EnableDefaultHealthCheckService(true);
79  }
80 
85  {
86  this->shutdownServer();
87  }
88 
92  void startServer()
93  {
94  if(!this->_isServerRunning)
95  {
96  grpc::ServerBuilder builder;
97  builder.AddListeningPort(_serverAddress, grpc::InsecureServerCredentials());
98  builder.RegisterService(this);
99  NRPLogger::debug("Using server address: "+ this->_serverAddress);
100 
101  this->_server = builder.BuildAndStart();
102  // TODO Should we use a memory barrier here?
103  this->_isServerRunning = true;
104  }
105  }
106 
111  {
112  this->startServer();
113  }
114 
119  {
120  if(this->_isServerRunning)
121  {
122  this->_server->Shutdown();
123  // TODO Should we use a memory barrier here?
124  this->_isServerRunning = false;
125  }
126  }
127 
131  virtual bool initRunFlag() const { return this->_engineWrapper->initRunFlag(); }
132 
136  virtual bool shutdownFlag() const { return this->_engineWrapper->shutdownFlag(); }
137 
141  bool isServerRunning() const
142  {
143  return this->_isServerRunning;
144  }
145 
149  const std::string serverAddress() const
150  {
151  return this->_serverAddress;
152  }
153 
154  private:
155 
156  mutex_t _engineCallLock;
157 
161  std::string _serverAddress;
162 
166  bool _isServerRunning;
167 
171  std::unique_ptr<grpc::Server> _server;
172 
176  std::unique_ptr<EngineProtoWrapper> _engineWrapper;
177 
191  grpc::Status initialize( grpc::ServerContext * /*context*/,
192  const EngineGrpc::InitializeRequest * request,
193  EngineGrpc::InitializeReply * /*reply*/) override
194  {
195  try
196  {
197  EngineGrpcServer::lock_t lock(this->_engineCallLock);
198 
199  nlohmann::json requestJson = nlohmann::json::parse(request->json());
200 
201  // Run engine-specific initialization function
202 
203  this->_engineWrapper->initialize(requestJson);
204  }
205  catch(const std::exception &e)
206  {
207  return handleGrpcError("Error while executing initialization", e.what());
208  }
209  NRPLogger::debug("init command returns OK");
210  return grpc::Status::OK;
211  }
212 
213 
227  grpc::Status reset( grpc::ServerContext * /*context*/,
228  const EngineGrpc::ResetRequest * /*request*/,
229  EngineGrpc::ResetReply * /*reply*/) override
230  {
231  NRP_LOGGER_TRACE("{} called", __FUNCTION__);
232  try
233  {
234  EngineGrpcServer::lock_t lock(this->_engineCallLock);
235 
236  // Run engine-specific reset function
237  this->_engineWrapper->reset();
238  }
239  catch(const std::exception &e)
240  {
241  return handleGrpcError("Error while executing initialization", e.what());
242  }
243 
244  NRPLogger::debug("reset command returns OK");
245  return grpc::Status::OK;
246  }
247 
261  grpc::Status shutdown( grpc::ServerContext * /*context*/,
262  const EngineGrpc::ShutdownRequest * request,
263  EngineGrpc::ShutdownReply * /*reply*/) override
264  {
265  try
266  {
267  EngineGrpcServer::lock_t lock(this->_engineCallLock);
268 
269  // TODO: we don't have any instance where json data is actually sent to an engine for shutdown,
270  // it can be removed from EngineGrpc::ShutdownRequest definition
271  nlohmann::json requestJson = nlohmann::json::parse(request->json());
272 
273  // Run engine-specifi shutdown function
274 
275  this->_engineWrapper->shutdown();
276  }
277  catch(const std::exception &e)
278  {
279  return handleGrpcError("Error while executing shutdown", e.what());
280  }
281 
282  NRPLogger::debug("shutdown command returns OK");
283  return grpc::Status::OK;
284  }
285 
299  grpc::Status runLoopStep( grpc::ServerContext * /*context*/,
300  const EngineGrpc::RunLoopStepRequest * request,
301  EngineGrpc::RunLoopStepReply * reply) override
302  {
303  try
304  {
305  EngineGrpcServer::lock_t lock(this->_engineCallLock);
306 
307  int64_t engineTime = (this->_engineWrapper->runLoopStep(SimulationTime(request->timestep()))).count();
308 
309  reply->set_enginetime(engineTime);
310  }
311  catch(const std::exception &e)
312  {
313  return handleGrpcError("Error while executing runLoopStep", e.what());
314  }
315 
316  return grpc::Status::OK;
317  }
318 
332  grpc::Status setDataPacks( grpc::ServerContext * /*context*/,
333  const EngineGrpc::SetDataPacksRequest * request,
334  EngineGrpc::SetDataPacksReply * /*reply*/) override
335  {
336  try
337  {
338  EngineGrpcServer::lock_t lock(this->_engineCallLock);
339 
340  this->_engineWrapper->setDataPacks(*request);
341  }
342  catch(const std::exception &e)
343  {
344  return handleGrpcError("Error while executing setDataPack", e.what());
345  }
346 
347  return grpc::Status::OK;
348  }
349 
363  grpc::Status getDataPacks( grpc::ServerContext * /*context*/,
364  const EngineGrpc::GetDataPacksRequest * request,
365  EngineGrpc::GetDataPacksReply * reply) override
366  {
367  try
368  {
369  EngineGrpcServer::lock_t lock(this->_engineCallLock);
370 
371  this->_engineWrapper->getDataPacks(*request, reply);
372  }
373  catch(const std::exception &e)
374  {
375  return handleGrpcError("Error while executing getDataPack", e.what());
376  }
377 
378  return grpc::Status::OK;
379  }
380 
389  grpc::Status handleGrpcError(const std::string & contextMessage, const std::string & errorMessage)
390  {
391  NRPLogger::error("context message: [ {} ]", contextMessage);
392  NRPLogger::error("error message: [ {} ]", errorMessage);
393 
394  // Pass the error message to the client
395 
396  grpc::Status status(grpc::StatusCode::CANCELLED, errorMessage);
397 
398  return status;
399  }
400 };
401 
402 #endif // ENGINE_GRPC_SERVER_H
engine_proto_wrapper.h
EngineGrpcServer::EngineGrpcServer
EngineGrpcServer(const std::string serverAddress, EngineProtoWrapper *engineWrapper)
Constructor.
Definition: engine_grpc_server.h:72
EngineGrpcServer::lock_t
std::unique_lock< EngineGrpcServer::mutex_t > lock_t
Definition: engine_grpc_server.h:59
EngineGrpcServer::startServerAsync
void startServerAsync()
Starts the gRPC server in asynchronous mode.
Definition: engine_grpc_server.h:110
proto_field_ops.h
EngineGrpcServer
class for Engine server with gRPC support
Definition: engine_grpc_server.h:54
EngineGrpcServer::shutdownServer
void shutdownServer()
Shuts down the gRPC server.
Definition: engine_grpc_server.h:118
datapack_controller.h
EngineGrpcServer::serverAddress
const std::string serverAddress() const
Returns address of the gRPC server.
Definition: engine_grpc_server.h:149
EngineGrpcServer::initRunFlag
virtual bool initRunFlag() const
Indicates if the simulation was initialized and is running.
Definition: engine_grpc_server.h:131
protobuf_ops.h
EngineGrpcServer::~EngineGrpcServer
~EngineGrpcServer()
Destructor.
Definition: engine_grpc_server.h:84
time_utils.h
EngineProtoWrapper
Abstract class defining an interface to interact with an Engine with data exchange via protobuf messa...
Definition: engine_proto_wrapper.h:48
EngineGrpcServer::startServer
void startServer()
Starts the gRPC server in synchronous mode.
Definition: engine_grpc_server.h:92
EngineGrpcServer::EngineGrpcServer
EngineGrpcServer()=delete
No dummy servers, only those with name and url.
NRPLogger::error
static void error(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for error level.
Definition: nrp_logger.h:160
EngineGrpcServer::mutex_t
std::timed_mutex mutex_t
Definition: engine_grpc_server.h:58
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
SimulationTime
std::chrono::nanoseconds SimulationTime
Definition: time_utils.h:31
EngineGrpcServer::shutdownFlag
virtual bool shutdownFlag() const
Indicates if shutdown was requested by the client.
Definition: engine_grpc_server.h:136
proto_ops_manager.h
EngineGrpcServer::isServerRunning
bool isServerRunning() const
Indicates whether the gRPC server is currently running.
Definition: engine_grpc_server.h:141
NRP_LOGGER_TRACE
#define NRP_LOGGER_TRACE(...)
Trace log macro. It is voided by defining PRODUCTION_RELEASE.
Definition: nrp_logger.h:39
json
nlohmann::json json
Definition: engine_json_server.cpp:31