Compare commits
32 Commits
v2.0.0a2
...
8ac6dc4ae6
Author | SHA1 | Date | |
---|---|---|---|
|
8ac6dc4ae6 | ||
|
a6cf1d5c89 | ||
|
65b42e6532 | ||
|
8824a1c253 | ||
|
520258e339 | ||
|
435dfaf6d8 | ||
|
cf0e1e3a93 | ||
|
2319fc7c4a | ||
|
b35240bdda | ||
|
6141cc5a41 | ||
|
b1e438dae1 | ||
|
3c0f411be7 | ||
|
9ad0090b02 | ||
|
bec151a560 | ||
|
2087182ecf | ||
|
09627b71ae | ||
|
078bf9fc16 | ||
|
d33e36866d | ||
|
2a382ffaed | ||
|
18a3ffb90d | ||
|
db284cefdf | ||
|
d11f417caa | ||
|
3b71258f2c | ||
|
81584d328b | ||
|
7be2acad7d | ||
|
079d4093c4 | ||
|
cce947b18c | ||
|
2545a01450 | ||
|
5d763dfbce | ||
|
0981be42b9 | ||
|
93b71bf198 | ||
|
af3758c8a9 |
@@ -15,4 +15,6 @@ python:
|
|||||||
# Build documentation in the docs/ directory with Sphinx
|
# Build documentation in the docs/ directory with Sphinx
|
||||||
sphinx:
|
sphinx:
|
||||||
configuration: docs/conf.py
|
configuration: docs/conf.py
|
||||||
fail_on_warning: true
|
# Disabled, until we can find a way to get sphinx-autodoc-typehints play nice with our
|
||||||
|
# module renaming!
|
||||||
|
fail_on_warning: false
|
||||||
|
@@ -2,7 +2,7 @@ import fbchat
|
|||||||
|
|
||||||
session = fbchat.Session.login("<email>", "<password>")
|
session = fbchat.Session.login("<email>", "<password>")
|
||||||
|
|
||||||
client = fbchat.Client(session)
|
client = fbchat.Client(session=session)
|
||||||
|
|
||||||
# Fetches a list of all users you're currently chatting with, as `User` objects
|
# Fetches a list of all users you're currently chatting with, as `User` objects
|
||||||
users = client.fetch_all_users()
|
users = client.fetch_all_users()
|
||||||
@@ -65,5 +65,5 @@ print("thread's name: {}".format(thread.name))
|
|||||||
images = list(thread.fetch_images(limit=20))
|
images = list(thread.fetch_images(limit=20))
|
||||||
for image in images:
|
for image in images:
|
||||||
if isinstance(image, fbchat.ImageAttachment):
|
if isinstance(image, fbchat.ImageAttachment):
|
||||||
url = c.fetch_image_url(image.id)
|
url = client.fetch_image_url(image.id)
|
||||||
print(url)
|
print(url)
|
||||||
|
@@ -60,7 +60,7 @@ thread.set_color("#0084ff")
|
|||||||
# Will change the thread emoji to `👍`
|
# Will change the thread emoji to `👍`
|
||||||
thread.set_emoji("👍")
|
thread.set_emoji("👍")
|
||||||
|
|
||||||
message = fbchat.Message(session=session, id="<message id>")
|
message = fbchat.Message(thread=thread, id="<message id>")
|
||||||
|
|
||||||
# Will react to a message with a 😍 emoji
|
# Will react to a message with a 😍 emoji
|
||||||
message.react("😍")
|
message.react("😍")
|
||||||
|
@@ -80,7 +80,7 @@ def on_person_removed(sender, event: fbchat.PersonRemoved):
|
|||||||
return
|
return
|
||||||
if event.author.id != session.user.id:
|
if event.author.id != session.user.id:
|
||||||
print(f"{event.removed.id} got removed. They will be re-added")
|
print(f"{event.removed.id} got removed. They will be re-added")
|
||||||
event.thread.add_participants([removed.id])
|
event.thread.add_participants([event.removed.id])
|
||||||
|
|
||||||
|
|
||||||
# Login, and start listening for events
|
# Login, and start listening for events
|
||||||
|
@@ -5,7 +5,7 @@ def on_message(event):
|
|||||||
# We can only kick people from group chats, so no need to try if it's a user chat
|
# We can only kick people from group chats, so no need to try if it's a user chat
|
||||||
if not isinstance(event.thread, fbchat.Group):
|
if not isinstance(event.thread, fbchat.Group):
|
||||||
return
|
return
|
||||||
if message.text == "Remove me!":
|
if event.message.text == "Remove me!":
|
||||||
print(f"{event.author.id} will be removed from {event.thread.id}")
|
print(f"{event.author.id} will be removed from {event.thread.id}")
|
||||||
event.thread.remove_participant(event.author.id)
|
event.thread.remove_participant(event.author.id)
|
||||||
|
|
||||||
|
@@ -18,7 +18,7 @@ def load_cookies(filename):
|
|||||||
|
|
||||||
def save_cookies(filename, cookies):
|
def save_cookies(filename, cookies):
|
||||||
with open(filename, "w") as f:
|
with open(filename, "w") as f:
|
||||||
json.dump(f, cookies)
|
json.dump(cookies, f)
|
||||||
|
|
||||||
|
|
||||||
def load_session(cookies):
|
def load_session(cookies):
|
||||||
|
@@ -65,6 +65,7 @@ from ._models import (
|
|||||||
EmojiSize,
|
EmojiSize,
|
||||||
Mention,
|
Mention,
|
||||||
Message,
|
Message,
|
||||||
|
MessageSnippet,
|
||||||
MessageData,
|
MessageData,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -117,7 +118,7 @@ from ._listen import Listener
|
|||||||
|
|
||||||
from ._client import Client
|
from ._client import Client
|
||||||
|
|
||||||
__version__ = "2.0.0a2"
|
__version__ = "2.0.0a4"
|
||||||
|
|
||||||
__all__ = ("Session", "Listener", "Client")
|
__all__ = ("Session", "Listener", "Client")
|
||||||
|
|
||||||
|
@@ -419,7 +419,7 @@ class Client:
|
|||||||
Warning:
|
Warning:
|
||||||
This is not finished, and the API may change at any point!
|
This is not finished, and the API may change at any point!
|
||||||
"""
|
"""
|
||||||
at = datetime.datetime.utcnow()
|
at = _util.now()
|
||||||
form = {
|
form = {
|
||||||
"folders[0]": "inbox",
|
"folders[0]": "inbox",
|
||||||
"client": "mercury",
|
"client": "mercury",
|
||||||
@@ -524,7 +524,9 @@ class Client:
|
|||||||
data = {"voice_clip": voice_clip}
|
data = {"voice_clip": voice_clip}
|
||||||
|
|
||||||
j = self.session._payload_post(
|
j = self.session._payload_post(
|
||||||
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict
|
"https://upload.messenger.com/ajax/mercury/upload.php",
|
||||||
|
data,
|
||||||
|
files=file_dict,
|
||||||
)
|
)
|
||||||
|
|
||||||
if len(j["metadata"]) != len(file_dict):
|
if len(j["metadata"]) != len(file_dict):
|
||||||
@@ -556,7 +558,7 @@ class Client:
|
|||||||
"shouldSendReadReceipt": "true",
|
"shouldSendReadReceipt": "true",
|
||||||
}
|
}
|
||||||
|
|
||||||
for threads in threads:
|
for thread in threads:
|
||||||
data["ids[{}]".format(thread.id)] = "true" if read else "false"
|
data["ids[{}]".format(thread.id)] = "true" if read else "false"
|
||||||
|
|
||||||
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)
|
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)
|
||||||
|
@@ -54,15 +54,15 @@ class TitleSet(ThreadEvent):
|
|||||||
"""Somebody changed a group's title."""
|
"""Somebody changed a group's title."""
|
||||||
|
|
||||||
thread = attr.ib(type="_threads.Group") # Set the correct type
|
thread = attr.ib(type="_threads.Group") # Set the correct type
|
||||||
#: The new title
|
#: The new title. If ``None``, the title was removed
|
||||||
title = attr.ib(type=str)
|
title = attr.ib(type=Optional[str])
|
||||||
#: When the title was set
|
#: When the title was set
|
||||||
at = attr.ib(type=datetime.datetime)
|
at = attr.ib(type=datetime.datetime)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _parse(cls, session, data):
|
def _parse(cls, session, data):
|
||||||
author, thread, at = cls._parse_metadata(session, data)
|
author, thread, at = cls._parse_metadata(session, data)
|
||||||
return cls(author=author, thread=thread, title=data["name"], at=at)
|
return cls(author=author, thread=thread, title=data["name"] or None, at=at)
|
||||||
|
|
||||||
|
|
||||||
@attrs_event
|
@attrs_event
|
||||||
|
@@ -124,11 +124,11 @@ def handle_graphql_errors(j):
|
|||||||
errors = j["errors"]
|
errors = j["errors"]
|
||||||
if errors:
|
if errors:
|
||||||
error = errors[0] # TODO: Handle multiple errors
|
error = errors[0] # TODO: Handle multiple errors
|
||||||
# TODO: Use `severity` and `description`
|
# TODO: Use `severity`
|
||||||
raise GraphQLError(
|
raise GraphQLError(
|
||||||
# TODO: What data is always available?
|
# TODO: What data is always available?
|
||||||
message=error.get("summary", "Unknown error"),
|
message=error.get("summary", "Unknown error"),
|
||||||
description=error.get("message", ""),
|
description=error.get("message") or error.get("description") or "",
|
||||||
code=error.get("code"),
|
code=error.get("code"),
|
||||||
debug_info=error.get("debug_info"),
|
debug_info=error.get("debug_info"),
|
||||||
)
|
)
|
||||||
|
@@ -8,7 +8,7 @@ from . import _util, _exception, _session, _graphql, _events
|
|||||||
from typing import Iterable, Optional, Mapping, List
|
from typing import Iterable, Optional, Mapping, List
|
||||||
|
|
||||||
|
|
||||||
HOST = "edge-chat.facebook.com"
|
HOST = "edge-chat.messenger.com"
|
||||||
|
|
||||||
TOPICS = [
|
TOPICS = [
|
||||||
# Things that happen in chats (e.g. messages)
|
# Things that happen in chats (e.g. messages)
|
||||||
@@ -271,10 +271,10 @@ class Listener:
|
|||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
"Cookie": get_cookie_header(
|
"Cookie": get_cookie_header(
|
||||||
self.session._session, "https://edge-chat.facebook.com/chat"
|
self.session._session, "https://edge-chat.messenger.com/chat"
|
||||||
),
|
),
|
||||||
"User-Agent": self.session._session.headers["User-Agent"],
|
"User-Agent": self.session._session.headers["User-Agent"],
|
||||||
"Origin": "https://www.facebook.com",
|
"Origin": "https://www.messenger.com",
|
||||||
"Host": HOST,
|
"Host": HOST,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -4,8 +4,8 @@ import enum
|
|||||||
from string import Formatter
|
from string import Formatter
|
||||||
from . import _attachment, _location, _file, _quick_reply, _sticker
|
from . import _attachment, _location, _file, _quick_reply, _sticker
|
||||||
from .._common import log, attrs_default
|
from .._common import log, attrs_default
|
||||||
from .. import _exception, _util, _session, _threads
|
from .. import _exception, _util
|
||||||
from typing import Optional, Mapping, Sequence
|
from typing import Optional, Mapping, Sequence, Any
|
||||||
|
|
||||||
|
|
||||||
class EmojiSize(enum.Enum):
|
class EmojiSize(enum.Enum):
|
||||||
@@ -85,7 +85,7 @@ class Message:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
#: The thread that this message belongs to.
|
#: The thread that this message belongs to.
|
||||||
thread = attr.ib(type="_threads.ThreadABC")
|
thread = attr.ib()
|
||||||
#: The message ID.
|
#: The message ID.
|
||||||
id = attr.ib(converter=str, type=str)
|
id = attr.ib(converter=str, type=str)
|
||||||
|
|
||||||
@@ -277,7 +277,7 @@ class MessageData(Message):
|
|||||||
#: Message ID you want to reply to
|
#: Message ID you want to reply to
|
||||||
reply_to_id = attr.ib(None, type=Optional[str])
|
reply_to_id = attr.ib(None, type=Optional[str])
|
||||||
#: Replied message
|
#: Replied message
|
||||||
replied_to = attr.ib(None, type=Optional["MessageData"])
|
replied_to = attr.ib(None, type=Optional[Any])
|
||||||
#: Whether the message was forwarded
|
#: Whether the message was forwarded
|
||||||
forwarded = attr.ib(False, type=Optional[bool])
|
forwarded = attr.ib(False, type=Optional[bool])
|
||||||
|
|
||||||
|
@@ -1,17 +1,55 @@
|
|||||||
import attr
|
import attr
|
||||||
import bs4
|
|
||||||
import datetime
|
import datetime
|
||||||
import re
|
|
||||||
import requests
|
import requests
|
||||||
import random
|
import random
|
||||||
import urllib.parse
|
import re
|
||||||
|
import json
|
||||||
|
|
||||||
|
# TODO: Only import when required
|
||||||
|
# Or maybe just replace usage with `html.parser`?
|
||||||
|
import bs4
|
||||||
|
|
||||||
from ._common import log, kw_only
|
from ._common import log, kw_only
|
||||||
from . import _graphql, _util, _exception
|
from . import _graphql, _util, _exception
|
||||||
|
|
||||||
from typing import Optional, Tuple, Mapping, Callable
|
from typing import Optional, Mapping, Callable, Any
|
||||||
|
|
||||||
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
|
|
||||||
|
SERVER_JS_DEFINE_REGEX = re.compile(
|
||||||
|
r'(?:"ServerJS".{,100}\.handle\({.*"define":)|(?:require\("ServerJSDefine"\)\)?\.handleDefines\()'
|
||||||
|
)
|
||||||
|
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_server_js_define(html: str) -> Mapping[str, Any]:
|
||||||
|
"""Parse ``ServerJSDefine`` entries from a HTML document."""
|
||||||
|
# Find points where we should start parsing
|
||||||
|
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
|
||||||
|
|
||||||
|
# TODO: Extract jsmods "require" and "define" from `bigPipe.onPageletArrive`?
|
||||||
|
|
||||||
|
# Skip leading entry
|
||||||
|
_, *define_splits = define_splits
|
||||||
|
|
||||||
|
rtn = []
|
||||||
|
if not define_splits:
|
||||||
|
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
|
||||||
|
if len(define_splits) < 2:
|
||||||
|
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
|
||||||
|
if len(define_splits) > 2:
|
||||||
|
raise _exception.ParseError("Found too many ServerJSDefine", data=define_splits)
|
||||||
|
# Parse entries (should be two)
|
||||||
|
for entry in define_splits:
|
||||||
|
try:
|
||||||
|
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
|
||||||
|
if not isinstance(parsed, list):
|
||||||
|
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
|
||||||
|
rtn.extend(parsed)
|
||||||
|
|
||||||
|
# Convert to a dict
|
||||||
|
return _util.get_jsmods_define(rtn)
|
||||||
|
|
||||||
|
|
||||||
def base36encode(number: int) -> str:
|
def base36encode(number: int) -> str:
|
||||||
@@ -32,7 +70,7 @@ def base36encode(number: int) -> str:
|
|||||||
|
|
||||||
def prefix_url(url: str) -> str:
|
def prefix_url(url: str) -> str:
|
||||||
if url.startswith("/"):
|
if url.startswith("/"):
|
||||||
return "https://www.facebook.com" + url
|
return "https://www.messenger.com" + url
|
||||||
return url
|
return url
|
||||||
|
|
||||||
|
|
||||||
@@ -51,15 +89,17 @@ def get_user_id(session: requests.Session) -> str:
|
|||||||
return str(rtn)
|
return str(rtn)
|
||||||
|
|
||||||
|
|
||||||
def find_input_fields(html: str):
|
|
||||||
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
|
|
||||||
|
|
||||||
|
|
||||||
def session_factory() -> requests.Session:
|
def session_factory() -> requests.Session:
|
||||||
from . import __version__
|
from . import __version__
|
||||||
|
|
||||||
session = requests.session()
|
session = requests.session()
|
||||||
session.headers["Referer"] = "https://www.facebook.com"
|
# Override Facebook's locale detection during the login process.
|
||||||
|
# The locale is only used when giving errors back to the user, so giving the errors
|
||||||
|
# back in English makes it easier for users to report.
|
||||||
|
session.cookies = session.cookies = requests.cookies.merge_cookies(
|
||||||
|
session.cookies, {"locale": "en_US"}
|
||||||
|
)
|
||||||
|
session.headers["Referer"] = "https://www.messenger.com/"
|
||||||
# We won't try to set a fake user agent to mask our presence!
|
# We won't try to set a fake user agent to mask our presence!
|
||||||
# Facebook allows us access anyhow, and it makes our motives clearer:
|
# Facebook allows us access anyhow, and it makes our motives clearer:
|
||||||
# We're not trying to cheat Facebook, we simply want to access their service
|
# We're not trying to cheat Facebook, we simply want to access their service
|
||||||
@@ -71,81 +111,95 @@ def client_id_factory() -> str:
|
|||||||
return hex(int(random.random() * 2 ** 31))[2:]
|
return hex(int(random.random() * 2 ** 31))[2:]
|
||||||
|
|
||||||
|
|
||||||
def is_home(url: str) -> bool:
|
def find_form_request(html: str):
|
||||||
parts = urllib.parse.urlparse(url)
|
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
|
||||||
# Check the urls `/home.php` and `/`
|
|
||||||
return "home" in parts.path or "/" == parts.path
|
form = soup.form
|
||||||
|
if not form:
|
||||||
|
raise _exception.ParseError("Could not find form to submit", data=html)
|
||||||
|
|
||||||
|
url = form.get("action")
|
||||||
|
if not url:
|
||||||
|
raise _exception.ParseError("Could not find url to submit to", data=form)
|
||||||
|
|
||||||
|
# From what I've seen, it'll always do this!
|
||||||
|
if url.startswith("/"):
|
||||||
|
url = "https://www.facebook.com" + url
|
||||||
|
|
||||||
|
# It's okay to set missing values to something crap, the values are localized, and
|
||||||
|
# hence are not available in the raw HTML
|
||||||
|
data = {
|
||||||
|
x["name"]: x.get("value", "[missing]")
|
||||||
|
for x in form.find_all(["input", "button"])
|
||||||
|
}
|
||||||
|
return url, data
|
||||||
|
|
||||||
|
|
||||||
def _2fa_helper(session: requests.Session, code: int, r):
|
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
|
||||||
soup = find_input_fields(r.text)
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
data = dict()
|
|
||||||
|
|
||||||
url = "https://m.facebook.com/login/checkpoint/"
|
# You don't have to type a code if your device is already saved
|
||||||
|
# Repeats if you get the code wrong
|
||||||
|
while "approvals_code" in data:
|
||||||
|
data["approvals_code"] = on_2fa_callback()
|
||||||
|
log.info("Submitting 2FA code")
|
||||||
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
|
||||||
data["approvals_code"] = str(code)
|
# TODO: Can be missing if checkup flow was done on another device in the meantime?
|
||||||
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
|
if "name_action_selected" in data:
|
||||||
data["nh"] = soup.find("input", {"name": "nh"})["value"]
|
data["name_action_selected"] = "save_device"
|
||||||
data["submit[Submit Code]"] = "Submit Code"
|
log.info("Saving browser")
|
||||||
data["codes_submitted"] = "0"
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
log.info("Submitting 2FA code.")
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
if url and url.startswith("https://www.messenger.com/login/auth_token/"):
|
||||||
|
return url
|
||||||
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
|
||||||
r = session.post(url, data=data)
|
log.info("Starting Facebook checkup flow")
|
||||||
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
|
||||||
if is_home(r.url):
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
return r
|
if "verification_method" in data:
|
||||||
|
raise _exception.NotLoggedIn(
|
||||||
del data["approvals_code"]
|
"Your account is locked, and you need to log in using a browser, and verify it there!"
|
||||||
del data["submit[Submit Code]"]
|
)
|
||||||
del data["codes_submitted"]
|
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
|
||||||
|
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
|
||||||
|
data["submit[This was me]"] = "[any value]"
|
||||||
|
del data["submit[This wasn't me]"]
|
||||||
|
log.info("Verifying login attempt")
|
||||||
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
|
||||||
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
if "name_action_selected" not in data:
|
||||||
|
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
|
||||||
data["name_action_selected"] = "save_device"
|
data["name_action_selected"] = "save_device"
|
||||||
data["submit[Continue]"] = "Continue"
|
log.info("Saving device again")
|
||||||
log.info("Saving browser.")
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
r = session.post(url, data=data)
|
return r.headers.get("Location")
|
||||||
|
|
||||||
if is_home(r.url):
|
|
||||||
return r
|
|
||||||
|
|
||||||
del data["name_action_selected"]
|
|
||||||
log.info("Starting Facebook checkup flow.")
|
|
||||||
# At this stage, we have dtsg, nh, submit[Continue]
|
|
||||||
r = session.post(url, data=data)
|
|
||||||
|
|
||||||
if is_home(r.url):
|
|
||||||
return r
|
|
||||||
|
|
||||||
del data["submit[Continue]"]
|
|
||||||
data["submit[This was me]"] = "This Was Me"
|
|
||||||
log.info("Verifying login attempt.")
|
|
||||||
# At this stage, we have dtsg, nh, submit[This was me]
|
|
||||||
r = session.post(url, data=data)
|
|
||||||
|
|
||||||
if is_home(r.url):
|
|
||||||
return r
|
|
||||||
|
|
||||||
del data["submit[This was me]"]
|
|
||||||
data["submit[Continue]"] = "Continue"
|
|
||||||
data["name_action_selected"] = "save_device"
|
|
||||||
log.info("Saving device again.")
|
|
||||||
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
|
|
||||||
r = session.post(url, data=data)
|
|
||||||
return r
|
|
||||||
|
|
||||||
|
|
||||||
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
|
def get_error_data(html: str) -> Optional[str]:
|
||||||
"""Get error code and message from a request."""
|
"""Get error message from a request."""
|
||||||
code = None
|
|
||||||
try:
|
|
||||||
code = int(_util.get_url_parameter(url, "e"))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
soup = bs4.BeautifulSoup(
|
soup = bs4.BeautifulSoup(
|
||||||
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error")
|
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
|
||||||
)
|
)
|
||||||
return code, soup.get_text() or None
|
# Attempt to extract and format the error string
|
||||||
|
return " ".join(list(soup.stripped_strings)[1:3]) or None
|
||||||
|
|
||||||
|
|
||||||
|
def get_fb_dtsg(define) -> Optional[str]:
|
||||||
|
if "DTSGInitData" in define:
|
||||||
|
return define["DTSGInitData"]["token"]
|
||||||
|
elif "DTSGInitialData" in define:
|
||||||
|
return define["DTSGInitialData"]["token"]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
|
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
|
||||||
@@ -161,7 +215,6 @@ class Session:
|
|||||||
_session = attr.ib(factory=session_factory, type=requests.Session)
|
_session = attr.ib(factory=session_factory, type=requests.Session)
|
||||||
_counter = attr.ib(0, type=int)
|
_counter = attr.ib(0, type=int)
|
||||||
_client_id = attr.ib(factory=client_id_factory, type=str)
|
_client_id = attr.ib(factory=client_id_factory, type=str)
|
||||||
_logout_h = attr.ib(None, type=Optional[str])
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def user(self):
|
def user(self):
|
||||||
@@ -185,6 +238,7 @@ class Session:
|
|||||||
"fb_dtsg": self._fb_dtsg,
|
"fb_dtsg": self._fb_dtsg,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
|
||||||
@classmethod
|
@classmethod
|
||||||
def login(
|
def login(
|
||||||
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
|
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
|
||||||
@@ -194,65 +248,99 @@ class Session:
|
|||||||
Args:
|
Args:
|
||||||
email: Facebook ``email``, ``id`` or ``phone number``
|
email: Facebook ``email``, ``id`` or ``phone number``
|
||||||
password: Facebook account password
|
password: Facebook account password
|
||||||
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
|
on_2fa_callback: Function that will be called, in case a two factor
|
||||||
This should return the requested 2FA code.
|
authentication code is needed. This should return the requested code.
|
||||||
|
|
||||||
|
Tested using SMS and authentication applications. If you have both
|
||||||
|
enabled, you might not receive an SMS code, and you'll have to use the
|
||||||
|
authentication application.
|
||||||
|
|
||||||
|
Note: Facebook limits the amount of codes they will give you, so if you
|
||||||
|
don't receive a code, be patient, and try again later!
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
>>> import getpass
|
|
||||||
>>> import fbchat
|
>>> import fbchat
|
||||||
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass())
|
>>> import getpass
|
||||||
|
>>> session = fbchat.Session.login(
|
||||||
|
... input("Email: "),
|
||||||
|
... getpass.getpass(),
|
||||||
|
... on_2fa_callback=lambda: input("2FA Code: ")
|
||||||
|
... )
|
||||||
|
Email: abc@gmail.com
|
||||||
|
Password: ****
|
||||||
|
2FA Code: 123456
|
||||||
>>> session.user.id
|
>>> session.user.id
|
||||||
"1234"
|
"1234"
|
||||||
"""
|
"""
|
||||||
session = session_factory()
|
session = session_factory()
|
||||||
|
|
||||||
try:
|
data = {
|
||||||
r = session.get("https://m.facebook.com/")
|
# "jazoest": "2754",
|
||||||
except requests.RequestException as e:
|
# "lsd": "AVqqqRUa",
|
||||||
_exception.handle_requests_error(e)
|
"initial_request_id": "x", # any, just has to be present
|
||||||
soup = find_input_fields(r.text)
|
# "timezone": "-120",
|
||||||
|
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
|
||||||
data = dict(
|
# "lgnrnd": "044039_RGm9",
|
||||||
(elem["name"], elem["value"])
|
"lgnjs": "n",
|
||||||
for elem in soup
|
"email": email,
|
||||||
if elem.has_attr("value") and elem.has_attr("name")
|
"pass": password,
|
||||||
)
|
"login": "1",
|
||||||
data["email"] = email
|
"persistent": "1", # Changes the cookie type to have a long "expires"
|
||||||
data["pass"] = password
|
"default_persistent": "0",
|
||||||
data["login"] = "Log In"
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
url = "https://m.facebook.com/login.php?login_attempt=1"
|
# Should hit a redirect to https://www.messenger.com/
|
||||||
r = session.post(url, data=data)
|
# If this does happen, the session is logged in!
|
||||||
except requests.RequestException as e:
|
r = session.post(
|
||||||
_exception.handle_requests_error(e)
|
"https://www.messenger.com/login/password/",
|
||||||
|
data=data,
|
||||||
# Usually, 'Checkpoint' will refer to 2FA
|
allow_redirects=False,
|
||||||
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
|
|
||||||
if not on_2fa_callback:
|
|
||||||
raise ValueError(
|
|
||||||
"2FA code required, please add `on_2fa_callback` to .login"
|
|
||||||
)
|
|
||||||
code = on_2fa_callback()
|
|
||||||
try:
|
|
||||||
r = _2fa_helper(session, code, r)
|
|
||||||
except requests.RequestException as e:
|
|
||||||
_exception.handle_requests_error(e)
|
|
||||||
|
|
||||||
# Sometimes Facebook tries to show the user a "Save Device" dialog
|
|
||||||
if "save-device" in r.url:
|
|
||||||
try:
|
|
||||||
r = session.get("https://m.facebook.com/login/save-device/cancel/")
|
|
||||||
except requests.RequestException as e:
|
|
||||||
_exception.handle_requests_error(e)
|
|
||||||
|
|
||||||
if is_home(r.url):
|
|
||||||
return cls._from_session(session=session)
|
|
||||||
else:
|
|
||||||
code, msg = get_error_data(r.text, r.url)
|
|
||||||
raise _exception.ExternalError(
|
|
||||||
"Login failed at url {!r}".format(r.url), msg, code=code
|
|
||||||
)
|
)
|
||||||
|
except requests.RequestException as e:
|
||||||
|
_exception.handle_requests_error(e)
|
||||||
|
_exception.handle_http_error(r.status_code)
|
||||||
|
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
|
||||||
|
# We weren't redirected, hence the email or password was wrong
|
||||||
|
if not url:
|
||||||
|
error = get_error_data(r.content.decode("utf-8"))
|
||||||
|
raise _exception.NotLoggedIn(error)
|
||||||
|
|
||||||
|
if "checkpoint" in url:
|
||||||
|
if not on_2fa_callback:
|
||||||
|
raise _exception.NotLoggedIn(
|
||||||
|
"2FA code required! Please supply `on_2fa_callback` to .login"
|
||||||
|
)
|
||||||
|
# Get a facebook.com/checkpoint/start url that handles the 2FA flow
|
||||||
|
# This probably works differently for Messenger-only accounts
|
||||||
|
url = _util.get_url_parameter(url, "next")
|
||||||
|
if not url.startswith("https://www.facebook.com/checkpoint/start/"):
|
||||||
|
raise _exception.ParseError("Failed 2fa flow (1)", data=url)
|
||||||
|
|
||||||
|
r = session.get(url, allow_redirects=False)
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
if not url or not url.startswith("https://www.facebook.com/checkpoint/"):
|
||||||
|
raise _exception.ParseError("Failed 2fa flow (2)", data=url)
|
||||||
|
|
||||||
|
r = session.get(url, allow_redirects=False)
|
||||||
|
url = two_factor_helper(session, r, on_2fa_callback)
|
||||||
|
|
||||||
|
if not url.startswith("https://www.messenger.com/login/auth_token/"):
|
||||||
|
raise _exception.ParseError("Failed 2fa flow (3)", data=url)
|
||||||
|
|
||||||
|
r = session.get(url, allow_redirects=False)
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
|
||||||
|
if url != "https://www.messenger.com/":
|
||||||
|
error = get_error_data(r.content.decode("utf-8"))
|
||||||
|
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
|
||||||
|
|
||||||
|
try:
|
||||||
|
return cls._from_session(session=session)
|
||||||
|
except _exception.NotLoggedIn as e:
|
||||||
|
raise _exception.ParseError("Failed loading session", data=r) from e
|
||||||
|
|
||||||
def is_logged_in(self) -> bool:
|
def is_logged_in(self) -> bool:
|
||||||
"""Send a request to Facebook to check the login status.
|
"""Send a request to Facebook to check the login status.
|
||||||
@@ -264,12 +352,12 @@ class Session:
|
|||||||
>>> assert session.is_logged_in()
|
>>> assert session.is_logged_in()
|
||||||
"""
|
"""
|
||||||
# Send a request to the login url, to see if we're directed to the home page
|
# Send a request to the login url, to see if we're directed to the home page
|
||||||
url = "https://m.facebook.com/login.php?login_attempt=1"
|
|
||||||
try:
|
try:
|
||||||
r = self._session.get(url, allow_redirects=False)
|
r = self._session.get(prefix_url("/login/"), allow_redirects=False)
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
_exception.handle_requests_error(e)
|
_exception.handle_requests_error(e)
|
||||||
return "Location" in r.headers and is_home(r.headers["Location"])
|
_exception.handle_http_error(r.status_code)
|
||||||
|
return "https://www.messenger.com/" == r.headers.get("Location")
|
||||||
|
|
||||||
def logout(self) -> None:
|
def logout(self) -> None:
|
||||||
"""Safely log out the user.
|
"""Safely log out the user.
|
||||||
@@ -279,56 +367,51 @@ class Session:
|
|||||||
Example:
|
Example:
|
||||||
>>> session.logout()
|
>>> session.logout()
|
||||||
"""
|
"""
|
||||||
logout_h = self._logout_h
|
data = {"fb_dtsg": self._fb_dtsg}
|
||||||
if not logout_h:
|
|
||||||
url = prefix_url("/bluebar/modern_settings_menu/")
|
|
||||||
try:
|
|
||||||
h_r = self._session.post(url, data={"pmid": "4"})
|
|
||||||
except requests.RequestException as e:
|
|
||||||
_exception.handle_requests_error(e)
|
|
||||||
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
|
|
||||||
|
|
||||||
url = prefix_url("/logout.php")
|
|
||||||
try:
|
try:
|
||||||
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
|
r = self._session.post(
|
||||||
|
prefix_url("/logout/"), data=data, allow_redirects=False
|
||||||
|
)
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
_exception.handle_requests_error(e)
|
_exception.handle_requests_error(e)
|
||||||
_exception.handle_http_error(r.status_code)
|
_exception.handle_http_error(r.status_code)
|
||||||
|
|
||||||
|
if "Location" not in r.headers:
|
||||||
|
raise _exception.FacebookError("Failed logging out, was not redirected!")
|
||||||
|
if "https://www.messenger.com/login/" != r.headers["Location"]:
|
||||||
|
raise _exception.FacebookError(
|
||||||
|
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _from_session(cls, session):
|
def _from_session(cls, session):
|
||||||
# TODO: Automatically set user_id when the cookie changes in the session
|
# TODO: Automatically set user_id when the cookie changes in the session
|
||||||
user_id = get_user_id(session)
|
user_id = get_user_id(session)
|
||||||
|
|
||||||
|
# Make a request to the main page to retrieve ServerJSDefine entries
|
||||||
try:
|
try:
|
||||||
r = session.get(prefix_url("/"))
|
r = session.get(prefix_url("/"), allow_redirects=False)
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
_exception.handle_requests_error(e)
|
_exception.handle_requests_error(e)
|
||||||
|
_exception.handle_http_error(r.status_code)
|
||||||
|
|
||||||
soup = find_input_fields(r.text)
|
define = parse_server_js_define(r.content.decode("utf-8"))
|
||||||
|
|
||||||
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
|
fb_dtsg = get_fb_dtsg(define)
|
||||||
if fb_dtsg_element:
|
if fb_dtsg is None:
|
||||||
fb_dtsg = fb_dtsg_element["value"]
|
raise _exception.ParseError("Could not find fb_dtsg", data=define)
|
||||||
else:
|
if not fb_dtsg:
|
||||||
# Fall back to searching with a regex
|
# Happens when the client is not actually logged in
|
||||||
res = FB_DTSG_REGEX.search(r.text)
|
raise _exception.NotLoggedIn(
|
||||||
if not res:
|
"Found empty fb_dtsg, the session was probably invalid."
|
||||||
raise _exception.NotLoggedIn("Could not find fb_dtsg")
|
)
|
||||||
fb_dtsg = res.group(1)
|
|
||||||
|
|
||||||
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
|
try:
|
||||||
|
revision = int(define["SiteData"]["client_revision"])
|
||||||
|
except TypeError:
|
||||||
|
raise _exception.ParseError("Could not find client revision", data=define)
|
||||||
|
|
||||||
logout_h_element = soup.find("input", {"name": "h"})
|
return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
|
||||||
logout_h = logout_h_element["value"] if logout_h_element else None
|
|
||||||
|
|
||||||
return cls(
|
|
||||||
user_id=user_id,
|
|
||||||
fb_dtsg=fb_dtsg,
|
|
||||||
revision=revision,
|
|
||||||
session=session,
|
|
||||||
logout_h=logout_h,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_cookies(self) -> Mapping[str, str]:
|
def get_cookies(self) -> Mapping[str, str]:
|
||||||
"""Retrieve session cookies, that can later be used in `from_cookies`.
|
"""Retrieve session cookies, that can later be used in `from_cookies`.
|
||||||
@@ -383,10 +466,9 @@ class Session:
|
|||||||
# update fb_dtsg token if received in response
|
# update fb_dtsg token if received in response
|
||||||
if "jsmods" in j:
|
if "jsmods" in j:
|
||||||
define = _util.get_jsmods_define(j["jsmods"]["define"])
|
define = _util.get_jsmods_define(j["jsmods"]["define"])
|
||||||
if "DTSGInitData" in define:
|
fb_dtsg = get_fb_dtsg(define)
|
||||||
self._fb_dtsg = define["DTSGInitData"]["token"]
|
if fb_dtsg:
|
||||||
elif "DTSGInitialData" in define:
|
self._fb_dtsg = fb_dtsg
|
||||||
self._fb_dtsg = define["DTSGInitialData"]["token"]
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return j["payload"]
|
return j["payload"]
|
||||||
@@ -404,7 +486,7 @@ class Session:
|
|||||||
return self._post("/api/graphqlbatch/", data, as_graphql=True)
|
return self._post("/api/graphqlbatch/", data, as_graphql=True)
|
||||||
|
|
||||||
def _do_send_request(self, data):
|
def _do_send_request(self, data):
|
||||||
now = datetime.datetime.utcnow()
|
now = _util.now()
|
||||||
offline_threading_id = _util.generate_offline_threading_id()
|
offline_threading_id = _util.generate_offline_threading_id()
|
||||||
data["client"] = "mercury"
|
data["client"] = "mercury"
|
||||||
data["author"] = "fbid:{}".format(self._user_id)
|
data["author"] = "fbid:{}".format(self._user_id)
|
||||||
|
@@ -116,8 +116,14 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
|||||||
reply_to_id: Optional message to reply to
|
reply_to_id: Optional message to reply to
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
Send a message with a mention to a thread.
|
||||||
|
|
||||||
>>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2)
|
>>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2)
|
||||||
>>> thread.send_text("A message", mentions=[mention])
|
>>> message_id = thread.send_text("A message", mentions=[mention])
|
||||||
|
|
||||||
|
Reply to the message.
|
||||||
|
|
||||||
|
>>> thread.send_text("A reply", reply_to_id=message_id)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The sent message
|
The sent message
|
||||||
@@ -211,7 +217,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
|||||||
Example:
|
Example:
|
||||||
Send a pinned location in Beijing, China.
|
Send a pinned location in Beijing, China.
|
||||||
|
|
||||||
>>> thread.send_location(39.9390731, 116.117273)
|
>>> thread.send_pinned_location(39.9390731, 116.117273)
|
||||||
"""
|
"""
|
||||||
self._send_location(False, latitude=latitude, longitude=longitude)
|
self._send_location(False, latitude=latitude, longitude=longitude)
|
||||||
|
|
||||||
@@ -313,7 +319,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
def search_messages(
|
def search_messages(
|
||||||
self, query: str, limit: int
|
self, query: str, limit: int
|
||||||
) -> Iterable["_models.MessageSnippet"]:
|
) -> Iterable[_models.MessageSnippet]:
|
||||||
"""Find and get message IDs by query.
|
"""Find and get message IDs by query.
|
||||||
|
|
||||||
Warning! If someone send a message to the thread that matches the query, while
|
Warning! If someone send a message to the thread that matches the query, while
|
||||||
|
@@ -180,7 +180,7 @@ class GroupData(Group):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
#: The group's picture
|
#: The group's picture
|
||||||
photo = attr.ib(None, type=Optional["_models.Image"])
|
photo = attr.ib(None, type=Optional[_models.Image])
|
||||||
#: The name of the group
|
#: The name of the group
|
||||||
name = attr.ib(None, type=Optional[str])
|
name = attr.ib(None, type=Optional[str])
|
||||||
#: When the group was last active / when the last message was sent
|
#: When the group was last active / when the last message was sent
|
||||||
@@ -188,7 +188,7 @@ class GroupData(Group):
|
|||||||
#: Number of messages in the group
|
#: Number of messages in the group
|
||||||
message_count = attr.ib(None, type=Optional[int])
|
message_count = attr.ib(None, type=Optional[int])
|
||||||
#: Set `Plan`
|
#: Set `Plan`
|
||||||
plan = attr.ib(None, type=Optional["_models.PlanData"])
|
plan = attr.ib(None, type=Optional[_models.PlanData])
|
||||||
#: The group thread's participant user ids
|
#: The group thread's participant user ids
|
||||||
participants = attr.ib(factory=set, type=Set[str])
|
participants = attr.ib(factory=set, type=Set[str])
|
||||||
#: A dictionary, containing user nicknames mapped to their IDs
|
#: A dictionary, containing user nicknames mapped to their IDs
|
||||||
|
@@ -37,7 +37,7 @@ class PageData(Page):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
#: The page's picture
|
#: The page's picture
|
||||||
photo = attr.ib(type="_models.Image")
|
photo = attr.ib(type=_models.Image)
|
||||||
#: The name of the page
|
#: The name of the page
|
||||||
name = attr.ib(type=str)
|
name = attr.ib(type=str)
|
||||||
#: When the thread was last active / when the last message was sent
|
#: When the thread was last active / when the last message was sent
|
||||||
@@ -45,7 +45,7 @@ class PageData(Page):
|
|||||||
#: Number of messages in the thread
|
#: Number of messages in the thread
|
||||||
message_count = attr.ib(None, type=Optional[int])
|
message_count = attr.ib(None, type=Optional[int])
|
||||||
#: Set `Plan`
|
#: Set `Plan`
|
||||||
plan = attr.ib(None, type=Optional["_models.PlanData"])
|
plan = attr.ib(None, type=Optional[_models.PlanData])
|
||||||
#: The page's custom URL
|
#: The page's custom URL
|
||||||
url = attr.ib(None, type=Optional[str])
|
url = attr.ib(None, type=Optional[str])
|
||||||
#: The name of the page's location city
|
#: The name of the page's location city
|
||||||
|
@@ -105,7 +105,7 @@ class UserData(User):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
#: The user's picture
|
#: The user's picture
|
||||||
photo = attr.ib(type="_models.Image")
|
photo = attr.ib(type=_models.Image)
|
||||||
#: The name of the user
|
#: The name of the user
|
||||||
name = attr.ib(type=str)
|
name = attr.ib(type=str)
|
||||||
#: Whether the user and the client are friends
|
#: Whether the user and the client are friends
|
||||||
@@ -119,7 +119,7 @@ class UserData(User):
|
|||||||
#: Number of messages in the thread
|
#: Number of messages in the thread
|
||||||
message_count = attr.ib(None, type=Optional[int])
|
message_count = attr.ib(None, type=Optional[int])
|
||||||
#: Set `Plan`
|
#: Set `Plan`
|
||||||
plan = attr.ib(None, type=Optional["_models.PlanData"])
|
plan = attr.ib(None, type=Optional[_models.PlanData])
|
||||||
#: The profile URL. ``None`` for Messenger-only users
|
#: The profile URL. ``None`` for Messenger-only users
|
||||||
url = attr.ib(None, type=Optional[str])
|
url = attr.ib(None, type=Optional[str])
|
||||||
#: The user's gender
|
#: The user's gender
|
||||||
|
@@ -56,23 +56,26 @@ def parse_json(text: str) -> Any:
|
|||||||
|
|
||||||
|
|
||||||
def generate_offline_threading_id():
|
def generate_offline_threading_id():
|
||||||
ret = datetime_to_millis(datetime.datetime.utcnow())
|
ret = datetime_to_millis(now())
|
||||||
value = int(random.random() * 4294967295)
|
value = int(random.random() * 4294967295)
|
||||||
string = ("0000000000000000000000" + format(value, "b"))[-22:]
|
string = ("0000000000000000000000" + format(value, "b"))[-22:]
|
||||||
msgs = format(ret, "b") + string
|
msgs = format(ret, "b") + string
|
||||||
return str(int(msgs, 2))
|
return str(int(msgs, 2))
|
||||||
|
|
||||||
|
|
||||||
|
def remove_version_from_module(module):
|
||||||
|
return module.split("@", 1)[0]
|
||||||
|
|
||||||
|
|
||||||
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
|
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
|
||||||
rtn = {}
|
rtn = {}
|
||||||
for item in require:
|
for item in require:
|
||||||
if len(item) == 1:
|
if len(item) == 1:
|
||||||
(module,) = item
|
(module,) = item
|
||||||
rtn[module] = []
|
rtn[remove_version_from_module(module)] = []
|
||||||
continue
|
continue
|
||||||
method = "{}.{}".format(item[0], item[1])
|
module, method, requirements, arguments = item
|
||||||
requirements = item[2]
|
method = "{}.{}".format(remove_version_from_module(module), method)
|
||||||
arguments = item[3]
|
|
||||||
rtn[method] = arguments
|
rtn[method] = arguments
|
||||||
return rtn
|
return rtn
|
||||||
|
|
||||||
@@ -155,3 +158,11 @@ def timedelta_to_seconds(td: datetime.timedelta) -> int:
|
|||||||
The returned seconds will be rounded to the nearest whole number.
|
The returned seconds will be rounded to the nearest whole number.
|
||||||
"""
|
"""
|
||||||
return round(td.total_seconds())
|
return round(td.total_seconds())
|
||||||
|
|
||||||
|
|
||||||
|
def now() -> datetime.datetime:
|
||||||
|
"""The current time.
|
||||||
|
|
||||||
|
Similar to datetime.datetime.now(), but returns a non-naive datetime.
|
||||||
|
"""
|
||||||
|
return datetime.datetime.now(tz=datetime.timezone.utc)
|
||||||
|
@@ -1,6 +1,10 @@
|
|||||||
[pytest]
|
[pytest]
|
||||||
xfail_strict = true
|
xfail_strict = true
|
||||||
|
markers =
|
||||||
|
online: Online tests, that require a user account set up. Meant to be used \
|
||||||
|
manually, to check whether Facebook has broken something.
|
||||||
addopts =
|
addopts =
|
||||||
--strict
|
--strict
|
||||||
|
-m "not online"
|
||||||
testpaths = tests
|
testpaths = tests
|
||||||
filterwarnings = error
|
filterwarnings = error
|
||||||
|
@@ -123,6 +123,37 @@ def test_title_set(session):
|
|||||||
) == parse_delta(session, data)
|
) == parse_delta(session, data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_title_removed(session):
|
||||||
|
data = {
|
||||||
|
"irisSeqId": "11223344",
|
||||||
|
"irisTags": ["DeltaThreadName", "is_from_iris_fanout"],
|
||||||
|
"messageMetadata": {
|
||||||
|
"actorFbId": "3456",
|
||||||
|
"adminText": "You removed the group name.",
|
||||||
|
"folderId": {"systemFolderId": "INBOX"},
|
||||||
|
"messageId": "mid.$XYZ",
|
||||||
|
"offlineThreadingId": "1122334455",
|
||||||
|
"skipBumpThread": False,
|
||||||
|
"tags": [],
|
||||||
|
"threadKey": {"threadFbId": "4321"},
|
||||||
|
"threadReadStateEffect": "KEEP_AS_IS",
|
||||||
|
"timestamp": "1500000000000",
|
||||||
|
"unsendType": "deny_log_message",
|
||||||
|
},
|
||||||
|
"name": "",
|
||||||
|
"participants": ["1234", "2345", "3456", "4567"],
|
||||||
|
"requestContext": {"apiArgs": {}},
|
||||||
|
"tqSeqId": "1111",
|
||||||
|
"class": "ThreadName",
|
||||||
|
}
|
||||||
|
assert TitleSet(
|
||||||
|
author=User(session=session, id="3456"),
|
||||||
|
thread=Group(session=session, id="4321"),
|
||||||
|
title=None,
|
||||||
|
at=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
|
||||||
|
) == parse_delta(session, data)
|
||||||
|
|
||||||
|
|
||||||
def test_forced_fetch(session):
|
def test_forced_fetch(session):
|
||||||
data = {
|
data = {
|
||||||
"forceInsert": False,
|
"forceInsert": False,
|
||||||
|
67
tests/online/conftest.py
Normal file
67
tests/online/conftest.py
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
import fbchat
|
||||||
|
import pytest
|
||||||
|
import logging
|
||||||
|
import getpass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def session(pytestconfig):
|
||||||
|
session_cookies = pytestconfig.cache.get("session_cookies", None)
|
||||||
|
try:
|
||||||
|
session = fbchat.Session.from_cookies(session_cookies)
|
||||||
|
except fbchat.FacebookError:
|
||||||
|
logging.exception("Error while logging in with cookies!")
|
||||||
|
session = fbchat.Session.login(input("Email: "), getpass.getpass("Password: "))
|
||||||
|
|
||||||
|
yield session
|
||||||
|
|
||||||
|
pytestconfig.cache.set("session_cookies", session.get_cookies())
|
||||||
|
|
||||||
|
# TODO: Allow the main session object to be closed - and perhaps used in `with`?
|
||||||
|
session._session.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(session):
|
||||||
|
return fbchat.Client(session=session)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def user(pytestconfig, session):
|
||||||
|
user_id = pytestconfig.cache.get("user_id", None)
|
||||||
|
if not user_id:
|
||||||
|
user_id = input("A user you're chatting with's id: ")
|
||||||
|
pytestconfig.cache.set("user_id", user_id)
|
||||||
|
return fbchat.User(session=session, id=user_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def group(pytestconfig, session):
|
||||||
|
group_id = pytestconfig.cache.get("group_id", None)
|
||||||
|
if not group_id:
|
||||||
|
group_id = input("A group you're chatting with's id: ")
|
||||||
|
pytestconfig.cache.set("group_id", group_id)
|
||||||
|
return fbchat.Group(session=session, id=group_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope="session",
|
||||||
|
params=[
|
||||||
|
"user",
|
||||||
|
"group",
|
||||||
|
"self",
|
||||||
|
pytest.param("invalid", marks=[pytest.mark.xfail()]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def any_thread(request, session, user, group):
|
||||||
|
return {
|
||||||
|
"user": user,
|
||||||
|
"group": group,
|
||||||
|
"self": session.user,
|
||||||
|
"invalid": fbchat.Thread(session=session, id="0"),
|
||||||
|
}[request.param]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def listener(session):
|
||||||
|
return fbchat.Listener(session=session, chat_on=False, foreground=False)
|
116
tests/online/test_client.py
Normal file
116
tests/online/test_client.py
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
import pytest
|
||||||
|
import fbchat
|
||||||
|
import os
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.online
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch(client):
|
||||||
|
client.fetch_users()
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_for_users(client):
|
||||||
|
list(client.search_for_users("test", 10))
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_for_pages(client):
|
||||||
|
list(client.search_for_pages("test", 100))
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_for_groups(client):
|
||||||
|
list(client.search_for_groups("test", 1000))
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_for_threads(client):
|
||||||
|
list(client.search_for_threads("test", 1000))
|
||||||
|
|
||||||
|
with pytest.raises(fbchat.HTTPError, match="rate limited"):
|
||||||
|
list(client.search_for_threads("test", 10000))
|
||||||
|
|
||||||
|
|
||||||
|
def test_message_search(client):
|
||||||
|
list(client.search_messages("test", 500))
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_thread_info(client):
|
||||||
|
list(client.fetch_thread_info(["4"]))[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_threads(client):
|
||||||
|
list(client.fetch_threads(20))
|
||||||
|
list(client.fetch_threads(200))
|
||||||
|
|
||||||
|
|
||||||
|
def test_undocumented(client):
|
||||||
|
client.fetch_unread()
|
||||||
|
client.fetch_unseen()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def open_resource(pytestconfig):
|
||||||
|
def get_resource_inner(filename):
|
||||||
|
path = os.path.join(pytestconfig.rootdir, "tests", "resources", filename)
|
||||||
|
return open(path, "rb")
|
||||||
|
|
||||||
|
return get_resource_inner
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_and_fetch_image_url(client, open_resource):
|
||||||
|
with open_resource("image.png") as f:
|
||||||
|
((id, mimetype),) = client.upload([("image.png", f, "image/png")])
|
||||||
|
assert mimetype == "image/png"
|
||||||
|
|
||||||
|
assert client.fetch_image_url(id).startswith("http")
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_image(client, open_resource):
|
||||||
|
with open_resource("image.png") as f:
|
||||||
|
_ = client.upload([("image.png", f, "image/png")])
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_many(client, open_resource):
|
||||||
|
with open_resource("image.png") as f_png, open_resource(
|
||||||
|
"image.jpg"
|
||||||
|
) as f_jpg, open_resource("image.gif") as f_gif, open_resource(
|
||||||
|
"file.json"
|
||||||
|
) as f_json, open_resource(
|
||||||
|
"file.txt"
|
||||||
|
) as f_txt, open_resource(
|
||||||
|
"audio.mp3"
|
||||||
|
) as f_mp3, open_resource(
|
||||||
|
"video.mp4"
|
||||||
|
) as f_mp4:
|
||||||
|
_ = client.upload(
|
||||||
|
[
|
||||||
|
("image.png", f_png, "image/png"),
|
||||||
|
("image.jpg", f_jpg, "image/jpeg"),
|
||||||
|
("image.gif", f_gif, "image/gif"),
|
||||||
|
("file.json", f_json, "application/json"),
|
||||||
|
("file.txt", f_txt, "text/plain"),
|
||||||
|
("audio.mp3", f_mp3, "audio/mpeg"),
|
||||||
|
("video.mp4", f_mp4, "video/mp4"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mark_as_read(client, user, group):
|
||||||
|
client.mark_as_read([user, group], fbchat._util.now())
|
||||||
|
|
||||||
|
|
||||||
|
def test_mark_as_unread(client, user, group):
|
||||||
|
client.mark_as_unread([user, group], fbchat._util.now())
|
||||||
|
|
||||||
|
|
||||||
|
def test_move_threads(client, user, group):
|
||||||
|
client.move_threads(fbchat.ThreadLocation.PENDING, [user, group])
|
||||||
|
client.move_threads(fbchat.ThreadLocation.INBOX, [user, group])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need to have threads to delete")
|
||||||
|
def test_delete_threads():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need to have messages to delete")
|
||||||
|
def test_delete_messages():
|
||||||
|
pass
|
42
tests/online/test_send.py
Normal file
42
tests/online/test_send.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import pytest
|
||||||
|
import fbchat
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.online
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Verify return values
|
||||||
|
|
||||||
|
|
||||||
|
def test_wave(any_thread):
|
||||||
|
assert any_thread.wave(True)
|
||||||
|
assert any_thread.wave(False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_text(any_thread):
|
||||||
|
assert any_thread.send_text("Test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_text_with_mention(any_thread):
|
||||||
|
mention = fbchat.Mention(thread_id=any_thread.id, offset=5, length=8)
|
||||||
|
assert any_thread.send_text("Test @mention", mentions=[mention])
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_emoji(any_thread):
|
||||||
|
assert any_thread.send_emoji("😀", size=fbchat.EmojiSize.LARGE)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_sticker(any_thread):
|
||||||
|
assert any_thread.send_sticker("1889713947839631")
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_location(any_thread):
|
||||||
|
any_thread.send_location(51.5287718, -0.2416815)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_pinned_location(any_thread):
|
||||||
|
any_thread.send_pinned_location(39.9390731, 116.117273)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need a way to use the uploaded files from test_client.py")
|
||||||
|
def test_send_files(any_thread):
|
||||||
|
pass
|
@@ -1,15 +1,61 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import pytest
|
import pytest
|
||||||
|
from fbchat import ParseError, _util
|
||||||
from fbchat._session import (
|
from fbchat._session import (
|
||||||
|
parse_server_js_define,
|
||||||
base36encode,
|
base36encode,
|
||||||
prefix_url,
|
prefix_url,
|
||||||
generate_message_id,
|
generate_message_id,
|
||||||
|
session_factory,
|
||||||
client_id_factory,
|
client_id_factory,
|
||||||
is_home,
|
find_form_request,
|
||||||
get_error_data,
|
get_error_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_server_js_define_old():
|
||||||
|
html = """
|
||||||
|
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
|
||||||
|
|
||||||
|
<script>require("TimeSliceImpl").guard(function() {require("ServerJSDefine").handleDefines([["DTSGInitData",[],{"token":"123","async_get_token":"12345"},3333]])
|
||||||
|
|
||||||
|
</script>
|
||||||
|
other irrelevant data
|
||||||
|
"""
|
||||||
|
define = parse_server_js_define(html)
|
||||||
|
assert define == {
|
||||||
|
"DTSGInitialData": {"token": "123"},
|
||||||
|
"DTSGInitData": {"async_get_token": "12345", "token": "123"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_server_js_define_new():
|
||||||
|
html = """
|
||||||
|
some data;require("TimeSliceImpl").guard(function(){new (require("ServerJS"))().handle({"define":[["DTSGInitialData",[],{"token":""},100]],"require":[...]});}, "ServerJS define", {"root":true})();
|
||||||
|
more data
|
||||||
|
<script><script>require("TimeSliceImpl").guard(function(){var s=new (require("ServerJS"))();s.handle({"define":[["DTSGInitData",[],{"token":"","async_get_token":""},3333]],"require":[...]});require("Run").onAfterLoad(function(){s.cleanup(require("TimeSliceImpl"))});}, "ServerJS define", {"root":true})();</script>
|
||||||
|
other irrelevant data
|
||||||
|
"""
|
||||||
|
define = parse_server_js_define(html)
|
||||||
|
assert define == {
|
||||||
|
"DTSGInitialData": {"token": ""},
|
||||||
|
"DTSGInitData": {"async_get_token": "", "token": ""},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_server_js_define_error():
|
||||||
|
with pytest.raises(ParseError, match="Could not find any"):
|
||||||
|
parse_server_js_define("")
|
||||||
|
|
||||||
|
html = 'function(){(require("ServerJSDefine")).handleDefines([{"a": function(){}}])'
|
||||||
|
with pytest.raises(ParseError, match="Invalid"):
|
||||||
|
parse_server_js_define(html + html)
|
||||||
|
|
||||||
|
html = 'function(){require("ServerJSDefine").handleDefines({"a": "b"})'
|
||||||
|
with pytest.raises(ParseError, match="Invalid"):
|
||||||
|
parse_server_js_define(html + html)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"number,expected",
|
"number,expected",
|
||||||
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
|
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
|
||||||
@@ -19,13 +65,20 @@ def test_base36encode(number, expected):
|
|||||||
|
|
||||||
|
|
||||||
def test_prefix_url():
|
def test_prefix_url():
|
||||||
assert prefix_url("/") == "https://www.facebook.com/"
|
static_url = "https://upload.messenger.com/"
|
||||||
assert prefix_url("/abc") == "https://www.facebook.com/abc"
|
assert prefix_url(static_url) == static_url
|
||||||
|
assert prefix_url("/") == "https://www.messenger.com/"
|
||||||
|
assert prefix_url("/abc") == "https://www.messenger.com/abc"
|
||||||
|
|
||||||
|
|
||||||
def test_generate_message_id():
|
def test_generate_message_id():
|
||||||
# Returns random output, so hard to test more thoroughly
|
# Returns random output, so hard to test more thoroughly
|
||||||
assert generate_message_id(datetime.datetime.utcnow(), "def")
|
assert generate_message_id(_util.now(), "def")
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_factory():
|
||||||
|
session = session_factory()
|
||||||
|
assert session.headers
|
||||||
|
|
||||||
|
|
||||||
def test_client_id_factory():
|
def test_client_id_factory():
|
||||||
@@ -33,41 +86,105 @@ def test_client_id_factory():
|
|||||||
assert client_id_factory()
|
assert client_id_factory()
|
||||||
|
|
||||||
|
|
||||||
def test_is_home():
|
def test_find_form_request():
|
||||||
assert not is_home("https://m.facebook.com/login/?...")
|
html = """
|
||||||
assert is_home("https://m.facebook.com/home.php?refsrc=...")
|
<div>
|
||||||
|
<form action="/checkpoint/?next=https%3A%2F%2Fwww.messenger.com%2F" class="checkpoint" id="u_0_c" method="post" onsubmit="">
|
||||||
|
<input autocomplete="off" name="jazoest" type="hidden" value="some-number" />
|
||||||
|
<input autocomplete="off" name="fb_dtsg" type="hidden" value="some-base64" />
|
||||||
|
<input class="hidden_elem" data-default-submit="true" name="submit[Continue]" type="submit" />
|
||||||
|
<input autocomplete="off" name="nh" type="hidden" value="some-hex" />
|
||||||
|
<div class="_4-u2 _5x_7 _p0k _5x_9 _4-u8">
|
||||||
|
<div class="_2e9n" id="u_0_d">
|
||||||
|
<strong id="u_0_e">Two factor authentication required</strong>
|
||||||
|
<div id="u_0_f"></div>
|
||||||
|
</div>
|
||||||
|
<div class="_2ph_">
|
||||||
|
<input autocomplete="off" name="no_fido" type="hidden" value="true" />
|
||||||
|
<div class="_50f4">You've asked us to require a 6-digit login code when anyone tries to access your account from a new device or browser.</div>
|
||||||
|
<div class="_3-8y _50f4">Enter the 6-digit code from your Code Generator or 3rd party app below.</div>
|
||||||
|
<div class="_2pie _2pio">
|
||||||
|
<span>
|
||||||
|
<input aria-label="Login code" autocomplete="off" class="inputtext" id="approvals_code" name="approvals_code" placeholder="Login code" tabindex="1" type="text" />
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div class="_5hzs" id="checkpointBottomBar">
|
||||||
|
<div class="_2s5p">
|
||||||
|
<button class="_42ft _4jy0 _2kak _4jy4 _4jy1 selected _51sy" id="checkpointSubmitButton" name="submit[Continue]" type="submit" value="Continue">Continue</button>
|
||||||
|
</div>
|
||||||
|
<div class="_2s5q">
|
||||||
|
<div class="_25b6" id="u_0_g">
|
||||||
|
<a href="#" id="u_0_h" role="button">Need another way to authenticate?</a>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
"""
|
||||||
|
url, data = find_form_request(html)
|
||||||
|
assert url.startswith("https://www.facebook.com/checkpoint/")
|
||||||
|
assert {
|
||||||
|
"jazoest": "some-number",
|
||||||
|
"fb_dtsg": "some-base64",
|
||||||
|
"nh": "some-hex",
|
||||||
|
"no_fido": "true",
|
||||||
|
"approvals_code": "[missing]",
|
||||||
|
"submit[Continue]": "Continue",
|
||||||
|
} == data
|
||||||
|
|
||||||
|
|
||||||
|
def test_find_form_request_error():
|
||||||
|
with pytest.raises(ParseError, match="Could not find form to submit"):
|
||||||
|
assert find_form_request("")
|
||||||
|
with pytest.raises(ParseError, match="Could not find url to submit to"):
|
||||||
|
assert find_form_request("<form></form>")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip
|
|
||||||
def test_get_error_data():
|
def test_get_error_data():
|
||||||
html = """<?xml version="1.0" encoding="utf-8"?>
|
html = """<!DOCTYPE html>
|
||||||
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
|
<html lang="da" id="facebook" class="no_js">
|
||||||
<html xmlns="http://www.w3.org/1999/xhtml">
|
|
||||||
|
|
||||||
<head>
|
<head>
|
||||||
<title>Log in to Facebook | Facebook</title>
|
<meta charset="utf-8" />
|
||||||
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" />
|
<title id="pageTitle">Messenger</title>
|
||||||
<style type="text/css">...</style>
|
<meta name="referrer" content="default" id="meta_referrer" />
|
||||||
<meta name="description" content="..." />
|
|
||||||
<link rel="canonical" href="https://www.facebook.com/login/" />
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body tabindex="0" class="b c d e f g">
|
<body class="_605a x1 Locale_da_DK" dir="ltr">
|
||||||
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main">
|
<div class="_3v_o" id="XMessengerDotComLoginViewPlaceholder">
|
||||||
<table class="x" role="presentation"><tbody><tr><td class="y">
|
<form id="login_form" action="/login/password/" method="post" onsubmit="">
|
||||||
<div class="z ba bb" style="" id="login_error">
|
<input type="hidden" name="jazoest" value="2222" autocomplete="off" />
|
||||||
<div class="bc">
|
<input type="hidden" name="lsd" value="xyz-abc" autocomplete="off" />
|
||||||
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&email=abc@mail.com&__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span>
|
<div class="_3403 _3404">
|
||||||
|
<div>Type your password again</div>
|
||||||
|
<div>The password you entered is incorrect. <a href="https://www.facebook.com/recover/initiate?ars=facebook_login_pw_error">Did you forget your password?</a></div>
|
||||||
</div>
|
</div>
|
||||||
|
<div id="loginform">
|
||||||
|
<input type="hidden" autocomplete="off" id="initial_request_id" name="initial_request_id" value="xxx" />
|
||||||
|
<input type="hidden" autocomplete="off" name="timezone" value="" id="u_0_1" />
|
||||||
|
<input type="hidden" autocomplete="off" name="lgndim" value="" id="u_0_2" />
|
||||||
|
<input type="hidden" name="lgnrnd" value="aaa" />
|
||||||
|
<input type="hidden" id="lgnjs" name="lgnjs" value="n" />
|
||||||
|
<input type="text" class="inputtext _55r1 _43di" id="email" name="email" placeholder="E-mail or phone number" value="some@email.com" tabindex="0" aria-label="E-mail or phone number" />
|
||||||
|
<input type="password" class="inputtext _55r1 _43di" name="pass" id="pass" tabindex="0" placeholder="Password" aria-label="Password" />
|
||||||
|
<button value="1" class="_42ft _4jy0 _2m_r _43dh _4jy4 _517h _51sy" id="loginbutton" name="login" tabindex="0" type="submit">Continue</button>
|
||||||
|
<div class="_43dj">
|
||||||
|
<div class="uiInputLabel clearfix">
|
||||||
|
<label class="uiInputLabelInput">
|
||||||
|
<input type="checkbox" value="1" name="persistent" tabindex="0" class="" id="u_0_0" />
|
||||||
|
<span class=""></span>
|
||||||
|
</label>
|
||||||
|
<label for="u_0_0" class="uiInputLabelLabel">Stay logged in</label>
|
||||||
|
</div>
|
||||||
|
<input type="hidden" autocomplete="off" id="default_persistent" name="default_persistent" value="0" />
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
</div>
|
</div>
|
||||||
...
|
|
||||||
</td></tr></tbody></table>
|
|
||||||
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
|
|
||||||
</div></div><div></div></div></div>
|
|
||||||
</body>
|
</body>
|
||||||
|
|
||||||
</html>
|
</html>
|
||||||
"""
|
"""
|
||||||
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
|
|
||||||
msg = "The password you entered is incorrect. Did you forget your password?"
|
msg = "The password you entered is incorrect. Did you forget your password?"
|
||||||
assert (1348092, msg) == get_error_data(html)
|
assert msg == get_error_data(html)
|
||||||
|
@@ -15,6 +15,7 @@ from fbchat._util import (
|
|||||||
seconds_to_timedelta,
|
seconds_to_timedelta,
|
||||||
millis_to_timedelta,
|
millis_to_timedelta,
|
||||||
timedelta_to_seconds,
|
timedelta_to_seconds,
|
||||||
|
now,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -68,6 +69,17 @@ def test_get_jsmods_require():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_jsmods_require_version_specifier():
|
||||||
|
data = [
|
||||||
|
["DimensionTracking@1234"],
|
||||||
|
["CavalryLoggerImpl@2345", "startInstrumentation", [], []],
|
||||||
|
]
|
||||||
|
assert get_jsmods_require(data) == {
|
||||||
|
"DimensionTracking": [],
|
||||||
|
"CavalryLoggerImpl.startInstrumentation": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_get_jsmods_require_get_image_url():
|
def test_get_jsmods_require_get_image_url():
|
||||||
data = [
|
data = [
|
||||||
[
|
[
|
||||||
@@ -234,3 +246,7 @@ def test_timedelta_to_seconds():
|
|||||||
assert timedelta_to_seconds(datetime.timedelta(seconds=1)) == 1
|
assert timedelta_to_seconds(datetime.timedelta(seconds=1)) == 1
|
||||||
assert timedelta_to_seconds(datetime.timedelta(hours=1)) == 3600
|
assert timedelta_to_seconds(datetime.timedelta(hours=1)) == 3600
|
||||||
assert timedelta_to_seconds(datetime.timedelta(days=1)) == 86400
|
assert timedelta_to_seconds(datetime.timedelta(days=1)) == 86400
|
||||||
|
|
||||||
|
|
||||||
|
def test_now():
|
||||||
|
assert datetime_to_millis(now()) == datetime_to_millis(datetime.datetime.now())
|
||||||
|
Reference in New Issue
Block a user