Add type hints, and clean up Client a bit

This commit is contained in:
Mads Marquart
2020-01-22 01:43:04 +01:00
parent 701fe8ffc8
commit 2644aa9b7a
19 changed files with 339 additions and 346 deletions

View File

@@ -2,13 +2,15 @@ import attr
from ._core import attrs_default, Image from ._core import attrs_default, Image
from . import _util from . import _util
from typing import Sequence
@attrs_default @attrs_default
class Attachment: class Attachment:
"""Represents a Facebook attachment.""" """Represents a Facebook attachment."""
#: The attachment ID #: The attachment ID
id = attr.ib(None) id = attr.ib(None, type=str)
@attrs_default @attrs_default
@@ -21,23 +23,23 @@ class ShareAttachment(Attachment):
"""Represents a shared item (e.g. URL) attachment.""" """Represents a shared item (e.g. URL) attachment."""
#: ID of the author of the shared post #: ID of the author of the shared post
author = attr.ib(None) author = attr.ib(None, type=str)
#: Target URL #: Target URL
url = attr.ib(None) url = attr.ib(None, type=str)
#: Original URL if Facebook redirects the URL #: Original URL if Facebook redirects the URL
original_url = attr.ib(None) original_url = attr.ib(None, type=str)
#: Title of the attachment #: Title of the attachment
title = attr.ib(None) title = attr.ib(None, type=str)
#: Description of the attachment #: Description of the attachment
description = attr.ib(None) description = attr.ib(None, type=str)
#: Name of the source #: Name of the source
source = attr.ib(None) source = attr.ib(None, type=str)
#: The attached image #: The attached image
image = attr.ib(None) image = attr.ib(None, type=Image)
#: URL of the original image if Facebook uses ``safe_image`` #: URL of the original image if Facebook uses ``safe_image``
original_image_url = attr.ib(None) original_image_url = attr.ib(None, type=str)
#: List of additional attachments #: List of additional attachments
attachments = attr.ib(factory=list) attachments = attr.ib(factory=list, type=Sequence[Attachment])
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):

View File

