From 9990952fa6098c6e12adc44e287ecbd85d61fea5 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Wed, 11 Mar 2020 15:27:00 +0100 Subject: [PATCH] Add Connect and Disconnect events --- fbchat/__init__.py | 2 ++ fbchat/_events/__init__.py | 19 +++++++++++++++++++ fbchat/_listen.py | 26 ++++++++++++++++---------- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/fbchat/__init__.py b/fbchat/__init__.py index b74a755..5e189e6 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -74,6 +74,8 @@ from ._events import ( Event, UnknownEvent, ThreadEvent, + Connect, + Disconnect, # _client_payload ReactionEvent, UserStatusEvent, diff --git a/fbchat/_events/__init__.py b/fbchat/_events/__init__.py index 7410cdb..619d686 100644 --- a/fbchat/_events/__init__.py +++ b/fbchat/_events/__init__.py @@ -67,6 +67,25 @@ class Presence(Event): return cls(statuses=statuses, full=data["list_type"] == "full") +@attrs_event +class Connect(Event): + """The client was connected to Facebook. + + This is not guaranteed to be triggered the same amount of times `Disconnect`! + """ + + +@attrs_event +class Disconnect(Event): + """The client lost the connection to Facebook. + + This is not guaranteed to be triggered the same amount of times `Connect`! + """ + + #: The reason / error string for the disconnect + reason = attr.ib(type=str) + + def parse_events(session, topic, data): # See Mqtt._configure_connect_options for information about these topics try: diff --git a/fbchat/_listen.py b/fbchat/_listen.py index 4b60a8e..003c78d 100644 --- a/fbchat/_listen.py +++ b/fbchat/_listen.py @@ -152,6 +152,7 @@ class Listener: "The MQTT listener was disconnected for too long," " events may have been lost" ) + # TODO: Find a way to tell the user that they may now be missing events self._sync_token = None self._sequence_id = None return False @@ -282,11 +283,12 @@ class Listener: path="/chat?sid={}".format(session_id), headers=headers ) - def _reconnect(self): + def _reconnect(self) -> bool: # Try reconnecting self._configure_connect_options() try: self._mqtt.reconnect() + return True except ( # Taken from .loop_forever paho.mqtt.client.socket.error, @@ -296,6 +298,7 @@ class Listener: log.debug("MQTT reconnection failed: %s", e) # Wait before reconnecting self._mqtt._reconnect_wait() + return False def listen(self) -> Iterable[_events.Event]: """Run the listening loop continually. @@ -315,12 +318,10 @@ class Listener: self._sequence_id = fetch_sequence_id(self.session) # Make sure we're connected - while True: - # Beware, internal API, may have to change this to something more stable! - if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async: - self._reconnect() - else: - break + while not self._reconnect(): + pass + + yield _events.Connect() while True: rc = self._mqtt.loop(timeout=1.0) @@ -339,18 +340,23 @@ class Listener: if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: # If known/expected error if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: - log.warning("Connection lost, retrying") + yield _events.Disconnect(reason="Connection lost, retrying") elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: # This error is wrongly classified # See https://github.com/eclipse/paho.mqtt.python/issues/340 - log.warning("Connection error, retrying") + yield _events.Disconnect(reason="Connection error, retrying") elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: raise _exception.NotLoggedIn("MQTT connection refused") else: err = paho.mqtt.client.error_string(rc) log.error("MQTT Error: %s", err) + reason = "MQTT Error: {}, retrying".format(err) + yield _events.Disconnect(reason=reason) - self._reconnect() + while not self._reconnect(): + pass + + yield _events.Connect() if self._tmp_events: yield from self._tmp_events