#include "../options.hpp"
#include "mt_container.hpp"
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/error_condition.hpp>
#include <proton/listen_handler.hpp>
#include <proton/listener.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender_options.hpp>
#include <proton/source_options.hpp>
#include <proton/target.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <atomic>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "../fake_cpp11.hpp"
class queue {
public:
queue(const std::string& name) : name_(name) {}
std::string name() const { return name_; }
std::lock_guard<std::mutex> g(lock_);
messages_.push_back(m);
if (messages_.size() == 1) {
for (auto cb : callbacks_)
cb(this);
callbacks_.clear();
}
}
std::lock_guard<std::mutex> g(lock_);
if (messages_.empty()) {
callbacks_.push_back(callback);
return false;
} else {
m = std::move(messages_.front());
messages_.pop_front();
return true;
}
}
private:
const std::string name_;
std::mutex lock_;
std::deque<proton::message> messages_;
std::vector<std::function<void(queue*)> > callbacks_;
};
class queues {
public:
queues() : next_id_(0) {}
queue* get(const std::string& name) {
std::lock_guard<std::mutex> g(lock_);
auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
if (!i->second)
i->second.reset(new queue(name));
return i->second.get();
}
queue* dynamic() {
std::ostringstream os;
os << "_dynamic_" << next_id_++;
return get(os.str());
}
private:
typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
std::mutex lock_;
queue_map queues_;
std::atomic<int> next_id_;
};
// NOTE(review): the class header that these members belong to (apparently
// `broker_connection_handler`, going by the constructor name) is not visible
// above this access specifier, and most of the member-function headers in
// this region are missing as well. The comments below describe only what the
// visible statements do.
public:
// Stores a reference to the queue registry this handler works with.
broker_connection_handler(queues& qs) : queues_(qs) {}
// NOTE(review): enclosing function header and the definition of `ts_c` are
// not visible here.
// Installs the callback handed to queue::pop(): given a queue, it asks the
// event loop reached through ts_c to run has_messages(q) on this handler.
has_messages_callback_ = [this, ts_c](queue* q) mutable {
ts_c->event_loop()->inject(
std::bind(&broker_connection_handler::has_messages, this, q));
};
}
// NOTE(review): enclosing function header and the definition of `q` are not
// visible. Logs the name of the queue being sent from.
std::cout << "sending from " << q->name() << std::endl;
}
// NOTE(review): enclosing function header (and the declaration of `s`) is not
// visible. Looks up the queue for sender s and attempts a send; when do_send()
// returns false the (queue, sender) pair is recorded in blocked_.
queue* q = sender_queue(s);
if (!do_send(q, s))
blocked_.insert(std::make_pair(q, s));
}
// NOTE(review): enclosing function header and the definition of `qname` are
// not visible. Only logging is visible in either branch; any action taken for
// the "shutdown" name is not shown here.
if (qname == "shutdown") {
std::cout << "broker shutting down" << std::endl;
} else {
std::cout << "receiving to " << qname << std::endl;
}
}
// NOTE(review): enclosing function header and the definitions of `qname` and
// `m` are not visible. Pushes m onto the queue registered under qname.
queues_.get(qname)->push(m);
}
// NOTE(review): the statement closed by the next line (presumably the
// definition of `predicate`) and the enclosing function header are not
// visible. Removes every entry of blocked_ whose sender satisfies predicate.
};
erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
}
// NOTE(review): enclosing function header (and the declaration of `sender`)
// is not visible. Restricts the scan to the entries stored under sender's
// queue and removes those whose sender compares equal to `sender`.
auto range = blocked_.equal_range(sender_queue(sender));
auto predicate = [sender](
const proton::sender& s) {
return s == sender; };
erase_sender_if(range.first, range.second, predicate);
}
// NOTE(review): enclosing function header (and the declaration of `e`) is not
// visible. Writes "error: " followed by e.what() to standard error.
std::cerr <<
"error: " << e.
what() << std::endl;
}
// NOTE(review): enclosing function header is not visible. The handler
// destroys itself here; presumably it was allocated with `new` and this is
// the last callback it receives - confirm.
delete this;
}
private:
// Associates a queue with the senders recorded for it (see blocked_).
typedef std::multimap<queue*, proton::sender> blocked_map;
}
// NOTE(review): enclosing function header and the declaration of `m` are not
// visible. Tries to pop a message from q into m, passing
// has_messages_callback_ for the queue to keep if it is empty.
// NOTE(review): as written, the `if` governs the `return`, so nothing is
// returned when pop() yields false and the popped message is never used; a
// statement between these two lines appears to be missing - confirm against
// the intended code.
bool popped = q->pop(m, has_messages_callback_);
if (popped)
return popped;
}
// Retries delivery for the senders recorded in blocked_ under queue q.
// An entry is removed from blocked_ when its sender has no credit
// (credit() <= 0) or when do_send() reports success; otherwise the entry is
// kept and the scan moves on to the next one.
void has_messages(queue* q) {
    const auto waiting = blocked_.equal_range(q);
    auto it = waiting.first;
    while (it != waiting.second) {
        auto& candidate = it->second;
        // do_send() is attempted only when the sender still has credit.
        const bool unblock = (candidate.credit() <= 0) ? true : do_send(q, candidate);
        if (unblock) {
            it = blocked_.erase(it);
        } else {
            ++it;
        }
    }
}
// Removes from blocked_ every entry in [begin, end) whose sender satisfies p.
// Both iterators must refer to blocked_.
template <class Predicate>
void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
    // `begin` is a by-value copy, so it doubles as the scan cursor.
    while (begin != end) {
        const bool matches = p(begin->second);
        if (matches) {
            begin = blocked_.erase(begin);
        } else {
            ++begin;
        }
    }
}
// Queue registry supplied to the constructor; held by reference, not owned.
queues& queues_;
// (queue, sender) pairs recorded when do_send() returned false for a sender;
// scanned by has_messages() and pruned by erase_sender_if().
blocked_map blocked_;
// Callback handed to queue::pop() so a queue can signal that it has messages.
std::function<void(queue*)> has_messages_callback_;
};
class broker {
public:
// Builds the broker: obtains a container from make_mt_container("mt_broker"),
// wires the listener to the broker's queue registry, starts listening on
// `addr` and logs the address to standard output.
//
// addr is taken by const reference (previously by const value) to avoid a
// needless string copy; callers are unaffected.
broker(const std::string& addr) :
    container_(make_mt_container("mt_broker")), listener_(queues_)
{
    container_->listen(addr, listener_);
    std::cout << "broker listening on " << addr << std::endl;
}
void run() {
std::vector<std::thread> threads(std::thread::hardware_concurrency()-1);
for (auto& t : threads)
container_->run();
for (auto& t : threads)
t.join();
}
private:
// NOTE(review): the header of the nested `listener` class (and its access
// specifier) is not visible above this constructor, and the body closed by
// the lone `}` below has lost its function header - confirm against the
// intended code.
// Stores a reference to the queue registry.
listener(queues& qs) : queues_(qs) {}
}
// Reports a listen failure on standard error, then throws
// std::runtime_error carrying the same text.
void on_error(const std::string& s) OVERRIDE {
std::cerr << "listen error: " << s << std::endl;
throw std::runtime_error(s);
}
// Queue registry supplied to the listener's constructor; not owned.
queues& queues_;
};
// Data members of broker. The constructor passes queues_ to listener_ by
// reference, so queues_ must stay declared before listener_.
queues queues_;
std::unique_ptr<proton::container> container_;
listener listener_;
};
int main(int argc, char **argv) {
std::string address("0.0.0.0");
example::options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker(address).run();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << "broker shutdown: " << e.what() << std::endl;
}
return 1;
}