Compare commits

...

18 Commits

Author SHA1 Message Date
Mads Marquart
db284cefdf Bump version: 2.0.0a2 -> 2.0.0a3 2020-05-07 11:10:42 +02:00
Mads Marquart
d11f417caa Make logins persistent 2020-05-07 10:56:47 +02:00
Mads Marquart
3b71258f2c Fix tests 2020-05-07 10:23:29 +02:00
Mads Marquart
81584d328b Add more session tests and small error improvements 2020-05-07 10:15:51 +02:00
Mads Marquart
7be2acad7d Initial re-add of 2FA 2020-05-06 23:34:27 +02:00
Mads Marquart
079d4093c4 Use messenger.com URLs instead of facebook.com
This should allow people who have only created a messenger account to
log in.

Also parse required `fb_dtsg` and `client_revision` values better.

The 2-fa flow is removed for now, I'll re-add it later.
2020-05-06 21:57:24 +02:00
Mads Marquart
cce947b18c Fix docs warnings 2020-05-06 13:31:09 +02:00
Mads Marquart
2545a01450 Re-add a few online tests, to easily check when Facebook breaks stuff 2020-05-06 13:31:09 +02:00
Mads Marquart
5d763dfbce Merge pull request #559 from xaadu/patch-1
Fix mistake in session handling example
2020-05-06 11:33:21 +02:00
Mads Marquart
0981be42b9 Fix errors in examples 2020-05-06 11:32:22 +02:00
Abdullah Zayed
93b71bf198 First Object then File Pointer
json.dump() receives object as first argument and File Pointer as 2nd argument.
2020-04-28 12:58:19 +06:00
Mads Marquart
af3758c8a9 Fix TitleSet.title attribute 2020-03-13 11:21:33 +01:00
Mads Marquart
f64c487a2d Bump version: 2.0.0a1 -> 2.0.0a2 2020-03-11 15:45:02 +01:00
Mads Marquart
11534604fe Remove user agent randomization
Caused problems with logging in, and didn't really help on anything
2020-03-11 15:44:34 +01:00
Mads Marquart
9990952fa6 Add Connect and Disconnect events 2020-03-11 15:27:00 +01:00
Mads Marquart
7ee7361646 Clean up event parsing 2020-03-11 15:10:25 +01:00
Mads Marquart
89c6af516c Fix various documentation mistakes 2020-03-11 15:00:50 +01:00
Mads Marquart
c27f599e37 Fix type specifiers in models 2020-03-11 14:43:28 +01:00
34 changed files with 726 additions and 364 deletions

View File

@@ -15,4 +15,6 @@ python:
# Build documentation in the docs/ directory with Sphinx # Build documentation in the docs/ directory with Sphinx
sphinx: sphinx:
configuration: docs/conf.py configuration: docs/conf.py
fail_on_warning: true # Disabled, until we can find a way to get sphinx-autodoc-typehints play nice with our
# module renaming!
fail_on_warning: false

View File

@@ -4,5 +4,8 @@ Exceptions
.. autoexception:: FacebookError() .. autoexception:: FacebookError()
.. autoexception:: HTTPError() .. autoexception:: HTTPError()
.. autoexception:: ParseError() .. autoexception:: ParseError()
.. autoexception:: NotLoggedIn()
.. autoexception:: ExternalError() .. autoexception:: ExternalError()
.. autoexception:: GraphQLError() .. autoexception:: GraphQLError()
.. autoexception:: InvalidParameters()
.. autoexception:: PleaseRefresh()

View File

@@ -1,3 +1,4 @@
.. highlight:: sh
.. See README.rst for explanation of these markers .. See README.rst for explanation of these markers
.. include:: ../README.rst .. include:: ../README.rst

View File

@@ -46,7 +46,7 @@ A thread basically just means "something I can chat with", but more precisely, i
- The conversation between you and a single Facebook user (`User`) - The conversation between you and a single Facebook user (`User`)
- The conversation between you and a Facebook Page (`Page`) - The conversation between you and a Facebook Page (`Page`)
You can get your own user ID with `Session.user.id`. You can get your own user ID from `Session.user` with ``session.user.id``.
Getting the ID of a specific group thread is fairly trivial, you only need to login to `<https://www.messenger.com/>`_, click on the group you want to find the ID of, and then read the id from the address bar. Getting the ID of a specific group thread is fairly trivial, you only need to login to `<https://www.messenger.com/>`_, click on the group you want to find the ID of, and then read the id from the address bar.
The URL will look something like this: ``https://www.messenger.com/t/1234567890``, where ``1234567890`` would be the ID of the group. The URL will look something like this: ``https://www.messenger.com/t/1234567890``, where ``1234567890`` would be the ID of the group.
@@ -111,19 +111,19 @@ Some functionality, like adding users to a `Group`, or blocking a `User`, logica
With that out of the way, let's see some examples! With that out of the way, let's see some examples!
The simplest way of interracting with a thread is by sending a message:: The simplest way of interacting with a thread is by sending a message::
# Send a message to the user # Send a message to the user
message = user.send_text("test message") message = user.send_text("test message")
There are many types of messages you can send, see the full API documentation for more. There are many types of messages you can send, see the full API documentation for more.
Notice how we held on to the sent message? The return type i a `Message` instance, so you can interract with it afterwards:: Notice how we held on to the sent message? The return type i a `Message` instance, so you can interact with it afterwards::
# React to the message with the 😍 emoji # React to the message with the 😍 emoji
message.react("😍") message.react("😍")
Besides sending messages, you can also interract with threads in other ways. An example is to change the thread color:: Besides sending messages, you can also interact with threads in other ways. An example is to change the thread color::
# Will change the thread color to the default blue # Will change the thread color to the default blue
thread.set_color("#0084ff") thread.set_color("#0084ff")

View File

@@ -65,5 +65,5 @@ print("thread's name: {}".format(thread.name))
images = list(thread.fetch_images(limit=20)) images = list(thread.fetch_images(limit=20))
for image in images: for image in images:
if isinstance(image, fbchat.ImageAttachment): if isinstance(image, fbchat.ImageAttachment):
url = c.fetch_image_url(image.id) url = client.fetch_image_url(image.id)
print(url) print(url)

View File

@@ -80,7 +80,7 @@ def on_person_removed(sender, event: fbchat.PersonRemoved):
return return
if event.author.id != session.user.id: if event.author.id != session.user.id:
print(f"{event.removed.id} got removed. They will be re-added") print(f"{event.removed.id} got removed. They will be re-added")
event.thread.add_participants([removed.id]) event.thread.add_participants([event.removed.id])
# Login, and start listening for events # Login, and start listening for events

View File

@@ -5,7 +5,7 @@ def on_message(event):
# We can only kick people from group chats, so no need to try if it's a user chat # We can only kick people from group chats, so no need to try if it's a user chat
if not isinstance(event.thread, fbchat.Group): if not isinstance(event.thread, fbchat.Group):
return return
if message.text == "Remove me!": if event.message.text == "Remove me!":
print(f"{event.author.id} will be removed from {event.thread.id}") print(f"{event.author.id} will be removed from {event.thread.id}")
event.thread.remove_participant(event.author.id) event.thread.remove_participant(event.author.id)

View File

@@ -18,7 +18,7 @@ def load_cookies(filename):
def save_cookies(filename, cookies): def save_cookies(filename, cookies):
with open(filename, "w") as f: with open(filename, "w") as f:
json.dump(f, cookies) json.dump(cookies, f)
def load_session(cookies): def load_session(cookies):

View File

