Compare commits
20 Commits
v2.0.0a3
...
8ac6dc4ae6
Author | SHA1 | Date | |
---|---|---|---|
|
8ac6dc4ae6 | ||
|
a6cf1d5c89 | ||
|
65b42e6532 | ||
|
8824a1c253 | ||
|
520258e339 | ||
|
435dfaf6d8 | ||
|
cf0e1e3a93 | ||
|
2319fc7c4a | ||
|
b35240bdda | ||
|
6141cc5a41 | ||
|
b1e438dae1 | ||
|
3c0f411be7 | ||
|
9ad0090b02 | ||
|
bec151a560 | ||
|
2087182ecf | ||
|
09627b71ae | ||
|
078bf9fc16 | ||
|
d33e36866d | ||
|
2a382ffaed | ||
|
18a3ffb90d |
@@ -2,7 +2,7 @@ import fbchat
|
|||||||
|
|
||||||
session = fbchat.Session.login("<email>", "<password>")
|
session = fbchat.Session.login("<email>", "<password>")
|
||||||
|
|
||||||
client = fbchat.Client(session)
|
client = fbchat.Client(session=session)
|
||||||
|
|
||||||
# Fetches a list of all users you're currently chatting with, as `User` objects
|
# Fetches a list of all users you're currently chatting with, as `User` objects
|
||||||
users = client.fetch_all_users()
|
users = client.fetch_all_users()
|
||||||
|
@@ -60,7 +60,7 @@ thread.set_color("#0084ff")
|
|||||||
# Will change the thread emoji to `👍`
|
# Will change the thread emoji to `👍`
|
||||||
thread.set_emoji("👍")
|
thread.set_emoji("👍")
|
||||||
|
|
||||||
message = fbchat.Message(session=session, id="<message id>")
|
message = fbchat.Message(thread=thread, id="<message id>")
|
||||||
|
|
||||||
# Will react to a message with a 😍 emoji
|
# Will react to a message with a 😍 emoji
|
||||||
message.react("😍")
|
message.react("😍")
|
||||||
|
@@ -118,7 +118,7 @@ from ._listen import Listener
|
|||||||
|
|
||||||
from ._client import Client
|
from ._client import Client
|
||||||
|
|
||||||
__version__ = "2.0.0a3"
|
__version__ = "2.0.0a4"
|
||||||
|
|
||||||
__all__ = ("Session", "Listener", "Client")
|
__all__ = ("Session", "Listener", "Client")
|
||||||
|
|
||||||
|
@@ -419,7 +419,7 @@ class Client:
|
|||||||
Warning:
|
Warning:
|
||||||
This is not finished, and the API may change at any point!
|
This is not finished, and the API may change at any point!
|
||||||
"""
|
"""
|
||||||
at = datetime.datetime.utcnow()
|
at = _util.now()
|
||||||
form = {
|
form = {
|
||||||
"folders[0]": "inbox",
|
"folders[0]": "inbox",
|
||||||
"client": "mercury",
|
"client": "mercury",
|
||||||
@@ -558,7 +558,7 @@ class Client:
|
|||||||
"shouldSendReadReceipt": "true",
|
"shouldSendReadReceipt": "true",
|
||||||
}
|
}
|
||||||
|
|
||||||
for threads in threads:
|
for thread in threads:
|
||||||
data["ids[{}]".format(thread.id)] = "true" if read else "false"
|
data["ids[{}]".format(thread.id)] = "true" if read else "false"
|
||||||
|
|
||||||
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)
|
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)
|
||||||
|
@@ -124,11 +124,11 @@ def handle_graphql_errors(j):
|
|||||||
errors = j["errors"]
|
errors = j["errors"]
|
||||||
if errors:
|
if errors:
|
||||||
error = errors[0] # TODO: Handle multiple errors
|
error = errors[0] # TODO: Handle multiple errors
|
||||||
# TODO: Use `severity` and `description`
|
# TODO: Use `severity`
|
||||||
raise GraphQLError(
|
raise GraphQLError(
|
||||||
# TODO: What data is always available?
|
# TODO: What data is always available?
|
||||||
message=error.get("summary", "Unknown error"),
|
message=error.get("summary", "Unknown error"),
|
||||||
description=error.get("message", ""),
|
description=error.get("message") or error.get("description") or "",
|
||||||
code=error.get("code"),
|
code=error.get("code"),
|
||||||
debug_info=error.get("debug_info"),
|
debug_info=error.get("debug_info"),
|
||||||
)
|
)
|
||||||
|
@@ -15,7 +15,9 @@ from . import _graphql, _util, _exception
|
|||||||
from typing import Optional, Mapping, Callable, Any
|
from typing import Optional, Mapping, Callable, Any
|
||||||
|
|
||||||
|
|
||||||
SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(')
|
SERVER_JS_DEFINE_REGEX = re.compile(
|
||||||
|
r'(?:"ServerJS".{,100}\.handle\({.*"define":)|(?:require\("ServerJSDefine"\)\)?\.handleDefines\()'
|
||||||
|
)
|
||||||
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
|
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
|
||||||
|
|
||||||
|
|
||||||
@@ -24,6 +26,8 @@ def parse_server_js_define(html: str) -> Mapping[str, Any]:
|
|||||||
# Find points where we should start parsing
|
# Find points where we should start parsing
|
||||||
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
|
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
|
||||||
|
|
||||||
|
# TODO: Extract jsmods "require" and "define" from `bigPipe.onPageletArrive`?
|
||||||
|
|
||||||
# Skip leading entry
|
# Skip leading entry
|
||||||
_, *define_splits = define_splits
|
_, *define_splits = define_splits
|
||||||
|
|
||||||
@@ -89,6 +93,12 @@ def session_factory() -> requests.Session:
|
|||||||
from . import __version__
|
from . import __version__
|
||||||
|
|
||||||
session = requests.session()
|
session = requests.session()
|
||||||
|
# Override Facebook's locale detection during the login process.
|
||||||
|
# The locale is only used when giving errors back to the user, so giving the errors
|
||||||
|
# back in English makes it easier for users to report.
|
||||||
|
session.cookies = session.cookies = requests.cookies.merge_cookies(
|
||||||
|
session.cookies, {"locale": "en_US"}
|
||||||
|
)
|
||||||
session.headers["Referer"] = "https://www.messenger.com/"
|
session.headers["Referer"] = "https://www.messenger.com/"
|
||||||
# We won't try to set a fake user agent to mask our presence!
|
# We won't try to set a fake user agent to mask our presence!
|
||||||
# Facebook allows us access anyhow, and it makes our motives clearer:
|
# Facebook allows us access anyhow, and it makes our motives clearer:
|
||||||
@@ -106,7 +116,7 @@ def find_form_request(html: str):
|
|||||||
|
|
||||||
form = soup.form
|
form = soup.form
|
||||||
if not form:
|
if not form:
|
||||||
raise _exception.ParseError("Could not find form to submit", data=soup)
|
raise _exception.ParseError("Could not find form to submit", data=html)
|
||||||
|
|
||||||
url = form.get("action")
|
url = form.get("action")
|
||||||
if not url:
|
if not url:
|
||||||
@@ -130,10 +140,11 @@ def two_factor_helper(session: requests.Session, r, on_2fa_callback):
|
|||||||
|
|
||||||
# You don't have to type a code if your device is already saved
|
# You don't have to type a code if your device is already saved
|
||||||
# Repeats if you get the code wrong
|
# Repeats if you get the code wrong
|
||||||
while "approvals_code" not in data:
|
while "approvals_code" in data:
|
||||||
data["approvals_code"] = on_2fa_callback()
|
data["approvals_code"] = on_2fa_callback()
|
||||||
log.info("Submitting 2FA code")
|
log.info("Submitting 2FA code")
|
||||||
r = session.post(url, data=data, allow_redirects=False)
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
url, data = find_form_request(r.content.decode("utf-8"))
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
|
||||||
# TODO: Can be missing if checkup flow was done on another device in the meantime?
|
# TODO: Can be missing if checkup flow was done on another device in the meantime?
|
||||||
@@ -141,18 +152,28 @@ def two_factor_helper(session: requests.Session, r, on_2fa_callback):
|
|||||||
data["name_action_selected"] = "save_device"
|
data["name_action_selected"] = "save_device"
|
||||||
log.info("Saving browser")
|
log.info("Saving browser")
|
||||||
r = session.post(url, data=data, allow_redirects=False)
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
if url and url.startswith("https://www.messenger.com/login/auth_token/"):
|
||||||
|
return url
|
||||||
url, data = find_form_request(r.content.decode("utf-8"))
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
|
||||||
log.info("Starting Facebook checkup flow")
|
log.info("Starting Facebook checkup flow")
|
||||||
r = session.post(url, data=data, allow_redirects=False)
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
|
||||||
url, data = find_form_request(r.content.decode("utf-8"))
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
|
if "verification_method" in data:
|
||||||
|
raise _exception.NotLoggedIn(
|
||||||
|
"Your account is locked, and you need to log in using a browser, and verify it there!"
|
||||||
|
)
|
||||||
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
|
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
|
||||||
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
|
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
|
||||||
data["submit[This was me]"] = "[any value]"
|
data["submit[This was me]"] = "[any value]"
|
||||||
del data["submit[This wasn't me]"]
|
del data["submit[This wasn't me]"]
|
||||||
log.info("Verifying login attempt")
|
log.info("Verifying login attempt")
|
||||||
r = session.post(url, data=data, allow_redirects=False)
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
|
|
||||||
url, data = find_form_request(r.content.decode("utf-8"))
|
url, data = find_form_request(r.content.decode("utf-8"))
|
||||||
if "name_action_selected" not in data:
|
if "name_action_selected" not in data:
|
||||||
@@ -160,8 +181,7 @@ def two_factor_helper(session: requests.Session, r, on_2fa_callback):
|
|||||||
data["name_action_selected"] = "save_device"
|
data["name_action_selected"] = "save_device"
|
||||||
log.info("Saving device again")
|
log.info("Saving device again")
|
||||||
r = session.post(url, data=data, allow_redirects=False)
|
r = session.post(url, data=data, allow_redirects=False)
|
||||||
|
log.debug("2FA location: %s", r.headers.get("Location"))
|
||||||
print(r.status_code, r.url, r.headers)
|
|
||||||
return r.headers.get("Location")
|
return r.headers.get("Location")
|
||||||
|
|
||||||
|
|
||||||
@@ -171,7 +191,6 @@ def get_error_data(html: str) -> Optional[str]:
|
|||||||
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
|
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
|
||||||
)
|
)
|
||||||
# Attempt to extract and format the error string
|
# Attempt to extract and format the error string
|
||||||
# The error message is in the user's own language!
|
|
||||||
return " ".join(list(soup.stripped_strings)[1:3]) or None
|
return " ".join(list(soup.stripped_strings)[1:3]) or None
|
||||||
|
|
||||||
|
|
||||||
@@ -232,7 +251,9 @@ class Session:
|
|||||||
on_2fa_callback: Function that will be called, in case a two factor
|
on_2fa_callback: Function that will be called, in case a two factor
|
||||||
authentication code is needed. This should return the requested code.
|
authentication code is needed. This should return the requested code.
|
||||||
|
|
||||||
Only tested with SMS codes, might not work with authentication apps.
|
Tested using SMS and authentication applications. If you have both
|
||||||
|
enabled, you might not receive an SMS code, and you'll have to use the
|
||||||
|
authentication application.
|
||||||
|
|
||||||
Note: Facebook limits the amount of codes they will give you, so if you
|
Note: Facebook limits the amount of codes they will give you, so if you
|
||||||
don't receive a code, be patient, and try again later!
|
don't receive a code, be patient, and try again later!
|
||||||
@@ -292,15 +313,22 @@ class Session:
|
|||||||
raise _exception.NotLoggedIn(
|
raise _exception.NotLoggedIn(
|
||||||
"2FA code required! Please supply `on_2fa_callback` to .login"
|
"2FA code required! Please supply `on_2fa_callback` to .login"
|
||||||
)
|
)
|
||||||
# Get a facebook.com url that handles the 2FA flow
|
# Get a facebook.com/checkpoint/start url that handles the 2FA flow
|
||||||
# This probably works differently for Messenger-only accounts
|
# This probably works differently for Messenger-only accounts
|
||||||
url = _util.get_url_parameter(url, "next")
|
url = _util.get_url_parameter(url, "next")
|
||||||
# Explicitly allow redirects
|
if not url.startswith("https://www.facebook.com/checkpoint/start/"):
|
||||||
r = session.get(url, allow_redirects=True)
|
raise _exception.ParseError("Failed 2fa flow (1)", data=url)
|
||||||
|
|
||||||
|
r = session.get(url, allow_redirects=False)
|
||||||
|
url = r.headers.get("Location")
|
||||||
|
if not url or not url.startswith("https://www.facebook.com/checkpoint/"):
|
||||||
|
raise _exception.ParseError("Failed 2fa flow (2)", data=url)
|
||||||
|
|
||||||
|
r = session.get(url, allow_redirects=False)
|
||||||
url = two_factor_helper(session, r, on_2fa_callback)
|
url = two_factor_helper(session, r, on_2fa_callback)
|
||||||
|
|
||||||
if not url.startswith("https://www.messenger.com/login/auth_token/"):
|
if not url.startswith("https://www.messenger.com/login/auth_token/"):
|
||||||
raise _exception.ParseError("Failed 2fa flow", data=url)
|
raise _exception.ParseError("Failed 2fa flow (3)", data=url)
|
||||||
|
|
||||||
r = session.get(url, allow_redirects=False)
|
r = session.get(url, allow_redirects=False)
|
||||||
url = r.headers.get("Location")
|
url = r.headers.get("Location")
|
||||||
@@ -458,7 +486,7 @@ class Session:
|
|||||||
return self._post("/api/graphqlbatch/", data, as_graphql=True)
|
return self._post("/api/graphqlbatch/", data, as_graphql=True)
|
||||||
|
|
||||||
def _do_send_request(self, data):
|
def _do_send_request(self, data):
|
||||||
now = datetime.datetime.utcnow()
|
now = _util.now()
|
||||||
offline_threading_id = _util.generate_offline_threading_id()
|
offline_threading_id = _util.generate_offline_threading_id()
|
||||||
data["client"] = "mercury"
|
data["client"] = "mercury"
|
||||||
data["author"] = "fbid:{}".format(self._user_id)
|
data["author"] = "fbid:{}".format(self._user_id)
|
||||||
|
@@ -116,8 +116,14 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
|||||||
reply_to_id: Optional message to reply to
|
reply_to_id: Optional message to reply to
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
Send a message with a mention to a thread.
|
||||||
|
|
||||||
>>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2)
|
>>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2)
|
||||||
>>> thread.send_text("A message", mentions=[mention])
|
>>> message_id = thread.send_text("A message", mentions=[mention])
|
||||||
|
|
||||||
|
Reply to the message.
|
||||||
|
|
||||||
|
>>> thread.send_text("A reply", reply_to_id=message_id)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The sent message
|
The sent message
|
||||||
@@ -211,7 +217,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
|||||||
Example:
|
Example:
|
||||||
Send a pinned location in Beijing, China.
|
Send a pinned location in Beijing, China.
|
||||||
|
|
||||||
>>> thread.send_location(39.9390731, 116.117273)
|
>>> thread.send_pinned_location(39.9390731, 116.117273)
|
||||||
"""
|
"""
|
||||||
self._send_location(False, latitude=latitude, longitude=longitude)
|
self._send_location(False, latitude=latitude, longitude=longitude)
|
||||||
|
|
||||||
|
@@ -56,23 +56,26 @@ def parse_json(text: str) -> Any:
|
|||||||
|
|
||||||
|
|
||||||
def generate_offline_threading_id():
|
def generate_offline_threading_id():
|
||||||
ret = datetime_to_millis(datetime.datetime.utcnow())
|
ret = datetime_to_millis(now())
|
||||||
value = int(random.random() * 4294967295)
|
value = int(random.random() * 4294967295)
|
||||||
string = ("0000000000000000000000" + format(value, "b"))[-22:]
|
string = ("0000000000000000000000" + format(value, "b"))[-22:]
|
||||||
msgs = format(ret, "b") + string
|
msgs = format(ret, "b") + string
|
||||||
return str(int(msgs, 2))
|
return str(int(msgs, 2))
|
||||||
|
|
||||||
|
|
||||||
|
def remove_version_from_module(module):
|
||||||
|
return module.split("@", 1)[0]
|
||||||
|
|
||||||
|
|
||||||
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
|
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
|
||||||
rtn = {}
|
rtn = {}
|
||||||
for item in require:
|
for item in require:
|
||||||
if len(item) == 1:
|
if len(item) == 1:
|
||||||
(module,) = item
|
(module,) = item
|
||||||
rtn[module] = []
|
rtn[remove_version_from_module(module)] = []
|
||||||
continue
|
continue
|
||||||
method = "{}.{}".format(item[0], item[1])
|
module, method, requirements, arguments = item
|
||||||
requirements = item[2]
|
method = "{}.{}".format(remove_version_from_module(module), method)
|
||||||
arguments = item[3]
|
|
||||||
rtn[method] = arguments
|
rtn[method] = arguments
|
||||||
return rtn
|
return rtn
|
||||||
|
|
||||||
@@ -155,3 +158,11 @@ def timedelta_to_seconds(td: datetime.timedelta) -> int:
|
|||||||
The returned seconds will be rounded to the nearest whole number.
|
The returned seconds will be rounded to the nearest whole number.
|
||||||
"""
|
"""
|
||||||
return round(td.total_seconds())
|
return round(td.total_seconds())
|
||||||
|
|
||||||
|
|
||||||
|
def now() -> datetime.datetime:
|
||||||
|
"""The current time.
|
||||||
|
|
||||||
|
Similar to datetime.datetime.now(), but returns a non-naive datetime.
|
||||||
|
"""
|
||||||
|
return datetime.datetime.now(tz=datetime.timezone.utc)
|
||||||
|
@@ -17,12 +17,51 @@ def session(pytestconfig):
|
|||||||
|
|
||||||
pytestconfig.cache.set("session_cookies", session.get_cookies())
|
pytestconfig.cache.set("session_cookies", session.get_cookies())
|
||||||
|
|
||||||
|
# TODO: Allow the main session object to be closed - and perhaps used in `with`?
|
||||||
|
session._session.close()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def client(session):
|
def client(session):
|
||||||
return fbchat.Client(session=session)
|
return fbchat.Client(session=session)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def user(pytestconfig, session):
|
||||||
|
user_id = pytestconfig.cache.get("user_id", None)
|
||||||
|
if not user_id:
|
||||||
|
user_id = input("A user you're chatting with's id: ")
|
||||||
|
pytestconfig.cache.set("user_id", user_id)
|
||||||
|
return fbchat.User(session=session, id=user_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def group(pytestconfig, session):
|
||||||
|
group_id = pytestconfig.cache.get("group_id", None)
|
||||||
|
if not group_id:
|
||||||
|
group_id = input("A group you're chatting with's id: ")
|
||||||
|
pytestconfig.cache.set("group_id", group_id)
|
||||||
|
return fbchat.Group(session=session, id=group_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope="session",
|
||||||
|
params=[
|
||||||
|
"user",
|
||||||
|
"group",
|
||||||
|
"self",
|
||||||
|
pytest.param("invalid", marks=[pytest.mark.xfail()]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def any_thread(request, session, user, group):
|
||||||
|
return {
|
||||||
|
"user": user,
|
||||||
|
"group": group,
|
||||||
|
"self": session.user,
|
||||||
|
"invalid": fbchat.Thread(session=session, id="0"),
|
||||||
|
}[request.param]
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def listener(session):
|
def listener(session):
|
||||||
return fbchat.Listener(session=session, chat_on=False, foreground=False)
|
return fbchat.Listener(session=session, chat_on=False, foreground=False)
|
||||||
|
@@ -46,11 +46,6 @@ def test_undocumented(client):
|
|||||||
client.fetch_unseen()
|
client.fetch_unseen()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip(reason="need a way to get an image id")
|
|
||||||
def test_fetch_image_url(client):
|
|
||||||
client.fetch_image_url("TODO")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def open_resource(pytestconfig):
|
def open_resource(pytestconfig):
|
||||||
def get_resource_inner(filename):
|
def get_resource_inner(filename):
|
||||||
@@ -60,6 +55,14 @@ def open_resource(pytestconfig):
|
|||||||
return get_resource_inner
|
return get_resource_inner
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_and_fetch_image_url(client, open_resource):
|
||||||
|
with open_resource("image.png") as f:
|
||||||
|
((id, mimetype),) = client.upload([("image.png", f, "image/png")])
|
||||||
|
assert mimetype == "image/png"
|
||||||
|
|
||||||
|
assert client.fetch_image_url(id).startswith("http")
|
||||||
|
|
||||||
|
|
||||||
def test_upload_image(client, open_resource):
|
def test_upload_image(client, open_resource):
|
||||||
with open_resource("image.png") as f:
|
with open_resource("image.png") as f:
|
||||||
_ = client.upload([("image.png", f, "image/png")])
|
_ = client.upload([("image.png", f, "image/png")])
|
||||||
@@ -90,5 +93,24 @@ def test_upload_many(client, open_resource):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# def test_mark_as_read(client):
|
def test_mark_as_read(client, user, group):
|
||||||
# client.mark_as_read([thread1, thread2])
|
client.mark_as_read([user, group], fbchat._util.now())
|
||||||
|
|
||||||
|
|
||||||
|
def test_mark_as_unread(client, user, group):
|
||||||
|
client.mark_as_unread([user, group], fbchat._util.now())
|
||||||
|
|
||||||
|
|
||||||
|
def test_move_threads(client, user, group):
|
||||||
|
client.move_threads(fbchat.ThreadLocation.PENDING, [user, group])
|
||||||
|
client.move_threads(fbchat.ThreadLocation.INBOX, [user, group])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need to have threads to delete")
|
||||||
|
def test_delete_threads():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need to have messages to delete")
|
||||||
|
def test_delete_messages():
|
||||||
|
pass
|
||||||
|
42
tests/online/test_send.py
Normal file
42
tests/online/test_send.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import pytest
|
||||||
|
import fbchat
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.online
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Verify return values
|
||||||
|
|
||||||
|
|
||||||
|
def test_wave(any_thread):
|
||||||
|
assert any_thread.wave(True)
|
||||||
|
assert any_thread.wave(False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_text(any_thread):
|
||||||
|
assert any_thread.send_text("Test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_text_with_mention(any_thread):
|
||||||
|
mention = fbchat.Mention(thread_id=any_thread.id, offset=5, length=8)
|
||||||
|
assert any_thread.send_text("Test @mention", mentions=[mention])
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_emoji(any_thread):
|
||||||
|
assert any_thread.send_emoji("😀", size=fbchat.EmojiSize.LARGE)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_sticker(any_thread):
|
||||||
|
assert any_thread.send_sticker("1889713947839631")
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_location(any_thread):
|
||||||
|
any_thread.send_location(51.5287718, -0.2416815)
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_pinned_location(any_thread):
|
||||||
|
any_thread.send_pinned_location(39.9390731, 116.117273)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="need a way to use the uploaded files from test_client.py")
|
||||||
|
def test_send_files(any_thread):
|
||||||
|
pass
|
@@ -1,6 +1,6 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import pytest
|
import pytest
|
||||||
from fbchat import ParseError
|
from fbchat import ParseError, _util
|
||||||
from fbchat._session import (
|
from fbchat._session import (
|
||||||
parse_server_js_define,
|
parse_server_js_define,
|
||||||
base36encode,
|
base36encode,
|
||||||
@@ -13,7 +13,7 @@ from fbchat._session import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_parse_server_js_define():
|
def test_parse_server_js_define_old():
|
||||||
html = """
|
html = """
|
||||||
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
|
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
|
||||||
|
|
||||||
@@ -29,6 +29,20 @@ def test_parse_server_js_define():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_server_js_define_new():
|
||||||
|
html = """
|
||||||
|
some data;require("TimeSliceImpl").guard(function(){new (require("ServerJS"))().handle({"define":[["DTSGInitialData",[],{"token":""},100]],"require":[...]});}, "ServerJS define", {"root":true})();
|
||||||
|
more data
|
||||||
|
<script><script>require("TimeSliceImpl").guard(function(){var s=new (require("ServerJS"))();s.handle({"define":[["DTSGInitData",[],{"token":"","async_get_token":""},3333]],"require":[...]});require("Run").onAfterLoad(function(){s.cleanup(require("TimeSliceImpl"))});}, "ServerJS define", {"root":true})();</script>
|
||||||
|
other irrelevant data
|
||||||
|
"""
|
||||||
|
define = parse_server_js_define(html)
|
||||||
|
assert define == {
|
||||||
|
"DTSGInitialData": {"token": ""},
|
||||||
|
"DTSGInitData": {"async_get_token": "", "token": ""},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_parse_server_js_define_error():
|
def test_parse_server_js_define_error():
|
||||||
with pytest.raises(ParseError, match="Could not find any"):
|
with pytest.raises(ParseError, match="Could not find any"):
|
||||||
parse_server_js_define("")
|
parse_server_js_define("")
|
||||||
@@ -59,7 +73,7 @@ def test_prefix_url():
|
|||||||
|
|
||||||
def test_generate_message_id():
|
def test_generate_message_id():
|
||||||
# Returns random output, so hard to test more thoroughly
|
# Returns random output, so hard to test more thoroughly
|
||||||
assert generate_message_id(datetime.datetime.utcnow(), "def")
|
assert generate_message_id(_util.now(), "def")
|
||||||
|
|
||||||
|
|
||||||
def test_session_factory():
|
def test_session_factory():
|
||||||
|
@@ -15,6 +15,7 @@ from fbchat._util import (
|
|||||||
seconds_to_timedelta,
|
seconds_to_timedelta,
|
||||||
millis_to_timedelta,
|
millis_to_timedelta,
|
||||||
timedelta_to_seconds,
|
timedelta_to_seconds,
|
||||||
|
now,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -68,6 +69,17 @@ def test_get_jsmods_require():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_jsmods_require_version_specifier():
|
||||||
|
data = [
|
||||||
|
["DimensionTracking@1234"],
|
||||||
|
["CavalryLoggerImpl@2345", "startInstrumentation", [], []],
|
||||||
|
]
|
||||||
|
assert get_jsmods_require(data) == {
|
||||||
|
"DimensionTracking": [],
|
||||||
|
"CavalryLoggerImpl.startInstrumentation": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_get_jsmods_require_get_image_url():
|
def test_get_jsmods_require_get_image_url():
|
||||||
data = [
|
data = [
|
||||||
[
|
[
|
||||||
@@ -234,3 +246,7 @@ def test_timedelta_to_seconds():
|
|||||||
assert timedelta_to_seconds(datetime.timedelta(seconds=1)) == 1
|
assert timedelta_to_seconds(datetime.timedelta(seconds=1)) == 1
|
||||||
assert timedelta_to_seconds(datetime.timedelta(hours=1)) == 3600
|
assert timedelta_to_seconds(datetime.timedelta(hours=1)) == 3600
|
||||||
assert timedelta_to_seconds(datetime.timedelta(days=1)) == 86400
|
assert timedelta_to_seconds(datetime.timedelta(days=1)) == 86400
|
||||||
|
|
||||||
|
|
||||||
|
def test_now():
|
||||||
|
assert datetime_to_millis(now()) == datetime_to_millis(datetime.datetime.now())
|
||||||
|
Reference in New Issue
Block a user