diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6d42ea3..9938290 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.8.1 +current_version = 1.9.4 commit = True tag = True diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 55405c4..13dd54e 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -45,7 +45,7 @@ from ._plan import GuestStatus, Plan, PlanData from ._client import Client __title__ = "fbchat" -__version__ = "1.8.1" +__version__ = "1.9.4" __description__ = "Facebook Chat (Messenger) for Python" __copyright__ = "Copyright 2015 - 2019 by Taehoon Kim" diff --git a/fbchat/_client.py b/fbchat/_client.py index dfa14b8..a69170a 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -7,6 +7,7 @@ from . import ( _exception, _util, _graphql, + _mqtt, _session, _poll, _user, @@ -51,12 +52,10 @@ class Client: Args: session: The session to use when making requests. """ - self._sticky, self._pool = (None, None) - self._seq = "0" - self._pull_channel = 0 self._mark_alive = True self._buddylist = dict() self._session = session + self._mqtt = None @property def session(self): @@ -608,41 +607,6 @@ class Client: LISTEN METHODS """ - def _ping(self): - data = { - "seq": self._seq, - "channel": "p_" + self.session.user_id, - "clientid": self.session._client_id, - "partition": -2, - "cap": 0, - "uid": self.session.user_id, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "viewer_uid": self.session.user_id, - "state": "active", - } - j = self.session._get( - "https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel), - data, - ) - _exception.handle_payload_error(j) - - def _pull_message(self): - """Call pull api to fetch message data.""" - data = { - "seq": self._seq, - "msgs_recv": 0, - "sticky_token": self._sticky, - "sticky_pool": self._pool, - "clientid": self.session._client_id, - "state": "active" if self._mark_alive else "offline", - } - j = self.session._get( - "https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data - ) - _exception.handle_payload_error(j) - return j - def _parse_delta(self, delta): def get_thread(data): if "threadFbId" in data["threadKey"]: @@ -696,6 +660,12 @@ class Client: metadata=metadata, ) + elif delta_class == "MarkFolderSeen": + locations = [ + ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"] + ] + self._on_seen(locations=locations, at=at) + # Emoji change elif delta_type == "change_thread_icon": new_emoji = delta["untypedData"]["thread_icon"] @@ -861,6 +831,10 @@ class Client: metadata=metadata, ) + # Skip "no operation" events + elif delta_class == "NoOp": + pass + # Group call started/ended elif delta_type == "rtc_call_log": call_status = delta["untypedData"]["event"] @@ -989,7 +963,8 @@ class Client: # Client payload (that weird numbers) elif delta_class == "ClientPayload": payload = _util.parse_json("".join(chr(z) for z in delta["payload"])) - at = _util.millis_to_datetime(m.get("ofd_ts")) + # Hack + at = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) for d in payload.get("deltas", []): # Message reaction @@ -1096,132 +1071,106 @@ class Client: else: self.on_unknown_messsage_type(msg=delta) - def _parse_message(self, content): - """Get message and author name from content. + def _parse_payload(self, topic, m): + # Things that directly change chat + if topic == "/t_ms": + if "deltas" not in m: + return + for delta in m["deltas"]: + self._parse_delta(delta) - May contain multiple messages in the content. + # TODO: Remove old parsing below + + # Inbox + elif topic == "inbox": + self.on_inbox( + unseen=m["unseen"], + unread=m["unread"], + recent_unread=m["recent_unread"], + ) + + # Typing + # /thread_typing {'sender_fbid': X, 'state': 1, 'type': 'typ', 'thread': 'Y'} + # /orca_typing_notifications {'type': 'typ', 'sender_fbid': X, 'state': 0} + elif topic in ("/thread_typing", "/orca_typing_notifications"): + author_id = str(m["sender_fbid"]) + thread_id = m.get("thread") + if thread_id: + thread = _group.Group(session=self.session, id=str(thread_id)) + else: + thread = _user.User(session=self.session, id=author_id) + typing_status = TypingStatus(m.get("state")) + self.on_typing( + author_id=author_id, + status=typing_status, + thread=thread, + ) + + # Other notifications + elif topic == "/legacy_web": + # Friend request + if m["type"] == "jewel_requests_add": + self.on_friend_request(from_id=str(m["from"])) + else: + self.on_unknown_messsage_type(msg=m) + + # Chat timestamp / Buddylist overlay + elif topic == "/orca_presence": + if m["list_type"] == "full": + self._buddylist = {} # Refresh internal list + + statuses = dict() + for data in m["list"]: + user_id = str(data["u"]) + statuses[user_id] = ActiveStatus._from_orca_presence(data) + self._buddylist[user_id] = statuses[user_id] + + # TODO: Which one should we call? + self.on_chat_timestamp(buddylist=statuses) + self.on_buddylist_overlay(statuses=statuses) + + # Unknown message type + else: + self.on_unknown_messsage_type(msg=m) + + def _parse_message(self, topic, data): + try: + self._parse_payload(topic, data) + except Exception as e: + self.on_message_error(exception=e, msg=data) + + def startListening(self): + """Start listening from an external event loop. + + Raises: + FBchatException: If request failed """ - self._seq = content.get("seq", "0") - - if "lb_info" in content: - self._sticky = content["lb_info"]["sticky"] - self._pool = content["lb_info"]["pool"] - - if "batches" in content: - for batch in content["batches"]: - self._parse_message(batch) - - if "ms" not in content: - return - - for m in content["ms"]: - mtype = m.get("type") - try: - # Things that directly change chat - if mtype == "delta": - self._parse_delta(m) - # Inbox - elif mtype == "inbox": - self.on_inbox( - unseen=m["unseen"], - unread=m["unread"], - recent_unread=m["recent_unread"], - ) - - # Typing - elif mtype == "typ" or mtype == "ttyp": - author_id = str(m.get("from")) - thread_id = m.get("thread_fbid") - if thread_id: - thread = Group(session=self.session, id=str(thread_id)) - else: - if author_id == self.session.user_id: - thread_id = m.get("to") - else: - thread_id = author_id - thread = User(session=self.session, id=thread_id) - self.on_typing( - author_id=author_id, status=m["st"] == 1, thread=thread - ) - - # Delivered - - # Seen - # elif mtype == "m_read_receipt": - # - # self.on_seen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time')) - - elif mtype in ["jewel_requests_add"]: - from_id = m["from"] - self.on_friend_request(from_id=from_id) - - # Happens on every login - elif mtype == "qprimer": - self.on_qprimer(at=_util.millis_to_datetime(int(m.get("made")))) - - # Is sent before any other message - elif mtype == "deltaflow": - pass - - # Chat timestamp - elif mtype == "chatproxy-presence": - statuses = dict() - for id_, data in m.get("buddyList", {}).items(): - statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data) - self._buddylist[id_] = statuses[id_] - - self.on_chat_timestamp(buddylist=statuses) - - # Buddylist overlay - elif mtype == "buddylist_overlay": - statuses = dict() - for id_, data in m.get("overlay", {}).items(): - old_in_game = None - if id_ in self._buddylist: - old_in_game = self._buddylist[id_].in_game - - statuses[id_] = ActiveStatus._from_buddylist_overlay( - data, old_in_game - ) - self._buddylist[id_] = statuses[id_] - - self.on_buddylist_overlay(statuses=statuses) - - # Unknown message type - else: - self.on_unknown_messsage_type(msg=m) - - except Exception as e: - self.on_message_error(exception=e, msg=m) + if not self._mqtt: + self._mqtt = _mqtt.Mqtt.connect( + state=self.session, + on_message=self._parse_message, + chat_on=self._mark_alive, + foreground=True, + ) + # Backwards compat + self.on_qprimer(ts=now(), msg=None) def _do_one_listen(self): - try: - if self._mark_alive: - self._ping() - content = self._pull_message() - if content: - self._parse_message(content) - except KeyboardInterrupt: - return False - except _exception.HTTPError as e: - cause = e.__cause__ + # TODO: Remove this wierd check, and let the user handle the chat_on parameter + if self._mark_alive != self._mqtt._chat_on: + self._mqtt.set_chat_on(self._mark_alive) - # Fix 502 and 503 pull errors - if e.status_code in [502, 503]: - # Bump pull channel, while contraining withing 0-4 - self._pull_channel = (self._pull_channel + 1) % 5 - # TODO: Handle these exceptions better - elif isinstance(cause, requests.ReadTimeout): - pass # Expected - elif isinstance(cause, (requests.ConnectTimeout, requests.ConnectionError)): - # If the client has lost their internet connection, keep trying every 30 seconds - time.sleep(30) - else: - raise e - except Exception as e: - return self.on_listen_error(exception=e) + # TODO: Remove on_error param + return self._mqtt.loop_once(on_error=lambda e: self.on_listen_error(exception=e)) - return True + def stopListening(self): + """Stop the listening loop.""" + if not self._mqtt: + return + self._mqtt.disconnect() + # TODO: Preserve the _mqtt object + # Currently, there's some issues when disconnecting + self._mqtt = None def listen(self, markAlive=None): """Initialize and runs the listening loop continually. @@ -1536,6 +1485,16 @@ class Client: """ log.info("Friend request from {}".format(from_id)) + def _on_seen(self, locations=None, at=None): + """ + Todo: + Document this, and make it public + + Args: + locations: --- + at: A timestamp of the action + """ + def on_inbox(self, unseen=None, unread=None, recent_unread=None): """ Todo: @@ -1873,7 +1832,6 @@ class Client: Args: statuses (dict): Dictionary with user IDs as keys and `ActiveStatus` as values """ - log.debug("Buddylist overlay received: {}".format(statuses)) def on_unknown_messsage_type(self, msg=None): """Called when the client is listening, and some unknown data was received. diff --git a/fbchat/_message.py b/fbchat/_message.py index 32e6e3c..e7738f1 100644 --- a/fbchat/_message.py +++ b/fbchat/_message.py @@ -33,7 +33,7 @@ class EmojiSize(enum.Enum): "s": cls.SMALL, } for tag in tags or (): - data = tag.split(":", maxsplit=1) + data = tag.split(":", 1) if len(data) > 1 and data[0] == "hot_emoji_size": return string_to_emojisize.get(data[1]) return None diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py new file mode 100644 index 0000000..a9f6e31 --- /dev/null +++ b/fbchat/_mqtt.py @@ -0,0 +1,317 @@ +import attr +import random +import paho.mqtt.client +from ._core import log +from . import _util, _exception, _graphql + + +def generate_session_id(): + """Generate a random session ID between 1 and 9007199254740991.""" + return random.randint(1, 2 ** 53) + + +@attr.s(slots=True) +class Mqtt(object): + _state = attr.ib() + _mqtt = attr.ib() + _on_message = attr.ib() + _chat_on = attr.ib() + _foreground = attr.ib() + _sequence_id = attr.ib() + _sync_token = attr.ib(None) + + _HOST = "edge-chat.facebook.com" + + @classmethod + def connect(cls, state, on_message, chat_on, foreground): + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + # mqtt.max_inflight_messages_set(20) # The rest will get queued + # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued + # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.tls_set() + + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, + sequence_id=cls._fetch_sequence_id(state), + ) + + # Configure callbacks + mqtt.on_message = self._on_message_handler + mqtt.on_connect = self._on_connect_handler + + self._configure_connect_options() + + # Attempt to connect + try: + rc = mqtt.connect(self._HOST, 443, keepalive=10) + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + raise _exception.FBchatException("MQTT connection failed") + + # Raise error if connecting failed + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + raise _exception.FBchatException("MQTT connection failed: {}".format(err)) + + return self + + def _on_message_handler(self, client, userdata, message): + # Parse payload JSON + try: + j = _util.parse_json(message.payload.decode("utf-8")) + except (_exception.FBchatFacebookError, UnicodeDecodeError): + log.exception("Failed parsing MQTT data on %s as JSON", message.topic) + return + + if message.topic == "/t_ms": + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + self._sequence_id = j["firstDeltaSeqId"] + + # Update last sequence id when received + if "lastIssuedSeqId" in j: + self._sequence_id = j["lastIssuedSeqId"] + + if "errorCode" in j: + # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND + # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + log.error("MQTT error code %s received", j["errorCode"]) + # TODO: Consider resetting the sync_token and sequence ID here? + + log.debug("MQTT payload: %s, %s", message.topic, j) + + # Call the external callback + self._on_message(message.topic, j) + + @staticmethod + def _fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + def _on_connect_handler(self, client, userdata, flags, rc): + if rc == 21: + raise _exception.FBchatException( + "Failed connecting. Maybe your cookies are wrong?" + ) + if rc != 0: + return # Don't try to send publish if the connection failed + + # configure receiving messages. + payload = { + "sync_api_version": 10, + "max_deltas_able_to_process": 1000, + "delta_batch_size": 500, + "encoding": "JSON", + "entity_fbid": self._state.user_id, + } + + # If we don't have a sync_token, create a new messenger queue + # This is done so that across reconnects, if we've received a sync token, we + # SHOULD receive a piece of data in /t_ms exactly once! + if self._sync_token is None: + topic = "/messenger_sync_create_queue" + payload["initial_titan_sequence_id"] = str(self._sequence_id) + payload["device_params"] = None + else: + topic = "/messenger_sync_get_diffs" + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token + + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) + + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() + + topics = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/t_rtc", + "/webrtc_response", + ] + + username = { + # The user ID + "u": self._state.user_id, + # Session ID + "s": session_id, + # Active status setting + "chat_on": self._chat_on, + # foreground_state - Whether the window is focused + "fg": self._foreground, + # Can be any random ID + "d": self._state._client_id, + # Application ID, taken from facebook.com + "aid": 219994525426954, + # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing + "st": topics, + # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! + "pm": [ + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } + ], + # Unknown parameters + "cp": 3, + "ecp": 10, + "ct": "websocket", + "mqtt_sid": "", + "dc": "", + "no_auto_fg": True, + "gas": None, + "pack": [], + } + + # TODO: Make this thread safe + self._mqtt.username_pw_set(_util.json_minimal(username)) + + headers = { + # TODO: Make this access thread safe + "Cookie": _util.get_cookie_header( + self._state._session, "https://edge-chat.facebook.com/chat" + ), + "User-Agent": self._state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": self._HOST, + } + + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + + def loop_once(self, on_error=None): + """Run the listening loop once. + + Returns whether to keep listening or not. + """ + rc = self._mqtt.loop(timeout=1.0) + + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + return False # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + # If known/expected error + if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: + log.warning("Connection lost, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: + # This error is wrongly classified + # See https://github.com/eclipse/paho.mqtt.python/issues/340 + log.warning("Connection error, retrying") + else: + err = paho.mqtt.client.error_string(rc) + log.error("MQTT Error: %s", err) + # For backwards compatibility + if on_error: + on_error(_exception.FBchatException("MQTT Error {}".format(err))) + + # Wait before reconnecting + self._mqtt._reconnect_wait() + + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + log.debug("MQTT reconnection failed: %s", e) + + return True # Keep listening + + def disconnect(self): + self._mqtt.disconnect() + + def set_foreground(self, value): + payload = _util.json_minimal({"foreground": value}) + info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + self._foreground = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + + def set_chat_on(self, value): + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + + # def send_additional_contacts(self, additional_contacts): + # payload = _util.json_minimal({"additional_contacts": additional_contacts}) + # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) + # + # def browser_close(self): + # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1) diff --git a/fbchat/_user.py b/fbchat/_user.py index de8d720..fae76e3 100644 --- a/fbchat/_user.py +++ b/fbchat/_user.py @@ -204,17 +204,6 @@ class ActiveStatus: in_game = attr.ib(None) @classmethod - def _from_chatproxy_presence(cls, id_, data): - return cls( - active=data["p"] in [2, 3] if "p" in data else None, - last_active=_util.millis_to_datetime(data.get("lat")), - in_game=int(id_) in data.get("gamers", {}), - ) - - @classmethod - def _from_buddylist_overlay(cls, data, in_game=None): - return cls( - active=data["a"] in [2, 3] if "a" in data else None, - last_active=_util.millis_to_datetime(data.get("la")), - in_game=None, - ) + def _from_orca_presence(cls, data): + # TODO: Handle `c` and `vc` keys (Probably some binary data) + return cls(active=data["p"] in [2, 3], last_active=_util.millis_to_datetime(data["l"]), in_game=None) diff --git a/fbchat/_util.py b/fbchat/_util.py index 3ff0520..c66bc7e 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -55,6 +55,14 @@ def strip_json_cruft(text): raise _exception.ParseError("No JSON object found", data=text) from e +def get_cookie_header(session, url): + """Extract a cookie header from a requests session.""" + # The cookies are extracted this way to make sure they're escaped correctly + return requests.cookies.get_cookie_header( + session.cookies, requests.Request("GET", url), + ) + + def get_decoded_r(r): return get_decoded(r._content) diff --git a/pyproject.toml b/pyproject.toml index 50c5ea0..5510324 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ requires = [ "attrs>=19.1", "requests~=2.19", "beautifulsoup4~=4.0", + "paho-mqtt~=1.5", ] description-file = "README.rst" classifiers = [