From a298e0cf165e1c670fc97e17c0a478b7ae039d41 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 14:56:01 +0100 Subject: [PATCH] Refactor MQTT to do proper reconnecting --- fbchat/_mqtt.py | 146 ++++++++++++++++++++++++++++-------------------- 1 file changed, 86 insertions(+), 60 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index da9a588..3875e88 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -2,7 +2,7 @@ import attr import random import paho.mqtt.client from ._core import log -from . import _util, _graphql +from . import _util, _exception, _graphql def generate_session_id(): @@ -17,7 +17,7 @@ class Mqtt: _on_message = attr.ib() _chat_on = attr.ib() _foreground = attr.ib() - _session_id = attr.ib(factory=generate_session_id) + _sequence_id = attr.ib() _sync_token = attr.ib(None) _HOST = "edge-chat.facebook.com" @@ -30,15 +30,6 @@ class Mqtt: protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - - self = cls( - state=state, - mqtt=mqtt, - on_message=on_message, - chat_on=chat_on, - foreground=foreground, - ) - mqtt.enable_logger() # mqtt.max_inflight_messages_set(20) # mqtt.max_queued_messages_set(0) # unlimited @@ -46,46 +37,46 @@ class Mqtt: # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) # TODO: Is region (lla | atn | odn | others?) important? mqtt.tls_set() - mqtt.ws_set_options( - path="/chat?sid={}".format(session_id), headers=self._create_headers - ) - mqtt.on_message = self._on_message_handler - sequence_id = self._fetch_sequence_id(self._state) - # Set connect/reconnect data with an empty sync token and an newly fetched - # sequence id initially - self._set_reconnect_data(self._sync_token, sequence_id) + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, + sequence_id=cls._fetch_sequence_id(state), + ) + + # Configure callbacks + mqtt.on_message = self._on_message_handler + mqtt.on_connect = self._on_connect_handler + + self._configure_connect_options() # TODO: Handle response code response_code = mqtt.connect(self._HOST, 443, keepalive=10) - def _create_headers(self, headers): - log.debug("Fetching MQTT headers") - # TODO: Make this access thread safe - headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST) - headers["User-Agent"] = self._state._session.headers["User-Agent"] - headers["Origin"] = "https://www.facebook.com" - headers["Host"] = self._HOST - return headers + return self def _on_message_handler(self, client, userdata, message): - j = _util.parse_json(message.payload) - if message.topic == "/t_ms": - sequence_id = None + # Parse payload JSON + try: + j = _util.parse_json(message.payload) + except _exception.FBchatFacebookError: + log.exception("Failed parsing MQTT data as JSON: %r", message.payload) + return + if message.topic == "/t_ms": # Update sync_token when received # This is received in the first message after we've created a messenger # sync queue. if "syncToken" in j and "firstDeltaSeqId" in j: self._sync_token = j["syncToken"] - sequence_id = j["firstDeltaSeqId"] + self._sequence_id = j["firstDeltaSeqId"] # Update last sequence id when received if "lastIssuedSeqId" in j: - sequence_id = j["lastIssuedSeqId"] - - if sequence_id is not None: - self._set_reconnect_data(self._sync_token, sequence_id) + self._sequence_id = j["lastIssuedSeqId"] # Call the external callback self._on_message(message.topic, j) @@ -109,40 +100,39 @@ class Mqtt: # TODO: Proper exceptions raise - @staticmethod - def _get_messenger_sync(state, sync_token, sequence_id): - """Get the data to configure receiving messages.""" + def _on_connect_handler(self, client, userdata, flags, rc): + # configure receiving messages. payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": state.user_id, + "entity_fbid": self._state.user_id, } # If we don't have a sync_token, create a new messenger queue # This is done so that across reconnects, if we've received a sync token, we # SHOULD receive a piece of data in /t_ms exactly once! - if sync_token is None: + if self._sync_token is None: topic = "/messenger_sync_create_queue" - payload["initial_titan_sequence_id"] = str(sequence_id) + payload["initial_titan_sequence_id"] = str(self._sequence_id) payload["device_params"] = None else: topic = "/messenger_sync_get_diffs" - payload["last_seq_id"] = str(sequence_id) - payload["sync_token"] = sync_token + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token - return topic, payload + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) - def _set_reconnect_data(self, sync_token, sequence_id): - log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) - topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() username = { # The user ID "u": self._state.user_id, # Session ID - "s": self._session_id, + "s": session_id, # Active status setting "chat_on": self._chat_on, # foreground_state - Whether the window is focused @@ -178,18 +168,18 @@ class Mqtt: "/sr_res", ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! "pm": [ - { - "topic": topic, - "payload": _util.json_minimal(payload), - "qos": 1, - "messageId": 65536, - } - # The above is more efficient, but the same effect could have been - # acheived with: - # def on_connect(*args): - # mqtt.publish(topic, payload=..., qos=1) - # mqtt.on_connect = on_connect + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } ], # Unknown parameters "cp": 3, @@ -205,8 +195,44 @@ class Mqtt: # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) + headers = { + # TODO: Make this access thread safe + "Cookie": _util.get_cookie_header(self._state._session, self._HOST), + "User-Agent": self._state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": self._HOST, + } + + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + def listen(self): - self._mqtt.loop_forever() # TODO: retry_first_connection=True? + while True: + rc = self._mqtt.loop(timeout=1.0) + if rc == paho.mqtt.client.MQTT_ERR_SUCCESS: + continue # No errors + + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + break + + # Wait before reconnecting + self._mqtt._reconnect_wait() + + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ): + log.debug("MQTT connection failed") + + # self._mqtt.loop_forever() # TODO: retry_first_connection=True? def disconnect(self): self._mqtt.disconnect()