BlueZero (BØ)
Middleware for distributed applications
node.h
1 #ifndef B0__NODE_H__INCLUDED
2 #define B0__NODE_H__INCLUDED
3 
4 #include <b0/user_data.h>
5 #include <b0/node_state.h>
6 #include <b0/socket.h>
7 #include <b0/logger/interface.h>
8 #include <b0/utils/time_sync.h>
9 
10 #include <atomic>
11 #include <set>
12 #include <string>
13 
14 #include <boost/thread.hpp>
15 #include <boost/thread/mutex.hpp>
16 
17 namespace b0
18 {
19 
20 namespace logger
21 {
22 
23 class Logger;
24 
25 } // namespace logger
26 
28 
29 struct NodePrivate;
30 struct NodePrivate2;
31 
33 
40 class Node : public logger::LogInterface, public UserData
41 {
42 public:
44 
58  Node(std::string nodeName = "");
59 
66  virtual ~Node();
67 
71  void setResolverAddress(const std::string &addr);
72 
80  virtual void init();
81 
89  virtual void shutdown();
90 
94  bool shutdownRequested() const;
95 
105  virtual void spinOnce();
106 
112  virtual void spin(double spinRate = 10.0);
113 
117  virtual void cleanup();
118 
119 protected:
126  virtual void startHeartbeatThread();
127 
128 public:
132  void log(LogLevel level, std::string message) const override;
133 
137  std::string getName() const;
138 
142  NodeState getState() const;
143 
147  void * getContext();
148 
152  virtual std::string getXPUBSocketAddress() const;
153 
157  virtual std::string getXSUBSocketAddress() const;
158 
159 private:
163  void addSocket(Socket *socket);
164 
168  void removeSocket(Socket *socket);
169 
170 public:
174  virtual std::string hostname();
175 
179  virtual int pid();
180 
184  virtual std::string threadID();
185 
189  virtual int freeTCPPort();
190 
194  virtual void notifyTopic(std::string topic_name, bool reverse, bool active);
195 
199  virtual void notifyService(std::string service_name, bool reverse, bool active);
200 
204  virtual void announceService(std::string service_name, std::string addr);
205 
209  virtual void resolveService(std::string service_name, std::string &addr);
210 
214  virtual void setAnnounceTimeout(int timeout = -1);
215 
216 protected:
220  virtual std::string freeTCPAddress();
221 
225  virtual void announceNode();
226 
230  virtual void notifyShutdown();
231 
235  virtual void heartbeatLoop();
236 
237 public:
241  int64_t hardwareTimeUSec() const;
242 
246  int64_t timeUSec();
247 
248 private:
249  std::unique_ptr<NodePrivate> private_;
250  std::unique_ptr<NodePrivate2> private2_;
251 
252 protected:
254  std::string resolv_addr_;
255 
258 
259 private:
261  std::string name_;
262 
264  NodeState state_;
265 
267  boost::thread::id thread_id_;
268 
270  boost::thread heartbeat_thread_;
271 
273  std::set<Socket*> sockets_;
274 
276  std::string xsub_sock_addr_;
277 
279  std::string xpub_sock_addr_;
280 
282  static std::atomic<bool> quit_flag_;
283 
285  bool shutdown_flag_;
286 
288  static bool sigint_handler_setup_;
289 
291  TimeSync time_sync_;
292 
294  static void signalHandler(int s);
295 
297  static void setupSIGINTHandler();
298 
299 public:
300  friend class Socket;
301 };
302 
303 } // namespace b0
304 
305 #endif // B0__NODE_H__INCLUDED
LogLevel
Definition: interface.h:23
virtual std::string threadID()
Return the thread identifier of this node.
virtual std::string getXPUBSocketAddress() const
Retrieve address of the proxy&#39;s XPUB socket.
virtual void resolveService(std::string service_name, std::string &addr)
Resolve service address by name.
virtual void notifyShutdown()
Notify resolver of this node shutdown.
virtual void setAnnounceTimeout(int timeout=-1)
Set the timeout for the announce phase. See b0::resolver::Client::setAnnounceTimeout() ...
void * getContext()
Get the ZeroMQ Context.
virtual void spin(double spinRate=10.0)
Run the spin loop (continuously call spinOnce(), at the specified rate, and call cleanup() at the end...
virtual void announceNode()
Announce this node to resolver.
bool shutdownRequested() const
Return wether shutdown has requested (by Node::shutdown() method or by pressing CTRL-C) ...
The abstraction for a node in the network.
Definition: node.h:40
virtual int pid()
Return the process id of this node.
virtual void startHeartbeatThread()
Start the heartbeat thread.
virtual int freeTCPPort()
Find and return an available TCP port.
std::string resolv_addr_
Target address of resolver client.
Definition: node.h:254
virtual void cleanup()
Node cleanup: stop all threads, send a shutdown notification to resolver, and so on...
The TimeSync class.
Definition: time_sync.h:60
int64_t timeUSec()
Return the adjusted time in microseconds. See Time Synchronization for details.
virtual void notifyTopic(std::string topic_name, bool reverse, bool active)
Notify topic publishing/subscription start or end.
virtual void notifyService(std::string service_name, bool reverse, bool active)
Notify service advertising/use start or end.
virtual void spinOnce()
Read all available messages from the various ZeroMQ sockets, and dispatch them to callbacks...
virtual std::string freeTCPAddress()
Find and return an available tcp address, e.g. tcp://hostname:portnumber.
virtual std::string hostname()
Return the public address (IP or hostname) to reach this node on the network.
logger::LogInterface * p_logger_
The logger of this node.
Definition: node.h:257
The Socket class.
Definition: socket.h:31
NodeState getState() const
Get the state of this node.
virtual void log(LogLevel level, std::string message) const =0
Log a message to the remote logger, with a specified level.
void log(LogLevel level, std::string message) const override
Log a message to the default logger of this node.
int64_t hardwareTimeUSec() const
Return this computer&#39;s clock time in microseconds.
virtual ~Node()
Destruct this node.
Node(std::string nodeName="")
Create a node with a given name.
virtual void init()
Initialize the node (connect to resolve, start heartbeat, announce node name)
Base class to add logging functionalities to nodes.
Definition: interface.h:17
Definition: node.h:17
virtual std::string getXSUBSocketAddress() const
Retrieve address of the proxy&#39;s XSUB socket.
virtual void announceService(std::string service_name, std::string addr)
Announce service address.
virtual void shutdown()
Shutdown the node (stop all running threads, send shutdown notification)
virtual void heartbeatLoop()
The heartbeat message loop (run in its own thread)
std::string getName() const
Get the name assigned by resolver to this node.
void setResolverAddress(const std::string &addr)
Specify a different value for the resolver address. Otherwise B0_RESOLVER env var is used...