Compare commits

...

34 Commits

Author SHA1 Message Date
Mads Marquart
520258e339 Bump version: 2.0.0a3 -> 2.0.0a4 2020-06-07 12:52:59 +02:00
Mads Marquart
435dfaf6d8 Better GraphQL error reporting 2020-06-07 12:48:21 +02:00
Mads Marquart
cf0e1e3a93 Test on_2fa_callback with authentication applications 2020-06-07 12:37:36 +02:00
Mads Marquart
2319fc7c4a Handle early return from two_factor_helper 2020-06-07 12:35:24 +02:00
Mads Marquart
b35240bdda Handle locked accounts 2020-06-07 12:35:07 +02:00
Mads Marquart
6141cc5a41 Update SERVER_JS_DEFINE_REGEX 2020-06-07 12:04:51 +02:00
Mads Marquart
b1e438dae1 Few fixes to 2FA flow 2020-05-16 19:30:03 +02:00
Mads Marquart
3c0f411be7 Fix typo in 2FA logic 2020-05-10 12:01:41 +02:00
Mads Marquart
9ad0090b02 Merge pull request #563 from smilexs4/patch-2
Fix typo in example
2020-05-10 11:53:56 +02:00
Mads Marquart
bec151a560 Merge pull request #562 from smilexs4/patch-1
Fix typo in example
2020-05-10 11:53:39 +02:00
smilexs4
2087182ecf Update interract.py
Changed fbchat.Message parameter from session to thread
2020-05-08 18:45:25 +03:00
smilexs4
09627b71ae Update fetch.py
Solved the exception:

TypeError: __init__() takes 1 positional argument but 2 were given
2020-05-08 17:08:01 +03:00
Mads Marquart
078bf9fc16 Add send online tests 2020-05-07 12:26:39 +02:00
Mads Marquart
d33e36866d Finish Client online tests 2020-05-07 12:10:45 +02:00
Mads Marquart
2a382ffaed Fix Client.mark_as_(un)read, and add tests 2020-05-07 11:59:05 +02:00
Mads Marquart
18a3ffb90d Fix Client.fetch_image_url in some cases
Sometimes (or always?), jsmods require includes a JS version specifier.

This means we couldn't find the url
2020-05-07 11:46:42 +02:00
Mads Marquart
db284cefdf Bump version: 2.0.0a2 -> 2.0.0a3 2020-05-07 11:10:42 +02:00
Mads Marquart
d11f417caa Make logins persistent 2020-05-07 10:56:47 +02:00
Mads Marquart
3b71258f2c Fix tests 2020-05-07 10:23:29 +02:00
Mads Marquart
81584d328b Add more session tests and small error improvements 2020-05-07 10:15:51 +02:00
Mads Marquart
7be2acad7d Initial re-add of 2FA 2020-05-06 23:34:27 +02:00
Mads Marquart
079d4093c4 Use messenger.com URLs instead of facebook.com
This should allow people who have only created a messenger account to
log in.

Also parse required `fb_dtsg` and `client_revision` values better.

The 2-fa flow is removed for now, I'll re-add it later.
2020-05-06 21:57:24 +02:00
Mads Marquart
cce947b18c Fix docs warnings 2020-05-06 13:31:09 +02:00
Mads Marquart
2545a01450 Re-add a few online tests, to easily check when Facebook breaks stuff 2020-05-06 13:31:09 +02:00
Mads Marquart
5d763dfbce Merge pull request #559 from xaadu/patch-1
Fix mistake in session handling example
2020-05-06 11:33:21 +02:00
Mads Marquart
0981be42b9 Fix errors in examples 2020-05-06 11:32:22 +02:00
Abdullah Zayed
93b71bf198 First Object then File Pointer
json.dump() receives object as first argument and File Pointer as 2nd argument.
2020-04-28 12:58:19 +06:00
Mads Marquart
af3758c8a9 Fix TitleSet.title attribute 2020-03-13 11:21:33 +01:00
Mads Marquart
f64c487a2d Bump version: 2.0.0a1 -> 2.0.0a2 2020-03-11 15:45:02 +01:00
Mads Marquart
11534604fe Remove user agent randomization
Caused problems with logging in, and didn't really help on anything
2020-03-11 15:44:34 +01:00
Mads Marquart
9990952fa6 Add Connect and Disconnect events 2020-03-11 15:27:00 +01:00
Mads Marquart
7ee7361646 Clean up event parsing 2020-03-11 15:10:25 +01:00
Mads Marquart
89c6af516c Fix various documentation mistakes 2020-03-11 15:00:50 +01:00
Mads Marquart
c27f599e37 Fix type specifiers in models 2020-03-11 14:43:28 +01:00
37 changed files with 892 additions and 375 deletions

View File

@@ -15,4 +15,6 @@ python:
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
fail_on_warning: true
# Disabled, until we can find a way to get sphinx-autodoc-typehints play nice with our
# module renaming!
fail_on_warning: false

View File

@@ -4,5 +4,8 @@ Exceptions
.. autoexception:: FacebookError()
.. autoexception:: HTTPError()
.. autoexception:: ParseError()
.. autoexception:: NotLoggedIn()
.. autoexception:: ExternalError()
.. autoexception:: GraphQLError()
.. autoexception:: InvalidParameters()
.. autoexception:: PleaseRefresh()

View File

@@ -1,3 +1,4 @@
.. highlight:: sh
.. See README.rst for explanation of these markers
.. include:: ../README.rst

View File

@@ -46,7 +46,7 @@ A thread basically just means "something I can chat with", but more precisely, i
- The conversation between you and a single Facebook user (`User`)
- The conversation between you and a Facebook Page (`Page`)
You can get your own user ID with `Session.user.id`.
You can get your own user ID from `Session.user` with ``session.user.id``.
Getting the ID of a specific group thread is fairly trivial, you only need to login to `<https://www.messenger.com/>`_, click on the group you want to find the ID of, and then read the id from the address bar.
The URL will look something like this: ``https://www.messenger.com/t/1234567890``, where ``1234567890`` would be the ID of the group.
@@ -111,19 +111,19 @@ Some functionality, like adding users to a `Group`, or blocking a `User`, logica
With that out of the way, let's see some examples!
The simplest way of interracting with a thread is by sending a message::
The simplest way of interacting with a thread is by sending a message::
# Send a message to the user
message = user.send_text("test message")
There are many types of messages you can send, see the full API documentation for more.
Notice how we held on to the sent message? The return type i a `Message` instance, so you can interract with it afterwards::
Notice how we held on to the sent message? The return type i a `Message` instance, so you can interact with it afterwards::
# React to the message with the 😍 emoji
message.react("😍")
Besides sending messages, you can also interract with threads in other ways. An example is to change the thread color::
Besides sending messages, you can also interact with threads in other ways. An example is to change the thread color::
# Will change the thread color to the default blue
thread.set_color("#0084ff")

View File

@@ -2,7 +2,7 @@ import fbchat
session = fbchat.Session.login("<email>", "<password>")
client = fbchat.Client(session)
client = fbchat.Client(session=session)
# Fetches a list of all users you're currently chatting with, as `User` objects
users = client.fetch_all_users()
@@ -65,5 +65,5 @@ print("thread's name: {}".format(thread.name))
images = list(thread.fetch_images(limit=20))
for image in images:
if isinstance(image, fbchat.ImageAttachment):
url = c.fetch_image_url(image.id)
url = client.fetch_image_url(image.id)
print(url)

View File

@@ -60,7 +60,7 @@ thread.set_color("#0084ff")
# Will change the thread emoji to `👍`
thread.set_emoji("👍")
message = fbchat.Message(session=session, id="<message id>")
message = fbchat.Message(thread=thread, id="<message id>")
# Will react to a message with a 😍 emoji
message.react("😍")

