Fix merge error, fix listening and clean up

This commit is contained in:
Mads Marquart
2020-01-15 16:42:21 +01:00
parent d2f8acb68f
commit dbc88bc4ed
5 changed files with 65 additions and 95 deletions

View File

@@ -1,6 +1,4 @@
import datetime import datetime
import time
import requests
from ._core import log from ._core import log
from . import ( from . import (
@@ -654,7 +652,9 @@ class Client:
self.on_color_change( self.on_color_change(
mid=mid, mid=mid,
author_id=author_id, author_id=author_id,
new_color=ThreadABC._parse_color(delta["untypedData"]["theme_color"]), new_color=_thread.ThreadABC._parse_color(
delta["untypedData"]["theme_color"]
),
thread=get_thread(metadata), thread=get_thread(metadata),
at=at, at=at,
metadata=metadata, metadata=metadata,
@@ -664,6 +664,7 @@ class Client:
locations = [ locations = [
ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"] ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"]
] ]
at = _util.millis_to_datetime(int(delta["timestamp"]))
self._on_seen(locations=locations, at=at) self._on_seen(locations=locations, at=at)
# Emoji change # Emoji change
@@ -685,7 +686,7 @@ class Client:
mid=mid, mid=mid,
author_id=author_id, author_id=author_id,
new_title=new_title, new_title=new_title,
thread=get_thread(metadata), group=get_thread(metadata),
at=at, at=at,
metadata=metadata, metadata=metadata,
) )
@@ -696,7 +697,7 @@ class Client:
if mid is None: if mid is None:
self.on_unknown_messsage_type(msg=delta) self.on_unknown_messsage_type(msg=delta)
else: else:
group = get_thread(metadata) group = get_thread(delta)
fetch_info = group._forced_fetch(mid) fetch_info = group._forced_fetch(mid)
fetch_data = fetch_info["message"] fetch_data = fetch_info["message"]
author_id = fetch_data["message_sender"]["id"] author_id = fetch_data["message_sender"]["id"]
@@ -740,7 +741,7 @@ class Client:
mid=mid, mid=mid,
added_id=target_id, added_id=target_id,
author_id=author_id, author_id=author_id,
thread=get_thread(metadata), group=get_thread(metadata),
at=at, at=at,
) )
elif admin_event == "remove_admin": elif admin_event == "remove_admin":
@@ -748,7 +749,7 @@ class Client:
mid=mid, mid=mid,
removed_id=target_id, removed_id=target_id,
author_id=author_id, author_id=author_id,
thread=get_thread(metadata), group=get_thread(metadata),
at=at, at=at,
) )
@@ -759,7 +760,7 @@ class Client:
mid=mid, mid=mid,
approval_mode=approval_mode, approval_mode=approval_mode,
author_id=author_id, author_id=author_id,
thread=get_thread(metadata), group=get_thread(metadata),
at=at, at=at,
) )
@@ -773,7 +774,7 @@ class Client:
self.on_message_delivered( self.on_message_delivered(
msg_ids=message_ids, msg_ids=message_ids,
delivered_for=delivered_for, delivered_for=delivered_for,
thread=get_thread(metadata), thread=get_thread(delta),
at=at, at=at,
metadata=metadata, metadata=metadata,
) )
@@ -785,7 +786,7 @@ class Client:
at = _util.millis_to_datetime(int(delta["watermarkTimestampMs"])) at = _util.millis_to_datetime(int(delta["watermarkTimestampMs"]))
self.on_message_seen( self.on_message_seen(
seen_by=seen_by, seen_by=seen_by,
thread=get_thread(metadata), thread=get_thread(delta),
seen_at=seen_at, seen_at=seen_at,
at=at, at=at,
metadata=metadata, metadata=metadata,
@@ -978,15 +979,12 @@ class Client:
mid=mid, mid=mid,
reaction=i.get("reaction"), reaction=i.get("reaction"),
author_id=author_id, author_id=author_id,
thread=get_thread(metadata), thread=get_thread(i),
at=at, at=at,
) )
else: else:
self.on_reaction_removed( self.on_reaction_removed(
mid=mid, mid=mid, author_id=author_id, thread=get_thread(i), at=at,
author_id=author_id,
thread=get_thread(metadata),
at=at,
) )
# Viewer status change # Viewer status change
@@ -998,11 +996,11 @@ class Client:
if reason == 2: if reason == 2:
if can_reply: if can_reply:
self.on_unblock( self.on_unblock(
author_id=author_id, thread=get_thread(metadata), at=at, author_id=author_id, thread=get_thread(i), at=at
) )
else: else:
self.on_block( self.on_block(
author_id=author_id, thread=get_thread(metadata), at=at, author_id=author_id, thread=get_thread(i), at=at
) )
# Live location info # Live location info
@@ -1016,7 +1014,7 @@ class Client:
mid=mid, mid=mid,
location=location, location=location,
author_id=author_id, author_id=author_id,
thread=get_thread(metadata), thread=get_thread(i),
at=at, at=at,
) )
@@ -1027,18 +1025,19 @@ class Client:
at = _util.millis_to_datetime(i["deletionTimestamp"]) at = _util.millis_to_datetime(i["deletionTimestamp"])
author_id = str(i["senderID"]) author_id = str(i["senderID"])
self.on_message_unsent( self.on_message_unsent(
mid=mid, mid=mid, author_id=author_id, thread=get_thread(i), at=at
author_id=author_id,
thread=get_thread(metadata),
at=at,
) )
elif d.get("deltaMessageReply"): elif d.get("deltaMessageReply"):
i = d["deltaMessageReply"] i = d["deltaMessageReply"]
thread = get_thread(metadata)
metadata = i["message"]["messageMetadata"] metadata = i["message"]["messageMetadata"]
replied_to = MessageData._from_reply(thread, i["repliedToMessage"]) thread = get_thread(metadata)
message = MessageData._from_reply(thread, i["message"], replied_to) replied_to = _message.MessageData._from_reply(
thread, i["repliedToMessage"]
)
message = _message.MessageData._from_reply(
thread, i["message"], replied_to
)
self.on_message( self.on_message(
mid=message.id, mid=message.id,
author_id=message.author, author_id=message.author,
@@ -1048,13 +1047,16 @@ class Client:
metadata=metadata, metadata=metadata,
) )
else:
self.on_unknown_messsage_type(msg=d)
# New message # New message
elif delta.get("class") == "NewMessage": elif delta.get("class") == "NewMessage":
thread = get_thread(metadata) thread = get_thread(metadata)
self.on_message( self.on_message(
mid=mid, mid=mid,
author_id=author_id, author_id=author_id,
message_object=MessageData._from_pull( message_object=_message.MessageData._from_pull(
thread, thread,
delta, delta,
mid=mid, mid=mid,
@@ -1099,11 +1101,8 @@ class Client:
thread = _group.Group(session=self.session, id=str(thread_id)) thread = _group.Group(session=self.session, id=str(thread_id))
else: else:
thread = _user.User(session=self.session, id=author_id) thread = _user.User(session=self.session, id=author_id)
typing_status = TypingStatus(m.get("state"))
self.on_typing( self.on_typing(
author_id=author_id, author_id=author_id, status=m["state"] == 1, thread=thread,
status=typing_status,
thread=thread,
) )
# Other notifications # Other notifications
@@ -1139,32 +1138,23 @@ class Client:
except Exception as e: except Exception as e:
self.on_message_error(exception=e, msg=data) self.on_message_error(exception=e, msg=data)
def startListening(self): def _start_listening(self):
"""Start listening from an external event loop.
Raises:
FBchatException: If request failed
"""
if not self._mqtt: if not self._mqtt:
self._mqtt = _mqtt.Mqtt.connect( self._mqtt = _mqtt.Mqtt.connect(
state=self.session, session=self.session,
on_message=self._parse_message, on_message=self._parse_message,
chat_on=self._mark_alive, chat_on=self._mark_alive,
foreground=True, foreground=True,
) )
# Backwards compat
self.on_qprimer(ts=now(), msg=None)
def _do_one_listen(self): def _do_one_listen(self):
# TODO: Remove this wierd check, and let the user handle the chat_on parameter # TODO: Remove this wierd check, and let the user handle the chat_on parameter
if self._mark_alive != self._mqtt._chat_on: if self._mark_alive != self._mqtt._chat_on:
self._mqtt.set_chat_on(self._mark_alive) self._mqtt.set_chat_on(self._mark_alive)
# TODO: Remove on_error param return self._mqtt.loop_once()
return self._mqtt.loop_once(on_error=lambda e: self.on_listen_error(exception=e))
def stopListening(self): def _stop_listening(self):
"""Stop the listening loop."""
if not self._mqtt: if not self._mqtt:
return return
self._mqtt.disconnect() self._mqtt.disconnect()
@@ -1181,12 +1171,12 @@ class Client:
if markAlive is not None: if markAlive is not None:
self.set_active_status(markAlive) self.set_active_status(markAlive)
self.on_listening() self._start_listening()
while self._do_one_listen(): while self._do_one_listen():
pass pass
self._sticky, self._pool = (None, None) self._stop_listening()
def set_active_status(self, markAlive): def set_active_status(self, markAlive):
"""Change active status while listening. """Change active status while listening.
@@ -1204,22 +1194,6 @@ class Client:
EVENTS EVENTS
""" """
def on_listening(self):
"""Called when the client is listening."""
log.info("Listening...")
def on_listen_error(self, exception=None):
"""Called when an error was encountered while listening.
Args:
exception: The exception that was encountered
Returns:
Whether the loop should keep running
"""
log.exception("Got exception while listening")
return True
def on_message( def on_message(
self, self,
mid=None, mid=None,
@@ -1810,14 +1784,6 @@ class Client:
"{} won't take part in {} in {} ({})".format(author_id, plan, thread) "{} won't take part in {} in {} ({})".format(author_id, plan, thread)
) )
def on_qprimer(self, at=None):
"""Called when the client just started listening.
Args:
at (datetime.datetime): When the action was executed
"""
pass
def on_chat_timestamp(self, buddylist=None): def on_chat_timestamp(self, buddylist=None):
"""Called when the client receives chat online presence update. """Called when the client receives chat online presence update.

View File

@@ -387,7 +387,7 @@ class MessageData(Message):
image_metadata = a.get("imageMetadata", {}) image_metadata = a.get("imageMetadata", {})
attach_type = mercury["blob_attachment"]["__typename"] attach_type = mercury["blob_attachment"]["__typename"]
attachment = _file.graphql_to_attachment( attachment = _file.graphql_to_attachment(
mercury["blob_attachment"], a["fileSize"] mercury["blob_attachment"], a.get("fileSize")
) )
attachments.append(attachment) attachments.append(attachment)

View File

@@ -1,10 +1,19 @@
import attr import attr
import random import random
import paho.mqtt.client import paho.mqtt.client
import requests
from ._core import log from ._core import log
from . import _util, _exception, _graphql from . import _util, _exception, _graphql
def get_cookie_header(session, url):
"""Extract a cookie header from a requests session."""
# The cookies are extracted this way to make sure they're escaped correctly
return requests.cookies.get_cookie_header(
session.cookies, requests.Request("GET", url),
)
def generate_session_id(): def generate_session_id():
"""Generate a random session ID between 1 and 9007199254740991.""" """Generate a random session ID between 1 and 9007199254740991."""
return random.randint(1, 2 ** 53) return random.randint(1, 2 ** 53)
@@ -12,7 +21,7 @@ def generate_session_id():
@attr.s(slots=True) @attr.s(slots=True)
class Mqtt(object): class Mqtt(object):
_state = attr.ib() _session = attr.ib()
_mqtt = attr.ib() _mqtt = attr.ib()
_on_message = attr.ib() _on_message = attr.ib()
_chat_on = attr.ib() _chat_on = attr.ib()
@@ -23,7 +32,7 @@ class Mqtt(object):
_HOST = "edge-chat.facebook.com" _HOST = "edge-chat.facebook.com"
@classmethod @classmethod
def connect(cls, state, on_message, chat_on, foreground): def connect(cls, session, on_message, chat_on, foreground):
mqtt = paho.mqtt.client.Client( mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient", client_id="mqttwsclient",
clean_session=True, clean_session=True,
@@ -39,12 +48,12 @@ class Mqtt(object):
mqtt.tls_set() mqtt.tls_set()
self = cls( self = cls(
state=state, session=session,
mqtt=mqtt, mqtt=mqtt,
on_message=on_message, on_message=on_message,
chat_on=chat_on, chat_on=chat_on,
foreground=foreground, foreground=foreground,
sequence_id=cls._fetch_sequence_id(state), sequence_id=cls._fetch_sequence_id(session),
) )
# Configure callbacks # Configure callbacks
@@ -103,7 +112,7 @@ class Mqtt(object):
self._on_message(message.topic, j) self._on_message(message.topic, j)
@staticmethod @staticmethod
def _fetch_sequence_id(state): def _fetch_sequence_id(session):
"""Fetch sequence ID.""" """Fetch sequence ID."""
params = { params = {
"limit": 1, "limit": 1,
@@ -114,7 +123,9 @@ class Mqtt(object):
} }
log.debug("Fetching MQTT sequence ID") log.debug("Fetching MQTT sequence ID")
# Same request as in `Client.fetchThreadList` # Same request as in `Client.fetchThreadList`
(j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) (j,) = session._graphql_requests(
_graphql.from_doc_id("1349387578499440", params)
)
try: try:
return int(j["viewer"]["message_threads"]["sync_sequence_id"]) return int(j["viewer"]["message_threads"]["sync_sequence_id"])
except (KeyError, ValueError): except (KeyError, ValueError):
@@ -135,7 +146,7 @@ class Mqtt(object):
"max_deltas_able_to_process": 1000, "max_deltas_able_to_process": 1000,
"delta_batch_size": 500, "delta_batch_size": 500,
"encoding": "JSON", "encoding": "JSON",
"entity_fbid": self._state.user_id, "entity_fbid": self._session.user_id,
} }
# If we don't have a sync_token, create a new messenger queue # If we don't have a sync_token, create a new messenger queue
@@ -192,7 +203,7 @@ class Mqtt(object):
username = { username = {
# The user ID # The user ID
"u": self._state.user_id, "u": self._session.user_id,
# Session ID # Session ID
"s": session_id, "s": session_id,
# Active status setting # Active status setting
@@ -200,7 +211,7 @@ class Mqtt(object):
# foreground_state - Whether the window is focused # foreground_state - Whether the window is focused
"fg": self._foreground, "fg": self._foreground,
# Can be any random ID # Can be any random ID
"d": self._state._client_id, "d": self._session._client_id,
# Application ID, taken from facebook.com # Application ID, taken from facebook.com
"aid": 219994525426954, "aid": 219994525426954,
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
@@ -235,10 +246,10 @@ class Mqtt(object):
headers = { headers = {
# TODO: Make this access thread safe # TODO: Make this access thread safe
"Cookie": _util.get_cookie_header( "Cookie": get_cookie_header(
self._state._session, "https://edge-chat.facebook.com/chat" self._session._session, "https://edge-chat.facebook.com/chat"
), ),
"User-Agent": self._state._session.headers["User-Agent"], "User-Agent": self._session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com", "Origin": "https://www.facebook.com",
"Host": self._HOST, "Host": self._HOST,
} }
@@ -247,7 +258,7 @@ class Mqtt(object):
path="/chat?sid={}".format(session_id), headers=headers path="/chat?sid={}".format(session_id), headers=headers
) )
def loop_once(self, on_error=None): def loop_once(self):
"""Run the listening loop once. """Run the listening loop once.
Returns whether to keep listening or not. Returns whether to keep listening or not.
@@ -269,9 +280,6 @@ class Mqtt(object):
else: else:
err = paho.mqtt.client.error_string(rc) err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err) log.error("MQTT Error: %s", err)
# For backwards compatibility
if on_error:
on_error(_exception.FBchatException("MQTT Error {}".format(err)))
# Wait before reconnecting # Wait before reconnecting
self._mqtt._reconnect_wait() self._mqtt._reconnect_wait()

View File

@@ -206,4 +206,8 @@ class ActiveStatus:
@classmethod @classmethod
def _from_orca_presence(cls, data): def _from_orca_presence(cls, data):
# TODO: Handle `c` and `vc` keys (Probably some binary data) # TODO: Handle `c` and `vc` keys (Probably some binary data)
return cls(active=data["p"] in [2, 3], last_active=_util.millis_to_datetime(data["l"]), in_game=None) return cls(
active=data["p"] in [2, 3],
last_active=_util.seconds_to_datetime(data["l"]) if "l" in data else None,
in_game=None,
)

View File

@@ -55,14 +55,6 @@ def strip_json_cruft(text):
raise _exception.ParseError("No JSON object found", data=text) from e raise _exception.ParseError("No JSON object found", data=text) from e
def get_cookie_header(session, url):
"""Extract a cookie header from a requests session."""
# The cookies are extracted this way to make sure they're escaped correctly
return requests.cookies.get_cookie_header(
session.cookies, requests.Request("GET", url),
)
def get_decoded_r(r): def get_decoded_r(r):
return get_decoded(r._content) return get_decoded(r._content)