Fawkes API  Fawkes Development Version
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
server_thread.cpp
1 
2 /***************************************************************************
3  * server_thread.cpp - Fawkes Network Protocol (server part)
4  *
5  * Created: Sun Nov 19 15:08:30 2006
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <netcomm/fawkes/server_thread.h>
25 #include <netcomm/fawkes/server_client_thread.h>
26 #include <netcomm/utils/acceptor_thread.h>
27 #include <netcomm/fawkes/message.h>
28 #include <netcomm/fawkes/handler.h>
29 #include <netcomm/fawkes/message_queue.h>
30 #include <netcomm/fawkes/message_content.h>
31 #include <core/threading/thread_collector.h>
32 #include <core/threading/mutex.h>
33 #include <core/exception.h>
34 
35 #include <unistd.h>
36 
37 namespace fawkes {
38 
39 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
40  * Fawkes Network Thread.
41  * Maintains a list of clients and reacts to events triggered by the clients.
42  * Also runs the acceptor thread.
43  *
44  * @ingroup NetComm
45  * @author Tim Niemueller
46  */
47 
48 /** Constructor.
49  * @param thread_collector thread collector to register new threads with
50  * @param fawkes_port port for Fawkes network protocol
51  */
53  ThreadCollector *thread_collector)
54  : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
55 {
56  this->thread_collector = thread_collector;
57  clients.clear();
58  next_client_id = 1;
59  inbound_messages = new FawkesNetworkMessageQueue();
60 
61  acceptor_thread = new NetworkAcceptorThread(this, fawkes_port,
62  "FawkesNetworkAcceptorThread");
63  if ( thread_collector ) {
64  thread_collector->add(acceptor_thread);
65  } else {
66  acceptor_thread->start();
67  }
68 }
69 
70 
71 /** Destructor. */
73 {
74  for (cit = clients.begin(); cit != clients.end(); ++cit) {
75  if ( thread_collector ) {
76  thread_collector->remove((*cit).second);
77  } else {
78  (*cit).second->cancel();
79  (*cit).second->join();
80  }
81  delete (*cit).second;
82  }
83  if ( thread_collector ) {
84  thread_collector->remove(acceptor_thread);
85  } else {
86  acceptor_thread->cancel();
87  acceptor_thread->join();
88  }
89  delete acceptor_thread;
90 
91  delete inbound_messages;
92 }
93 
94 
95 /** Add a new connection.
96  * Called by the NetworkAcceptorThread if a new client connected.
97  * @param s socket for new client
98  */
99 void
101 {
103 
104  clients.lock();
105  client->set_clid(next_client_id);
106  if ( thread_collector ) {
107  thread_collector->add(client);
108  } else {
109  client->start();
110  }
111  clients[next_client_id] = client;
112  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
113  (*hit).second->client_connected(next_client_id);
114  }
115  ++next_client_id;
116  clients.unlock();
117 
118  wakeup();
119 }
120 
121 
122 /** Add a handler.
123  * @param handler to add.
124  */
125 void
127 {
128  handlers.lock();
129  if ( handlers.find(handler->id()) != handlers.end()) {
130  handlers.unlock();
131  throw Exception("Handler already registered");
132  }
133  handlers[handler->id()] = handler;
134  handlers.unlock();
135 }
136 
137 
138 /** Remove handler.
139  * @param handler handler to remove
140  */
141 void
143 {
144  handlers.lock();
145  if( handlers.find(handler->id()) != handlers.end() ) {
146  handlers.erase(handler->id());
147  }
148  handlers.unlock();
149 }
150 
151 
152 /** Fawkes network thread loop.
153  * The thread loop will check all clients for their aliveness and dead
154  * clients are removed. Then inbound messages are processed and dispatched
155  * properly to registered handlers. Then the thread waits for a new event
156  * to happen (event emitting threads need to wake up this thread!).
157  */
158 void
160 {
161  clients.lock();
162 
163  // check for dead clients
164  cit = clients.begin();
165  while (cit != clients.end()) {
166  if ( ! cit->second->alive() ) {
167  if ( thread_collector ) {
168  thread_collector->remove((*cit).second);
169  } else {
170  cit->second->cancel();
171  cit->second->join();
172  }
173  usleep(5000);
174  delete cit->second;
175  unsigned int clid = (*cit).first;
176  ++cit;
177  clients.erase(clid);
178  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
179  (*hit).second->client_disconnected(clid);
180  }
181  } else {
182  ++cit;
183  }
184  }
185 
186  // dispatch messages
187  inbound_messages->lock();
188  while ( ! inbound_messages->empty() ) {
189  FawkesNetworkMessage *m = inbound_messages->front();
190  if ( handlers.find(m->cid()) != handlers.end()) {
191  handlers[m->cid()]->handle_network_message(m);
192  }
193  m->unref();
194  inbound_messages->pop();
195  }
196  inbound_messages->unlock();
197 
198  clients.unlock();
199 }
200 
201 
202 /** Force sending of all pending messages. */
203 void
205 {
206  clients.lock();
207  for (cit = clients.begin(); cit != clients.end(); ++cit) {
208  (*cit).second->force_send();
209  }
210  clients.unlock();
211 }
212 
213 
214 /** Broadcast a message.
215  * Method to broadcast a message to all connected clients. This method will take
216  * ownership of the passed message. If you want to use it after enqueuing it you
217  * must reference it explicitly before calling this method.
218  * @param msg Message to broadcast
219  */
220 void
222 {
223  for (cit = clients.begin(); cit != clients.end(); ++cit) {
224  if ( (*cit).second->alive() ) {
225  msg->ref();
226  (*cit).second->enqueue(msg);
227  }
228  }
229  msg->unref();
230 }
231 
232 
233 /** Broadcast a message.
234  * A FawkesNetworkMessage is created and broacasted via the emitter.
235  * @param component_id component ID
236  * @param msg_id message type id
237  * @param payload payload buffer
238  * @param payload_size size of payload buffer
239  * @see FawkesNetworkEmitter::broadcast()
240  */
241 void
242 FawkesNetworkServerThread::broadcast(unsigned short int component_id,
243  unsigned short int msg_id,
244  void *payload, unsigned int payload_size)
245 {
246  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
247  payload, payload_size);
248  broadcast(m);
249 }
250 
251 
252 /** Broadcast message without payload.
253  * @param component_id component ID
254  * @param msg_id message type ID
255  */
256 void
257 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
258 {
259  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
260  broadcast(m);
261 }
262 
263 
264 /** Send a message.
265  * Method to send a message to a specific client.
266  * The client ID provided in the message is used to determine the correct
267  * recipient. If no client is connected for the given client ID, or the
268  * client is no longer alive, an Exception is thrown.
269  * This method will take ownership of the passed message. If you want to use
270  * it after enqueuing it you must reference it explicitly before calling this
271  * method.
272  * Implemented Emitter interface message.
273  * @param msg Message to send
274  */
275 void
277 {
278  unsigned int clid = msg->clid();
279  if ( clients.find(clid) != clients.end() ) {
280  if ( clients[clid]->alive() ) {
281  clients[clid]->enqueue(msg);
282  } else {
283  throw Exception("Client %u not alive", clid);
284  }
285  } else {
286  throw Exception("Client %u not found", clid);
287  }
288 }
289 
290 
291 /** Send a message.
292  * A FawkesNetworkMessage is created and sent via the emitter.
293  * @param to_clid client ID of recipient
294  * @param component_id component ID
295  * @param msg_id message type id
296  * @param payload payload buffer
297  * @param payload_size size of payload buffer
298  * @see FawkesNetworkEmitter::broadcast()
299  */
300 void
301 FawkesNetworkServerThread::send(unsigned int to_clid,
302  unsigned short int component_id, unsigned short int msg_id,
303  void *payload, unsigned int payload_size)
304 {
305  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
306  payload, payload_size);
307  send(m);
308 }
309 
310 
311 /** Send a message.
312  * A FawkesNetworkMessage is created and sent via the emitter.
313  * @param to_clid client ID of recipient
314  * @param component_id component ID
315  * @param msg_id message type id
316  * @param content Fawkes complex network message content
317  * @see FawkesNetworkEmitter::broadcast()
318  */
319 void
320 FawkesNetworkServerThread::send(unsigned int to_clid,
321  unsigned short int component_id, unsigned short int msg_id,
323 {
324  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
325  content);
326  send(m);
327 }
328 
329 
330 /** Send a message without payload.
331  * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
332  * This is particularly useful for simple status messages that you want to send.
333  * @param to_clid client ID of recipient
334  * @param component_id component ID
335  * @param msg_id message type id
336  * @see FawkesNetworkEmitter::broadcast()
337  */
338 void
339 FawkesNetworkServerThread::send(unsigned int to_clid,
340  unsigned short int component_id, unsigned short int msg_id)
341 {
342  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
343  send(m);
344 }
345 
346 
347 /** Dispatch messages.
348  * Actually messages are just put into the inbound message queue and dispatched
349  * during the next loop iteration. So after adding all the messages you have
350  * to wake up the thread to get them actually dispatched.
351  * @param msg message to dispatch
352  */
353 void
355 {
356  msg->ref();
357  inbound_messages->push_locked(msg);
358 }
359 
360 } // end namespace fawkes