View File

@@ -80,7 +80,7 @@ def on_person_removed(sender, event: fbchat.PersonRemoved):
return
if event.author.id != session.user.id:
print(f"{event.removed.id} got removed. They will be re-added")
event.thread.add_participants([removed.id])
event.thread.add_participants([event.removed.id])
# Login, and start listening for events

View File

@@ -5,7 +5,7 @@ def on_message(event):
# We can only kick people from group chats, so no need to try if it's a user chat
if not isinstance(event.thread, fbchat.Group):
return
if message.text == "Remove me!":
if event.message.text == "Remove me!":
print(f"{event.author.id} will be removed from {event.thread.id}")
event.thread.remove_participant(event.author.id)

View File

@@ -18,7 +18,7 @@ def load_cookies(filename):
def save_cookies(filename, cookies):
with open(filename, "w") as f:
json.dump(f, cookies)
json.dump(cookies, f)
def load_session(cookies):

View File

@@ -65,6 +65,7 @@ from ._models import (
EmojiSize,
Mention,
Message,
MessageSnippet,
MessageData,
)
@@ -74,6 +75,8 @@ from ._events import (
Event,
UnknownEvent,
ThreadEvent,
Connect,
Disconnect,
# _client_payload
ReactionEvent,
UserStatusEvent,
@@ -115,7 +118,7 @@ from ._listen import Listener
from ._client import Client
__version__ = "2.0.0a1"
__version__ = "2.0.0a4"
__all__ = ("Session", "Listener", "Client")

View File

@@ -33,10 +33,10 @@ class Client:
But does not include deactivated, deleted or memorialized users (logically,
since you can't chat with those).
The order these are returned is arbitary.
The order these are returned is arbitrary.
Example:
Get the name of an arbitary user that you're currently chatting with.
Get the name of an arbitrary user that you're currently chatting with.
>>> users = client.fetch_users()
>>> users[0].name
@@ -211,7 +211,7 @@ class Client:
Warning! If someone send a message to a thread that matches the query, while
we're searching, some snippets will get returned twice, and some will be lost.
This is fundamentally unfixable, it's just how the endpoint is implemented.
This is fundamentally not fixable, it's just how the endpoint is implemented.
Args:
query: Text to search for
@@ -524,7 +524,9 @@ class Client:
data = {"voice_clip": voice_clip}
j = self.session._payload_post(
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict
"https://upload.messenger.com/ajax/mercury/upload.php",
data,
files=file_dict,
)
if len(j["metadata"]) != len(file_dict):
@@ -556,7 +558,7 @@ class Client:
"shouldSendReadReceipt": "true",
}
for threads in threads:
for thread in threads:
data["ids[{}]".format(thread.id)] = "true" if read else "false"
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)

View File

@@ -24,8 +24,7 @@ class Typing(ThreadEvent):
return cls(author=author, thread=author, status=status)
@classmethod
def _parse(cls, session, data):
# TODO: Rename this method
def _parse_thread_typing(cls, session, data):
author = _threads.User(session=session, id=str(data["sender_fbid"]))
thread = _threads.Group(session=session, id=str(data["thread"]))
status = data["state"] == 1
@@ -68,6 +67,25 @@ class Presence(Event):
return cls(statuses=statuses, full=data["list_type"] == "full")
@attrs_event
class Connect(Event):
"""The client was connected to Facebook.
This is not guaranteed to be triggered the same amount of times `Disconnect`!
"""
@attrs_event
class Disconnect(Event):
"""The client lost the connection to Facebook.
This is not guaranteed to be triggered the same amount of times `Connect`!
"""
#: The reason / error string for the disconnect
reason = attr.ib(type=str)
def parse_events(session, topic, data):
# See Mqtt._configure_connect_options for information about these topics
try:
@@ -90,7 +108,7 @@ def parse_events(session, topic, data):
) from e
elif topic == "/thread_typing":
yield Typing._parse(session, data)
yield Typing._parse_thread_typing(session, data)
elif topic == "/orca_typing_notifications":
yield Typing._parse_orca(session, data)

View File

@@ -1,5 +1,4 @@
import attr
import abc
from .._common import kw_only
from .. import _exception, _util, _threads
@@ -10,14 +9,9 @@ attrs_event = attr.s(slots=True, kw_only=kw_only, frozen=True)
@attrs_event
class Event(metaclass=abc.ABCMeta):
class Event:
"""Base class for all events."""
@classmethod
@abc.abstractmethod
def _parse(cls, session, data):
raise NotImplementedError
@staticmethod
def _get_thread(session, data):
# TODO: Handle pages? Is it even possible?
@@ -60,3 +54,9 @@ class ThreadEvent(Event):
thread = cls._get_thread(session, metadata)
at = _util.millis_to_datetime(int(metadata["timestamp"]))
return author, thread, at
@classmethod
def _parse_fetch(cls, session, data):
author = _threads.User(session=session, id=data["message_sender"]["id"])
at = _util.millis_to_datetime(int(data["timestamp_precise"]))
return author, at

View File

@@ -54,15 +54,15 @@ class TitleSet(ThreadEvent):
"""Somebody changed a group's title."""
thread = attr.ib(type="_threads.Group") # Set the correct type
#: The new title
title = attr.ib(type=str)
#: The new title. If ``None``, the title was removed
title = attr.ib(type=Optional[str])
#: When the title was set
at = attr.ib(type=datetime.datetime)
@classmethod
def _parse(cls, session, data):
author, thread, at = cls._parse_metadata(session, data)
return cls(author=author, thread=thread, title=data["name"], at=at)
return cls(author=author, thread=thread, title=data["name"] or None, at=at)
@attrs_event

View File

@@ -1,7 +1,7 @@
import attr
import requests
from typing import Any
from typing import Any, Optional
# Not frozen, since that doesn't work in PyPy
@attr.s(slots=True, auto_exc=True)
@@ -20,7 +20,7 @@ class HTTPError(FacebookError):
"""Base class for errors with the HTTP(s) connection to Facebook."""
#: The returned HTTP status code, if relevant
status_code = attr.ib(None, type=int)
status_code = attr.ib(None, type=Optional[int])
def __str__(self):
if not self.status_code:
@@ -58,7 +58,7 @@ class ExternalError(FacebookError):
#: The error message that Facebook returned (Possibly in the user's own language)
description = attr.ib(type=str)
#: The error code that Facebook returned
code = attr.ib(None, type=int)
code = attr.ib(None, type=Optional[int])
def __str__(self):
if self.code:
@@ -73,7 +73,7 @@ class GraphQLError(ExternalError):
# TODO: Handle multiple errors
#: Query debug information
debug_info = attr.ib(None, type=str)
debug_info = attr.ib(None, type=Optional[str])
def __str__(self):
if self.debug_info:
@@ -124,11 +124,11 @@ def handle_graphql_errors(j):
errors = j["errors"]
if errors:
error = errors[0] # TODO: Handle multiple errors
# TODO: Use `severity` and `description`
# TODO: Use `severity`
raise GraphQLError(
# TODO: What data is always available?
message=error.get("summary", "Unknown error"),
description=error.get("message", ""),
description=error.get("message") or error.get("description") or "",
code=error.get("code"),
debug_info=error.get("debug_info"),
)

View File

