Common logic for a simple "mini broker" that creates creates queues automatically when a client tries to send or subscribe. This file contains the queue
class that queues messages and the broker_handler
class that manages queues and links and transfers messages to/from clients.
#ifndef BROKER_HPP
#define BROKER_HPP
#include <proton/connection.hpp>
#include <proton/delivery.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/message.hpp>
#include <proton/sasl.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <proton/sender_options.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
#include <proton/target_options.hpp>
#include <iostream>
#include <deque>
#include <map>
#include <list>
#include <sstream>
class queue {
public:
queue(const std::string &name, bool dynamic = false) : name_(name), dynamic_(dynamic) {}
std::string name() const { return name_; }
consumers_.push_back(s);
}
consumers_.remove(s);
return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
}
messages_.push_back(m);
dispatch(0);
}
while (deliver_to(s)) {}
}
int count = s ? 1 : consumers_.size();
if (!count) return false;
bool result = false;
sender_list::iterator it = consumers_.begin();
if (!s && count) {
s = &*it;
}
while (messages_.size()) {
messages_.pop_front();
result = true;
}
if (--count) {
it++;
} else {
return result;
}
}
return false;
}
private:
typedef std::deque<proton::message> message_queue;
typedef std::list<proton::sender> sender_list;
std::string name_;
bool dynamic_;
message_queue messages_;
sender_list consumers_;
};
class queues {
public:
queues() : next_id_(0) {}
virtual ~queues() {}
virtual queue &get(const std::string &address = std::string()) {
if (address.empty()) {
throw std::runtime_error("empty queue name");
}
queue*& q = queues_[address];
if (!q) q = new queue(address);
return *q;
}
virtual queue &dynamic() {
std::ostringstream os;
os << "q" << next_id_++;
queue *q = queues_[os.str()] = new queue(os.str(), true);
return *q;
}
virtual void erase(std::string &name) {
delete queues_[name];
queues_.erase(name);
}
protected:
typedef std::map<std::string, queue *> queue_map;
queue_map queues_;
int next_id_;
};
#include <proton/config.hpp>
public:
broker_handler(queues& qs) : queues_(qs) {}
std::cout <<
"Connection from user: " << t.
sasl().
user() <<
" (mechanism: " << t.
sasl().
mech() <<
")" << std::endl;
}
queues_.dynamic() : queues_.get(src.address());
q.subscribe(sender);
std::cout << "broker outgoing link from " << q.name() << std::endl;
}
if (!address.empty()) {
std::cout << "broker incoming link to " << address << std::endl;
}
}
if (queues_.get(address).unsubscribe(lnk)) {
queues_.erase(address);
}
}
unsubscribe(sender);
}
remove_stale_consumers(c);
}
}
std::cout <<
"broker client disconnect: " << t.
error().
what() << std::endl;
}
std::cerr <<
"broker error: " << c.
what() << std::endl;
}
proton::sender_range sr = connection.
senders();
for (proton::sender_iterator i = sr.begin(); i != sr.end(); ++i) {
if (i->active())
unsubscribe(*i);
}
}
queues_.get(address).dispatch(&s);
}
queues_.get(address).publish(m);
}
protected:
queues& queues_;
};
#endif // BROKER_HPP