A Messenger
provides a high-level means for sending and
receiving AMQP messages.
Creates a new Messenger
.
The name
parameter is optional. If one is not provided then a
unique name is generated.
name - the name (def. nil)
# File lib/qpid_proton/messenger.rb, line 50 def initialize(name = nil) @impl = Cproton.pn_messenger(name) ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end
Accepts the incoming message identified by the tracker.
tracker - the tracker
flag - the flag
# File lib/qpid_proton/messenger.rb, line 294 def accept(tracker, flag) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag) check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) end
Returns the current acceptance mode for the Messenger.
# File lib/qpid_proton/messenger.rb, line 283 def accept_mode Cproton.pn_messenger_get_accept_mode(@impl) end
Set the accept mode for the Messenger. See #ACCEPT_MODE_AUTO and #ACCEPT_MODE_MANUAL for more details
mode - the acceptance mode
@messenger.accept_mode = Qpid::Proton::Messenger::ACCEPT_MODE_AUTO
# File lib/qpid_proton/messenger.rb, line 276 def accept_mode=(mode) raise TypeError.new("Invalid mode: #{mode}") unless valid_mode?(mode) Cproton.pn_messenger_set_accept_mode(@impl, mode) end
Returns the path to a certificate file.
# File lib/qpid_proton/messenger.rb, line 143 def certificate Cproton.pn_messenger_get_certificate(@impl) end
Path to a certificate file for the Messenger
.
This certificate is used when the Messenger
accepts or
establishes SSL/TLS connections.
certificate - the certificate
# File lib/qpid_proton/messenger.rb, line 137 def certificate=(certificate) Cproton.pn_messenger_set_certificate(@impl, certificate) end
Returns the most recent error number.
# File lib/qpid_proton/messenger.rb, line 95 def errno Cproton.pn_messenger_errno(@impl) end
Returns the most recent error message.
# File lib/qpid_proton/messenger.rb, line 101 def error Cproton.pn_messenger_error(@impl) end
Reports whether an error occurred.
# File lib/qpid_proton/messenger.rb, line 89 def error? !Cproton.pn_messenger_errno(@impl).zero? end
Gets a single message incoming message from the local queue.
If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.
msg - the (optional) Message
instance to be used
# File lib/qpid_proton/messenger.rb, line 215 def get(msg = nil) msg = Qpid::Proton::Message.new if msg.nil? check_for_error(Cproton.pn_messenger_get(@impl, msg.impl)) return msg end
Returns the number of messages in the incoming queue that have not been retrieved.
# File lib/qpid_proton/messenger.rb, line 244 def incoming Cproton.pn_messenger_incoming(@impl) end
Returns a Tracker
for the most recently received message.
# File lib/qpid_proton/messenger.rb, line 259 def incoming_tracker impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end
Returns the incoming window.
# File lib/qpid_proton/messenger.rb, line 357 def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end
Sets the incoming window.
If the incoming window is set to a positive value, then after each call to accept or reject, the object will track the status of that many deliveries.
window - the window size
# File lib/qpid_proton/messenger.rb, line 350 def incoming_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window)) end
Returns the name.
# File lib/qpid_proton/messenger.rb, line 64 def name Cproton.pn_messenger_name(@impl) end
Returns the number messages in the outgoing queue that have not been transmitted.
# File lib/qpid_proton/messenger.rb, line 237 def outgoing Cproton.pn_messenger_outgoing(@impl) end
Returns a Tracker
for the message most recently sent via the
put method.
# File lib/qpid_proton/messenger.rb, line 251 def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end
Returns the outgoing window.
# File lib/qpid_proton/messenger.rb, line 377 def outgoing_window Cproton.pn_messenger_get_outgoing_window(@impl) end
Sets the outgoing window.
If the outgoing window is set to a positive value, then after each call to #send, the object will track the status of that many deliveries. ==== Options * window - the window size
# File lib/qpid_proton/messenger.rb, line 370 def outgoing_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window)) end
Returns the path to a private key file.
# File lib/qpid_proton/messenger.rb, line 163 def private_key Cproton.pn_messenger_get_private_key(@impl) end
Path to a private key file for the Messenger
.
The property must be specified for the Messenger
to accept
incoming SSL/TLS connections and to establish client authenticated outgoing
SSL/TLS connections.
key - the key file
# File lib/qpid_proton/messenger.rb, line 157 def private_key=(key) Cproton.pn_messenger_set_private_key(@impl, key) end
Puts a single message into the outgoing queue.
To ensure messages are sent, you should then call ::send.
message - the message
# File lib/qpid_proton/messenger.rb, line 193 def put(message) raise TypeError.new("invalid message: #{message}") if message.nil? raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message) check_for_error(Cproton.pn_messenger_put(@impl, message.impl)) end
Receives up to the specified number of messages, blocking until at least one message is received.
Options ====
max - the maximum number of messages to receive
# File lib/qpid_proton/messenger.rb, line 228 def receive(max) raise TypeError.new("invalid max: #{max}") if max.nil? || max.to_i.zero? raise RangeError.new("negative max: #{max}") if max < 0 check_for_error(Cproton.pn_messenger_recv(@impl, max)) end
Rejects the incoming message identified by the tracker.
tracker - the tracker
flag - the flag
# File lib/qpid_proton/messenger.rb, line 307 def reject(tracker, flag) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) end
Sends all outgoing messages, blocking until the outgoing queue is empty.
# File lib/qpid_proton/messenger.rb, line 202 def send check_for_error(Cproton.pn_messenger_send(@impl)) end
Settles messages for a tracker.
tracker - the tracker
flag - the flag
# File lib/qpid_proton/messenger.rb, line 334 def settle(tracker, flag) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag) Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end
Starts the Messenger
, allowing it to begin sending and
receiving messages.
# File lib/qpid_proton/messenger.rb, line 108 def start check_for_error(Cproton.pn_messenger_start(@impl)) end
Gets the last known remote state of the delivery associated with the given tracker. See TrackerStatus for details on the values returned.
tracker - the tracker
# File lib/qpid_proton/messenger.rb, line 320 def status(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) end
Stops the Messenger
, preventing it from sending or receiving
any more messages.
# File lib/qpid_proton/messenger.rb, line 115 def stop check_for_error(Cproton.pn_messenger_stop(@impl)) end
Subscribes the Messenger
to a remote address.
# File lib/qpid_proton/messenger.rb, line 121 def subscribe(address) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe(@impl, address) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? Qpid::Proton::Subscription.new(subscription) end
Returns the timeout period
# File lib/qpid_proton/messenger.rb, line 83 def timeout Cproton.pn_messenger_get_timeout(@impl) end
Sets the timeout period, in milliseconds.
A negative timeout period implies an infinite timeout.
timeout - the timeout period
# File lib/qpid_proton/messenger.rb, line 76 def timeout=(timeout) raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? Cproton.pn_messenger_set_timeout(@impl, timeout) end
The path to the databse of trusted certificates.
# File lib/qpid_proton/messenger.rb, line 181 def trusted_certificates Cproton.pn_messenger_get_trusted_certificates(@impl) end
A path to a database of trusted certificates for use in verifying the peer
on an SSL/TLS connection. If this property is nil
, then the
peer will not be verified.
certificates - the certificates path
# File lib/qpid_proton/messenger.rb, line 175 def trusted_certificates=(certificates) Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) end