#include <boost/shared_array.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
#include <vector>
#include <map>
#include <stdexcept>
#include <string>
#include <sys/time.h>
+using namespace ::boost::multi_index;
+
class FDMultiplexerException : public std::runtime_error
{
public:
{
public:
typedef boost::any funcparam_t;
+ typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
protected:
- typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
struct Callback
{
callbackfunc_t d_callback;
- funcparam_t d_parameter;
+ mutable funcparam_t d_parameter;
struct timeval d_ttd;
+ int d_fd;
};
public:
/* timeout is in ms */
virtual int run(struct timeval* tv, int timeout=500) = 0;
+ /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
+
//! Add an fd to the read watch list - currently an fd can only be on one list at a time!
- virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+ virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
{
- this->addFD(d_readCallbacks, fd, toDo, parameter);
+ this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
}
//! Add an fd to the write watch list - currently an fd can only be on one list at a time!
- virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+ virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
{
- this->addFD(d_writeCallbacks, fd, toDo, parameter);
+ this->addFD(d_writeCallbacks, fd, toDo, parameter, ttd);
}
//! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
virtual void setReadTTD(int fd, struct timeval tv, int timeout)
{
- if(!d_readCallbacks.count(fd))
+ const auto& it = d_readCallbacks.find(fd);
+ if (it == d_readCallbacks.end()) {
throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+ }
+
+ auto newEntry = *it;
tv.tv_sec += timeout;
- d_readCallbacks[fd].d_ttd=tv;
+ newEntry.d_ttd = tv;
+ d_readCallbacks.replace(it, newEntry);
}
- virtual funcparam_t& getReadParameter(int fd)
+ virtual void setWriteTTD(int fd, struct timeval tv, int timeout)
{
- if(!d_readCallbacks.count(fd))
- throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd));
- return d_readCallbacks[fd].d_parameter;
+ const auto& it = d_writeCallbacks.find(fd);
+ if (it == d_writeCallbacks.end()) {
+ throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+ }
+
+ auto newEntry = *it;
+ tv.tv_sec += timeout;
+ newEntry.d_ttd = tv;
+ d_writeCallbacks.replace(it, newEntry);
}
- virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv)
+ virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv, bool writes=false)
{
std::vector<std::pair<int, funcparam_t> > ret;
- for(callbackmap_t::iterator i=d_readCallbacks.begin(); i!=d_readCallbacks.end(); ++i)
- if(i->second.d_ttd.tv_sec && boost::tie(tv.tv_sec, tv.tv_usec) > boost::tie(i->second.d_ttd.tv_sec, i->second.d_ttd.tv_usec))
- ret.push_back(std::make_pair(i->first, i->second.d_parameter));
+ const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
+ auto& idx = writes ? d_writeCallbacks.get<TTDOrderedTag>() : d_readCallbacks.get<TTDOrderedTag>();
+
+ for (auto it = idx.begin(); it != idx.end(); ++it) {
+ if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) {
+ break;
+ }
+ ret.push_back(std::make_pair(it->d_fd, it->d_parameter));
+ }
+
return ret;
}
return theMap;
}
- virtual std::string getName() = 0;
+ virtual std::string getName() const = 0;
+
+ size_t getWatchedFDCount(bool writeFDs) const
+ {
+ return writeFDs ? d_writeCallbacks.size() : d_readCallbacks.size();
+ }
protected:
- typedef std::map<int, Callback> callbackmap_t;
+ struct FDBasedTag {};
+ struct TTDOrderedTag {};
+ struct ttd_compare
+ {
+ /* we want a 0 TTD (no timeout) to come _after_ everything else */
+ bool operator() (const struct timeval& lhs, const struct timeval& rhs) const
+ {
+ /* special treatment if at least one of the TTD is 0,
+ normal comparison otherwise */
+ if (lhs.tv_sec == 0 && rhs.tv_sec == 0) {
+ return false;
+ }
+ if (lhs.tv_sec == 0 && rhs.tv_sec != 0) {
+ return false;
+ }
+ if (lhs.tv_sec != 0 && rhs.tv_sec == 0) {
+ return true;
+ }
+
+ return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec);
+ }
+ };
+
+ typedef multi_index_container<
+ Callback,
+ indexed_by <
+ hashed_unique<tag<FDBasedTag>,
+ member<Callback,int,&Callback::d_fd>
+ >,
+ ordered_non_unique<tag<TTDOrderedTag>,
+ member<Callback,struct timeval,&Callback::d_ttd>,
+ ttd_compare
+ >
+ >
+ > callbackmap_t;
+
callbackmap_t d_readCallbacks, d_writeCallbacks;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)=0;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
bool d_inrun;
callbackmap_t::iterator d_iter;
- void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+ void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
{
Callback cb;
+ cb.d_fd = fd;
cb.d_callback=toDo;
cb.d_parameter=parameter;
memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-
- if(cbmap.count(fd))
+ if (ttd) {
+ cb.d_ttd = *ttd;
+ }
+
+ auto pair = cbmap.insert(cb);
+ if (!pair.second) {
throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
- cbmap[fd]=cb;
+ }
}
void accountingRemoveFD(callbackmap_t& cbmap, int fd)
{
- if(!cbmap.erase(fd))
+ if(!cbmap.erase(fd)) {
throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
+ }
}
};