@@ -11,14 +11,10 @@ from . import (
_page, _page,
_group, _group,
_thread, _thread,
_message,
) )
from ._thread import ThreadLocation from typing import Sequence, Iterable, Tuple, Optional, Set
from ._user import User, UserData
from ._group import Group, GroupData
from ._page import Page, PageData
from typing import Sequence, Iterable, Tuple, Optional
@attrs_default @attrs_default
@@ -68,7 +64,7 @@ class Client:
limit: The max. amount of users to fetch limit: The max. amount of users to fetch
Returns: Returns:
list: `User` objects, ordered by relevance Users, ordered by relevance
""" """
params = {"search": name, "limit": limit} params = {"search": name, "limit": limit}
(j,) = self.session._graphql_requests( (j,) = self.session._graphql_requests(
@@ -76,7 +72,7 @@ class Client:
) )
return ( return (
UserData._from_graphql(self.session, node) _user.UserData._from_graphql(self.session, node)
for node in j[name]["users"]["nodes"] for node in j[name]["users"]["nodes"]
) )
@@ -93,7 +89,7 @@ class Client:
) )
return ( return (
PageData._from_graphql(self.session, node) _page.PageData._from_graphql(self.session, node)
for node in j[name]["pages"]["nodes"] for node in j[name]["pages"]["nodes"]
) )
@@ -110,7 +106,7 @@ class Client:
) )
return ( return (
GroupData._from_graphql(self.session, node) _group.GroupData._from_graphql(self.session, node)
for node in j["viewer"]["groups"]["nodes"] for node in j["viewer"]["groups"]["nodes"]
) )
@@ -128,12 +124,12 @@ class Client:
for node in j[name]["threads"]["nodes"]: for node in j[name]["threads"]["nodes"]:
if node["__typename"] == "User": if node["__typename"] == "User":
yield UserData._from_graphql(self.session, node) yield _user.UserData._from_graphql(self.session, node)
elif node["__typename"] == "MessageThread": elif node["__typename"] == "MessageThread":
# MessageThread => Group thread # MessageThread => Group thread
yield GroupData._from_graphql(self.session, node) yield _group.GroupData._from_graphql(self.session, node)
elif node["__typename"] == "Page": elif node["__typename"] == "Page":
yield PageData._from_graphql(self.session, node) yield _page.PageData._from_graphql(self.session, node)
elif node["__typename"] == "Group": elif node["__typename"] == "Group":
# We don't handle Facebook "Groups" # We don't handle Facebook "Groups"
pass pass
@@ -152,7 +148,7 @@ class Client:
for node in j["graphql_payload"]["message_threads"]: for node in j["graphql_payload"]["message_threads"]:
type_ = node["thread_type"] type_ = node["thread_type"]
if type_ == "GROUP": if type_ == "GROUP":
thread = Group( thread = _group.Group(
session=self.session, id=node["thread_key"]["thread_fbid"] session=self.session, id=node["thread_key"]["thread_fbid"]
) )
elif type_ == "ONE_TO_ONE": elif type_ == "ONE_TO_ONE":
@@ -160,9 +156,9 @@ class Client:
session=self.session, id=node["thread_key"]["other_user_id"] session=self.session, id=node["thread_key"]["other_user_id"]
) )
# if True: # TODO: This check! # if True: # TODO: This check!
# thread = UserData._from_graphql(self.session, node) # thread = _user.UserData._from_graphql(self.session, node)
# else: # else:
# thread = PageData._from_graphql(self.session, node) # thread = _page.PageData._from_graphql(self.session, node)
else: else:
thread = None thread = None
log.warning("Unknown thread type %s, data: %s", type_, node) log.warning("Unknown thread type %s, data: %s", type_, node)
@@ -238,20 +234,18 @@ class Client:
log.debug(entries) log.debug(entries)
return entries return entries
def fetch_thread_info(self, *thread_ids): def fetch_thread_info(self, ids: Iterable[str]) -> Iterable[_thread.ThreadABC]:
"""Fetch threads' info from IDs, unordered. """Fetch threads' info from IDs, unordered.
Warning: Warning:
Sends two requests if users or pages are present, to fetch all available info! Sends two requests if users or pages are present, to fetch all available info!
Args: Args:
thread_ids: One or more thread ID(s) to query ids: Thread ids to query
Returns:
dict: `Thread` objects, labeled by their ID
""" """
ids = list(ids)
queries = [] queries = []
for thread_id in thread_ids: for thread_id in ids:
params = { params = {
"id": thread_id, "id": thread_id,
"message_limit": 0, "message_limit": 0,
@@ -267,7 +261,7 @@ class Client:
if entry.get("message_thread") is None: if entry.get("message_thread") is None:
# If you don't have an existing thread with this person, attempt to retrieve user data anyways # If you don't have an existing thread with this person, attempt to retrieve user data anyways
j[i]["message_thread"] = { j[i]["message_thread"] = {
"thread_key": {"other_user_id": thread_ids[i]}, "thread_key": {"other_user_id": ids[i]},
"thread_type": "ONE_TO_ONE", "thread_type": "ONE_TO_ONE",
} }
@@ -280,12 +274,11 @@ class Client:
if len(pages_and_user_ids) != 0: if len(pages_and_user_ids) != 0:
pages_and_users = self._fetch_info(*pages_and_user_ids) pages_and_users = self._fetch_info(*pages_and_user_ids)
rtn = {}
for i, entry in enumerate(j): for i, entry in enumerate(j):
entry = entry["message_thread"] entry = entry["message_thread"]
if entry.get("thread_type") == "GROUP": if entry.get("thread_type") == "GROUP":
_id = entry["thread_key"]["thread_fbid"] _id = entry["thread_key"]["thread_fbid"]
rtn[_id] = GroupData._from_graphql(self.session, entry) yield _group.GroupData._from_graphql(self.session, entry)
elif entry.get("thread_type") == "ONE_TO_ONE": elif entry.get("thread_type") == "ONE_TO_ONE":
_id = entry["thread_key"]["other_user_id"] _id = entry["thread_key"]["other_user_id"]
if pages_and_users.get(_id) is None: if pages_and_users.get(_id) is None:
@@ -294,14 +287,12 @@ class Client:
) )
entry.update(pages_and_users[_id]) entry.update(pages_and_users[_id])
if "first_name" in entry: if "first_name" in entry:
rtn[_id] = UserData._from_graphql(self.session, entry) yield _user.UserData._from_graphql(self.session, entry)
else: else:
rtn[_id] = PageData._from_graphql(self.session, entry) yield _page.PageData._from_graphql(self.session, entry)
else: else:
raise _exception.ParseError("Unknown thread type", data=entry) raise _exception.ParseError("Unknown thread type", data=entry)
return rtn
def _fetch_threads(self, limit, before, folders): def _fetch_threads(self, limit, before, folders):
params = { params = {
"limit": limit, "limit": limit,
@@ -318,16 +309,18 @@ class Client:
for node in j["viewer"]["message_threads"]["nodes"]: for node in j["viewer"]["message_threads"]["nodes"]:
_type = node.get("thread_type") _type = node.get("thread_type")
if _type == "GROUP": if _type == "GROUP":
rtn.append(GroupData._from_graphql(self.session, node)) rtn.append(_group.GroupData._from_graphql(self.session, node))
elif _type == "ONE_TO_ONE": elif _type == "ONE_TO_ONE":
rtn.append(UserData._from_thread_fetch(self.session, node)) rtn.append(_user.UserData._from_thread_fetch(self.session, node))
else: else:
rtn.append(None) rtn.append(None)
log.warning("Unknown thread type: %s, data: %s", _type, node) log.warning("Unknown thread type: %s, data: %s", _type, node)
return rtn return rtn
def fetch_threads( def fetch_threads(
self, limit: Optional[int], location: ThreadLocation = ThreadLocation.INBOX, self,
limit: Optional[int],
location: _thread.ThreadLocation = _thread.ThreadLocation.INBOX,
) -> Iterable[_thread.ThreadABC]: ) -> Iterable[_thread.ThreadABC]:
"""Fetch the client's thread list. """Fetch the client's thread list.
@@ -340,7 +333,7 @@ class Client:
MAX_BATCH_LIMIT = 100 MAX_BATCH_LIMIT = 100
# TODO: Clean this up after implementing support for more threads types # TODO: Clean this up after implementing support for more threads types
seen_ids = set() seen_ids = set() # type: Set[str]
before = None before = None
for limit in _util.get_limits(limit, MAX_BATCH_LIMIT): for limit in _util.get_limits(limit, MAX_BATCH_LIMIT):
threads = self._fetch_threads(limit, before, [location.value]) threads = self._fetch_threads(limit, before, [location.value])
@@ -361,11 +354,11 @@ class Client:
if not before: if not before:
raise ValueError("Too many unknown threads.") raise ValueError("Too many unknown threads.")
def fetch_unread(self): def fetch_unread(self) -> Sequence[_thread.ThreadABC]:
"""Fetch unread threads. """Fetch unread threads.
Returns: Warning:
list: List of unread thread ids This is not finished, and the API may change at any point!
""" """
form = { form = {
"folders[0]": "inbox", "folders[0]": "inbox",
@@ -376,27 +369,39 @@ class Client:
j = self.session._payload_post("/ajax/mercury/unread_threads.php", form) j = self.session._payload_post("/ajax/mercury/unread_threads.php", form)
result = j["unread_thread_fbids"][0] result = j["unread_thread_fbids"][0]
return result["thread_fbids"] + result["other_user_fbids"] # TODO: Parse Pages?
return [
_group.Group(session=self.session, id=id_) for id_ in result["thread_fbids"]
] + [
_user.User(session=self.session, id=id_)
for id_ in result["other_user_fbids"]
]
def fetch_unseen(self): def fetch_unseen(self) -> Sequence[_thread.ThreadABC]:
"""Fetch unseen / new threads. """Fetch unseen / new threads.
Returns: Warning:
list: List of unseen thread ids This is not finished, and the API may change at any point!
""" """
j = self.session._payload_post("/mercury/unseen_thread_ids/", {}) j = self.session._payload_post("/mercury/unseen_thread_ids/", {})
result = j["unseen_thread_fbids"][0] result = j["unseen_thread_fbids"][0]
return result["thread_fbids"] + result["other_user_fbids"] # TODO: Parse Pages?
return [
_group.Group(session=self.session, id=id_) for id_ in result["thread_fbids"]
] + [
_user.User(session=self.session, id=id_)
for id_ in result["other_user_fbids"]
]
def fetch_image_url(self, image_id): def fetch_image_url(self, image_id: str) -> str:
"""Fetch URL to download the original image from an image attachment ID. """Fetch URL to download the original image from an image attachment ID.
Args: Args:
image_id (str): The image you want to fetch image_id: The image you want to fetch
Returns: Returns:
str: An URL where you can download the original image An URL where you can download the original image
""" """
image_id = str(image_id) image_id = str(image_id)
data = {"photo_id": str(image_id)} data = {"photo_id": str(image_id)}
@@ -414,77 +419,67 @@ class Client:
) )
return j["viewer"] return j["viewer"]
def get_phone_numbers(self): def get_phone_numbers(self) -> Sequence[str]:
"""Fetch list of user's phone numbers. """Fetch the user's phone numbers."""
Returns:
list: List of phone numbers
"""
data = self._get_private_data() data = self._get_private_data()
return [ return [
j["phone_number"]["universal_number"] for j in data["user"]["all_phones"] j["phone_number"]["universal_number"] for j in data["user"]["all_phones"]
] ]
def get_emails(self): def get_emails(self) -> Sequence[str]:
"""Fetch list of user's emails. """Fetch the user's emails."""
Returns:
list: List of emails
"""
data = self._get_private_data() data = self._get_private_data()
return [j["display_email"] for j in data["all_emails"]] return [j["display_email"] for j in data["all_emails"]]
def mark_as_delivered(self, thread_id, message_id): def mark_as_delivered(self, message: _message.Message):
"""Mark a message as delivered. """Mark a message as delivered.
Warning:
This is not finished, and the API may change at any point!
Args: Args:
thread_id: User/Group ID to which the message belongs. See :ref:`intro_threads` message: The message to set as delivered
message_id: Message ID to set as delivered. See :ref:`intro_threads`
""" """
data = { data = {
"message_ids[0]": message_id, "message_ids[0]": message.id,
"thread_ids[%s][0]" % thread_id: message_id, "thread_ids[%s][0]" % message.thread.id: message.id,
} }
j = self.session._payload_post("/ajax/mercury/delivery_receipts.php", data) j = self.session._payload_post("/ajax/mercury/delivery_receipts.php", data)
return True
def _read_status(self, read, thread_ids, timestamp=None):
thread_ids = _util.require_list(thread_ids)
def _read_status(self, read, threads, at):
data = { data = {
"watermarkTimestamp": _util.datetime_to_millis(timestamp) "watermarkTimestamp": _util.datetime_to_millis(at),
if timestamp
else _util.now(),
"shouldSendReadReceipt": "true", "shouldSendReadReceipt": "true",
} }
for thread_id in thread_ids: for threads in threads:
data["ids[{}]".format(thread_id)] = "true" if read else "false" data["ids[{}]".format(thread.id)] = "true" if read else "false"
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data) j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)
def mark_as_read(self, thread_ids=None, timestamp=None): def mark_as_read(self, threads: Iterable[_thread.ThreadABC], at: datetime.datetime):
"""Mark threads as read. """Mark threads as read.
All messages inside the specified threads will be marked as read. All messages inside the specified threads will be marked as read.
Args: Args:
thread_ids: User/Group IDs to set as read. See :ref:`intro_threads` threads: Threads to set as read
timestamp: Timestamp (as a Datetime) to signal the read cursor at, default is the current time at: Timestamp to signal the read cursor at
""" """
self._read_status(True, thread_ids, timestamp) return self._read_status(True, threads, at)
def mark_as_unread(self, thread_ids=None, timestamp=None): def mark_as_unread(
self, threads: Iterable[_thread.ThreadABC], at: datetime.datetime
):
"""Mark threads as unread. """Mark threads as unread.
All messages inside the specified threads will be marked as unread. All messages inside the specified threads will be marked as unread.
Args: Args:
thread_ids: User/Group IDs to set as unread. See :ref:`intro_threads` threads: Threads to set as unread
timestamp: Timestamp (as a Datetime) to signal the read cursor at, default is the current time at: Timestam to signal the read cursor at
""" """
self._read_status(False, thread_ids, timestamp) return self._read_status(False, threads, at)
def mark_as_seen(self): def mark_as_seen(self):
""" """
@@ -495,24 +490,24 @@ class Client:
"/ajax/mercury/mark_seen.php", {"seen_timestamp": _util.now()} "/ajax/mercury/mark_seen.php", {"seen_timestamp": _util.now()}
) )
def move_threads(self, location, thread_ids): def move_threads(
self, location: _thread.ThreadLocation, threads: Iterable[_thread.ThreadABC]
):
"""Move threads to specified location. """Move threads to specified location.
Args: Args:
location (ThreadLocation): INBOX, PENDING, ARCHIVED or OTHER location: INBOX, PENDING, ARCHIVED or OTHER
thread_ids: Thread IDs to move. See :ref:`intro_threads` threads: Threads to move
""" """
thread_ids = _util.require_list(thread_ids) if location == _thread.ThreadLocation.PENDING:
location = _thread.ThreadLocation.OTHER
if location == ThreadLocation.PENDING: if location == _thread.ThreadLocation.ARCHIVED:
location = ThreadLocation.OTHER data_archive = {}
data_unpin = {}
if location == ThreadLocation.ARCHIVED: for thread in threads:
data_archive = dict() data_archive["ids[{}]".format(thread.id)] = "true"
data_unpin = dict() data_unpin["ids[{}]".format(thread.id)] = "false"
for thread_id in thread_ids:
data_archive["ids[{}]".format(thread_id)] = "true"
data_unpin["ids[{}]".format(thread_id)] = "false"
j_archive = self.session._payload_post( j_archive = self.session._payload_post(
"/ajax/mercury/change_archived_status.php?dpr=1", data_archive "/ajax/mercury/change_archived_status.php?dpr=1", data_archive
) )
@@ -520,42 +515,32 @@ class Client:
"/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin "/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin
) )
else: else:
data = dict() data = {}
for i, thread_id in enumerate(thread_ids): for i, thread in enumerate(threads):
data["{}[{}]".format(location.name.lower(), i)] = thread_id data["{}[{}]".format(location.name.lower(), i)] = thread.id
j = self.session._payload_post("/ajax/mercury/move_thread.php", data) j = self.session._payload_post("/ajax/mercury/move_thread.php", data)
return True
def delete_threads(self, thread_ids): def delete_threads(self, threads: Iterable[_thread.ThreadABC]):
"""Delete threads. """Delete threads."""
data_unpin = {}
Args: data_delete = {}
thread_ids: Thread IDs to delete. See :ref:`intro_threads` for i, thread in enumerate(threads):
""" data_unpin["ids[{}]".format(thread.id)] = "false"
thread_ids = _util.require_list(thread_ids) data_delete["ids[{}]".format(i)] = thread.id
data_unpin = dict()
data_delete = dict()
for i, thread_id in enumerate(thread_ids):
data_unpin["ids[{}]".format(thread_id)] = "false"
data_delete["ids[{}]".format(i)] = thread_id
j_unpin = self.session._payload_post( j_unpin = self.session._payload_post(
"/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin "/ajax/mercury/change_pinned_status.php?dpr=1", data_unpin
) )
j_delete = self.session._payload_post( j_delete = self.session._payload_post(
"/ajax/mercury/delete_thread.php?dpr=1", data_delete "/ajax/mercury/delete_thread.php?dpr=1", data_delete
) )
return True
def delete_messages(self, message_ids): def delete_messages(self, messages: Iterable[_message.Message]):
"""Delete specified messages. """Delete specified messages.
Args: Args:
message_ids: Message IDs to delete messages: Messages to delete
""" """
message_ids = _util.require_list(message_ids) data = {}
data = dict() for i, message in enumerate(messages):
for i, message_id in enumerate(message_ids): data["message_ids[{}]".format(i)] = message.id
data["message_ids[{}]".format(i)] = message_id
j = self.session._payload_post("/ajax/mercury/delete_messages.php?dpr=1", data) j = self.session._payload_post("/ajax/mercury/delete_messages.php?dpr=1", data)
return True

