The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)
You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and #send_with_id methods.
To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
file descriptor used
number of stanzas currently being processed
connection status
Initialize a new stream
# File lib/xmpp4r/stream.rb, line 43 def initialize @fd = nil @status = DISCONNECTED @xmlcbs = CallbackList.new @stanzacbs = CallbackList.new @messagecbs = CallbackList.new @iqcbs = CallbackList.new @presencecbs = CallbackList.new @send_lock = Mutex.new @last_send = Time.now @exception_block = nil @tbcbmutex = Mutex.new @threadblocks = [] @wakeup_thread = nil @streamid = nil @streamns = 'jabber:client' @features_sem = Semaphore.new @parser_thread = nil @processing = 0 end
Adds a callback block to process received Iqs
The callback's priority, the higher, the sooner
The callback's reference
The optional block
# File lib/xmpp4r/stream.rb, line 552 def add_iq_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @iqcbs.add(priority, ref, block) end end
Adds a callback block to process received Messages
The callback's priority, the higher, the sooner
The callback's reference
The optional block
# File lib/xmpp4r/stream.rb, line 486 def add_message_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @messagecbs.add(priority, ref, block) end end
Adds a callback block to process received Presences
The callback's priority, the higher, the sooner
The callback's reference
The optional block
# File lib/xmpp4r/stream.rb, line 530 def add_presence_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @presencecbs.add(priority, ref, block) end end
Adds a callback block to process received Stanzas
The callback's priority, the higher, the sooner
The callback's reference
The optional block
# File lib/xmpp4r/stream.rb, line 508 def add_stanza_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @stanzacbs.add(priority, ref, block) end end
Adds a callback block to process received XML messages, these will be handled before any blocks given to #send or other callbacks.
The callback's priority, the higher, the sooner
The callback's reference
The optional block
# File lib/xmpp4r/stream.rb, line 464 def add_xml_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @xmlcbs.add(priority, ref, block) end end
Closes the connection to the Jabber service
# File lib/xmpp4r/stream.rb, line 570 def close close! end
# File lib/xmpp4r/stream.rb, line 574 def close! pr = 1 n = 0 # In some cases, we might lost count of some stanzas # (for example, if the handler raises an exception) # so we can't block forever. while pr > 0 and n <= 1000 @tbcbmutex.synchronize { pr = @processing } if pr > 0 n += 1 Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS") #puts("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS") Thread::pass end end # Order Matters here! If this method is called from within # @parser_thread then killing @parser_thread first would # mean the other parts of the method fail to execute. # That would be bad. So kill parser_thread last @fd.close if @fd and !@fd.closed? @status = DISCONNECTED @parser_thread.kill if @parser_thread end
Delete an Iq callback
The reference of the callback to delete
# File lib/xmpp4r/stream.rb, line 563 def delete_iq_callback(ref) @tbcbmutex.synchronize do @iqcbs.delete(ref) end end
Delete an Message callback
The reference of the callback to delete
# File lib/xmpp4r/stream.rb, line 496 def delete_message_callback(ref) @tbcbmutex.synchronize do @messagecbs.delete(ref) end end
Delete a Presence callback
The reference of the callback to delete
# File lib/xmpp4r/stream.rb, line 540 def delete_presence_callback(ref) @tbcbmutex.synchronize do @presencecbs.delete(ref) end end
Delete a Stanza callback
The reference of the callback to delete
# File lib/xmpp4r/stream.rb, line 518 def delete_stanza_callback(ref) @tbcbmutex.synchronize do @stanzacbs.delete(ref) end end
Delete an XML-messages callback
The reference of the callback to delete
# File lib/xmpp4r/stream.rb, line 474 def delete_xml_callback(ref) @tbcbmutex.synchronize do @xmlcbs.delete(ref) end end
Get the list of iq callbacks.
# File lib/xmpp4r/stream.rb, line 289 def iq_callbacks @iqcbs end
Returns if this connection is connected to a Jabber service
Connection status
# File lib/xmpp4r/stream.rb, line 160 def is_connected? return @status == CONNECTED end
Returns if this connection is NOT connected to a Jabber service
Connection status
# File lib/xmpp4r/stream.rb, line 168 def is_disconnected? return @status == DISCONNECTED end
Get the list of message callbacks.
# File lib/xmpp4r/stream.rb, line 295 def message_callbacks @messagecbs end
Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.
The block has to take three arguments:
the Exception
the Jabber::Stream object (self)
a symbol where it happened, namely :start, :parser, :sending and :end
# File lib/xmpp4r/stream.rb, line 117 def on_exception(&block) @exception_block = block end
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 123 def parse_failure(e) Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") # A new thread has to be created because close will cause the thread # to commit suicide(???) if @exception_block # New thread, because close will kill the current thread Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(e, self, :parser) end else Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " + "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}" close raise end end
This method is called by the parser upon receiving
</stream:stream>
# File lib/xmpp4r/stream.rb, line 145 def parser_end if @exception_block Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(nil, self, :close) end else close end end
Get the list of presence callbacks.
# File lib/xmpp4r/stream.rb, line 301 def presence_callbacks @presencecbs end
Processes a received REXML::Element and executes registered thread blocks and filters against it.
The received element
# File lib/xmpp4r/stream.rb, line 177 def receive(element) @tbcbmutex.synchronize { @processing += 1 } Jabber::debuglog("RECEIVED:\n#{element.to_s}") if element.namespace('').to_s == '' # REXML namespaces are always strings element.add_namespace(@streamns) end case element.prefix when 'stream' case element.name when 'stream' stanza = element @streamid = element.attributes['id'] @streamns = element.namespace('') if element.namespace('') # Hack: component streams are basically client streams. # Someday we may want to create special stanza classes # for components/s2s deriving from normal stanzas but # posessing these namespaces @streamns = 'jabber:client' if @streamns == 'jabber:component:accept' unless element.attributes['version'] # isn't XMPP compliant, so Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features") @features_sem.run # don't wait for <stream:features/> end when 'features' stanza = element element.each { |e| if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl' e.each_element('mechanism') { |mech| @stream_mechanisms.push(mech.text) } else @stream_features[e.name] = e.namespace end } Jabber::debuglog("FEATURES: received") @features_sem.run else stanza = element end else # Any stanza, classes are registered by XMPPElement::name_xmlns begin stanza = XMPPStanza::import(element) rescue NoNameXmlnsRegistered stanza = element end end if @xmlcbs.process(stanza) @tbcbmutex.synchronize { @processing -= 1 } return true end # Iterate through blocked threads (= waiting for an answer) # # We're dup'ping the @threadblocks here, so that we won't end up in an # endless loop if Stream#send is being nested. That means, the nested # threadblock won't receive the stanza currently processed, but the next # one. threadblocks = nil @tbcbmutex.synchronize do threadblocks = @threadblocks.dup end threadblocks.each { |threadblock| exception = nil r = false begin r = threadblock.call(stanza) rescue Exception => e exception = e end if r == true @tbcbmutex.synchronize do @threadblocks.delete(threadblock) end threadblock.wakeup @tbcbmutex.synchronize { @processing -= 1 } return true elsif exception @tbcbmutex.synchronize do @threadblocks.delete(threadblock) end threadblock.raise(exception) end } Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})") Jabber::debuglog("TRYING stanzacbs...") if @stanzacbs.process(stanza) @tbcbmutex.synchronize { @processing -= 1 } return true end r = false Jabber::debuglog("TRYING message/iq/presence/cbs...") case stanza when Message r = @messagecbs.process(stanza) when Iq r = @iqcbs.process(stanza) when Presence r = @presencecbs.process(stanza) end @tbcbmutex.synchronize { @processing -= 1 } return r end
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).
The xml data to send
The optional block
# File lib/xmpp4r/stream.rb, line 361 def send(xml, &block) Jabber::debuglog("SENDING:\n#{xml}") if block threadblock = ThreadBlock.new(block) @tbcbmutex.synchronize do @threadblocks.unshift(threadblock) end end begin # Temporarily remove stanza's namespace to # reduce bandwidth consumption if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' and xml.prefix != 'stream' and xml.name != 'stream' xml.delete_namespace send_data(xml.to_s) xml.add_namespace(@streamns) else send_data(xml.to_s) end rescue Exception => e Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new do Thread.current.abort_on_exception = true close! @exception_block.call(e, self, :sending) end else Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}" close! raise end end # The parser thread might be running this (think of a callback running send()) # If this is the case, we mustn't stop (or we would cause a deadlock) if block and Thread.current != @parser_thread threadblock.wait elsif block Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!") end end
# File lib/xmpp4r/stream.rb, line 343 def send_data(data) @send_lock.synchronize do @last_send = Time.now @fd << data @fd.flush end end
Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned. This is a direct result of unique request/response stanza identification via the id attribute.
The block may be omitted. Then, the result will be the response stanza.
Be aware that if a stanza with type='error'
is
received the function does not yield but raises an ServerError with the corresponding error
element.
Please see #send for some implementational details.
Please read the note about nesting at #send
# File lib/xmpp4r/stream.rb, line 425 def send_with_id(xml, &block) if xml.id.nil? xml.id = Jabber::IdGenerator.instance.generate_id end res = nil error = nil send(xml) do |received| if received.kind_of? XMPPStanza and received.id == xml.id if received.type == :error error = (received.error ? received.error : ErrorResponse.new) true elsif block_given? res = yield(received) true else res = received true end else false end end unless error.nil? raise ServerError.new(error) end res end
Get the list of stanza callbacks.
# File lib/xmpp4r/stream.rb, line 307 def stanza_callbacks @stanzacbs end
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 66 def start(fd) @stream_mechanisms = [] @stream_features = {} @fd = fd @parser = StreamParser.new(@fd, self) @parser_thread = Thread.new do Thread.current.abort_on_exception = true begin @parser.parse Jabber::debuglog("DISCONNECTED\n") if @exception_block Thread.new { close!; @exception_block.call(nil, self, :disconnected) } else close! end rescue Exception => e Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(e, self, :start) end else Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}" close! raise end end end @status = CONNECTED end
# File lib/xmpp4r/stream.rb, line 103 def stop @parser_thread.kill @parser = nil end
Get the list of xml callbacks.
# File lib/xmpp4r/stream.rb, line 313 def xml_callbacks @xmlcbs end