From a1b80a7abb131eab4efc9f1e0e4080943779b30e Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Sun, 5 Jan 2020 19:42:07 +0100 Subject: [PATCH] Replace pull channel with MQTT setup --- fbchat/_client.py | 136 ++++++++++++---------------------------------- fbchat/_mqtt.py | 22 ++++++-- 2 files changed, 52 insertions(+), 106 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index b5ab257..41b9e14 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -12,6 +12,7 @@ from ._util import * from .models import * from . import _graphql from ._state import State +from ._mqtt import Mqtt import time import json @@ -85,13 +86,11 @@ class Client(object): Raises: FBchatException: On failed login """ - self._sticky, self._pool = (None, None) - self._seq = "0" self._default_thread_id = None self._default_thread_type = None - self._pull_channel = 0 self._markAlive = True self._buddylist = dict() + self._mqtt = None handler.setLevel(logging_level) @@ -2169,38 +2168,6 @@ class Client(object): LISTEN METHODS """ - def _ping(self): - data = { - "seq": self._seq, - "channel": "p_" + self._uid, - "clientid": self._state._client_id, - "partition": -2, - "cap": 0, - "uid": self._uid, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "viewer_uid": self._uid, - "state": "active", - } - j = self._get( - "https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel), - data, - ) - - def _pullMessage(self): - """Call pull api to fetch message data.""" - data = { - "seq": self._seq, - "msgs_recv": 0, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "clientid": self._state._client_id, - "state": "active" if self._markAlive else "offline", - } - return self._get( - "https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data - ) - def _parseDelta(self, m): def getThreadIdAndThreadType(msg_metadata): """Return a tuple consisting of thread ID and thread type.""" @@ -2751,13 +2718,15 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parse_payload(self, m): - mtype = m.get("type") + def _parse_payload(self, topic, m): # Things that directly change chat - if mtype == "delta": + if topic == "delta": self._parseDelta(m) + + # TODO: Remove old parsing below + # Inbox - elif mtype == "inbox": + elif topic == "inbox": self.onInbox( unseen=m["unseen"], unread=m["unread"], @@ -2766,7 +2735,7 @@ class Client(object): ) # Typing - elif mtype == "typ" or mtype == "ttyp": + elif topic in ["typ", "ttyp"]: author_id = str(m.get("from")) thread_id = m.get("thread_fbid") if thread_id: @@ -2794,20 +2763,12 @@ class Client(object): # # self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - elif mtype in ["jewel_requests_add"]: + elif topic == "jewel_requests_add": from_id = m["from"] self.onFriendRequest(from_id=from_id, msg=m) - # Happens on every login - elif mtype == "qprimer": - self.onQprimer(ts=m.get("made"), msg=m) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - # Chat timestamp - elif mtype == "chatproxy-presence": + elif topic == "chatproxy-presence": statuses = dict() for id_, data in m.get("buddyList", {}).items(): statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) @@ -2816,7 +2777,7 @@ class Client(object): self.onChatTimestamp(buddylist=statuses, msg=m) # Buddylist overlay - elif mtype == "buddylist_overlay": + elif topic == "buddylist_overlay": statuses = dict() for id_, data in m.get("overlay", {}).items(): old_in_game = None @@ -2832,29 +2793,11 @@ class Client(object): else: self.onUnknownMesssageType(msg=m) - def _parseMessage(self, content): - """Get message and author name from content. - - May contain multiple messages in the content. - """ - self._seq = content.get("seq", "0") - - if "lb_info" in content: - self._sticky = content["lb_info"]["sticky"] - self._pool = content["lb_info"]["pool"] - - if "batches" in content: - for batch in content["batches"]: - self._parseMessage(batch) - - if "ms" not in content: - return - - for m in content["ms"]: - try: - self._parse_payload(m) - except Exception as e: - self.onMessageError(exception=e, msg=m) + def _parse_message(self, topic, data): + try: + self._parse_payload(topic, data) + except Exception as e: + self.onMessageError(exception=e, msg=data) def startListening(self): """Start listening from an external event loop. @@ -2862,6 +2805,15 @@ class Client(object): Raises: FBchatException: If request failed """ + if not self._mqtt: + self._mqtt = Mqtt.connect( + state=self._state, + on_message=self._parse_message, + chat_on=self._markAlive, + foreground=True, + ) + # Backwards compat + self.onQprimer(ts=now(), msg=None) self.listening = True def doOneListen(self, markAlive=None): @@ -2879,36 +2831,20 @@ class Client(object): """ if markAlive is not None: self._markAlive = markAlive - try: - if self._markAlive: - self._ping() - content = self._pullMessage() - if content: - self._parseMessage(content) - except KeyboardInterrupt: - return False - except requests.Timeout: - pass - except requests.ConnectionError: - # If the client has lost their internet connection, keep trying every 30 seconds - time.sleep(30) - except FBchatFacebookError as e: - # Fix 502 and 503 pull errors - if e.request_status_code in [502, 503]: - # Bump pull channel, while contraining withing 0-4 - self._pull_channel = (self._pull_channel + 1) % 5 - self.startListening() - else: - raise e - except Exception as e: - return self.onListenError(exception=e) - return True + # TODO: Remove this wierd check, and let the user handle the chat_on parameter + if self._markAlive != self._mqtt._chat_on: + self._mqtt.set_chat_on(self._markAlive) + + # TODO: Remove on_error param + return self._mqtt.loop_once(on_error=self.onListenError) def stopListening(self): - """Clean up the variables from `Client.startListening`.""" + """Stop the listening loop.""" + if not self._mqtt: + raise ValueError("Not listening") + self._mqtt.disconnect() self.listening = False - self._sticky, self._pool = (None, None) def listen(self, markAlive=None): """Initialize and runs the listening loop continually. diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 2d81131..0a6070f 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -222,7 +222,7 @@ class Mqtt: path="/chat?sid={}".format(session_id), headers=headers ) - def loop_once(self): + def loop_once(self, on_error=None): """Run the listening loop once. Returns whether to keep listening or not. @@ -236,6 +236,12 @@ class Mqtt: if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: err = paho.mqtt.client.error_string(rc) log.warning("MQTT Error: %s", err) + if on_error: + # Temporary to support on_error param + try: + raise _exception.FBchatException("MQTT Error: {}".format(err)) + except _exception.FBchatException as e: + on_error(exception=e) # Wait before reconnecting self._mqtt._reconnect_wait() @@ -264,11 +270,15 @@ class Mqtt: # TODO: We can't wait for this, since the loop is running with .loop_forever() # info.wait_for_publish() - # def set_client_settings(self, available_when_in_foreground: bool): - # data = {"make_user_available_when_in_foreground": available_when_in_foreground} - # payload = _util.json_minimal(data) - # info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) - # + def set_chat_on(self, value): + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + # def send_additional_contacts(self, additional_contacts): # payload = _util.json_minimal({"additional_contacts": additional_contacts}) # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)