@@ -5,10 +5,10 @@ import requests
from ._common import log, kw_only
from . import _util, _exception, _session, _graphql, _events
from typing import Iterable, Optional, Mapping
from typing import Iterable, Optional, Mapping, List
HOST = "edge-chat.facebook.com"
HOST = "edge-chat.messenger.com"
TOPICS = [
# Things that happen in chats (e.g. messages)
@@ -118,7 +118,7 @@ class Listener:
_mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client)
_sync_token = attr.ib(None, type=Optional[str])
_sequence_id = attr.ib(None, type=Optional[int])
_tmp_events = attr.ib(factory=list, type=Iterable[_events.Event])
_tmp_events = attr.ib(factory=list, type=List[_events.Event])
def __attrs_post_init__(self):
# Configure callbacks
@@ -152,6 +152,7 @@ class Listener:
"The MQTT listener was disconnected for too long,"
" events may have been lost"
)
# TODO: Find a way to tell the user that they may now be missing events
self._sync_token = None
self._sequence_id = None
return False
@@ -270,10 +271,10 @@ class Listener:
headers = {
"Cookie": get_cookie_header(
self.session._session, "https://edge-chat.facebook.com/chat"
self.session._session, "https://edge-chat.messenger.com/chat"
),
"User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Origin": "https://www.messenger.com",
"Host": HOST,
}
@@ -282,11 +283,12 @@ class Listener:
path="/chat?sid={}".format(session_id), headers=headers
)
def _reconnect(self):
def _reconnect(self) -> bool:
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
return True
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
@@ -296,6 +298,7 @@ class Listener:
log.debug("MQTT reconnection failed: %s", e)
# Wait before reconnecting
self._mqtt._reconnect_wait()
return False
def listen(self) -> Iterable[_events.Event]:
"""Run the listening loop continually.
@@ -315,12 +318,10 @@ class Listener:
self._sequence_id = fetch_sequence_id(self.session)
# Make sure we're connected
while True:
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
self._reconnect()
else:
break
while not self._reconnect():
pass
yield _events.Connect()
while True:
rc = self._mqtt.loop(timeout=1.0)
@@ -339,18 +340,23 @@ class Listener:
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
yield _events.Disconnect(reason="Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
yield _events.Disconnect(reason="Connection error, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
raise _exception.NotLoggedIn("MQTT connection refused")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
reason = "MQTT Error: {}, retrying".format(err)
yield _events.Disconnect(reason=reason)
self._reconnect()
while not self._reconnect():
pass
yield _events.Connect()
if self._tmp_events:
yield from self._tmp_events
@@ -364,7 +370,7 @@ class Listener:
The `Listener` object should not be used after this is called!
Example:
Stop the listener when recieving a message with the text "/stop"
Stop the listener when receiving a message with the text "/stop"
>>> for event in listener.listen():
... if isinstance(event, fbchat.MessageEvent):
@@ -374,7 +380,7 @@ class Listener:
self._mqtt.disconnect()
def set_foreground(self, value: bool) -> None:
"""Set the `foreground` value while listening."""
"""Set the ``foreground`` value while listening."""
# TODO: Document what this actually does!
payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
@@ -383,7 +389,7 @@ class Listener:
# info.wait_for_publish()
def set_chat_on(self, value: bool) -> None:
"""Set the `chat_on` value while listening."""
"""Set the ``chat_on`` value while listening."""
# TODO: Document what this actually does!
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}

View File

@@ -3,7 +3,7 @@ from . import Image
from .._common import attrs_default
from .. import _util
from typing import Sequence
from typing import Optional, Sequence
@attrs_default
@@ -11,7 +11,7 @@ class Attachment:
"""Represents a Facebook attachment."""
#: The attachment ID
id = attr.ib(None, type=str)
id = attr.ib(None, type=Optional[str])
@attrs_default
@@ -24,21 +24,21 @@ class ShareAttachment(Attachment):
"""Represents a shared item (e.g. URL) attachment."""
#: ID of the author of the shared post
author = attr.ib(None, type=str)
author = attr.ib(None, type=Optional[str])
#: Target URL
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: Original URL if Facebook redirects the URL
original_url = attr.ib(None, type=str)
original_url = attr.ib(None, type=Optional[str])
#: Title of the attachment
title = attr.ib(None, type=str)
title = attr.ib(None, type=Optional[str])
#: Description of the attachment
description = attr.ib(None, type=str)
description = attr.ib(None, type=Optional[str])
#: Name of the source
source = attr.ib(None, type=str)
source = attr.ib(None, type=Optional[str])
#: The attached image
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: URL of the original image if Facebook uses ``safe_image``
original_image_url = attr.ib(None, type=str)
original_image_url = attr.ib(None, type=Optional[str])
#: List of additional attachments
attachments = attr.ib(factory=list, type=Sequence[Attachment])

View File

@@ -4,6 +4,8 @@ import enum
from .._common import attrs_default
from .. import _util
from typing import Optional
class ThreadLocation(enum.Enum):
"""Used to specify where a thread is located (inbox, pending, archived, other)."""
@@ -21,11 +23,11 @@ class ThreadLocation(enum.Enum):
@attrs_default
class ActiveStatus:
#: Whether the user is active now
active = attr.ib(None, type=bool)
#: Datetime when the user was last active
last_active = attr.ib(None, type=datetime.datetime)
active = attr.ib(type=bool)
#: When the user was last active
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Whether the user is playing Messenger game now
in_game = attr.ib(None, type=bool)
in_game = attr.ib(None, type=Optional[bool])
@classmethod
def _from_orca_presence(cls, data):
@@ -42,9 +44,9 @@ class Image:
#: URL to the image
url = attr.ib(type=str)
#: Width of the image
width = attr.ib(None, type=int)
width = attr.ib(None, type=Optional[int])
#: Height of the image
height = attr.ib(None, type=int)
height = attr.ib(None, type=Optional[int])
@classmethod
def _from_uri(cls, data):

View File

@@ -4,7 +4,7 @@ from . import Image, Attachment
from .._common import attrs_default
from .. import _util
from typing import Set
from typing import Set, Optional
@attrs_default
@@ -12,13 +12,13 @@ class FileAttachment(Attachment):
"""Represents a file that has been sent as a Facebook attachment."""
#: URL where you can download the file
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: Size of the file in bytes
size = attr.ib(None, type=int)
size = attr.ib(None, type=Optional[int])
#: Name of the file
name = attr.ib(None, type=str)
name = attr.ib(None, type=Optional[str])
#: Whether Facebook determines that this file may be harmful
is_malicious = attr.ib(None, type=bool)
is_malicious = attr.ib(None, type=Optional[bool])
@classmethod
def _from_graphql(cls, data, size=None):
@@ -36,13 +36,13 @@ class AudioAttachment(Attachment):
"""Represents an audio file that has been sent as a Facebook attachment."""
#: Name of the file
filename = attr.ib(None, type=str)
filename = attr.ib(None, type=Optional[str])
#: URL of the audio file
url = attr.ib(None, type=str)
#: Duration of the audio clip as a timedelta
duration = attr.ib(None, type=datetime.timedelta)
url = attr.ib(None, type=Optional[str])
#: Duration of the audio clip
duration = attr.ib(None, type=Optional[datetime.timedelta])
#: Audio type
audio_type = attr.ib(None, type=str)
audio_type = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):
@@ -63,13 +63,13 @@ class ImageAttachment(Attachment):
"""
#: The extension of the original image (e.g. ``png``)
original_extension = attr.ib(None, type=str)
original_extension = attr.ib(None, type=Optional[str])
#: Width of original image
width = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
width = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Height of original image
height = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
height = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Whether the image is animated
is_animated = attr.ib(None, type=bool)
is_animated = attr.ib(None, type=Optional[bool])
#: A set, containing variously sized / various types of previews of the image
previews = attr.ib(factory=set, type=Set[Image])
@@ -113,15 +113,15 @@ class VideoAttachment(Attachment):
"""Represents a video that has been sent as a Facebook attachment."""
#: Size of the original video in bytes
size = attr.ib(None, type=int)
size = attr.ib(None, type=Optional[int])
#: Width of original video
width = attr.ib(None, type=int)
width = attr.ib(None, type=Optional[int])
#: Height of original video
height = attr.ib(None, type=int)
#: Length of video as a timedelta
duration = attr.ib(None, type=datetime.timedelta)
height = attr.ib(None, type=Optional[int])
#: Length of video
duration = attr.ib(None, type=Optional[datetime.timedelta])
#: URL to very compressed preview video
preview_url = attr.ib(None, type=str)
preview_url = attr.ib(None, type=Optional[str])
#: A set, containing variously sized previews of the video
previews = attr.ib(factory=set, type=Set[Image])

View File

@@ -1,8 +1,11 @@
import attr
import datetime
from . import Image, Attachment
from .._common import attrs_default
from .. import _util, _exception
from typing import Optional
@attrs_default
class LocationAttachment(Attachment):
@@ -12,15 +15,15 @@ class LocationAttachment(Attachment):
"""
#: Latitude of the location
latitude = attr.ib(None, type=float)
latitude = attr.ib(None, type=Optional[float])
#: Longitude of the location
longitude = attr.ib(None, type=float)
longitude = attr.ib(None, type=Optional[float])
#: Image showing the map of the location
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: URL to Bing maps with the location
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
# Address of the location
address = attr.ib(None, type=str)
address = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):
@@ -51,11 +54,11 @@ class LiveLocationAttachment(LocationAttachment):
"""Represents a live user location."""
#: Name of the location
name = attr.ib(None)
#: Datetime when live location expires
expires_at = attr.ib(None)
name = attr.ib(None, type=Optional[str])
#: When live location expires
expires_at = attr.ib(None, type=Optional[datetime.datetime])
#: True if live location is expired
is_expired = attr.ib(None)
is_expired = attr.ib(None, type=Optional[bool])
@classmethod
def _from_pull(cls, data):

View File

@@ -4,8 +4,8 @@ import enum
from string import Formatter
from . import _attachment, _location, _file, _quick_reply, _sticker
from .._common import log, attrs_default
from .. import _exception, _util, _session, _threads
from typing import Optional, Mapping, Sequence
from .. import _exception, _util
from typing import Optional, Mapping, Sequence, Any
class EmojiSize(enum.Enum):
@@ -85,7 +85,7 @@ class Message:
"""
#: The thread that this message belongs to.
thread = attr.ib(type="_threads.ThreadABC")
thread = attr.ib()
#: The message ID.
id = attr.ib(converter=str, type=str)
@@ -224,7 +224,7 @@ class MessageSnippet(Message):
#: ID of the sender
author = attr.ib(type=str)
#: Datetime of when the message was sent
#: When the message was sent
created_at = attr.ib(type=datetime.datetime)
#: The actual message
text = attr.ib(type=str)
@@ -252,34 +252,34 @@ class MessageData(Message):
#: ID of the sender
author = attr.ib(type=str)
#: Datetime of when the message was sent
#: When the message was sent
created_at = attr.ib(type=datetime.datetime)
#: The actual message
text = attr.ib(None, type=str)
text = attr.ib(None, type=Optional[str])
#: A list of `Mention` objects
mentions = attr.ib(factory=list, type=Sequence[Mention])
#: Size of a sent emoji
emoji_size = attr.ib(None, type=EmojiSize)
emoji_size = attr.ib(None, type=Optional[EmojiSize])
#: Whether the message is read
is_read = attr.ib(None, type=bool)
#: A list of people IDs who read the message, works only with `Client.fetch_thread_messages`
is_read = attr.ib(None, type=Optional[bool])
#: People IDs who read the message, only works with `ThreadABC.fetch_messages`
read_by = attr.ib(factory=list, type=bool)
#: A dictionary with user's IDs as keys, and their reaction as values
reactions = attr.ib(factory=dict, type=Mapping[str, str])
#: A `Sticker`
sticker = attr.ib(None, type=_sticker.Sticker)
sticker = attr.ib(None, type=Optional[_sticker.Sticker])
#: A list of attachments
attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment])
#: A list of `QuickReply`
quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply])
#: Whether the message is unsent (deleted for everyone)
unsent = attr.ib(False, type=bool)
unsent = attr.ib(False, type=Optional[bool])
#: Message ID you want to reply to
reply_to_id = attr.ib(None, type=str)
reply_to_id = attr.ib(None, type=Optional[str])
#: Replied message
replied_to = attr.ib(None, type="MessageData")
replied_to = attr.ib(None, type=Optional[Any])
#: Whether the message was forwarded
forwarded = attr.ib(False, type=bool)
forwarded = attr.ib(False, type=Optional[bool])
@staticmethod
def _get_forwarded_from_tags(tags):

View File

@@ -4,7 +4,7 @@ import enum
from .._common import attrs_default
from .. import _exception, _util, _session
from typing import Mapping, Sequence
from typing import Mapping, Sequence, Optional
class GuestStatus(enum.Enum):
@@ -132,13 +132,13 @@ class PlanData(Plan):
#: Plan title
title = attr.ib(type=str)
#: Plan location name
location = attr.ib(None, converter=lambda x: x or "", type=str)
location = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: Plan location ID
location_id = attr.ib(None, converter=lambda x: x or "", type=str)
location_id = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: ID of the plan creator
author_id = attr.ib(None, type=str)
author_id = attr.ib(None, type=Optional[str])
#: `User` ids mapped to their `GuestStatus`
guests = attr.ib(None, type=Mapping[str, GuestStatus])
guests = attr.ib(None, type=Optional[Mapping[str, GuestStatus]])
@property
def going(self) -> Sequence[str]:

View File

@@ -2,7 +2,7 @@ import attr
from . import Attachment
from .._common import attrs_default
from typing import Any
from typing import Any, Optional
@attrs_default
@@ -24,9 +24,9 @@ class QuickReplyText(QuickReply):
"""Represents a text quick reply."""
#: Title of the quick reply
title = attr.ib(None, type=str)
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
title = attr.ib(None, type=Optional[str])
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "text"
@@ -43,8 +43,8 @@ class QuickReplyLocation(QuickReply):
class QuickReplyPhoneNumber(QuickReply):
"""Represents a phone number quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "user_phone_number"
@@ -53,8 +53,8 @@ class QuickReplyPhoneNumber(QuickReply):
class QuickReplyEmail(QuickReply):
"""Represents an email quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "user_email"

View File

@@ -2,34 +2,36 @@ import attr
from . import Image, Attachment
from .._common import attrs_default
from typing import Optional
@attrs_default
class Sticker(Attachment):
"""Represents a Facebook sticker that has been sent to a thread as an attachment."""
#: The sticker-pack's ID
pack = attr.ib(None, type=str)
pack = attr.ib(None, type=Optional[str])
#: Whether the sticker is animated
is_animated = attr.ib(False, type=bool)
# If the sticker is animated, the following should be present
#: URL to a medium spritemap
medium_sprite_image = attr.ib(None, type=str)
medium_sprite_image = attr.ib(None, type=Optional[str])
#: URL to a large spritemap
large_sprite_image = attr.ib(None, type=str)
large_sprite_image = attr.ib(None, type=Optional[str])
#: The amount of frames present in the spritemap pr. row
frames_per_row = attr.ib(None, type=int)
frames_per_row = attr.ib(None, type=Optional[int])
#: The amount of frames present in the spritemap pr. column
frames_per_col = attr.ib(None, type=int)
frames_per_col = attr.ib(None, type=Optional[int])
#: The total amount of frames in the spritemap
frame_count = attr.ib(None, type=int)
frame_count = attr.ib(None, type=Optional[int])
#: The frame rate the spritemap is intended to be played in
frame_rate = attr.ib(None, type=int)
frame_rate = attr.ib(None, type=Optional[int])
#: The sticker's image
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: The sticker's label/name
label = attr.ib(None, type=str)
label = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):

View File

@@ -1,17 +1,55 @@
import attr
import bs4
import datetime
import re
import requests
import random
import urllib.parse
import re
import json
# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4
from ._common import log, kw_only
from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, Callable
from typing import Optional, Mapping, Callable, Any
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
SERVER_JS_DEFINE_REGEX = re.compile(
r'require(?:\("ServerJS"\).{,100}\.handle\({.*"define":)|(?:\("ServerJSDefine"\)\)?\.handleDefines\()'
)
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# TODO: Extract jsmods "require" and "define" from `bigPipe.onPageletArrive`?
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
if len(define_splits) < 2:
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
if len(define_splits) > 2:
raise _exception.ParseError("Found too many ServerJSDefine", data=define_splits)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str:
@@ -32,7 +70,7 @@ def base36encode(number: int) -> str:
def prefix_url(url: str) -> str:
if url.startswith("/"):
return "https://www.facebook.com" + url
return "https://www.messenger.com" + url
return url
@@ -51,15 +89,15 @@ def get_user_id(session: requests.Session) -> str:
return str(rtn)
def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory() -> requests.Session:
from . import __version__
session = requests.session()
session.headers["Referer"] = "https://www.facebook.com"
# TODO: Deprecate setting the user agent manually
session.headers["User-Agent"] = random.choice(_util.USER_AGENTS)
session.headers["Referer"] = "https://www.messenger.com/"
# We won't try to set a fake user agent to mask our presence!
# Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service
session.headers["User-Agent"] = "fbchat/{}".format(__version__)
return session
@@ -67,81 +105,96 @@ def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:]
def is_home(url: str) -> bool:
parts = urllib.parse.urlparse(url)
# Check the urls `/home.php` and `/`
return "home" in parts.path or "/" == parts.path
def find_form_request(html: str):
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
form = soup.form
if not form:
raise _exception.ParseError("Could not find form to submit", data=html)
url = form.get("action")
if not url:
raise _exception.ParseError("Could not find url to submit to", data=form)
# From what I've seen, it'll always do this!
if url.startswith("/"):
url = "https://www.facebook.com" + url
# It's okay to set missing values to something crap, the values are localized, and
# hence are not available in the raw HTML
data = {
x["name"]: x.get("value", "[missing]")
for x in form.find_all(["input", "button"])
}
return url, data
def _2fa_helper(session: requests.Session, code: int, r):
soup = find_input_fields(r.text)
data = dict()
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
url, data = find_form_request(r.content.decode("utf-8"))
url = "https://m.facebook.com/login/checkpoint/"
# You don't have to type a code if your device is already saved
# Repeats if you get the code wrong
while "approvals_code" in data:
data["approvals_code"] = on_2fa_callback()
log.info("Submitting 2FA code")
r = session.post(url, data=data, allow_redirects=False)
log.debug("2FA location: %s", r.headers.get("Location"))
url, data = find_form_request(r.content.decode("utf-8"))
data["approvals_code"] = str(code)
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = "0"
log.info("Submitting 2FA code.")
# TODO: Can be missing if checkup flow was done on another device in the meantime?
if "name_action_selected" in data:
data["name_action_selected"] = "save_device"
log.info("Saving browser")
r = session.post(url, data=data, allow_redirects=False)
log.debug("2FA location: %s", r.headers.get("Location"))
url = r.headers.get("Location")
if url and url.startswith("https://www.messenger.com/login/auth_token/"):
return url
url, data = find_form_request(r.content.decode("utf-8"))
r = session.post(url, data=data)
log.info("Starting Facebook checkup flow")
r = session.post(url, data=data, allow_redirects=False)
log.debug("2FA location: %s", r.headers.get("Location"))
if is_home(r.url):
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
url, data = find_form_request(r.content.decode("utf-8"))
if "verification_method" in data:
raise _exception.NotLoggedIn(
"Your account is locked, and you need to log in using a browser, and verify it there!"
)
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
data["submit[This was me]"] = "[any value]"
del data["submit[This wasn't me]"]
log.info("Verifying login attempt")
r = session.post(url, data=data, allow_redirects=False)
log.debug("2FA location: %s", r.headers.get("Location"))
url, data = find_form_request(r.content.decode("utf-8"))
if "name_action_selected" not in data:
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
log.info("Saving browser.")
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["name_action_selected"]
log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
log.info("Saving device again")
r = session.post(url, data=data, allow_redirects=False)
log.debug("2FA location: %s", r.headers.get("Location"))
return r.headers.get("Location")
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
"""Get error code and message from a request."""
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (TypeError, ValueError):
pass
def get_error_data(html: str) -> Optional[str]:
"""Get error message from a request."""
soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error")
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
)
return code, soup.get_text() or None
# Attempt to extract and format the error string
# The error message is in the user's own language!
return " ".join(list(soup.stripped_strings)[1:3]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
@@ -157,7 +210,6 @@ class Session:
_session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None, type=str)
@property
def user(self):
@@ -181,6 +233,7 @@ class Session:
"fb_dtsg": self._fb_dtsg,
}
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
@classmethod
def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
@@ -190,65 +243,99 @@ class Session:
Args:
email: Facebook ``email``, ``id`` or ``phone number``
password: Facebook account password
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
This should return the requested 2FA code.
on_2fa_callback: Function that will be called, in case a two factor
authentication code is needed. This should return the requested code.
Tested using SMS and authentication applications. If you have both
enabled, you might not receive an SMS code, and you'll have to use the
authentication application.
Note: Facebook limits the amount of codes they will give you, so if you
don't receive a code, be patient, and try again later!
Example:
>>> import getpass
>>> import fbchat
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass())
>>> import getpass
>>> session = fbchat.Session.login(
... input("Email: "),
... getpass.getpass(),
... on_2fa_callback=lambda: input("2FA Code: ")
... )
Email: abc@gmail.com
Password: ****
2FA Code: 123456
>>> session.user.id
"1234"
"""
session = session_factory()
try:
r = session.get("https://m.facebook.com/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
soup = find_input_fields(r.text)
data = dict(
(elem["name"], elem["value"])
for elem in soup
if elem.has_attr("value") and elem.has_attr("name")
)
data["email"] = email
data["pass"] = password
data["login"] = "Log In"
data = {
# "jazoest": "2754",
# "lsd": "AVqqqRUa",
"initial_request_id": "x", # any, just has to be present
# "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
# "lgnrnd": "044039_RGm9",
"lgnjs": "n",
"email": email,
"pass": password,
"login": "1",
"persistent": "1", # Changes the cookie type to have a long "expires"
"default_persistent": "0",
}
try:
url = "https://m.facebook.com/login.php?login_attempt=1"
r = session.post(url, data=data)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
if not on_2fa_callback:
raise ValueError(
"2FA code required, please add `on_2fa_callback` to .login"
)
code = on_2fa_callback()
try:
r = _2fa_helper(session, code, r)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
try:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
if is_home(r.url):
return cls._from_session(session=session)
else:
code, msg = get_error_data(r.text, r.url)
raise _exception.ExternalError(
"Login failed at url {!r}".format(r.url), msg, code=code
# Should hit a redirect to https://www.messenger.com/
# If this does happen, the session is logged in!
r = session.post(
"https://www.messenger.com/login/password/",
data=data,
allow_redirects=False,
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
url = r.headers.get("Location")
# We weren't redirected, hence the email or password was wrong
if not url:
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn(error)
if "checkpoint" in url:
if not on_2fa_callback:
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
# Get a facebook.com/checkpoint/start url that handles the 2FA flow
# This probably works differently for Messenger-only accounts
url = _util.get_url_parameter(url, "next")
if not url.startswith("https://www.facebook.com/checkpoint/start/"):
raise _exception.ParseError("Failed 2fa flow (1)", data=url)
r = session.get(url, allow_redirects=False)
url = r.headers.get("Location")
if not url or not url.startswith("https://www.facebook.com/checkpoint/"):
raise _exception.ParseError("Failed 2fa flow (2)", data=url)
r = session.get(url, allow_redirects=False)
url = two_factor_helper(session, r, on_2fa_callback)
if not url.startswith("https://www.messenger.com/login/auth_token/"):
raise _exception.ParseError("Failed 2fa flow (3)", data=url)
r = session.get(url, allow_redirects=False)
url = r.headers.get("Location")
if url != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
try:
return cls._from_session(session=session)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status.
@@ -260,12 +347,12 @@ class Session:
>>> assert session.is_logged_in()
"""
# Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
try:
r = self._session.get(url, allow_redirects=False)
r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"])
_exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
def logout(self) -> None:
"""Safely log out the user.
@@ -275,56 +362,51 @@ class Session:
Example:
>>> session.logout()
"""
logout_h = self._logout_h
if not logout_h:
url = prefix_url("/bluebar/modern_settings_menu/")
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = prefix_url("/logout.php")
data = {"fb_dtsg": self._fb_dtsg}
try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod
def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try:
r = session.get(prefix_url("/"))
r = session.get(prefix_url("/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
soup = find_input_fields(r.text)
define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
if fb_dtsg_element:
fb_dtsg = fb_dtsg_element["value"]
else:
# Fall back to searching with a regex
res = FB_DTSG_REGEX.search(r.text)
if not res:
raise _exception.NotLoggedIn("Could not find fb_dtsg")
fb_dtsg = res.group(1)
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg is None:
raise _exception.ParseError("Could not find fb_dtsg", data=define)
if not fb_dtsg:
# Happens when the client is not actually logged in
raise _exception.NotLoggedIn(
"Found empty fb_dtsg, the session was probably invalid."
)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
logout_h_element = soup.find("input", {"name": "h"})
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
user_id=user_id,
fb_dtsg=fb_dtsg,
revision=revision,
session=session,
logout_h=logout_h,
)
return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`.
@@ -379,10 +461,9 @@ class Session:
# update fb_dtsg token if received in response
if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"])
if "DTSGInitData" in define:
self._fb_dtsg = define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
self._fb_dtsg = define["DTSGInitialData"]["token"]
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg:
self._fb_dtsg = fb_dtsg
try:
return j["payload"]

View File

@@ -211,7 +211,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
Example:
Send a pinned location in Beijing, China.
>>> thread.send_location(39.9390731, 116.117273)
>>> thread.send_pinned_location(39.9390731, 116.117273)
"""
self._send_location(False, latitude=latitude, longitude=longitude)
@@ -313,13 +313,13 @@ class ThreadABC(metaclass=abc.ABCMeta):
def search_messages(
self, query: str, limit: int
) -> Iterable["_models.MessageSnippet"]:
) -> Iterable[_models.MessageSnippet]:
"""Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while
we're searching, some snippets will get returned twice.
This is fundamentally unfixable, it's just how the endpoint is implemented.
This is fundamentally not fixable, it's just how the endpoint is implemented.
The returned message snippets are ordered by last sent first.

View File

@@ -4,7 +4,8 @@ from ._abc import ThreadABC
from . import _user
from .._common import attrs_default
from .. import _util, _session, _graphql, _models
from typing import Sequence, Iterable, Set, Mapping
from typing import Sequence, Iterable, Set, Mapping, Optional
@attrs_default
@@ -179,31 +180,31 @@ class GroupData(Group):
"""
#: The group's picture
photo = attr.ib(None, type="_models.Image")
photo = attr.ib(None, type=Optional[_models.Image])
#: The name of the group
name = attr.ib(None, type=str)
name = attr.ib(None, type=Optional[str])
#: When the group was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the group
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The group thread's participant user ids
participants = attr.ib(factory=set, type=Set[str])
#: A dictionary, containing user nicknames mapped to their IDs
nicknames = attr.ib(factory=dict, type=Mapping[str, str])
#: The groups's message color
color = attr.ib(None, type=str)
color = attr.ib(None, type=Optional[str])
#: The groups's default emoji
emoji = attr.ib(None, type=str)
emoji = attr.ib(None, type=Optional[str])
# User ids of thread admins
admins = attr.ib(factory=set, type=Set[str])
# True if users need approval to join
approval_mode = attr.ib(None, type=bool)
approval_mode = attr.ib(None, type=Optional[bool])
# Set containing user IDs requesting to join
approval_requests = attr.ib(factory=set, type=Set[str])
# Link for joining group
join_link = attr.ib(None, type=str)
join_link = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import attrs_default
from .. import _session, _models
from typing import Optional
@attrs_default
class Page(ThreadABC):
@@ -35,25 +37,25 @@ class PageData(Page):
"""
#: The page's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the page
name = attr.ib(type=str)
#: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The page's custom URL
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: The name of the page's location city
city = attr.ib(None, type=str)
city = attr.ib(None, type=Optional[str])
#: Amount of likes the page has
likes = attr.ib(None, type=int)
likes = attr.ib(None, type=Optional[int])
#: Some extra information about the page
sub_title = attr.ib(None, type=str)
sub_title = attr.ib(None, type=Optional[str])
#: The page's category
category = attr.ib(None, type=str)
category = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import log, attrs_default
from .. import _util, _session, _models
from typing import Optional
GENDERS = {
# For standard requests
@@ -103,7 +105,7 @@ class UserData(User):
"""
#: The user's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the user
name = attr.ib(type=str)
#: Whether the user and the client are friends
@@ -111,27 +113,27 @@ class UserData(User):
#: The users first name
first_name = attr.ib(type=str)
#: The users last name
last_name = attr.ib(None, type=str)
#: Datetime when the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_name = attr.ib(None, type=Optional[str])
#: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The profile URL. ``None`` for Messenger-only users
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: The user's gender
gender = attr.ib(None, type=str)
gender = attr.ib(None, type=Optional[str])
#: From 0 to 1. How close the client is to the user
affinity = attr.ib(None, type=float)
affinity = attr.ib(None, type=Optional[float])
#: The user's nickname
nickname = attr.ib(None, type=str)
nickname = attr.ib(None, type=Optional[str])
#: The clients nickname, as seen by the user
own_nickname = attr.ib(None, type=str)
own_nickname = attr.ib(None, type=Optional[str])
#: The message color
color = attr.ib(None, type=str)
color = attr.ib(None, type=Optional[str])
#: The default emoji
emoji = attr.ib(None, type=str)
emoji = attr.ib(None, type=Optional[str])
@staticmethod
def _get_other_user(data):

View File

@@ -9,15 +9,12 @@ from . import _exception
from typing import Iterable, Optional, Any, Mapping, Sequence
#: Default list of user agents
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10",
"Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
]
def int_or_none(inp: Any) -> Optional[int]:
try:
return int(inp)
except Exception:
return None
def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]:
@@ -66,16 +63,19 @@ def generate_offline_threading_id():
return str(int(msgs, 2))
def remove_version_from_module(module):
return module.split("@", 1)[0]
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
rtn = {}
for item in require:
if len(item) == 1:
(module,) = item
rtn[module] = []
rtn[remove_version_from_module(module)] = []
continue
method = "{}.{}".format(item[0], item[1])
requirements = item[2]
arguments = item[3]
module, method, requirements, arguments = item
method = "{}.{}".format(remove_version_from_module(module), method)
rtn[method] = arguments
return rtn

View File

@@ -1,6 +1,10 @@
[pytest]
xfail_strict = true
markers =
online: Online tests, that require a user account set up. Meant to be used \
manually, to check whether Facebook has broken something.
addopts =
--strict
-m "not online"
testpaths = tests
filterwarnings = error

View File

@@ -123,6 +123,37 @@ def test_title_set(session):
) == parse_delta(session, data)
def test_title_removed(session):
data = {
"irisSeqId": "11223344",
"irisTags": ["DeltaThreadName", "is_from_iris_fanout"],
"messageMetadata": {
"actorFbId": "3456",
"adminText": "You removed the group name.",
"folderId": {"systemFolderId": "INBOX"},
"messageId": "mid.$XYZ",
"offlineThreadingId": "1122334455",
"skipBumpThread": False,
"tags": [],
"threadKey": {"threadFbId": "4321"},
"threadReadStateEffect": "KEEP_AS_IS",
"timestamp": "1500000000000",
"unsendType": "deny_log_message",
},
"name": "",
"participants": ["1234", "2345", "3456", "4567"],
"requestContext": {"apiArgs": {}},
"tqSeqId": "1111",
"class": "ThreadName",
}
assert TitleSet(
author=User(session=session, id="3456"),
thread=Group(session=session, id="4321"),
title=None,
at=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
) == parse_delta(session, data)
def test_forced_fetch(session):
data = {
"forceInsert": False,

67
tests/online/conftest.py Normal file
View File

@@ -0,0 +1,67 @@
import fbchat
import pytest
import logging
import getpass
@pytest.fixture(scope="session")
def session(pytestconfig):
session_cookies = pytestconfig.cache.get("session_cookies", None)
try:
session = fbchat.Session.from_cookies(session_cookies)
except fbchat.FacebookError:
logging.exception("Error while logging in with cookies!")
session = fbchat.Session.login(input("Email: "), getpass.getpass("Password: "))
yield session
pytestconfig.cache.set("session_cookies", session.get_cookies())
# TODO: Allow the main session object to be closed - and perhaps used in `with`?
session._session.close()
@pytest.fixture
def client(session):
return fbchat.Client(session=session)
@pytest.fixture(scope="session")
def user(pytestconfig, session):
user_id = pytestconfig.cache.get("user_id", None)
if not user_id:
user_id = input("A user you're chatting with's id: ")
pytestconfig.cache.set("user_id", user_id)
return fbchat.User(session=session, id=user_id)
@pytest.fixture(scope="session")
def group(pytestconfig, session):
group_id = pytestconfig.cache.get("group_id", None)
if not group_id:
group_id = input("A group you're chatting with's id: ")
pytestconfig.cache.set("group_id", group_id)
return fbchat.Group(session=session, id=group_id)
@pytest.fixture(
scope="session",
params=[
"user",
"group",
"self",
pytest.param("invalid", marks=[pytest.mark.xfail()]),
],
)
def any_thread(request, session, user, group):
return {
"user": user,
"group": group,
"self": session.user,
"invalid": fbchat.Thread(session=session, id="0"),
}[request.param]
@pytest.fixture
def listener(session):
return fbchat.Listener(session=session, chat_on=False, foreground=False)

117
tests/online/test_client.py Normal file
View File

@@ -0,0 +1,117 @@
import pytest
import fbchat
import datetime
import os
pytestmark = pytest.mark.online
def test_fetch(client):
client.fetch_users()
def test_search_for_users(client):
list(client.search_for_users("test", 10))
def test_search_for_pages(client):
list(client.search_for_pages("test", 100))
def test_search_for_groups(client):
list(client.search_for_groups("test", 1000))
def test_search_for_threads(client):
list(client.search_for_threads("test", 1000))
with pytest.raises(fbchat.HTTPError, match="rate limited"):
list(client.search_for_threads("test", 10000))
def test_message_search(client):
list(client.search_messages("test", 500))
def test_fetch_thread_info(client):
list(client.fetch_thread_info(["4"]))[0]
def test_fetch_threads(client):
list(client.fetch_threads(20))
list(client.fetch_threads(200))
def test_undocumented(client):
client.fetch_unread()
client.fetch_unseen()
@pytest.fixture
def open_resource(pytestconfig):
def get_resource_inner(filename):
path = os.path.join(pytestconfig.rootdir, "tests", "resources", filename)
return open(path, "rb")
return get_resource_inner
def test_upload_and_fetch_image_url(client, open_resource):
with open_resource("image.png") as f:
((id, mimetype),) = client.upload([("image.png", f, "image/png")])
assert mimetype == "image/png"
assert client.fetch_image_url(id).startswith("http")
def test_upload_image(client, open_resource):
with open_resource("image.png") as f:
_ = client.upload([("image.png", f, "image/png")])
def test_upload_many(client, open_resource):
with open_resource("image.png") as f_png, open_resource(
"image.jpg"
) as f_jpg, open_resource("image.gif") as f_gif, open_resource(
"file.json"
) as f_json, open_resource(
"file.txt"
) as f_txt, open_resource(
"audio.mp3"
) as f_mp3, open_resource(
"video.mp4"
) as f_mp4:
_ = client.upload(
[
("image.png", f_png, "image/png"),
("image.jpg", f_jpg, "image/jpeg"),
("image.gif", f_gif, "image/gif"),
("file.json", f_json, "application/json"),
("file.txt", f_txt, "text/plain"),
("audio.mp3", f_mp3, "audio/mpeg"),
("video.mp4", f_mp4, "video/mp4"),
]
)
def test_mark_as_read(client, user, group):
client.mark_as_read([user, group], datetime.datetime.now())
def test_mark_as_unread(client, user, group):
client.mark_as_unread([user, group], datetime.datetime.now())
def test_move_threads(client, user, group):
client.move_threads(fbchat.ThreadLocation.PENDING, [user, group])
client.move_threads(fbchat.ThreadLocation.INBOX, [user, group])
@pytest.mark.skip(reason="need to have threads to delete")
def test_delete_threads():
pass
@pytest.mark.skip(reason="need to have messages to delete")
def test_delete_messages():
pass

42
tests/online/test_send.py Normal file
View File

@@ -0,0 +1,42 @@
import pytest
import fbchat
pytestmark = pytest.mark.online
# TODO: Verify return values
def test_wave(any_thread):
assert any_thread.wave(True)
assert any_thread.wave(False)
def test_send_text(any_thread):
assert any_thread.send_text("Test")
def test_send_text_with_mention(any_thread):
mention = fbchat.Mention(thread_id=any_thread.id, offset=5, length=8)
assert any_thread.send_text("Test @mention", mentions=[mention])
def test_send_emoji(any_thread):
assert any_thread.send_emoji("😀", size=fbchat.EmojiSize.LARGE)
def test_send_sticker(any_thread):
assert any_thread.send_sticker("1889713947839631")
def test_send_location(any_thread):
any_thread.send_location(51.5287718, -0.2416815)
def test_send_pinned_location(any_thread):
any_thread.send_pinned_location(39.9390731, 116.117273)
@pytest.mark.skip(reason="need a way to use the uploaded files from test_client.py")
def test_send_files(any_thread):
pass

View File

@@ -1,15 +1,61 @@
import datetime
import pytest
from fbchat import ParseError
from fbchat._session import (
parse_server_js_define,
base36encode,
prefix_url,
generate_message_id,
session_factory,
client_id_factory,
is_home,
find_form_request,
get_error_data,
)
def test_parse_server_js_define_old():
html = """
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
<script>require("TimeSliceImpl").guard(function() {require("ServerJSDefine").handleDefines([["DTSGInitData",[],{"token":"123","async_get_token":"12345"},3333]])
</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": "123"},
"DTSGInitData": {"async_get_token": "12345", "token": "123"},
}
def test_parse_server_js_define_new():
html = """
some data;require("TimeSliceImpl").guard(function(){new (require("ServerJS"))().handle({"define":[["DTSGInitialData",[],{"token":""},100]],"require":[...]});}, "ServerJS define", {"root":true})();
more data
<script><script>require("TimeSliceImpl").guard(function(){var s=new (require("ServerJS"))();s.handle({"define":[["DTSGInitData",[],{"token":"","async_get_token":""},3333]],"require":[...]});require("Run").onAfterLoad(function(){s.cleanup(require("TimeSliceImpl"))});}, "ServerJS define", {"root":true})();</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": ""},
"DTSGInitData": {"async_get_token": "", "token": ""},
}
def test_parse_server_js_define_error():
with pytest.raises(ParseError, match="Could not find any"):
parse_server_js_define("")
html = 'function(){(require("ServerJSDefine")).handleDefines([{"a": function(){}}])'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
html = 'function(){require("ServerJSDefine").handleDefines({"a": "b"})'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
@pytest.mark.parametrize(
"number,expected",
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
@@ -19,8 +65,10 @@ def test_base36encode(number, expected):
def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc"
static_url = "https://upload.messenger.com/"
assert prefix_url(static_url) == static_url
assert prefix_url("/") == "https://www.messenger.com/"
assert prefix_url("/abc") == "https://www.messenger.com/abc"
def test_generate_message_id():
@@ -28,46 +76,115 @@ def test_generate_message_id():
assert generate_message_id(datetime.datetime.utcnow(), "def")
def test_session_factory():
session = session_factory()
assert session.headers
def test_client_id_factory():
# Returns random output, so hard to test more thoroughly
assert client_id_factory()
def test_is_home():
assert not is_home("https://m.facebook.com/login/?...")
assert is_home("https://m.facebook.com/home.php?refsrc=...")
def test_find_form_request():
html = """
<div>
<form action="/checkpoint/?next=https%3A%2F%2Fwww.messenger.com%2F" class="checkpoint" id="u_0_c" method="post" onsubmit="">
<input autocomplete="off" name="jazoest" type="hidden" value="some-number" />
<input autocomplete="off" name="fb_dtsg" type="hidden" value="some-base64" />
<input class="hidden_elem" data-default-submit="true" name="submit[Continue]" type="submit" />
<input autocomplete="off" name="nh" type="hidden" value="some-hex" />
<div class="_4-u2 _5x_7 _p0k _5x_9 _4-u8">
<div class="_2e9n" id="u_0_d">
<strong id="u_0_e">Two factor authentication required</strong>
<div id="u_0_f"></div>
</div>
<div class="_2ph_">
<input autocomplete="off" name="no_fido" type="hidden" value="true" />
<div class="_50f4">You've asked us to require a 6-digit login code when anyone tries to access your account from a new device or browser.</div>
<div class="_3-8y _50f4">Enter the 6-digit code from your Code Generator or 3rd party app below.</div>
<div class="_2pie _2pio">
<span>
<input aria-label="Login code" autocomplete="off" class="inputtext" id="approvals_code" name="approvals_code" placeholder="Login code" tabindex="1" type="text" />
</span>
</div>
</div>
<div class="_5hzs" id="checkpointBottomBar">
<div class="_2s5p">
<button class="_42ft _4jy0 _2kak _4jy4 _4jy1 selected _51sy" id="checkpointSubmitButton" name="submit[Continue]" type="submit" value="Continue">Continue</button>
</div>
<div class="_2s5q">
<div class="_25b6" id="u_0_g">
<a href="#" id="u_0_h" role="button">Need another way to authenticate?</a>
</div>
</div>
</div>
</div>
</form>
</div>
"""
url, data = find_form_request(html)
assert url.startswith("https://www.facebook.com/checkpoint/")
assert {
"jazoest": "some-number",
"fb_dtsg": "some-base64",
"nh": "some-hex",
"no_fido": "true",
"approvals_code": "[missing]",
"submit[Continue]": "Continue",
} == data
def test_find_form_request_error():
with pytest.raises(ParseError, match="Could not find form to submit"):
assert find_form_request("")
with pytest.raises(ParseError, match="Could not find url to submit to"):
assert find_form_request("<form></form>")
@pytest.mark.skip
def test_get_error_data():
html = """<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
html = """<!DOCTYPE html>
<html lang="da" id="facebook" class="no_js">
<head>
<title>Log in to Facebook | Facebook</title>
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" />
<style type="text/css">...</style>
<meta name="description" content="..." />
<link rel="canonical" href="https://www.facebook.com/login/" />
<meta charset="utf-8" />
<title id="pageTitle">Messenger</title>
<meta name="referrer" content="default" id="meta_referrer" />
</head>
<body tabindex="0" class="b c d e f g">
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main">
<table class="x" role="presentation"><tbody><tr><td class="y">
<div class="z ba bb" style="" id="login_error">
<div class="bc">
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&amp;email=abc@mail.com&amp;__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span>
<body class="_605a x1 Locale_da_DK" dir="ltr">
<div class="_3v_o" id="XMessengerDotComLoginViewPlaceholder">
<form id="login_form" action="/login/password/" method="post" onsubmit="">
<input type="hidden" name="jazoest" value="2222" autocomplete="off" />
<input type="hidden" name="lsd" value="xyz-abc" autocomplete="off" />
<div class="_3403 _3404">
<div>Type your password again</div>
<div>The password you entered is incorrect. <a href="https://www.facebook.com/recover/initiate?ars=facebook_login_pw_error">Did you forget your password?</a></div>
</div>
<div id="loginform">
<input type="hidden" autocomplete="off" id="initial_request_id" name="initial_request_id" value="xxx" />
<input type="hidden" autocomplete="off" name="timezone" value="" id="u_0_1" />
<input type="hidden" autocomplete="off" name="lgndim" value="" id="u_0_2" />
<input type="hidden" name="lgnrnd" value="aaa" />
<input type="hidden" id="lgnjs" name="lgnjs" value="n" />
<input type="text" class="inputtext _55r1 _43di" id="email" name="email" placeholder="E-mail or phone number" value="some@email.com" tabindex="0" aria-label="E-mail or phone number" />
<input type="password" class="inputtext _55r1 _43di" name="pass" id="pass" tabindex="0" placeholder="Password" aria-label="Password" />
<button value="1" class="_42ft _4jy0 _2m_r _43dh _4jy4 _517h _51sy" id="loginbutton" name="login" tabindex="0" type="submit">Continue</button>
<div class="_43dj">
<div class="uiInputLabel clearfix">
<label class="uiInputLabelInput">
<input type="checkbox" value="1" name="persistent" tabindex="0" class="" id="u_0_0" />
<span class=""></span>
</label>
<label for="u_0_0" class="uiInputLabelLabel">Stay logged in</label>
</div>
<input type="hidden" autocomplete="off" id="default_persistent" name="default_persistent" value="0" />
</div>
</form>
</div>
...
</td></tr></tbody></table>
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
</div></div><div></div></div></div>
</body>
</html>
"""
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
msg = "The password you entered is incorrect. Did you forget your password?"
assert (1348092, msg) == get_error_data(html)
assert msg == get_error_data(html)

View File

@@ -68,6 +68,17 @@ def test_get_jsmods_require():
}
def test_get_jsmods_require_version_specifier():
data = [
["DimensionTracking@1234"],
["CavalryLoggerImpl@2345", "startInstrumentation", [], []],
]
assert get_jsmods_require(data) == {
"DimensionTracking": [],
"CavalryLoggerImpl.startInstrumentation": [],
}
def test_get_jsmods_require_get_image_url():
data = [
[