@@ -65,6 +65,7 @@ from ._models import (
EmojiSize, EmojiSize,
Mention, Mention,
Message, Message,
MessageSnippet,
MessageData, MessageData,
) )
@@ -74,6 +75,8 @@ from ._events import (
Event, Event,
UnknownEvent, UnknownEvent,
ThreadEvent, ThreadEvent,
Connect,
Disconnect,
# _client_payload # _client_payload
ReactionEvent, ReactionEvent,
UserStatusEvent, UserStatusEvent,
@@ -115,7 +118,7 @@ from ._listen import Listener
from ._client import Client from ._client import Client
__version__ = "2.0.0a1" __version__ = "2.0.0a3"
__all__ = ("Session", "Listener", "Client") __all__ = ("Session", "Listener", "Client")

View File

@@ -33,10 +33,10 @@ class Client:
But does not include deactivated, deleted or memorialized users (logically, But does not include deactivated, deleted or memorialized users (logically,
since you can't chat with those). since you can't chat with those).
The order these are returned is arbitary. The order these are returned is arbitrary.
Example: Example:
Get the name of an arbitary user that you're currently chatting with. Get the name of an arbitrary user that you're currently chatting with.
>>> users = client.fetch_users() >>> users = client.fetch_users()
>>> users[0].name >>> users[0].name
@@ -211,7 +211,7 @@ class Client:
Warning! If someone send a message to a thread that matches the query, while Warning! If someone send a message to a thread that matches the query, while
we're searching, some snippets will get returned twice, and some will be lost. we're searching, some snippets will get returned twice, and some will be lost.
This is fundamentally unfixable, it's just how the endpoint is implemented. This is fundamentally not fixable, it's just how the endpoint is implemented.
Args: Args:
query: Text to search for query: Text to search for
@@ -524,7 +524,9 @@ class Client:
data = {"voice_clip": voice_clip} data = {"voice_clip": voice_clip}
j = self.session._payload_post( j = self.session._payload_post(
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict "https://upload.messenger.com/ajax/mercury/upload.php",
data,
files=file_dict,
) )
if len(j["metadata"]) != len(file_dict): if len(j["metadata"]) != len(file_dict):

View File

@@ -24,8 +24,7 @@ class Typing(ThreadEvent):
return cls(author=author, thread=author, status=status) return cls(author=author, thread=author, status=status)
@classmethod @classmethod
def _parse(cls, session, data): def _parse_thread_typing(cls, session, data):
# TODO: Rename this method
author = _threads.User(session=session, id=str(data["sender_fbid"])) author = _threads.User(session=session, id=str(data["sender_fbid"]))
thread = _threads.Group(session=session, id=str(data["thread"])) thread = _threads.Group(session=session, id=str(data["thread"]))
status = data["state"] == 1 status = data["state"] == 1
@@ -68,6 +67,25 @@ class Presence(Event):
return cls(statuses=statuses, full=data["list_type"] == "full") return cls(statuses=statuses, full=data["list_type"] == "full")
@attrs_event
class Connect(Event):
"""The client was connected to Facebook.
This is not guaranteed to be triggered the same amount of times `Disconnect`!
"""
@attrs_event
class Disconnect(Event):
"""The client lost the connection to Facebook.
This is not guaranteed to be triggered the same amount of times `Connect`!
"""
#: The reason / error string for the disconnect
reason = attr.ib(type=str)
def parse_events(session, topic, data): def parse_events(session, topic, data):
# See Mqtt._configure_connect_options for information about these topics # See Mqtt._configure_connect_options for information about these topics
try: try:
@@ -90,7 +108,7 @@ def parse_events(session, topic, data):
) from e ) from e
elif topic == "/thread_typing": elif topic == "/thread_typing":
yield Typing._parse(session, data) yield Typing._parse_thread_typing(session, data)
elif topic == "/orca_typing_notifications": elif topic == "/orca_typing_notifications":
yield Typing._parse_orca(session, data) yield Typing._parse_orca(session, data)

View File

@@ -1,5 +1,4 @@
import attr import attr
import abc
from .._common import kw_only from .._common import kw_only
from .. import _exception, _util, _threads from .. import _exception, _util, _threads
@@ -10,14 +9,9 @@ attrs_event = attr.s(slots=True, kw_only=kw_only, frozen=True)
@attrs_event @attrs_event
class Event(metaclass=abc.ABCMeta): class Event:
"""Base class for all events.""" """Base class for all events."""
@classmethod
@abc.abstractmethod
def _parse(cls, session, data):
raise NotImplementedError
@staticmethod @staticmethod
def _get_thread(session, data): def _get_thread(session, data):
# TODO: Handle pages? Is it even possible? # TODO: Handle pages? Is it even possible?
@@ -60,3 +54,9 @@ class ThreadEvent(Event):
thread = cls._get_thread(session, metadata) thread = cls._get_thread(session, metadata)
at = _util.millis_to_datetime(int(metadata["timestamp"])) at = _util.millis_to_datetime(int(metadata["timestamp"]))
return author, thread, at return author, thread, at
@classmethod
def _parse_fetch(cls, session, data):
author = _threads.User(session=session, id=data["message_sender"]["id"])
at = _util.millis_to_datetime(int(data["timestamp_precise"]))
return author, at

View File

@@ -54,15 +54,15 @@ class TitleSet(ThreadEvent):
"""Somebody changed a group's title.""" """Somebody changed a group's title."""
thread = attr.ib(type="_threads.Group") # Set the correct type thread = attr.ib(type="_threads.Group") # Set the correct type
#: The new title #: The new title. If ``None``, the title was removed
title = attr.ib(type=str) title = attr.ib(type=Optional[str])
#: When the title was set #: When the title was set
at = attr.ib(type=datetime.datetime) at = attr.ib(type=datetime.datetime)
@classmethod @classmethod
def _parse(cls, session, data): def _parse(cls, session, data):
author, thread, at = cls._parse_metadata(session, data) author, thread, at = cls._parse_metadata(session, data)
return cls(author=author, thread=thread, title=data["name"], at=at) return cls(author=author, thread=thread, title=data["name"] or None, at=at)
@attrs_event @attrs_event

View File

@@ -1,7 +1,7 @@
import attr import attr
import requests import requests
from typing import Any from typing import Any, Optional
# Not frozen, since that doesn't work in PyPy # Not frozen, since that doesn't work in PyPy
@attr.s(slots=True, auto_exc=True) @attr.s(slots=True, auto_exc=True)
@@ -20,7 +20,7 @@ class HTTPError(FacebookError):
"""Base class for errors with the HTTP(s) connection to Facebook.""" """Base class for errors with the HTTP(s) connection to Facebook."""
#: The returned HTTP status code, if relevant #: The returned HTTP status code, if relevant
status_code = attr.ib(None, type=int) status_code = attr.ib(None, type=Optional[int])
def __str__(self): def __str__(self):
if not self.status_code: if not self.status_code:
@@ -58,7 +58,7 @@ class ExternalError(FacebookError):
#: The error message that Facebook returned (Possibly in the user's own language) #: The error message that Facebook returned (Possibly in the user's own language)
description = attr.ib(type=str) description = attr.ib(type=str)
#: The error code that Facebook returned #: The error code that Facebook returned
code = attr.ib(None, type=int) code = attr.ib(None, type=Optional[int])
def __str__(self): def __str__(self):
if self.code: if self.code:
@@ -73,7 +73,7 @@ class GraphQLError(ExternalError):
# TODO: Handle multiple errors # TODO: Handle multiple errors
#: Query debug information #: Query debug information
debug_info = attr.ib(None, type=str) debug_info = attr.ib(None, type=Optional[str])
def __str__(self): def __str__(self):
if self.debug_info: if self.debug_info:

View File

