diff --git a/fbchat/_client.py b/fbchat/_client.py index a69170a..f3c3d05 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -1,6 +1,4 @@ import datetime -import time -import requests from ._core import log from . import ( @@ -654,7 +652,9 @@ class Client: self.on_color_change( mid=mid, author_id=author_id, - new_color=ThreadABC._parse_color(delta["untypedData"]["theme_color"]), + new_color=_thread.ThreadABC._parse_color( + delta["untypedData"]["theme_color"] + ), thread=get_thread(metadata), at=at, metadata=metadata, @@ -664,6 +664,7 @@ class Client: locations = [ ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"] ] + at = _util.millis_to_datetime(int(delta["timestamp"])) self._on_seen(locations=locations, at=at) # Emoji change @@ -685,7 +686,7 @@ class Client: mid=mid, author_id=author_id, new_title=new_title, - thread=get_thread(metadata), + group=get_thread(metadata), at=at, metadata=metadata, ) @@ -696,7 +697,7 @@ class Client: if mid is None: self.on_unknown_messsage_type(msg=delta) else: - group = get_thread(metadata) + group = get_thread(delta) fetch_info = group._forced_fetch(mid) fetch_data = fetch_info["message"] author_id = fetch_data["message_sender"]["id"] @@ -740,7 +741,7 @@ class Client: mid=mid, added_id=target_id, author_id=author_id, - thread=get_thread(metadata), + group=get_thread(metadata), at=at, ) elif admin_event == "remove_admin": @@ -748,7 +749,7 @@ class Client: mid=mid, removed_id=target_id, author_id=author_id, - thread=get_thread(metadata), + group=get_thread(metadata), at=at, ) @@ -759,7 +760,7 @@ class Client: mid=mid, approval_mode=approval_mode, author_id=author_id, - thread=get_thread(metadata), + group=get_thread(metadata), at=at, ) @@ -773,7 +774,7 @@ class Client: self.on_message_delivered( msg_ids=message_ids, delivered_for=delivered_for, - thread=get_thread(metadata), + thread=get_thread(delta), at=at, metadata=metadata, ) @@ -785,7 +786,7 @@ class Client: at = _util.millis_to_datetime(int(delta["watermarkTimestampMs"])) self.on_message_seen( seen_by=seen_by, - thread=get_thread(metadata), + thread=get_thread(delta), seen_at=seen_at, at=at, metadata=metadata, @@ -978,15 +979,12 @@ class Client: mid=mid, reaction=i.get("reaction"), author_id=author_id, - thread=get_thread(metadata), + thread=get_thread(i), at=at, ) else: self.on_reaction_removed( - mid=mid, - author_id=author_id, - thread=get_thread(metadata), - at=at, + mid=mid, author_id=author_id, thread=get_thread(i), at=at, ) # Viewer status change @@ -998,11 +996,11 @@ class Client: if reason == 2: if can_reply: self.on_unblock( - author_id=author_id, thread=get_thread(metadata), at=at, + author_id=author_id, thread=get_thread(i), at=at ) else: self.on_block( - author_id=author_id, thread=get_thread(metadata), at=at, + author_id=author_id, thread=get_thread(i), at=at ) # Live location info @@ -1016,7 +1014,7 @@ class Client: mid=mid, location=location, author_id=author_id, - thread=get_thread(metadata), + thread=get_thread(i), at=at, ) @@ -1027,18 +1025,19 @@ class Client: at = _util.millis_to_datetime(i["deletionTimestamp"]) author_id = str(i["senderID"]) self.on_message_unsent( - mid=mid, - author_id=author_id, - thread=get_thread(metadata), - at=at, + mid=mid, author_id=author_id, thread=get_thread(i), at=at ) elif d.get("deltaMessageReply"): i = d["deltaMessageReply"] - thread = get_thread(metadata) metadata = i["message"]["messageMetadata"] - replied_to = MessageData._from_reply(thread, i["repliedToMessage"]) - message = MessageData._from_reply(thread, i["message"], replied_to) + thread = get_thread(metadata) + replied_to = _message.MessageData._from_reply( + thread, i["repliedToMessage"] + ) + message = _message.MessageData._from_reply( + thread, i["message"], replied_to + ) self.on_message( mid=message.id, author_id=message.author, @@ -1048,13 +1047,16 @@ class Client: metadata=metadata, ) + else: + self.on_unknown_messsage_type(msg=d) + # New message elif delta.get("class") == "NewMessage": thread = get_thread(metadata) self.on_message( mid=mid, author_id=author_id, - message_object=MessageData._from_pull( + message_object=_message.MessageData._from_pull( thread, delta, mid=mid, @@ -1099,11 +1101,8 @@ class Client: thread = _group.Group(session=self.session, id=str(thread_id)) else: thread = _user.User(session=self.session, id=author_id) - typing_status = TypingStatus(m.get("state")) self.on_typing( - author_id=author_id, - status=typing_status, - thread=thread, + author_id=author_id, status=m["state"] == 1, thread=thread, ) # Other notifications @@ -1139,32 +1138,23 @@ class Client: except Exception as e: self.on_message_error(exception=e, msg=data) - def startListening(self): - """Start listening from an external event loop. - - Raises: - FBchatException: If request failed - """ + def _start_listening(self): if not self._mqtt: self._mqtt = _mqtt.Mqtt.connect( - state=self.session, + session=self.session, on_message=self._parse_message, chat_on=self._mark_alive, foreground=True, ) - # Backwards compat - self.on_qprimer(ts=now(), msg=None) def _do_one_listen(self): # TODO: Remove this wierd check, and let the user handle the chat_on parameter if self._mark_alive != self._mqtt._chat_on: self._mqtt.set_chat_on(self._mark_alive) - # TODO: Remove on_error param - return self._mqtt.loop_once(on_error=lambda e: self.on_listen_error(exception=e)) + return self._mqtt.loop_once() - def stopListening(self): - """Stop the listening loop.""" + def _stop_listening(self): if not self._mqtt: return self._mqtt.disconnect() @@ -1181,12 +1171,12 @@ class Client: if markAlive is not None: self.set_active_status(markAlive) - self.on_listening() + self._start_listening() while self._do_one_listen(): pass - self._sticky, self._pool = (None, None) + self._stop_listening() def set_active_status(self, markAlive): """Change active status while listening. @@ -1204,22 +1194,6 @@ class Client: EVENTS """ - def on_listening(self): - """Called when the client is listening.""" - log.info("Listening...") - - def on_listen_error(self, exception=None): - """Called when an error was encountered while listening. - - Args: - exception: The exception that was encountered - - Returns: - Whether the loop should keep running - """ - log.exception("Got exception while listening") - return True - def on_message( self, mid=None, @@ -1810,14 +1784,6 @@ class Client: "{} won't take part in {} in {} ({})".format(author_id, plan, thread) ) - def on_qprimer(self, at=None): - """Called when the client just started listening. - - Args: - at (datetime.datetime): When the action was executed - """ - pass - def on_chat_timestamp(self, buddylist=None): """Called when the client receives chat online presence update. diff --git a/fbchat/_message.py b/fbchat/_message.py index e7738f1..b230477 100644 --- a/fbchat/_message.py +++ b/fbchat/_message.py @@ -387,7 +387,7 @@ class MessageData(Message): image_metadata = a.get("imageMetadata", {}) attach_type = mercury["blob_attachment"]["__typename"] attachment = _file.graphql_to_attachment( - mercury["blob_attachment"], a["fileSize"] + mercury["blob_attachment"], a.get("fileSize") ) attachments.append(attachment) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index a9f6e31..f4f0a25 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -1,10 +1,19 @@ import attr import random import paho.mqtt.client +import requests from ._core import log from . import _util, _exception, _graphql +def get_cookie_header(session, url): + """Extract a cookie header from a requests session.""" + # The cookies are extracted this way to make sure they're escaped correctly + return requests.cookies.get_cookie_header( + session.cookies, requests.Request("GET", url), + ) + + def generate_session_id(): """Generate a random session ID between 1 and 9007199254740991.""" return random.randint(1, 2 ** 53) @@ -12,7 +21,7 @@ def generate_session_id(): @attr.s(slots=True) class Mqtt(object): - _state = attr.ib() + _session = attr.ib() _mqtt = attr.ib() _on_message = attr.ib() _chat_on = attr.ib() @@ -23,7 +32,7 @@ class Mqtt(object): _HOST = "edge-chat.facebook.com" @classmethod - def connect(cls, state, on_message, chat_on, foreground): + def connect(cls, session, on_message, chat_on, foreground): mqtt = paho.mqtt.client.Client( client_id="mqttwsclient", clean_session=True, @@ -39,12 +48,12 @@ class Mqtt(object): mqtt.tls_set() self = cls( - state=state, + session=session, mqtt=mqtt, on_message=on_message, chat_on=chat_on, foreground=foreground, - sequence_id=cls._fetch_sequence_id(state), + sequence_id=cls._fetch_sequence_id(session), ) # Configure callbacks @@ -103,7 +112,7 @@ class Mqtt(object): self._on_message(message.topic, j) @staticmethod - def _fetch_sequence_id(state): + def _fetch_sequence_id(session): """Fetch sequence ID.""" params = { "limit": 1, @@ -114,7 +123,9 @@ class Mqtt(object): } log.debug("Fetching MQTT sequence ID") # Same request as in `Client.fetchThreadList` - (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + (j,) = session._graphql_requests( + _graphql.from_doc_id("1349387578499440", params) + ) try: return int(j["viewer"]["message_threads"]["sync_sequence_id"]) except (KeyError, ValueError): @@ -135,7 +146,7 @@ class Mqtt(object): "max_deltas_able_to_process": 1000, "delta_batch_size": 500, "encoding": "JSON", - "entity_fbid": self._state.user_id, + "entity_fbid": self._session.user_id, } # If we don't have a sync_token, create a new messenger queue @@ -192,7 +203,7 @@ class Mqtt(object): username = { # The user ID - "u": self._state.user_id, + "u": self._session.user_id, # Session ID "s": session_id, # Active status setting @@ -200,7 +211,7 @@ class Mqtt(object): # foreground_state - Whether the window is focused "fg": self._foreground, # Can be any random ID - "d": self._state._client_id, + "d": self._session._client_id, # Application ID, taken from facebook.com "aid": 219994525426954, # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing @@ -235,10 +246,10 @@ class Mqtt(object): headers = { # TODO: Make this access thread safe - "Cookie": _util.get_cookie_header( - self._state._session, "https://edge-chat.facebook.com/chat" + "Cookie": get_cookie_header( + self._session._session, "https://edge-chat.facebook.com/chat" ), - "User-Agent": self._state._session.headers["User-Agent"], + "User-Agent": self._session._session.headers["User-Agent"], "Origin": "https://www.facebook.com", "Host": self._HOST, } @@ -247,7 +258,7 @@ class Mqtt(object): path="/chat?sid={}".format(session_id), headers=headers ) - def loop_once(self, on_error=None): + def loop_once(self): """Run the listening loop once. Returns whether to keep listening or not. @@ -269,9 +280,6 @@ class Mqtt(object): else: err = paho.mqtt.client.error_string(rc) log.error("MQTT Error: %s", err) - # For backwards compatibility - if on_error: - on_error(_exception.FBchatException("MQTT Error {}".format(err))) # Wait before reconnecting self._mqtt._reconnect_wait() diff --git a/fbchat/_user.py b/fbchat/_user.py index fae76e3..e72361d 100644 --- a/fbchat/_user.py +++ b/fbchat/_user.py @@ -206,4 +206,8 @@ class ActiveStatus: @classmethod def _from_orca_presence(cls, data): # TODO: Handle `c` and `vc` keys (Probably some binary data) - return cls(active=data["p"] in [2, 3], last_active=_util.millis_to_datetime(data["l"]), in_game=None) + return cls( + active=data["p"] in [2, 3], + last_active=_util.seconds_to_datetime(data["l"]) if "l" in data else None, + in_game=None, + ) diff --git a/fbchat/_util.py b/fbchat/_util.py index c66bc7e..3ff0520 100644 --- a/fbchat/_util.py +++ b/fbchat/_util.py @@ -55,14 +55,6 @@ def strip_json_cruft(text): raise _exception.ParseError("No JSON object found", data=text) from e -def get_cookie_header(session, url): - """Extract a cookie header from a requests session.""" - # The cookies are extracted this way to make sure they're escaped correctly - return requests.cookies.get_cookie_header( - session.cookies, requests.Request("GET", url), - ) - - def get_decoded_r(r): return get_decoded(r._content)