BlueZero (BØ)
Middleware for distributed applications
resolver.h
1 #ifndef B0__RESOLVER__RESOLVER_H__INCLUDED
2 #define B0__RESOLVER__RESOLVER_H__INCLUDED
3 
4 #include <b0/node.h>
5 #include <b0/protobuf/service_server.h>
6 #include <b0/protobuf/publisher.h>
7 
8 #include <string>
9 #include <vector>
10 #include <set>
11 #include <boost/thread.hpp>
12 #include <boost/format.hpp>
13 
14 namespace b0
15 {
16 
17 namespace resolver
18 {
19 
21 
22 struct ServiceEntry;
23 
//! Record describing one node as tracked by the resolver.
//! Resolver keeps pointers to these records in its nodes_by_name_ and
//! nodes_by_key_ maps and hands one out from Resolver::nodeByName().
24 struct NodeEntry
25 {
    //! NOTE(review): host_id, process_id and thread_id presumably identify the
    //! host, process and thread the node runs in - confirm against the code that fills them.
26  std::string host_id;
27  int process_id;
28  std::string thread_id;
    //! Node name (presumably the key used in Resolver::nodes_by_name_ - confirm).
29  std::string name;
    //! Timestamp of type boost::posix_time::ptime. Resolver::heartBeat() is described
    //! as updating the NodeEntry timestamp, which is presumably this field.
30  boost::posix_time::ptime last_heartbeat;
    //! Pointers to the ServiceEntry records associated with this node.
    //! Ownership of the pointed-to entries is not visible from this header.
31  std::vector<ServiceEntry*> services;
32 };
33 
//! Record describing one service as tracked by the resolver.
//! Resolver keeps pointers to these records in services_by_name_ and hands
//! one out from Resolver::serviceByName().
34 struct ServiceEntry
35 {
    //! Back-pointer to a NodeEntry (presumably the node offering the service - confirm).
    //! Raw pointer; ownership is not visible from this header.
36  NodeEntry *node;
    //! Service name.
37  std::string name;
    //! NOTE(review): presumably the ZeroMQ address a service name resolves to - confirm.
38  std::string addr;
39 };
40 
41 class Resolver;
42 
//! Service server specialised for the resolver's Request/Response message pair.
//! Resolver holds one instance of this class as its resolv_server_ member.
43 class ResolverServiceServer : public b0::protobuf::ServiceServer<b0::resolver_msgs::Request, b0::resolver_msgs::Response>
44 {
45 public:
    //! Construct the server for the given Resolver.
    //! \param resolver raw pointer to a Resolver (presumably stored in resolver_;
    //!        lifetime requirements are not visible from this header)
46  ResolverServiceServer(Resolver *resolver);
47 
    //! Set the port number (presumably stored in port_ - confirm in the implementation).
51  void setPort(int port);
52 
    //! Return the port number (presumably the value of port_ - confirm in the implementation).
56  int port() const;
57 
58 protected:
    //! Override of the base class announce() step.
    //! NOTE(review): what the resolver's own server does here is defined in the
    //! implementation file, not in this header.
62  virtual void announce() override;
63 
    //! Non-owning raw pointer to a Resolver (presumably the one passed to the constructor).
65  Resolver *resolver_;
66 
    //! Port number accessed through setPort() / port() (presumed).
68  int port_;
69 };
70 
72 
//! The resolver node.
//! NOTE(review): this listing has numbering gaps where declarations appear to have
//! been dropped: the handle*() request handlers (listing lines 222-258),
//! heartbeat_sweeper_thread_ (line 299) and online_monitoring_ (line 327).
//! Confirm against the real header before relying on this text.
76 class Resolver : public Node
77 {
78 public:
    //! Construct a resolver node.
82  Resolver();
83 
    //! Resolver node destructor.
87  virtual ~Resolver();
88 
    //! Perform node initialization.
92  void init() override;
93 
    //! Shutdown this node (set a flag such that Node::shutdownRequested() returns true).
97  void shutdown() override;
98 
     //! Retrieve the address of the proxy's XPUB socket.
102  virtual std::string getXPUBSocketAddress() const override;
103 
     //! Retrieve the address of the proxy's XSUB socket.
107  virtual std::string getXSUBSocketAddress() const override;
108 
     //! Hijack announceNode step.
112  virtual void announceNode() override;
113 
     //! Hijack notifyShutdown step.
117  virtual void notifyShutdown() override;
118 
     //! NOTE(review): the onNode*() methods below carry no description in this listing.
     //! Judging by their names they are notifications about a node connecting or
     //! disconnecting and about a node starting/stopping to publish, subscribe,
     //! offer a service or use a service - confirm against the implementation.
122  void onNodeConnected(std::string name);
123 
127  void onNodeDisconnected(std::string name);
128 
132  void onNodeTopicPublishStart(std::string node_name, std::string topic_name);
133 
137  void onNodeTopicPublishStop(std::string node_name, std::string topic_name);
138 
142  void onNodeTopicSubscribeStart(std::string node_name, std::string topic_name);
143 
147  void onNodeTopicSubscribeStop(std::string node_name, std::string topic_name);
148 
152  void onNodeServiceOfferStart(std::string node_name, std::string service_name);
153 
157  void onNodeServiceOfferStop(std::string node_name, std::string service_name);
158 
162  void onNodeServiceUseStart(std::string node_name, std::string service_name);
163 
167  void onNodeServiceUseStop(std::string node_name, std::string service_name);
168 
     //! The XSUB/XPUB proxy (will be started in a separate thread).
172  void pubProxy(int xsub_proxy_port, int xpub_proxy_port);
173 
     //! Checks whether a node with this name exists in the connected nodes list.
177  bool nodeNameExists(std::string name);
178 
     //! Set a specific port number to use (otherwise B0_RESOLVER_PORT will be used).
182  virtual void setResolverPort(int port);
183 
     //! Format a tcp:// address.
187  virtual std::string address(std::string host, int port);
188 
     //! Overload taking only a port.
     //! NOTE(review): which host is used is not visible from this header - confirm.
192  virtual std::string address(int port);
193 
     //! Get the NodeEntry given the node name.
197  virtual resolver::NodeEntry * nodeByName(std::string node_name);
198 
     //! Get the ServiceEntry given the service name.
202  virtual resolver::ServiceEntry * serviceByName(std::string service_name);
203 
     //! Update the NodeEntry timestamp.
207  virtual void heartBeat(resolver::NodeEntry *node_entry);
208 
     //! Handle a request on the resolv service.
212  virtual void handle(const b0::resolver_msgs::Request &req, b0::resolver_msgs::Response &resp);
213 
     //! Adjust nodeName such that it is unique in the network (amongst the list of connected nodes).
217  std::string makeUniqueNodeName(std::string nodeName);
218 
     //! NOTE(review): the eight numbered-but-empty lines below sit where the listing
     //! skips lines 222-258; the handle*() declarations (AnnounceNode, ShutdownNode,
     //! AnnounceService, ResolveService, HeartBeat, NodeTopic, NodeService, GetGraph)
     //! are presumably declared there in the real header - confirm.
223 
228 
233 
238 
243 
248 
253 
258 
     //! NOTE(review): no description available in this listing; takes a Graph message
     //! by non-const reference, so presumably fills it in - confirm.
262  void getGraph(b0::resolver_msgs::Graph &graph);
263 
     //! Called when the global graph changes.
270  void onGraphChanged();
271 
     //! Code to run in the heartbeat sweeper thread.
275  void heartBeatSweeper();
276 
     //! Enable or disable online node monitoring.
283  void setOnlineMonitoring(bool enabled);
284 
285 protected:
     //! The ServiceServer serving the requests for the resolv protocol.
287  ResolverServiceServer resolv_server_;
288 
     //! Public address of the XSUB socket of the ZeroMQ proxy.
290  std::string xsub_proxy_addr_;
291 
     //! Public address of the XPUB socket of the ZeroMQ proxy.
293  std::string xpub_proxy_addr_;
294 
     //! The thread running the ZeroMQ XSUB/XPUB proxy.
296  boost::thread pub_proxy_thread_;
297 
     //! NOTE(review): heartbeat_sweeper_thread_ (the heartbeat sweeper thread, listing
     //! line 299) appears to have been dropped from this listing - confirm.
300 
     //! Map of nodes by name.
     //! NOTE(review): std::map is used here but <map> is not among the includes visible
     //! in this header; it presumably arrives transitively - consider including it directly.
302  std::map<std::string, resolver::NodeEntry*> nodes_by_name_;
303 
     //! Map of nodes by key.
305  std::map<std::string, resolver::NodeEntry*> nodes_by_key_;
306 
     //! Map of services by name.
308  std::map<std::string, resolver::ServiceEntry*> services_by_name_;
309 
     //! Graph edges node --> topic.
311  std::set<std::pair<std::string, std::string> > node_publishes_topic_;
312 
     //! Graph edges node <-- topic.
314  std::set<std::pair<std::string, std::string> > node_subscribes_topic_;
315 
     //! Graph edges node --> service.
317  std::set<std::pair<std::string, std::string> > node_offers_service_;
318 
     //! Graph edges node <-- service.
320  std::set<std::pair<std::string, std::string> > node_uses_service_;
321 
     //! Publisher of the Graph message.
323  b0::protobuf::Publisher<b0::resolver_msgs::Graph> graph_pub_;
324 
     //! NOTE(review): online_monitoring_ (bool, listing line 327) appears to have been
     //! dropped from this listing - confirm.
328 };
329 
330 } // namespace resolver
331 
332 } // namespace b0
333 
334 #endif // B0__RESOLVER__RESOLVER_H__INCLUDED
This is the response returned by the resolver service.
Definition: resolver.proto:179
std::set< std::pair< std::string, std::string > > node_subscribes_topic_
Graph edges node <-- topic.
Definition: resolver.h:314
void shutdown() override
Shutdown this node (set a flag such that Node::shutdownRequested() returns true)
void onNodeServiceUseStop(std::string node_name, std::string service_name)
void handleGetGraph(const b0::resolver_msgs::GetGraphRequest &req, b0::resolver_msgs::GetGraphResponse &resp)
Handle the GetGraph request.
void onNodeTopicSubscribeStart(std::string node_name, std::string topic_name)
virtual void handle(const b0::resolver_msgs::Request &req, b0::resolver_msgs::Response &resp)
Handle a request on the resolv service.
void pubProxy(int xsub_proxy_port, int xpub_proxy_port)
The XSUB/XPUB proxy (will be started in a separate thread)
Response to HeartBeatRequest message.
Definition: resolver.proto:90
virtual void handleAnnounceService(const b0::resolver_msgs::AnnounceServiceRequest &rq, b0::resolver_msgs::AnnounceServiceResponse &rsp)
Handle the AnnounceService request.
void onNodeConnected(std::string name)
bool nodeNameExists(std::string name)
Checks whether a node with this name exists in the connected nodes list.
b0::protobuf::Publisher< b0::resolver_msgs::Graph > graph_pub_
Publisher of the Graph message.
Definition: resolver.h:323
virtual void handleShutdownNode(const b0::resolver_msgs::ShutdownNodeRequest &rq, b0::resolver_msgs::ShutdownNodeResponse &rsp)
Handle the ShutdownNode request.
boost::thread pub_proxy_thread_
The thread running the ZeroMQ XSUB/XPUB proxy.
Definition: resolver.h:296
Response to NodeTopicRequest message.
Definition: resolver.proto:110
Response to ResolveServiceRequest message.
Definition: resolver.proto:73
void setOnlineMonitoring(bool enabled)
Enable or disable online node monitoring.
void onNodeServiceOfferStart(std::string node_name, std::string service_name)
void handleNodeTopic(const b0::resolver_msgs::NodeTopicRequest &req, b0::resolver_msgs::NodeTopicResponse &resp)
Handle the NodeTopic request.
Sent by node to tell a topic it is publishing onto/subscribing to.
Definition: resolver.proto:99
Sent by node to resolver, to announce its presence and try to self-assign a name. ...
Definition: resolver.proto:8
void onNodeServiceOfferStop(std::string node_name, std::string service_name)
The abstraction for a node in the network.
Definition: node.h:40
virtual void handleResolveService(const b0::resolver_msgs::ResolveServiceRequest &rq, b0::resolver_msgs::ResolveServiceResponse &rsp)
Handle the ResolveService request.
Sent by node to resolver when shutting down (it's not really a request but rather a notification) ...
Definition: resolver.proto:29
Response to AnnounceServiceRequest message.
Definition: resolver.proto:57
virtual std::string address(std::string host, int port)
Format a tcp:// address.
void handleNodeService(const b0::resolver_msgs::NodeServiceRequest &req, b0::resolver_msgs::NodeServiceResponse &resp)
Handle the NodeService request.
Sent by ServiceServer to announce a service by some name. The name must be unique. ...
Definition: resolver.proto:47
std::string xpub_proxy_addr_
Public address of the XPUB socket of the ZeroMQ proxy.
Definition: resolver.h:293
std::map< std::string, resolver::ServiceEntry * > services_by_name_
Map of services by name.
Definition: resolver.h:308
This is the message accepted by the resolver service.
Definition: resolver.proto:164
std::string xsub_proxy_addr_
Public address of the XSUB socket of the ZeroMQ proxy.
Definition: resolver.h:290
Response to NodeServiceRequest message.
Definition: resolver.proto:128
virtual resolver::NodeEntry * nodeByName(std::string node_name)
Get the NodeEntry given the node name.
std::set< std::pair< std::string, std::string > > node_offers_service_
Graph edges node --> service.
Definition: resolver.h:317
void getGraph(b0::resolver_msgs::Graph &graph)
virtual void notifyShutdown() override
Hijack notifyShutdown step.
The resolver node.
Definition: resolver.h:76
void onNodeDisconnected(std::string name)
Sent by resolver in reply to AnnounceNodeRequest message, to assign final name and give some connecti...
Definition: resolver.proto:17
void init() override
Perform node initialization.
Sent by node to tell a service it is offering.
Definition: resolver.proto:117
virtual void announceNode() override
Hijack announceNode step.
virtual std::string getXPUBSocketAddress() const override
Retrieve address of the proxy's XPUB socket.
std::set< std::pair< std::string, std::string > > node_uses_service_
Graph edges node <-- service.
Definition: resolver.h:320
ResolverServiceServer resolv_server_
The ServiceServer serving the requests for the resolv protocol.
Definition: resolver.h:287
void onNodeTopicSubscribeStop(std::string node_name, std::string topic_name)
std::map< std::string, resolver::NodeEntry * > nodes_by_key_
Map of nodes by key.
Definition: resolver.h:305
Heartbeat sent by node to resolver.
Definition: resolver.proto:82
Sent by node to resolver, for getting the full graph.
Definition: resolver.proto:151
A complete graph of the network.
Definition: resolver.proto:141
std::map< std::string, resolver::NodeEntry * > nodes_by_name_
Map of nodes by name.
Definition: resolver.h:302
void onGraphChanged()
Called when the global graph changes.
virtual void setResolverPort(int port)
Set a specific port number to use (otherwise B0_RESOLVER_PORT will be used).
bool online_monitoring_
If false, silent nodes (i.e.
Definition: resolver.h:327
virtual resolver::ServiceEntry * serviceByName(std::string service_name)
Get the ServiceEntry given the service name.
boost::thread heartbeat_sweeper_thread_
The heartbeat sweeper thread.
Definition: resolver.h:299
virtual std::string getXSUBSocketAddress() const override
Retrieve address of the proxy's XSUB socket.
virtual ~Resolver()
Resolver node destructor.
Response to GetGraphRequest message.
Definition: resolver.proto:158
void heartBeatSweeper()
Code to run in the heartbeat sweeper thread.
Sent by resolver to node in reply to ShutdownNodeRequest (but probably will not be received) ...
Definition: resolver.proto:38
void onNodeTopicPublishStop(std::string node_name, std::string topic_name)
Sent by a ServiceClient to resolve a service name to a ZeroMQ address.
Definition: resolver.proto:65
Definition: node.h:17
void onNodeServiceUseStart(std::string node_name, std::string service_name)
virtual void handleHeartBeat(const b0::resolver_msgs::HeartBeatRequest &rq, b0::resolver_msgs::HeartBeatResponse &rsp)
Handle the HeartBeat request.
virtual void handleAnnounceNode(const b0::resolver_msgs::AnnounceNodeRequest &rq, b0::resolver_msgs::AnnounceNodeResponse &rsp)
Handle the AnnounceNode request.
std::string makeUniqueNodeName(std::string nodeName)
Adjust nodeName such that it is unique in the network (amongst the list of connected nodes) ...
Resolver()
Construct a resolver node.
void onNodeTopicPublishStart(std::string node_name, std::string topic_name)
virtual void heartBeat(resolver::NodeEntry *node_entry)
Update the NodeEntry timestamp.
std::set< std::pair< std::string, std::string > > node_publishes_topic_
Graph edges node --> topic.
Definition: resolver.h:311