Clean up utility functions

This commit is contained in:
Mads Marquart
2020-01-23 16:19:09 +01:00
parent 4015bed474
commit 16081fbb19
8 changed files with 143 additions and 143 deletions

View File

@@ -418,10 +418,11 @@ class Client:
Warning:
This is not finished, and the API may change at any point!
"""
at = datetime.datetime.utcnow()
form = {
"folders[0]": "inbox",
"client": "mercury",
"last_action_timestamp": _util.now() - 60 * 1000
"last_action_timestamp": _util.datetime_to_millis(at),
# 'last_action_timestamp': 0
}
j = self.session._payload_post("/ajax/mercury/unread_threads.php", form)
@@ -547,11 +548,10 @@ class Client:
"""
return self._read_status(False, threads, at)
def mark_as_seen(self):
def mark_as_seen(self, at: datetime.datetime):
# TODO: Documenting this
j = self.session._payload_post(
"/ajax/mercury/mark_seen.php", {"seen_timestamp": _util.now()}
)
data = {"seen_timestamp": _util.datetime_to_millis(at)}
j = self.session._payload_post("/ajax/mercury/mark_seen.php", data)
def move_threads(
self, location: _models.ThreadLocation, threads: Iterable[_threads.ThreadABC]

View File

@@ -34,12 +34,12 @@ def queries_to_json(*queries):
return _util.json_minimal(rtn)
def response_to_json(content):
content = _util.strip_json_cruft(content) # Usually only needed in some error cases
def response_to_json(text):
text = _util.strip_json_cruft(text) # Usually only needed in some error cases
try:
j = json.loads(content, cls=ConcatJSONDecoder)
j = json.loads(text, cls=ConcatJSONDecoder)
except Exception as e:
raise _exception.ParseError("Error while parsing JSON", data=content) from e
raise _exception.ParseError("Error while parsing JSON", data=text) from e
rtn = [None] * (len(j))
for x in j:

View File

@@ -1,7 +1,7 @@
import attr
from . import Image, Attachment
from .._common import attrs_default
from .. import _util
from .. import _util, _exception
@attrs_default
@@ -26,6 +26,8 @@ class LocationAttachment(Attachment):
def _from_graphql(cls, data):
url = data.get("url")
address = _util.get_url_parameter(_util.get_url_parameter(url, "u"), "where1")
if not address:
raise _exception.ParseError("Could not find location address", data=data)
try:
latitude, longitude = [float(x) for x in address.split(", ")]
address = None

View File

@@ -1,5 +1,6 @@
import attr
import bs4
import datetime
import re
import requests
import random
@@ -13,6 +14,32 @@ from typing import Optional, Tuple, Mapping, BinaryIO, Sequence, Iterable, Calla
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
def base36encode(number: int) -> str:
"""Convert from Base10 to Base36."""
# Taken from https://en.wikipedia.org/wiki/Base36#Python_implementation
chars = "0123456789abcdefghijklmnopqrstuvwxyz"
sign = "-" if number < 0 else ""
number = abs(number)
result = ""
while number > 0:
number, remainder = divmod(number, 36)
result = chars[remainder] + result
return sign + result
def prefix_url(url: str) -> str:
return "https://www.facebook.com" + url
def generate_message_id(now: datetime.datetime, client_id: str) -> str:
k = _util.datetime_to_millis(now)
l = int(random.random() * 4294967295)
return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id)
def get_user_id(session: requests.Session) -> str:
# TODO: Optimize this `.get_dict()` call!
cookies = session.cookies.get_dict()
@@ -106,7 +133,7 @@ def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (IndexError, ValueError):
except (TypeError, ValueError):
pass
soup = bs4.BeautifulSoup(
@@ -143,7 +170,7 @@ class Session:
self._counter += 1 # TODO: Make this operation atomic / thread-safe
return {
"__a": 1,
"__req": _util.str_base(self._counter, 36),
"__req": base36encode(self._counter),
"__rev": self._revision,
"fb_dtsg": self._fb_dtsg,
}
@@ -244,14 +271,14 @@ class Session:
"""
logout_h = self._logout_h
if not logout_h:
url = _util.prefix_url("/bluebar/modern_settings_menu/")
url = prefix_url("/bluebar/modern_settings_menu/")
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = _util.prefix_url("/logout.php")
url = prefix_url("/logout.php")
try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
except requests.RequestException as e:
@@ -264,7 +291,7 @@ class Session:
user_id = get_user_id(session)
try:
r = session.get(_util.prefix_url("/"))
r = session.get(prefix_url("/"))
except requests.RequestException as e:
_exception.handle_requests_error(e)
@@ -320,26 +347,24 @@ class Session:
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
return cls._from_session(session=session)
def _get(self, url, params, error_retries=3):
params.update(self._get_params())
try:
r = self._session.get(_util.prefix_url(url), params=params)
except requests.RequestException as e:
_exception.handle_requests_error(e)
content = _util.check_request(r)
return _util.to_json(content)
def _post(self, url, data, files=None, as_graphql=False):
data.update(self._get_params())
try:
r = self._session.post(_util.prefix_url(url), data=data, files=files)
r = self._session.post(prefix_url(url), data=data, files=files)
except requests.RequestException as e:
_exception.handle_requests_error(e)
content = _util.check_request(r)
# Facebook's encoding is always UTF-8
r.encoding = "utf-8"
_exception.handle_http_error(r.status_code)
if r.text is None or len(r.text) == 0:
raise _exception.HTTPError("Error when sending request: Got empty response")
if as_graphql:
return _graphql.response_to_json(content)
return _graphql.response_to_json(r.text)
else:
return _util.to_json(content)
text = _util.strip_json_cruft(r.text)
j = parse_json(text)
log.debug(j)
return j
def _payload_post(self, url, data, files=None):
j = self._post(url, data, files=files)
@@ -393,14 +418,15 @@ class Session:
]
def _do_send_request(self, data):
now = datetime.datetime.utcnow()
offline_threading_id = _util.generate_offline_threading_id()
data["client"] = "mercury"
data["author"] = "fbid:{}".format(self._user_id)
data["timestamp"] = _util.now()
data["timestamp"] = _util.datetime_to_millis(now)
data["source"] = "source:chat:web"
data["offline_threading_id"] = offline_threading_id
data["message_id"] = offline_threading_id
data["threading_id"] = _util.generate_message_id(self._client_id)
data["threading_id"] = generate_message_id(now, self._client_id)
data["ephemeral_ttl_mode:"] = "0"
j = self._post("/messaging/send/", data)

View File

@@ -38,10 +38,6 @@ def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]:
yield remainder
def now():
return int(time.time() * 1000)
def json_minimal(data: Any) -> str:
"""Get JSON data in minimal form."""
return json.dumps(data, separators=(",", ":"))
@@ -55,73 +51,21 @@ def strip_json_cruft(text: str) -> str:
raise _exception.ParseError("No JSON object found", data=text) from e
def get_decoded_r(r):
return get_decoded(r._content)
def get_decoded(content):
return content.decode("utf-8")
def parse_json(content: str) -> Any:
def parse_json(text: str) -> Any:
try:
return json.loads(content)
return json.loads(text)
except ValueError as e:
raise _exception.ParseError("Error while parsing JSON", data=content) from e
def digit_to_char(digit):
if digit < 10:
return str(digit)
return chr(ord("a") + digit - 10)
def str_base(number, base):
if number < 0:
return "-" + str_base(-number, base)
(d, m) = divmod(number, base)
if d > 0:
return str_base(d, base) + digit_to_char(m)
return digit_to_char(m)
def generate_message_id(client_id=None):
k = now()
l = int(random.random() * 4294967295)
return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id)
def get_signature_id():
return hex(int(random.random() * 2147483648))
raise _exception.ParseError("Error while parsing JSON", data=text) from e
def generate_offline_threading_id():
ret = now()
ret = _util.datetime_to_millis(datetime.datetime.utcnow())
value = int(random.random() * 4294967295)
string = ("0000000000000000000000" + format(value, "b"))[-22:]
msgs = format(ret, "b") + string
return str(int(msgs, 2))
def check_request(r):
_exception.handle_http_error(r.status_code)
content = get_decoded_r(r)
check_content(content)
return content
def check_content(content, as_json=True):
if content is None or len(content) == 0:
raise _exception.HTTPError("Error when sending request: Got empty response")
def to_json(content):
content = strip_json_cruft(content)
j = parse_json(content)
log.debug(j)
return j
def get_jsmods_require(j, index):
if j.get("jsmods") and j["jsmods"].get("require"):
try:
@@ -145,19 +89,11 @@ def mimetype_to_key(mimetype: str) -> str:
return "file_id"
def get_url_parameters(url: str, *args):
def get_url_parameter(url: str, param: str) -> Optional[str]:
params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
return [params[arg][0] for arg in args if params.get(arg)]
def get_url_parameter(url: str, param: str) -> str:
return get_url_parameters(url, param)[0]
def prefix_url(url: str) -> str:
if url.startswith("/"):
return "https://www.facebook.com" + url
return url
if not params.get(param):
return None
return params[param][0]
def seconds_to_datetime(timestamp_in_seconds: float) -> datetime.datetime:

View File

@@ -4,7 +4,7 @@ from fbchat._graphql import ConcatJSONDecoder, queries_to_json, response_to_json
@pytest.mark.parametrize(
"content,result",
"text,result",
[
("", []),
('{"a":"b"}', [{"a": "b"}]),
@@ -12,8 +12,8 @@ from fbchat._graphql import ConcatJSONDecoder, queries_to_json, response_to_json
(' \n{"a": "b" } \n { "b" \n\n : "c" }', [{"a": "b"}, {"b": "c"}]),
],
)
def test_concat_json_decoder(content, result):
assert result == json.loads(content, cls=ConcatJSONDecoder)
def test_concat_json_decoder(text, result):
assert result == json.loads(text, cls=ConcatJSONDecoder)
def test_queries_to_json():

73
tests/test_session.py Normal file
View File

@@ -0,0 +1,73 @@
import datetime
import pytest
from fbchat._session import (
base36encode,
prefix_url,
generate_message_id,
client_id_factory,
is_home,
get_error_data,
)
@pytest.mark.parametrize(
"number,expected",
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
)
def test_base36encode(number, expected):
assert base36encode(number) == expected
def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc"
def test_generate_message_id():
# Returns random output, so hard to test more thoroughly
assert generate_message_id(datetime.datetime.utcnow(), "def")
def test_client_id_factory():
# Returns random output, so hard to test more thoroughly
assert client_id_factory()
def test_is_home():
assert not is_home("https://m.facebook.com/login/?...")
assert is_home("https://m.facebook.com/home.php?refsrc=...")
@pytest.mark.skip
def test_get_error_data():
html = """<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Log in to Facebook | Facebook</title>
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" />
<style type="text/css">...</style>
<meta name="description" content="..." />
<link rel="canonical" href="https://www.facebook.com/login/" />
</head>
<body tabindex="0" class="b c d e f g">
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main">
<table class="x" role="presentation"><tbody><tr><td class="y">
<div class="z ba bb" style="" id="login_error">
<div class="bc">
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&amp;email=abc@mail.com&amp;__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span>
</div>
</div>
...
</td></tr></tbody></table>
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
</div></div><div></div></div></div>
</body>
</html>
"""
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
msg = "The password you entered is incorrect. Did you forget your password?"
assert (1348092, msg) == get_error_data(html)

View File

@@ -4,13 +4,9 @@ import datetime
from fbchat._util import (
strip_json_cruft,
parse_json,
str_base,
generate_message_id,
get_signature_id,
get_jsmods_require,
mimetype_to_key,
get_url_parameter,
prefix_url,
seconds_to_datetime,
millis_to_datetime,
datetime_to_seconds,
@@ -42,31 +38,6 @@ def test_parse_json_invalid():
parse_json("No JSON object here!")
@pytest.mark.parametrize(
"number,base,expected",
[
(123, 10, "123"),
(1, 36, "1"),
(10, 36, "a"),
(123, 36, "3f"),
(1000, 36, "rs"),
(123456789, 36, "21i3v9"),
],
)
def test_str_base(number, base, expected):
assert str_base(number, base) == expected
def test_generate_message_id():
# Returns random output, so hard to test more thoroughly
generate_message_id("abc")
def test_get_signature_id():
# Returns random output, so hard to test more thoroughly
get_signature_id()
def test_get_jsmods_require_get_image_url():
data = {
"__ar": 1,
@@ -118,15 +89,7 @@ def test_mimetype_to_key():
def test_get_url_parameter():
assert get_url_parameter("http://example.com?a=b&c=d", "c") == "d"
assert get_url_parameter("http://example.com?a=b&a=c", "a") == "b"
with pytest.raises(IndexError):
get_url_parameter("http://example.com", "a")
def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc"
assert prefix_url("abc") == "abc"
assert prefix_url("https://m.facebook.com/abc") == "https://m.facebook.com/abc"
assert get_url_parameter("http://example.com", "a") is None
DT_0 = datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)