@@ -5,10 +5,10 @@ import requests
from ._common import log, kw_only from ._common import log, kw_only
from . import _util, _exception, _session, _graphql, _events from . import _util, _exception, _session, _graphql, _events
from typing import Iterable, Optional, Mapping from typing import Iterable, Optional, Mapping, List
HOST = "edge-chat.facebook.com" HOST = "edge-chat.messenger.com"
TOPICS = [ TOPICS = [
# Things that happen in chats (e.g. messages) # Things that happen in chats (e.g. messages)
@@ -118,7 +118,7 @@ class Listener:
_mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client) _mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client)
_sync_token = attr.ib(None, type=Optional[str]) _sync_token = attr.ib(None, type=Optional[str])
_sequence_id = attr.ib(None, type=Optional[int]) _sequence_id = attr.ib(None, type=Optional[int])
_tmp_events = attr.ib(factory=list, type=Iterable[_events.Event]) _tmp_events = attr.ib(factory=list, type=List[_events.Event])
def __attrs_post_init__(self): def __attrs_post_init__(self):
# Configure callbacks # Configure callbacks
@@ -152,6 +152,7 @@ class Listener:
"The MQTT listener was disconnected for too long," "The MQTT listener was disconnected for too long,"
" events may have been lost" " events may have been lost"
) )
# TODO: Find a way to tell the user that they may now be missing events
self._sync_token = None self._sync_token = None
self._sequence_id = None self._sequence_id = None
return False return False
@@ -270,10 +271,10 @@ class Listener:
headers = { headers = {
"Cookie": get_cookie_header( "Cookie": get_cookie_header(
self.session._session, "https://edge-chat.facebook.com/chat" self.session._session, "https://edge-chat.messenger.com/chat"
), ),
"User-Agent": self.session._session.headers["User-Agent"], "User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com", "Origin": "https://www.messenger.com",
"Host": HOST, "Host": HOST,
} }
@@ -282,11 +283,12 @@ class Listener:
path="/chat?sid={}".format(session_id), headers=headers path="/chat?sid={}".format(session_id), headers=headers
) )
def _reconnect(self): def _reconnect(self) -> bool:
# Try reconnecting # Try reconnecting
self._configure_connect_options() self._configure_connect_options()
try: try:
self._mqtt.reconnect() self._mqtt.reconnect()
return True
except ( except (
# Taken from .loop_forever # Taken from .loop_forever
paho.mqtt.client.socket.error, paho.mqtt.client.socket.error,
@@ -296,6 +298,7 @@ class Listener:
log.debug("MQTT reconnection failed: %s", e) log.debug("MQTT reconnection failed: %s", e)
# Wait before reconnecting # Wait before reconnecting
self._mqtt._reconnect_wait() self._mqtt._reconnect_wait()
return False
def listen(self) -> Iterable[_events.Event]: def listen(self) -> Iterable[_events.Event]:
"""Run the listening loop continually. """Run the listening loop continually.
@@ -315,12 +318,10 @@ class Listener:
self._sequence_id = fetch_sequence_id(self.session) self._sequence_id = fetch_sequence_id(self.session)
# Make sure we're connected # Make sure we're connected
while True: while not self._reconnect():
# Beware, internal API, may have to change this to something more stable! pass
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
self._reconnect() yield _events.Connect()
else:
break
while True: while True:
rc = self._mqtt.loop(timeout=1.0) rc = self._mqtt.loop(timeout=1.0)
@@ -339,18 +340,23 @@ class Listener:
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error # If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying") yield _events.Disconnect(reason="Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified # This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340 # See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying") yield _events.Disconnect(reason="Connection error, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED: elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
raise _exception.NotLoggedIn("MQTT connection refused") raise _exception.NotLoggedIn("MQTT connection refused")
else: else:
err = paho.mqtt.client.error_string(rc) err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err) log.error("MQTT Error: %s", err)
reason = "MQTT Error: {}, retrying".format(err)
yield _events.Disconnect(reason=reason)
self._reconnect() while not self._reconnect():
pass
yield _events.Connect()
if self._tmp_events: if self._tmp_events:
yield from self._tmp_events yield from self._tmp_events
@@ -364,7 +370,7 @@ class Listener:
The `Listener` object should not be used after this is called! The `Listener` object should not be used after this is called!
Example: Example:
Stop the listener when recieving a message with the text "/stop" Stop the listener when receiving a message with the text "/stop"
>>> for event in listener.listen(): >>> for event in listener.listen():
... if isinstance(event, fbchat.MessageEvent): ... if isinstance(event, fbchat.MessageEvent):
@@ -374,7 +380,7 @@ class Listener:
self._mqtt.disconnect() self._mqtt.disconnect()
def set_foreground(self, value: bool) -> None: def set_foreground(self, value: bool) -> None:
"""Set the `foreground` value while listening.""" """Set the ``foreground`` value while listening."""
# TODO: Document what this actually does! # TODO: Document what this actually does!
payload = _util.json_minimal({"foreground": value}) payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
@@ -383,7 +389,7 @@ class Listener:
# info.wait_for_publish() # info.wait_for_publish()
def set_chat_on(self, value: bool) -> None: def set_chat_on(self, value: bool) -> None:
"""Set the `chat_on` value while listening.""" """Set the ``chat_on`` value while listening."""
# TODO: Document what this actually does! # TODO: Document what this actually does!
# TODO: Is this the right request to make? # TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value} data = {"make_user_available_when_in_foreground": value}

View File

@@ -3,7 +3,7 @@ from . import Image
from .._common import attrs_default from .._common import attrs_default
from .. import _util from .. import _util
from typing import Sequence from typing import Optional, Sequence
@attrs_default @attrs_default
@@ -11,7 +11,7 @@ class Attachment:
"""Represents a Facebook attachment.""" """Represents a Facebook attachment."""
#: The attachment ID #: The attachment ID
id = attr.ib(None, type=str) id = attr.ib(None, type=Optional[str])
@attrs_default @attrs_default
@@ -24,21 +24,21 @@ class ShareAttachment(Attachment):
"""Represents a shared item (e.g. URL) attachment.""" """Represents a shared item (e.g. URL) attachment."""
#: ID of the author of the shared post #: ID of the author of the shared post
author = attr.ib(None, type=str) author = attr.ib(None, type=Optional[str])
#: Target URL #: Target URL
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
#: Original URL if Facebook redirects the URL #: Original URL if Facebook redirects the URL
original_url = attr.ib(None, type=str) original_url = attr.ib(None, type=Optional[str])
#: Title of the attachment #: Title of the attachment
title = attr.ib(None, type=str) title = attr.ib(None, type=Optional[str])
#: Description of the attachment #: Description of the attachment
description = attr.ib(None, type=str) description = attr.ib(None, type=Optional[str])
#: Name of the source #: Name of the source
source = attr.ib(None, type=str) source = attr.ib(None, type=Optional[str])
#: The attached image #: The attached image
image = attr.ib(None, type=Image) image = attr.ib(None, type=Optional[Image])
#: URL of the original image if Facebook uses ``safe_image`` #: URL of the original image if Facebook uses ``safe_image``
original_image_url = attr.ib(None, type=str) original_image_url = attr.ib(None, type=Optional[str])
#: List of additional attachments #: List of additional attachments
attachments = attr.ib(factory=list, type=Sequence[Attachment]) attachments = attr.ib(factory=list, type=Sequence[Attachment])

View File

@@ -4,6 +4,8 @@ import enum
from .._common import attrs_default from .._common import attrs_default
from .. import _util from .. import _util
from typing import Optional
class ThreadLocation(enum.Enum): class ThreadLocation(enum.Enum):
"""Used to specify where a thread is located (inbox, pending, archived, other).""" """Used to specify where a thread is located (inbox, pending, archived, other)."""
@@ -21,11 +23,11 @@ class ThreadLocation(enum.Enum):
@attrs_default @attrs_default
class ActiveStatus: class ActiveStatus:
#: Whether the user is active now #: Whether the user is active now
active = attr.ib(None, type=bool) active = attr.ib(type=bool)
#: Datetime when the user was last active #: When the user was last active
last_active = attr.ib(None, type=datetime.datetime) last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Whether the user is playing Messenger game now #: Whether the user is playing Messenger game now
in_game = attr.ib(None, type=bool) in_game = attr.ib(None, type=Optional[bool])
@classmethod @classmethod
def _from_orca_presence(cls, data): def _from_orca_presence(cls, data):
@@ -42,9 +44,9 @@ class Image:
#: URL to the image #: URL to the image
url = attr.ib(type=str) url = attr.ib(type=str)
#: Width of the image #: Width of the image
width = attr.ib(None, type=int) width = attr.ib(None, type=Optional[int])
#: Height of the image #: Height of the image
height = attr.ib(None, type=int) height = attr.ib(None, type=Optional[int])
@classmethod @classmethod
def _from_uri(cls, data): def _from_uri(cls, data):

View File

@@ -4,7 +4,7 @@ from . import Image, Attachment
from .._common import attrs_default from .._common import attrs_default
from .. import _util from .. import _util
from typing import Set from typing import Set, Optional
@attrs_default @attrs_default
@@ -12,13 +12,13 @@ class FileAttachment(Attachment):
"""Represents a file that has been sent as a Facebook attachment.""" """Represents a file that has been sent as a Facebook attachment."""
#: URL where you can download the file #: URL where you can download the file
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
#: Size of the file in bytes #: Size of the file in bytes
size = attr.ib(None, type=int) size = attr.ib(None, type=Optional[int])
#: Name of the file #: Name of the file
name = attr.ib(None, type=str) name = attr.ib(None, type=Optional[str])
#: Whether Facebook determines that this file may be harmful #: Whether Facebook determines that this file may be harmful
is_malicious = attr.ib(None, type=bool) is_malicious = attr.ib(None, type=Optional[bool])
@classmethod @classmethod
def _from_graphql(cls, data, size=None): def _from_graphql(cls, data, size=None):
@@ -36,13 +36,13 @@ class AudioAttachment(Attachment):
"""Represents an audio file that has been sent as a Facebook attachment.""" """Represents an audio file that has been sent as a Facebook attachment."""
#: Name of the file #: Name of the file
filename = attr.ib(None, type=str) filename = attr.ib(None, type=Optional[str])
#: URL of the audio file #: URL of the audio file
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
#: Duration of the audio clip as a timedelta #: Duration of the audio clip
duration = attr.ib(None, type=datetime.timedelta) duration = attr.ib(None, type=Optional[datetime.timedelta])
#: Audio type #: Audio type
audio_type = attr.ib(None, type=str) audio_type = attr.ib(None, type=Optional[str])
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
@@ -63,13 +63,13 @@ class ImageAttachment(Attachment):
""" """
#: The extension of the original image (e.g. ``png``) #: The extension of the original image (e.g. ``png``)
original_extension = attr.ib(None, type=str) original_extension = attr.ib(None, type=Optional[str])
#: Width of original image #: Width of original image
width = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int) width = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Height of original image #: Height of original image
height = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int) height = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Whether the image is animated #: Whether the image is animated
is_animated = attr.ib(None, type=bool) is_animated = attr.ib(None, type=Optional[bool])
#: A set, containing variously sized / various types of previews of the image #: A set, containing variously sized / various types of previews of the image
previews = attr.ib(factory=set, type=Set[Image]) previews = attr.ib(factory=set, type=Set[Image])
@@ -113,15 +113,15 @@ class VideoAttachment(Attachment):
"""Represents a video that has been sent as a Facebook attachment.""" """Represents a video that has been sent as a Facebook attachment."""
#: Size of the original video in bytes #: Size of the original video in bytes
size = attr.ib(None, type=int) size = attr.ib(None, type=Optional[int])
#: Width of original video #: Width of original video
width = attr.ib(None, type=int) width = attr.ib(None, type=Optional[int])
#: Height of original video #: Height of original video
height = attr.ib(None, type=int) height = attr.ib(None, type=Optional[int])
#: Length of video as a timedelta #: Length of video
duration = attr.ib(None, type=datetime.timedelta) duration = attr.ib(None, type=Optional[datetime.timedelta])
#: URL to very compressed preview video #: URL to very compressed preview video
preview_url = attr.ib(None, type=str) preview_url = attr.ib(None, type=Optional[str])
#: A set, containing variously sized previews of the video #: A set, containing variously sized previews of the video
previews = attr.ib(factory=set, type=Set[Image]) previews = attr.ib(factory=set, type=Set[Image])

View File

@@ -1,8 +1,11 @@
import attr import attr
import datetime
from . import Image, Attachment from . import Image, Attachment
from .._common import attrs_default from .._common import attrs_default
from .. import _util, _exception from .. import _util, _exception
from typing import Optional
@attrs_default @attrs_default
class LocationAttachment(Attachment): class LocationAttachment(Attachment):
@@ -12,15 +15,15 @@ class LocationAttachment(Attachment):
""" """
#: Latitude of the location #: Latitude of the location
latitude = attr.ib(None, type=float) latitude = attr.ib(None, type=Optional[float])
#: Longitude of the location #: Longitude of the location
longitude = attr.ib(None, type=float) longitude = attr.ib(None, type=Optional[float])
#: Image showing the map of the location #: Image showing the map of the location
image = attr.ib(None, type=Image) image = attr.ib(None, type=Optional[Image])
#: URL to Bing maps with the location #: URL to Bing maps with the location
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
# Address of the location # Address of the location
address = attr.ib(None, type=str) address = attr.ib(None, type=Optional[str])
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
@@ -51,11 +54,11 @@ class LiveLocationAttachment(LocationAttachment):
"""Represents a live user location.""" """Represents a live user location."""
#: Name of the location #: Name of the location
name = attr.ib(None) name = attr.ib(None, type=Optional[str])
#: Datetime when live location expires #: When live location expires
expires_at = attr.ib(None) expires_at = attr.ib(None, type=Optional[datetime.datetime])
#: True if live location is expired #: True if live location is expired
is_expired = attr.ib(None) is_expired = attr.ib(None, type=Optional[bool])
@classmethod @classmethod
def _from_pull(cls, data): def _from_pull(cls, data):

View File

@@ -4,8 +4,8 @@ import enum
from string import Formatter from string import Formatter
from . import _attachment, _location, _file, _quick_reply, _sticker from . import _attachment, _location, _file, _quick_reply, _sticker
from .._common import log, attrs_default from .._common import log, attrs_default
from .. import _exception, _util, _session, _threads from .. import _exception, _util
from typing import Optional, Mapping, Sequence from typing import Optional, Mapping, Sequence, Any
class EmojiSize(enum.Enum): class EmojiSize(enum.Enum):
@@ -85,7 +85,7 @@ class Message:
""" """
#: The thread that this message belongs to. #: The thread that this message belongs to.
thread = attr.ib(type="_threads.ThreadABC") thread = attr.ib()
#: The message ID. #: The message ID.
id = attr.ib(converter=str, type=str) id = attr.ib(converter=str, type=str)
@@ -224,7 +224,7 @@ class MessageSnippet(Message):
#: ID of the sender #: ID of the sender
author = attr.ib(type=str) author = attr.ib(type=str)
#: Datetime of when the message was sent #: When the message was sent
created_at = attr.ib(type=datetime.datetime) created_at = attr.ib(type=datetime.datetime)
#: The actual message #: The actual message
text = attr.ib(type=str) text = attr.ib(type=str)
@@ -252,34 +252,34 @@ class MessageData(Message):
#: ID of the sender #: ID of the sender
author = attr.ib(type=str) author = attr.ib(type=str)
#: Datetime of when the message was sent #: When the message was sent
created_at = attr.ib(type=datetime.datetime) created_at = attr.ib(type=datetime.datetime)
#: The actual message #: The actual message
text = attr.ib(None, type=str) text = attr.ib(None, type=Optional[str])
#: A list of `Mention` objects #: A list of `Mention` objects
mentions = attr.ib(factory=list, type=Sequence[Mention]) mentions = attr.ib(factory=list, type=Sequence[Mention])
#: Size of a sent emoji #: Size of a sent emoji
emoji_size = attr.ib(None, type=EmojiSize) emoji_size = attr.ib(None, type=Optional[EmojiSize])
#: Whether the message is read #: Whether the message is read
is_read = attr.ib(None, type=bool) is_read = attr.ib(None, type=Optional[bool])
#: A list of people IDs who read the message, works only with `Client.fetch_thread_messages` #: People IDs who read the message, only works with `ThreadABC.fetch_messages`
read_by = attr.ib(factory=list, type=bool) read_by = attr.ib(factory=list, type=bool)
#: A dictionary with user's IDs as keys, and their reaction as values #: A dictionary with user's IDs as keys, and their reaction as values
reactions = attr.ib(factory=dict, type=Mapping[str, str]) reactions = attr.ib(factory=dict, type=Mapping[str, str])
#: A `Sticker` #: A `Sticker`
sticker = attr.ib(None, type=_sticker.Sticker) sticker = attr.ib(None, type=Optional[_sticker.Sticker])
#: A list of attachments #: A list of attachments
attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment]) attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment])
#: A list of `QuickReply` #: A list of `QuickReply`
quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply]) quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply])
#: Whether the message is unsent (deleted for everyone) #: Whether the message is unsent (deleted for everyone)
unsent = attr.ib(False, type=bool) unsent = attr.ib(False, type=Optional[bool])
#: Message ID you want to reply to #: Message ID you want to reply to
reply_to_id = attr.ib(None, type=str) reply_to_id = attr.ib(None, type=Optional[str])
#: Replied message #: Replied message
replied_to = attr.ib(None, type="MessageData") replied_to = attr.ib(None, type=Optional[Any])
#: Whether the message was forwarded #: Whether the message was forwarded
forwarded = attr.ib(False, type=bool) forwarded = attr.ib(False, type=Optional[bool])
@staticmethod @staticmethod
def _get_forwarded_from_tags(tags): def _get_forwarded_from_tags(tags):

View File

@@ -4,7 +4,7 @@ import enum
from .._common import attrs_default from .._common import attrs_default
from .. import _exception, _util, _session from .. import _exception, _util, _session
from typing import Mapping, Sequence from typing import Mapping, Sequence, Optional
class GuestStatus(enum.Enum): class GuestStatus(enum.Enum):
@@ -132,13 +132,13 @@ class PlanData(Plan):
#: Plan title #: Plan title
title = attr.ib(type=str) title = attr.ib(type=str)
#: Plan location name #: Plan location name
location = attr.ib(None, converter=lambda x: x or "", type=str) location = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: Plan location ID #: Plan location ID
location_id = attr.ib(None, converter=lambda x: x or "", type=str) location_id = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: ID of the plan creator #: ID of the plan creator
author_id = attr.ib(None, type=str) author_id = attr.ib(None, type=Optional[str])
#: `User` ids mapped to their `GuestStatus` #: `User` ids mapped to their `GuestStatus`
guests = attr.ib(None, type=Mapping[str, GuestStatus]) guests = attr.ib(None, type=Optional[Mapping[str, GuestStatus]])
@property @property
def going(self) -> Sequence[str]: def going(self) -> Sequence[str]:

View File

@@ -2,7 +2,7 @@ import attr
from . import Attachment from . import Attachment
from .._common import attrs_default from .._common import attrs_default
from typing import Any from typing import Any, Optional
@attrs_default @attrs_default
@@ -24,9 +24,9 @@ class QuickReplyText(QuickReply):
"""Represents a text quick reply.""" """Represents a text quick reply."""
#: Title of the quick reply #: Title of the quick reply
title = attr.ib(None, type=str) title = attr.ib(None, type=Optional[str])
#: URL of the quick reply image (optional) #: URL of the quick reply image
image_url = attr.ib(None, type=str) image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply #: Type of the quick reply
_type = "text" _type = "text"
@@ -43,8 +43,8 @@ class QuickReplyLocation(QuickReply):
class QuickReplyPhoneNumber(QuickReply): class QuickReplyPhoneNumber(QuickReply):
"""Represents a phone number quick reply (Doesn't work on mobile).""" """Represents a phone number quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional) #: URL of the quick reply image
image_url = attr.ib(None, type=str) image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply #: Type of the quick reply
_type = "user_phone_number" _type = "user_phone_number"
@@ -53,8 +53,8 @@ class QuickReplyPhoneNumber(QuickReply):
class QuickReplyEmail(QuickReply): class QuickReplyEmail(QuickReply):
"""Represents an email quick reply (Doesn't work on mobile).""" """Represents an email quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional) #: URL of the quick reply image
image_url = attr.ib(None, type=str) image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply #: Type of the quick reply
_type = "user_email" _type = "user_email"

View File

@@ -2,34 +2,36 @@ import attr
from . import Image, Attachment from . import Image, Attachment
from .._common import attrs_default from .._common import attrs_default
from typing import Optional
@attrs_default @attrs_default
class Sticker(Attachment): class Sticker(Attachment):
"""Represents a Facebook sticker that has been sent to a thread as an attachment.""" """Represents a Facebook sticker that has been sent to a thread as an attachment."""
#: The sticker-pack's ID #: The sticker-pack's ID
pack = attr.ib(None, type=str) pack = attr.ib(None, type=Optional[str])
#: Whether the sticker is animated #: Whether the sticker is animated
is_animated = attr.ib(False, type=bool) is_animated = attr.ib(False, type=bool)
# If the sticker is animated, the following should be present # If the sticker is animated, the following should be present
#: URL to a medium spritemap #: URL to a medium spritemap
medium_sprite_image = attr.ib(None, type=str) medium_sprite_image = attr.ib(None, type=Optional[str])
#: URL to a large spritemap #: URL to a large spritemap
large_sprite_image = attr.ib(None, type=str) large_sprite_image = attr.ib(None, type=Optional[str])
#: The amount of frames present in the spritemap pr. row #: The amount of frames present in the spritemap pr. row
frames_per_row = attr.ib(None, type=int) frames_per_row = attr.ib(None, type=Optional[int])
#: The amount of frames present in the spritemap pr. column #: The amount of frames present in the spritemap pr. column
frames_per_col = attr.ib(None, type=int) frames_per_col = attr.ib(None, type=Optional[int])
#: The total amount of frames in the spritemap #: The total amount of frames in the spritemap
frame_count = attr.ib(None, type=int) frame_count = attr.ib(None, type=Optional[int])
#: The frame rate the spritemap is intended to be played in #: The frame rate the spritemap is intended to be played in
frame_rate = attr.ib(None, type=int) frame_rate = attr.ib(None, type=Optional[int])
#: The sticker's image #: The sticker's image
image = attr.ib(None, type=Image) image = attr.ib(None, type=Optional[Image])
#: The sticker's label/name #: The sticker's label/name
label = attr.ib(None, type=str) label = attr.ib(None, type=Optional[str])
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):

View File

@@ -1,17 +1,51 @@
import attr import attr
import bs4
import datetime import datetime
import re
import requests import requests
import random import random
import urllib.parse import re
import json
# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4
from ._common import log, kw_only from ._common import log, kw_only
from . import _graphql, _util, _exception from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, Callable from typing import Optional, Mapping, Callable, Any
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(')
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
if len(define_splits) < 2:
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
if len(define_splits) > 2:
raise _exception.ParseError("Found too many ServerJSDefine", data=define_splits)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str: def base36encode(number: int) -> str:
@@ -32,7 +66,7 @@ def base36encode(number: int) -> str:
def prefix_url(url: str) -> str: def prefix_url(url: str) -> str:
if url.startswith("/"): if url.startswith("/"):
return "https://www.facebook.com" + url return "https://www.messenger.com" + url
return url return url
@@ -51,15 +85,15 @@ def get_user_id(session: requests.Session) -> str:
return str(rtn) return str(rtn)
def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory() -> requests.Session: def session_factory() -> requests.Session:
from . import __version__
session = requests.session() session = requests.session()
session.headers["Referer"] = "https://www.facebook.com" session.headers["Referer"] = "https://www.messenger.com/"
# TODO: Deprecate setting the user agent manually # We won't try to set a fake user agent to mask our presence!
session.headers["User-Agent"] = random.choice(_util.USER_AGENTS) # Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service
session.headers["User-Agent"] = "fbchat/{}".format(__version__)
return session return session
@@ -67,81 +101,86 @@ def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:] return hex(int(random.random() * 2 ** 31))[2:]
def is_home(url: str) -> bool: def find_form_request(html: str):
parts = urllib.parse.urlparse(url) soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
# Check the urls `/home.php` and `/`
return "home" in parts.path or "/" == parts.path form = soup.form
if not form:
raise _exception.ParseError("Could not find form to submit", data=soup)
url = form.get("action")
if not url:
raise _exception.ParseError("Could not find url to submit to", data=form)
# From what I've seen, it'll always do this!
if url.startswith("/"):
url = "https://www.facebook.com" + url
# It's okay to set missing values to something crap, the values are localized, and
# hence are not available in the raw HTML
data = {
x["name"]: x.get("value", "[missing]")
for x in form.find_all(["input", "button"])
}
return url, data
def _2fa_helper(session: requests.Session, code: int, r): def two_factor_helper(session: requests.Session, r, on_2fa_callback):
soup = find_input_fields(r.text) url, data = find_form_request(r.content.decode("utf-8"))
data = dict()
url = "https://m.facebook.com/login/checkpoint/" # You don't have to type a code if your device is already saved
# Repeats if you get the code wrong
while "approvals_code" not in data:
data["approvals_code"] = on_2fa_callback()
log.info("Submitting 2FA code")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
data["approvals_code"] = str(code) # TODO: Can be missing if checkup flow was done on another device in the meantime?
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] if "name_action_selected" in data:
data["nh"] = soup.find("input", {"name": "nh"})["value"] data["name_action_selected"] = "save_device"
data["submit[Submit Code]"] = "Submit Code" log.info("Saving browser")
data["codes_submitted"] = "0" r = session.post(url, data=data, allow_redirects=False)
log.info("Submitting 2FA code.") url, data = find_form_request(r.content.decode("utf-8"))
r = session.post(url, data=data) log.info("Starting Facebook checkup flow")
r = session.post(url, data=data, allow_redirects=False)
if is_home(r.url): url, data = find_form_request(r.content.decode("utf-8"))
return r if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
del data["approvals_code"] data["submit[This was me]"] = "[any value]"
del data["submit[Submit Code]"] del data["submit[This wasn't me]"]
del data["codes_submitted"] log.info("Verifying login attempt")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
if "name_action_selected" not in data:
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
data["name_action_selected"] = "save_device" data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue" log.info("Saving device again")
log.info("Saving browser.") r = session.post(url, data=data, allow_redirects=False)
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url): print(r.status_code, r.url, r.headers)
return r return r.headers.get("Location")
del data["name_action_selected"]
log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]: def get_error_data(html: str) -> Optional[str]:
"""Get error code and message from a request.""" """Get error message from a request."""
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (TypeError, ValueError):
pass
soup = bs4.BeautifulSoup( soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error") html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
) )
return code, soup.get_text() or None # Attempt to extract and format the error string
# The error message is in the user's own language!
return " ".join(list(soup.stripped_strings)[1:3]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False) @attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
@@ -157,7 +196,6 @@ class Session:
_session = attr.ib(factory=session_factory, type=requests.Session) _session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int) _counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str) _client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None, type=str)
@property @property
def user(self): def user(self):
@@ -181,6 +219,7 @@ class Session:
"fb_dtsg": self._fb_dtsg, "fb_dtsg": self._fb_dtsg,
} }
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
@classmethod @classmethod
def login( def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
@@ -190,65 +229,90 @@ class Session:
Args: Args:
email: Facebook ``email``, ``id`` or ``phone number`` email: Facebook ``email``, ``id`` or ``phone number``
password: Facebook account password password: Facebook account password
on_2fa_callback: Function that will be called, in case a 2FA code is needed. on_2fa_callback: Function that will be called, in case a two factor
This should return the requested 2FA code. authentication code is needed. This should return the requested code.
Only tested with SMS codes, might not work with authentication apps.
Note: Facebook limits the amount of codes they will give you, so if you
don't receive a code, be patient, and try again later!
Example: Example:
>>> import getpass
>>> import fbchat >>> import fbchat
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass()) >>> import getpass
>>> session = fbchat.Session.login(
... input("Email: "),
... getpass.getpass(),
... on_2fa_callback=lambda: input("2FA Code: ")
... )
Email: abc@gmail.com
Password: ****
2FA Code: 123456
>>> session.user.id >>> session.user.id
"1234" "1234"
""" """
session = session_factory() session = session_factory()
try: data = {
r = session.get("https://m.facebook.com/") # "jazoest": "2754",
except requests.RequestException as e: # "lsd": "AVqqqRUa",
_exception.handle_requests_error(e) "initial_request_id": "x", # any, just has to be present
soup = find_input_fields(r.text) # "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
data = dict( # "lgnrnd": "044039_RGm9",
(elem["name"], elem["value"]) "lgnjs": "n",
for elem in soup "email": email,
if elem.has_attr("value") and elem.has_attr("name") "pass": password,
) "login": "1",
data["email"] = email "persistent": "1", # Changes the cookie type to have a long "expires"
data["pass"] = password "default_persistent": "0",
data["login"] = "Log In" }
try: try:
url = "https://m.facebook.com/login.php?login_attempt=1" # Should hit a redirect to https://www.messenger.com/
r = session.post(url, data=data) # If this does happen, the session is logged in!
except requests.RequestException as e: r = session.post(
_exception.handle_requests_error(e) "https://www.messenger.com/login/password/",
data=data,
# Usually, 'Checkpoint' will refer to 2FA allow_redirects=False,
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
if not on_2fa_callback:
raise ValueError(
"2FA code required, please add `on_2fa_callback` to .login"
)
code = on_2fa_callback()
try:
r = _2fa_helper(session, code, r)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
try:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
if is_home(r.url):
return cls._from_session(session=session)
else:
code, msg = get_error_data(r.text, r.url)
raise _exception.ExternalError(
"Login failed at url {!r}".format(r.url), msg, code=code
) )
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
url = r.headers.get("Location")
# We weren't redirected, hence the email or password was wrong
if not url:
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn(error)
if "checkpoint" in url:
if not on_2fa_callback:
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
# Get a facebook.com url that handles the 2FA flow
# This probably works differently for Messenger-only accounts
url = _util.get_url_parameter(url, "next")
# Explicitly allow redirects
r = session.get(url, allow_redirects=True)
url = two_factor_helper(session, r, on_2fa_callback)
if not url.startswith("https://www.messenger.com/login/auth_token/"):
raise _exception.ParseError("Failed 2fa flow", data=url)
r = session.get(url, allow_redirects=False)
url = r.headers.get("Location")
if url != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
try:
return cls._from_session(session=session)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
def is_logged_in(self) -> bool: def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status. """Send a request to Facebook to check the login status.
@@ -260,12 +324,12 @@ class Session:
>>> assert session.is_logged_in() >>> assert session.is_logged_in()
""" """
# Send a request to the login url, to see if we're directed to the home page # Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
try: try:
r = self._session.get(url, allow_redirects=False) r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"]) _exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
def logout(self) -> None: def logout(self) -> None:
"""Safely log out the user. """Safely log out the user.
@@ -275,56 +339,51 @@ class Session:
Example: Example:
>>> session.logout() >>> session.logout()
""" """
logout_h = self._logout_h data = {"fb_dtsg": self._fb_dtsg}
if not logout_h:
url = prefix_url("/bluebar/modern_settings_menu/")
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = prefix_url("/logout.php")
try: try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h}) r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code) _exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod @classmethod
def _from_session(cls, session): def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session # TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session) user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try: try:
r = session.get(prefix_url("/")) r = session.get(prefix_url("/"), allow_redirects=False)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
soup = find_input_fields(r.text) define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) fb_dtsg = get_fb_dtsg(define)
if fb_dtsg_element: if fb_dtsg is None:
fb_dtsg = fb_dtsg_element["value"] raise _exception.ParseError("Could not find fb_dtsg", data=define)
else: if not fb_dtsg:
# Fall back to searching with a regex # Happens when the client is not actually logged in
res = FB_DTSG_REGEX.search(r.text) raise _exception.NotLoggedIn(
if not res: "Found empty fb_dtsg, the session was probably invalid."
raise _exception.NotLoggedIn("Could not find fb_dtsg") )
fb_dtsg = res.group(1)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0]) try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
logout_h_element = soup.find("input", {"name": "h"}) return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
user_id=user_id,
fb_dtsg=fb_dtsg,
revision=revision,
session=session,
logout_h=logout_h,
)
def get_cookies(self) -> Mapping[str, str]: def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`. """Retrieve session cookies, that can later be used in `from_cookies`.
@@ -379,10 +438,9 @@ class Session:
# update fb_dtsg token if received in response # update fb_dtsg token if received in response
if "jsmods" in j: if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"]) define = _util.get_jsmods_define(j["jsmods"]["define"])
if "DTSGInitData" in define: fb_dtsg = get_fb_dtsg(define)
self._fb_dtsg = define["DTSGInitData"]["token"] if fb_dtsg:
elif "DTSGInitialData" in define: self._fb_dtsg = fb_dtsg
self._fb_dtsg = define["DTSGInitialData"]["token"]
try: try:
return j["payload"] return j["payload"]

View File

@@ -313,13 +313,13 @@ class ThreadABC(metaclass=abc.ABCMeta):
def search_messages( def search_messages(
self, query: str, limit: int self, query: str, limit: int
) -> Iterable["_models.MessageSnippet"]: ) -> Iterable[_models.MessageSnippet]:
"""Find and get message IDs by query. """Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while Warning! If someone send a message to the thread that matches the query, while
we're searching, some snippets will get returned twice. we're searching, some snippets will get returned twice.
This is fundamentally unfixable, it's just how the endpoint is implemented. This is fundamentally not fixable, it's just how the endpoint is implemented.
The returned message snippets are ordered by last sent first. The returned message snippets are ordered by last sent first.

View File

@@ -4,7 +4,8 @@ from ._abc import ThreadABC
from . import _user from . import _user
from .._common import attrs_default from .._common import attrs_default
from .. import _util, _session, _graphql, _models from .. import _util, _session, _graphql, _models
from typing import Sequence, Iterable, Set, Mapping
from typing import Sequence, Iterable, Set, Mapping, Optional
@attrs_default @attrs_default
@@ -179,31 +180,31 @@ class GroupData(Group):
""" """
#: The group's picture #: The group's picture
photo = attr.ib(None, type="_models.Image") photo = attr.ib(None, type=Optional[_models.Image])
#: The name of the group #: The name of the group
name = attr.ib(None, type=str) name = attr.ib(None, type=Optional[str])
#: When the group was last active / when the last message was sent #: When the group was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime) last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the group #: Number of messages in the group
message_count = attr.ib(None, type=int) message_count = attr.ib(None, type=Optional[int])
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None, type="_models.PlanData") plan = attr.ib(None, type=Optional[_models.PlanData])
#: The group thread's participant user ids #: The group thread's participant user ids
participants = attr.ib(factory=set, type=Set[str]) participants = attr.ib(factory=set, type=Set[str])
#: A dictionary, containing user nicknames mapped to their IDs #: A dictionary, containing user nicknames mapped to their IDs
nicknames = attr.ib(factory=dict, type=Mapping[str, str]) nicknames = attr.ib(factory=dict, type=Mapping[str, str])
#: The groups's message color #: The groups's message color
color = attr.ib(None, type=str) color = attr.ib(None, type=Optional[str])
#: The groups's default emoji #: The groups's default emoji
emoji = attr.ib(None, type=str) emoji = attr.ib(None, type=Optional[str])
# User ids of thread admins # User ids of thread admins
admins = attr.ib(factory=set, type=Set[str]) admins = attr.ib(factory=set, type=Set[str])
# True if users need approval to join # True if users need approval to join
approval_mode = attr.ib(None, type=bool) approval_mode = attr.ib(None, type=Optional[bool])
# Set containing user IDs requesting to join # Set containing user IDs requesting to join
approval_requests = attr.ib(factory=set, type=Set[str]) approval_requests = attr.ib(factory=set, type=Set[str])
# Link for joining group # Link for joining group
join_link = attr.ib(None, type=str) join_link = attr.ib(None, type=Optional[str])
@classmethod @classmethod
def _from_graphql(cls, session, data): def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import attrs_default from .._common import attrs_default
from .. import _session, _models from .. import _session, _models
from typing import Optional
@attrs_default @attrs_default
class Page(ThreadABC): class Page(ThreadABC):
@@ -35,25 +37,25 @@ class PageData(Page):
""" """
#: The page's picture #: The page's picture
photo = attr.ib(type="_models.Image") photo = attr.ib(type=_models.Image)
#: The name of the page #: The name of the page
name = attr.ib(type=str) name = attr.ib(type=str)
#: When the thread was last active / when the last message was sent #: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime) last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread #: Number of messages in the thread
message_count = attr.ib(None, type=int) message_count = attr.ib(None, type=Optional[int])
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None, type="_models.PlanData") plan = attr.ib(None, type=Optional[_models.PlanData])
#: The page's custom URL #: The page's custom URL
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
#: The name of the page's location city #: The name of the page's location city
city = attr.ib(None, type=str) city = attr.ib(None, type=Optional[str])
#: Amount of likes the page has #: Amount of likes the page has
likes = attr.ib(None, type=int) likes = attr.ib(None, type=Optional[int])
#: Some extra information about the page #: Some extra information about the page
sub_title = attr.ib(None, type=str) sub_title = attr.ib(None, type=Optional[str])
#: The page's category #: The page's category
category = attr.ib(None, type=str) category = attr.ib(None, type=Optional[str])
@classmethod @classmethod
def _from_graphql(cls, session, data): def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import log, attrs_default from .._common import log, attrs_default
from .. import _util, _session, _models from .. import _util, _session, _models
from typing import Optional
GENDERS = { GENDERS = {
# For standard requests # For standard requests
@@ -103,7 +105,7 @@ class UserData(User):
""" """
#: The user's picture #: The user's picture
photo = attr.ib(type="_models.Image") photo = attr.ib(type=_models.Image)
#: The name of the user #: The name of the user
name = attr.ib(type=str) name = attr.ib(type=str)
#: Whether the user and the client are friends #: Whether the user and the client are friends
@@ -111,27 +113,27 @@ class UserData(User):
#: The users first name #: The users first name
first_name = attr.ib(type=str) first_name = attr.ib(type=str)
#: The users last name #: The users last name
last_name = attr.ib(None, type=str) last_name = attr.ib(None, type=Optional[str])
#: Datetime when the thread was last active / when the last message was sent #: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime) last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread #: Number of messages in the thread
message_count = attr.ib(None, type=int) message_count = attr.ib(None, type=Optional[int])
#: Set `Plan` #: Set `Plan`
plan = attr.ib(None, type="_models.PlanData") plan = attr.ib(None, type=Optional[_models.PlanData])
#: The profile URL. ``None`` for Messenger-only users #: The profile URL. ``None`` for Messenger-only users
url = attr.ib(None, type=str) url = attr.ib(None, type=Optional[str])
#: The user's gender #: The user's gender
gender = attr.ib(None, type=str) gender = attr.ib(None, type=Optional[str])
#: From 0 to 1. How close the client is to the user #: From 0 to 1. How close the client is to the user
affinity = attr.ib(None, type=float) affinity = attr.ib(None, type=Optional[float])
#: The user's nickname #: The user's nickname
nickname = attr.ib(None, type=str) nickname = attr.ib(None, type=Optional[str])
#: The clients nickname, as seen by the user #: The clients nickname, as seen by the user
own_nickname = attr.ib(None, type=str) own_nickname = attr.ib(None, type=Optional[str])
#: The message color #: The message color
color = attr.ib(None, type=str) color = attr.ib(None, type=Optional[str])
#: The default emoji #: The default emoji
emoji = attr.ib(None, type=str) emoji = attr.ib(None, type=Optional[str])
@staticmethod @staticmethod
def _get_other_user(data): def _get_other_user(data):

View File

@@ -9,15 +9,12 @@ from . import _exception
from typing import Iterable, Optional, Any, Mapping, Sequence from typing import Iterable, Optional, Any, Mapping, Sequence
#: Default list of user agents
USER_AGENTS = [ def int_or_none(inp: Any) -> Optional[int]:
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36", try:
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10", return int(inp)
"Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36", except Exception:
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1", return None
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
]
def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]: def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]:

View File

@@ -1,6 +1,10 @@
[pytest] [pytest]
xfail_strict = true xfail_strict = true
markers =
online: Online tests, that require a user account set up. Meant to be used \
manually, to check whether Facebook has broken something.
addopts = addopts =
--strict --strict
-m "not online"
testpaths = tests testpaths = tests
filterwarnings = error filterwarnings = error

View File

