Merge pull request #497 from carpedm20/public-session

Rename State -> Session, and make the class public.
This commit is contained in:
Mads Marquart
2020-01-09 10:49:43 +01:00
committed by GitHub
9 changed files with 155 additions and 175 deletions

View File

@@ -13,6 +13,7 @@ _logging.getLogger(__name__).addHandler(_logging.NullHandler())
from . import _core, _util
from ._core import Image
from ._exception import FBchatException, FBchatFacebookError
from ._session import Session
from ._thread import ThreadType, ThreadLocation, ThreadColor, Thread
from ._user import TypingStatus, User, ActiveStatus
from ._group import Group
@@ -44,4 +45,4 @@ __license__ = "BSD 3-Clause"
__author__ = "Taehoon Kim; Moreels Pieter-Jan; Mads Marquart"
__email__ = "carpedm20@gmail.com"
__all__ = ("Client",)
__all__ = ("Session", "Client")

View File

@@ -5,7 +5,7 @@ import requests
from collections import OrderedDict
from ._core import log
from . import _util, _graphql, _state
from . import _util, _graphql, _session
from ._exception import FBchatException, FBchatFacebookError
from ._thread import ThreadType, ThreadLocation, ThreadColor
@@ -38,9 +38,9 @@ ACONTEXT = {
class Client:
"""A client for the Facebook Chat (Messenger).
This is the main class, which contains all the methods you use to interact with
Facebook. You can extend this class, and overwrite the ``on`` methods, to provide
custom event handling (mainly useful while listening).
This contains all the methods you use to interact with Facebook. You can extend this
class, and overwrite the ``on`` methods, to provide custom event handling (mainly
useful while listening).
"""
@property
@@ -51,7 +51,7 @@ class Client:
"""
return self._uid
def __init__(self, email, password, session_cookies=None):
def __init__(self, session):
"""Initialize and log in the client.
Args:
@@ -67,27 +67,24 @@ class Client:
self._pull_channel = 0
self._mark_alive = True
self._buddylist = dict()
self._session = session
self._uid = session.user_id
# If session cookies aren't set, not properly loaded or gives us an invalid session, then do the login
if (
not session_cookies
or not self.set_session(session_cookies)
or not self.is_logged_in()
):
self.login(email, password)
def __repr__(self):
return "Client(session={!r})".format(self._session)
"""
INTERNAL REQUEST METHODS
"""
def _get(self, url, params):
return self._state._get(url, params)
return self._session._get(url, params)
def _post(self, url, params, files=None):
return self._state._post(url, params, files=files)
return self._session._post(url, params, files=files)
def _payload_post(self, url, data, files=None):
return self._state._payload_post(url, data, files=files)
return self._session._payload_post(url, data, files=files)
def graphql_requests(self, *queries):
"""Execute GraphQL queries.
@@ -101,7 +98,7 @@ class Client:
Raises:
FBchatException: If request failed
"""
return tuple(self._state._graphql_requests(*queries))
return tuple(self._session._graphql_requests(*queries))
def graphql_request(self, query):
"""Shorthand for ``graphql_requests(query)[0]``.
@@ -115,83 +112,6 @@ class Client:
END INTERNAL REQUEST METHODS
"""
"""
LOGIN METHODS
"""
def is_logged_in(self):
"""Send a request to Facebook to check the login status.
Returns:
bool: True if the client is still logged in
"""
return self._state.is_logged_in()
def get_session(self):
"""Retrieve session cookies.
Returns:
dict: A dictionary containing session cookies
"""
return self._state.get_cookies()
def set_session(self, session_cookies):
"""Load session cookies.
Args:
session_cookies (dict): A dictionary containing session cookies
Returns:
bool: False if ``session_cookies`` does not contain proper cookies
"""
try:
# Load cookies into current session
self._state = _state.State.from_cookies(session_cookies)
self._uid = self._state.user_id
except Exception as e:
log.exception("Failed loading session")
return False
return True
def login(self, email, password):
"""Login the user, using ``email`` and ``password``.
If the user is already logged in, this will do a re-login.
Args:
email: Facebook ``email`` or ``id`` or ``phone number``
password: Facebook account password
Raises:
FBchatException: On failed login
"""
self.on_logging_in(email=email)
if not (email and password):
raise ValueError("Email and password not set")
self._state = _state.State.login(
email, password, on_2fa_callback=self.on_2fa_code
)
self._uid = self._state.user_id
self.on_logged_in(email=email)
def logout(self):
"""Safely log out the client.
Returns:
bool: True if the action was successful
"""
if self._state.logout():
self._state = None
self._uid = None
return True
return False
"""
END LOGIN METHODS
"""
"""
FETCH METHODS
"""
@@ -936,7 +856,7 @@ class Client:
def _do_send_request(self, data, get_thread_id=False):
"""Send the data to `SendURL`, and returns the message ID or None on failure."""
mid, thread_id = self._state._do_send_request(data)
mid, thread_id = self._session._do_send_request(data)
if get_thread_id:
return mid, thread_id
else:
@@ -1107,7 +1027,7 @@ class Client:
)
def _upload(self, files, voice_clip=False):
return self._state._upload(files, voice_clip=voice_clip)
return self._session._upload(files, voice_clip=voice_clip)
def _send_files(
self, files, message=None, thread_id=None, thread_type=ThreadType.USER
@@ -1997,7 +1917,7 @@ class Client:
data = {
"seq": self._seq,
"channel": "p_" + self._uid,
"clientid": self._state._client_id,
"clientid": self._session._client_id,
"partition": -2,
"cap": 0,
"uid": self._uid,
@@ -2019,7 +1939,7 @@ class Client:
"msgs_recv": 0,
"sticky_token": self._sticky,
"sticky_pool": self._pool,
"clientid": self._state._client_id,
"clientid": self._session._client_id,
"state": "active" if self._mark_alive else "offline",
}
j = self._get(
@@ -2743,26 +2663,6 @@ class Client:
EVENTS
"""
def on_logging_in(self, email=None):
"""Called when the client is logging in.
Args:
email: The email of the client
"""
log.info("Logging in {}...".format(email))
def on_2fa_code(self):
"""Called when a 2FA code is needed to progress."""
return input("Please enter your 2FA code --> ")
def on_logged_in(self, email=None):
"""Called when the client is successfully logged in.
Args:
email: The email of the client
"""
log.info("Login of {} successful.".format(email))
def on_listening(self):
"""Called when the client is listening."""
log.info("Listening...")

View File

@@ -5,7 +5,7 @@ import requests
import random
import urllib.parse
from ._core import log, attrs_default
from ._core import log, kw_only
from . import _graphql, _util, _exception
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
@@ -98,11 +98,14 @@ def _2fa_helper(session, code, r):
return r
@attrs_default
class State:
"""Stores and manages state required for most Facebook requests."""
@attr.s(slots=True, kw_only=kw_only, repr=False)
class Session:
"""Stores and manages state required for most Facebook requests.
user_id = attr.ib()
This is the main class, which is used to login to Facebook.
"""
_user_id = attr.ib()
_fb_dtsg = attr.ib()
_revision = attr.ib()
_session = attr.ib(factory=session_factory)
@@ -110,7 +113,16 @@ class State:
_client_id = attr.ib(factory=client_id_factory)
_logout_h = attr.ib(None)
def get_params(self):
@property
def user_id(self):
"""The logged in user's ID."""
return self._user_id
def __repr__(self):
# An alternative repr, to illustrate that you can't create the class directly
return "<fbchat.Session user_id={}>".format(self._user_id)
def _get_params(self):
self._counter += 1 # TODO: Make this operation atomic / thread-safe
return {
"__a": 1,
@@ -120,7 +132,18 @@ class State:
}
@classmethod
def login(cls, email, password, on_2fa_callback):
def login(cls, email, password, on_2fa_callback=None):
"""Login the user, using ``email`` and ``password``.
Args:
email: Facebook ``email`` or ``id`` or ``phone number``
password: Facebook account password
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
This should return the requested 2FA code.
Raises:
FBchatException: On failed login
"""
session = session_factory()
soup = find_input_fields(session.get("https://m.facebook.com/").text)
@@ -137,6 +160,10 @@ class State:
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
if not on_2fa_callback:
raise _exception.FBchatException(
"2FA code required, please add `on_2fa_callback` to .login"
)
code = on_2fa_callback()
r = _2fa_helper(session, code, r)
@@ -145,7 +172,7 @@ class State:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
if is_home(r.url):
return cls.from_session(session=session)
return cls._from_session(session=session)
else:
raise _exception.FBchatException(
"Login failed. Check email/password. "
@@ -153,12 +180,24 @@ class State:
)
def is_logged_in(self):
"""Send a request to Facebook to check the login status.
Returns:
bool: Whether the user is still logged in
"""
# Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
r = self._session.get(url, allow_redirects=False)
return "Location" in r.headers and is_home(r.headers["Location"])
def logout(self):
"""Safely log out the user.
The session object must not be used after this action has been performed!
Raises:
FBchatException: On failed logout
"""
logout_h = self._logout_h
if not logout_h:
url = _util.prefix_url("/bluebar/modern_settings_menu/")
@@ -166,10 +205,14 @@ class State:
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = _util.prefix_url("/logout.php")
return self._session.get(url, params={"ref": "mb", "h": logout_h}).ok
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
if not r.ok:
raise exception.FBchatException(
"Failed logging out: {}".format(r.status_code)
)
@classmethod
def from_session(cls, session):
def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session)
@@ -198,22 +241,35 @@ class State:
)
def get_cookies(self):
"""Retrieve session cookies, that can later be used in `from_cookies`.
Returns:
dict: A dictionary containing session cookies
"""
return self._session.cookies.get_dict()
@classmethod
def from_cookies(cls, cookies):
"""Load a session from session cookies.
Args:
cookies (dict): A dictionary containing session cookies
Raises:
FBchatException: If given invalid cookies
"""
session = session_factory()
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
return cls.from_session(session=session)
return cls._from_session(session=session)
def _get(self, url, params, error_retries=3):
params.update(self.get_params())
params.update(self._get_params())
r = self._session.get(_util.prefix_url(url), params=params)
content = _util.check_request(r)
return _util.to_json(content)
def _post(self, url, data, files=None, as_graphql=False):
data.update(self.get_params())
data.update(self._get_params())
r = self._session.post(_util.prefix_url(url), data=data, files=files)
content = _util.check_request(r)
if as_graphql:
@@ -266,7 +322,7 @@ class State:
def _do_send_request(self, data):
offline_threading_id = _util.generate_offline_threading_id()
data["client"] = "mercury"
data["author"] = "fbid:{}".format(self.user_id)
data["author"] = "fbid:{}".format(self._user_id)
data["timestamp"] = _util.now()
data["source"] = "source:chat:web"
data["offline_threading_id"] = offline_threading_id