Merge pull request #507 from carpedm20/refactor-limits

Refactor method limits
Mads Marquart
2020-01-14 22:13:58 +01:00
committed by GitHub
7 changed files with 267 additions and 156 deletions

View File

@@ -1,4 +1,3 @@
import itertools
import fbchat
session = fbchat.Session.login("<email>", "<password>")
@@ -62,7 +61,9 @@ print("thread's name: {}".format(thread.name))
# Here should be an example of `getUnread`
# Print the image URL for the last 20 images in the thread.
images = thread.fetch_images()
for image in itertools.islice(images, 20):
print(image.large_preview_url)
# Print the image URL for up to the last 20 images in the thread.
images = list(thread.fetch_images(limit=20))
for image in images:
if isinstance(image, fbchat.ImageAttachment):
url = c.fetch_image_url(image.id)
print(url)
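Since `fetch_images` now returns a lazy generator, the cap can also be applied on the consumer side; a minimal sketch under the same setup (`thread` is defined earlier in this example, outside the hunk):

    import itertools

    # limit=None pages through the whole history; islice stops the
    # generator after 20 items, so only the needed batches are fetched
    for image in itertools.islice(thread.fetch_images(limit=None), 20):
        print(image.id)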

View File

@@ -3,7 +3,7 @@ import time
import requests
from ._core import log
from . import _util, _graphql, _session, _poll, _user
from . import _util, _graphql, _session, _poll, _user, _page, _group, _thread, _message
from ._exception import FBchatException, FBchatFacebookError
from ._thread import ThreadLocation
@@ -24,7 +24,7 @@ from ._quick_reply import (
)
from ._plan import PlanData
from typing import Sequence
from typing import Sequence, Iterable, Tuple, Optional
class Client:
@@ -78,7 +78,7 @@ class Client:
users.append(_user.UserData._from_all_fetch(self.session, data))
return users
def search_for_users(self, name, limit=10):
def search_for_users(self, name: str, limit: int) -> Iterable[_user.UserData]:
"""Find and get users by their name.
Args:
@@ -87,92 +87,71 @@ class Client:
Returns:
list: `User` objects, ordered by relevance
Raises:
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
(j,) = self.session._graphql_requests(
_graphql.from_query(_graphql.SEARCH_USER, params)
)
return [
return (
UserData._from_graphql(self.session, node)
for node in j[name]["users"]["nodes"]
]
)
def search_for_pages(self, name, limit=10):
def search_for_pages(self, name: str, limit: int) -> Iterable[_page.PageData]:
"""Find and get pages by their name.
Args:
name: Name of the page
Returns:
list: `Page` objects, ordered by relevance
Raises:
FBchatException: If request failed
limit: The max. amount of pages to fetch
"""
params = {"search": name, "limit": limit}
(j,) = self.session._graphql_requests(
_graphql.from_query(_graphql.SEARCH_PAGE, params)
)
return [
return (
PageData._from_graphql(self.session, node)
for node in j[name]["pages"]["nodes"]
]
)
def search_for_groups(self, name, limit=10):
def search_for_groups(self, name: str, limit: int) -> Iterable[_group.GroupData]:
"""Find and get group threads by their name.
Args:
name: Name of the group thread
limit: The max. amount of groups to fetch
Returns:
list: `Group` objects, ordered by relevance
Raises:
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
(j,) = self.session._graphql_requests(
_graphql.from_query(_graphql.SEARCH_GROUP, params)
)
return [
return (
GroupData._from_graphql(self.session, node)
for node in j["viewer"]["groups"]["nodes"]
]
)
def search_for_threads(self, name, limit=10):
def search_for_threads(self, name: str, limit: int) -> Iterable[_thread.ThreadABC]:
"""Find and get threads by their name.
Args:
name: Name of the thread
limit: The max. amount of groups to fetch
Returns:
list: `User`, `Group` and `Page` objects, ordered by relevance
Raises:
FBchatException: If request failed
limit: The max. amount of threads to fetch
"""
params = {"search": name, "limit": limit}
(j,) = self.session._graphql_requests(
_graphql.from_query(_graphql.SEARCH_THREAD, params)
)
rtn = []
for node in j[name]["threads"]["nodes"]:
if node["__typename"] == "User":
rtn.append(UserData._from_graphql(self.session, node))
yield UserData._from_graphql(self.session, node)
elif node["__typename"] == "MessageThread":
# MessageThread => Group thread
rtn.append(GroupData._from_graphql(self.session, node))
yield GroupData._from_graphql(self.session, node)
elif node["__typename"] == "Page":
rtn.append(PageData._from_graphql(self.session, node))
yield PageData._from_graphql(self.session, node)
elif node["__typename"] == "Group":
# We don't handle Facebook "Groups"
pass
@@ -181,39 +160,68 @@ class Client:
"Unknown type {} in {}".format(repr(node["__typename"]), node)
)
def _search_messages(self, query, offset, limit):
data = {"query": query, "offset": offset, "limit": limit}
j = self.session._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data)
total_snippets = j["search_snippets"][query]
rtn = []
for node in j["graphql_payload"]["message_threads"]:
type_ = node["thread_type"]
if type_ == "GROUP":
thread = Group(
session=self.session, id=node["thread_key"]["thread_fbid"]
)
elif type_ == "ONE_TO_ONE":
thread = _thread.Thread(
session=self.session, id=node["thread_key"]["other_user_id"]
)
# if True: # TODO: This check!
# thread = UserData._from_graphql(self.session, node)
# else:
# thread = PageData._from_graphql(self.session, node)
else:
thread = None
log.warning("Unknown thread type %s, data: %s", type_, node)
if thread:
rtn.append((thread, total_snippets[thread.id]["num_total_snippets"]))
else:
rtn.append((None, 0))
return rtn
def search(self, query, fetch_messages=False, thread_limit=5, message_limit=5):
def search_messages(
self, query: str, limit: Optional[int]
) -> Iterable[Tuple[_thread.ThreadABC, int]]:
"""Search for messages in all threads.
Intended to be used alongside `ThreadABC.search_messages`.
Warning! If someone sends a message matching the query to a thread while
we're searching, some snippets will be returned twice.
Not sure if we should handle it; Facebook's implementation doesn't...
Args:
query: Text to search for
fetch_messages: Whether to fetch `Message` objects or IDs only
thread_limit (int): Max. number of threads to retrieve
message_limit (int): Max. number of messages to retrieve
limit: Max. number of threads to retrieve. If ``None``, all threads will be
retrieved.
Returns:
typing.Dict[str, typing.Iterable]: Dictionary with thread IDs as keys and iterables to get messages as values
Raises:
FBchatException: If request failed
Iterable of tuples, each containing a thread and the total amount of matches in it.
"""
data = {"query": query, "snippetLimit": thread_limit}
j = self.session._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data)
result = j["search_snippets"][query]
if not result:
return {}
if fetch_messages:
search_method = self.search_for_messages
else:
search_method = self.search_for_message_ids
return {
thread_id: search_method(query, limit=message_limit, thread_id=thread_id)
for thread_id in result
}
offset = 0
# The max limit is measured empirically at ~500; a safe default is chosen below
for limit in _util.get_limits(limit, max_limit=100):
data = self._search_messages(query, offset, limit)
for thread, total_snippets in data:
if thread:
yield (thread, total_snippets)
if len(data) < limit:
return # No more data to fetch
offset += limit
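After the refactor, callers drive pagination simply by consuming the generator; a hedged usage sketch (assumes a `client = fbchat.Client(session=session)` instance, which this diff does not construct):

    # Print up to ten threads matching "meeting", together with the total
    # number of matching snippets reported for each thread
    for thread, total in client.search_messages("meeting", limit=10):
        print(thread.id, total)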
def _fetch_info(self, *ids):
data = {"ids[{}]".format(i): _id for i, _id in enumerate(ids)}
@@ -317,33 +325,10 @@ class Client:
return rtn
def fetch_thread_list(
self, limit=20, thread_location=ThreadLocation.INBOX, before=None
):
"""Fetch the client's thread list.
Args:
limit (int): Max. number of threads to retrieve. Capped at 20
thread_location (ThreadLocation): INBOX, PENDING, ARCHIVED or OTHER
before (datetime.datetime): The point from which to retrieve threads
Returns:
list: `Thread` objects
Raises:
FBchatException: If request failed
"""
if limit > 20 or limit < 1:
raise ValueError("`limit` should be between 1 and 20")
if thread_location in ThreadLocation:
loc_str = thread_location.value
else:
raise TypeError('"thread_location" must be a value of ThreadLocation')
def _fetch_threads(self, limit, before, folders):
params = {
"limit": limit,
"tags": [loc_str],
"tags": folders,
"before": _util.datetime_to_millis(before) if before else None,
"includeDeliveryReceipts": True,
"includeSeqID": False,
@@ -358,15 +343,47 @@ class Client:
if _type == "GROUP":
rtn.append(GroupData._from_graphql(self.session, node))
elif _type == "ONE_TO_ONE":
user = UserData._from_thread_fetch(self.session, node)
if user:
rtn.append(user)
rtn.append(UserData._from_thread_fetch(self.session, node))
else:
raise FBchatException(
"Unknown thread type: {}, with data: {}".format(_type, node)
)
rtn.append(None)
log.warning("Unknown thread type: %s, data: %s", _type, node)
return rtn
def fetch_threads(
self, limit: Optional[int], location: ThreadLocation = ThreadLocation.INBOX,
) -> Iterable[_thread.ThreadABC]:
"""Fetch the client's thread list.
Args:
limit: Max. number of threads to retrieve. If ``None``, all threads will be
retrieved.
location: INBOX, PENDING, ARCHIVED or OTHER
"""
# The max limit is measured empirically at 837; a safe default is chosen below
MAX_BATCH_LIMIT = 100
# TODO: Clean this up after implementing support for more threads types
seen_ids = set()
before = None
for limit in _util.get_limits(limit, MAX_BATCH_LIMIT):
threads = self._fetch_threads(limit, before, [location.value])
before = None
for thread in threads:
# Don't return seen and unknown threads
if thread and thread.id not in seen_ids:
seen_ids.add(thread.id)
# TODO: Ensure type-wise that .last_active is available
before = thread.last_active
yield thread
if len(threads) < MAX_BATCH_LIMIT:
return # No more data to fetch
# We check this here in case _fetch_threads only returned `None` threads
if not before:
raise ValueError("Too many unknown threads.")
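A usage sketch for the new generator (same hypothetical `client` instance as above):

    # limit=None walks the entire inbox (the default location) in batches
    # of MAX_BATCH_LIMIT, deduplicating on thread.id as the loop above shows
    for thread in client.fetch_threads(limit=None):
        print(thread.id, thread.last_active)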
def fetch_unread(self):
"""Fetch unread threads.

View File

@@ -91,8 +91,6 @@ class ImageAttachment(Attachment):
@classmethod
def _from_list(cls, data):
data = data["node"]
previews = {
Image._from_uri_or_none(data["image"]),
Image._from_uri(data["image1"]),
@@ -156,7 +154,6 @@ class VideoAttachment(Attachment):
@classmethod
def _from_list(cls, data):
data = data["node"]
previews = {
Image._from_uri(data["image"]),
Image._from_uri(data["image1"]),

View File

@@ -170,6 +170,34 @@ class Message:
return result, mentions
@attrs_default
class MessageSnippet(Message):
"""Represents data in a Facebook message snippet.
Inherits `Message`.
"""
#: ID of the sender
author = attr.ib()
#: Datetime of when the message was sent
created_at = attr.ib()
#: The actual message
text = attr.ib()
#: A dict with offsets, mapped to the matched text
matched_keywords = attr.ib()
@classmethod
def _parse(cls, thread, data):
return cls(
thread=thread,
id=data["message_id"],
author=data["author"].lstrip("fbid:"),  # remove the "fbid:" prefix
created_at=_util.millis_to_datetime(data["timestamp"]),
text=data["body"],
matched_keywords={int(k): v for k, v in data["matched_keywords"].items()},
)
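For illustration, the payload `_parse` expects looks roughly like this (the keys follow from the code above; the values are invented, the real data comes from the search_snippets endpoint):

    data = {
        "message_id": "mid.$XYZ",
        "author": "fbid:100001234567890",
        "timestamp": 1579036438000,  # milliseconds since the epoch
        "body": "hello world",
        "matched_keywords": {"0": "hello"},
    }
    # `thread` is any ThreadABC instance the snippet belongs to
    snippet = MessageSnippet._parse(thread, data)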
@attrs_default
class MessageData(Message):
"""Represents data in a Facebook message.

View File

@@ -250,20 +250,9 @@ class ThreadABC(metaclass=abc.ABCMeta):
# )
# return self.send(Message(text=payload, quick_replies=[new]))
def search_messages(
self, query: str, offset: int = 0, limit: int = 5
) -> Iterable[str]:
"""Find and get message IDs by query.
def _search_messages(self, query, offset, limit):
from . import _message
Args:
query: Text to search for
offset (int): Number of messages to skip
limit (int): Max. number of messages to retrieve
Returns:
typing.Iterable: Found Message IDs
"""
# TODO: Return proper searchable iterator
data = {
"query": query,
"snippetOffset": offset,
@@ -273,33 +262,54 @@ class ThreadABC(metaclass=abc.ABCMeta):
}
j = self.session._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data)
result = j["search_snippets"][query]
snippets = result[self.id]["snippets"] if result.get(self.id) else []
for snippet in snippets:
yield snippet["message_id"]
result = j["search_snippets"][query].get(self.id)
if not result:
return (0, [])
def fetch_messages(self, limit: int = 20, before: datetime.datetime = None):
"""Fetch messages in a thread, ordered by most recent.
# TODO: May or may not be a good idea to attach the current thread?
# For now, we just create a new thread:
thread = self.__class__(session=self.session, id=self.id)
snippets = [
_message.MessageSnippet._parse(thread, snippet)
for snippet in result["snippets"]
]
return (result["num_total_snippets"], snippets)
def search_messages(self, query: str, limit: int) -> Iterable["MessageSnippet"]:
"""Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while
we're searching, some snippets will get returned twice.
Not sure if we should handle it, Facebook's implementation doesn't...
Args:
limit: Max. number of messages to retrieve
before: The point from which to retrieve messages
Returns:
list: `Message` objects
query: Text to search for
limit: Max. number of message snippets to retrieve
"""
offset = 0
# The max limit is measured empirically at 420; a safe default is chosen below
for limit in _util.get_limits(limit, max_limit=50):
_, snippets = self._search_messages(query, offset, limit)
yield from snippets
if len(snippets) < limit:
return # No more data to fetch
offset += limit
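Usage sketch (assuming some `thread` object; the `MessageSnippet` fields are defined in `_message.py` above):

    # Fetch up to five snippets matching the query from this thread
    for snippet in thread.search_messages("hello", limit=5):
        print(snippet.created_at, snippet.text, snippet.matched_keywords)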
def _fetch_messages(self, limit, before):
from . import _message
# TODO: Return proper searchable iterator
params = {
"id": self.id,
"message_limit": limit,
"load_messages": True,
"load_read_receipts": True,
# "load_delivery_receipts": False,
# "is_work_teamwork_not_putting_muted_in_unreads": False,
"before": _util.datetime_to_millis(before) if before else None,
}
(j,) = self.session._graphql_requests(
_graphql.from_doc_id("1860982147341344", params)
_graphql.from_doc_id("1860982147341344", params) # 2696825200377124
)
if j.get("message_thread") is None:
@@ -307,48 +317,86 @@ class ThreadABC(metaclass=abc.ABCMeta):
"Could not fetch thread {}: {}".format(self.id, j)
)
# TODO: Should we parse the returned thread data, too?
read_receipts = j["message_thread"]["read_receipts"]["nodes"]
# TODO: May or may not be a good idea to attach the current thread?
# For now, we just create a new thread:
thread = self.__class__(session=self.session, id=self.id)
messages = [
return [
_message.MessageData._from_graphql(thread, message, read_receipts)
for message in j["message_thread"]["messages"]["nodes"]
]
messages.reverse()
return messages
def fetch_messages(self, limit: Optional[int]) -> Iterable["_message.Message"]:
"""Fetch messages in a thread, with most recent messages first.
def fetch_images(self):
"""Fetch images/videos posted in the thread."""
# TODO: Return proper searchable iterator
data = {"id": self.id, "first": 48}
Args:
limit: Max. number of messages to retrieve. If ``None``, all messages will
be retrieved.
"""
# The max limit is measured empirically at 210 in extreme cases, so a
# fairly safe default is chosen below
MAX_BATCH_LIMIT = 100
before = None
for limit in _util.get_limits(limit, MAX_BATCH_LIMIT):
messages = self._fetch_messages(limit, before)
if before:
# Strip the first message
yield from messages[1:]
else:
yield from messages
if len(messages) < MAX_BATCH_LIMIT:
return # No more data to fetch
before = messages[-1].created_at
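The `before` cursor is inclusive, which is why every batch after the first drops its leading element; the same pattern in isolation (a sketch, with the hypothetical `fetch_page` standing in for `_fetch_messages`):

    def paginate(fetch_page, batch_size):
        """Yield items from an inclusive-cursor API without duplicates."""
        before = None
        while True:
            page = fetch_page(batch_size, before)
            # The first item of every non-initial page is the cursor item
            # itself, already yielded on the previous iteration
            yield from page[1:] if before else page
            if len(page) < batch_size:
                return  # short page: nothing left to fetch
            before = page[-1].created_at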
def _fetch_images(self, limit, after):
data = {"id": self.id, "first": limit, "after": after}
(j,) = self.session._graphql_requests(
_graphql.from_query_id("515216185516880", data)
)
while True:
try:
i = j[self.id]["message_shared_media"]["edges"][0]
except IndexError:
if j[self.id]["message_shared_media"]["page_info"].get("has_next_page"):
data["after"] = j[self.id]["message_shared_media"]["page_info"].get(
"end_cursor"
)
(j,) = self.session._graphql_requests(
_graphql.from_query_id("515216185516880", data)
)
continue
else:
break
if i["node"].get("__typename") == "MessageImage":
yield _file.ImageAttachment._from_list(i)
elif i["node"].get("__typename") == "MessageVideo":
yield _file.VideoAttachment._from_list(i)
result = j[self.id]["message_shared_media"]
rtn = []
for edge in result["edges"]:
node = edge["node"]
type_ = node["__typename"]
if type_ == "MessageImage":
rtn.append(_file.ImageAttachment._from_list(node))
elif type_ == "MessageVideo":
rtn.append(_file.VideoAttachment._from_list(node))
else:
yield _attachment.Attachment(id=i["node"].get("legacy_attachment_id"))
del j[self.id]["message_shared_media"]["edges"][0]
log.warning("Unknown image type %s, data: %s", type_, edge)
rtn.append(None)
# result["page_info"]["has_next_page"] is not correct when limit > 12
return (result["page_info"]["end_cursor"], rtn)
def fetch_images(self, limit: Optional[int]) -> Iterable[_attachment.Attachment]:
"""Fetch images/videos posted in the thread.
Args:
limit: Max. number of images to retrieve. If ``None``, all images will be
retrieved.
"""
cursor = None
# The max limit on this request is unknown, so we set it reasonably high
# This way `limit=None` also still works
for limit in _util.get_limits(limit, max_limit=1000):
cursor, images = self._fetch_images(limit, cursor)
if not images:
return # No more data to fetch
for image in images:
if image:
yield image
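Unknown media types come back from `_fetch_images` as ``None`` placeholders and are filtered out here, so consumers only ever see attachment objects; a sketch with the same hypothetical `thread`:

    for attachment in thread.fetch_images(limit=50):
        # Only known attachment types (image/video) reach this point
        print(type(attachment).__name__, attachment.id)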
def set_nickname(self, user_id: str, nickname: str):
"""Change the nickname of a user in the thread.

View File

@@ -13,6 +13,8 @@ from ._exception import (
FBchatPleaseRefresh,
)
from typing import Iterable, Optional
#: Default list of user agents
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
@@ -24,6 +26,24 @@ USER_AGENTS = [
]
def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]:
"""Helper that generates limits based on a max limit."""
if limit is None:
# Generate infinite items
while True:
yield max_limit
if limit < 0:
raise ValueError("Limit cannot be negative")
# Generate n items
yield from [max_limit] * (limit // max_limit)
remainder = limit % max_limit
if remainder:
yield remainder
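Spelled out, the helper yields full batches followed by the remainder; the values below follow directly from the code above:

    import itertools

    list(get_limits(250, max_limit=100))  # -> [100, 100, 50]
    list(get_limits(100, max_limit=100))  # -> [100]
    list(get_limits(0, max_limit=100))    # -> []

    # limit=None yields max_limit forever; callers stop once a batch
    # comes back short
    list(itertools.islice(get_limits(None, 100), 3))  # -> [100, 100, 100]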
def now():
return int(time.time() * 1000)

View File

@@ -46,7 +46,7 @@ def test_imageattachment_from_list():
height=988,
),
},
) == ImageAttachment._from_list({"node": data})
) == ImageAttachment._from_list(data)
def test_videoattachment_from_list():
@@ -88,7 +88,7 @@ def test_videoattachment_from_list():
height=368,
),
},
) == VideoAttachment._from_list({"node": data})
) == VideoAttachment._from_list(data)
def test_graphql_to_attachment_empty():