View File

@@ -3,6 +3,8 @@ import abc
from ._core import kw_only from ._core import kw_only
from . import _exception, _util, _thread, _group, _user, _message from . import _exception, _util, _thread, _group, _user, _message
from typing import Any
#: Default attrs settings for events #: Default attrs settings for events
attrs_event = attr.s(slots=True, kw_only=kw_only, frozen=True) attrs_event = attr.s(slots=True, kw_only=kw_only, frozen=True)
@@ -22,9 +24,9 @@ class UnknownEvent(Event):
"""Represent an unknown event.""" """Represent an unknown event."""
#: Some data describing the unknown event's origin #: Some data describing the unknown event's origin
source = attr.ib() source = attr.ib(type=str)
#: The unknown data. This cannot be relied on, it's only for debugging purposes. #: The unknown data. This cannot be relied on, it's only for debugging purposes.
data = attr.ib() data = attr.ib(type=Any)
@classmethod @classmethod
def _parse(cls, session, data): def _parse(cls, session, data):

View File

@@ -1,6 +1,8 @@
import attr import attr
import requests import requests
from typing import Any
# Not frozen, since that doesn't work in PyPy # Not frozen, since that doesn't work in PyPy
attrs_exception = attr.s(slots=True, auto_exc=True) attrs_exception = attr.s(slots=True, auto_exc=True)
@@ -36,7 +38,7 @@ class ParseError(FacebookError):
This may contain sensitive data, so should not be logged to file. This may contain sensitive data, so should not be logged to file.
""" """
data = attr.ib() data = attr.ib(type=Any)
"""The data that triggered the error. """The data that triggered the error.
The format of this cannot be relied on, it's only for debugging purposes. The format of this cannot be relied on, it's only for debugging purposes.

View File

@@ -1,21 +1,24 @@
import attr import attr
import datetime
from ._core import attrs_default, Image from ._core import attrs_default, Image
from . import _util from . import _util
from ._attachment import Attachment from ._attachment import Attachment
from typing import Set
@attrs_default @attrs_default
class FileAttachment(Attachment): class FileAttachment(Attachment):
"""Represents a file that has been sent as a Facebook attachment.""" """Represents a file that has been sent as a Facebook attachment."""
#: URL where you can download the file #: URL where you can download the file
url = attr.ib(None) url = attr.ib(None, type=str)
#: Size of the file in bytes #: Size of the file in bytes
size = attr.ib(None) size = attr.ib(None, type=int)
#: Name of the file #: Name of the file
name = attr.ib(None) name = attr.ib(None, type=str)
#: Whether Facebook determines that this file may be harmful #: Whether Facebook determines that this file may be harmful
is_malicious = attr.ib(None) is_malicious = attr.ib(None, type=bool)
@classmethod @classmethod
def _from_graphql(cls, data, size=None): def _from_graphql(cls, data, size=None):
@@ -33,13 +36,13 @@ class AudioAttachment(Attachment):
"""Represents an audio file that has been sent as a Facebook attachment.""" """Represents an audio file that has been sent as a Facebook attachment."""
#: Name of the file #: Name of the file
filename = attr.ib(None) filename = attr.ib(None, type=str)
#: URL of the audio file #: URL of the audio file
url = attr.ib(None) url = attr.ib(None, type=str)
#: Duration of the audio clip as a timedelta #: Duration of the audio clip as a timedelta
duration = attr.ib(None) duration = attr.ib(None, type=datetime.timedelta)
#: Audio type #: Audio type
audio_type = attr.ib(None) audio_type = attr.ib(None, type=str)
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
@@ -60,15 +63,15 @@ class ImageAttachment(Attachment):
""" """
#: The extension of the original image (e.g. ``png``) #: The extension of the original image (e.g. ``png``)
original_extension = attr.ib(None) original_extension = attr.ib(None, type=str)
#: Width of original image #: Width of original image
width = attr.ib(None, converter=lambda x: None if x is None else int(x)) width = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
#: Height of original image #: Height of original image
height = attr.ib(None, converter=lambda x: None if x is None else int(x)) height = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
#: Whether the image is animated #: Whether the image is animated
is_animated = attr.ib(None) is_animated = attr.ib(None, type=bool)
#: A set, containing variously sized / various types of previews of the image #: A set, containing variously sized / various types of previews of the image
previews = attr.ib(factory=set) previews = attr.ib(factory=set, type=Set[Image])
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
@@ -110,17 +113,17 @@ class VideoAttachment(Attachment):
"""Represents a video that has been sent as a Facebook attachment.""" """Represents a video that has been sent as a Facebook attachment."""
#: Size of the original video in bytes #: Size of the original video in bytes
size = attr.ib(None) size = attr.ib(None, type=int)
#: Width of original video #: Width of original video
width = attr.ib(None) width = attr.ib(None, type=int)
#: Height of original video #: Height of original video
height = attr.ib(None) height = attr.ib(None, type=int)
#: Length of video as a timedelta #: Length of video as a timedelta
duration = attr.ib(None) duration = attr.ib(None, type=datetime.timedelta)
#: URL to very compressed preview video #: URL to very compressed preview video
preview_url = attr.ib(None) preview_url = attr.ib(None, type=str)
#: A set, containing variously sized previews of the video #: A set, containing variously sized previews of the video
previews = attr.ib(factory=set) previews = attr.ib(factory=set, type=Set[Image])
@classmethod @classmethod
def _from_graphql(cls, data, size=None): def _from_graphql(cls, data, size=None):

