NRP Core  1.4.1
spinnaker_proxy.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef SPINNAKER_PROXY_H
23 #define SPINNAKER_PROXY_H
24 
25 #include <boost/python.hpp>
26 #include <functional>
27 #include <nlohmann/json.hpp>
28 #include <thread>
29 #include <future>
30 #include <vector>
31 #include <iostream>
32 #include <SpynnakerLiveSpikesConnection.h>
33 
35 
36 namespace bpy = boost::python;
37 
38 #define use(x) do {} while ((x)!=(x))
39 
40 // Conversion factor from an integer containing an S1615 value to a float
41 #define VOLTAGE_INT_TO_S1615 32768.0f
42 
44 public:
45  virtual void new_msg_callback(nlohmann::json msg) = 0;
47 };
48 
49 class NRPSpinnakerProxy : SpikeReceiveCallbackInterface,
50  SpikesStartCallbackInterface, PayloadReceiveCallbackInterface {
51 
52 public:
53 
55  { this->stopSpinnaker(); }
56 
57  // Delete move and copy operators. This ensures this class is a singleton
58  NRPSpinnakerProxy(const NRPSpinnakerProxy &) = delete;
60 
63 
68 
73 
75  {
76  std::lock_guard<std::mutex> lk(runningMutex);
77  if (!started) {
78  started = true;
79  _runFuture = std::async(&NRPSpinnakerProxy::runSpinnaker, this);
80  }
81  }
82 
83  void spikes_start(char *label, SpynnakerLiveSpikesConnection *connection) {
84  use(label);
85  use(connection);
86  NRPLogger::info("SpiNNaker has started!");
87  running = true;
88  if (n_send_warnings > 0) {
89  NRPLogger::warn("Ignored {} warnings before start", n_send_warnings);
90  }
91  }
92 
93  void runSpinnaker()
94  {
95  char **charReceiveLabels = new char*[receiveLabels.size()];
96  char **charSendLabels = new char*[sendLabels.size()];
97  char *a_label = NULL;
98  for (std::size_t i = 0; i < receiveLabels.size(); i++) {
99  charReceiveLabels[i] = (char *) receiveLabels[i].c_str();
100  a_label = charReceiveLabels[i];
101  }
102  for (std::size_t i = 0; i < sendLabels.size(); i++) {
103  charSendLabels[i] = (char *) sendLabels[i].c_str();
104  a_label = charSendLabels[i];
105  }
106 
107  if (a_label == NULL) {
108  NRPLogger::warn("No senders or receivers have been detected!");
109  } else {
110 
111  connection = new SpynnakerLiveSpikesConnection(
112  receiveLabels.size(), charReceiveLabels,
113  sendLabels.size(), charSendLabels);
114  for (std::size_t i = 0; i < receiveLabels.size(); i++) {
115  connection->add_receive_callback(
116  (char *) receiveLabels[i].c_str(), (SpikeReceiveCallbackInterface *) this);
117  connection->add_receive_callback(
118  (char *) receiveLabels[i].c_str(), (PayloadReceiveCallbackInterface *) this);
119  }
120  connection->add_start_callback(a_label, this);
121  }
122 
123  NRPLogger::debug("Waiting for GIL");
124  PyGILState_STATE gstate = PyGILState_Ensure();
125  try
126  {
127  if (connection != NULL) {
128  NRPLogger::debug("Adding database socket address");
129  pySpinnakerExt.attr("add_database_socket_address")(
130  NULL, connection->get_local_port(), NULL);
131  }
132  NRPLogger::info("Running forever!");
133  pySpinnakerExt.attr("run_forever")();
134  NRPLogger::info("Finished running forever!");
135  }
136  catch (bpy::error_already_set const &)
137  {
138  PyErr_Print();
139  }
140  NRPLogger::debug("Releasing GIL");
141  PyGILState_Release(gstate);
142  }
143 
145  {
146  std::lock_guard<std::mutex> lk(runningMutex);
147  if(!running)
148  {
149  return;
150  }
151  running = false;
152  PyGILState_STATE gstate = PyGILState_Ensure();
153  try
154  {
155  pySpinnakerExt.attr("request_stop")();
156  }
157  catch (bpy::error_already_set const &) {
158  PyErr_Print();
159  }
160  PyGILState_Release(gstate);
161  _runFuture.wait();
162  gstate = PyGILState_Ensure();
163  try
164  {
165  pySpinnaker.attr("end")();
166  }
167  catch (bpy::error_already_set const &) {
168  PyErr_Print();
169  }
170  PyGILState_Release(gstate);
171  started = false;
172  }
173 
174  void addSender(std::string &label)
175  {
176  if (started) {
177  NRPLogger::error("A new sender cannot be added when SpiNNaker is already running!");
178  }
179  if (std::find(sendLabels.begin(), sendLabels.end(), label) == sendLabels.end()) {
180  sendLabels.push_back(label);
181  }
182  }
183 
184  // Also adapt 'callback' signature as needed
185  void addReceiver(std::string &label, SpiNNakerJsonReceiveCallbackInterface *callback)
186  {
187  if (started) {
188  NRPLogger::error("A new receiver cannot be added when SpiNNaker is already running!");
189  }
190  auto label_callbacks = callbacks.find(label);
191  if (label_callbacks == callbacks.end()) {
192  callbacks[label] = std::vector<SpiNNakerJsonReceiveCallbackInterface *>();
193  receiveLabels.push_back(label);
194  }
195  callbacks[label].push_back(callback);
196  }
197 
198  void send(std::string &label, const nlohmann::json* data)
199  {
200  if (!running) {
201  if (n_send_warnings == 0) {
202  NRPLogger::warn("Not sending until SpiNNaker is running...");
203  }
204  n_send_warnings++;
205  return;
206  }
207 
208  if(n_send_warnings > 0) {
209  NRPLogger::warn("SpiNNaker is running, sending enabled");
210  n_send_warnings = 0;
211  }
212 
213  std::vector<int> spikes;
214  if (data->contains("neuron_ids")) {
215  nlohmann::json neuron_ids = data->at("neuron_ids");
216  for (size_t i = 0; i < neuron_ids.size(); i++) {
217  spikes.push_back(neuron_ids[i]);
218  }
219  }
220  std::vector<rate_details> rates_to_send;
221  if (data->contains("rates")) {
222  nlohmann::json rates = data->at("rates");
223  for (size_t i = 0; i < rates.size(); i++) {
224  int neuron_id = rates[i].at("neuron_id");
225  float rate = rates[i].at("rate");
226  rates_to_send.push_back({.neuron_id=neuron_id, .rate=rate});
227  NRPLogger::debug("Sending rate of neuron {} to {} in {}", neuron_id, rate, label);
228  }
229  }
230  if ((spikes.size() > 0 && rates_to_send.size() > 0)
231  || (spikes.size() == 0 && rates_to_send.size() == 0)) {
232  NRPLogger::error("Both neuron_ids and rates found in data. Please"
233  " use neuron_ids to send spikes to a SpikeInjector or"
234  " rates to set the rates of a SpikeSourcePoisson");
235  }
236  if (spikes.size() > 0) {
237  connection->send_spikes((char *) label.c_str(), spikes);
238  } else {
239  connection->send_rates((char *) label.c_str(), rates_to_send);
240  }
241  }
242 
243  void receive_spikes(char *label, int time, int n_spikes, int* spikes)
244  {
245  std::string strLabel(label);
246  auto recvCallbacks = callbacks[strLabel];
247  nlohmann::json keyArray = nlohmann::json::array();
248  for (int i = 0; i < n_spikes; i++) {
249  keyArray.push_back(spikes[i]);
250  }
251  nlohmann::json data = {
252  {"label", strLabel},
253  {"time", time},
254  {"spikes", keyArray}
255  };
256  for (std::size_t i = 0; i < recvCallbacks.size(); i++) {
257  recvCallbacks[i]->new_msg_callback(data);
258  }
259  }
260 
261  void receive_payloads(char *label, int n_payloads, payload_details *payloads) {
262  std::string strLabel(label);
263  auto recvCallbacks = callbacks[strLabel];
264  nlohmann::json payloadArray = nlohmann::json::array();
265  for (int i = 0; i < n_payloads; i++) {
266  nlohmann::json payload = {
267  {"neuron_id", payloads[i].neuron_id},
268  {"voltage", (float) payloads[i].payload / VOLTAGE_INT_TO_S1615}
269  };
270  payloadArray.push_back(payload);
271  }
272  nlohmann::json data = {
273  {"label", strLabel},
274  {"voltages", payloadArray}
275  };
276  for (std::size_t i = 0; i < recvCallbacks.size(); i++) {
277  recvCallbacks[i]->new_msg_callback(data);
278  }
279  }
280 
281 private:
282  bpy::object pySpinnaker;
283  bpy::object pySpinnakerExt;
284  std::future<void> _runFuture;
285  bool started = false;
286  bool running = false;
287  std::mutex runningMutex;
288  std::map<std::string,
289  std::vector<SpiNNakerJsonReceiveCallbackInterface *>> callbacks;
290  SpynnakerLiveSpikesConnection *connection = NULL;
291  std::vector<std::string> receiveLabels;
292  std::vector<std::string> sendLabels;
293  int n_send_warnings = 0;
294 
296  {
297  try
298  {
299  pySpinnaker = bpy::import("pyNN.spiNNaker");
300  pySpinnakerExt = pySpinnaker.attr("external_devices");
301  }
302  catch (bpy::error_already_set const &)
303  {
304  PyErr_Print();
305  }
306  }
307 
308  static std::unique_ptr<NRPSpinnakerProxy> _instance;
309 
310 };
311 
312 #endif //SPINNAKER_PROXY_H
NRPSpinnakerProxy::receive_spikes
void receive_spikes(char *label, int time, int n_spikes, int *spikes)
Definition: spinnaker_proxy.h:243
nrp_logger.h
NRPLogger::warn
static void warn(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for warning level.
Definition: nrp_logger.h:149
NRPSpinnakerProxy::operator=
NRPSpinnakerProxy & operator=(const NRPSpinnakerProxy &)=delete
NRPSpinnakerProxy::send
void send(std::string &label, const nlohmann::json *data)
Definition: spinnaker_proxy.h:198
SpiNNakerJsonReceiveCallbackInterface::~SpiNNakerJsonReceiveCallbackInterface
virtual ~SpiNNakerJsonReceiveCallbackInterface()
Definition: spinnaker_proxy.h:46
NRPSpinnakerProxy::~NRPSpinnakerProxy
~NRPSpinnakerProxy()
Definition: spinnaker_proxy.h:54
use
#define use(x)
Definition: spinnaker_proxy.h:38
NRPSpinnakerProxy::startSpinnaker
void startSpinnaker()
Definition: spinnaker_proxy.h:74
NRPSpinnakerProxy::addSender
void addSender(std::string &label)
Definition: spinnaker_proxy.h:174
NRPLogger::info
static void info(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for info level.
Definition: nrp_logger.h:138
SpiNNakerJsonReceiveCallbackInterface
Definition: spinnaker_proxy.h:43
SpiNNakerJsonReceiveCallbackInterface::new_msg_callback
virtual void new_msg_callback(nlohmann::json msg)=0
NRPSpinnakerProxy::runSpinnaker
void runSpinnaker()
Definition: spinnaker_proxy.h:93
VOLTAGE_INT_TO_S1615
#define VOLTAGE_INT_TO_S1615
Definition: spinnaker_proxy.h:41
NRPSpinnakerProxy::getInstance
static NRPSpinnakerProxy & getInstance()
Get singleton instance of NRPSpinnakerProxy.
Definition: spinnaker_proxy.cpp:26
NRPSpinnakerProxy::stopSpinnaker
void stopSpinnaker()
Definition: spinnaker_proxy.h:144
NRPSpinnakerProxy::addReceiver
void addReceiver(std::string &label, SpiNNakerJsonReceiveCallbackInterface *callback)
Definition: spinnaker_proxy.h:185
NRPSpinnakerProxy::receive_payloads
void receive_payloads(char *label, int n_payloads, payload_details *payloads)
Definition: spinnaker_proxy.h:261
NRPLogger::error
static void error(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for error level.
Definition: nrp_logger.h:160
NRPSpinnakerProxy
Definition: spinnaker_proxy.h:49
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
NRPSpinnakerProxy::resetInstance
static NRPSpinnakerProxy & resetInstance()
Reset singleton instance.
Definition: spinnaker_proxy.cpp:31
json
nlohmann::json json
Definition: engine_json_server.cpp:31
NRPSpinnakerProxy::spikes_start
void spikes_start(char *label, SpynnakerLiveSpikesConnection *connection)
Definition: spinnaker_proxy.h:83