From 2da8369c70deb86903142bc7af32ded50f4c311e Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 25 Jan 2020 14:34:20 +0100 Subject: [PATCH] Refactor and improve MQTT listener a bit --- fbchat/_mqtt.py | 204 +++++++++++++++++++++++++----------------------- 1 file changed, 106 insertions(+), 98 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 0fddeb7..5ec3010 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -8,6 +8,46 @@ from . import _util, _exception, _session, _graphql, _events from typing import Iterable, Optional +HOST = "edge-chat.facebook.com" + +TOPICS = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # Data about user-to-user calls + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_rtc", + # TODO: Find out what this does! + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/webrtc_response", +] + + def get_cookie_header(session: requests.Session, url: str) -> str: """Extract a cookie header from a requests session.""" # The cookies are extracted this way to make sure they're escaped correctly @@ -21,6 +61,24 @@ def generate_session_id() -> int: return random.randint(1, 2 ** 53) +def fetch_sequence_id(session: _session.Session) -> int: + """Fetch sequence ID.""" + params = { + "limit": 0, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same doc id as in `Client.fetch_threads` + (j,) = session._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"] + if not sequence_id: + raise _exception.NotLoggedIn("Failed fetching sequence id") + return int(sequence_id) + + @attr.s(slots=True, kw_only=kw_only, repr=False, eq=False) class Listener: """Helper, to listen for incoming Facebook events.""" @@ -33,8 +91,6 @@ class Listener: _sync_token = attr.ib(None, type=str) _tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]]) - _HOST = "edge-chat.facebook.com" - def __repr__(self) -> str: # An alternative repr, to illustrate that you can't create the class directly return "".format( @@ -64,7 +120,6 @@ class Listener: # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) - # TODO: Is region (lla | atn | odn | others?) important? mqtt.tls_set() self = cls( @@ -72,7 +127,7 @@ class Listener: mqtt=mqtt, chat_on=chat_on, foreground=foreground, - sequence_id=cls._fetch_sequence_id(session), + sequence_id=fetch_sequence_id(session), ) # Configure callbacks @@ -83,7 +138,7 @@ class Listener: # Attempt to connect try: - rc = mqtt.connect(self._HOST, 443, keepalive=10) + rc = mqtt.connect(HOST, 443, keepalive=10) except ( # Taken from .loop_forever paho.mqtt.client.socket.error, @@ -99,6 +154,46 @@ class Listener: return self + def _handle_ms(self, j): + """Handle /t_ms special logic. + + Returns whether to continue parsing the message. + """ + # TODO: Merge this with the parsing in _events + + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + self._sequence_id = j["firstDeltaSeqId"] + return False + + if "errorCode" in j: + error = j["errorCode"] + # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"): + # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too + # much time passed, or that it was simply missing + # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so + # the desired events could not be retrieved + log.error( + "The MQTT listener was disconnected for too long," + " events may have been lost" + ) + self._sync_token = None + self._sequence_id = fetch_sequence_id(self.session) + self._messenger_queue_publish() + # TODO: Signal to the user that they should reload their data! + return False + log.error("MQTT error code %s received", error) + return False + + # Update last sequence id + # Except for the two cases above, this is always received + self._sequence_id = j["lastIssuedSeqId"] + return True + def _on_message_handler(self, client, userdata, message): # Parse payload JSON try: @@ -111,40 +206,9 @@ class Listener: log.debug("MQTT payload: %s, %s", message.topic, j) if message.topic == "/t_ms": - # Update sync_token when received - # This is received in the first message after we've created a messenger - # sync queue. - if "syncToken" in j and "firstDeltaSeqId" in j: - self._sync_token = j["syncToken"] - self._sequence_id = j["firstDeltaSeqId"] + if not _handle_ms(j): return - if "errorCode" in j: - error = j["errorCode"] - # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' - if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"): - # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too - # much time passed, or that it was simply missing - # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so - # the desired events could not be retrieved - log.error( - "The MQTT listener was disconnected for too long," - " events may have been lost" - ) - self._sync_token = None - self._sequence_id = self._fetch_sequence_id(self.session) - self._messenger_queue_publish() - # TODO: Signal to the user that they should reload their data! - return - log.error("MQTT error code %s received", error) - return - - # Update last sequence id when received - if "lastIssuedSeqId" in j: - self._sequence_id = j["lastIssuedSeqId"] - else: - log.error("Missing last sequence id: %s", j) - try: # TODO: Don't handle this in a callback self._tmp_events = list( @@ -153,32 +217,14 @@ class Listener: except _exception.ParseError: log.exception("Failed parsing MQTT data") - @staticmethod - def _fetch_sequence_id(session: _session.Session) -> int: - """Fetch sequence ID.""" - params = { - "limit": 0, - "tags": ["INBOX"], - "before": None, - "includeDeliveryReceipts": False, - "includeSeqID": True, - } - log.debug("Fetching MQTT sequence ID") - # Same request as in `Client.fetch_threads` - (j,) = session._graphql_requests( - _graphql.from_doc_id("1349387578499440", params) - ) - sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"] - if not sequence_id: - raise _exception.NotLoggedIn("Failed fetching sequence id") - return int(sequence_id) - def _on_connect_handler(self, client, userdata, flags, rc): if rc == 21: raise _exception.FacebookError( "Failed connecting. Maybe your cookies are wrong?" ) if rc != 0: + err = paho.mqtt.client.connack_string(rc) + log.error("MQTT Connection Error: %s", err) return # Don't try to send publish if the connection failed self._messenger_queue_publish() @@ -211,43 +257,6 @@ class Listener: # Generate a new session ID on each reconnect session_id = generate_session_id() - topics = [ - # Things that happen in chats (e.g. messages) - "/t_ms", - # Group typing notifications - "/thread_typing", - # Private chat typing notifications - "/orca_typing_notifications", - # Active notifications - "/orca_presence", - # Other notifications not related to chats (e.g. friend requests) - "/legacy_web", - # Facebook's continuous error reporting/logging? - "/br_sr", - # Response to /br_sr - "/sr_res", - # Data about user-to-user calls - # TODO: Investigate the response from this! (A bunch of binary data) - # "/t_rtc", - # TODO: Find out what this does! - # TODO: Investigate the response from this! (A bunch of binary data) - # "/t_p", - # TODO: Find out what this does! - "/webrtc", - # TODO: Find out what this does! - "/onevc", - # TODO: Find out what this does! - "/notify_disconnect", - # Old, no longer active topics - # These are here just in case something interesting pops up - "/inbox", - "/mercury", - "/messaging_events", - "/orca_message_notifications", - "/pp", - "/webrtc_response", - ] - username = { # The user ID "u": self.session.user.id, @@ -262,7 +271,7 @@ class Listener: # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing - "st": topics, + "st": TOPICS, # MQTT extension by FB, allows making a PUBLISH while CONNECTing # Using this is more efficient, but the same can be acheived with: # def on_connect(*args): @@ -288,19 +297,18 @@ class Listener: "pack": [], } - # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) headers = { - # TODO: Make this access thread safe "Cookie": get_cookie_header( self.session._session, "https://edge-chat.facebook.com/chat" ), "User-Agent": self.session._session.headers["User-Agent"], "Origin": "https://www.facebook.com", - "Host": self._HOST, + "Host": HOST, } + # TODO: Is region (lla | atn | odn | others?) important? self._mqtt.ws_set_options( path="/chat?sid={}".format(session_id), headers=headers )