diff --git a/docs/intro.rst b/docs/intro.rst index 39a17fd..2749607 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -134,17 +134,18 @@ Listening & Events Now, we are finally at the point we have all been waiting for: Creating an automatic Facebook bot! -To get started, you create your methods that will handle your events:: +To get started, you create a listener object:: - def on_message(event): + listener = fbchat.Listener(session=session, chat_on=False, foreground=False) + +The you use that to register methods that will handle your events:: + + @listener.register + def on_message(event: fbchat.MessageEvent): print(f"Message from {event.author.id}: {event.message.text}") -And then you create a listener object, and start handling the incoming events:: +And then you start handling the incoming events:: - listener = fbchat.Listener.connect(session, False, False) - - for event in listener.listen(): - if isinstance(event, fbchat.MessageEvent): - on_message(event) + listener.run() View the :ref:`examples` to see some more examples illustrating the event system. diff --git a/examples/echobot.py b/examples/echobot.py index d196621..212e291 100644 --- a/examples/echobot.py +++ b/examples/echobot.py @@ -1,17 +1,15 @@ import fbchat session = fbchat.Session.login("", "") - -listener = fbchat.Listener.connect(session, chat_on=False, foreground=False) +listener = fbchat.Listener(session=session, chat_on=False, foreground=False) -def on_message(event): +@listener.register +def on_message(event: fbchat.MessageEvent): print(f"{event.message.text} from {event.author.id} in {event.thread.id}") # If you're not the author, echo if event.author.id != session.user.id: event.thread.send_text(event.message.text) -for event in listener.listen(): - if isinstance(event, fbchat.MessageEvent): - on_message(event) +listener.run() diff --git a/examples/keepbot.py b/examples/keepbot.py index 1529f90..4ef661e 100644 --- a/examples/keepbot.py +++ b/examples/keepbot.py @@ -15,10 +15,10 @@ old_nicknames = { } session = fbchat.Session.login("", "") - -listener = fbchat.Listener.connect(session, chat_on=False, foreground=False) +listener = fbchat.Listener(session=session, chat_on=False, foreground=False) +@listener.register def on_color_set(event: fbchat.ColorSet): if old_thread_id != event.thread.id: return @@ -27,6 +27,7 @@ def on_color_set(event: fbchat.ColorSet): event.thread.set_color(old_color) +@listener.register def on_emoji_set(event: fbchat.EmojiSet): if old_thread_id != event.thread.id: return @@ -35,6 +36,7 @@ def on_emoji_set(event: fbchat.EmojiSet): event.thread.set_emoji(old_emoji) +@listener.register def on_title_set(event: fbchat.TitleSet): if old_thread_id != event.thread.id: return @@ -43,6 +45,7 @@ def on_title_set(event: fbchat.TitleSet): event.thread.set_title(old_title) +@listener.register def on_nickname_set(event: fbchat.NicknameSet): if old_thread_id != event.thread.id: return @@ -55,6 +58,7 @@ def on_nickname_set(event: fbchat.NicknameSet): event.thread.set_nickname(event.subject.id, old_nickname) +@listener.register def on_people_added(event: fbchat.PeopleAdded): if old_thread_id != event.thread.id: return @@ -64,6 +68,7 @@ def on_people_added(event: fbchat.PeopleAdded): event.thread.remove_participant(added.id) +@listener.register def on_person_removed(event: fbchat.PersonRemoved): if old_thread_id != event.thread.id: return @@ -75,16 +80,4 @@ def on_person_removed(event: fbchat.PersonRemoved): event.thread.add_participants([removed.id]) -for event in listener.listen(): - if isinstance(event, fbchat.ColorSet): - on_color_set(event) - elif isinstance(event, fbchat.EmojiSet): - on_emoji_set(event) - elif isinstance(event, fbchat.TitleSet): - on_title_set(event) - elif isinstance(event, fbchat.NicknameSet): - on_nickname_set(event) - elif isinstance(event, fbchat.PeopleAdded): - on_people_added(event) - elif isinstance(event, fbchat.PersonRemoved): - on_person_removed(event) +listener.run() diff --git a/examples/removebot.py b/examples/removebot.py index 14d449a..0c83f27 100644 --- a/examples/removebot.py +++ b/examples/removebot.py @@ -1,10 +1,10 @@ import fbchat session = fbchat.Session.login("", "") - -listener = fbchat.Listener.connect(session, chat_on=False, foreground=False) +listener = fbchat.Listener(session=session, chat_on=False, foreground=False) +@listener.register def on_message(event): # We can only kick people from group chats, so no need to try if it's a user chat if not isinstance(event.thread, fbchat.Group): @@ -14,6 +14,4 @@ def on_message(event): event.thread.remove_participant(event.author.id) -for event in listener.listen(): - if isinstance(event, fbchat.MessageEvent): - on_message(event) +listener.run() diff --git a/fbchat/__init__.py b/fbchat/__init__.py index 9d5e8bb..8dda631 100644 --- a/fbchat/__init__.py +++ b/fbchat/__init__.py @@ -111,7 +111,7 @@ from ._events import ( FriendRequest, Presence, ) -from ._mqtt import Listener +from ._listen import Listener from ._client import Client diff --git a/fbchat/_exception.py b/fbchat/_exception.py index 76aa047..b35466b 100644 --- a/fbchat/_exception.py +++ b/fbchat/_exception.py @@ -46,6 +46,11 @@ class ParseError(FacebookError): return msg.format(self.message, self.data) +@attr.s(slots=True, auto_exc=True) +class NotLoggedIn(FacebookError): + """Raised by Facebook if the client has been logged out.""" + + @attr.s(slots=True, auto_exc=True) class ExternalError(FacebookError): """Base class for errors that Facebook return.""" @@ -86,13 +91,6 @@ class InvalidParameters(ExternalError): """ -@attr.s(slots=True, auto_exc=True) -class NotLoggedIn(ExternalError): - """Raised by Facebook if the client has been logged out.""" - - code = attr.ib() - - @attr.s(slots=True, auto_exc=True) class PleaseRefresh(ExternalError): """Raised by Facebook if the client has been inactive for too long. @@ -108,7 +106,7 @@ def handle_payload_error(j): return code = j["error"] if code == 1357001: - error_cls = NotLoggedIn + raise NotLoggedIn(j["errorSummary"]) elif code == 1357004: error_cls = PleaseRefresh elif code in (1357031, 1545010, 1545003): diff --git a/fbchat/_listen.py b/fbchat/_listen.py new file mode 100644 index 0000000..a7ee27a --- /dev/null +++ b/fbchat/_listen.py @@ -0,0 +1,460 @@ +import attr +import inspect +import random +import paho.mqtt.client +import requests +from ._common import log, kw_only +from . import _util, _exception, _session, _graphql, _events + +from typing import Iterable, Optional, Mapping, Callable + + +HOST = "edge-chat.facebook.com" + +TOPICS = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # Data about user-to-user calls + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_rtc", + # TODO: Find out what this does! + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/webrtc_response", +] + + +def get_cookie_header(session: requests.Session, url: str) -> str: + """Extract a cookie header from a requests session.""" + # The cookies are extracted this way to make sure they're escaped correctly + return requests.cookies.get_cookie_header( + session.cookies, requests.Request("GET", url), + ) + + +def generate_session_id() -> int: + """Generate a random session ID between 1 and 9007199254740991.""" + return random.randint(1, 2 ** 53) + + +def mqtt_factory() -> paho.mqtt.client.Client: + # Configure internal MQTT handler + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + # mqtt.max_inflight_messages_set(20) # The rest will get queued + # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued + # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + mqtt.tls_set() + mqtt.connect_async(HOST, 443, keepalive=10) + return mqtt + + +def fetch_sequence_id(session: _session.Session) -> int: + """Fetch sequence ID.""" + params = { + "limit": 0, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same doc id as in `Client.fetch_threads` + (j,) = session._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"] + if not sequence_id: + raise _exception.NotLoggedIn("Failed fetching sequence id") + return int(sequence_id) + + +HandlerT = Callable[[_events.Event], None] + + +@attr.s(slots=True, kw_only=kw_only, eq=False) +class Listener: + """Helper, to listen for incoming Facebook events. + + Initialize a connection to the Facebook MQTT service. + + Args: + session: The session to use when making requests. + chat_on: Whether ... + foreground: Whether ... + + Example: + >>> listener = fbchat.Listener(session, chat_on=True, foreground=True) + """ + + session = attr.ib(type=_session.Session) + _chat_on = attr.ib(type=bool) + _foreground = attr.ib(type=bool) + _mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client) + _sync_token = attr.ib(None, type=Optional[str]) + _sequence_id = attr.ib(None, type=Optional[int]) + _tmp_events = attr.ib(factory=list, type=Iterable[_events.Event]) + _handlers = attr.ib(factory=dict, type=Mapping[HandlerT, _events.Event]) + + def __attrs_post_init__(self): + # Configure callbacks + self._mqtt.on_message = self._on_message_handler + self._mqtt.on_connect = self._on_connect_handler + + def _handle_ms(self, j): + """Handle /t_ms special logic. + + Returns whether to continue parsing the message. + """ + # TODO: Merge this with the parsing in _events + + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + self._sequence_id = j["firstDeltaSeqId"] + return False + + if "errorCode" in j: + error = j["errorCode"] + # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"): + # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too + # much time passed, or that it was simply missing + # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so + # the desired events could not be retrieved + log.error( + "The MQTT listener was disconnected for too long," + " events may have been lost" + ) + self._sync_token = None + self._sequence_id = None + return False + log.error("MQTT error code %s received", error) + return False + + # Update last sequence id + # Except for the two cases above, this is always received + self._sequence_id = j["lastIssuedSeqId"] + return True + + def _on_message_handler(self, client, userdata, message): + # Parse payload JSON + try: + j = _util.parse_json(message.payload.decode("utf-8")) + except (_exception.FacebookError, UnicodeDecodeError): + log.debug(message.payload) + log.exception("Failed parsing MQTT data on %s as JSON", message.topic) + return + + log.debug("MQTT payload: %s, %s", message.topic, j) + + if message.topic == "/t_ms": + if not self._handle_ms(j): + return + + try: + # TODO: Don't handle this in a callback + self._tmp_events = list( + _events.parse_events(self.session, message.topic, j) + ) + except _exception.ParseError: + log.exception("Failed parsing MQTT data") + + def _on_connect_handler(self, client, userdata, flags, rc): + if rc == 21: + raise _exception.FacebookError( + "Failed connecting. Maybe your cookies are wrong?" + ) + if rc != 0: + err = paho.mqtt.client.connack_string(rc) + log.error("MQTT Connection Error: %s", err) + return # Don't try to send publish if the connection failed + + self._messenger_queue_publish() + + def _messenger_queue_publish(self): + # configure receiving messages. + payload = { + "sync_api_version": 10, + "max_deltas_able_to_process": 1000, + "delta_batch_size": 500, + "encoding": "JSON", + "entity_fbid": self.session.user.id, + } + + # If we don't have a sync_token, create a new messenger queue + # This is done so that across reconnects, if we've received a sync token, we + # SHOULD receive a piece of data in /t_ms exactly once! + if self._sync_token is None: + topic = "/messenger_sync_create_queue" + payload["initial_titan_sequence_id"] = str(self._sequence_id) + payload["device_params"] = None + else: + topic = "/messenger_sync_get_diffs" + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token + + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) + + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() + + username = { + # The user ID + "u": self.session.user.id, + # Session ID + "s": session_id, + # Active status setting + "chat_on": self._chat_on, + # foreground_state - Whether the window is focused + "fg": self._foreground, + # Can be any random ID + "d": self.session._client_id, + # Application ID, taken from facebook.com + "aid": 219994525426954, + # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing + "st": TOPICS, + # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! + "pm": [ + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } + ], + # Unknown parameters + "cp": 3, + "ecp": 10, + "ct": "websocket", + "mqtt_sid": "", + "dc": "", + "no_auto_fg": True, + "gas": None, + "pack": [], + } + + self._mqtt.username_pw_set(_util.json_minimal(username)) + + headers = { + "Cookie": get_cookie_header( + self.session._session, "https://edge-chat.facebook.com/chat" + ), + "User-Agent": self.session._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": HOST, + } + + # TODO: Is region (lla | atn | odn | others?) important? + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + + def _reconnect(self): + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + log.debug("MQTT reconnection failed: %s", e) + # Wait before reconnecting + self._mqtt._reconnect_wait() + + def listen(self) -> Iterable[_events.Event]: + """Run the listening loop continually. + + Yields events when they arrive. + + This will automatically reconnect on errors, except if the errors are one of + `PleaseRefresh` or `NotLoggedIn`. + + Example: + Print events continually. + + >>> for event in listener.listen(): + ... print(event) + """ + if self._sequence_id is None: + self._sequence_id = fetch_sequence_id(self.session) + + # Make sure we're connected + while True: + # Beware, internal API, may have to change this to something more stable! + if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async: + self._reconnect() + else: + break + + while True: + rc = self._mqtt.loop(timeout=1.0) + + # The sequence ID was reset in _handle_ms + # TODO: Signal to the user that they should reload their data! + if self._sequence_id is None: + self._sequence_id = fetch_sequence_id(self.session) + self._messenger_queue_publish() + + # If disconnect() has been called + # Beware, internal API, may have to change this to something more stable! + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + break # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + # If known/expected error + if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: + log.warning("Connection lost, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: + # This error is wrongly classified + # See https://github.com/eclipse/paho.mqtt.python/issues/340 + log.warning("Connection error, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: + raise _exception.NotLoggedIn("MQTT connection refused") + else: + err = paho.mqtt.client.error_string(rc) + log.error("MQTT Error: %s", err) + + self._reconnect() + + if self._tmp_events: + yield from self._tmp_events + self._tmp_events = [] + + def disconnect(self) -> None: + """Disconnect the MQTT listener. + + Can be called while listening, which will stop the listening loop. + + The `Listener` object should not be used after this is called! + + Example: + Stop the listener when recieving a message with the text "/stop" + + >>> for event in listener.listen(): + ... if isinstance(event, fbchat.MessageEvent): + ... if event.message.text == "/stop": + ... listener.disconnect() # Almost the same "break" + """ + self._mqtt.disconnect() + + def set_foreground(self, value: bool) -> None: + """Set the `foreground` value while listening.""" + # TODO: Document what this actually does! + payload = _util.json_minimal({"foreground": value}) + info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + self._foreground = value + # TODO: We can't wait for this, since the loop is running within the same thread + # info.wait_for_publish() + + def set_chat_on(self, value: bool) -> None: + """Set the `chat_on` value while listening.""" + # TODO: Document what this actually does! + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running within the same thread + # info.wait_for_publish() + + # def send_additional_contacts(self, additional_contacts): + # payload = _util.json_minimal({"additional_contacts": additional_contacts}) + # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) + # + # def browser_close(self): + # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1) + + def register(func: HandlerT) -> HandlerT: + """Register a function that will be called when .run is called. + + The input function must take a single annotated argument. + + Should be used as a function decorator. + + Example: + >>> @listener.register + >>> def my_handler(event: fbchat.Event): + ... print(f"New event: {event}") + """ + try: + parameter = next(iter(inspect.signature(func).parameters.values())) + except Exception as e: # TODO: More precise exceptions + raise ValueError("Invalid function. Must have at least an argument") from e + + if parameter.annotation is parameter.empty: + raise ValueError("Invalid function. Must be annotated") + + if not issubclass(parameter.annotation, _events.Event): + raise ValueError("Invalid function. Annotation must be an event class") + + # TODO: More error checks, e.g. kw_only parameters + + self._handlers[func] = parameter.annotation + + def unregister(func: HandlerT): + """Unregister a previously registered function.""" + try: + self._handlers.pop(func) + except KeyError: + raise ValueError("Tried to unregister a function that was not registered") + + def run(self) -> None: + """Run the listening loop, and dispatch incoming events to registered handlers. + + This uses `.listen`, which reconnect on errors, except if the errors are one of + `PleaseRefresh` or `NotLoggedIn`. + + Example: + Print incoming messages. + + >>> @listener.register + >>> def print_msg(event: fbchat.MessageEvent): + ... print(event.message.text) + ... + >>> listener.run() + """ + for event in self.listen(): + for handler, event_cls in self._handlers.items(): + if isinstance(event, event_cls): + handler(event) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py deleted file mode 100644 index 0fddeb7..0000000 --- a/fbchat/_mqtt.py +++ /dev/null @@ -1,407 +0,0 @@ -import attr -import random -import paho.mqtt.client -import requests -from ._common import log, kw_only -from . import _util, _exception, _session, _graphql, _events - -from typing import Iterable, Optional - - -def get_cookie_header(session: requests.Session, url: str) -> str: - """Extract a cookie header from a requests session.""" - # The cookies are extracted this way to make sure they're escaped correctly - return requests.cookies.get_cookie_header( - session.cookies, requests.Request("GET", url), - ) - - -def generate_session_id() -> int: - """Generate a random session ID between 1 and 9007199254740991.""" - return random.randint(1, 2 ** 53) - - -@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False) -class Listener: - """Helper, to listen for incoming Facebook events.""" - - session = attr.ib(type=_session.Session) - _mqtt = attr.ib(type=paho.mqtt.client.Client) - _chat_on = attr.ib(type=bool) - _foreground = attr.ib(type=bool) - _sequence_id = attr.ib(type=int) - _sync_token = attr.ib(None, type=str) - _tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]]) - - _HOST = "edge-chat.facebook.com" - - def __repr__(self) -> str: - # An alternative repr, to illustrate that you can't create the class directly - return "".format( - self.session, self._chat_on, self._foreground - ) - - @classmethod - def connect(cls, session: _session.Session, chat_on: bool, foreground: bool): - """Initialize a connection to the Facebook MQTT service. - - Args: - session: The session to use when making requests. - chat_on: Whether ... - foreground: Whether ... - - Example: - >>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True) - """ - mqtt = paho.mqtt.client.Client( - client_id="mqttwsclient", - clean_session=True, - protocol=paho.mqtt.client.MQTTv31, - transport="websockets", - ) - mqtt.enable_logger() - # mqtt.max_inflight_messages_set(20) # The rest will get queued - # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued - # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds - # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) - # TODO: Is region (lla | atn | odn | others?) important? - mqtt.tls_set() - - self = cls( - session=session, - mqtt=mqtt, - chat_on=chat_on, - foreground=foreground, - sequence_id=cls._fetch_sequence_id(session), - ) - - # Configure callbacks - mqtt.on_message = self._on_message_handler - mqtt.on_connect = self._on_connect_handler - - self._configure_connect_options() - - # Attempt to connect - try: - rc = mqtt.connect(self._HOST, 443, keepalive=10) - except ( - # Taken from .loop_forever - paho.mqtt.client.socket.error, - OSError, - paho.mqtt.client.WebsocketConnectionError, - ) as e: - raise _exception.FacebookError("MQTT connection failed") from e - - # Raise error if connecting failed - if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - err = paho.mqtt.client.error_string(rc) - raise _exception.FacebookError("MQTT connection failed: {}".format(err)) - - return self - - def _on_message_handler(self, client, userdata, message): - # Parse payload JSON - try: - j = _util.parse_json(message.payload.decode("utf-8")) - except (_exception.FacebookError, UnicodeDecodeError): - log.debug(message.payload) - log.exception("Failed parsing MQTT data on %s as JSON", message.topic) - return - - log.debug("MQTT payload: %s, %s", message.topic, j) - - if message.topic == "/t_ms": - # Update sync_token when received - # This is received in the first message after we've created a messenger - # sync queue. - if "syncToken" in j and "firstDeltaSeqId" in j: - self._sync_token = j["syncToken"] - self._sequence_id = j["firstDeltaSeqId"] - return - - if "errorCode" in j: - error = j["errorCode"] - # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' - if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"): - # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too - # much time passed, or that it was simply missing - # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so - # the desired events could not be retrieved - log.error( - "The MQTT listener was disconnected for too long," - " events may have been lost" - ) - self._sync_token = None - self._sequence_id = self._fetch_sequence_id(self.session) - self._messenger_queue_publish() - # TODO: Signal to the user that they should reload their data! - return - log.error("MQTT error code %s received", error) - return - - # Update last sequence id when received - if "lastIssuedSeqId" in j: - self._sequence_id = j["lastIssuedSeqId"] - else: - log.error("Missing last sequence id: %s", j) - - try: - # TODO: Don't handle this in a callback - self._tmp_events = list( - _events.parse_events(self.session, message.topic, j) - ) - except _exception.ParseError: - log.exception("Failed parsing MQTT data") - - @staticmethod - def _fetch_sequence_id(session: _session.Session) -> int: - """Fetch sequence ID.""" - params = { - "limit": 0, - "tags": ["INBOX"], - "before": None, - "includeDeliveryReceipts": False, - "includeSeqID": True, - } - log.debug("Fetching MQTT sequence ID") - # Same request as in `Client.fetch_threads` - (j,) = session._graphql_requests( - _graphql.from_doc_id("1349387578499440", params) - ) - sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"] - if not sequence_id: - raise _exception.NotLoggedIn("Failed fetching sequence id") - return int(sequence_id) - - def _on_connect_handler(self, client, userdata, flags, rc): - if rc == 21: - raise _exception.FacebookError( - "Failed connecting. Maybe your cookies are wrong?" - ) - if rc != 0: - return # Don't try to send publish if the connection failed - - self._messenger_queue_publish() - - def _messenger_queue_publish(self): - # configure receiving messages. - payload = { - "sync_api_version": 10, - "max_deltas_able_to_process": 1000, - "delta_batch_size": 500, - "encoding": "JSON", - "entity_fbid": self.session.user.id, - } - - # If we don't have a sync_token, create a new messenger queue - # This is done so that across reconnects, if we've received a sync token, we - # SHOULD receive a piece of data in /t_ms exactly once! - if self._sync_token is None: - topic = "/messenger_sync_create_queue" - payload["initial_titan_sequence_id"] = str(self._sequence_id) - payload["device_params"] = None - else: - topic = "/messenger_sync_get_diffs" - payload["last_seq_id"] = str(self._sequence_id) - payload["sync_token"] = self._sync_token - - self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) - - def _configure_connect_options(self): - # Generate a new session ID on each reconnect - session_id = generate_session_id() - - topics = [ - # Things that happen in chats (e.g. messages) - "/t_ms", - # Group typing notifications - "/thread_typing", - # Private chat typing notifications - "/orca_typing_notifications", - # Active notifications - "/orca_presence", - # Other notifications not related to chats (e.g. friend requests) - "/legacy_web", - # Facebook's continuous error reporting/logging? - "/br_sr", - # Response to /br_sr - "/sr_res", - # Data about user-to-user calls - # TODO: Investigate the response from this! (A bunch of binary data) - # "/t_rtc", - # TODO: Find out what this does! - # TODO: Investigate the response from this! (A bunch of binary data) - # "/t_p", - # TODO: Find out what this does! - "/webrtc", - # TODO: Find out what this does! - "/onevc", - # TODO: Find out what this does! - "/notify_disconnect", - # Old, no longer active topics - # These are here just in case something interesting pops up - "/inbox", - "/mercury", - "/messaging_events", - "/orca_message_notifications", - "/pp", - "/webrtc_response", - ] - - username = { - # The user ID - "u": self.session.user.id, - # Session ID - "s": session_id, - # Active status setting - "chat_on": self._chat_on, - # foreground_state - Whether the window is focused - "fg": self._foreground, - # Can be any random ID - "d": self.session._client_id, - # Application ID, taken from facebook.com - "aid": 219994525426954, - # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing - "st": topics, - # MQTT extension by FB, allows making a PUBLISH while CONNECTing - # Using this is more efficient, but the same can be acheived with: - # def on_connect(*args): - # mqtt.publish(topic, payload, qos=1) - # mqtt.on_connect = on_connect - # TODO: For some reason this doesn't work! - "pm": [ - # { - # "topic": topic, - # "payload": payload, - # "qos": 1, - # "messageId": 65536, - # } - ], - # Unknown parameters - "cp": 3, - "ecp": 10, - "ct": "websocket", - "mqtt_sid": "", - "dc": "", - "no_auto_fg": True, - "gas": None, - "pack": [], - } - - # TODO: Make this thread safe - self._mqtt.username_pw_set(_util.json_minimal(username)) - - headers = { - # TODO: Make this access thread safe - "Cookie": get_cookie_header( - self.session._session, "https://edge-chat.facebook.com/chat" - ), - "User-Agent": self.session._session.headers["User-Agent"], - "Origin": "https://www.facebook.com", - "Host": self._HOST, - } - - self._mqtt.ws_set_options( - path="/chat?sid={}".format(session_id), headers=headers - ) - - def _loop_once(self) -> bool: - rc = self._mqtt.loop(timeout=1.0) - - # If disconnect() has been called - # Beware, internal API, may have to change this to something more stable! - if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: - return False # Stop listening - - if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - # If known/expected error - if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: - log.warning("Connection lost, retrying") - elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: - # This error is wrongly classified - # See https://github.com/eclipse/paho.mqtt.python/issues/340 - log.warning("Connection error, retrying") - elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: - raise _exception.NotLoggedIn("MQTT connection refused") - else: - err = paho.mqtt.client.error_string(rc) - log.error("MQTT Error: %s", err) - - # Wait before reconnecting - self._mqtt._reconnect_wait() - - # Try reconnecting - self._configure_connect_options() - try: - self._mqtt.reconnect() - except ( - # Taken from .loop_forever - paho.mqtt.client.socket.error, - OSError, - paho.mqtt.client.WebsocketConnectionError, - ) as e: - log.debug("MQTT reconnection failed: %s", e) - - return True # Keep listening - - def listen(self) -> Iterable[_events.Event]: - """Run the listening loop continually. - - Yields events when they arrive. - - This will automatically reconnect on errors. - - Example: - Print events continually. - - >>> for event in listener.listen(): - ... print(event) - """ - while self._loop_once(): - if self._tmp_events: - yield from self._tmp_events - self._tmp_events = None - - def disconnect(self) -> None: - """Disconnect the MQTT listener. - - Can be called while listening, which will stop the listening loop. - - The `Listener` object should not be used after this is called! - - Example: - Stop the listener when recieving a message with the text "/stop" - - >>> for event in listener.listen(): - ... if isinstance(event, fbchat.MessageEvent): - ... if event.message.text == "/stop": - ... listener.disconnect() # Almost the same "break" - """ - self._mqtt.disconnect() - - def set_foreground(self, value: bool) -> None: - """Set the `foreground` value while listening.""" - # TODO: Document what this actually does! - payload = _util.json_minimal({"foreground": value}) - info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) - self._foreground = value - # TODO: We can't wait for this, since the loop is running within the same thread - # info.wait_for_publish() - - def set_chat_on(self, value: bool) -> None: - """Set the `chat_on` value while listening.""" - # TODO: Document what this actually does! - # TODO: Is this the right request to make? - data = {"make_user_available_when_in_foreground": value} - payload = _util.json_minimal(data) - info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) - self._chat_on = value - # TODO: We can't wait for this, since the loop is running within the same thread - # info.wait_for_publish() - - # def send_additional_contacts(self, additional_contacts): - # payload = _util.json_minimal({"additional_contacts": additional_contacts}) - # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) - # - # def browser_close(self): - # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1) diff --git a/fbchat/_session.py b/fbchat/_session.py index 6e34d67..2bedefc 100644 --- a/fbchat/_session.py +++ b/fbchat/_session.py @@ -308,7 +308,7 @@ class Session: # Fall back to searching with a regex res = FB_DTSG_REGEX.search(r.text) if not res: - raise ValueError("Failed loading session, could not find fb_dtsg") + raise _exception.NotLoggedIn("Could not find fb_dtsg") fb_dtsg = res.group(1) revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])