class Jabber::Stream

The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the #send and #send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Constants

CONNECTED
DISCONNECTED

Attributes

fd[R]

file descriptor used

processing[R]

number of stanzas currently being processed

status[R]

connection status

Public Class Methods

new() click to toggle source

Initialize a new stream

# File lib/xmpp4r/stream.rb, line 43
# Set up a new stream that is not yet attached to any file descriptor.
#
# The stream starts out DISCONNECTED with empty callback lists; @fd is only
# assigned once #start is called.
def initialize
  # Nothing is attached or running yet.
  @fd = @exception_block = @wakeup_thread = @streamid = @parser_thread = nil
  @status = DISCONNECTED
  @streamns = 'jabber:client'
  @processing = 0
  @threadblocks = []

  # One independent callback list per kind of received data.
  @xmlcbs, @stanzacbs, @messagecbs, @iqcbs, @presencecbs =
    Array.new(5) { CallbackList.new }

  # Synchronisation helpers and send bookkeeping.
  @send_lock = Mutex.new
  @last_send = Time.now
  @tbcbmutex = Mutex.new
  @features_sem = Semaphore.new
end

Public Instance Methods

add_iq_callback(priority = 0, ref = nil, &block) click to toggle source

Adds a callback block to process received Iqs

priority
Integer

The callback's priority, the higher, the sooner

ref
String

The callback's reference

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 552
# Register a block that processes received Iqs.
#
# priority:: [Integer] the callback's priority (the higher, the sooner)
# ref:: [String] the callback's reference
# block:: the handler block (nil when none is given)
#
# The Iq callback list is modified while holding @tbcbmutex; the result of
# the list's +add+ is returned.
def add_iq_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize { @iqcbs.add(priority, ref, block) }
end
add_message_callback(priority = 0, ref = nil, &block) click to toggle source

Adds a callback block to process received Messages

priority
Integer

The callback's priority, the higher, the sooner

ref
String

The callback's reference

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 486
# Register a block that processes received Messages.
#
# priority:: [Integer] the callback's priority (the higher, the sooner)
# ref:: [String] the callback's reference
# block:: the handler block (nil when none is given)
#
# The Message callback list is modified while holding @tbcbmutex; the result
# of the list's +add+ is returned.
def add_message_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize { @messagecbs.add(priority, ref, block) }
end
add_presence_callback(priority = 0, ref = nil, &block) click to toggle source

Adds a callback block to process received Presences

priority
Integer

The callback's priority, the higher, the sooner

ref
String

The callback's reference

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 530
# Register a block that processes received Presences.
#
# priority:: [Integer] the callback's priority (the higher, the sooner)
# ref:: [String] the callback's reference
# block:: the handler block (nil when none is given)
#
# The Presence callback list is modified while holding @tbcbmutex; the
# result of the list's +add+ is returned.
def add_presence_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize { @presencecbs.add(priority, ref, block) }
end
add_stanza_callback(priority = 0, ref = nil, &block) click to toggle source

Adds a callback block to process received Stanzas

priority
Integer

The callback's priority, the higher, the sooner

ref
String

The callback's reference

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 508
# Register a block that processes received Stanzas.
#
# priority:: [Integer] the callback's priority (the higher, the sooner)
# ref:: [String] the callback's reference
# block:: the handler block (nil when none is given)
#
# The Stanza callback list is modified while holding @tbcbmutex; the result
# of the list's +add+ is returned.
def add_stanza_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize { @stanzacbs.add(priority, ref, block) }
end
add_xml_callback(priority = 0, ref = nil, &block) click to toggle source

Adds a callback block to process received XML messages, these will be handled before any blocks given to #send or other callbacks.

priority
Integer

The callback's priority, the higher, the sooner

ref
String

The callback's reference

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 464
# Register a block that processes received XML messages. In #receive these
# callbacks are consulted before any block given to #send and before the
# other callback lists.
#
# priority:: [Integer] the callback's priority (the higher, the sooner)
# ref:: [String] the callback's reference
# block:: the handler block (nil when none is given)
#
# The XML callback list is modified while holding @tbcbmutex; the result of
# the list's +add+ is returned.
def add_xml_callback(priority = 0, ref = nil, &block)
  @tbcbmutex.synchronize { @xmlcbs.add(priority, ref, block) }
end
close() click to toggle source

Closes the connection to the Jabber service

# File lib/xmpp4r/stream.rb, line 570
# Closes the connection to the Jabber service.
#
# This simply delegates to close! and returns whatever close! returns.
def close
  close!
end
close!() click to toggle source
# File lib/xmpp4r/stream.rb, line 574
# Closes the stream: waits (bounded) for stanzas that are still being
# processed, closes the file descriptor, marks the stream DISCONNECTED and
# finally kills the parser thread.
#
# Returns the killed parser thread, or nil when there is none.
def close!
  # @processing may never drop back to zero if a stanza is not counted
  # down again, so stop waiting after a bounded number of attempts instead
  # of blocking forever.
  attempts = 0
  while attempts <= 1000
    pending = @tbcbmutex.synchronize { @processing }
    break unless pending > 0

    attempts += 1
    Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pending} STANZAS")
    Thread.pass
  end

  # Order matters here! When this method runs inside @parser_thread,
  # killing that thread first would abort the remaining steps, so the
  # parser thread is killed last.
  @fd.close if @fd && !@fd.closed?
  @status = DISCONNECTED
  @parser_thread.kill if @parser_thread
end
delete_iq_callback(ref) click to toggle source

Delete an Iq callback

ref
String

The reference of the callback to delete

# File lib/xmpp4r/stream.rb, line 563
# Remove an Iq callback.
#
# ref:: [String] the reference of the callback to delete
#
# The Iq callback list is modified while holding @tbcbmutex; the result of
# the list's +delete+ is returned.
def delete_iq_callback(ref)
  @tbcbmutex.synchronize { @iqcbs.delete(ref) }
end
delete_message_callback(ref) click to toggle source

Delete a Message callback

ref
String

The reference of the callback to delete

# File lib/xmpp4r/stream.rb, line 496
# Remove a Message callback.
#
# ref:: [String] the reference of the callback to delete
#
# The Message callback list is modified while holding @tbcbmutex; the result
# of the list's +delete+ is returned.
def delete_message_callback(ref)
  @tbcbmutex.synchronize { @messagecbs.delete(ref) }
end
delete_presence_callback(ref) click to toggle source

Delete a Presence callback

ref
String

The reference of the callback to delete

# File lib/xmpp4r/stream.rb, line 540
# Remove a Presence callback.
#
# ref:: [String] the reference of the callback to delete
#
# The Presence callback list is modified while holding @tbcbmutex; the
# result of the list's +delete+ is returned.
def delete_presence_callback(ref)
  @tbcbmutex.synchronize { @presencecbs.delete(ref) }
end
delete_stanza_callback(ref) click to toggle source

Delete a Stanza callback

ref
String

The reference of the callback to delete

# File lib/xmpp4r/stream.rb, line 518
# Remove a Stanza callback.
#
# ref:: [String] the reference of the callback to delete
#
# The Stanza callback list is modified while holding @tbcbmutex; the result
# of the list's +delete+ is returned.
def delete_stanza_callback(ref)
  @tbcbmutex.synchronize { @stanzacbs.delete(ref) }
end
delete_xml_callback(ref) click to toggle source

Delete an XML-messages callback

ref
String

The reference of the callback to delete

# File lib/xmpp4r/stream.rb, line 474
# Remove an XML-messages callback.
#
# ref:: [String] the reference of the callback to delete
#
# The XML callback list is modified while holding @tbcbmutex; the result of
# the list's +delete+ is returned.
def delete_xml_callback(ref)
  @tbcbmutex.synchronize { @xmlcbs.delete(ref) }
end
iq_callbacks() click to toggle source

Get the list of iq callbacks.

# File lib/xmpp4r/stream.rb, line 289
# Returns the callback list holding the Iq callbacks (@iqcbs).
#
# The list object itself is returned, not a copy, and @tbcbmutex is not
# taken here.
def iq_callbacks
  @iqcbs
end
is_connected?() click to toggle source

Returns whether this connection is connected to a Jabber service

return
Boolean

Connection status

# File lib/xmpp4r/stream.rb, line 160
# Tells whether the stream is currently connected.
#
# return:: [Boolean] true exactly when @status equals CONNECTED
def is_connected?
  @status == CONNECTED
end
is_disconnected?() click to toggle source

Returns whether this connection is NOT connected to a Jabber service

return
Boolean

Connection status

# File lib/xmpp4r/stream.rb, line 168
# Tells whether the stream is currently disconnected.
#
# return:: [Boolean] true exactly when @status equals DISCONNECTED
def is_disconnected?
  @status == DISCONNECTED
end
message_callbacks() click to toggle source

Get the list of message callbacks.

# File lib/xmpp4r/stream.rb, line 295
# Returns the callback list holding the Message callbacks (@messagecbs).
#
# The list object itself is returned, not a copy, and @tbcbmutex is not
# taken here.
def message_callbacks
  @messagecbs
end
on_exception(&block) click to toggle source

Mounts a block to handle exceptions that occur while the stream is parsing received data or sending data. This will likely be the first indication that the socket dropped in a Jabber session.

The block has to take three arguments:

  • the Exception

  • the Jabber::Stream object (self)

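  • a symbol indicating where it happened; the methods shown on this page pass :start, :parser, :sending, :close (the stream was ended by the peer) and :disconnected (the parser finished). For :close and :disconnected the exception argument is nil.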

# File lib/xmpp4r/stream.rb, line 117
# Stores the given block as the stream's exception handler.
#
# The block is later called with three arguments: the exception (or nil),
# this stream, and a symbol naming where the problem occurred. Calling this
# again replaces the previously stored block; calling it without a block
# stores nil.
def on_exception(&block)
  @exception_block = block
end
parse_failure(e) click to toggle source

This method is called by the parser when a failure occurs

# File lib/xmpp4r/stream.rb, line 123
# Called by the parser when a failure occurs.
#
# e:: [Exception] the failure reported by the parser
#
# The failure is logged and the stream is closed. If a handler was
# registered with on_exception it is then called with (e, self, :parser)
# from a new thread, and that thread is returned. Without a handler, +e+ is
# raised to the caller.
def parse_failure(e)
  # An exception that was never raised has no backtrace (nil).
  trace = Array(e.backtrace).join("\n")
  Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{trace}")

  if @exception_block
    # Use a new thread: close ends in close!, which kills @parser_thread.
    # If that is the thread we are running in, the handler would never
    # be reached.
    Thread.new do
      Thread.current.abort_on_exception = true
      close
      @exception_block.call(e, self, :parser)
    end
  else
    Jabber::warnlog("Stream#parse_failure was called by XML parser. Dumping " \
      "backtrace...\n#{e.message}\n#{trace}")
    close
    # Raise the reported failure explicitly instead of relying on $!.
    raise e
  end
end
parser_end() click to toggle source

This method is called by the parser upon receiving </stream:stream>

# File lib/xmpp4r/stream.rb, line 145
# Called by the parser upon receiving </stream:stream>.
#
# Without a registered exception handler the stream is simply closed. With
# one, closing and the handler call (nil, self, :close) happen in a new
# thread, which is returned.
def parser_end
  return close unless @exception_block

  Thread.new do
    Thread.current.abort_on_exception = true
    close
    @exception_block.call(nil, self, :close)
  end
end
presence_callbacks() click to toggle source

Get the list of presence callbacks.

# File lib/xmpp4r/stream.rb, line 301
# Returns the callback list holding the Presence callbacks (@presencecbs).
#
# The list object itself is returned, not a copy, and @tbcbmutex is not
# taken here.
def presence_callbacks
  @presencecbs
end
receive(element) click to toggle source

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element
REXML::Element

The received element

# File lib/xmpp4r/stream.rb, line 177
# Processes a received REXML::Element and runs the registered thread blocks
# and callbacks against it.
#
# element:: [REXML::Element] the received element
#
# Handlers are consulted in this order, stopping at the first one that
# claims the stanza: XML callbacks, blocks waiting in #send, stanza
# callbacks, then the Message/Iq/Presence callbacks matching the stanza's
# class.
#
# Returns true when an XML callback, a thread block or a stanza callback
# handled the stanza; otherwise the result of the type-specific callback
# list, or false when the stanza is none of Message, Iq or Presence.
#
# @processing is incremented on entry and is always decremented on exit,
# including when a callback raises, so close! does not wait for stanzas
# that have already been abandoned.
def receive(element)
  @tbcbmutex.synchronize { @processing += 1 }
  begin
    Jabber::debuglog("RECEIVED:\n#{element.to_s}")

    if element.namespace('').to_s == '' # REXML namespaces are always strings
      element.add_namespace(@streamns)
    end

    case element.prefix
    when 'stream'
      # Stream-level elements are passed on as plain elements.
      stanza = element
      case element.name
      when 'stream'
        @streamid = element.attributes['id']
        @streamns = element.namespace('') if element.namespace('')

        # Hack: component streams are basically client streams.
        # Someday we may want to create special stanza classes
        # for components/s2s deriving from normal stanzas but
        # possessing these namespaces
        @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

        unless element.attributes['version']  # isn't XMPP compliant, so
          Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
          @features_sem.run                   # don't wait for <stream:features/>
        end
      when 'features'
        element.each { |e|
          if e.name == 'mechanisms' && e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
            e.each_element('mechanism') { |mech|
              @stream_mechanisms.push(mech.text)
            }
          else
            @stream_features[e.name] = e.namespace
          end
        }
        Jabber::debuglog("FEATURES: received")
        @features_sem.run
      end
    else
      # Any stanza, classes are registered by XMPPElement::name_xmlns
      begin
        stanza = XMPPStanza::import(element)
      rescue NoNameXmlnsRegistered
        stanza = element
      end
    end

    return true if @xmlcbs.process(stanza)

    # Iterate through blocked threads (= waiting for an answer)
    #
    # We're dup'ping the @threadblocks here, so that we won't end up in an
    # endless loop if Stream#send is being nested. That means, the nested
    # threadblock won't receive the stanza currently processed, but the next
    # one.
    threadblocks = @tbcbmutex.synchronize { @threadblocks.dup }
    threadblocks.each { |threadblock|
      exception = nil
      r = false
      begin
        r = threadblock.call(stanza)
      rescue Exception => e
        # Kept deliberately broad: the failure is handed to the thread
        # block below via threadblock.raise instead of being raised here.
        exception = e
      end

      if r == true
        @tbcbmutex.synchronize do
          @threadblocks.delete(threadblock)
        end
        threadblock.wakeup
        return true
      elsif exception
        @tbcbmutex.synchronize do
          @threadblocks.delete(threadblock)
        end
        threadblock.raise(exception)
      end
    }

    Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
    Jabber::debuglog("TRYING stanzacbs...")
    return true if @stanzacbs.process(stanza)

    r = false
    Jabber::debuglog("TRYING message/iq/presence/cbs...")
    case stanza
    when Message
      r = @messagecbs.process(stanza)
    when Iq
      r = @iqcbs.process(stanza)
    when Presence
      r = @presencecbs.process(stanza)
    end
    return r
  ensure
    # Runs on every exit path: early returns, normal completion and
    # exceptions raised by callbacks.
    @tbcbmutex.synchronize { @processing -= 1 }
  end
end
send(xml, &block) click to toggle source

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback; use a separate thread instead, because the parser thread (in whose context callbacks are executed) must not be suspended.

xml
String

The xml data to send

&block
Block

The optional block

# File lib/xmpp4r/stream.rb, line 361
# Sends XML data to the socket and (optionally) waits to process received
# data.
#
# xml:: [String] the xml data to send (anything responding to +to_s+;
#       XMPPStanza objects get special namespace handling, see below)
# block:: optional block, registered as a thread block before sending;
#         #receive passes incoming stanzas to it until it returns true
#
# When a block is given and the caller is not the parser thread, the call
# waits on the thread block after sending. From the parser thread only a
# warning is logged, because stopping that thread would deadlock.
#
# A failure while sending is logged and the stream is closed with close!.
# If an on_exception handler is registered it is called with
# (exception, self, :sending) from a new thread; otherwise the exception is
# re-raised.
def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  if block
    threadblock = ThreadBlock.new(block)
    @tbcbmutex.synchronize { @threadblocks.unshift(threadblock) }
  end
  begin
    # Temporarily remove stanza's namespace to
    # reduce bandwidth consumption
    if xml.kind_of?(XMPPStanza) && xml.namespace == 'jabber:client' &&
        xml.prefix != 'stream' && xml.name != 'stream'
      xml.delete_namespace
      begin
        send_data(xml.to_s)
      ensure
        # Restore the namespace even when sending fails, so the caller's
        # stanza is not left modified.
        xml.add_namespace(@streamns)
      end
    else
      send_data(xml.to_s)
    end
  rescue Exception => e
    Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new do
        Thread.current.abort_on_exception = true
        close!
        @exception_block.call(e, self, :sending)
      end
    else
      Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}"
      close!
      raise
    end
  end
  # NOTE(review): after a failed send that was passed to the exception
  # handler, a given block is still waited on below — confirm this is intended.
  #
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  if block && Thread.current != @parser_thread
    threadblock.wait
  elsif block
    Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
  end
end
send_data(data) click to toggle source
# File lib/xmpp4r/stream.rb, line 343
# Writes +data+ to the stream's file descriptor and flushes it.
#
# data:: the data appended to @fd with <<
#
# The write happens while holding @send_lock, and @last_send is set to the
# current time just before writing. Returns the result of @fd.flush.
def send_data(data)
  @send_lock.synchronize {
    @last_send = Time.now
    @fd << data
    @fd.flush
  }
end
send_with_id(xml) { |received| ... } click to toggle source

Send an XMPP stanza with a Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned. This is a direct result of unique request/response stanza identification via the id attribute.

The block may be omitted. Then, the result will be the response stanza.

Be aware that if a stanza with type='error' is received, the function does not yield but raises a ServerError with the corresponding error element.

Please see #send for some implementational details.

Please read the note about nesting at #send

xml
XMPPStanza
# File lib/xmpp4r/stream.rb, line 425
# Sends a stanza and handles the reply carrying the same id.
#
# xml:: [XMPPStanza] the stanza to send; it gets an id from
#       Jabber::IdGenerator when it has none
#
# Received objects that are not an XMPPStanza with the request's id are
# ignored. For the matching reply:
# * type :error  - a ServerError is raised after #send returns, wrapping the
#   reply's error or a new ErrorResponse when the reply has none
# * block given  - the block is called once and its value is returned
# * no block     - the reply itself is returned
def send_with_id(xml, &block)
  xml.id = Jabber::IdGenerator.instance.generate_id if xml.id.nil?

  res = nil
  error = nil
  send(xml) do |received|
    # Not the reply to this request: let other handlers see it.
    next false unless received.kind_of?(XMPPStanza) && received.id == xml.id

    if received.type == :error
      error = received.error || ErrorResponse.new
    elsif block_given?
      res = yield(received)
    else
      res = received
    end
    true
  end

  raise ServerError.new(error) unless error.nil?

  res
end
stanza_callbacks() click to toggle source

Get the list of stanza callbacks.

# File lib/xmpp4r/stream.rb, line 307
# Returns the callback list holding the Stanza callbacks (@stanzacbs).
#
# The list object itself is returned, not a copy, and @tbcbmutex is not
# taken here.
def stanza_callbacks
  @stanzacbs
end
start(fd) click to toggle source

Start the XML parser on the fd

# File lib/xmpp4r/stream.rb, line 66
# Starts the XML parser on +fd+ in a new parser thread.
#
# fd:: the file descriptor (IO-like object) to read from; stored in @fd
#
# Resets the collected stream mechanisms and features, creates a
# StreamParser for +fd+ and runs it in @parser_thread. When parsing ends,
# the stream is closed and a registered on_exception handler is called with
# (nil, self, :disconnected). When parsing raises, the failure is logged and
# either handed to the handler as (exception, self, :start) or re-raised
# inside the parser thread.
#
# Returns CONNECTED.
def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)

  # Mark the stream connected before the parser thread exists. If the
  # parser finishes or fails right away, its close! sets DISCONNECTED and
  # that must not be overwritten afterwards.
  @status = CONNECTED

  @parser_thread = Thread.new do
    Thread.current.abort_on_exception = true
    begin
      @parser.parse
      Jabber::debuglog("DISCONNECTED\n")

      if @exception_block
        Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
      else
        close!
      end
    rescue Exception => e
      Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :start)
        end
      else
        Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
        close!
        raise
      end
    end
  end

  # Kept as the return value for backward compatibility.
  CONNECTED
end
stop() click to toggle source
# File lib/xmpp4r/stream.rb, line 103
# Stops parsing: kills the parser thread (if one was started) and drops the
# parser. The file descriptor and @status are left untouched.
#
# Safe to call before #start; returns nil.
def stop
  # @parser_thread is nil until #start has run.
  @parser_thread.kill if @parser_thread
  @parser = nil
end
xml_callbacks() click to toggle source

Get the list of xml callbacks.

# File lib/xmpp4r/stream.rb, line 313
# Returns the callback list holding the XML callbacks (@xmlcbs).
#
# The list object itself is returned, not a copy, and @tbcbmutex is not
# taken here.
def xml_callbacks
  @xmlcbs
end