View File

@@ -1,7 +1,8 @@
import attr import attr
import datetime
from ._core import attrs_default, Image from ._core import attrs_default, Image
from . import _util, _session, _graphql, _plan, _thread, _user from . import _util, _session, _graphql, _plan, _thread, _user
from typing import Sequence, Iterable from typing import Sequence, Iterable, Set, Mapping
@attrs_default @attrs_default
@@ -11,7 +12,7 @@ class Group(_thread.ThreadABC):
#: The session to use when making requests. #: The session to use when making requests.
session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
#: The group's unique identifier. #: The group's unique identifier.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
def _to_send_data(self): def _to_send_data(self):
return {"thread_fbid": self.id} return {"thread_fbid": self.id}
@@ -137,31 +138,31 @@ class GroupData(Group):
""" """
#: The group's picture #: The group's picture
photo = attr.ib(None) photo = attr.ib(None, type=Image)
#: The name of the group #: The name of the group
name = attr.ib(None) name = attr.ib(None, type=str)
#: Datetime when the group was last active / when the last message was sent #: When the group was last active / when the last message was sent
last_active = attr.ib(None) last_active = attr.ib(None, type=datetime.datetime)
#: Number of messages in the group #: Number of messages in the group
message_count = attr.ib(None) message_count = attr.ib(None, type=int)
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None) plan = attr.ib(None, type=_plan.PlanData)
#: Unique list (set) of the group thread's participant user IDs #: The group thread's participant user ids
participants = attr.ib(factory=set) participants = attr.ib(factory=set, type=Set[str])
#: A dictionary, containing user nicknames mapped to their IDs #: A dictionary, containing user nicknames mapped to their IDs
nicknames = attr.ib(factory=dict) nicknames = attr.ib(factory=dict, type=Mapping[str, str])
#: The groups's message color #: The groups's message color
color = attr.ib(None) color = attr.ib(None, type=str)
#: The groups's default emoji #: The groups's default emoji
emoji = attr.ib(None) emoji = attr.ib(None, type=str)
# Set containing user IDs of thread admins # User ids of thread admins
admins = attr.ib(factory=set) admins = attr.ib(factory=set, type=Set[str])
# True if users need approval to join # True if users need approval to join
approval_mode = attr.ib(None) approval_mode = attr.ib(None, type=bool)
# Set containing user IDs requesting to join # Set containing user IDs requesting to join
approval_requests = attr.ib(factory=set) approval_requests = attr.ib(factory=set, type=Set[str])
# Link for joining group # Link for joining group
join_link = attr.ib(None) join_link = attr.ib(None, type=str)
@classmethod @classmethod
def _from_graphql(cls, session, data): def _from_graphql(cls, session, data):

View File

@@ -12,15 +12,15 @@ class LocationAttachment(Attachment):
""" """
#: Latitude of the location #: Latitude of the location
latitude = attr.ib(None) latitude = attr.ib(None, type=float)
#: Longitude of the location #: Longitude of the location
longitude = attr.ib(None) longitude = attr.ib(None, type=float)
#: Image showing the map of the location #: Image showing the map of the location
image = attr.ib(None) image = attr.ib(None, type=Image)
#: URL to Bing maps with the location #: URL to Bing maps with the location
url = attr.ib(None) url = attr.ib(None, type=str)
# Address of the location # Address of the location
address = attr.ib(None) address = attr.ib(None, type=str)
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):

View File

@@ -1,4 +1,5 @@
import attr import attr
import datetime
import enum import enum
from string import Formatter from string import Formatter
from ._core import log, attrs_default from ._core import log, attrs_default
@@ -11,8 +12,9 @@ from . import (
_file, _file,
_quick_reply, _quick_reply,
_sticker, _sticker,
_thread,
) )
from typing import Optional from typing import Optional, Mapping, Sequence
class EmojiSize(enum.Enum): class EmojiSize(enum.Enum):
@@ -44,11 +46,11 @@ class Mention:
"""Represents a ``@mention``.""" """Represents a ``@mention``."""
#: The thread ID the mention is pointing at #: The thread ID the mention is pointing at
thread_id = attr.ib() thread_id = attr.ib(type=str)
#: The character where the mention starts #: The character where the mention starts
offset = attr.ib() offset = attr.ib(type=int)
#: The length of the mention #: The length of the mention
length = attr.ib() length = attr.ib(type=int)
@classmethod @classmethod
def _from_range(cls, data): def _from_range(cls, data):
@@ -85,7 +87,7 @@ class Message:
#: The thread that this message belongs to. #: The thread that this message belongs to.
thread = attr.ib(type="_thread.ThreadABC") thread = attr.ib(type="_thread.ThreadABC")
#: The message ID. #: The message ID.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
@property @property
def session(self): def session(self):
@@ -189,13 +191,13 @@ class MessageSnippet(Message):
""" """
#: ID of the sender #: ID of the sender
author = attr.ib() author = attr.ib(type=str)
#: Datetime of when the message was sent #: Datetime of when the message was sent
created_at = attr.ib() created_at = attr.ib(type=datetime.datetime)
#: The actual message #: The actual message
text = attr.ib() text = attr.ib(type=str)
#: A dict with offsets, mapped to the matched text #: A dict with offsets, mapped to the matched text
matched_keywords = attr.ib() matched_keywords = attr.ib(type=Mapping[int, str])
@classmethod @classmethod
def _parse(cls, thread, data): def _parse(cls, thread, data):
@@ -217,35 +219,35 @@ class MessageData(Message):
""" """
#: ID of the sender #: ID of the sender
author = attr.ib() author = attr.ib(type=str)
#: Datetime of when the message was sent #: Datetime of when the message was sent
created_at = attr.ib() created_at = attr.ib(type=datetime.datetime)
#: The actual message #: The actual message
text = attr.ib(None) text = attr.ib(None, type=str)
#: A list of `Mention` objects #: A list of `Mention` objects
mentions = attr.ib(factory=list) mentions = attr.ib(factory=list, type=Sequence[Mention])
#: A `EmojiSize`. Size of a sent emoji #: Size of a sent emoji
emoji_size = attr.ib(None) emoji_size = attr.ib(None, type=EmojiSize)
#: Whether the message is read #: Whether the message is read
is_read = attr.ib(None) is_read = attr.ib(None, type=bool)
#: A list of people IDs who read the message, works only with `Client.fetch_thread_messages` #: A list of people IDs who read the message, works only with `Client.fetch_thread_messages`
read_by = attr.ib(factory=list) read_by = attr.ib(factory=list, type=bool)
#: A dictionary with user's IDs as keys, and their reaction as values #: A dictionary with user's IDs as keys, and their reaction as values
reactions = attr.ib(factory=dict) reactions = attr.ib(factory=dict, type=Mapping[str, str])
#: A `Sticker` #: A `Sticker`
sticker = attr.ib(None) sticker = attr.ib(None, type=_sticker.Sticker)
#: A list of attachments #: A list of attachments
attachments = attr.ib(factory=list) attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment])
#: A list of `QuickReply` #: A list of `QuickReply`
quick_replies = attr.ib(factory=list) quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply])
#: Whether the message is unsent (deleted for everyone) #: Whether the message is unsent (deleted for everyone)
unsent = attr.ib(False) unsent = attr.ib(False, type=bool)
#: Message ID you want to reply to #: Message ID you want to reply to
reply_to_id = attr.ib(None) reply_to_id = attr.ib(None, type=str)
#: Replied message #: Replied message
replied_to = attr.ib(None) replied_to = attr.ib(None, type="MessageData")
#: Whether the message was forwarded #: Whether the message was forwarded
forwarded = attr.ib(False) forwarded = attr.ib(False, type=bool)
@staticmethod @staticmethod
def _get_forwarded_from_tags(tags): def _get_forwarded_from_tags(tags):

