BlueZero (BØ)
Middleware for distributed applications
socket.h
1 #ifndef B0__SOCKET_H__INCLUDED
2 #define B0__SOCKET_H__INCLUDED
3 
4 #include <string>
5 
6 #include <b0/user_data.h>
7 #include <b0/logger/interface.h>
8 
9 #include <boost/function.hpp>
10 #include <boost/bind.hpp>
11 
12 namespace b0
13 {
14 
15 class Node;
16 
18 
19 struct SocketPrivate;
20 
22 
31 class Socket : public logger::LogInterface, public UserData
32 {
33 public:
37  Socket(Node *node, int type, std::string name, bool managed = true);
38 
42  virtual ~Socket();
43 
48  void setHasHeader(bool has_header);
49 
53  void log(LogLevel level, std::string message) const override;
54 
58  virtual void init() = 0;
59 
63  virtual void cleanup() = 0;
64 
68  virtual void spinOnce();
69 
73  void setRemoteAddress(std::string addr);
74 
78  std::string getName() const;
79 
83  Node & getNode() const;
84 
93  bool matchesPattern(const std::string &pattern) const;
94 
95 private:
96  std::unique_ptr<SocketPrivate> private_;
97 
98 protected:
101 
103  std::string name_;
104 
108 
110  const bool managed_;
111 
113  std::string remote_addr_;
114 
115 public:
119  virtual void readRaw(std::string &msg);
120 
124  virtual void readRaw(std::string &msg, std::string &type);
125 
130  virtual bool poll(long timeout = 0);
131 
135  virtual void writeRaw(const std::string &msg, const std::string &type = "");
136 
137 public:
145  void setCompression(std::string algorithm, int level = -1);
146 
147 private:
150  std::string compression_algorithm_;
151 
154  int compression_level_;
155 
156 public:
158  int getReadTimeout() const;
159 
161  void setReadTimeout(int timeout);
162 
164  int getWriteTimeout() const;
165 
167  void setWriteTimeout(int timeout);
168 
170  int getLingerPeriod() const;
171 
173  void setLingerPeriod(int period);
174 
176  int getBacklog() const;
177 
179  void setBacklog(int backlog);
180 
182  bool getImmediate() const;
183 
185  void setImmediate(bool immediate);
186 
188  bool getConflate() const;
189 
191  void setConflate(bool conflate);
192 
194  int getReadHWM() const;
195 
197  void setReadHWM(int n);
198 
200  int getWriteHWM() const;
201 
203  void setWriteHWM(int n);
204 
205 protected:
207  void connect(std::string const &addr);
208 
210  void disconnect(std::string const &addr);
211 
213  void bind(std::string const &addr);
214 
216  void unbind(std::string const &addr);
217 
219  void setsockopt(int option, const void *optval, size_t optvallen);
220 
222  void getsockopt(int option, void *optval, size_t *optvallen) const;
223 
225  void setIntOption(int option, int value);
226 
228  int getIntOption(int option) const;
229 };
230 
231 } // namespace b0
232 
233 #endif // B0__SOCKET_H__INCLUDED
void unbind(std::string const &addr)
Wrapper to zmq::socket_t::unbind.
LogLevel
Definition: interface.h:23
virtual void spinOnce()
Process incoming messages and call callbacks.
int getBacklog() const
(low-level socket option) Get backlog
void log(LogLevel level, std::string message) const override
Log a message to the default logger of this node.
virtual bool poll(long timeout=0)
Poll for messages. If timeout is 0 return immediately, otherwise wait for the specified amount of mil...
void setIntOption(int option, int value)
High level wrapper for setsockopt.
void getsockopt(int option, void *optval, size_t *optvallen) const
Wrapper to zmq::socket_t::getsockopt.
virtual void readRaw(std::string &msg)
Read a raw payload from the underlying ZeroMQ socket.
Socket(Node *node, int type, std::string name, bool managed=true)
Construct a Socket.
void setImmediate(bool immediate)
(low-level socket option) Set immediate flag
int getReadHWM() const
(low-level socket option) Get read high-water-mark
Node & node_
The Node owning this Socket.
Definition: socket.h:100
bool has_header_
True if the payload has a header part (i.e.
Definition: socket.h:107
void bind(std::string const &addr)
Wrapper to zmq::socket_t::bind.
virtual void writeRaw(const std::string &msg, const std::string &type="")
Write a raw payload.
std::string name_
This socket bus name.
Definition: socket.h:103
int getIntOption(int option) const
High level wrapper for getsockopt.
std::string getName() const
Return the name of the socket bus.
bool getImmediate() const
(low-level socket option) Get immediate flag
The abstraction for a node in the network.
Definition: node.h:40
void connect(std::string const &addr)
Wrapper to zmq::socket_t::connect.
void setReadHWM(int n)
(low-level socket option) Set read high-water-mark
void setWriteHWM(int n)
(low-level socket option) Set write high-water-mark
void setCompression(std::string algorithm, int level=-1)
Set compression algorithm and level.
Node & getNode() const
Return the node owning this socket.
bool matchesPattern(const std::string &pattern) const
Check if this socket name matches the specified pattern.
virtual void init()=0
Perform initialization (resolve name, connect socket, set subscription)
void setBacklog(int backlog)
(low-level socket option) Set backlog
virtual ~Socket()
Socket destructor.
int getReadTimeout() const
(low-level socket option) Get read timeout (in milliseconds, -1 for no timeout)
void setsockopt(int option, const void *optval, size_t optvallen)
Wrapper to zmq::socket_t::setsockopt.
void disconnect(std::string const &addr)
Wrapper to zmq::socket_t::disconnect.
The Socket class.
Definition: socket.h:31
void setWriteTimeout(int timeout)
(low-level socket option) Set write timeout (in milliseconds, -1 for no timeout)
bool getConflate() const
(low-level socket option) Get conflate flag
void setLingerPeriod(int period)
(low-level socket option) Set linger period (in milliseconds, -1 for no timeout)
void setRemoteAddress(std::string addr)
Set the remote address the socket will connect to.
int getWriteHWM() const
(low-level socket option) Get write high-water-mark
void setReadTimeout(int timeout)
(low-level socket option) Set read timeout (in milliseconds, -1 for no timeout)
std::string remote_addr_
The address of the ZeroMQ socket to connect to (will skip name resolution if given) ...
Definition: socket.h:113
void setConflate(bool conflate)
(low-level socket option) Set conflate flag
Base class to add logging functionalities to nodes.
Definition: interface.h:17
int getLingerPeriod() const
(low-level socket option) Get linger period (in milliseconds, -1 for no timeout)
Definition: node.h:17
const bool managed_
True if this socket is managed (init(), cleanup() are called by the owner Node)
Definition: socket.h:110
virtual void cleanup()=0
Perform cleanup (clear subscription, disconnect socket)
int getWriteTimeout() const
(low-level socket option) Get write timeout (in milliseconds, -1 for no timeout)
void setHasHeader(bool has_header)
Set the has_header_ flag which specifies if this socket require a message part with the address (usua...