From 6b68916d74df483bc5e1fbe12c2082f39554bbc8 Mon Sep 17 00:00:00 2001 From: Marco Gavelli Date: Thu, 5 Sep 2019 20:02:51 +0200 Subject: [PATCH 01/32] Fix Python 2 only issue (str.split does not take keyword parameters) Fixes #469 --- fbchat/_message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fbchat/_message.py b/fbchat/_message.py index f2d3a8c..8c82b7a 100644 --- a/fbchat/_message.py +++ b/fbchat/_message.py @@ -26,7 +26,7 @@ class EmojiSize(Enum): "s": cls.SMALL, } for tag in tags or (): - data = tag.split(":", maxsplit=1) + data = tag.split(":", 1) if len(data) > 1 and data[0] == "hot_emoji_size": return string_to_emojisize.get(data[1]) return None From 3d28c958d35565dd2ae43691b9973301d23a513d Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Thu, 5 Sep 2019 20:07:44 +0200 Subject: [PATCH 02/32] =?UTF-8?q?Bump=20version:=201.8.1=20=E2=86=92=201.8?= =?UTF-8?q?.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6d42ea3..a4757d5 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.8.1 +current_version = 1.8.2 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index fa53c73..74e1a8a 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.8.1" +__version__ = "1.8.2" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From bb1f7d929478942a07af20d9cce44418a9f9c7b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asiel=20D=C3=ADaz=20Ben=C3=ADtez?= Date: Sun, 8 Sep 2019 09:56:27 -0400 Subject: [PATCH 03/32] Fix mimetypes.guess_type (#471) `mimetypes.guess_type` fails if the url is something like `http://example.com/file.zip?u=10`. Backported from 6bffb66 --- fbchat/_util.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fbchat/_util.py b/fbchat/_util.py index 58fa657..af06d42 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -219,11 +219,12 @@ def get_files_from_urls(file_urls): r = requests.get(file_url) # We could possibly use r.headers.get('Content-Disposition'), see # https://stackoverflow.com/a/37060758 + file_name = basename(file_url).split("?")[0].split("#")[0] files.append( ( - basename(file_url).split("?")[0].split("#")[0], + file_name, r.content, - r.headers.get("Content-Type") or guess_type(file_url)[0], + r.headers.get("Content-Type") or guess_type(file_name)[0], ) ) return files From 813219cd9c32175e7c502550d9ebdc434351cc22 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 8 Sep 2019 15:59:29 +0200 Subject: [PATCH 04/32] =?UTF-8?q?Bump=20version:=201.8.2=20=E2=86=92=201.8?= =?UTF-8?q?.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index a4757d5..ebf7240 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.8.2 +current_version = 1.8.3 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 74e1a8a..8422e42 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.8.2" +__version__ = "1.8.3" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From a97ef67411f3bb459c66204c689877ce4c1723d6 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 15 Dec 2019 15:26:53 +0100 Subject: [PATCH 05/32] Backport e348425 --- fbchat/_client.py | 28 +++++++++++++++------------- tests/test_fetch.py | 8 ++++---- tests/test_message_management.py | 2 +- tests/test_polls.py | 2 +- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 7c8b7ee..42b0dba 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -280,7 +280,7 @@ class Client(object): def _forcedFetch(self, thread_id, mid): params = {"thread_and_message_id": {"thread_id": thread_id, "message_id": mid}} - j, = self.graphql_requests(_graphql.from_doc_id("1768656253222505", params)) + (j,) = self.graphql_requests(_graphql.from_doc_id("1768656253222505", params)) return j def fetchThreads(self, thread_location, before=None, after=None, limit=None): @@ -404,7 +404,7 @@ class Client(object): FBchatException: If request failed """ params = {"search": name, "limit": limit} - j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_USER, params)) + (j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_USER, params)) return [User._from_graphql(node) for node in j[name]["users"]["nodes"]] @@ -421,7 +421,7 @@ class Client(object): FBchatException: If request failed """ params = {"search": name, "limit": limit} - j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_PAGE, params)) + (j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_PAGE, params)) return [Page._from_graphql(node) for node in j[name]["pages"]["nodes"]] @@ -439,7 +439,7 @@ class Client(object): FBchatException: If request failed """ params = {"search": name, "limit": limit} - j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_GROUP, params)) + (j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_GROUP, params)) return [Group._from_graphql(node) for node in j["viewer"]["groups"]["nodes"]] @@ -457,7 +457,9 @@ class Client(object): FBchatException: If request failed """ params = {"search": name, "limit": limit} - j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_THREAD, params)) + (j,) = self.graphql_requests( + _graphql.from_query(_graphql.SEARCH_THREAD, params) + ) rtn = [] for node in j[name]["threads"]["nodes"]: @@ -764,7 +766,7 @@ class Client(object): "load_read_receipts": True, "before": before, } - j, = self.graphql_requests(_graphql.from_doc_id("1860982147341344", params)) + (j,) = self.graphql_requests(_graphql.from_doc_id("1860982147341344", params)) if j.get("message_thread") is None: raise FBchatException("Could not fetch thread {}: {}".format(thread_id, j)) @@ -823,7 +825,7 @@ class Client(object): "includeDeliveryReceipts": True, "includeSeqID": False, } - j, = self.graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + (j,) = self.graphql_requests(_graphql.from_doc_id("1349387578499440", params)) rtn = [] for node in j["viewer"]["message_threads"]["nodes"]: @@ -943,7 +945,7 @@ class Client(object): return Plan._from_fetch(j) def _getPrivateData(self): - j, = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {})) + (j,) = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {})) return j["viewer"] def getPhoneNumbers(self): @@ -994,7 +996,7 @@ class Client(object): thread_id, thread_type = self._getThread(thread_id, None) data = {"id": thread_id, "first": 48} thread_id = str(thread_id) - j, = self.graphql_requests(_graphql.from_query_id("515216185516880", data)) + (j,) = self.graphql_requests(_graphql.from_query_id("515216185516880", data)) while True: try: i = j[thread_id]["message_shared_media"]["edges"][0] @@ -1005,7 +1007,7 @@ class Client(object): data["after"] = j[thread_id]["message_shared_media"][ "page_info" ].get("end_cursor") - j, = self.graphql_requests( + (j,) = self.graphql_requests( _graphql.from_query_id("515216185516880", data) ) continue @@ -1534,7 +1536,7 @@ class Client(object): "response": "ACCEPT" if approve else "DENY", "surface": "ADMIN_MODEL_APPROVAL_CENTER", } - j, = self.graphql_requests( + (j,) = self.graphql_requests( _graphql.from_doc_id("1574519202665847", {"data": data}) ) @@ -1589,7 +1591,7 @@ class Client(object): Raises: FBchatException: If request failed """ - (image_id, mimetype), = self._upload(get_files_from_urls([image_url])) + ((image_id, mimetype),) = self._upload(get_files_from_urls([image_url])) return self._changeGroupImage(image_id, thread_id) def changeGroupImageLocal(self, image_path, thread_id=None): @@ -1603,7 +1605,7 @@ class Client(object): FBchatException: If request failed """ with get_files_from_paths([image_path]) as files: - (image_id, mimetype), = self._upload(files) + ((image_id, mimetype),) = self._upload(files) return self._changeGroupImage(image_id, thread_id) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index d509fad..2c30900 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -27,7 +27,7 @@ def test_fetch_threads(client1): @pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST) def test_fetch_message_emoji(client, emoji, emoji_size): mid = client.sendEmoji(emoji, emoji_size) - message, = client.fetchThreadMessages(limit=1) + (message,) = client.fetchThreadMessages(limit=1) assert subset( vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size @@ -46,7 +46,7 @@ def test_fetch_message_info_emoji(client, thread, emoji, emoji_size): def test_fetch_message_mentions(client, thread, message_with_mentions): mid = client.send(message_with_mentions) - message, = client.fetchThreadMessages(limit=1) + (message,) = client.fetchThreadMessages(limit=1) assert subset( vars(message), uid=mid, author=client.uid, text=message_with_mentions.text @@ -71,7 +71,7 @@ def test_fetch_message_info_mentions(client, thread, message_with_mentions): @pytest.mark.parametrize("sticker", STICKER_LIST) def test_fetch_message_sticker(client, sticker): mid = client.send(Message(sticker=sticker)) - message, = client.fetchThreadMessages(limit=1) + (message,) = client.fetchThreadMessages(limit=1) assert subset(vars(message), uid=mid, author=client.uid) assert subset(vars(message.sticker), uid=sticker.uid) @@ -96,6 +96,6 @@ def test_fetch_info(client1, group): def test_fetch_image_url(client): client.sendLocalFiles([path.join(path.dirname(__file__), "resources", "image.png")]) - message, = client.fetchThreadMessages(limit=1) + (message,) = client.fetchThreadMessages(limit=1) assert client.fetchImageUrl(message.attachments[0].uid) diff --git a/tests/test_message_management.py b/tests/test_message_management.py index 00291ee..96f4537 100644 --- a/tests/test_message_management.py +++ b/tests/test_message_management.py @@ -19,5 +19,5 @@ def test_delete_messages(client): mid1 = client.sendMessage(text1) mid2 = client.sendMessage(text2) client.deleteMessages(mid2) - message, = client.fetchThreadMessages(limit=1) + (message,) = client.fetchThreadMessages(limit=1) assert subset(vars(message), uid=mid1, author=client.uid, text=text1) diff --git a/tests/test_polls.py b/tests/test_polls.py index 32c2457..17d9005 100644 --- a/tests/test_polls.py +++ b/tests/test_polls.py @@ -63,7 +63,7 @@ def test_create_poll(client1, group, catch_event, poll_data): for recv_option in event[ "poll" ].options: # The recieved options may not be the full list - old_option, = list(filter(lambda o: o.text == recv_option.text, poll.options)) + (old_option,) = list(filter(lambda o: o.text == recv_option.text, poll.options)) voters = [client1.uid] if old_option.vote else [] assert subset( vars(recv_option), voters=voters, votes_count=len(voters), vote=False From ffdf4222bf0a8b1f859e80800e7786d1ba314497 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 15 Dec 2019 15:30:02 +0100 Subject: [PATCH 06/32] Split ._parseMessage to reduce indentation --- fbchat/_client.py | 164 +++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 82 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 42b0dba..b5ab257 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2751,6 +2751,87 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) + def _parse_payload(self, m): + mtype = m.get("type") + # Things that directly change chat + if mtype == "delta": + self._parseDelta(m) + # Inbox + elif mtype == "inbox": + self.onInbox( + unseen=m["unseen"], + unread=m["unread"], + recent_unread=m["recent_unread"], + msg=m, + ) + + # Typing + elif mtype == "typ" or mtype == "ttyp": + author_id = str(m.get("from")) + thread_id = m.get("thread_fbid") + if thread_id: + thread_type = ThreadType.GROUP + thread_id = str(thread_id) + else: + thread_type = ThreadType.USER + if author_id == self._uid: + thread_id = m.get("to") + else: + thread_id = author_id + typing_status = TypingStatus(m.get("st")) + self.onTyping( + author_id=author_id, + status=typing_status, + thread_id=thread_id, + thread_type=thread_type, + msg=m, + ) + + # Delivered + + # Seen + # elif mtype == "m_read_receipt": + # + # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) + + elif mtype in ["jewel_requests_add"]: + from_id = m["from"] + self.onFriendRequest(from_id=from_id, msg=m) + + # Happens on every login + elif mtype == "qprimer": + self.onQprimer(ts=m.get("made"), msg=m) + + # Is sent before any other message + elif mtype == "deltaflow": + pass + + # Chat timestamp + elif mtype == "chatproxy-presence": + statuses = dict() + for id_, data in m.get("buddyList", {}).items(): + statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) + self._buddylist[id_] = statuses[id_] + + self.onChatTimestamp(buddylist=statuses, msg=m) + + # Buddylist overlay + elif mtype == "buddylist_overlay": + statuses = dict() + for id_, data in m.get("overlay", {}).items(): + old_in_game = None + if id_ in self._buddylist: + old_in_game = self._buddylist[id_].in_game + + statuses[id_] = ActiveStatus._from_buddylist_overlay(data, old_in_game) + self._buddylist[id_] = statuses[id_] + + self.onBuddylistOverlay(statuses=statuses, msg=m) + + # Unknown message type + else: + self.onUnknownMesssageType(msg=m) + def _parseMessage(self, content): """Get message and author name from content. @@ -2770,89 +2851,8 @@ class Client(object): return for m in content["ms"]: - mtype = m.get("type") try: - # Things that directly change chat - if mtype == "delta": - self._parseDelta(m) - # Inbox - elif mtype == "inbox": - self.onInbox( - unseen=m["unseen"], - unread=m["unread"], - recent_unread=m["recent_unread"], - msg=m, - ) - - # Typing - elif mtype == "typ" or mtype == "ttyp": - author_id = str(m.get("from")) - thread_id = m.get("thread_fbid") - if thread_id: - thread_type = ThreadType.GROUP - thread_id = str(thread_id) - else: - thread_type = ThreadType.USER - if author_id == self._uid: - thread_id = m.get("to") - else: - thread_id = author_id - typing_status = TypingStatus(m.get("st")) - self.onTyping( - author_id=author_id, - status=typing_status, - thread_id=thread_id, - thread_type=thread_type, - msg=m, - ) - - # Delivered - - # Seen - # elif mtype == "m_read_receipt": - # - # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - - elif mtype in ["jewel_requests_add"]: - from_id = m["from"] - self.onFriendRequest(from_id=from_id, msg=m) - - # Happens on every login - elif mtype == "qprimer": - self.onQprimer(ts=m.get("made"), msg=m) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - - # Chat timestamp - elif mtype == "chatproxy-presence": - statuses = dict() - for id_, data in m.get("buddyList", {}).items(): - statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) - self._buddylist[id_] = statuses[id_] - - self.onChatTimestamp(buddylist=statuses, msg=m) - - # Buddylist overlay - elif mtype == "buddylist_overlay": - statuses = dict() - for id_, data in m.get("overlay", {}).items(): - old_in_game = None - if id_ in self._buddylist: - old_in_game = self._buddylist[id_].in_game - - statuses[id_] = ActiveStatus._from_buddylist_overlay( - data, old_in_game - ) - self._buddylist[id_] = statuses[id_] - - self.onBuddylistOverlay(statuses=statuses, msg=m) - - # Unknown message type - else: - self.onUnknownMesssageType(msg=m) - + self._parse_payload(m) except Exception as e: self.onMessageError(exception=e, msg=m) From ea518ba4c91e9587f827ef18fee9924e1e10a7e3 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 16:23:35 +0100 Subject: [PATCH 07/32] Add initial MQTT helper --- fbchat/_mqtt.py | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ fbchat/_util.py | 12 ++++++ pyproject.toml | 1 + 3 files changed, 121 insertions(+) create mode 100644 fbchat/_mqtt.py diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py new file mode 100644 index 0000000..f01d6d2 --- /dev/null +++ b/fbchat/_mqtt.py @@ -0,0 +1,108 @@ +import attr +import random +import paho.mqtt.client +from . import _util, _graphql + + +@attr.s(slots=True) +class Mqtt: + _state = attr.ib() + _mqtt = attr.ib() + + @classmethod + def connect(cls, state, foreground): + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + + # Generate a random session ID between 1 and 9007199254740991 + session_id = random.randint(1, 2 ** 53) + + username = { + # The user ID + "u": state.user_id, + # Session ID + "s": session_id, + # Active status setting + "chat_on": True, + # foreground_state - Whether the window is focused + "fg": foreground, + # Can be any random ID + "d": state._client_id, + # Application ID, taken from facebook.com + "aid": 219994525426954, + # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing + "st": [ + # TODO: Investigate the result from these + # "/inbox", + # "/mercury", + # "/messaging_events", + # "/orca_message_notifications", + # "/pp", + # "/t_p", + # "/t_rtc", + # "/webrtc_response", + "/legacy_web", + "/webrtc", + "/onevc", + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + "/notify_disconnect", + # Active notifications + "/orca_presence", + "/br_sr", + "/sr_res", + ], + # MQTT extension by FB, allows making a PUBLISH while CONNECTing + "pm": [], + # Unknown parameters + "cp": 3, + "ecp": 10, + "ct": "websocket", + "mqtt_sid": "", + "dc": "", + "no_auto_fg": True, + "gas": None, + "pack": [], + } + mqtt.username_pw_set(_util.json_minimal(username)) + + headers = { + "Cookie": _util.get_cookie_header(state._session, "edge-chat.facebook.com"), + "User-Agent": state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": "edge-chat.facebook.com", + } + + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.ws_set_options(path="/chat?sid={}".format(session_id), headers=headers) + mqtt.tls_set() + response_code = mqtt.connect("edge-chat.facebook.com", 443, keepalive=10) + # TODO: Handle response code + + return cls(state=state, mqtt=mqtt) + + def listen(self, on_message): + def real_on_message(client, userdata, message): + on_message(message.topic, message.payload) + + self._mqtt.on_message = real_on_message + + self._mqtt.loop_forever() # TODO: retry_first_connection=True? + + def disconnect(self): + self._mqtt.disconnect() + + def set_foreground(self, state): + payload = _util.json_minimal({"foreground": state}) + info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() diff --git a/fbchat/_util.py b/fbchat/_util.py index af06d42..6228989 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -57,6 +57,11 @@ def now(): return int(time() * 1000) +def json_minimal(data): + """Get JSON data in minimal form.""" + return json.dumps(data, separators=(",", ":")) + + def strip_json_cruft(text): """Removes `for(;;);` (and other cruft) that preceeds JSON responses.""" try: @@ -65,6 +70,13 @@ def strip_json_cruft(text): raise FBchatException("No JSON object found: {!r}".format(text)) +def get_cookie_header(session, host): + """Extract a cookie header from a requests session.""" + return requests.cookies.get_cookie_header( + session.cookies, requests.Request("GET", host), + ) + + def get_decoded_r(r): return get_decoded(r._content) diff --git a/pyproject.toml b/pyproject.toml index 136268d..29cbc28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ requires = [ "attrs>=18.2", "requests~=2.19", "beautifulsoup4~=4.0", + "paho-mqtt~=1.5", ] description-file = "README.rst" classifiers = [ From ecc6edac5a8ab8447189e1c4c11977ca0c351816 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 16:14:39 +0100 Subject: [PATCH 08/32] Fix message receiving in MQTT --- fbchat/_mqtt.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index f01d6d2..c17d110 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -4,6 +4,24 @@ import paho.mqtt.client from . import _util, _graphql +def fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + @attr.s(slots=True) class Mqtt: _state = attr.ib() @@ -21,6 +39,17 @@ class Mqtt: # Generate a random session ID between 1 and 9007199254740991 session_id = random.randint(1, 2 ** 53) + last_seq_id = fetch_sequence_id(state) + + messenger_sync_create_queue_payload = { + "sync_api_version": 10, + "max_deltas_able_to_process": 1000, + "delta_batch_size": 500, + "encoding": "JSON", + "entity_fbid": state.user_id, + "initial_titan_sequence_id": str(last_seq_id), + "device_params": None, + } username = { # The user ID @@ -62,7 +91,20 @@ class Mqtt: "/sr_res", ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing - "pm": [], + "pm": [ + # This is required to actually receive messages + { + "topic": "/messenger_sync_create_queue", + "payload": _util.json_minimal(messenger_sync_create_queue_payload), + "qos": 1, + "messageId": 65536, + } + # The above is more efficient, but the same effect could have been + # acheived with: + # def on_connect(*args): + # mqtt.publish("/messenger_sync_create_queue", ..., qos=1) + # mqtt.on_connect = on_connect + ], # Unknown parameters "cp": 3, "ecp": 10, From 998fa43fb239b3691d6518859bc2532f082e6b3b Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sat, 4 Jan 2020 23:18:20 +0100 Subject: [PATCH 09/32] Refactor MQTT connecting --- fbchat/_mqtt.py | 62 +++++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index c17d110..a305f8a 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -22,46 +22,70 @@ def fetch_sequence_id(state): raise +HOST = "edge-chat.facebook.com" + + @attr.s(slots=True) class Mqtt: _state = attr.ib() _mqtt = attr.ib() + _session_id = attr.ib() - @classmethod - def connect(cls, state, foreground): - mqtt = paho.mqtt.client.Client( + def __init__(self, state): + self._state = state + + # Generate a random session ID between 1 and 9007199254740991 + self._session_id = random.randint(1, 2 ** 53) + + self._mqtt = paho.mqtt.client.Client( client_id="mqttwsclient", clean_session=True, protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - mqtt.enable_logger() + self._mqtt.enable_logger() + # self._mqtt.max_inflight_messages_set(20) + # self._mqtt.max_queued_messages_set(0) # unlimited + # self._mqtt.message_retry_set(5) + # self._mqtt.reconnect_delay_set(min_delay=1, max_delay=1) - # Generate a random session ID between 1 and 9007199254740991 - session_id = random.randint(1, 2 ** 53) - last_seq_id = fetch_sequence_id(state) + # TODO: Is region (lla | atn | odn | others?) important? + self._mqtt.ws_set_options( + path="/chat?sid={}".format(self._session_id), headers=self._create_headers + ) + self._mqtt.tls_set() + + def _create_headers(self, headers): + headers["Cookie"] = _util.get_cookie_header(self._state._session, HOST) + headers["User-Agent"] = self._state._session.headers["User-Agent"] + headers["Origin"] = "https://www.facebook.com" + headers["Host"] = HOST + return headers + + def connect(self, foreground): + last_seq_id = fetch_sequence_id(self._state) messenger_sync_create_queue_payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": state.user_id, + "entity_fbid": self._state.user_id, "initial_titan_sequence_id": str(last_seq_id), "device_params": None, } username = { # The user ID - "u": state.user_id, + "u": self._state.user_id, # Session ID - "s": session_id, + "s": self._session_id, # Active status setting "chat_on": True, # foreground_state - Whether the window is focused "fg": foreground, # Can be any random ID - "d": state._client_id, + "d": self._state._client_id, # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing @@ -115,23 +139,11 @@ class Mqtt: "gas": None, "pack": [], } - mqtt.username_pw_set(_util.json_minimal(username)) + self._mqtt.username_pw_set(_util.json_minimal(username)) - headers = { - "Cookie": _util.get_cookie_header(state._session, "edge-chat.facebook.com"), - "User-Agent": state._session.headers["User-Agent"], - "Origin": "https://www.facebook.com", - "Host": "edge-chat.facebook.com", - } - - # TODO: Is region (lla | atn | odn | others?) important? - mqtt.ws_set_options(path="/chat?sid={}".format(session_id), headers=headers) - mqtt.tls_set() - response_code = mqtt.connect("edge-chat.facebook.com", 443, keepalive=10) + response_code = self._mqtt.connect(HOST, 443, keepalive=10) # TODO: Handle response code - return cls(state=state, mqtt=mqtt) - def listen(self, on_message): def real_on_message(client, userdata, message): on_message(message.topic, message.payload) From 766b0125fbe12585417920555b9ff26e57878f35 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 00:31:58 +0100 Subject: [PATCH 10/32] Refactor MQTT connecting, add sync token support --- fbchat/_mqtt.py | 186 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 126 insertions(+), 60 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index a305f8a..da9a588 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -1,89 +1,152 @@ import attr import random import paho.mqtt.client +from ._core import log from . import _util, _graphql -def fetch_sequence_id(state): - """Fetch sequence ID.""" - params = { - "limit": 1, - "tags": ["INBOX"], - "before": None, - "includeDeliveryReceipts": False, - "includeSeqID": True, - } - # Same request as in `Client.fetchThreadList` - (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) - try: - return int(j["viewer"]["message_threads"]["sync_sequence_id"]) - except (KeyError, ValueError): - # TODO: Proper exceptions - raise - - -HOST = "edge-chat.facebook.com" +def generate_session_id(): + """Generate a random session ID between 1 and 9007199254740991.""" + return random.randint(1, 2 ** 53) @attr.s(slots=True) class Mqtt: _state = attr.ib() _mqtt = attr.ib() - _session_id = attr.ib() + _on_message = attr.ib() + _chat_on = attr.ib() + _foreground = attr.ib() + _session_id = attr.ib(factory=generate_session_id) + _sync_token = attr.ib(None) - def __init__(self, state): - self._state = state + _HOST = "edge-chat.facebook.com" - # Generate a random session ID between 1 and 9007199254740991 - self._session_id = random.randint(1, 2 ** 53) - - self._mqtt = paho.mqtt.client.Client( + @classmethod + def connect(cls, state, on_message, chat_on, foreground): + mqtt = paho.mqtt.client.Client( client_id="mqttwsclient", clean_session=True, protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - self._mqtt.enable_logger() - # self._mqtt.max_inflight_messages_set(20) - # self._mqtt.max_queued_messages_set(0) # unlimited - # self._mqtt.message_retry_set(5) - # self._mqtt.reconnect_delay_set(min_delay=1, max_delay=1) - # TODO: Is region (lla | atn | odn | others?) important? - self._mqtt.ws_set_options( - path="/chat?sid={}".format(self._session_id), headers=self._create_headers + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, ) - self._mqtt.tls_set() + + mqtt.enable_logger() + # mqtt.max_inflight_messages_set(20) + # mqtt.max_queued_messages_set(0) # unlimited + # mqtt.message_retry_set(5) + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.tls_set() + mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=self._create_headers + ) + mqtt.on_message = self._on_message_handler + + sequence_id = self._fetch_sequence_id(self._state) + # Set connect/reconnect data with an empty sync token and an newly fetched + # sequence id initially + self._set_reconnect_data(self._sync_token, sequence_id) + + # TODO: Handle response code + response_code = mqtt.connect(self._HOST, 443, keepalive=10) def _create_headers(self, headers): - headers["Cookie"] = _util.get_cookie_header(self._state._session, HOST) + log.debug("Fetching MQTT headers") + # TODO: Make this access thread safe + headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST) headers["User-Agent"] = self._state._session.headers["User-Agent"] headers["Origin"] = "https://www.facebook.com" - headers["Host"] = HOST + headers["Host"] = self._HOST return headers - def connect(self, foreground): - last_seq_id = fetch_sequence_id(self._state) + def _on_message_handler(self, client, userdata, message): + j = _util.parse_json(message.payload) + if message.topic == "/t_ms": + sequence_id = None - messenger_sync_create_queue_payload = { + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + sequence_id = j["firstDeltaSeqId"] + + # Update last sequence id when received + if "lastIssuedSeqId" in j: + sequence_id = j["lastIssuedSeqId"] + + if sequence_id is not None: + self._set_reconnect_data(self._sync_token, sequence_id) + + # Call the external callback + self._on_message(message.topic, j) + + @staticmethod + def _fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + @staticmethod + def _get_messenger_sync(state, sync_token, sequence_id): + """Get the data to configure receiving messages.""" + payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": self._state.user_id, - "initial_titan_sequence_id": str(last_seq_id), - "device_params": None, + "entity_fbid": state.user_id, } + # If we don't have a sync_token, create a new messenger queue + # This is done so that across reconnects, if we've received a sync token, we + # SHOULD receive a piece of data in /t_ms exactly once! + if sync_token is None: + topic = "/messenger_sync_create_queue" + payload["initial_titan_sequence_id"] = str(sequence_id) + payload["device_params"] = None + else: + topic = "/messenger_sync_get_diffs" + payload["last_seq_id"] = str(sequence_id) + payload["sync_token"] = sync_token + + return topic, payload + + def _set_reconnect_data(self, sync_token, sequence_id): + log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) + topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) + username = { # The user ID "u": self._state.user_id, # Session ID "s": self._session_id, # Active status setting - "chat_on": True, + "chat_on": self._chat_on, # foreground_state - Whether the window is focused - "fg": foreground, + "fg": self._foreground, # Can be any random ID "d": self._state._client_id, # Application ID, taken from facebook.com @@ -116,17 +179,16 @@ class Mqtt: ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing "pm": [ - # This is required to actually receive messages { - "topic": "/messenger_sync_create_queue", - "payload": _util.json_minimal(messenger_sync_create_queue_payload), + "topic": topic, + "payload": _util.json_minimal(payload), "qos": 1, "messageId": 65536, } # The above is more efficient, but the same effect could have been # acheived with: # def on_connect(*args): - # mqtt.publish("/messenger_sync_create_queue", ..., qos=1) + # mqtt.publish(topic, payload=..., qos=1) # mqtt.on_connect = on_connect ], # Unknown parameters @@ -139,24 +201,28 @@ class Mqtt: "gas": None, "pack": [], } + + # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) - response_code = self._mqtt.connect(HOST, 443, keepalive=10) - # TODO: Handle response code - - def listen(self, on_message): - def real_on_message(client, userdata, message): - on_message(message.topic, message.payload) - - self._mqtt.on_message = real_on_message - + def listen(self): self._mqtt.loop_forever() # TODO: retry_first_connection=True? def disconnect(self): self._mqtt.disconnect() - def set_foreground(self, state): - payload = _util.json_minimal({"foreground": state}) + def set_foreground(self, value): + payload = _util.json_minimal({"foreground": value}) info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + self._foreground = value # TODO: We can't wait for this, since the loop is running with .loop_forever() # info.wait_for_publish() + + # def set_client_settings(self, available_when_in_foreground: bool): + # data = {"make_user_available_when_in_foreground": available_when_in_foreground} + # payload = _util.json_minimal(data) + # info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + # + # def send_additional_contacts(self, additional_contacts): + # payload = _util.json_minimal({"additional_contacts": additional_contacts}) + # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) From a298e0cf165e1c670fc97e17c0a478b7ae039d41 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 14:56:01 +0100 Subject: [PATCH 11/32] Refactor MQTT to do proper reconnecting --- fbchat/_mqtt.py | 146 ++++++++++++++++++++++++++++-------------------- 1 file changed, 86 insertions(+), 60 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index da9a588..3875e88 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -2,7 +2,7 @@ import attr import random import paho.mqtt.client from ._core import log -from . import _util, _graphql +from . import _util, _exception, _graphql def generate_session_id(): @@ -17,7 +17,7 @@ class Mqtt: _on_message = attr.ib() _chat_on = attr.ib() _foreground = attr.ib() - _session_id = attr.ib(factory=generate_session_id) + _sequence_id = attr.ib() _sync_token = attr.ib(None) _HOST = "edge-chat.facebook.com" @@ -30,15 +30,6 @@ class Mqtt: protocol=paho.mqtt.client.MQTTv31, transport="websockets", ) - - self = cls( - state=state, - mqtt=mqtt, - on_message=on_message, - chat_on=chat_on, - foreground=foreground, - ) - mqtt.enable_logger() # mqtt.max_inflight_messages_set(20) # mqtt.max_queued_messages_set(0) # unlimited @@ -46,46 +37,46 @@ class Mqtt: # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) # TODO: Is region (lla | atn | odn | others?) important? mqtt.tls_set() - mqtt.ws_set_options( - path="/chat?sid={}".format(session_id), headers=self._create_headers - ) - mqtt.on_message = self._on_message_handler - sequence_id = self._fetch_sequence_id(self._state) - # Set connect/reconnect data with an empty sync token and an newly fetched - # sequence id initially - self._set_reconnect_data(self._sync_token, sequence_id) + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, + sequence_id=cls._fetch_sequence_id(state), + ) + + # Configure callbacks + mqtt.on_message = self._on_message_handler + mqtt.on_connect = self._on_connect_handler + + self._configure_connect_options() # TODO: Handle response code response_code = mqtt.connect(self._HOST, 443, keepalive=10) - def _create_headers(self, headers): - log.debug("Fetching MQTT headers") - # TODO: Make this access thread safe - headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST) - headers["User-Agent"] = self._state._session.headers["User-Agent"] - headers["Origin"] = "https://www.facebook.com" - headers["Host"] = self._HOST - return headers + return self def _on_message_handler(self, client, userdata, message): - j = _util.parse_json(message.payload) - if message.topic == "/t_ms": - sequence_id = None + # Parse payload JSON + try: + j = _util.parse_json(message.payload) + except _exception.FBchatFacebookError: + log.exception("Failed parsing MQTT data as JSON: %r", message.payload) + return + if message.topic == "/t_ms": # Update sync_token when received # This is received in the first message after we've created a messenger # sync queue. if "syncToken" in j and "firstDeltaSeqId" in j: self._sync_token = j["syncToken"] - sequence_id = j["firstDeltaSeqId"] + self._sequence_id = j["firstDeltaSeqId"] # Update last sequence id when received if "lastIssuedSeqId" in j: - sequence_id = j["lastIssuedSeqId"] - - if sequence_id is not None: - self._set_reconnect_data(self._sync_token, sequence_id) + self._sequence_id = j["lastIssuedSeqId"] # Call the external callback self._on_message(message.topic, j) @@ -109,40 +100,39 @@ class Mqtt: # TODO: Proper exceptions raise - @staticmethod - def _get_messenger_sync(state, sync_token, sequence_id): - """Get the data to configure receiving messages.""" + def _on_connect_handler(self, client, userdata, flags, rc): + # configure receiving messages. payload = { "sync_api_version": 10, "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": state.user_id, + "entity_fbid": self._state.user_id, } # If we don't have a sync_token, create a new messenger queue # This is done so that across reconnects, if we've received a sync token, we # SHOULD receive a piece of data in /t_ms exactly once! - if sync_token is None: + if self._sync_token is None: topic = "/messenger_sync_create_queue" - payload["initial_titan_sequence_id"] = str(sequence_id) + payload["initial_titan_sequence_id"] = str(self._sequence_id) payload["device_params"] = None else: topic = "/messenger_sync_get_diffs" - payload["last_seq_id"] = str(sequence_id) - payload["sync_token"] = sync_token + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token - return topic, payload + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) - def _set_reconnect_data(self, sync_token, sequence_id): - log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) - topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() username = { # The user ID "u": self._state.user_id, # Session ID - "s": self._session_id, + "s": session_id, # Active status setting "chat_on": self._chat_on, # foreground_state - Whether the window is focused @@ -178,18 +168,18 @@ class Mqtt: "/sr_res", ], # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! "pm": [ - { - "topic": topic, - "payload": _util.json_minimal(payload), - "qos": 1, - "messageId": 65536, - } - # The above is more efficient, but the same effect could have been - # acheived with: - # def on_connect(*args): - # mqtt.publish(topic, payload=..., qos=1) - # mqtt.on_connect = on_connect + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } ], # Unknown parameters "cp": 3, @@ -205,8 +195,44 @@ class Mqtt: # TODO: Make this thread safe self._mqtt.username_pw_set(_util.json_minimal(username)) + headers = { + # TODO: Make this access thread safe + "Cookie": _util.get_cookie_header(self._state._session, self._HOST), + "User-Agent": self._state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": self._HOST, + } + + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + def listen(self): - self._mqtt.loop_forever() # TODO: retry_first_connection=True? + while True: + rc = self._mqtt.loop(timeout=1.0) + if rc == paho.mqtt.client.MQTT_ERR_SUCCESS: + continue # No errors + + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + break + + # Wait before reconnecting + self._mqtt._reconnect_wait() + + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ): + log.debug("MQTT connection failed") + + # self._mqtt.loop_forever() # TODO: retry_first_connection=True? def disconnect(self): self._mqtt.disconnect() From d1cb866b44be90c40963b793384be51ce82b3038 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:57:52 +0100 Subject: [PATCH 12/32] Refactor MQTT listening --- fbchat/_mqtt.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 3875e88..4afb810 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -78,6 +78,8 @@ class Mqtt: if "lastIssuedSeqId" in j: self._sequence_id = j["lastIssuedSeqId"] + log.debug("MQTT payload: %s", j) + # Call the external callback self._on_message(message.topic, j) @@ -207,16 +209,18 @@ class Mqtt: path="/chat?sid={}".format(session_id), headers=headers ) - def listen(self): - while True: - rc = self._mqtt.loop(timeout=1.0) - if rc == paho.mqtt.client.MQTT_ERR_SUCCESS: - continue # No errors + def loop_once(self): + """Run the listening loop once. - # If disconnect() has been called - if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: - break + Returns whether to keep listening or not. + """ + rc = self._mqtt.loop(timeout=1.0) + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + return False # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: # Wait before reconnecting self._mqtt._reconnect_wait() @@ -232,7 +236,7 @@ class Mqtt: ): log.debug("MQTT connection failed") - # self._mqtt.loop_forever() # TODO: retry_first_connection=True? + return True # Keep listening def disconnect(self): self._mqtt.disconnect() From 803bfa70845c50b10cb09cadefdb6f13433fbc96 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 18:48:48 +0100 Subject: [PATCH 13/32] Add proper MQTT error handling --- fbchat/_mqtt.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 4afb810..2d81131 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -53,8 +53,21 @@ class Mqtt: self._configure_connect_options() - # TODO: Handle response code - response_code = mqtt.connect(self._HOST, 443, keepalive=10) + # Attempt to connect + try: + rc = mqtt.connect(self._HOST, 443, keepalive=10) + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + raise _exception.FBchatException("MQTT connection failed") from e + + # Raise error if connecting failed + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + raise _exception.FBchatException("MQTT connection failed: {}".format(err)) return self @@ -221,6 +234,9 @@ class Mqtt: return False # Stop listening if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + log.warning("MQTT Error: %s", err) + # Wait before reconnecting self._mqtt._reconnect_wait() From a1b80a7abb131eab4efc9f1e0e4080943779b30e Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:42:07 +0100 Subject: [PATCH 14/32] Replace pull channel with MQTT setup --- fbchat/_client.py | 136 ++++++++++++---------------------------------- fbchat/_mqtt.py | 22 ++++++-- 2 files changed, 52 insertions(+), 106 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index b5ab257..41b9e14 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -12,6 +12,7 @@ from ._util import * from .models import * from . import _graphql from ._state import State +from ._mqtt import Mqtt import time import json @@ -85,13 +86,11 @@ class Client(object): Raises: FBchatException: On failed login """ - self._sticky, self._pool = (None, None) - self._seq = "0" self._default_thread_id = None self._default_thread_type = None - self._pull_channel = 0 self._markAlive = True self._buddylist = dict() + self._mqtt = None handler.setLevel(logging_level) @@ -2169,38 +2168,6 @@ class Client(object): LISTEN METHODS """ - def _ping(self): - data = { - "seq": self._seq, - "channel": "p_" + self._uid, - "clientid": self._state._client_id, - "partition": -2, - "cap": 0, - "uid": self._uid, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "viewer_uid": self._uid, - "state": "active", - } - j = self._get( - "https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel), - data, - ) - - def _pullMessage(self): - """Call pull api to fetch message data.""" - data = { - "seq": self._seq, - "msgs_recv": 0, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "clientid": self._state._client_id, - "state": "active" if self._markAlive else "offline", - } - return self._get( - "https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data - ) - def _parseDelta(self, m): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" @@ -2751,13 +2718,15 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parse_payload(self, m): - mtype = m.get("type") + def _parse_payload(self, topic, m): # Things that directly change chat - if mtype == "delta": + if topic == "delta": self._parseDelta(m) + + # TODO: Remove old parsing below + # Inbox - elif mtype == "inbox": + elif topic == "inbox": self.onInbox( unseen=m["unseen"], unread=m["unread"], @@ -2766,7 +2735,7 @@ class Client(object): ) # Typing - elif mtype == "typ" or mtype == "ttyp": + elif topic in ["typ", "ttyp"]: author_id = str(m.get("from")) thread_id = m.get("thread_fbid") if thread_id: @@ -2794,20 +2763,12 @@ class Client(object): # # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - elif mtype in ["jewel_requests_add"]: + elif topic == "jewel_requests_add": from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) - # Happens on every login - elif mtype == "qprimer": - self.onQprimer(ts=m.get("made"), msg=m) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - # Chat timestamp - elif mtype == "chatproxy-presence": + elif topic == "chatproxy-presence": statuses = dict() for id_, data in m.get("buddyList", {}).items(): statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) @@ -2816,7 +2777,7 @@ class Client(object): self.onChatTimestamp(buddylist=statuses, msg=m) # Buddylist overlay - elif mtype == "buddylist_overlay": + elif topic == "buddylist_overlay": statuses = dict() for id_, data in m.get("overlay", {}).items(): old_in_game = None @@ -2832,29 +2793,11 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parseMessage(self, content): - """Get message and author name from content. - - May contain multiple messages in the content. - """ - self._seq = content.get("seq", "0") - - if "lb_info" in content: - self._sticky = content["lb_info"]["sticky"] - self._pool = content["lb_info"]["pool"] - - if "batches" in content: - for batch in content["batches"]: - self._parseMessage(batch) - - if "ms" not in content: - return - - for m in content["ms"]: - try: - self._parse_payload(m) - except Exception as e: - self.onMessageError(exception=e, msg=m) + def _parse_message(self, topic, data): + try: + self._parse_payload(topic, data) + except Exception as e: + self.onMessageError(exception=e, msg=data) def startListening(self): """Start listening from an external event loop. @@ -2862,6 +2805,15 @@ class Client(object): Raises: FBchatException: If request failed """ + if not self._mqtt: + self._mqtt = Mqtt.connect( + state=self._state, + on_message=self._parse_message, + chat_on=self._markAlive, + foreground=True, + ) + # Backwards compat + self.onQprimer(ts=now(), msg=None) self.listening = True def doOneListen(self, markAlive=None): @@ -2879,36 +2831,20 @@ class Client(object): """ if markAlive is not None: self._markAlive = markAlive - try: - if self._markAlive: - self._ping() - content = self._pullMessage() - if content: - self._parseMessage(content) - except KeyboardInterrupt: - return False - except requests.Timeout: - pass - except requests.ConnectionError: - # If the client has lost their internet connection, keep trying every 30 seconds - time.sleep(30) - except FBchatFacebookError as e: - # Fix 502 and 503 pull errors - if e.request_status_code in [502, 503]: - # Bump pull channel, while contraining withing 0-4 - self._pull_channel = (self._pull_channel + 1) % 5 - self.startListening() - else: - raise e - except Exception as e: - return self.onListenError(exception=e) - return True + # TODO: Remove this wierd check, and let the user handle the chat_on parameter + if self._markAlive != self._mqtt._chat_on: + self._mqtt.set_chat_on(self._markAlive) + + # TODO: Remove on_error param + return self._mqtt.loop_once(on_error=self.onListenError) def stopListening(self): - """Clean up the variables from `Client.startListening`.""" + """Stop the listening loop.""" + if not self._mqtt: + raise ValueError("Not listening") + self._mqtt.disconnect() self.listening = False - self._sticky, self._pool = (None, None) def listen(self, markAlive=None): """Initialize and runs the listening loop continually. diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 2d81131..0a6070f 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -222,7 +222,7 @@ class Mqtt: path="/chat?sid={}".format(session_id), headers=headers ) - def loop_once(self): + def loop_once(self, on_error=None): """Run the listening loop once. Returns whether to keep listening or not. @@ -236,6 +236,12 @@ class Mqtt: if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: err = paho.mqtt.client.error_string(rc) log.warning("MQTT Error: %s", err) + if on_error: + # Temporary to support on_error param + try: + raise _exception.FBchatException("MQTT Error: {}".format(err)) + except _exception.FBchatException as e: + on_error(exception=e) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -264,11 +270,15 @@ class Mqtt: # TODO: We can't wait for this, since the loop is running with .loop_forever() # info.wait_for_publish() - # def set_client_settings(self, available_when_in_foreground: bool): - # data = {"make_user_available_when_in_foreground": available_when_in_foreground} - # payload = _util.json_minimal(data) - # info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) - # + def set_chat_on(self, value): + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + # def send_additional_contacts(self, additional_contacts): # payload = _util.json_minimal({"additional_contacts": additional_contacts}) # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) From e9804d41847a1f11ff5d92796e94a7428c28b230 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:26:31 +0100 Subject: [PATCH 15/32] Fix message parsing --- fbchat/_client.py | 84 +++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 41b9e14..2799f98 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2168,7 +2168,7 @@ class Client(object): LISTEN METHODS """ - def _parseDelta(self, m): + def _parseDelta(self, delta): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" id_thread = None @@ -2181,7 +2181,6 @@ class Client(object): type_thread = ThreadType.USER return id_thread, type_thread - delta = m["delta"] delta_type = delta.get("type") delta_class = delta.get("class") metadata = delta.get("messageMetadata") @@ -2201,7 +2200,7 @@ class Client(object): author_id=author_id, thread_id=thread_id, ts=ts, - msg=m, + msg=delta, ) # Left/removed participants @@ -2214,7 +2213,7 @@ class Client(object): author_id=author_id, thread_id=thread_id, ts=ts, - msg=m, + msg=delta, ) # Color change @@ -2229,7 +2228,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Emoji change @@ -2244,7 +2243,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Thread title change @@ -2259,14 +2258,14 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Forced fetch elif delta_class == "ForcedFetch": mid = delta.get("messageId") if mid is None: - self.onUnknownMesssageType(msg=m) + self.onUnknownMesssageType(msg=delta) else: thread_id = str(delta["threadKey"]["threadFbId"]) fetch_info = self._forcedFetch(thread_id, mid) @@ -2288,7 +2287,7 @@ class Client(object): thread_id=thread_id, thread_type=ThreadType.GROUP, ts=ts, - msg=m, + msg=delta, ) # Nickname change @@ -2305,7 +2304,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Admin added or removed in a group thread @@ -2321,7 +2320,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) elif admin_event == "remove_admin": self.onAdminRemoved( @@ -2331,7 +2330,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Group approval mode change @@ -2345,7 +2344,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Message delivered @@ -2363,7 +2362,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Message seen @@ -2379,7 +2378,7 @@ class Client(object): seen_ts=seen_ts, ts=delivered_ts, metadata=metadata, - msg=m, + msg=delta, ) # Messages marked as seen @@ -2400,7 +2399,11 @@ class Client(object): # thread_id, thread_type = getThreadIdAndThreadType(delta) self.onMarkedSeen( - threads=threads, seen_ts=seen_ts, ts=delivered_ts, metadata=delta, msg=m + threads=threads, + seen_ts=seen_ts, + ts=delivered_ts, + metadata=delta, + msg=delta, ) # Game played @@ -2425,7 +2428,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Group call started/ended @@ -2443,7 +2446,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) elif call_status == "call_ended": self.onCallEnded( @@ -2455,7 +2458,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # User joined to group call @@ -2470,7 +2473,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Group poll event @@ -2489,7 +2492,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) elif event_type == "update_vote": # User voted on group poll @@ -2505,7 +2508,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan created @@ -2519,7 +2522,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan ended @@ -2532,7 +2535,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan edited @@ -2546,7 +2549,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan deleted @@ -2560,7 +2563,7 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Plan participation change @@ -2576,13 +2579,13 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Client payload (that weird numbers) elif delta_class == "ClientPayload": payload = json.loads("".join(chr(z) for z in delta["payload"])) - ts = m.get("ofd_ts") + ts = now() # Hack for d in payload.get("deltas", []): # Message reaction @@ -2603,7 +2606,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) else: self.onReactionRemoved( @@ -2612,7 +2615,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Viewer status change @@ -2629,7 +2632,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) else: self.onBlock( @@ -2637,7 +2640,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Live location info @@ -2655,7 +2658,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) # Message deletion @@ -2671,7 +2674,7 @@ class Client(object): thread_id=thread_id, thread_type=thread_type, ts=ts, - msg=m, + msg=delta, ) elif d.get("deltaMessageReply"): @@ -2690,7 +2693,7 @@ class Client(object): thread_type=thread_type, ts=message.timestamp, metadata=metadata, - msg=m, + msg=delta, ) # New message @@ -2711,17 +2714,20 @@ class Client(object): thread_type=thread_type, ts=ts, metadata=metadata, - msg=m, + msg=delta, ) # Unknown message type else: - self.onUnknownMesssageType(msg=m) + self.onUnknownMesssageType(msg=delta) def _parse_payload(self, topic, m): # Things that directly change chat - if topic == "delta": - self._parseDelta(m) + if topic == "/t_ms": + if "deltas" not in m: + return + for delta in m["deltas"]: + self._parseDelta(delta) # TODO: Remove old parsing below From afad38d8e1824559a3fc6f5f53142b1720168394 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:40:00 +0100 Subject: [PATCH 16/32] Fix chat timestamp parsing --- fbchat/_client.py | 30 +++++++++++------------------- fbchat/_user.py | 17 +++-------------- 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 2799f98..007a6b5 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2773,26 +2773,19 @@ class Client(object): from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) - # Chat timestamp - elif topic == "chatproxy-presence": - statuses = dict() - for id_, data in m.get("buddyList", {}).items(): - statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) - self._buddylist[id_] = statuses[id_] + # Chat timestamp / Buddylist overlay + elif topic == "/orca_presence": + if m["list_type"] == "full": + self._buddylist = {} # Refresh internal list + statuses = dict() + for data in m["list"]: + user_id = str(data["u"]) + statuses[user_id] = ActiveStatus._from_orca_presence(data) + self._buddylist[user_id] = statuses[user_id] + + # TODO: Which one should we call? self.onChatTimestamp(buddylist=statuses, msg=m) - - # Buddylist overlay - elif topic == "buddylist_overlay": - statuses = dict() - for id_, data in m.get("overlay", {}).items(): - old_in_game = None - if id_ in self._buddylist: - old_in_game = self._buddylist[id_].in_game - - statuses[id_] = ActiveStatus._from_buddylist_overlay(data, old_in_game) - self._buddylist[id_] = statuses[id_] - self.onBuddylistOverlay(statuses=statuses, msg=m) # Unknown message type @@ -3807,7 +3800,6 @@ class Client(object): statuses (dict): Dictionary with user IDs as keys and :class:`ActiveStatus` as values msg: A full set of the data received """ - log.debug("Buddylist overlay received: {}".format(statuses)) def onUnknownMesssageType(self, msg=None): """Called when the client is listening, and some unknown data was received. diff --git a/fbchat/_user.py b/fbchat/_user.py index eae5fdb..e7ed86e 100644 --- a/fbchat/_user.py +++ b/fbchat/_user.py @@ -192,17 +192,6 @@ class ActiveStatus(object): in_game = attr.ib(None) @classmethod - def _from_chatproxy_presence(cls, id_, data): - return cls( - active=data["p"] in [2, 3] if "p" in data else None, - last_active=data.get("lat"), - in_game=int(id_) in data.get("gamers", {}), - ) - - @classmethod - def _from_buddylist_overlay(cls, data, in_game=None): - return cls( - active=data["a"] in [2, 3] if "a" in data else None, - last_active=data.get("la"), - in_game=None, - ) + def _from_orca_presence(cls, data): + # TODO: Handle `c` and `vc` keys (Probably some binary data) + return cls(active=data["p"] in [2, 3], last_active=data.get("l"), in_game=None) From e488f4a7dae052bbfb5df26f9f3afdb6cf0d6388 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:52:59 +0100 Subject: [PATCH 17/32] Fix typing status parsing Co-authored-by: Tulir Asokan --- fbchat/_client.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 007a6b5..7ab6941 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2741,19 +2741,13 @@ class Client(object): ) # Typing - elif topic in ["typ", "ttyp"]: - author_id = str(m.get("from")) - thread_id = m.get("thread_fbid") - if thread_id: - thread_type = ThreadType.GROUP - thread_id = str(thread_id) - else: - thread_type = ThreadType.USER - if author_id == self._uid: - thread_id = m.get("to") - else: - thread_id = author_id - typing_status = TypingStatus(m.get("st")) + elif topic in ("/thread_typing", "/orca_typing_notifications"): + author_id = str(m["sender_fbid"]) + thread_id = m.get("thread", author_id) + typing_status = TypingStatus(m.get("state")) + thread_type = ( + ThreadType.USER if thread_id == author_id else ThreadType.GROUP + ) self.onTyping( author_id=author_id, status=typing_status, From bc1e3edf1728672b2af9ffb9c312992829d13f48 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 20:29:44 +0100 Subject: [PATCH 18/32] Small fixes Handle more errors, and fix Client.stopListening --- fbchat/_client.py | 9 ++++++--- fbchat/_mqtt.py | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 7ab6941..f9fc529 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2834,10 +2834,13 @@ class Client(object): def stopListening(self): """Stop the listening loop.""" - if not self._mqtt: - raise ValueError("Not listening") - self._mqtt.disconnect() self.listening = False + if not self._mqtt: + return + self._mqtt.disconnect() + # TODO: Preserve the _mqtt object + # Currently, there's some issues when disconnecting + self._mqtt = None def listen(self, markAlive=None): """Initialize and runs the listening loop continually. diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 0a6070f..8504a3e 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -91,6 +91,12 @@ class Mqtt: if "lastIssuedSeqId" in j: self._sequence_id = j["lastIssuedSeqId"] + if "errorCode" in j: + # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND + # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + log.error("MQTT error code %s received", j["errorCode"]) + # TODO: Consider resetting the sync_token and sequence ID here? + log.debug("MQTT payload: %s", j) # Call the external callback From 8c367af0ff70f7baa8b44079ca080ce75ccdf482 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 20:47:31 +0100 Subject: [PATCH 19/32] Fix Python 2.7 errors --- fbchat/_mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 8504a3e..c7ce350 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -11,7 +11,7 @@ def generate_session_id(): @attr.s(slots=True) -class Mqtt: +class Mqtt(object): _state = attr.ib() _mqtt = attr.ib() _on_message = attr.ib() @@ -62,7 +62,7 @@ class Mqtt: OSError, paho.mqtt.client.WebsocketConnectionError, ) as e: - raise _exception.FBchatException("MQTT connection failed") from e + raise _exception.FBchatException("MQTT connection failed") # Raise error if connecting failed if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: From 3bb99541e7e4adb122abdad40d1442161004f435 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 23:11:10 +0100 Subject: [PATCH 20/32] Improve MQTT connection error reporting --- fbchat/_mqtt.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index c7ce350..b5a6628 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -76,7 +76,7 @@ class Mqtt(object): try: j = _util.parse_json(message.payload) except _exception.FBchatFacebookError: - log.exception("Failed parsing MQTT data as JSON: %r", message.payload) + log.exception("Failed parsing MQTT data on %s as JSON", message.topic) return if message.topic == "/t_ms": @@ -241,13 +241,15 @@ class Mqtt(object): if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: err = paho.mqtt.client.error_string(rc) - log.warning("MQTT Error: %s", err) - if on_error: - # Temporary to support on_error param - try: - raise _exception.FBchatException("MQTT Error: {}".format(err)) - except _exception.FBchatException as e: - on_error(exception=e) + + # If known/expected error + if rc in [paho.mqtt.client.MQTT_ERR_CONN_LOST]: + log.warning(err) + else: + log.warning("MQTT Error: %s", err) + # For backwards compatibility + if on_error: + on_error(exception=FBchatException("MQTT Error {}".format(err))) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -262,7 +264,7 @@ class Mqtt(object): OSError, paho.mqtt.client.WebsocketConnectionError, ): - log.debug("MQTT connection failed") + log.debug("MQTT reconnection failed") return True # Keep listening From cf4c22898c745142153b8e7739b065c490d3bab5 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 00:00:19 +0100 Subject: [PATCH 21/32] Add undocumented _onSeen callback Mostly just to slowly document unknown events --- fbchat/_client.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index f9fc529..9c9d85f 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2231,6 +2231,12 @@ class Client(object): msg=delta, ) + elif delta_class == "MarkFolderSeen": + locations = [ + ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"] + ] + self._onSeen(locations=locations, ts=delta["timestamp"], msg=delta) + # Emoji change elif delta_type == "change_thread_icon": new_emoji = delta["untypedData"]["thread_icon"] @@ -2741,6 +2747,8 @@ class Client(object): ) # Typing + # /thread_typing {'sender_fbid': X, 'state': 1, 'type': 'typ', 'thread': 'Y'} + # /orca_typing_notifications {'type': 'typ', 'sender_fbid': X, 'state': 0} elif topic in ("/thread_typing", "/orca_typing_notifications"): author_id = str(m["sender_fbid"]) thread_id = m.get("thread", author_id) @@ -2756,13 +2764,6 @@ class Client(object): msg=m, ) - # Delivered - - # Seen - # elif mtype == "m_read_receipt": - # - # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - elif topic == "jewel_requests_add": from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) @@ -3299,6 +3300,17 @@ class Client(object): """ log.info("Friend request from {}".format(from_id)) + def _onSeen(self, locations=None, ts=None, msg=None): + """ + Todo: + Document this, and make it public + + Args: + locations: --- + ts: A timestamp of the action + msg: A full set of the data received + """ + def onInbox(self, unseen=None, unread=None, recent_unread=None, msg=None): """ Todo: From e57265016eacaa3c592d57882040b9ba566a5134 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 10:27:40 +0100 Subject: [PATCH 22/32] Skip NoOp events --- fbchat/_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fbchat/_client.py b/fbchat/_client.py index 9c9d85f..3bac889 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2437,6 +2437,10 @@ class Client(object): msg=delta, ) + # Skip "no operation" events + elif delta_class == "NoOp": + pass + # Group call started/ended elif delta_type == "rtc_call_log": thread_id, thread_type = getThreadIdAndThreadType(metadata) From 67fd6ffdf653a5adf04816b9f3fac95b81bf9e1d Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 10:29:30 +0100 Subject: [PATCH 23/32] Better document MQTT topics --- fbchat/_mqtt.py | 65 +++++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index b5a6628..3769ac8 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -97,7 +97,7 @@ class Mqtt(object): log.error("MQTT error code %s received", j["errorCode"]) # TODO: Consider resetting the sync_token and sequence ID here? - log.debug("MQTT payload: %s", j) + log.debug("MQTT payload: %s, %s", message.topic, j) # Call the external callback self._on_message(message.topic, j) @@ -149,6 +149,40 @@ class Mqtt(object): # Generate a new session ID on each reconnect session_id = generate_session_id() + topics = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/t_rtc", + "/webrtc_response", + ] + username = { # The user ID "u": self._state.user_id, @@ -163,31 +197,7 @@ class Mqtt(object): # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing - "st": [ - # TODO: Investigate the result from these - # "/inbox", - # "/mercury", - # "/messaging_events", - # "/orca_message_notifications", - # "/pp", - # "/t_p", - # "/t_rtc", - # "/webrtc_response", - "/legacy_web", - "/webrtc", - "/onevc", - # Things that happen in chats (e.g. messages) - "/t_ms", - # Group typing notifications - "/thread_typing", - # Private chat typing notifications - "/orca_typing_notifications", - "/notify_disconnect", - # Active notifications - "/orca_presence", - "/br_sr", - "/sr_res", - ], + "st": topics, # MQTT extension by FB, allows making a PUBLISH while CONNECTing # Using this is more efficient, but the same can be acheived with: # def on_connect(*args): @@ -290,3 +300,6 @@ class Mqtt(object): # def send_additional_contacts(self, additional_contacts): # payload = _util.json_minimal({"additional_contacts": additional_contacts}) # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) + # + # def browser_close(self): + # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1) From b199d597b27a18ce3f9639a2699ef3ac95338dc6 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 10:57:19 +0100 Subject: [PATCH 24/32] =?UTF-8?q?Bump=20version:=201.8.3=20=E2=86=92=201.9?= =?UTF-8?q?.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index ebf7240..253cb1a 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.8.3 +current_version = 1.9.0 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 8422e42..0dcf64b 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.8.3" +__version__ = "1.9.0" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From b4d3769fd5d08b5875f1afde66d04a03339753b8 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 13:14:07 +0100 Subject: [PATCH 25/32] Fix MQTT error handling - Fix "Out of memory" errors - Fix typo --- fbchat/_client.py | 2 +- fbchat/_mqtt.py | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 3bac889..582f89a 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2835,7 +2835,7 @@ class Client(object): self._mqtt.set_chat_on(self._markAlive) # TODO: Remove on_error param - return self._mqtt.loop_once(on_error=self.onListenError) + return self._mqtt.loop_once(on_error=lambda e: self.onListenError(exception=e)) def stopListening(self): """Stop the listening loop.""" diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 3769ac8..c35433c 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -31,9 +31,9 @@ class Mqtt(object): transport="websockets", ) mqtt.enable_logger() - # mqtt.max_inflight_messages_set(20) - # mqtt.max_queued_messages_set(0) # unlimited - # mqtt.message_retry_set(5) + # mqtt.max_inflight_messages_set(20) # The rest will get queued + # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued + # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) # TODO: Is region (lla | atn | odn | others?) important? mqtt.tls_set() @@ -250,16 +250,19 @@ class Mqtt(object): return False # Stop listening if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - err = paho.mqtt.client.error_string(rc) - # If known/expected error - if rc in [paho.mqtt.client.MQTT_ERR_CONN_LOST]: - log.warning(err) + if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: + log.warning("Connection lost, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: + # This error is wrongly classified + # See https://github.com/eclipse/paho.mqtt.python/issues/340 + log.warning("Connection error, retrying") else: - log.warning("MQTT Error: %s", err) + err = paho.mqtt.client.error_string(rc) + log.error("MQTT Error: %s", err) # For backwards compatibility if on_error: - on_error(exception=FBchatException("MQTT Error {}".format(err))) + on_error(_exception.FBchatException("MQTT Error {}".format(err))) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -273,8 +276,8 @@ class Mqtt(object): paho.mqtt.client.socket.error, OSError, paho.mqtt.client.WebsocketConnectionError, - ): - log.debug("MQTT reconnection failed") + ) as e: + log.debug("MQTT reconnection failed: %s", e) return True # Keep listening From c5f447e20b85c18d3bfef550a2e051ae025ebfb0 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Mon, 6 Jan 2020 13:23:39 +0100 Subject: [PATCH 26/32] =?UTF-8?q?Bump=20version:=201.9.0=20=E2=86=92=201.9?= =?UTF-8?q?.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 253cb1a..2c321f9 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.9.0 +current_version = 1.9.1 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 0dcf64b..ec592dc 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.9.0" +__version__ = "1.9.1" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From fb63ff0db86f6b3118b5f1b65f3f9f462afb5c10 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Wed, 8 Jan 2020 08:46:22 +0100 Subject: [PATCH 27/32] Fix cookie header extraction Only worked when the cookies were loaded from file, hence the reason I didn't spot it the first time. Thanks to @gave92 for the suggestion. Fixes #495 --- fbchat/_mqtt.py | 11 ++++++++++- fbchat/_util.py | 5 +++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index c35433c..cc9ad06 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -122,6 +122,13 @@ class Mqtt(object): raise def _on_connect_handler(self, client, userdata, flags, rc): + if rc == 21: + raise _exception.FBchatException( + "Failed connecting. Maybe your cookies are wrong?" + ) + if rc != 0: + return # Don't try to send publish if the connection failed + # configure receiving messages. payload = { "sync_api_version": 10, @@ -228,7 +235,9 @@ class Mqtt(object): headers = { # TODO: Make this access thread safe - "Cookie": _util.get_cookie_header(self._state._session, self._HOST), + "Cookie": _util.get_cookie_header( + self._state._session, "https://edge-chat.facebook.com/chat" + ), "User-Agent": self._state._session.headers["User-Agent"], "Origin": "https://www.facebook.com", "Host": self._HOST, diff --git a/fbchat/_util.py b/fbchat/_util.py index 6228989..920e374 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -70,10 +70,11 @@ def strip_json_cruft(text): raise FBchatException("No JSON object found: {!r}".format(text)) -def get_cookie_header(session, host): +def get_cookie_header(session, url): """Extract a cookie header from a requests session.""" + # The cookies are extracted this way to make sure they're escaped correctly return requests.cookies.get_cookie_header( - session.cookies, requests.Request("GET", host), + session.cookies, requests.Request("GET", url), ) From cb7f4a72d7a3bef5121415ddb791a2abf48b08fc Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Wed, 8 Jan 2020 08:47:16 +0100 Subject: [PATCH 28/32] =?UTF-8?q?Bump=20version:=201.9.1=20=E2=86=92=201.9?= =?UTF-8?q?.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 2c321f9..5f1ea8a 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.9.1 +current_version = 1.9.2 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index ec592dc..4318ffb 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.9.1" +__version__ = "1.9.2" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From 4714be5697b665c531d230b9190c533c57938e9e Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Wed, 8 Jan 2020 09:35:26 +0100 Subject: [PATCH 29/32] Fix MQTT JSON decoding --- fbchat/_mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index cc9ad06..a9f6e31 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -74,8 +74,8 @@ class Mqtt(object): def _on_message_handler(self, client, userdata, message): # Parse payload JSON try: - j = _util.parse_json(message.payload) - except _exception.FBchatFacebookError: + j = _util.parse_json(message.payload.decode("utf-8")) + except (_exception.FBchatFacebookError, UnicodeDecodeError): log.exception("Failed parsing MQTT data on %s as JSON", message.topic) return From 881aa9adcecf7554cfbeaa6fd471cdaa0405eeb3 Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Wed, 8 Jan 2020 09:38:18 +0100 Subject: [PATCH 30/32] =?UTF-8?q?Bump=20version:=201.9.2=20=E2=86=92=201.9?= =?UTF-8?q?.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 5f1ea8a..bda4cee 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.9.2 +current_version = 1.9.3 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 4318ffb..2fb012f 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.9.2" +__version__ = "1.9.3" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" From 45303005b8b635fd27374fa0ba3600d7a1b7c3bb Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Tue, 14 Jan 2020 23:27:50 +0100 Subject: [PATCH 31/32] Fix onFriendRequest --- fbchat/_client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index 582f89a..e491d19 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -2768,9 +2768,15 @@ class Client(object): msg=m, ) - elif topic == "jewel_requests_add": - from_id = m["from"] - self.onFriendRequest(from_id=from_id, msg=m) + # Other notifications + elif topic == "/legacy_web": + # Friend request + if m["type"] == "jewel_requests_add": + from_id = m["from"] + # TODO: from_id = str(from_id) + self.onFriendRequest(from_id=from_id, msg=m) + else: + self.onUnknownMesssageType(msg=m) # Chat timestamp / Buddylist overlay elif topic == "/orca_presence": From 9c81806b957c2a1575ee449fa18623f66f6753fc Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Tue, 14 Jan 2020 23:29:58 +0100 Subject: [PATCH 32/32] =?UTF-8?q?Bump=20version:=201.9.3=20=E2=86=92=201.9?= =?UTF-8?q?.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- fbchat/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index bda4cee..9938290 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.9.3 +current_version = 1.9.4 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 2fb012f..eb8153f 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -13,7 +13,7 @@ from ._client import Client from ._util import log # TODO: Remove this (from examples too) __title__ = "fbchat" -__version__ = "1.9.3" +__version__ = "1.9.4" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim"