NRP Core  1.4.1
input_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef INPUT_ENGINE_NODE_H
23 #define INPUT_ENGINE_NODE_H
24 
25 #include <boost/python.hpp>
26 #include <mutex>
27 
30 
33 
40 class InputEngineNode : public InputNode<DataPackInterface> {
41 public:
42 
46  InputEngineNode(const std::string &id, const std::string &engineName) :
47  InputNode(id),
48  _engineName(engineName)
49  {}
50 
51  std::string typeStr() const override
52  { return "FromEngine"; }
53 
57  const std::set<DataPackIdentifier>& requestedDataPacks() const
58  { return _dataIds; }
59 
64  {
65  std::lock_guard<std::mutex> lock(_dataMutex);
66 
67  // TODO: in order to use MsgPublishPolicy::ALL policy with this type of node a vector of datapacks should be
68  // store, not just one which is being overwritten
69  // move datapacks into temporary storage without copying the shared pointer
70  for(auto dpack: dpacks) {
71  auto name = dpack->name();
72  if(this->_portMap.count(name) &&
73  (!_dataTemp.count(name) || !dpack->isEmpty()))
74  _dataTemp[name] = std::move(dpack);
75  }
76  }
77 
78 protected:
79 
80  void configure() override
81  {
82  for(auto& [id, port] : _portMap)
83  _dataIds.insert(DataPackIdentifier(id, _engineName, ""));
84  }
85 
86  bool updatePortData(const std::string& id) override
87  {
88  std::lock_guard<std::mutex> lock(_dataMutex);
89 
90  // move temp datapack to store without copying shared pointer and update pointer,
91  // only if there is no dapatapack 'id' stored already or the new one is not empty
92  auto d = _dataTemp.find(id);
93  if(d != _dataTemp.end() && (!_dataStore.count(id) || !_dataTemp.at(id)->isEmpty())) {
94  _portMap.at(id).clear();
95 
96  _dataStore[id] = std::move(d->second);
97  _portMap.at(id).addMsg(_dataStore[id].get());
98 
99  _dataTemp.erase(d);
100 
101  return true;
102  }
103 
104  return false;
105  }
106 
107 private:
108 
110  std::string _engineName;
112  std::mutex _dataMutex;
114  std::map<std::string, DataPackInterfaceConstSharedPtr> _dataTemp;
116  std::map<std::string, DataPackInterfaceConstSharedPtr> _dataStore;
118  std::set<DataPackIdentifier> _dataIds;
119 };
120 
121 
122 class InputEngineEdge : public SimpleInputEdge<DataPackInterface, InputEngineNode> {
123 
124 public:
125 
126  InputEngineEdge(const std::string& keyword, const std::string& address,
127  InputNodePolicies::MsgCachePolicy msgCachePolicy) :
128  SimpleInputEdge<DataPackInterface, InputEngineNode>(keyword, ComputationalNode::parseNodeAddress(address).first+"_input",
129  ComputationalNode::parseNodeAddress(address).second,
130  InputNodePolicies::LAST, msgCachePolicy),
131  _engineName(ComputationalNode::parseNodeAddress(address).first)
132  {}
133 
134 protected:
135 
137  { return new InputEngineNode(this->_id, _engineName); }
138 
139 private:
140 
141  std::string _engineName;
142 };
143 
144 
145 #endif //INPUT_ENGINE_NODE_H
InputNodePolicies::MsgCachePolicy
MsgCachePolicy
Defines input node message cache behavior.
Definition: computational_node_policies.h:27
datapacks_vector_t
std::vector< std::shared_ptr< const DataPackInterface > > datapacks_vector_t
Definition: datapack_interface.h:220
input_node.h
InputNode
Implementation of an input node in the computation graph.
Definition: input_node.h:129
InputNode< DataPackInterface >::_portMap
std::map< std::string, DataPortHandle< DataPackInterface > > _portMap
Map containing data to handle topics. Data is guaranteed to be unchanged between 'compute' calls
Definition: input_node.h:219
InputEngineNode
Input node used to connect an EngineClient with the computational graph.
Definition: input_node.h:40
datapack_interface.h
InputEngineNode::InputEngineNode
InputEngineNode(const std::string &id, const std::string &engineName)
Definition: input_node.h:46
engine_client_interface.h
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
InputEngineNode::typeStr
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: input_node.h:51
InputEngineEdge
Definition: input_node.h:122
InputEngineNode::updatePortData
bool updatePortData(const std::string &id) override
Updates pointers stored in _portMap for port 'id'.
Definition: input_node.h:86
InputEngineNode::configure
void configure() override
Configures the node making it ready to execute 'compute'.
Definition: input_node.h:80
InputEdge::_id
std::string _id
Definition: input_edge.h:151
InputEngineNode::setDataPacks
void setDataPacks(datapacks_vector_t dpacks)
Definition: input_node.h:63
InputEngineNode::requestedDataPacks
const std::set< DataPackIdentifier > & requestedDataPacks() const
Definition: input_node.h:57
InputEngineEdge::InputEngineEdge
InputEngineEdge(const std::string &keyword, const std::string &address, InputNodePolicies::MsgCachePolicy msgCachePolicy)
Definition: input_node.h:126
python_json_engine.port
port
Definition: python_json_engine.py:197
input_edge.h
InputNodePolicies::LAST
@ LAST
Definition: computational_node_policies.h:34
DataPackInterface
Interface to datapacks.
Definition: datapack_interface.h:89
InputEngineEdge::makeNewNode
InputEngineNode * makeNewNode() override
Definition: input_node.h:136
InputEdge
Helper template class used to implement Python input edge decorators.
Definition: input_edge.h:38
ComputationalNode
Base class implementing a node in the computational graph.
Definition: computational_node.h:31
InputNodePolicies
Definition: computational_node_policies.h:25