@@ -123,6 +123,37 @@ def test_title_set(session):
) == parse_delta(session, data) ) == parse_delta(session, data)
def test_title_removed(session):
data = {
"irisSeqId": "11223344",
"irisTags": ["DeltaThreadName", "is_from_iris_fanout"],
"messageMetadata": {
"actorFbId": "3456",
"adminText": "You removed the group name.",
"folderId": {"systemFolderId": "INBOX"},
"messageId": "mid.$XYZ",
"offlineThreadingId": "1122334455",
"skipBumpThread": False,
"tags": [],
"threadKey": {"threadFbId": "4321"},
"threadReadStateEffect": "KEEP_AS_IS",
"timestamp": "1500000000000",
"unsendType": "deny_log_message",
},
"name": "",
"participants": ["1234", "2345", "3456", "4567"],
"requestContext": {"apiArgs": {}},
"tqSeqId": "1111",
"class": "ThreadName",
}
assert TitleSet(
author=User(session=session, id="3456"),
thread=Group(session=session, id="4321"),
title=None,
at=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
) == parse_delta(session, data)
def test_forced_fetch(session): def test_forced_fetch(session):
data = { data = {
"forceInsert": False, "forceInsert": False,

28
tests/online/conftest.py Normal file
View File

@@ -0,0 +1,28 @@
import fbchat
import pytest
import logging
import getpass
@pytest.fixture(scope="session")
def session(pytestconfig):
session_cookies = pytestconfig.cache.get("session_cookies", None)
try:
session = fbchat.Session.from_cookies(session_cookies)
except fbchat.FacebookError:
logging.exception("Error while logging in with cookies!")
session = fbchat.Session.login(input("Email: "), getpass.getpass("Password: "))
yield session
pytestconfig.cache.set("session_cookies", session.get_cookies())
@pytest.fixture
def client(session):
return fbchat.Client(session=session)
@pytest.fixture
def listener(session):
return fbchat.Listener(session=session, chat_on=False, foreground=False)

View File

@@ -0,0 +1,94 @@
import pytest
import fbchat
import os
pytestmark = pytest.mark.online
def test_fetch(client):
client.fetch_users()
def test_search_for_users(client):
list(client.search_for_users("test", 10))
def test_search_for_pages(client):
list(client.search_for_pages("test", 100))
def test_search_for_groups(client):
list(client.search_for_groups("test", 1000))
def test_search_for_threads(client):
list(client.search_for_threads("test", 1000))
with pytest.raises(fbchat.HTTPError, match="rate limited"):
list(client.search_for_threads("test", 10000))
def test_message_search(client):
list(client.search_messages("test", 500))
def test_fetch_thread_info(client):
list(client.fetch_thread_info(["4"]))[0]
def test_fetch_threads(client):
list(client.fetch_threads(20))
list(client.fetch_threads(200))
def test_undocumented(client):
client.fetch_unread()
client.fetch_unseen()
@pytest.mark.skip(reason="need a way to get an image id")
def test_fetch_image_url(client):
client.fetch_image_url("TODO")
@pytest.fixture
def open_resource(pytestconfig):
def get_resource_inner(filename):
path = os.path.join(pytestconfig.rootdir, "tests", "resources", filename)
return open(path, "rb")
return get_resource_inner
def test_upload_image(client, open_resource):
with open_resource("image.png") as f:
_ = client.upload([("image.png", f, "image/png")])
def test_upload_many(client, open_resource):
with open_resource("image.png") as f_png, open_resource(
"image.jpg"
) as f_jpg, open_resource("image.gif") as f_gif, open_resource(
"file.json"
) as f_json, open_resource(
"file.txt"
) as f_txt, open_resource(
"audio.mp3"
) as f_mp3, open_resource(
"video.mp4"
) as f_mp4:
_ = client.upload(
[
("image.png", f_png, "image/png"),
("image.jpg", f_jpg, "image/jpeg"),
("image.gif", f_gif, "image/gif"),
("file.json", f_json, "application/json"),
("file.txt", f_txt, "text/plain"),
("audio.mp3", f_mp3, "audio/mpeg"),
("video.mp4", f_mp4, "video/mp4"),
]
)
# def test_mark_as_read(client):
# client.mark_as_read([thread1, thread2])

View File

@@ -1,15 +1,47 @@
import datetime import datetime
import pytest import pytest
from fbchat import ParseError
from fbchat._session import ( from fbchat._session import (
parse_server_js_define,
base36encode, base36encode,
prefix_url, prefix_url,
generate_message_id, generate_message_id,
session_factory,
client_id_factory, client_id_factory,
is_home, find_form_request,
get_error_data, get_error_data,
) )
def test_parse_server_js_define():
html = """
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
<script>require("TimeSliceImpl").guard(function() {require("ServerJSDefine").handleDefines([["DTSGInitData",[],{"token":"123","async_get_token":"12345"},3333]])
</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": "123"},
"DTSGInitData": {"async_get_token": "12345", "token": "123"},
}
def test_parse_server_js_define_error():
with pytest.raises(ParseError, match="Could not find any"):
parse_server_js_define("")
html = 'function(){(require("ServerJSDefine")).handleDefines([{"a": function(){}}])'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
html = 'function(){require("ServerJSDefine").handleDefines({"a": "b"})'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"number,expected", "number,expected",
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")], [(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
@@ -19,8 +51,10 @@ def test_base36encode(number, expected):
def test_prefix_url(): def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/" static_url = "https://upload.messenger.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc" assert prefix_url(static_url) == static_url
assert prefix_url("/") == "https://www.messenger.com/"
assert prefix_url("/abc") == "https://www.messenger.com/abc"
def test_generate_message_id(): def test_generate_message_id():
@@ -28,46 +62,115 @@ def test_generate_message_id():
assert generate_message_id(datetime.datetime.utcnow(), "def") assert generate_message_id(datetime.datetime.utcnow(), "def")
def test_session_factory():
session = session_factory()
assert session.headers
def test_client_id_factory(): def test_client_id_factory():
# Returns random output, so hard to test more thoroughly # Returns random output, so hard to test more thoroughly
assert client_id_factory() assert client_id_factory()
def test_is_home(): def test_find_form_request():
assert not is_home("https://m.facebook.com/login/?...") html = """
assert is_home("https://m.facebook.com/home.php?refsrc=...") <div>
<form action="/checkpoint/?next=https%3A%2F%2Fwww.messenger.com%2F" class="checkpoint" id="u_0_c" method="post" onsubmit="">
<input autocomplete="off" name="jazoest" type="hidden" value="some-number" />
<input autocomplete="off" name="fb_dtsg" type="hidden" value="some-base64" />
<input class="hidden_elem" data-default-submit="true" name="submit[Continue]" type="submit" />
<input autocomplete="off" name="nh" type="hidden" value="some-hex" />
<div class="_4-u2 _5x_7 _p0k _5x_9 _4-u8">
<div class="_2e9n" id="u_0_d">
<strong id="u_0_e">Two factor authentication required</strong>
<div id="u_0_f"></div>
</div>
<div class="_2ph_">
<input autocomplete="off" name="no_fido" type="hidden" value="true" />
<div class="_50f4">You've asked us to require a 6-digit login code when anyone tries to access your account from a new device or browser.</div>
<div class="_3-8y _50f4">Enter the 6-digit code from your Code Generator or 3rd party app below.</div>
<div class="_2pie _2pio">
<span>
<input aria-label="Login code" autocomplete="off" class="inputtext" id="approvals_code" name="approvals_code" placeholder="Login code" tabindex="1" type="text" />
</span>
</div>
</div>
<div class="_5hzs" id="checkpointBottomBar">
<div class="_2s5p">
<button class="_42ft _4jy0 _2kak _4jy4 _4jy1 selected _51sy" id="checkpointSubmitButton" name="submit[Continue]" type="submit" value="Continue">Continue</button>
</div>
<div class="_2s5q">
<div class="_25b6" id="u_0_g">
<a href="#" id="u_0_h" role="button">Need another way to authenticate?</a>
</div>
</div>
</div>
</div>
</form>
</div>
"""
url, data = find_form_request(html)
assert url.startswith("https://www.facebook.com/checkpoint/")
assert {
"jazoest": "some-number",
"fb_dtsg": "some-base64",
"nh": "some-hex",
"no_fido": "true",
"approvals_code": "[missing]",
"submit[Continue]": "Continue",
} == data
def test_find_form_request_error():
with pytest.raises(ParseError, match="Could not find form to submit"):
assert find_form_request("")
with pytest.raises(ParseError, match="Could not find url to submit to"):
assert find_form_request("<form></form>")
@pytest.mark.skip
def test_get_error_data(): def test_get_error_data():
html = """<?xml version="1.0" encoding="utf-8"?> html = """<!DOCTYPE html>
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd"> <html lang="da" id="facebook" class="no_js">
<html xmlns="http://www.w3.org/1999/xhtml">
<head> <head>
<title>Log in to Facebook | Facebook</title> <meta charset="utf-8" />
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" /> <title id="pageTitle">Messenger</title>
<style type="text/css">...</style> <meta name="referrer" content="default" id="meta_referrer" />
<meta name="description" content="..." />
<link rel="canonical" href="https://www.facebook.com/login/" />
</head> </head>
<body tabindex="0" class="b c d e f g"> <body class="_605a x1 Locale_da_DK" dir="ltr">
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main"> <div class="_3v_o" id="XMessengerDotComLoginViewPlaceholder">
<table class="x" role="presentation"><tbody><tr><td class="y"> <form id="login_form" action="/login/password/" method="post" onsubmit="">
<div class="z ba bb" style="" id="login_error"> <input type="hidden" name="jazoest" value="2222" autocomplete="off" />
<div class="bc"> <input type="hidden" name="lsd" value="xyz-abc" autocomplete="off" />
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&amp;email=abc@mail.com&amp;__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span> <div class="_3403 _3404">
<div>Type your password again</div>
<div>The password you entered is incorrect. <a href="https://www.facebook.com/recover/initiate?ars=facebook_login_pw_error">Did you forget your password?</a></div>
</div> </div>
<div id="loginform">
<input type="hidden" autocomplete="off" id="initial_request_id" name="initial_request_id" value="xxx" />
<input type="hidden" autocomplete="off" name="timezone" value="" id="u_0_1" />
<input type="hidden" autocomplete="off" name="lgndim" value="" id="u_0_2" />
<input type="hidden" name="lgnrnd" value="aaa" />
<input type="hidden" id="lgnjs" name="lgnjs" value="n" />
<input type="text" class="inputtext _55r1 _43di" id="email" name="email" placeholder="E-mail or phone number" value="some@email.com" tabindex="0" aria-label="E-mail or phone number" />
<input type="password" class="inputtext _55r1 _43di" name="pass" id="pass" tabindex="0" placeholder="Password" aria-label="Password" />
<button value="1" class="_42ft _4jy0 _2m_r _43dh _4jy4 _517h _51sy" id="loginbutton" name="login" tabindex="0" type="submit">Continue</button>
<div class="_43dj">
<div class="uiInputLabel clearfix">
<label class="uiInputLabelInput">
<input type="checkbox" value="1" name="persistent" tabindex="0" class="" id="u_0_0" />
<span class=""></span>
</label>
<label for="u_0_0" class="uiInputLabelLabel">Stay logged in</label>
</div>
<input type="hidden" autocomplete="off" id="default_persistent" name="default_persistent" value="0" />
</div>
</form>
</div> </div>
...
</td></tr></tbody></table>
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
</div></div><div></div></div></div>
</body> </body>
</html> </html>
""" """
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
msg = "The password you entered is incorrect. Did you forget your password?" msg = "The password you entered is incorrect. Did you forget your password?"
assert (1348092, msg) == get_error_data(html) assert msg == get_error_data(html)