Skip to content
This repository has been archived by the owner on Feb 19, 2020. It is now read-only.

callbacks for success/failure on stream sending #182

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions sleekxmpp/basexmpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def make_presence(self, pshow=None, pstatus=None, ppriority=None,
return presence

def send_message(self, mto, mbody, msubject=None, mtype=None,
mhtml=None, mfrom=None, mnick=None):
mhtml=None, mfrom=None, mnick=None, extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.message.Message` stanza.
Expand All @@ -519,12 +519,15 @@ def send_message(self, mto, mbody, msubject=None, mtype=None,
be aware that some servers require that the full JID
of the sender be used.
:param mnick: Optional nickname of the sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
self.make_message(mto, mbody, msubject, mtype,
mhtml, mfrom, mnick).send()
mhtml, mfrom, mnick).send(extra=extra)

def send_presence(self, pshow=None, pstatus=None, ppriority=None,
pto=None, pfrom=None, ptype=None, pnick=None):
pto=None, pfrom=None, ptype=None, pnick=None,
extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza.
Expand All @@ -536,12 +539,15 @@ def send_presence(self, pshow=None, pstatus=None, ppriority=None,
:param ptype: The type of presence, such as ``'subscribe'``.
:param pfrom: The sender of the presence.
:param pnick: Optional nickname of the presence's sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
self.make_presence(pshow, pstatus, ppriority, pto,
ptype, pfrom, pnick).send()
ptype, pfrom, pnick).send(extra=extra)

def send_presence_subscription(self, pto, pfrom=None,
ptype='subscribe', pnick=None):
ptype='subscribe', pnick=None,
extra=None):
"""
Create, initialize, and send a new
:class:`~sleekxmpp.stanza.presence.Presence` stanza of
Expand All @@ -551,11 +557,13 @@ def send_presence_subscription(self, pto, pfrom=None,
:param pfrom: The sender of the presence.
:param ptype: The type of presence, such as ``'subscribe'``.
:param pnick: Optional nickname of the presence's sender.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
self.make_presence(ptype=ptype,
pfrom=pfrom,
pto=JID(pto).bare,
pnick=pnick).send()
pnick=pnick).send(extra=extra)

@property
def jid(self):
Expand Down
4 changes: 2 additions & 2 deletions sleekxmpp/xmlstream/stanzabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,14 +1568,14 @@ def exception(self, e):
log.exception('Error handling {%s}%s stanza', self.namespace,
self.name)

def send(self, now=False):
def send(self, now=False, extra=None):
"""Queue the stanza to be sent on the XML stream.

:param bool now: Indicates if the queue should be skipped and the
stanza sent immediately. Useful for stream
initialization. Defaults to ``False``.
"""
self.stream.send(self, now=now)
self.stream.send(self, now=now, extra=extra)

def __copy__(self):
"""Return a copy of the stanza object that does not share the
Expand Down
34 changes: 26 additions & 8 deletions sleekxmpp/xmlstream/xmlstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ def incoming_filter(self, xml):
"""
return xml

def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
def send(self, data, mask=None, timeout=None, now=False, use_filters=True, extra=None):
"""A wrapper for :meth:`send_raw()` for sending stanza objects.

May optionally block until an expected response is received.
Expand All @@ -1172,6 +1172,8 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
applied to the given stanza data. Disabling
filters is useful when resending stanzas.
Defaults to ``True``.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
if timeout is None:
timeout = self.response_timeout
Expand Down Expand Up @@ -1199,13 +1201,13 @@ def send(self, data, mask=None, timeout=None, now=False, use_filters=True):
if data is None:
return
str_data = str(data)
self.send_raw(str_data, now)
self.send_raw(str_data, now, extra=extra)
else:
self.send_raw(data, now)
self.send_raw(data, now, extra=extra)
if mask is not None:
return wait_for.wait(timeout)

def send_xml(self, data, mask=None, timeout=None, now=False):
def send_xml(self, data, mask=None, timeout=None, now=False, extra=None):
"""Send an XML object on the stream, and optionally wait
for a response.

Expand All @@ -1222,19 +1224,23 @@ def send_xml(self, data, mask=None, timeout=None, now=False):
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to ``False``.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
if timeout is None:
timeout = self.response_timeout
return self.send(tostring(data), mask, timeout, now)
return self.send(tostring(data), mask, timeout, now, extra=extra)

def send_raw(self, data, now=False, reconnect=None):
def send_raw(self, data, now=False, reconnect=None, extra=None):
"""Send raw data across the stream.

:param string data: Any string value.
:param bool reconnect: Indicates if the stream should be
restarted if there is an error sending
the stanza. Used mainly for testing.
Defaults to :attr:`auto_reconnect`.
:param object extra: Extra data to be sent to the callback function
for send_success and send_failure messages.
"""
if now:
log.debug("SEND (IMMED): %s", data)
Expand All @@ -1254,6 +1260,8 @@ def send_raw(self, data, now=False, reconnect=None):
log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
self.event(
'send_failure', (data, extra), direct=True)
if reconnect is None:
reconnect = self.auto_reconnect
if not self.stop.is_set():
Expand All @@ -1268,12 +1276,15 @@ def send_raw(self, data, now=False, reconnect=None):
except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data)
self.event('send_failure', (data, extra), direct=True)
if reconnect is None:
reconnect = self.auto_reconnect
if not self.stop.is_set():
self.disconnect(reconnect, send_close=False)
else:
self.event('send_success', (data, extra), direct=True)
else:
self.send_queue.put(data)
self.send_queue.put((data, extra))
return True

def _start_thread(self, name, target, track=True):
Expand Down Expand Up @@ -1647,10 +1658,11 @@ def _send_thread(self):
self.session_started_event.wait(timeout=0.1)
if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza
extra = None
self.__failed_send_stanza = None
else:
try:
data = self.send_queue.get(True, 1)
data, extra = self.send_queue.get(True, 1)
except queue.Empty:
continue
log.debug("SEND: %s", data)
Expand All @@ -1671,6 +1683,8 @@ def _send_thread(self):
log.debug('SSL error: max retries reached')
self.exception(serr)
log.warning("Failed to send %s", data)
self.event(
'send_failure', (data, extra), direct=True)
if not self.stop.is_set():
self.disconnect(self.auto_reconnect,
send_close=False)
Expand All @@ -1684,11 +1698,15 @@ def _send_thread(self):
except (Socket.error, ssl.SSLError) as serr:
self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data)
self.event('send_failure', (data, extra), direct=True)
if not self.stop.is_set():
self.__failed_send_stanza = data
self._end_thread('send')
self.disconnect(self.auto_reconnect, send_close=False)
return
else:
log.debug('send_success event about to be called')
self.event('send_success', (data, extra), direct=True)
except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex)
Expand Down