From ffdf4222bf0a8b1f859e80800e7786d1ba314497 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 15 Dec 2019 15:30:02 +0100 Subject: [PATCH 01/18] Split ._parseMessage to reduce indentation --- fbchat/_client.py | 164 +++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 82 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 42b0dba..b5ab257 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2751,6 +2751,87 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) + def _parse_payload(self, m): + mtype = m.get("type") + # Things that directly change chat + if mtype == "delta": + self._parseDelta(m) + # Inbox + elif mtype == "inbox": + self.onInbox( + unseen=m["unseen"], + unread=m["unread"], + recent_unread=m["recent_unread"], + msg=m, + ) + + # Typing + elif mtype == "typ" or mtype == "ttyp": + author_id = str(m.get("from")) + thread_id = m.get("thread_fbid") + if thread_id: + thread_type = ThreadType.GROUP + thread_id = str(thread_id) + else: + thread_type = ThreadType.USER + if author_id == self._uid: + thread_id = m.get("to") + else: + thread_id = author_id + typing_status = TypingStatus(m.get("st")) + self.onTyping( + author_id=author_id, + status=typing_status, + thread_id=thread_id, + thread_type=thread_type, + msg=m, + ) + + # Delivered + + # Seen + # elif mtype == "m_read_receipt": + # + # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) + + elif mtype in ["jewel_requests_add"]: + from_id = m["from"] + self.onFriendRequest(from_id=from_id, msg=m) + + # Happens on every login + elif mtype == "qprimer": + self.onQprimer(ts=m.get("made"), msg=m) + + # Is sent before any other message + elif mtype == "deltaflow": + pass + + # Chat timestamp + elif mtype == "chatproxy-presence": + statuses = dict() + for id_, data in m.get("buddyList", {}).items(): + statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) + self._buddylist[id_] = statuses[id_] + + self.onChatTimestamp(buddylist=statuses, msg=m) + + # Buddylist overlay + elif mtype == "buddylist_overlay": + statuses = dict() + for id_, data in m.get("overlay", {}).items(): + old_in_game = None + if id_ in self._buddylist: + old_in_game = self._buddylist[id_].in_game + + statuses[id_] = ActiveStatus._from_buddylist_overlay(data, old_in_game) + self._buddylist[id_] = statuses[id_] + + self.onBuddylistOverlay(statuses=statuses, msg=m) + + # Unknown message type + else: + self.onUnknownMesssageType(msg=m) + def _parseMessage(self, content): """Get message and author name from content. @@ -2770,89 +2851,8 @@ class Client(object): return for m in content["ms"]: - mtype = m.get("type") try: - # Things that directly change chat - if mtype == "delta": - self._parseDelta(m) - # Inbox - elif mtype == "inbox": - self.onInbox( - unseen=m["unseen"], - unread=m["unread"], - recent_unread=m["recent_unread"], - msg=m, - ) - - # Typing - elif mtype == "typ" or mtype == "ttyp": - author_id = str(m.get("from")) - thread_id = m.get("thread_fbid") - if thread_id: - thread_type = ThreadType.GROUP - thread_id = str(thread_id) - else: - thread_type = ThreadType.USER - if author_id == self._uid: - thread_id = m.get("to") - else: - thread_id = author_id - typing_status = TypingStatus(m.get("st")) - self.onTyping( - author_id=author_id, - status=typing_status, - thread_id=thread_id, - thread_type=thread_type, - msg=m, - ) - - # Delivered - - # Seen - # elif mtype == "m_read_receipt": - # - # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - - elif mtype in ["jewel_requests_add"]: - from_id = m["from"] - self.onFriendRequest(from_id=from_id, msg=m) - - # Happens on every login - elif mtype == "qprimer": - self.onQprimer(ts=m.get("made"), msg=m) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - - # Chat timestamp - elif mtype == "chatproxy-presence": - statuses = dict() - for id_, data in m.get("buddyList", {}).items(): - statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) - self._buddylist[id_] = statuses[id_] - - self.onChatTimestamp(buddylist=statuses, msg=m) - - # Buddylist overlay - elif mtype == "buddylist_overlay": - statuses = dict() - for id_, data in m.get("overlay", {}).items(): - old_in_game = None - if id_ in self._buddylist: - old_in_game = self._buddylist[id_].in_game - - statuses[id_] = ActiveStatus._from_buddylist_overlay( - data, old_in_game - ) - self._buddylist[id_] = statuses[id_] - - self.onBuddylistOverlay(statuses=statuses, msg=m) - - # Unknown message type - else: - self.onUnknownMesssageType(msg=m) - + self._parse_payload(m) except Exception as e: self.onMessageError(exception=e, msg=m) From ea518ba4c91e9587f827ef18fee9924e1e10a7e3 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 16:23:35 +0100 Subject: [PATCH 02/18] Add initial MQTT helper --- fbchat/_mqtt.py | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ fbchat/_util.py | 12 ++++++ pyproject.toml | 1 + 3 files changed, 121 insertions(+) create mode 100644 fbchat/_mqtt.py diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py new file mode 100644 index 0000000..f01d6d2 --- /dev/null +++ b/fbchat/_mqtt.py @@ -0,0 +1,108 @@ +import attr +import random +import paho.mqtt.client +from . import _util, _graphql + + +@attr.s(slots=True) +class Mqtt: + _state = attr.ib() + _mqtt = attr.ib() + + @classmethod + def connect(cls, state, foreground): + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + + # Generate a random session ID between 1 and 9007199254740991 + session_id = random.randint(1, 2 ** 53) + + username = { + # The user ID + "u": state.user_id, + # Session ID + "s": session_id, + # Active status setting + "chat_on": True, + # foreground_state - Whether the window is focused + "fg": foreground, + # Can be any random ID + "d": state._client_id, + # Application ID, taken from facebook.com + "aid": 219994525426954, + # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing + "st": [ + # TODO: Investigate the result from these + # "/inbox", + # "/mercury", + # "/messaging_events", + # "/orca_message_notifications", + # "/pp", + # "/t_p", + # "/t_rtc", + # "/webrtc_response", + "/legacy_web", + "/webrtc", + "/onevc", + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + "/notify_disconnect", + # Active notifications + "/orca_presence", + "/br_sr", + "/sr_res", + ], + # MQTT extension by FB, allows making a PUBLISH while CONNECTing + "pm": [], + # Unknown parameters + "cp": 3, + "ecp": 10, + "ct": "websocket", + "mqtt_sid": "", + "dc": "", + "no_auto_fg": True, + "gas": None, + "pack": [], + } + mqtt.username_pw_set(_util.json_minimal(username)) + + headers = { + "Cookie": _util.get_cookie_header(state._session, "edge-chat.facebook.com"), + "User-Agent": state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": "edge-chat.facebook.com", + } + + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.ws_set_options(path="/chat?sid={}".format(session_id), headers=headers) + mqtt.tls_set() + response_code = mqtt.connect("edge-chat.facebook.com", 443, keepalive=10) + # TODO: Handle response code + + return cls(state=state, mqtt=mqtt) + + def listen(self, on_message): + def real_on_message(client, userdata, message): + on_message(message.topic, message.payload) + + self._mqtt.on_message = real_on_message + + self._mqtt.loop_forever() # TODO: retry_first_connection=True? + + def disconnect(self): + self._mqtt.disconnect() + + def set_foreground(self, state): + payload = _util.json_minimal({"foreground": state}) + info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() diff --git a/fbchat/_util.py b/fbchat/_util.py index af06d42..6228989 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -57,6 +57,11 @@ def now(): return int(time() * 1000) +def json_minimal(data): + """Get JSON data in minimal form.""" + return json.dumps(data, separators=(",", ":")) + + def strip_json_cruft(text): """Removes `for(;;);` (and other cruft) that preceeds JSON responses.""" try: @@ -65,6 +70,13 @@ def strip_json_cruft(text): raise FBchatException("No JSON object found: {!r}".format(text)) +def get_cookie_header(session, host): + """Extract a cookie header from a requests session.""" + return requests.cookies.get_cookie_header( + session.cookies, requests.Request("GET", host), + ) + + def get_decoded_r(r): return get_decoded(r._content) diff --git a/pyproject.toml b/pyproject.toml index 136268d..29cbc28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ requires = [ "attrs>=18.2", "requests~=2.19", "beautifulsoup4~=4.0", + "paho-mqtt~=1.5", ] description-file = "README.rst" classifiers = [ From ecc6edac5a8ab8447189e1c4c11977ca0c351816 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 16:14:39 +0100 Subject: [PATCH 03/18] Fix message receiving in MQTT --- fbchat/_mqtt.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index f01d6d2..c17d110 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -4,6 +4,24 @@ import paho.mqtt.client from . import _util, _graphql +def fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + @attr.s(slots=True) class Mqtt: _state = attr.ib() @@ -21,6 +39,17 @@ class Mqtt: # Generate a random session ID between 1 and 9007199254740991 session_id = random.randint(1, 2 ** 53) + last_seq_id = fetch_sequence_id(state) + + messenger_sync_create_queue_payload = { + "sync_api_version": 10, + "max_deltas_able_to_process": 1000, + "delta_batch_size": 500, + "encoding": "JSON", + "entity_fbid": state.user_id, + "initial_titan_sequence_id": str(last_seq_id), + "device_params": None, + } username = { # The user ID @@ -62,7 +91,20 @@ class Mqtt: "/sr_res", ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing - "pm": [], + "pm": [ + # This is required to actually receive messages + { + "topic": "/messenger_sync_create_queue", + "payload": _util.json_minimal(messenger_sync_create_queue_payload), + "qos": 1, + "messageId": 65536, + } + # The above is more efficient, but the same effect could have been + # acheived with: + # def on_connect(*args): + # mqtt.publish("/messenger_sync_create_queue", ..., qos=1) + # mqtt.on_connect = on_connect + ], # Unknown parameters "cp": 3, "ecp": 10, From 998fa43fb239b3691d6518859bc2532f082e6b3b Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 23:18:20 +0100 Subject: [PATCH 04/18] Refactor MQTT connecting --- fbchat/_mqtt.py | 62 +++++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index c17d110..a305f8a 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -22,46 +22,70 @@ def fetch_sequence_id(state): raise +HOST = "edge-chat.facebook.com" + + @attr.s(slots=True) class Mqtt: _state = attr.ib() _mqtt = attr.ib() + _session_id = attr.ib() - @classmethod - def connect(cls, state, foreground): - mqtt = paho.mqtt.client.Client( + def __init__(self, state): + self._state = state + + # Generate a random session ID between 1 and 9007199254740991 + self._session_id = random.randint(1, 2 ** 53) + + self._mqtt = paho.mqtt.client.Client( client_id="mqttwsclient", clean_session=True, protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - mqtt.enable_logger() + self._mqtt.enable_logger() + # self._mqtt.max_inflight_messages_set(20) + # self._mqtt.max_queued_messages_set(0) # unlimited + # self._mqtt.message_retry_set(5) + # self._mqtt.reconnect_delay_set(min_delay=1, max_delay=1) - # Generate a random session ID between 1 and 9007199254740991 - session_id = random.randint(1, 2 ** 53) - last_seq_id = fetch_sequence_id(state) + # TODO: Is region (lla | atn | odn | others?) important? + self._mqtt.ws_set_options( + path="/chat?sid={}".format(self._session_id), headers=self._create_headers + ) + self._mqtt.tls_set() + + def _create_headers(self, headers): + headers["Cookie"] = _util.get_cookie_header(self._state._session, HOST) + headers["User-Agent"] = self._state._session.headers["User-Agent"] + headers["Origin"] = "https://www.facebook.com" + headers["Host"] = HOST + return headers + + def connect(self, foreground): + last_seq_id = fetch_sequence_id(self._state) messenger_sync_create_queue_payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": state.user_id, + "entity_fbid": self._state.user_id, "initial_titan_sequence_id": str(last_seq_id), "device_params": None, } username = { # The user ID - "u": state.user_id, + "u": self._state.user_id, # Session ID - "s": session_id, + "s": self._session_id, # Active status setting "chat_on": True, # foreground_state - Whether the window is focused "fg": foreground, # Can be any random ID - "d": state._client_id, + "d": self._state._client_id, # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing @@ -115,23 +139,11 @@ class Mqtt: "gas": None, "pack": [], } - mqtt.username_pw_set(_util.json_minimal(username)) + self._mqtt.username_pw_set(_util.json_minimal(username)) - headers = { - "Cookie": _util.get_cookie_header(state._session, "edge-chat.facebook.com"), - "User-Agent": state._session.headers["User-Agent"], - "Origin": "https://www.facebook.com", - "Host": "edge-chat.facebook.com", - } - - # TODO: Is region (lla | atn | odn | others?) important? - mqtt.ws_set_options(path="/chat?sid={}".format(session_id), headers=headers) - mqtt.tls_set() - response_code = mqtt.connect("edge-chat.facebook.com", 443, keepalive=10) + response_code = self._mqtt.connect(HOST, 443, keepalive=10) # TODO: Handle response code - return cls(state=state, mqtt=mqtt) - def listen(self, on_message): def real_on_message(client, userdata, message): on_message(message.topic, message.payload) From 766b0125fbe12585417920555b9ff26e57878f35 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 00:31:58 +0100 Subject: [PATCH 05/18] Refactor MQTT connecting, add sync token support --- fbchat/_mqtt.py | 186 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 126 insertions(+), 60 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index a305f8a..da9a588 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -1,89 +1,152 @@ import attr import random import paho.mqtt.client +from ._core import log from . import _util, _graphql -def fetch_sequence_id(state): - """Fetch sequence ID.""" - params = { - "limit": 1, - "tags": ["INBOX"], - "before": None, - "includeDeliveryReceipts": False, - "includeSeqID": True, - } - # Same request as in `Client.fetchThreadList` - (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) - try: - return int(j["viewer"]["message_threads"]["sync_sequence_id"]) - except (KeyError, ValueError): - # TODO: Proper exceptions - raise - - -HOST = "edge-chat.facebook.com" +def generate_session_id(): + """Generate a random session ID between 1 and 9007199254740991.""" + return random.randint(1, 2 ** 53) @attr.s(slots=True) class Mqtt: _state = attr.ib() _mqtt = attr.ib() - _session_id = attr.ib() + _on_message = attr.ib() + _chat_on = attr.ib() + _foreground = attr.ib() + _session_id = attr.ib(factory=generate_session_id) + _sync_token = attr.ib(None) - def __init__(self, state): - self._state = state + _HOST = "edge-chat.facebook.com" - # Generate a random session ID between 1 and 9007199254740991 - self._session_id = random.randint(1, 2 ** 53) - - self._mqtt = paho.mqtt.client.Client( + @classmethod + def connect(cls, state, on_message, chat_on, foreground): + mqtt = paho.mqtt.client.Client( client_id="mqttwsclient", clean_session=True, protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - self._mqtt.enable_logger() - # self._mqtt.max_inflight_messages_set(20) - # self._mqtt.max_queued_messages_set(0) # unlimited - # self._mqtt.message_retry_set(5) - # self._mqtt.reconnect_delay_set(min_delay=1, max_delay=1) - # TODO: Is region (lla | atn | odn | others?) important? - self._mqtt.ws_set_options( - path="/chat?sid={}".format(self._session_id), headers=self._create_headers + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, ) - self._mqtt.tls_set() + + mqtt.enable_logger() + # mqtt.max_inflight_messages_set(20) + # mqtt.max_queued_messages_set(0) # unlimited + # mqtt.message_retry_set(5) + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.tls_set() + mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=self._create_headers + ) + mqtt.on_message = self._on_message_handler + + sequence_id = self._fetch_sequence_id(self._state) + # Set connect/reconnect data with an empty sync token and an newly fetched + # sequence id initially + self._set_reconnect_data(self._sync_token, sequence_id) + + # TODO: Handle response code + response_code = mqtt.connect(self._HOST, 443, keepalive=10) def _create_headers(self, headers): - headers["Cookie"] = _util.get_cookie_header(self._state._session, HOST) + log.debug("Fetching MQTT headers") + # TODO: Make this access thread safe + headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST) headers["User-Agent"] = self._state._session.headers["User-Agent"] headers["Origin"] = "https://www.facebook.com" - headers["Host"] = HOST + headers["Host"] = self._HOST return headers - def connect(self, foreground): - last_seq_id = fetch_sequence_id(self._state) + def _on_message_handler(self, client, userdata, message): + j = _util.parse_json(message.payload) + if message.topic == "/t_ms": + sequence_id = None - messenger_sync_create_queue_payload = { + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + sequence_id = j["firstDeltaSeqId"] + + # Update last sequence id when received + if "lastIssuedSeqId" in j: + sequence_id = j["lastIssuedSeqId"] + + if sequence_id is not None: + self._set_reconnect_data(self._sync_token, sequence_id) + + # Call the external callback + self._on_message(message.topic, j) + + @staticmethod + def _fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + @staticmethod + def _get_messenger_sync(state, sync_token, sequence_id): + """Get the data to configure receiving messages.""" + payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": self._state.user_id, - "initial_titan_sequence_id": str(last_seq_id), - "device_params": None, + "entity_fbid": state.user_id, } + # If we don't have a sync_token, create a new messenger queue + # This is done so that across reconnects, if we've received a sync token, we + # SHOULD receive a piece of data in /t_ms exactly once! + if sync_token is None: + topic = "/messenger_sync_create_queue" + payload["initial_titan_sequence_id"] = str(sequence_id) + payload["device_params"] = None + else: + topic = "/messenger_sync_get_diffs" + payload["last_seq_id"] = str(sequence_id) + payload["sync_token"] = sync_token + + return topic, payload + + def _set_reconnect_data(self, sync_token, sequence_id): + log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) + topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) + username = { # The user ID "u": self._state.user_id, # Session ID "s": self._session_id, # Active status setting - "chat_on": True, + "chat_on": self._chat_on, # foreground_state - Whether the window is focused - "fg": foreground, + "fg": self._foreground, # Can be any random ID "d": self._state._client_id, # Application ID, taken from facebook.com @@ -116,17 +179,16 @@ class Mqtt: ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing "pm": [ - # This is required to actually receive messages { - "topic": "/messenger_sync_create_queue", - "payload": _util.json_minimal(messenger_sync_create_queue_payload), + "topic": topic, + "payload": _util.json_minimal(payload), "qos": 1, "messageId": 65536, } # The above is more efficient, but the same effect could have been # acheived with: # def on_connect(*args): - # mqtt.publish("/messenger_sync_create_queue", ..., qos=1) + # mqtt.publish(topic, payload=..., qos=1) # mqtt.on_connect = on_connect ], # Unknown parameters @@ -139,24 +201,28 @@ class Mqtt: "gas": None, "pack": [], } + + # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) - response_code = self._mqtt.connect(HOST, 443, keepalive=10) - # TODO: Handle response code - - def listen(self, on_message): - def real_on_message(client, userdata, message): - on_message(message.topic, message.payload) - - self._mqtt.on_message = real_on_message - + def listen(self): self._mqtt.loop_forever() # TODO: retry_first_connection=True? def disconnect(self): self._mqtt.disconnect() - def set_foreground(self, state): - payload = _util.json_minimal({"foreground": state}) + def set_foreground(self, value): + payload = _util.json_minimal({"foreground": value}) info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + self._foreground = value # TODO: We can't wait for this, since the loop is running with .loop_forever() # info.wait_for_publish() + + # def set_client_settings(self, available_when_in_foreground: bool): + # data = {"make_user_available_when_in_foreground": available_when_in_foreground} + # payload = _util.json_minimal(data) + # info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + # + # def send_additional_contacts(self, additional_contacts): + # payload = _util.json_minimal({"additional_contacts": additional_contacts}) + # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) From a298e0cf165e1c670fc97e17c0a478b7ae039d41 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 14:56:01 +0100 Subject: [PATCH 06/18] Refactor MQTT to do proper reconnecting --- fbchat/_mqtt.py | 146 ++++++++++++++++++++++++++++-------------------- 1 file changed, 86 insertions(+), 60 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index da9a588..3875e88 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -2,7 +2,7 @@ import attr import random import paho.mqtt.client from ._core import log -from . import _util, _graphql +from . import _util, _exception, _graphql def generate_session_id(): @@ -17,7 +17,7 @@ class Mqtt: _on_message = attr.ib() _chat_on = attr.ib() _foreground = attr.ib() - _session_id = attr.ib(factory=generate_session_id) + _sequence_id = attr.ib() _sync_token = attr.ib(None) _HOST = "edge-chat.facebook.com" @@ -30,15 +30,6 @@ class Mqtt: protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - - self = cls( - state=state, - mqtt=mqtt, - on_message=on_message, - chat_on=chat_on, - foreground=foreground, - ) - mqtt.enable_logger() # mqtt.max_inflight_messages_set(20) # mqtt.max_queued_messages_set(0) # unlimited @@ -46,46 +37,46 @@ class Mqtt: # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) # TODO: Is region (lla | atn | odn | others?) important? mqtt.tls_set() - mqtt.ws_set_options( - path="/chat?sid={}".format(session_id), headers=self._create_headers - ) - mqtt.on_message = self._on_message_handler - sequence_id = self._fetch_sequence_id(self._state) - # Set connect/reconnect data with an empty sync token and an newly fetched - # sequence id initially - self._set_reconnect_data(self._sync_token, sequence_id) + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, + sequence_id=cls._fetch_sequence_id(state), + ) + + # Configure callbacks + mqtt.on_message = self._on_message_handler + mqtt.on_connect = self._on_connect_handler + + self._configure_connect_options() # TODO: Handle response code response_code = mqtt.connect(self._HOST, 443, keepalive=10) - def _create_headers(self, headers): - log.debug("Fetching MQTT headers") - # TODO: Make this access thread safe - headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST) - headers["User-Agent"] = self._state._session.headers["User-Agent"] - headers["Origin"] = "https://www.facebook.com" - headers["Host"] = self._HOST - return headers + return self def _on_message_handler(self, client, userdata, message): - j = _util.parse_json(message.payload) - if message.topic == "/t_ms": - sequence_id = None + # Parse payload JSON + try: + j = _util.parse_json(message.payload) + except _exception.FBchatFacebookError: + log.exception("Failed parsing MQTT data as JSON: %r", message.payload) + return + if message.topic == "/t_ms": # Update sync_token when received # This is received in the first message after we've created a messenger # sync queue. if "syncToken" in j and "firstDeltaSeqId" in j: self._sync_token = j["syncToken"] - sequence_id = j["firstDeltaSeqId"] + self._sequence_id = j["firstDeltaSeqId"] # Update last sequence id when received if "lastIssuedSeqId" in j: - sequence_id = j["lastIssuedSeqId"] - - if sequence_id is not None: - self._set_reconnect_data(self._sync_token, sequence_id) + self._sequence_id = j["lastIssuedSeqId"] # Call the external callback self._on_message(message.topic, j) @@ -109,40 +100,39 @@ class Mqtt: # TODO: Proper exceptions raise - @staticmethod - def _get_messenger_sync(state, sync_token, sequence_id): - """Get the data to configure receiving messages.""" + def _on_connect_handler(self, client, userdata, flags, rc): + # configure receiving messages. payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": state.user_id, + "entity_fbid": self._state.user_id, } # If we don't have a sync_token, create a new messenger queue # This is done so that across reconnects, if we've received a sync token, we # SHOULD receive a piece of data in /t_ms exactly once! - if sync_token is None: + if self._sync_token is None: topic = "/messenger_sync_create_queue" - payload["initial_titan_sequence_id"] = str(sequence_id) + payload["initial_titan_sequence_id"] = str(self._sequence_id) payload["device_params"] = None else: topic = "/messenger_sync_get_diffs" - payload["last_seq_id"] = str(sequence_id) - payload["sync_token"] = sync_token + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token - return topic, payload + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) - def _set_reconnect_data(self, sync_token, sequence_id): - log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) - topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() username = { # The user ID "u": self._state.user_id, # Session ID - "s": self._session_id, + "s": session_id, # Active status setting "chat_on": self._chat_on, # foreground_state - Whether the window is focused @@ -178,18 +168,18 @@ class Mqtt: "/sr_res", ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! "pm": [ - { - "topic": topic, - "payload": _util.json_minimal(payload), - "qos": 1, - "messageId": 65536, - } - # The above is more efficient, but the same effect could have been - # acheived with: - # def on_connect(*args): - # mqtt.publish(topic, payload=..., qos=1) - # mqtt.on_connect = on_connect + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } ], # Unknown parameters "cp": 3, @@ -205,8 +195,44 @@ class Mqtt: # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) + headers = { + # TODO: Make this access thread safe + "Cookie": _util.get_cookie_header(self._state._session, self._HOST), + "User-Agent": self._state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": self._HOST, + } + + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + def listen(self): - self._mqtt.loop_forever() # TODO: retry_first_connection=True? + while True: + rc = self._mqtt.loop(timeout=1.0) + if rc == paho.mqtt.client.MQTT_ERR_SUCCESS: + continue # No errors + + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + break + + # Wait before reconnecting + self._mqtt._reconnect_wait() + + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ): + log.debug("MQTT connection failed") + + # self._mqtt.loop_forever() # TODO: retry_first_connection=True? def disconnect(self): self._mqtt.disconnect() From d1cb866b44be90c40963b793384be51ce82b3038 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:57:52 +0100 Subject: [PATCH 07/18] Refactor MQTT listening --- fbchat/_mqtt.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 3875e88..4afb810 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -78,6 +78,8 @@ class Mqtt: if "lastIssuedSeqId" in j: self._sequence_id = j["lastIssuedSeqId"] + log.debug("MQTT payload: %s", j) + # Call the external callback self._on_message(message.topic, j) @@ -207,16 +209,18 @@ class Mqtt: path="/chat?sid={}".format(session_id), headers=headers ) - def listen(self): - while True: - rc = self._mqtt.loop(timeout=1.0) - if rc == paho.mqtt.client.MQTT_ERR_SUCCESS: - continue # No errors + def loop_once(self): + """Run the listening loop once. - # If disconnect() has been called - if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: - break + Returns whether to keep listening or not. + """ + rc = self._mqtt.loop(timeout=1.0) + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + return False # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: # Wait before reconnecting self._mqtt._reconnect_wait() @@ -232,7 +236,7 @@ class Mqtt: ): log.debug("MQTT connection failed") - # self._mqtt.loop_forever() # TODO: retry_first_connection=True? + return True # Keep listening def disconnect(self): self._mqtt.disconnect() From 803bfa70845c50b10cb09cadefdb6f13433fbc96 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 18:48:48 +0100 Subject: [PATCH 08/18] Add proper MQTT error handling --- fbchat/_mqtt.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 4afb810..2d81131 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -53,8 +53,21 @@ class Mqtt: self._configure_connect_options() - # TODO: Handle response code - response_code = mqtt.connect(self._HOST, 443, keepalive=10) + # Attempt to connect + try: + rc = mqtt.connect(self._HOST, 443, keepalive=10) + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + raise _exception.FBchatException("MQTT connection failed") from e + + # Raise error if connecting failed + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + raise _exception.FBchatException("MQTT connection failed: {}".format(err)) return self @@ -221,6 +234,9 @@ class Mqtt: return False # Stop listening if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + log.warning("MQTT Error: %s", err) + # Wait before reconnecting self._mqtt._reconnect_wait() From a1b80a7abb131eab4efc9f1e0e4080943779b30e Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:42:07 +0100 Subject: [PATCH 09/18] Replace pull channel with MQTT setup --- fbchat/_client.py | 136 ++++++++++++---------------------------------- fbchat/_mqtt.py | 22 ++++++-- 2 files changed, 52 insertions(+), 106 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index b5ab257..41b9e14 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -12,6 +12,7 @@ from ._util import * from .models import * from . import _graphql from ._state import State +from ._mqtt import Mqtt import time import json @@ -85,13 +86,11 @@ class Client(object): Raises: FBchatException: On failed login """ - self._sticky, self._pool = (None, None) - self._seq = "0" self._default_thread_id = None self._default_thread_type = None - self._pull_channel = 0 self._markAlive = True self._buddylist = dict() + self._mqtt = None handler.setLevel(logging_level) @@ -2169,38 +2168,6 @@ class Client(object): LISTEN METHODS """ - def _ping(self): - data = { - "seq": self._seq, - "channel": "p_" + self._uid, - "clientid": self._state._client_id, - "partition": -2, - "cap": 0, - "uid": self._uid, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "viewer_uid": self._uid, - "state": "active", - } - j = self._get( - "https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel), - data, - ) - - def _pullMessage(self): - """Call pull api to fetch message data.""" - data = { - "seq": self._seq, - "msgs_recv": 0, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "clientid": self._state._client_id, - "state": "active" if self._markAlive else "offline", - } - return self._get( - "https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data - ) - def _parseDelta(self, m): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" @@ -2751,13 +2718,15 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parse_payload(self, m): - mtype = m.get("type") + def _parse_payload(self, topic, m): # Things that directly change chat - if mtype == "delta": + if topic == "delta": self._parseDelta(m) + + # TODO: Remove old parsing below + # Inbox - elif mtype == "inbox": + elif topic == "inbox": self.onInbox( unseen=m["unseen"], unread=m["unread"], @@ -2766,7 +2735,7 @@ class Client(object): ) # Typing - elif mtype == "typ" or mtype == "ttyp": + elif topic in ["typ", "ttyp"]: author_id = str(m.get("from")) thread_id = m.get("thread_fbid") if thread_id: @@ -2794,20 +2763,12 @@ class Client(object): # # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - elif mtype in ["jewel_requests_add"]: + elif topic == "jewel_requests_add": from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) - # Happens on every login - elif mtype == "qprimer": - self.onQprimer(ts=m.get("made"), msg=m) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - # Chat timestamp - elif mtype == "chatproxy-presence": + elif topic == "chatproxy-presence": statuses = dict() for id_, data in m.get("buddyList", {}).items(): statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) @@ -2816,7 +2777,7 @@ class Client(object): self.onChatTimestamp(buddylist=statuses, msg=m) # Buddylist overlay - elif mtype == "buddylist_overlay": + elif topic == "buddylist_overlay": statuses = dict() for id_, data in m.get("overlay", {}).items(): old_in_game = None @@ -2832,29 +2793,11 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parseMessage(self, content): - """Get message and author name from content. - - May contain multiple messages in the content. - """ - self._seq = content.get("seq", "0") - - if "lb_info" in content: - self._sticky = content["lb_info"]["sticky"] - self._pool = content["lb_info"]["pool"] - - if "batches" in content: - for batch in content["batches"]: - self._parseMessage(batch) - - if "ms" not in content: - return - - for m in content["ms"]: - try: - self._parse_payload(m) - except Exception as e: - self.onMessageError(exception=e, msg=m) + def _parse_message(self, topic, data): + try: + self._parse_payload(topic, data) + except Exception as e: + self.onMessageError(exception=e, msg=data) def startListening(self): """Start listening from an external event loop. @@ -2862,6 +2805,15 @@ class Client(object): Raises: FBchatException: If request failed """ + if not self._mqtt: + self._mqtt = Mqtt.connect( + state=self._state, + on_message=self._parse_message, + chat_on=self._markAlive, + foreground=True, + ) + # Backwards compat + self.onQprimer(ts=now(), msg=None) self.listening = True def doOneListen(self, markAlive=None): @@ -2879,36 +2831,20 @@ class Client(object): """ if markAlive is not None: self._markAlive = markAlive - try: - if self._markAlive: - self._ping() - content = self._pullMessage() - if content: - self._parseMessage(content) - except KeyboardInterrupt: - return False - except requests.Timeout: - pass - except requests.ConnectionError: - # If the client has lost their internet connection, keep trying every 30 seconds - time.sleep(30) - except FBchatFacebookError as e: - # Fix 502 and 503 pull errors - if e.request_status_code in [502, 503]: - # Bump pull channel, while contraining withing 0-4 - self._pull_channel = (self._pull_channel + 1) % 5 - self.startListening() - else: - raise e - except Exception as e: - return self.onListenError(exception=e) - return True + # TODO: Remove this wierd check, and let the user handle the chat_on parameter + if self._markAlive != self._mqtt._chat_on: + self._mqtt.set_chat_on(self._markAlive) + + # TODO: Remove on_error param + return self._mqtt.loop_once(on_error=self.onListenError) def stopListening(self): - """Clean up the variables from `Client.startListening`.""" + """Stop the listening loop.""" + if not self._mqtt: + raise ValueError("Not listening") + self._mqtt.disconnect() self.listening = False - self._sticky, self._pool = (None, None) def listen(self, markAlive=None): """Initialize and runs the listening loop continually. diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 2d81131..0a6070f 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -222,7 +222,7 @@ class Mqtt: path="/chat?sid={}".format(session_id), headers=headers ) - def loop_once(self): + def loop_once(self, on_error=None): """Run the listening loop once. Returns whether to keep listening or not. @@ -236,6 +236,12 @@ class Mqtt: if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: err = paho.mqtt.client.error_string(rc) log.warning("MQTT Error: %s", err) + if on_error: + # Temporary to support on_error param + try: + raise _exception.FBchatException("MQTT Error: {}".format(err)) + except _exception.FBchatException as e: + on_error(exception=e) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -264,11 +270,15 @@ class Mqtt: # TODO: We can't wait for this, since the loop is running with .loop_forever() # info.wait_for_publish() - # def set_client_settings(self, available_when_in_foreground: bool): - # data = {"make_user_available_when_in_foreground": available_when_in_foreground} - # payload = _util.json_minimal(data) - # info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) - # + def set_chat_on(self, value): + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + # def send_additional_contacts(self, additional_contacts): # payload = _util.json_minimal({"additional_contacts": additional_contacts}) # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) From e9804d41847a1f11ff5d92796e94a7428c28b230 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:26:31 +0100 Subject: [PATCH 10/18] Fix message parsing --- fbchat/_client.py | 84 +++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 41b9e14..2799f98 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2168,7 +2168,7 @@ class Client(object): LISTEN METHODS """ - def _parseDelta(self, m): + def _parseDelta(self, delta): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" id_thread = None @@ -2181,7 +2181,6 @@ class Client(object): type_thread = ThreadType.USER return id_thread, type_thread - delta = m["delta"] delta_type = delta.get("type") delta_class = delta.get("class") metadata = delta.get("messageMetadata") @@ -2201,7 +2200,7 @@ class Client(object): author_id=author_id, thread_id=thread_id, ts=ts, - msg=m, + msg=delta, ) # Left/removed participants @@ -2214,7 +2213,7 @@ class Client(object): author_id=author_id, thread_id=thread_id, ts=ts, - msg=m, + msg=delta, ) # Color change @@ -2229,7 +2228,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Emoji change @@ -2244,7 +2243,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Thread title change @@ -2259,14 +2258,14 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Forced fetch elif delta_class == "ForcedFetch": mid = delta.get("messageId") if mid is None: - self.onUnknownMesssageType(msg=m) + self.onUnknownMesssageType(msg=delta) else: thread_id = str(delta["threadKey"]["threadFbId"]) fetch_info = self._forcedFetch(thread_id, mid) @@ -2288,7 +2287,7 @@ class Client(object): thread_id=thread_id, thread_type=ThreadType.GROUP, ts=ts, - msg=m, + msg=delta, ) # Nickname change @@ -2305,7 +2304,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Admin added or removed in a group thread @@ -2321,7 +2320,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) elif admin_event == "remove_admin": self.onAdminRemoved( @@ -2331,7 +2330,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Group approval mode change @@ -2345,7 +2344,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Message delivered @@ -2363,7 +2362,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Message seen @@ -2379,7 +2378,7 @@ class Client(object): seen_ts=seen_ts, ts=delivered_ts, metadata=metadata, - msg=m, + msg=delta, ) # Messages marked as seen @@ -2400,7 +2399,11 @@ class Client(object): # thread_id, thread_type = getThreadIdAndThreadType(delta) self.onMarkedSeen( - threads=threads, seen_ts=seen_ts, ts=delivered_ts, metadata=delta, msg=m + threads=threads, + seen_ts=seen_ts, + ts=delivered_ts, + metadata=delta, + msg=delta, ) # Game played @@ -2425,7 +2428,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Group call started/ended @@ -2443,7 +2446,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) elif call_status == "call_ended": self.onCallEnded( @@ -2455,7 +2458,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # User joined to group call @@ -2470,7 +2473,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Group poll event @@ -2489,7 +2492,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) elif event_type == "update_vote": # User voted on group poll @@ -2505,7 +2508,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan created @@ -2519,7 +2522,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan ended @@ -2532,7 +2535,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan edited @@ -2546,7 +2549,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan deleted @@ -2560,7 +2563,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan participation change @@ -2576,13 +2579,13 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Client payload (that weird numbers) elif delta_class == "ClientPayload": payload = json.loads("".join(chr(z) for z in delta["payload"])) - ts = m.get("ofd_ts") + ts = now() # Hack for d in payload.get("deltas", []): # Message reaction @@ -2603,7 +2606,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) else: self.onReactionRemoved( @@ -2612,7 +2615,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Viewer status change @@ -2629,7 +2632,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) else: self.onBlock( @@ -2637,7 +2640,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Live location info @@ -2655,7 +2658,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Message deletion @@ -2671,7 +2674,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) elif d.get("deltaMessageReply"): @@ -2690,7 +2693,7 @@ class Client(object): thread_type=thread_type, ts=message.timestamp, metadata=metadata, - msg=m, + msg=delta, ) # New message @@ -2711,17 +2714,20 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Unknown message type else: - self.onUnknownMesssageType(msg=m) + self.onUnknownMesssageType(msg=delta) def _parse_payload(self, topic, m): # Things that directly change chat - if topic == "delta": - self._parseDelta(m) + if topic == "/t_ms": + if "deltas" not in m: + return + for delta in m["deltas"]: + self._parseDelta(delta) # TODO: Remove old parsing below From afad38d8e1824559a3fc6f5f53142b1720168394 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:40:00 +0100 Subject: [PATCH 11/18] Fix chat timestamp parsing --- fbchat/_client.py | 30 +++++++++++------------------- fbchat/_user.py | 17 +++-------------- 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 2799f98..007a6b5 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2773,26 +2773,19 @@ class Client(object): from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) - # Chat timestamp - elif topic == "chatproxy-presence": - statuses = dict() - for id_, data in m.get("buddyList", {}).items(): - statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) - self._buddylist[id_] = statuses[id_] + # Chat timestamp / Buddylist overlay + elif topic == "/orca_presence": + if m["list_type"] == "full": + self._buddylist = {} # Refresh internal list + statuses = dict() + for data in m["list"]: + user_id = str(data["u"]) + statuses[user_id] = ActiveStatus._from_orca_presence(data) + self._buddylist[user_id] = statuses[user_id] + + # TODO: Which one should we call? self.onChatTimestamp(buddylist=statuses, msg=m) - - # Buddylist overlay - elif topic == "buddylist_overlay": - statuses = dict() - for id_, data in m.get("overlay", {}).items(): - old_in_game = None - if id_ in self._buddylist: - old_in_game = self._buddylist[id_].in_game - - statuses[id_] = ActiveStatus._from_buddylist_overlay(data, old_in_game) - self._buddylist[id_] = statuses[id_] - self.onBuddylistOverlay(statuses=statuses, msg=m) # Unknown message type @@ -3807,7 +3800,6 @@ class Client(object): statuses (dict): Dictionary with user IDs as keys and :class:`ActiveStatus` as values msg: A full set of the data received """ - log.debug("Buddylist overlay received: {}".format(statuses)) def onUnknownMesssageType(self, msg=None): """Called when the client is listening, and some unknown data was received. diff --git a/fbchat/_user.py b/fbchat/_user.py index eae5fdb..e7ed86e 100644 --- a/fbchat/_user.py +++ b/fbchat/_user.py @@ -192,17 +192,6 @@ class ActiveStatus(object): in_game = attr.ib(None) @classmethod - def _from_chatproxy_presence(cls, id_, data): - return cls( - active=data["p"] in [2, 3] if "p" in data else None, - last_active=data.get("lat"), - in_game=int(id_) in data.get("gamers", {}), - ) - - @classmethod - def _from_buddylist_overlay(cls, data, in_game=None): - return cls( - active=data["a"] in [2, 3] if "a" in data else None, - last_active=data.get("la"), - in_game=None, - ) + def _from_orca_presence(cls, data): + # TODO: Handle `c` and `vc` keys (Probably some binary data) + return cls(active=data["p"] in [2, 3], last_active=data.get("l"), in_game=None) From e488f4a7dae052bbfb5df26f9f3afdb6cf0d6388 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:52:59 +0100 Subject: [PATCH 12/18] Fix typing status parsing Co-authored-by: Tulir Asokan --- fbchat/_client.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 007a6b5..7ab6941 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2741,19 +2741,13 @@ class Client(object): ) # Typing - elif topic in ["typ", "ttyp"]: - author_id = str(m.get("from")) - thread_id = m.get("thread_fbid") - if thread_id: - thread_type = ThreadType.GROUP - thread_id = str(thread_id) - else: - thread_type = ThreadType.USER - if author_id == self._uid: - thread_id = m.get("to") - else: - thread_id = author_id - typing_status = TypingStatus(m.get("st")) + elif topic in ("/thread_typing", "/orca_typing_notifications"): + author_id = str(m["sender_fbid"]) + thread_id = m.get("thread", author_id) + typing_status = TypingStatus(m.get("state")) + thread_type = ( + ThreadType.USER if thread_id == author_id else ThreadType.GROUP + ) self.onTyping( author_id=author_id, status=typing_status, From bc1e3edf1728672b2af9ffb9c312992829d13f48 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 20:29:44 +0100 Subject: [PATCH 13/18] Small fixes Handle more errors, and fix Client.stopListening --- fbchat/_client.py | 9 ++++++--- fbchat/_mqtt.py | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 7ab6941..f9fc529 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2834,10 +2834,13 @@ class Client(object): def stopListening(self): """Stop the listening loop.""" - if not self._mqtt: - raise ValueError("Not listening") - self._mqtt.disconnect() self.listening = False + if not self._mqtt: + return + self._mqtt.disconnect() + # TODO: Preserve the _mqtt object + # Currently, there's some issues when disconnecting + self._mqtt = None def listen(self, markAlive=None): """Initialize and runs the listening loop continually. diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 0a6070f..8504a3e 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -91,6 +91,12 @@ class Mqtt: if "lastIssuedSeqId" in j: self._sequence_id = j["lastIssuedSeqId"] + if "errorCode" in j: + # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND + # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + log.error("MQTT error code %s received", j["errorCode"]) + # TODO: Consider resetting the sync_token and sequence ID here? + log.debug("MQTT payload: %s", j) # Call the external callback From 8c367af0ff70f7baa8b44079ca080ce75ccdf482 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 20:47:31 +0100 Subject: [PATCH 14/18] Fix Python 2.7 errors --- fbchat/_mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 8504a3e..c7ce350 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -11,7 +11,7 @@ def generate_session_id(): @attr.s(slots=True) -class Mqtt: +class Mqtt(object): _state = attr.ib() _mqtt = attr.ib() _on_message = attr.ib() @@ -62,7 +62,7 @@ class Mqtt: OSError, paho.mqtt.client.WebsocketConnectionError, ) as e: - raise _exception.FBchatException("MQTT connection failed") from e + raise _exception.FBchatException("MQTT connection failed") # Raise error if connecting failed if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: From 3bb99541e7e4adb122abdad40d1442161004f435 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 23:11:10 +0100 Subject: [PATCH 15/18] Improve MQTT connection error reporting --- fbchat/_mqtt.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index c7ce350..b5a6628 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -76,7 +76,7 @@ class Mqtt(object): try: j = _util.parse_json(message.payload) except _exception.FBchatFacebookError: - log.exception("Failed parsing MQTT data as JSON: %r", message.payload) + log.exception("Failed parsing MQTT data on %s as JSON", message.topic) return if message.topic == "/t_ms": @@ -241,13 +241,15 @@ class Mqtt(object): if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: err = paho.mqtt.client.error_string(rc) - log.warning("MQTT Error: %s", err) - if on_error: - # Temporary to support on_error param - try: - raise _exception.FBchatException("MQTT Error: {}".format(err)) - except _exception.FBchatException as e: - on_error(exception=e) + + # If known/expected error + if rc in [paho.mqtt.client.MQTT_ERR_CONN_LOST]: + log.warning(err) + else: + log.warning("MQTT Error: %s", err) + # For backwards compatibility + if on_error: + on_error(exception=FBchatException("MQTT Error {}".format(err))) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -262,7 +264,7 @@ class Mqtt(object): OSError, paho.mqtt.client.WebsocketConnectionError, ): - log.debug("MQTT connection failed") + log.debug("MQTT reconnection failed") return True # Keep listening From cf4c22898c745142153b8e7739b065c490d3bab5 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 00:00:19 +0100 Subject: [PATCH 16/18] Add undocumented _onSeen callback Mostly just to slowly document unknown events --- fbchat/_client.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index f9fc529..9c9d85f 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2231,6 +2231,12 @@ class Client(object): msg=delta, ) + elif delta_class == "MarkFolderSeen": + locations = [ + ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"] + ] + self._onSeen(locations=locations, ts=delta["timestamp"], msg=delta) + # Emoji change elif delta_type == "change_thread_icon": new_emoji = delta["untypedData"]["thread_icon"] @@ -2741,6 +2747,8 @@ class Client(object): ) # Typing + # /thread_typing {'sender_fbid': X, 'state': 1, 'type': 'typ', 'thread': 'Y'} + # /orca_typing_notifications {'type': 'typ', 'sender_fbid': X, 'state': 0} elif topic in ("/thread_typing", "/orca_typing_notifications"): author_id = str(m["sender_fbid"]) thread_id = m.get("thread", author_id) @@ -2756,13 +2764,6 @@ class Client(object): msg=m, ) - # Delivered - - # Seen - # elif mtype == "m_read_receipt": - # - # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - elif topic == "jewel_requests_add": from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) @@ -3299,6 +3300,17 @@ class Client(object): """ log.info("Friend request from {}".format(from_id)) + def _onSeen(self, locations=None, ts=None, msg=None): + """ + Todo: + Document this, and make it public + + Args: + locations: --- + ts: A timestamp of the action + msg: A full set of the data received + """ + def onInbox(self, unseen=None, unread=None, recent_unread=None, msg=None): """ Todo: From e57265016eacaa3c592d57882040b9ba566a5134 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 10:27:40 +0100 Subject: [PATCH 17/18] Skip NoOp events --- fbchat/_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fbchat/_client.py b/fbchat/_client.py index 9c9d85f..3bac889 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2437,6 +2437,10 @@ class Client(object): msg=delta, ) + # Skip "no operation" events + elif delta_class == "NoOp": + pass + # Group call started/ended elif delta_type == "rtc_call_log": thread_id, thread_type = getThreadIdAndThreadType(metadata) From 67fd6ffdf653a5adf04816b9f3fac95b81bf9e1d Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 10:29:30 +0100 Subject: [PATCH 18/18] Better document MQTT topics --- fbchat/_mqtt.py | 65 +++++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index b5a6628..3769ac8 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -97,7 +97,7 @@ class Mqtt(object): log.error("MQTT error code %s received", j["errorCode"]) # TODO: Consider resetting the sync_token and sequence ID here? - log.debug("MQTT payload: %s", j) + log.debug("MQTT payload: %s, %s", message.topic, j) # Call the external callback self._on_message(message.topic, j) @@ -149,6 +149,40 @@ class Mqtt(object): # Generate a new session ID on each reconnect session_id = generate_session_id() + topics = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/t_rtc", + "/webrtc_response", + ] + username = { # The user ID "u": self._state.user_id, @@ -163,31 +197,7 @@ class Mqtt(object): # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing - "st": [ - # TODO: Investigate the result from these - # "/inbox", - # "/mercury", - # "/messaging_events", - # "/orca_message_notifications", - # "/pp", - # "/t_p", - # "/t_rtc", - # "/webrtc_response", - "/legacy_web", - "/webrtc", - "/onevc", - # Things that happen in chats (e.g. messages) - "/t_ms", - # Group typing notifications - "/thread_typing", - # Private chat typing notifications - "/orca_typing_notifications", - "/notify_disconnect", - # Active notifications - "/orca_presence", - "/br_sr", - "/sr_res", - ], + "st": topics, # MQTT extension by FB, allows making a PUBLISH while CONNECTing # Using this is more efficient, but the same can be acheived with: # def on_connect(*args): @@ -290,3 +300,6 @@ class Mqtt(object): # def send_additional_contacts(self, additional_contacts): # payload = _util.json_minimal({"additional_contacts": additional_contacts}) # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) + # + # def browser_close(self): + # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1)