Add Connect and Disconnect events
This commit is contained in:
@@ -74,6 +74,8 @@ from ._events import (
|
|||||||
Event,
|
Event,
|
||||||
UnknownEvent,
|
UnknownEvent,
|
||||||
ThreadEvent,
|
ThreadEvent,
|
||||||
|
Connect,
|
||||||
|
Disconnect,
|
||||||
# _client_payload
|
# _client_payload
|
||||||
ReactionEvent,
|
ReactionEvent,
|
||||||
UserStatusEvent,
|
UserStatusEvent,
|
||||||
|
@@ -67,6 +67,25 @@ class Presence(Event):
|
|||||||
return cls(statuses=statuses, full=data["list_type"] == "full")
|
return cls(statuses=statuses, full=data["list_type"] == "full")
|
||||||
|
|
||||||
|
|
||||||
|
@attrs_event
|
||||||
|
class Connect(Event):
|
||||||
|
"""The client was connected to Facebook.
|
||||||
|
|
||||||
|
This is not guaranteed to be triggered the same amount of times `Disconnect`!
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@attrs_event
|
||||||
|
class Disconnect(Event):
|
||||||
|
"""The client lost the connection to Facebook.
|
||||||
|
|
||||||
|
This is not guaranteed to be triggered the same amount of times `Connect`!
|
||||||
|
"""
|
||||||
|
|
||||||
|
#: The reason / error string for the disconnect
|
||||||
|
reason = attr.ib(type=str)
|
||||||
|
|
||||||
|
|
||||||
def parse_events(session, topic, data):
|
def parse_events(session, topic, data):
|
||||||
# See Mqtt._configure_connect_options for information about these topics
|
# See Mqtt._configure_connect_options for information about these topics
|
||||||
try:
|
try:
|
||||||
|
@@ -152,6 +152,7 @@ class Listener:
|
|||||||
"The MQTT listener was disconnected for too long,"
|
"The MQTT listener was disconnected for too long,"
|
||||||
" events may have been lost"
|
" events may have been lost"
|
||||||
)
|
)
|
||||||
|
# TODO: Find a way to tell the user that they may now be missing events
|
||||||
self._sync_token = None
|
self._sync_token = None
|
||||||
self._sequence_id = None
|
self._sequence_id = None
|
||||||
return False
|
return False
|
||||||
@@ -282,11 +283,12 @@ class Listener:
|
|||||||
path="/chat?sid={}".format(session_id), headers=headers
|
path="/chat?sid={}".format(session_id), headers=headers
|
||||||
)
|
)
|
||||||
|
|
||||||
def _reconnect(self):
|
def _reconnect(self) -> bool:
|
||||||
# Try reconnecting
|
# Try reconnecting
|
||||||
self._configure_connect_options()
|
self._configure_connect_options()
|
||||||
try:
|
try:
|
||||||
self._mqtt.reconnect()
|
self._mqtt.reconnect()
|
||||||
|
return True
|
||||||
except (
|
except (
|
||||||
# Taken from .loop_forever
|
# Taken from .loop_forever
|
||||||
paho.mqtt.client.socket.error,
|
paho.mqtt.client.socket.error,
|
||||||
@@ -296,6 +298,7 @@ class Listener:
|
|||||||
log.debug("MQTT reconnection failed: %s", e)
|
log.debug("MQTT reconnection failed: %s", e)
|
||||||
# Wait before reconnecting
|
# Wait before reconnecting
|
||||||
self._mqtt._reconnect_wait()
|
self._mqtt._reconnect_wait()
|
||||||
|
return False
|
||||||
|
|
||||||
def listen(self) -> Iterable[_events.Event]:
|
def listen(self) -> Iterable[_events.Event]:
|
||||||
"""Run the listening loop continually.
|
"""Run the listening loop continually.
|
||||||
@@ -315,12 +318,10 @@ class Listener:
|
|||||||
self._sequence_id = fetch_sequence_id(self.session)
|
self._sequence_id = fetch_sequence_id(self.session)
|
||||||
|
|
||||||
# Make sure we're connected
|
# Make sure we're connected
|
||||||
while True:
|
while not self._reconnect():
|
||||||
# Beware, internal API, may have to change this to something more stable!
|
pass
|
||||||
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
|
|
||||||
self._reconnect()
|
yield _events.Connect()
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
rc = self._mqtt.loop(timeout=1.0)
|
rc = self._mqtt.loop(timeout=1.0)
|
||||||
@@ -339,18 +340,23 @@ class Listener:
|
|||||||
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
||||||
# If known/expected error
|
# If known/expected error
|
||||||
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
|
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
|
||||||
log.warning("Connection lost, retrying")
|
yield _events.Disconnect(reason="Connection lost, retrying")
|
||||||
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
|
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
|
||||||
# This error is wrongly classified
|
# This error is wrongly classified
|
||||||
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
||||||
log.warning("Connection error, retrying")
|
yield _events.Disconnect(reason="Connection error, retrying")
|
||||||
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
|
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
|
||||||
raise _exception.NotLoggedIn("MQTT connection refused")
|
raise _exception.NotLoggedIn("MQTT connection refused")
|
||||||
else:
|
else:
|
||||||
err = paho.mqtt.client.error_string(rc)
|
err = paho.mqtt.client.error_string(rc)
|
||||||
log.error("MQTT Error: %s", err)
|
log.error("MQTT Error: %s", err)
|
||||||
|
reason = "MQTT Error: {}, retrying".format(err)
|
||||||
|
yield _events.Disconnect(reason=reason)
|
||||||
|
|
||||||
self._reconnect()
|
while not self._reconnect():
|
||||||
|
pass
|
||||||
|
|
||||||
|
yield _events.Connect()
|
||||||
|
|
||||||
if self._tmp_events:
|
if self._tmp_events:
|
||||||
yield from self._tmp_events
|
yield from self._tmp_events
|
||||||
|
Reference in New Issue
Block a user