View File

@@ -5,7 +5,7 @@ import requests
from ._core import log, attrs_default from ._core import log, attrs_default
from . import _util, _exception, _session, _graphql, _event_common, _event from . import _util, _exception, _session, _graphql, _event_common, _event
from typing import Iterable from typing import Iterable, Optional
def get_cookie_header(session: requests.Session, url: str) -> str: def get_cookie_header(session: requests.Session, url: str) -> str:
@@ -25,18 +25,18 @@ def generate_session_id() -> int:
class Listener: class Listener:
"""Helper, to listen for incoming Facebook events.""" """Helper, to listen for incoming Facebook events."""
_session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
_mqtt = attr.ib(type=paho.mqtt.client.Client) _mqtt = attr.ib(type=paho.mqtt.client.Client)
_chat_on = attr.ib(type=bool) _chat_on = attr.ib(type=bool)
_foreground = attr.ib(type=bool) _foreground = attr.ib(type=bool)
_sequence_id = attr.ib(type=int) _sequence_id = attr.ib(type=int)
_sync_token = attr.ib(None, type=str) _sync_token = attr.ib(None, type=str)
_events = attr.ib(None, type=Iterable[_event_common.Event]) _events = attr.ib(None, type=Optional[Iterable[_event_common.Event]])
_HOST = "edge-chat.facebook.com" _HOST = "edge-chat.facebook.com"
@classmethod @classmethod
def connect(cls, session, chat_on: bool, foreground: bool): def connect(cls, session: _session.Session, chat_on: bool, foreground: bool):
"""Initialize a connection to the Facebook MQTT service. """Initialize a connection to the Facebook MQTT service.
Args: Args:
@@ -123,7 +123,7 @@ class Listener:
" events may have been lost" " events may have been lost"
) )
self._sync_token = None self._sync_token = None
self._sequence_id = self._fetch_sequence_id(self._session) self._sequence_id = self._fetch_sequence_id(self.session)
self._messenger_queue_publish() self._messenger_queue_publish()
# TODO: Signal to the user that they should reload their data! # TODO: Signal to the user that they should reload their data!
return return
@@ -138,12 +138,12 @@ class Listener:
try: try:
# TODO: Don't handle this in a callback # TODO: Don't handle this in a callback
self._events = list(_event.parse_events(self._session, message.topic, j)) self._events = list(_event.parse_events(self.session, message.topic, j))
except _exception.ParseError: except _exception.ParseError:
log.exception("Failed parsing MQTT data") log.exception("Failed parsing MQTT data")
@staticmethod @staticmethod
def _fetch_sequence_id(session) -> int: def _fetch_sequence_id(session: _session.Session) -> int:
"""Fetch sequence ID.""" """Fetch sequence ID."""
params = { params = {
"limit": 1, "limit": 1,
@@ -179,7 +179,7 @@ class Listener:
"max_deltas_able_to_process": 1000, "max_deltas_able_to_process": 1000,
"delta_batch_size": 500, "delta_batch_size": 500,
"encoding": "JSON", "encoding": "JSON",
"entity_fbid": self._session.user_id, "entity_fbid": self.session.user_id,
} }
# If we don't have a sync_token, create a new messenger queue # If we don't have a sync_token, create a new messenger queue
@@ -239,7 +239,7 @@ class Listener:
username = { username = {
# The user ID # The user ID
"u": self._session.user_id, "u": self.session.user_id,
# Session ID # Session ID
"s": session_id, "s": session_id,
# Active status setting # Active status setting
@@ -247,7 +247,7 @@ class Listener:
# foreground_state - Whether the window is focused # foreground_state - Whether the window is focused
"fg": self._foreground, "fg": self._foreground,
# Can be any random ID # Can be any random ID
"d": self._session._client_id, "d": self.session._client_id,
# Application ID, taken from facebook.com # Application ID, taken from facebook.com
"aid": 219994525426954, "aid": 219994525426954,
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
@@ -283,9 +283,9 @@ class Listener:
headers = { headers = {
# TODO: Make this access thread safe # TODO: Make this access thread safe
"Cookie": get_cookie_header( "Cookie": get_cookie_header(
self._session._session, "https://edge-chat.facebook.com/chat" self.session._session, "https://edge-chat.facebook.com/chat"
), ),
"User-Agent": self._session._session.headers["User-Agent"], "User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com", "Origin": "https://www.facebook.com",
"Host": self._HOST, "Host": self._HOST,
} }

View File

@@ -1,4 +1,5 @@
import attr import attr
import datetime
from ._core import attrs_default, Image from ._core import attrs_default, Image
from . import _session, _plan, _thread from . import _session, _plan, _thread
@@ -10,7 +11,7 @@ class Page(_thread.ThreadABC):
#: The session to use when making requests. #: The session to use when making requests.
session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
#: The unique identifier of the page. #: The unique identifier of the page.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
def _to_send_data(self): def _to_send_data(self):
return {"other_user_fbid": self.id} return {"other_user_fbid": self.id}
@@ -24,25 +25,25 @@ class PageData(Page):
""" """
#: The page's picture #: The page's picture
photo = attr.ib() photo = attr.ib(type=Image)
#: The name of the page #: The name of the page
name = attr.ib() name = attr.ib(type=str)
#: Datetime when the thread was last active / when the last message was sent #: When the thread was last active / when the last message was sent
last_active = attr.ib(None) last_active = attr.ib(None, type=datetime.datetime)
#: Number of messages in the thread #: Number of messages in the thread
message_count = attr.ib(None) message_count = attr.ib(None, type=int)
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None) plan = attr.ib(None, type=_plan.PlanData)
#: The page's custom URL #: The page's custom URL
url = attr.ib(None) url = attr.ib(None, type=str)
#: The name of the page's location city #: The name of the page's location city
city = attr.ib(None) city = attr.ib(None, type=str)
#: Amount of likes the page has #: Amount of likes the page has
likes = attr.ib(None) likes = attr.ib(None, type=int)
#: Some extra information about the page #: Some extra information about the page
sub_title = attr.ib(None) sub_title = attr.ib(None, type=str)
#: The page's category #: The page's category
category = attr.ib(None) category = attr.ib(None, type=str)
@classmethod @classmethod
def _from_graphql(cls, session, data): def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ import enum
from ._core import attrs_default from ._core import attrs_default
from . import _exception, _util, _session from . import _exception, _util, _session
from typing import Mapping, Sequence
class GuestStatus(enum.Enum): class GuestStatus(enum.Enum):
INVITED = 1 INVITED = 1
@@ -25,7 +27,7 @@ class Plan:
#: The session to use when making requests. #: The session to use when making requests.
session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
#: The plan's unique identifier. #: The plan's unique identifier.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
def fetch(self) -> "PlanData": def fetch(self) -> "PlanData":
"""Fetch fresh `PlanData` object.""" """Fetch fresh `PlanData` object."""
@@ -92,32 +94,32 @@ class Plan:
def participate(self): def participate(self):
"""Set yourself as GOING/participating to the plan.""" """Set yourself as GOING/participating to the plan."""
self._change_participation(True) return self._change_participation(True)
def decline(self): def decline(self):
"""Set yourself as having DECLINED the plan.""" """Set yourself as having DECLINED the plan."""
self._change_participation(False) return self._change_participation(False)
@attrs_default @attrs_default
class PlanData(Plan): class PlanData(Plan):
"""Represents data about a plan.""" """Represents data about a plan."""
#: Plan time (datetime), only precise down to the minute #: Plan time, only precise down to the minute
time = attr.ib() time = attr.ib(type=datetime.datetime)
#: Plan title #: Plan title
title = attr.ib() title = attr.ib(type=str)
#: Plan location name #: Plan location name
location = attr.ib(None, converter=lambda x: x or "") location = attr.ib(None, converter=lambda x: x or "", type=str)
#: Plan location ID #: Plan location ID
location_id = attr.ib(None, converter=lambda x: x or "") location_id = attr.ib(None, converter=lambda x: x or "", type=str)
#: ID of the plan creator #: ID of the plan creator
author_id = attr.ib(None) author_id = attr.ib(None, type=str)
#: Dictionary of `User` IDs mapped to their `GuestStatus` #: `User` ids mapped to their `GuestStatus`
guests = attr.ib(None) guests = attr.ib(None, type=Mapping[str, GuestStatus])
@property @property
def going(self): def going(self) -> Sequence[str]:
"""List of the `User` IDs who will take part in the plan.""" """List of the `User` IDs who will take part in the plan."""
return [ return [
id_ id_
@@ -126,7 +128,7 @@ class PlanData(Plan):
] ]
@property @property
def declined(self): def declined(self) -> Sequence[str]:
"""List of the `User` IDs who won't take part in the plan.""" """List of the `User` IDs who won't take part in the plan."""
return [ return [
id_ id_
@@ -135,7 +137,7 @@ class PlanData(Plan):
] ]
@property @property
def invited(self): def invited(self) -> Sequence[str]:
"""List of the `User` IDs who are invited to the plan.""" """List of the `User` IDs who are invited to the plan."""
return [ return [
id_ id_

View File

@@ -2,19 +2,21 @@ import attr
from ._core import attrs_default from ._core import attrs_default
from ._attachment import Attachment from ._attachment import Attachment
from typing import Any
@attrs_default @attrs_default
class QuickReply: class QuickReply:
"""Represents a quick reply.""" """Represents a quick reply."""
#: Payload of the quick reply #: Payload of the quick reply
payload = attr.ib(None) payload = attr.ib(None, type=Any)
#: External payload for responses #: External payload for responses
external_payload = attr.ib(None) external_payload = attr.ib(None, type=Any)
#: Additional data #: Additional data
data = attr.ib(None) data = attr.ib(None, type=Any)
#: Whether it's a response for a quick reply #: Whether it's a response for a quick reply
is_response = attr.ib(False) is_response = attr.ib(False, type=bool)
@attrs_default @attrs_default
@@ -22,9 +24,9 @@ class QuickReplyText(QuickReply):
"""Represents a text quick reply.""" """Represents a text quick reply."""
#: Title of the quick reply #: Title of the quick reply
title = attr.ib(None) title = attr.ib(None, type=str)
#: URL of the quick reply image (optional) #: URL of the quick reply image (optional)
image_url = attr.ib(None) image_url = attr.ib(None, type=str)
#: Type of the quick reply #: Type of the quick reply
_type = "text" _type = "text"
@@ -42,7 +44,7 @@ class QuickReplyPhoneNumber(QuickReply):
"""Represents a phone number quick reply (Doesn't work on mobile).""" """Represents a phone number quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional) #: URL of the quick reply image (optional)
image_url = attr.ib(None) image_url = attr.ib(None, type=str)
#: Type of the quick reply #: Type of the quick reply
_type = "user_phone_number" _type = "user_phone_number"
@@ -52,7 +54,7 @@ class QuickReplyEmail(QuickReply):
"""Represents an email quick reply (Doesn't work on mobile).""" """Represents an email quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional) #: URL of the quick reply image (optional)
image_url = attr.ib(None) image_url = attr.ib(None, type=str)
#: Type of the quick reply #: Type of the quick reply
_type = "user_email" _type = "user_email"

View File

@@ -8,10 +8,12 @@ import urllib.parse
from ._core import log, kw_only from ._core import log, kw_only
from . import _graphql, _util, _exception from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, BinaryIO, Sequence, Iterable, Callable
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"') FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
def get_user_id(session): def get_user_id(session: requests.Session) -> str:
# TODO: Optimize this `.get_dict()` call! # TODO: Optimize this `.get_dict()` call!
cookies = session.cookies.get_dict() cookies = session.cookies.get_dict()
rtn = cookies.get("c_user") rtn = cookies.get("c_user")
@@ -20,11 +22,11 @@ def get_user_id(session):
return str(rtn) return str(rtn)
def find_input_fields(html): def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input")) return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory(): def session_factory() -> requests.Session:
session = requests.session() session = requests.session()
session.headers["Referer"] = "https://www.facebook.com" session.headers["Referer"] = "https://www.facebook.com"
# TODO: Deprecate setting the user agent manually # TODO: Deprecate setting the user agent manually
@@ -32,27 +34,27 @@ def session_factory():
return session return session
def client_id_factory(): def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:] return hex(int(random.random() * 2 ** 31))[2:]
def is_home(url): def is_home(url: str) -> bool:
parts = urllib.parse.urlparse(url) parts = urllib.parse.urlparse(url)
# Check the urls `/home.php` and `/` # Check the urls `/home.php` and `/`
return "home" in parts.path or "/" == parts.path return "home" in parts.path or "/" == parts.path
def _2fa_helper(session, code, r): def _2fa_helper(session: requests.Session, code: int, r):
soup = find_input_fields(r.text) soup = find_input_fields(r.text)
data = dict() data = dict()
url = "https://m.facebook.com/login/checkpoint/" url = "https://m.facebook.com/login/checkpoint/"
data["approvals_code"] = code data["approvals_code"] = str(code)
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"] data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code" data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = 0 data["codes_submitted"] = "0"
log.info("Submitting 2FA code.") log.info("Submitting 2FA code.")
r = session.post(url, data=data) r = session.post(url, data=data)
@@ -99,15 +101,16 @@ def _2fa_helper(session, code, r):
return r return r
def get_error_data(html, url): def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
"""Get error code and message from a request.""" """Get error code and message from a request."""
try:
code = _util.get_url_parameter(url, "e")
except IndexError:
code = None code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (IndexError, ValueError):
pass
soup = bs4.BeautifulSoup( soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error"), html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error")
) )
return code, soup.get_text() or None return code, soup.get_text() or None
@@ -119,20 +122,20 @@ class Session:
This is the main class, which is used to login to Facebook. This is the main class, which is used to login to Facebook.
""" """
_user_id = attr.ib() _user_id = attr.ib(type=str)
_fb_dtsg = attr.ib() _fb_dtsg = attr.ib(type=str)
_revision = attr.ib() _revision = attr.ib(type=int)
_session = attr.ib(factory=session_factory) _session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0) _counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory) _client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None) _logout_h = attr.ib(None, type=str)
@property @property
def user_id(self): def user_id(self) -> str:
"""The logged in user's ID.""" """The logged in user's ID."""
return self._user_id return self._user_id
def __repr__(self): def __repr__(self) -> str:
# An alternative repr, to illustrate that you can't create the class directly # An alternative repr, to illustrate that you can't create the class directly
return "<fbchat.Session user_id={}>".format(self._user_id) return "<fbchat.Session user_id={}>".format(self._user_id)
@@ -146,7 +149,9 @@ class Session:
} }
@classmethod @classmethod
def login(cls, email, password, on_2fa_callback=None): def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
):
"""Login the user, using ``email`` and ``password``. """Login the user, using ``email`` and ``password``.
Args: Args:
@@ -205,11 +210,11 @@ class Session:
"Login failed at url {!r}".format(r.url), msg, code=code "Login failed at url {!r}".format(r.url), msg, code=code
) )
def is_logged_in(self): def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status. """Send a request to Facebook to check the login status.
Returns: Returns:
bool: Whether the user is still logged in Whether the user is still logged in
""" """
# Send a request to the login url, to see if we're directed to the home page # Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1" url = "https://m.facebook.com/login.php?login_attempt=1"
@@ -219,7 +224,7 @@ class Session:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"]) return "Location" in r.headers and is_home(r.headers["Location"])
def logout(self): def logout(self) -> None:
"""Safely log out the user. """Safely log out the user.
The session object must not be used after this action has been performed! The session object must not be used after this action has been performed!
@@ -275,20 +280,20 @@ class Session:
logout_h=logout_h, logout_h=logout_h,
) )
def get_cookies(self): def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`. """Retrieve session cookies, that can later be used in `from_cookies`.
Returns: Returns:
dict: A dictionary containing session cookies A dictionary containing session cookies
""" """
return self._session.cookies.get_dict() return self._session.cookies.get_dict()
@classmethod @classmethod
def from_cookies(cls, cookies): def from_cookies(cls, cookies: Mapping[str, str]):
"""Load a session from session cookies. """Load a session from session cookies.
Args: Args:
cookies (dict): A dictionary containing session cookies cookies: A dictionary containing session cookies
""" """
session = session_factory() session = session_factory()
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies) session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
@@ -331,7 +336,9 @@ class Session:
} }
return self._post("/api/graphqlbatch/", data, as_graphql=True) return self._post("/api/graphqlbatch/", data, as_graphql=True)
def _upload(self, files, voice_clip=False): def _upload(
self, files: Iterable[Tuple[str, BinaryIO, str]], voice_clip: bool = False
) -> Sequence[Tuple[str, str]]:
"""Upload files to Facebook. """Upload files to Facebook.
`files` should be a list of files that requests can upload, see `files` should be a list of files that requests can upload, see
@@ -347,7 +354,7 @@ class Session:
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict "https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict
) )
if len(j["metadata"]) != len(files): if len(j["metadata"]) != len(file_dict):
raise _exception.ParseError("Some files could not be uploaded", data=j) raise _exception.ParseError("Some files could not be uploaded", data=j)
return [ return [

View File

@@ -8,28 +8,28 @@ class Sticker(Attachment):
"""Represents a Facebook sticker that has been sent to a thread as an attachment.""" """Represents a Facebook sticker that has been sent to a thread as an attachment."""
#: The sticker-pack's ID #: The sticker-pack's ID
pack = attr.ib(None) pack = attr.ib(None, type=str)
#: Whether the sticker is animated #: Whether the sticker is animated
is_animated = attr.ib(False) is_animated = attr.ib(False, type=bool)
# If the sticker is animated, the following should be present # If the sticker is animated, the following should be present
#: URL to a medium spritemap #: URL to a medium spritemap
medium_sprite_image = attr.ib(None) medium_sprite_image = attr.ib(None, type=str)
#: URL to a large spritemap #: URL to a large spritemap
large_sprite_image = attr.ib(None) large_sprite_image = attr.ib(None, type=str)
#: The amount of frames present in the spritemap pr. row #: The amount of frames present in the spritemap pr. row
frames_per_row = attr.ib(None) frames_per_row = attr.ib(None, type=int)
#: The amount of frames present in the spritemap pr. column #: The amount of frames present in the spritemap pr. column
frames_per_col = attr.ib(None) frames_per_col = attr.ib(None, type=int)
#: The total amount of frames in the spritemap #: The total amount of frames in the spritemap
frame_count = attr.ib(None) frame_count = attr.ib(None, type=int)
#: The frame rate the spritemap is intended to be played in #: The frame rate the spritemap is intended to be played in
frame_rate = attr.ib(None) frame_rate = attr.ib(None, type=int)
#: The sticker's image #: The sticker's image
image = attr.ib(None) image = attr.ib(None, type=Image)
#: The sticker's label/name #: The sticker's label/name
label = attr.ib(None) label = attr.ib(None, type=str)
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):

View File

@@ -4,7 +4,7 @@ import collections
import datetime import datetime
import enum import enum
from ._core import log, attrs_default, Image from ._core import log, attrs_default, Image
from . import _util, _exception, _session, _graphql, _attachment, _file, _plan from . import _util, _exception, _session, _graphql, _attachment, _file, _plan, _message
from typing import MutableMapping, Mapping, Any, Iterable, Tuple, Optional from typing import MutableMapping, Mapping, Any, Iterable, Tuple, Optional
@@ -161,7 +161,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
data["sticker_id"] = sticker_id data["sticker_id"] = sticker_id
return self.session._do_send_request(data) return self.session._do_send_request(data)
def _send_location(self, current, latitude, longitude) -> str: def _send_location(self, current, latitude, longitude):
data = self._to_send_data() data = self._to_send_data()
data["action_type"] = "ma-type:user-generated-message" data["action_type"] = "ma-type:user-generated-message"
data["location_attachment[coordinates][latitude]"] = latitude data["location_attachment[coordinates][latitude]"] = latitude
@@ -214,11 +214,11 @@ class ThreadABC(metaclass=abc.ABCMeta):
# data["platform_xmd"] = _util.json_minimal(xmd) # data["platform_xmd"] = _util.json_minimal(xmd)
# TODO: This! # TODO: This!
# def quick_reply(self, quick_reply, payload=None): # def quick_reply(self, quick_reply: QuickReply, payload=None):
# """Reply to chosen quick reply. # """Reply to chosen quick reply.
# #
# Args: # Args:
# quick_reply (QuickReply): Quick reply to reply to # quick_reply: Quick reply to reply to
# payload: Optional answer to the quick reply # payload: Optional answer to the quick reply
# """ # """
# if isinstance(quick_reply, QuickReplyText): # if isinstance(quick_reply, QuickReplyText):
@@ -255,8 +255,6 @@ class ThreadABC(metaclass=abc.ABCMeta):
# return self.send(Message(text=payload, quick_replies=[new])) # return self.send(Message(text=payload, quick_replies=[new]))
def _search_messages(self, query, offset, limit): def _search_messages(self, query, offset, limit):
from . import _message
data = { data = {
"query": query, "query": query,
"snippetOffset": offset, "snippetOffset": offset,
@@ -279,7 +277,9 @@ class ThreadABC(metaclass=abc.ABCMeta):
] ]
return (result["num_total_snippets"], snippets) return (result["num_total_snippets"], snippets)
def search_messages(self, query: str, limit: int) -> Iterable["MessageSnippet"]: def search_messages(
self, query: str, limit: int
) -> Iterable["_message.MessageSnippet"]:
"""Find and get message IDs by query. """Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while Warning! If someone send a message to the thread that matches the query, while
@@ -301,8 +301,6 @@ class ThreadABC(metaclass=abc.ABCMeta):
offset += limit offset += limit
def _fetch_messages(self, limit, before): def _fetch_messages(self, limit, before):
from . import _message
params = { params = {
"id": self.id, "id": self.id,
"message_limit": limit, "message_limit": limit,
@@ -385,7 +383,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
# result["page_info"]["has_next_page"] is not correct when limit > 12 # result["page_info"]["has_next_page"] is not correct when limit > 12
return (result["page_info"]["end_cursor"], rtn) return (result["page_info"]["end_cursor"], rtn)
def fetch_images(self, limit: int) -> Iterable[_attachment.Attachment]: def fetch_images(self, limit: Optional[int]) -> Iterable[_attachment.Attachment]:
"""Fetch images/videos posted in the thread. """Fetch images/videos posted in the thread.
Args: Args:
@@ -474,7 +472,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
"/messaging/save_thread_emoji/?source=thread_settings&dpr=1", data "/messaging/save_thread_emoji/?source=thread_settings&dpr=1", data
) )
def forward_attachment(self, attachment_id): def forward_attachment(self, attachment_id: str):
"""Forward an attachment. """Forward an attachment.
Args: Args:
@@ -690,7 +688,7 @@ class Thread(ThreadABC):
#: The session to use when making requests. #: The session to use when making requests.
session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
#: The unique identifier of the thread. #: The unique identifier of the thread.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
def _to_send_data(self): def _to_send_data(self):
raise NotImplementedError( raise NotImplementedError(

View File

@@ -1,4 +1,5 @@
import attr import attr
import datetime
from ._core import log, attrs_default, Image from ._core import log, attrs_default, Image
from . import _util, _session, _plan, _thread from . import _util, _session, _plan, _thread
@@ -40,7 +41,7 @@ class User(_thread.ThreadABC):
#: The session to use when making requests. #: The session to use when making requests.
session = attr.ib(type=_session.Session) session = attr.ib(type=_session.Session)
#: The user's unique identifier. #: The user's unique identifier.
id = attr.ib(converter=str) id = attr.ib(converter=str, type=str)
def _to_send_data(self): def _to_send_data(self):
return { return {
@@ -78,35 +79,35 @@ class UserData(User):
""" """
#: The user's picture #: The user's picture
photo = attr.ib() photo = attr.ib(type=Image)
#: The name of the user #: The name of the user
name = attr.ib() name = attr.ib(type=str)
#: Whether the user and the client are friends #: Whether the user and the client are friends
is_friend = attr.ib() is_friend = attr.ib(type=bool)
#: The users first name #: The users first name
first_name = attr.ib() first_name = attr.ib(type=str)
#: The users last name #: The users last name
last_name = attr.ib(None) last_name = attr.ib(None, type=str)
#: Datetime when the thread was last active / when the last message was sent #: Datetime when the thread was last active / when the last message was sent
last_active = attr.ib(None) last_active = attr.ib(None, type=datetime.datetime)
#: Number of messages in the thread #: Number of messages in the thread
message_count = attr.ib(None) message_count = attr.ib(None, type=int)
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None) plan = attr.ib(None, type=_plan.PlanData)
#: The profile URL. ``None`` for Messenger-only users #: The profile URL. ``None`` for Messenger-only users
url = attr.ib(None) url = attr.ib(None, type=str)
#: The user's gender #: The user's gender
gender = attr.ib(None) gender = attr.ib(None, type=str)
#: From 0 to 1. How close the client is to the user #: From 0 to 1. How close the client is to the user
affinity = attr.ib(None) affinity = attr.ib(None, type=float)
#: The user's nickname #: The user's nickname
nickname = attr.ib(None) nickname = attr.ib(None, type=str)
#: The clients nickname, as seen by the user #: The clients nickname, as seen by the user
own_nickname = attr.ib(None) own_nickname = attr.ib(None, type=str)
#: The message color #: The message color
color = attr.ib(None) color = attr.ib(None, type=str)
#: The default emoji #: The default emoji
emoji = attr.ib(None) emoji = attr.ib(None, type=str)
@staticmethod @staticmethod
def _get_other_user(data): def _get_other_user(data):
@@ -197,11 +198,11 @@ class UserData(User):
@attr.s @attr.s
class ActiveStatus: class ActiveStatus:
#: Whether the user is active now #: Whether the user is active now
active = attr.ib(None) active = attr.ib(None, type=bool)
#: Datetime when the user was last active #: Datetime when the user was last active
last_active = attr.ib(None) last_active = attr.ib(None, type=datetime.datetime)
#: Whether the user is playing Messenger game now #: Whether the user is playing Messenger game now
in_game = attr.ib(None) in_game = attr.ib(None, type=bool)
@classmethod @classmethod
def _from_orca_presence(cls, data): def _from_orca_presence(cls, data):

View File

@@ -7,7 +7,7 @@ import urllib.parse
from ._core import log from ._core import log
from . import _exception from . import _exception
from typing import Iterable, Optional from typing import Iterable, Optional, Any
#: Default list of user agents #: Default list of user agents
USER_AGENTS = [ USER_AGENTS = [
@@ -42,12 +42,12 @@ def now():
return int(time.time() * 1000) return int(time.time() * 1000)
def json_minimal(data): def json_minimal(data: Any) -> str:
"""Get JSON data in minimal form.""" """Get JSON data in minimal form."""
return json.dumps(data, separators=(",", ":")) return json.dumps(data, separators=(",", ":"))
def strip_json_cruft(text): def strip_json_cruft(text: str) -> str:
"""Removes `for(;;);` (and other cruft) that preceeds JSON responses.""" """Removes `for(;;);` (and other cruft) that preceeds JSON responses."""
try: try:
return text[text.index("{") :] return text[text.index("{") :]
@@ -63,7 +63,7 @@ def get_decoded(content):
return content.decode("utf-8") return content.decode("utf-8")
def parse_json(content): def parse_json(content: str) -> Any:
try: try:
return json.loads(content) return json.loads(content)
except ValueError as e: except ValueError as e:
@@ -134,14 +134,7 @@ def get_jsmods_require(j, index):
return None return None
def require_list(list_): def mimetype_to_key(mimetype: str) -> str:
if isinstance(list_, list):
return set(list_)
else:
return set([list_])
def mimetype_to_key(mimetype):
if not mimetype: if not mimetype:
return "file_id" return "file_id"
if mimetype == "image/gif": if mimetype == "image/gif":
@@ -152,22 +145,22 @@ def mimetype_to_key(mimetype):
return "file_id" return "file_id"
def get_url_parameters(url, *args): def get_url_parameters(url: str, *args):
params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
return [params[arg][0] for arg in args if params.get(arg)] return [params[arg][0] for arg in args if params.get(arg)]
def get_url_parameter(url, param): def get_url_parameter(url: str, param: str) -> str:
return get_url_parameters(url, param)[0] return get_url_parameters(url, param)[0]
def prefix_url(url): def prefix_url(url: str) -> str:
if url.startswith("/"): if url.startswith("/"):
return "https://www.facebook.com" + url return "https://www.facebook.com" + url
return url return url
def seconds_to_datetime(timestamp_in_seconds): def seconds_to_datetime(timestamp_in_seconds: float) -> datetime.datetime:
"""Convert an UTC timestamp to a timezone-aware datetime object.""" """Convert an UTC timestamp to a timezone-aware datetime object."""
# `.utcfromtimestamp` will return a "naive" datetime object, which is why we use the # `.utcfromtimestamp` will return a "naive" datetime object, which is why we use the
# following: # following:
@@ -176,12 +169,12 @@ def seconds_to_datetime(timestamp_in_seconds):
) )
def millis_to_datetime(timestamp_in_milliseconds): def millis_to_datetime(timestamp_in_milliseconds: int) -> datetime.datetime:
"""Convert an UTC timestamp, in milliseconds, to a timezone-aware datetime.""" """Convert an UTC timestamp, in milliseconds, to a timezone-aware datetime."""
return seconds_to_datetime(timestamp_in_milliseconds / 1000) return seconds_to_datetime(timestamp_in_milliseconds / 1000)
def datetime_to_seconds(dt): def datetime_to_seconds(dt: datetime.datetime) -> int:
"""Convert a datetime to an UTC timestamp. """Convert a datetime to an UTC timestamp.
Naive datetime objects are presumed to represent time in the system timezone. Naive datetime objects are presumed to represent time in the system timezone.
@@ -193,7 +186,7 @@ def datetime_to_seconds(dt):
return round(dt.timestamp()) return round(dt.timestamp())
def datetime_to_millis(dt): def datetime_to_millis(dt: datetime.datetime) -> int:
"""Convert a datetime to an UTC timestamp, in milliseconds. """Convert a datetime to an UTC timestamp, in milliseconds.
Naive datetime objects are presumed to represent time in the system timezone. Naive datetime objects are presumed to represent time in the system timezone.
@@ -203,17 +196,17 @@ def datetime_to_millis(dt):
return round(dt.timestamp() * 1000) return round(dt.timestamp() * 1000)
def seconds_to_timedelta(seconds): def seconds_to_timedelta(seconds: float) -> datetime.timedelta:
"""Convert seconds to a timedelta.""" """Convert seconds to a timedelta."""
return datetime.timedelta(seconds=seconds) return datetime.timedelta(seconds=seconds)
def millis_to_timedelta(milliseconds): def millis_to_timedelta(milliseconds: int) -> datetime.timedelta:
"""Convert a duration (in milliseconds) to a timedelta object.""" """Convert a duration (in milliseconds) to a timedelta object."""
return datetime.timedelta(milliseconds=milliseconds) return datetime.timedelta(milliseconds=milliseconds)
def timedelta_to_seconds(td): def timedelta_to_seconds(td: datetime.timedelta) -> int:
"""Convert a timedelta to seconds. """Convert a timedelta to seconds.
The returned seconds will be rounded to the nearest whole number. The returned seconds will be rounded to the nearest whole number.

0
fbchat/py.typed Normal file
View File

View File

@@ -8,7 +8,6 @@ from fbchat._util import (
generate_message_id, generate_message_id,
get_signature_id, get_signature_id,
get_jsmods_require, get_jsmods_require,
require_list,
mimetype_to_key, mimetype_to_key,
get_url_parameter, get_url_parameter,
prefix_url, prefix_url,
@@ -105,13 +104,6 @@ def test_get_jsmods_require_get_image_url():
assert get_jsmods_require(data, 3) == url assert get_jsmods_require(data, 3) == url
def test_require_list():
assert require_list([]) == set()
assert require_list([1, 2, 2]) == {1, 2}
assert require_list(1) == {1}
assert require_list("abc") == {"abc"}
def test_mimetype_to_key(): def test_mimetype_to_key():
assert mimetype_to_key(None) == "file_id" assert mimetype_to_key(None) == "file_id"
assert mimetype_to_key("image/gif") == "gif_id" assert mimetype_to_key("image/gif") == "gif_id"