Compare commits

..

12 Commits

Author SHA1 Message Date
Mads Marquart
db284cefdf Bump version: 2.0.0a2 -> 2.0.0a3 2020-05-07 11:10:42 +02:00
Mads Marquart
d11f417caa Make logins persistent 2020-05-07 10:56:47 +02:00
Mads Marquart
3b71258f2c Fix tests 2020-05-07 10:23:29 +02:00
Mads Marquart
81584d328b Add more session tests and small error improvements 2020-05-07 10:15:51 +02:00
Mads Marquart
7be2acad7d Initial re-add of 2FA 2020-05-06 23:34:27 +02:00
Mads Marquart
079d4093c4 Use messenger.com URLs instead of facebook.com
This should allow people who have only created a messenger account to
log in.

Also parse required `fb_dtsg` and `client_revision` values better.

The 2-fa flow is removed for now, I'll re-add it later.
2020-05-06 21:57:24 +02:00
Mads Marquart
cce947b18c Fix docs warnings 2020-05-06 13:31:09 +02:00
Mads Marquart
2545a01450 Re-add a few online tests, to easily check when Facebook breaks stuff 2020-05-06 13:31:09 +02:00
Mads Marquart
5d763dfbce Merge pull request #559 from xaadu/patch-1
Fix mistake in session handling example
2020-05-06 11:33:21 +02:00
Mads Marquart
0981be42b9 Fix errors in examples 2020-05-06 11:32:22 +02:00
Abdullah Zayed
93b71bf198 First Object then File Pointer
json.dump() receives object as first argument and File Pointer as 2nd argument.
2020-04-28 12:58:19 +06:00
Mads Marquart
af3758c8a9 Fix TitleSet.title attribute 2020-03-13 11:21:33 +01:00
20 changed files with 532 additions and 213 deletions

View File

@@ -15,4 +15,6 @@ python:
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
fail_on_warning: true
# Disabled, until we can find a way to get sphinx-autodoc-typehints play nice with our
# module renaming!
fail_on_warning: false

View File

@@ -65,5 +65,5 @@ print("thread's name: {}".format(thread.name))
images = list(thread.fetch_images(limit=20))
for image in images:
if isinstance(image, fbchat.ImageAttachment):
url = c.fetch_image_url(image.id)
url = client.fetch_image_url(image.id)
print(url)

View File

@@ -80,7 +80,7 @@ def on_person_removed(sender, event: fbchat.PersonRemoved):
return
if event.author.id != session.user.id:
print(f"{event.removed.id} got removed. They will be re-added")
event.thread.add_participants([removed.id])
event.thread.add_participants([event.removed.id])
# Login, and start listening for events

View File

@@ -5,7 +5,7 @@ def on_message(event):
# We can only kick people from group chats, so no need to try if it's a user chat
if not isinstance(event.thread, fbchat.Group):
return
if message.text == "Remove me!":
if event.message.text == "Remove me!":
print(f"{event.author.id} will be removed from {event.thread.id}")
event.thread.remove_participant(event.author.id)

View File

@@ -18,7 +18,7 @@ def load_cookies(filename):
def save_cookies(filename, cookies):
with open(filename, "w") as f:
json.dump(f, cookies)
json.dump(cookies, f)
def load_session(cookies):

View File

@@ -65,6 +65,7 @@ from ._models import (
EmojiSize,
Mention,
Message,
MessageSnippet,
MessageData,
)
@@ -117,7 +118,7 @@ from ._listen import Listener
from ._client import Client
__version__ = "2.0.0a2"
__version__ = "2.0.0a3"
__all__ = ("Session", "Listener", "Client")

View File

@@ -524,7 +524,9 @@ class Client:
data = {"voice_clip": voice_clip}
j = self.session._payload_post(
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict
"https://upload.messenger.com/ajax/mercury/upload.php",
data,
files=file_dict,
)
if len(j["metadata"]) != len(file_dict):

View File

