From f8e110f180661f5e2b5f3838b0e4aa20e3866750 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 25 Jan 2020 14:50:15 +0100 Subject: [PATCH] Handle connecting in `Listener.listen` --- fbchat/_mqtt.py | 150 +++++++++++++++++++++++------------------------- 1 file changed, 71 insertions(+), 79 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 5ec3010..b82a07c 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -61,6 +61,24 @@ def generate_session_id() -> int: return random.randint(1, 2 ** 53) +def mqtt_factory() -> paho.mqtt.client.Client: + # Configure internal MQTT handler + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + # mqtt.max_inflight_messages_set(20) # The rest will get queued + # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued + # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + mqtt.tls_set() + mqtt.connect_async(HOST, 443, keepalive=10) + return mqtt + + def fetch_sequence_id(session: _session.Session) -> int: """Fetch sequence ID.""" params = { @@ -84,10 +102,10 @@ class Listener: """Helper, to listen for incoming Facebook events.""" session = attr.ib(type=_session.Session) - _mqtt = attr.ib(type=paho.mqtt.client.Client) _chat_on = attr.ib(type=bool) _foreground = attr.ib(type=bool) _sequence_id = attr.ib(type=int) + _mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client) _sync_token = attr.ib(None, type=str) _tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]]) @@ -97,6 +115,11 @@ class Listener: self.session, self._chat_on, self._foreground ) + def __attrs_post_init__(self): + # Configure callbacks + self._mqtt.on_message = self._on_message_handler + self._mqtt.on_connect = self._on_connect_handler + @classmethod def connect(cls, session: _session.Session, chat_on: bool, foreground: bool): """Initialize a connection to the Facebook MQTT service. @@ -109,51 +132,13 @@ class Listener: Example: >>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True) """ - mqtt = paho.mqtt.client.Client( - client_id="mqttwsclient", - clean_session=True, - protocol=paho.mqtt.client.MQTTv31, - transport="websockets", - ) - mqtt.enable_logger() - # mqtt.max_inflight_messages_set(20) # The rest will get queued - # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued - # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds - # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) - mqtt.tls_set() - - self = cls( + return cls( session=session, - mqtt=mqtt, chat_on=chat_on, foreground=foreground, sequence_id=fetch_sequence_id(session), ) - # Configure callbacks - mqtt.on_message = self._on_message_handler - mqtt.on_connect = self._on_connect_handler - - self._configure_connect_options() - - # Attempt to connect - try: - rc = mqtt.connect(HOST, 443, keepalive=10) - except ( - # Taken from .loop_forever - paho.mqtt.client.socket.error, - OSError, - paho.mqtt.client.WebsocketConnectionError, - ) as e: - raise _exception.FacebookError("MQTT connection failed") from e - - # Raise error if connecting failed - if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - err = paho.mqtt.client.error_string(rc) - raise _exception.FacebookError("MQTT connection failed: {}".format(err)) - - return self - def _handle_ms(self, j): """Handle /t_ms special logic. @@ -206,7 +191,7 @@ class Listener: log.debug("MQTT payload: %s, %s", message.topic, j) if message.topic == "/t_ms": - if not _handle_ms(j): + if not self._handle_ms(j): return try: @@ -313,45 +298,21 @@ class Listener: path="/chat?sid={}".format(session_id), headers=headers ) - def _loop_once(self) -> bool: - rc = self._mqtt.loop(timeout=1.0) - - # If disconnect() has been called - # Beware, internal API, may have to change this to something more stable! - if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: - return False # Stop listening - - if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - # If known/expected error - if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: - log.warning("Connection lost, retrying") - elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: - # This error is wrongly classified - # See https://github.com/eclipse/paho.mqtt.python/issues/340 - log.warning("Connection error, retrying") - elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: - raise _exception.NotLoggedIn("MQTT connection refused") - else: - err = paho.mqtt.client.error_string(rc) - log.error("MQTT Error: %s", err) - + def _reconnect(self): + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + log.debug("MQTT reconnection failed: %s", e) # Wait before reconnecting self._mqtt._reconnect_wait() - # Try reconnecting - self._configure_connect_options() - try: - self._mqtt.reconnect() - except ( - # Taken from .loop_forever - paho.mqtt.client.socket.error, - OSError, - paho.mqtt.client.WebsocketConnectionError, - ) as e: - log.debug("MQTT reconnection failed: %s", e) - - return True # Keep listening - def listen(self) -> Iterable[_events.Event]: """Run the listening loop continually. @@ -365,10 +326,41 @@ class Listener: >>> for event in listener.listen(): ... print(event) """ - while self._loop_once(): + # Make sure we're connected + while True: + # Beware, internal API, may have to change this to something more stable! + if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async: + self._reconnect() + else: + break + + while True: + rc = self._mqtt.loop(timeout=1.0) + + # If disconnect() has been called + # Beware, internal API, may have to change this to something more stable! + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + break # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + # If known/expected error + if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: + log.warning("Connection lost, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: + # This error is wrongly classified + # See https://github.com/eclipse/paho.mqtt.python/issues/340 + log.warning("Connection error, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: + raise _exception.NotLoggedIn("MQTT connection refused") + else: + err = paho.mqtt.client.error_string(rc) + log.error("MQTT Error: %s", err) + + self._reconnect() + if self._tmp_events: yield from self._tmp_events - self._tmp_events = None + self._tmp_events = None def disconnect(self) -> None: """Disconnect the MQTT listener.