@@ -54,15 +54,15 @@ class TitleSet(ThreadEvent):
"""Somebody changed a group's title."""
thread = attr.ib(type="_threads.Group") # Set the correct type
#: The new title
title = attr.ib(type=str)
#: The new title. If ``None``, the title was removed
title = attr.ib(type=Optional[str])
#: When the title was set
at = attr.ib(type=datetime.datetime)
@classmethod
def _parse(cls, session, data):
author, thread, at = cls._parse_metadata(session, data)
return cls(author=author, thread=thread, title=data["name"], at=at)
return cls(author=author, thread=thread, title=data["name"] or None, at=at)
@attrs_event

View File

@@ -8,7 +8,7 @@ from . import _util, _exception, _session, _graphql, _events
from typing import Iterable, Optional, Mapping, List
HOST = "edge-chat.facebook.com"
HOST = "edge-chat.messenger.com"
TOPICS = [
# Things that happen in chats (e.g. messages)
@@ -271,10 +271,10 @@ class Listener:
headers = {
"Cookie": get_cookie_header(
self.session._session, "https://edge-chat.facebook.com/chat"
self.session._session, "https://edge-chat.messenger.com/chat"
),
"User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Origin": "https://www.messenger.com",
"Host": HOST,
}

View File

@@ -4,8 +4,8 @@ import enum
from string import Formatter
from . import _attachment, _location, _file, _quick_reply, _sticker
from .._common import log, attrs_default
from .. import _exception, _util, _session, _threads
from typing import Optional, Mapping, Sequence
from .. import _exception, _util
from typing import Optional, Mapping, Sequence, Any
class EmojiSize(enum.Enum):
@@ -85,7 +85,7 @@ class Message:
"""
#: The thread that this message belongs to.
thread = attr.ib(type="_threads.ThreadABC")
thread = attr.ib()
#: The message ID.
id = attr.ib(converter=str, type=str)
@@ -277,7 +277,7 @@ class MessageData(Message):
#: Message ID you want to reply to
reply_to_id = attr.ib(None, type=Optional[str])
#: Replied message
replied_to = attr.ib(None, type=Optional["MessageData"])
replied_to = attr.ib(None, type=Optional[Any])
#: Whether the message was forwarded
forwarded = attr.ib(False, type=Optional[bool])

View File

@@ -1,17 +1,51 @@
import attr
import bs4
import datetime
import re
import requests
import random
import urllib.parse
import re
import json
# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4
from ._common import log, kw_only
from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, Callable
from typing import Optional, Mapping, Callable, Any
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(')
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
if len(define_splits) < 2:
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
if len(define_splits) > 2:
raise _exception.ParseError("Found too many ServerJSDefine", data=define_splits)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str:
@@ -32,7 +66,7 @@ def base36encode(number: int) -> str:
def prefix_url(url: str) -> str:
if url.startswith("/"):
return "https://www.facebook.com" + url
return "https://www.messenger.com" + url
return url
@@ -51,15 +85,11 @@ def get_user_id(session: requests.Session) -> str:
return str(rtn)
def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory() -> requests.Session:
from . import __version__
session = requests.session()
session.headers["Referer"] = "https://www.facebook.com"
session.headers["Referer"] = "https://www.messenger.com/"
# We won't try to set a fake user agent to mask our presence!
# Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service
@@ -71,81 +101,86 @@ def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:]
def is_home(url: str) -> bool:
parts = urllib.parse.urlparse(url)
# Check the urls `/home.php` and `/`
return "home" in parts.path or "/" == parts.path
def find_form_request(html: str):
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
form = soup.form
if not form:
raise _exception.ParseError("Could not find form to submit", data=soup)
url = form.get("action")
if not url:
raise _exception.ParseError("Could not find url to submit to", data=form)
# From what I've seen, it'll always do this!
if url.startswith("/"):
url = "https://www.facebook.com" + url
# It's okay to set missing values to something crap, the values are localized, and
# hence are not available in the raw HTML
data = {
x["name"]: x.get("value", "[missing]")
for x in form.find_all(["input", "button"])
}
return url, data
def _2fa_helper(session: requests.Session, code: int, r):
soup = find_input_fields(r.text)
data = dict()
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
url, data = find_form_request(r.content.decode("utf-8"))
url = "https://m.facebook.com/login/checkpoint/"
data["approvals_code"] = str(code)
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = "0"
log.info("Submitting 2FA code.")
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
# You don't have to type a code if your device is already saved
# Repeats if you get the code wrong
while "approvals_code" not in data:
data["approvals_code"] = on_2fa_callback()
log.info("Submitting 2FA code")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
# TODO: Can be missing if checkup flow was done on another device in the meantime?
if "name_action_selected" in data:
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
log.info("Saving browser.")
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
log.info("Saving browser")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
if is_home(r.url):
return r
log.info("Starting Facebook checkup flow")
r = session.post(url, data=data, allow_redirects=False)
del data["name_action_selected"]
log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
url, data = find_form_request(r.content.decode("utf-8"))
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
data["submit[This was me]"] = "[any value]"
del data["submit[This wasn't me]"]
log.info("Verifying login attempt")
r = session.post(url, data=data, allow_redirects=False)
if is_home(r.url):
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
url, data = find_form_request(r.content.decode("utf-8"))
if "name_action_selected" not in data:
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
data["name_action_selected"] = "save_device"
log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
log.info("Saving device again")
r = session.post(url, data=data, allow_redirects=False)
print(r.status_code, r.url, r.headers)
return r.headers.get("Location")
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
"""Get error code and message from a request."""
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (TypeError, ValueError):
pass
def get_error_data(html: str) -> Optional[str]:
"""Get error message from a request."""
soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error")
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
)
return code, soup.get_text() or None
# Attempt to extract and format the error string
# The error message is in the user's own language!
return " ".join(list(soup.stripped_strings)[1:3]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
@@ -161,7 +196,6 @@ class Session:
_session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None, type=Optional[str])
@property
def user(self):
@@ -185,6 +219,7 @@ class Session:
"fb_dtsg": self._fb_dtsg,
}
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
@classmethod
def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
@@ -194,65 +229,90 @@ class Session:
Args:
email: Facebook ``email``, ``id`` or ``phone number``
password: Facebook account password
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
This should return the requested 2FA code.
on_2fa_callback: Function that will be called, in case a two factor
authentication code is needed. This should return the requested code.
Only tested with SMS codes, might not work with authentication apps.
Note: Facebook limits the amount of codes they will give you, so if you
don't receive a code, be patient, and try again later!
Example:
>>> import getpass
>>> import fbchat
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass())
>>> import getpass
>>> session = fbchat.Session.login(
... input("Email: "),
... getpass.getpass(),
... on_2fa_callback=lambda: input("2FA Code: ")
... )
Email: abc@gmail.com
Password: ****
2FA Code: 123456
>>> session.user.id
"1234"
"""
session = session_factory()
try:
r = session.get("https://m.facebook.com/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
soup = find_input_fields(r.text)
data = {
# "jazoest": "2754",
# "lsd": "AVqqqRUa",
"initial_request_id": "x", # any, just has to be present
# "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
# "lgnrnd": "044039_RGm9",
"lgnjs": "n",
"email": email,
"pass": password,
"login": "1",
"persistent": "1", # Changes the cookie type to have a long "expires"
"default_persistent": "0",
}
data = dict(
(elem["name"], elem["value"])
for elem in soup
if elem.has_attr("value") and elem.has_attr("name")
try:
# Should hit a redirect to https://www.messenger.com/
# If this does happen, the session is logged in!
r = session.post(
"https://www.messenger.com/login/password/",
data=data,
allow_redirects=False,
)
data["email"] = email
data["pass"] = password
data["login"] = "Log In"
try:
url = "https://m.facebook.com/login.php?login_attempt=1"
r = session.post(url, data=data)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
url = r.headers.get("Location")
# We weren't redirected, hence the email or password was wrong
if not url:
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn(error)
if "checkpoint" in url:
if not on_2fa_callback:
raise ValueError(
"2FA code required, please add `on_2fa_callback` to .login"
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
code = on_2fa_callback()
try:
r = _2fa_helper(session, code, r)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Get a facebook.com url that handles the 2FA flow
# This probably works differently for Messenger-only accounts
url = _util.get_url_parameter(url, "next")
# Explicitly allow redirects
r = session.get(url, allow_redirects=True)
url = two_factor_helper(session, r, on_2fa_callback)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
try:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
if not url.startswith("https://www.messenger.com/login/auth_token/"):
raise _exception.ParseError("Failed 2fa flow", data=url)
if is_home(r.url):
r = session.get(url, allow_redirects=False)
url = r.headers.get("Location")
if url != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
try:
return cls._from_session(session=session)
else:
code, msg = get_error_data(r.text, r.url)
raise _exception.ExternalError(
"Login failed at url {!r}".format(r.url), msg, code=code
)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status.
@@ -264,12 +324,12 @@ class Session:
>>> assert session.is_logged_in()
"""
# Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
try:
r = self._session.get(url, allow_redirects=False)
r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"])
_exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
def logout(self) -> None:
"""Safely log out the user.
@@ -279,57 +339,52 @@ class Session:
Example:
>>> session.logout()
"""
logout_h = self._logout_h
if not logout_h:
url = prefix_url("/bluebar/modern_settings_menu/")
data = {"fb_dtsg": self._fb_dtsg}
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = prefix_url("/logout.php")
try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod
def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try:
r = session.get(prefix_url("/"))
r = session.get(prefix_url("/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
soup = find_input_fields(r.text)
define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
if fb_dtsg_element:
fb_dtsg = fb_dtsg_element["value"]
else:
# Fall back to searching with a regex
res = FB_DTSG_REGEX.search(r.text)
if not res:
raise _exception.NotLoggedIn("Could not find fb_dtsg")
fb_dtsg = res.group(1)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
logout_h_element = soup.find("input", {"name": "h"})
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
user_id=user_id,
fb_dtsg=fb_dtsg,
revision=revision,
session=session,
logout_h=logout_h,
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg is None:
raise _exception.ParseError("Could not find fb_dtsg", data=define)
if not fb_dtsg:
# Happens when the client is not actually logged in
raise _exception.NotLoggedIn(
"Found empty fb_dtsg, the session was probably invalid."
)
try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`.
@@ -383,10 +438,9 @@ class Session:
# update fb_dtsg token if received in response
if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"])
if "DTSGInitData" in define:
self._fb_dtsg = define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
self._fb_dtsg = define["DTSGInitialData"]["token"]
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg:
self._fb_dtsg = fb_dtsg
try:
return j["payload"]

View File

@@ -313,7 +313,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
def search_messages(
self, query: str, limit: int
) -> Iterable["_models.MessageSnippet"]:
) -> Iterable[_models.MessageSnippet]:
"""Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while

View File

@@ -180,7 +180,7 @@ class GroupData(Group):
"""
#: The group's picture
photo = attr.ib(None, type=Optional["_models.Image"])
photo = attr.ib(None, type=Optional[_models.Image])
#: The name of the group
name = attr.ib(None, type=Optional[str])
#: When the group was last active / when the last message was sent
@@ -188,7 +188,7 @@ class GroupData(Group):
#: Number of messages in the group
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type=Optional["_models.PlanData"])
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The group thread's participant user ids
participants = attr.ib(factory=set, type=Set[str])
#: A dictionary, containing user nicknames mapped to their IDs

View File

@@ -37,7 +37,7 @@ class PageData(Page):
"""
#: The page's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the page
name = attr.ib(type=str)
#: When the thread was last active / when the last message was sent
@@ -45,7 +45,7 @@ class PageData(Page):
#: Number of messages in the thread
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type=Optional["_models.PlanData"])
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The page's custom URL
url = attr.ib(None, type=Optional[str])
#: The name of the page's location city

View File

@@ -105,7 +105,7 @@ class UserData(User):
"""
#: The user's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the user
name = attr.ib(type=str)
#: Whether the user and the client are friends
@@ -119,7 +119,7 @@ class UserData(User):
#: Number of messages in the thread
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type=Optional["_models.PlanData"])
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The profile URL. ``None`` for Messenger-only users
url = attr.ib(None, type=Optional[str])
#: The user's gender

View File

@@ -1,6 +1,10 @@
[pytest]
xfail_strict = true
markers =
online: Online tests, that require a user account set up. Meant to be used \
manually, to check whether Facebook has broken something.
addopts =
--strict
-m "not online"
testpaths = tests
filterwarnings = error

View File

@@ -123,6 +123,37 @@ def test_title_set(session):
) == parse_delta(session, data)
def test_title_removed(session):
data = {
"irisSeqId": "11223344",
"irisTags": ["DeltaThreadName", "is_from_iris_fanout"],
"messageMetadata": {
"actorFbId": "3456",
"adminText": "You removed the group name.",
"folderId": {"systemFolderId": "INBOX"},
"messageId": "mid.$XYZ",
"offlineThreadingId": "1122334455",
"skipBumpThread": False,
"tags": [],
"threadKey": {"threadFbId": "4321"},
"threadReadStateEffect": "KEEP_AS_IS",
"timestamp": "1500000000000",
"unsendType": "deny_log_message",
},
"name": "",
"participants": ["1234", "2345", "3456", "4567"],
"requestContext": {"apiArgs": {}},
"tqSeqId": "1111",
"class": "ThreadName",
}
assert TitleSet(
author=User(session=session, id="3456"),
thread=Group(session=session, id="4321"),
title=None,
at=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
) == parse_delta(session, data)
def test_forced_fetch(session):
data = {
"forceInsert": False,

28
tests/online/conftest.py Normal file
View File

@@ -0,0 +1,28 @@
import fbchat
import pytest
import logging
import getpass
@pytest.fixture(scope="session")
def session(pytestconfig):
session_cookies = pytestconfig.cache.get("session_cookies", None)
try:
session = fbchat.Session.from_cookies(session_cookies)
except fbchat.FacebookError:
logging.exception("Error while logging in with cookies!")
session = fbchat.Session.login(input("Email: "), getpass.getpass("Password: "))
yield session
pytestconfig.cache.set("session_cookies", session.get_cookies())
@pytest.fixture
def client(session):
return fbchat.Client(session=session)
@pytest.fixture
def listener(session):
return fbchat.Listener(session=session, chat_on=False, foreground=False)

View File

@@ -0,0 +1,94 @@
import pytest
import fbchat
import os
pytestmark = pytest.mark.online
def test_fetch(client):
client.fetch_users()
def test_search_for_users(client):
list(client.search_for_users("test", 10))
def test_search_for_pages(client):
list(client.search_for_pages("test", 100))
def test_search_for_groups(client):
list(client.search_for_groups("test", 1000))
def test_search_for_threads(client):
list(client.search_for_threads("test", 1000))
with pytest.raises(fbchat.HTTPError, match="rate limited"):
list(client.search_for_threads("test", 10000))
def test_message_search(client):
list(client.search_messages("test", 500))
def test_fetch_thread_info(client):
list(client.fetch_thread_info(["4"]))[0]
def test_fetch_threads(client):
list(client.fetch_threads(20))
list(client.fetch_threads(200))
def test_undocumented(client):
client.fetch_unread()
client.fetch_unseen()
@pytest.mark.skip(reason="need a way to get an image id")
def test_fetch_image_url(client):
client.fetch_image_url("TODO")
@pytest.fixture
def open_resource(pytestconfig):
def get_resource_inner(filename):
path = os.path.join(pytestconfig.rootdir, "tests", "resources", filename)
return open(path, "rb")
return get_resource_inner
def test_upload_image(client, open_resource):
with open_resource("image.png") as f:
_ = client.upload([("image.png", f, "image/png")])
def test_upload_many(client, open_resource):
with open_resource("image.png") as f_png, open_resource(
"image.jpg"
) as f_jpg, open_resource("image.gif") as f_gif, open_resource(
"file.json"
) as f_json, open_resource(
"file.txt"
) as f_txt, open_resource(
"audio.mp3"
) as f_mp3, open_resource(
"video.mp4"
) as f_mp4:
_ = client.upload(
[
("image.png", f_png, "image/png"),
("image.jpg", f_jpg, "image/jpeg"),
("image.gif", f_gif, "image/gif"),
("file.json", f_json, "application/json"),
("file.txt", f_txt, "text/plain"),
("audio.mp3", f_mp3, "audio/mpeg"),
("video.mp4", f_mp4, "video/mp4"),
]
)
# def test_mark_as_read(client):
# client.mark_as_read([thread1, thread2])

View File

@@ -1,15 +1,47 @@
import datetime
import pytest
from fbchat import ParseError
from fbchat._session import (
parse_server_js_define,
base36encode,
prefix_url,
generate_message_id,
session_factory,
client_id_factory,
is_home,
find_form_request,
get_error_data,
)
def test_parse_server_js_define():
html = """
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
<script>require("TimeSliceImpl").guard(function() {require("ServerJSDefine").handleDefines([["DTSGInitData",[],{"token":"123","async_get_token":"12345"},3333]])
</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": "123"},
"DTSGInitData": {"async_get_token": "12345", "token": "123"},
}
def test_parse_server_js_define_error():
with pytest.raises(ParseError, match="Could not find any"):
parse_server_js_define("")
html = 'function(){(require("ServerJSDefine")).handleDefines([{"a": function(){}}])'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
html = 'function(){require("ServerJSDefine").handleDefines({"a": "b"})'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
@pytest.mark.parametrize(
"number,expected",
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
@@ -19,8 +51,10 @@ def test_base36encode(number, expected):
def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc"
static_url = "https://upload.messenger.com/"
assert prefix_url(static_url) == static_url
assert prefix_url("/") == "https://www.messenger.com/"
assert prefix_url("/abc") == "https://www.messenger.com/abc"
def test_generate_message_id():
@@ -28,46 +62,115 @@ def test_generate_message_id():
assert generate_message_id(datetime.datetime.utcnow(), "def")
def test_session_factory():
session = session_factory()
assert session.headers
def test_client_id_factory():
# Returns random output, so hard to test more thoroughly
assert client_id_factory()
def test_is_home():
assert not is_home("https://m.facebook.com/login/?...")
assert is_home("https://m.facebook.com/home.php?refsrc=...")
def test_find_form_request():
html = """
<div>
<form action="/checkpoint/?next=https%3A%2F%2Fwww.messenger.com%2F" class="checkpoint" id="u_0_c" method="post" onsubmit="">
<input autocomplete="off" name="jazoest" type="hidden" value="some-number" />
<input autocomplete="off" name="fb_dtsg" type="hidden" value="some-base64" />
<input class="hidden_elem" data-default-submit="true" name="submit[Continue]" type="submit" />
<input autocomplete="off" name="nh" type="hidden" value="some-hex" />
<div class="_4-u2 _5x_7 _p0k _5x_9 _4-u8">
<div class="_2e9n" id="u_0_d">
<strong id="u_0_e">Two factor authentication required</strong>
<div id="u_0_f"></div>
</div>
<div class="_2ph_">
<input autocomplete="off" name="no_fido" type="hidden" value="true" />
<div class="_50f4">You've asked us to require a 6-digit login code when anyone tries to access your account from a new device or browser.</div>
<div class="_3-8y _50f4">Enter the 6-digit code from your Code Generator or 3rd party app below.</div>
<div class="_2pie _2pio">
<span>
<input aria-label="Login code" autocomplete="off" class="inputtext" id="approvals_code" name="approvals_code" placeholder="Login code" tabindex="1" type="text" />
</span>
</div>
</div>
<div class="_5hzs" id="checkpointBottomBar">
<div class="_2s5p">
<button class="_42ft _4jy0 _2kak _4jy4 _4jy1 selected _51sy" id="checkpointSubmitButton" name="submit[Continue]" type="submit" value="Continue">Continue</button>
</div>
<div class="_2s5q">
<div class="_25b6" id="u_0_g">
<a href="#" id="u_0_h" role="button">Need another way to authenticate?</a>
</div>
</div>
</div>
</div>
</form>
</div>
"""
url, data = find_form_request(html)
assert url.startswith("https://www.facebook.com/checkpoint/")
assert {
"jazoest": "some-number",
"fb_dtsg": "some-base64",
"nh": "some-hex",
"no_fido": "true",
"approvals_code": "[missing]",
"submit[Continue]": "Continue",
} == data
def test_find_form_request_error():
with pytest.raises(ParseError, match="Could not find form to submit"):
assert find_form_request("")
with pytest.raises(ParseError, match="Could not find url to submit to"):
assert find_form_request("<form></form>")
@pytest.mark.skip
def test_get_error_data():
html = """<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
html = """<!DOCTYPE html>
<html lang="da" id="facebook" class="no_js">
<head>
<title>Log in to Facebook | Facebook</title>
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" />
<style type="text/css">...</style>
<meta name="description" content="..." />
<link rel="canonical" href="https://www.facebook.com/login/" />
<meta charset="utf-8" />
<title id="pageTitle">Messenger</title>
<meta name="referrer" content="default" id="meta_referrer" />
</head>
<body tabindex="0" class="b c d e f g">
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main">
<table class="x" role="presentation"><tbody><tr><td class="y">
<div class="z ba bb" style="" id="login_error">
<div class="bc">
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&amp;email=abc@mail.com&amp;__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span>
<body class="_605a x1 Locale_da_DK" dir="ltr">
<div class="_3v_o" id="XMessengerDotComLoginViewPlaceholder">
<form id="login_form" action="/login/password/" method="post" onsubmit="">
<input type="hidden" name="jazoest" value="2222" autocomplete="off" />
<input type="hidden" name="lsd" value="xyz-abc" autocomplete="off" />
<div class="_3403 _3404">
<div>Type your password again</div>
<div>The password you entered is incorrect. <a href="https://www.facebook.com/recover/initiate?ars=facebook_login_pw_error">Did you forget your password?</a></div>
</div>
<div id="loginform">
<input type="hidden" autocomplete="off" id="initial_request_id" name="initial_request_id" value="xxx" />
<input type="hidden" autocomplete="off" name="timezone" value="" id="u_0_1" />
<input type="hidden" autocomplete="off" name="lgndim" value="" id="u_0_2" />
<input type="hidden" name="lgnrnd" value="aaa" />
<input type="hidden" id="lgnjs" name="lgnjs" value="n" />
<input type="text" class="inputtext _55r1 _43di" id="email" name="email" placeholder="E-mail or phone number" value="some@email.com" tabindex="0" aria-label="E-mail or phone number" />
<input type="password" class="inputtext _55r1 _43di" name="pass" id="pass" tabindex="0" placeholder="Password" aria-label="Password" />
<button value="1" class="_42ft _4jy0 _2m_r _43dh _4jy4 _517h _51sy" id="loginbutton" name="login" tabindex="0" type="submit">Continue</button>
<div class="_43dj">
<div class="uiInputLabel clearfix">
<label class="uiInputLabelInput">
<input type="checkbox" value="1" name="persistent" tabindex="0" class="" id="u_0_0" />
<span class=""></span>
</label>
<label for="u_0_0" class="uiInputLabelLabel">Stay logged in</label>
</div>
<input type="hidden" autocomplete="off" id="default_persistent" name="default_persistent" value="0" />
</div>
</form>
</div>
...
</td></tr></tbody></table>
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
</div></div><div></div></div></div>
</body>
</html>
"""
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
msg = "The password you entered is incorrect. Did you forget your password?"
assert (1348092, msg) == get_error_data(html)
assert msg == get_error_data(html)