Compare commits

...

48 Commits

Author SHA1 Message Date
769b034d38 Update path
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-06-20 15:17:34 +03:00
fd3d5f7301 Update README
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-03-29 15:42:15 +03:00
2fa1b58336 Add session to_file and from_file
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-03-28 14:50:52 +03:00
9523350dc5 Add pyproject.toml and pytest.ini
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-03-28 11:59:32 +03:00
356db553b7 Sync to maraid/fbchat
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-03-28 11:49:48 +03:00
55712756d7 Remove unused features
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-03-28 11:44:43 +03:00
Mads Marquart
916a14062d Add unmaintained notice 2020-09-23 12:07:26 +02:00
Mads Marquart
43aa16c32d Remove stupid, obviously flaky test 2020-06-14 23:18:07 +02:00
Mads Marquart
427ae6bc5e Bump version: 2.0.0a4 -> 2.0.0a5 2020-06-14 23:15:50 +02:00
Mads Marquart
d650946531 Add act cookie on login 2020-06-14 23:00:40 +02:00
Mads Marquart
8ac6dc4ae6 Update SERVER_JS_DEFINE_REGEX, so logging in on newer FB versions work 2020-06-14 22:27:26 +02:00
Mads Marquart
a6cf1d5c89 Add _util.now, fixing a few places where datetimes were incorrectly used 2020-06-14 22:26:52 +02:00
Mads Marquart
65b42e6532 Add example of replying to a message 2020-06-07 14:41:05 +02:00
Mads Marquart
8824a1c253 Set override Facebook's auto-locale detection during login
The locale is only used during error handling, and makes it harder for
users to report errors
2020-06-07 13:59:27 +02:00
Mads Marquart
520258e339 Bump version: 2.0.0a3 -> 2.0.0a4 2020-06-07 12:52:59 +02:00
Mads Marquart
435dfaf6d8 Better GraphQL error reporting 2020-06-07 12:48:21 +02:00
Mads Marquart
cf0e1e3a93 Test on_2fa_callback with authentication applications 2020-06-07 12:37:36 +02:00
Mads Marquart
2319fc7c4a Handle early return from two_factor_helper 2020-06-07 12:35:24 +02:00
Mads Marquart
b35240bdda Handle locked accounts 2020-06-07 12:35:07 +02:00
Mads Marquart
6141cc5a41 Update SERVER_JS_DEFINE_REGEX 2020-06-07 12:04:51 +02:00
Mads Marquart
b1e438dae1 Few fixes to 2FA flow 2020-05-16 19:30:03 +02:00
Mads Marquart
3c0f411be7 Fix typo in 2FA logic 2020-05-10 12:01:41 +02:00
Mads Marquart
9ad0090b02 Merge pull request #563 from smilexs4/patch-2
Fix typo in example
2020-05-10 11:53:56 +02:00
Mads Marquart
bec151a560 Merge pull request #562 from smilexs4/patch-1
Fix typo in example
2020-05-10 11:53:39 +02:00
smilexs4
2087182ecf Update interract.py
Changed fbchat.Message parameter from session to thread
2020-05-08 18:45:25 +03:00
smilexs4
09627b71ae Update fetch.py
Solved the exception:

TypeError: __init__() takes 1 positional argument but 2 were given
2020-05-08 17:08:01 +03:00
Mads Marquart
078bf9fc16 Add send online tests 2020-05-07 12:26:39 +02:00
Mads Marquart
d33e36866d Finish Client online tests 2020-05-07 12:10:45 +02:00
Mads Marquart
2a382ffaed Fix Client.mark_as_(un)read, and add tests 2020-05-07 11:59:05 +02:00
Mads Marquart
18a3ffb90d Fix Client.fetch_image_url in some cases
Sometimes (or always?), jsmods require includes a JS version specifier.

This means we couldn't find the url
2020-05-07 11:46:42 +02:00
Mads Marquart
db284cefdf Bump version: 2.0.0a2 -> 2.0.0a3 2020-05-07 11:10:42 +02:00
Mads Marquart
d11f417caa Make logins persistent 2020-05-07 10:56:47 +02:00
Mads Marquart
3b71258f2c Fix tests 2020-05-07 10:23:29 +02:00
Mads Marquart
81584d328b Add more session tests and small error improvements 2020-05-07 10:15:51 +02:00
Mads Marquart
7be2acad7d Initial re-add of 2FA 2020-05-06 23:34:27 +02:00
Mads Marquart
079d4093c4 Use messenger.com URLs instead of facebook.com
This should allow people who have only created a messenger account to
log in.

Also parse required `fb_dtsg` and `client_revision` values better.

The 2-fa flow is removed for now, I'll re-add it later.
2020-05-06 21:57:24 +02:00
Mads Marquart
cce947b18c Fix docs warnings 2020-05-06 13:31:09 +02:00
Mads Marquart
2545a01450 Re-add a few online tests, to easily check when Facebook breaks stuff 2020-05-06 13:31:09 +02:00
Mads Marquart
5d763dfbce Merge pull request #559 from xaadu/patch-1
Fix mistake in session handling example
2020-05-06 11:33:21 +02:00
Mads Marquart
0981be42b9 Fix errors in examples 2020-05-06 11:32:22 +02:00
Abdullah Zayed
93b71bf198 First Object then File Pointer
json.dump() receives object as first argument and File Pointer as 2nd argument.
2020-04-28 12:58:19 +06:00
Mads Marquart
af3758c8a9 Fix TitleSet.title attribute 2020-03-13 11:21:33 +01:00
Mads Marquart
f64c487a2d Bump version: 2.0.0a1 -> 2.0.0a2 2020-03-11 15:45:02 +01:00
Mads Marquart
11534604fe Remove user agent randomization
Caused problems with logging in, and didn't really help on anything
2020-03-11 15:44:34 +01:00
Mads Marquart
9990952fa6 Add Connect and Disconnect events 2020-03-11 15:27:00 +01:00
Mads Marquart
7ee7361646 Clean up event parsing 2020-03-11 15:10:25 +01:00
Mads Marquart
89c6af516c Fix various documentation mistakes 2020-03-11 15:00:50 +01:00
Mads Marquart
c27f599e37 Fix type specifiers in models 2020-03-11 14:43:28 +01:00
63 changed files with 1042 additions and 1243 deletions

View File

@@ -1,34 +0,0 @@
---
name: Bug report
about: Create a report if you're having trouble with `fbchat`
---
## Description of the problem
Example: Logging in fails when the character `%` is in the password. A specific password that fails is `a_password_with_%`
## Code to reproduce
```py
# Example code
from fbchat import Client
client = Client("[REDACTED_USERNAME]", "a_password_with_%")
```
## Traceback
```
Traceback (most recent call last):
File "<test.py>", line 1, in <module>
File "[site-packages]/fbchat/client.py", line 78, in __init__
self.login(email, password, max_tries)
File "[site-packages]/fbchat/client.py", line 407, in login
raise FBchatException('Login failed. Check email/password. (Failed on URL: {})'.format(login_url))
fbchat.FBchatException: Login failed. Check email/password. (Failed on URL: https://m.facebook.com/login.php?login_attempt=1)
```
## Environment information
- Python version
- `fbchat` version
- If relevant, output from `$ python -m pip list`
If you have done any research, include that.
Make sure to redact all personal information.

View File

@@ -1,19 +0,0 @@
---
name: Feature request
about: Suggest a feature that you'd like to see implemented
---
## Description
Example: There's no way to send messages to groups
## Research (if applicable)
Example: I've found the URL `https://facebook.com/send_message.php`, to which you can send a POST requests with the following JSON:
```json
{
"text": message_content,
"fbid": group_id,
"some_variable": ?
}
```
But I don't know how what `some_variable` does, and it doesn't work without it. I've found some examples of `some_variable` to be: `MTIzNDU2Nzg5MA`, `MTIzNDU2Nzg5MQ` and `MTIzNDU2Nzg5Mg`

View File

@@ -1,18 +0,0 @@
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
version: 2
formats:
- pdf
- htmlzip
python:
version: 3.6
install:
- path: .
extra_requirements:
- docs
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
fail_on_warning: true

View File

@@ -1,53 +0,0 @@
sudo: false
language: python
python: 3.6
cache: pip
before_install: pip install flit
# Use `--deps production` so that we don't install unnecessary dependencies
install: flit install --deps production --extras test
script: pytest
jobs:
include:
- python: 3.5
- python: 3.6
- python: 3.7
- python: pypy3.5
- name: Lint
before_install: skip
install: pip install black
script: black --check --verbose .
- stage: deploy
name: GitHub Releases
if: tag IS present
install: skip
script: flit build
deploy:
provider: releases
api_key: $GITHUB_OAUTH_TOKEN
file_glob: true
file: dist/*
skip_cleanup: true
draft: false
on:
tags: true
- stage: deploy
name: PyPI
if: tag IS present
install: skip
script: skip
deploy:
provider: script
script: flit publish
on:
tags: true
notifications:
email:
on_success: never
on_failure: change

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"esbonio.sphinx.confDir": "",
"python.formatting.provider": "autopep8"
}

View File

@@ -1,30 +1,6 @@
``fbchat`` - Facebook Messenger for Python
==========================================
.. image:: https://badgen.net/pypi/v/fbchat
:target: https://pypi.python.org/pypi/fbchat
:alt: Project version
.. image:: https://badgen.net/badge/python/3.5,3.6,3.7,3.8,pypy?list=|
:target: https://pypi.python.org/pypi/fbchat
:alt: Supported python versions: 3.5, 3.6, 3.7, 3.8 and pypy
.. image:: https://badgen.net/pypi/license/fbchat
:target: https://github.com/carpedm20/fbchat/tree/master/LICENSE
:alt: License: BSD 3-Clause
.. image:: https://readthedocs.org/projects/fbchat/badge/?version=stable
:target: https://fbchat.readthedocs.io
:alt: Documentation
.. image:: https://badgen.net/travis/carpedm20/fbchat
:target: https://travis-ci.org/carpedm20/fbchat
:alt: Travis CI
.. image:: https://badgen.net/badge/code%20style/black/black
:target: https://github.com/ambv/black
:alt: Code style
A powerful and efficient library to interact with
`Facebook's Messenger <https://www.facebook.com/messages/>`__, using just your email and password.
@@ -38,16 +14,13 @@ This is *not* an official API, Facebook has that `over here <https://developers.
- Creating groups, setting the group emoji, changing nicknames, creating polls, etc.
- Listening for, an reacting to messages and other events in real-time.
- Type hints, and it has a modern codebase (e.g. only Python 3.5 and upwards).
- ``async``/``await`` (COMING).
Essentially, everything you need to make an amazing Facebook bot!
Version Warning
---------------
``v2`` is currently being developed at the ``master`` branch and it's highly unstable. If you want to view the old ``v1``, go `here <https://github.com/carpedm20/fbchat/tree/v1>`__.
Additionally, you can view the project's progress `here <https://github.com/carpedm20/fbchat/projects/2>`__.
``v2`` is currently being developed at the ``master`` branch and it's highly unstable.
Caveats
@@ -58,14 +31,6 @@ Caveats
However, there's a catch! **Using this library may not comply with Facebook's Terms Of Service!**, so be responsible Facebook citizens! We are not responsible if your account gets banned!
Additionally, **the APIs the library is calling is undocumented!** In theory, this means that your code could break tomorrow, without the slightest warning!
If this happens to you, please report it, so that we can fix it as soon as possible!
.. inclusion-marker-intro-end
.. This message doesn't make sense in the docs at Read The Docs, so we exclude it
With that out of the way, you may go to `Read The Docs <https://fbchat.readthedocs.io/>`__ to see the full documentation!
.. inclusion-marker-installation-start
Installation
@@ -73,40 +38,10 @@ Installation
.. code-block::
$ pip install fbchat
If you don't have `pip <https://pip.pypa.io/>`_, `this guide <http://docs.python-guide.org/en/latest/starting/installation/>`_ can guide you through the process.
You can also install directly from source, provided you have ``pip>=19.0``:
.. code-block::
$ pip install git+https://github.com/carpedm20/fbchat.git
.. inclusion-marker-installation-end
Example Usage
-------------
.. code-block::
import getpass
import fbchat
session = fbchat.Session.login("<email/phone number>", getpass.getpass())
user = fbchat.User(session=session, id=session.user_id)
user.send_text("Test message!")
More examples are available `here <https://github.com/carpedm20/fbchat/tree/master/examples>`__.
Maintainer
----------
- Mads Marquart / `@madsmtm <https://github.com/madsmtm>`__
$ pip install git+https://git.karaolidis.com/karaolidis/fbchat.git
Acknowledgements
----------------
This project was originally inspired by `facebook-chat-api <https://github.com/Schmavery/facebook-chat-api>`__.
This project is a fork of `fbchat <https://github.com/fbchat-dev/fbchat>`__ and was originally inspired by `facebook-chat-api <https://github.com/Schmavery/facebook-chat-api>`__.

View File

@@ -1,19 +0,0 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

View File

@@ -1,26 +0,0 @@
{% extends '!layout.html' %}
{% block extrahead %}
<script async defer src="https://buttons.github.io/buttons.js"></script>
<!-- Alabaster (krTheme++) Hacks, modified version of Kenneth Reitz' https://github.com/kennethreitz/requests/blob/master/docs/_templates/hacks.html -->
<style type="text/css">
/* Rezzy requires precise alignment. */
img.logo {margin-left: -20px!important;}
/* "Quick Search" should be capitalized. */
div#searchbox h3 {text-transform: capitalize;}
/* Go button should be behind input field */
div.sphinxsidebar div#searchbox input[type="text"] {width: 160px}
div#searchbox form div {display: inline-block;}
/* Make the document a little wider, less code is cut-off. */
div.document {width: 1008px;}
/* Much-improved spacing around code blocks. */
div.highlight pre {padding: 11px 14px;}
/* Remain Responsive! */
@media screen and (max-width: 1008px) {
div.sphinxsidebar {display: none;}
div.document {width: 100%!important;}
/* Have code blocks escape the document right-margin. */
div.highlight pre {margin-right: -30px;}
}
</style>
{% endblock %}

View File

@@ -1,13 +0,0 @@
<h3>
<a href="{{ pathto(master_doc) }}">{{ _(project) }}</a>
</h3>
<p>
<a class="github-button" href="https://github.com/carpedm20/fbchat" data-size="large" data-show-count="true" aria-label="Star carpedm20/fbchat on GitHub">Star</a>
</p>
<p>
{{ _(shorttitle) }}
</p>
{{ toctree() }}

View File

@@ -1,13 +0,0 @@
Attachments
===========
.. autoclass:: Attachment()
.. autoclass:: ShareAttachment()
.. autoclass:: Sticker()
.. autoclass:: LocationAttachment()
.. autoclass:: LiveLocationAttachment()
.. autoclass:: FileAttachment()
.. autoclass:: AudioAttachment()
.. autoclass:: ImageAttachment()
.. autoclass:: VideoAttachment()
.. autoclass:: ImageAttachment()

View File

@@ -1,4 +0,0 @@
Client
======
.. autoclass:: Client

View File

@@ -1,4 +0,0 @@
Events
======
.. autoclass:: Listener

View File

@@ -1,8 +0,0 @@
Exceptions
==========
.. autoexception:: FacebookError()
.. autoexception:: HTTPError()
.. autoexception:: ParseError()
.. autoexception:: ExternalError()
.. autoexception:: GraphQLError()

View File

@@ -1,21 +0,0 @@
.. module:: fbchat
.. Note: we're using () to hide the __init__ method where relevant
Full API
========
If you are looking for information on a specific function, class, or method, this part of the documentation is for you.
.. toctree::
:maxdepth: 1
session
client
threads
thread_data
messages
exceptions
attachments
events
misc

View File

@@ -1,8 +0,0 @@
Messages
========
.. autoclass:: Message
.. autoclass:: Mention
.. autoclass:: EmojiSize(Enum)
:undoc-members:
.. autoclass:: MessageData()

View File

@@ -1,20 +0,0 @@
Miscellaneous
=============
.. autoclass:: ThreadLocation(Enum)
:undoc-members:
.. autoclass:: ActiveStatus()
.. autoclass:: QuickReply
.. autoclass:: QuickReplyText
.. autoclass:: QuickReplyLocation
.. autoclass:: QuickReplyPhoneNumber
.. autoclass:: QuickReplyEmail
.. autoclass:: Poll
.. autoclass:: PollOption
.. autoclass:: Plan
.. autoclass:: PlanData()
.. autoclass:: GuestStatus(Enum)
:undoc-members:

View File

@@ -1,4 +0,0 @@
Session
=======
.. autoclass:: Session()

View File

@@ -1,6 +0,0 @@
Thread Data
===========
.. autoclass:: PageData()
.. autoclass:: UserData()
.. autoclass:: GroupData()

View File

@@ -1,8 +0,0 @@
Threads
=======
.. autoclass:: ThreadABC()
.. autoclass:: Thread
.. autoclass:: Page
.. autoclass:: User
.. autoclass:: Group

View File

@@ -1,194 +0,0 @@
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
import os
import sys
sys.path.insert(0, os.path.abspath(".."))
os.environ["_FBCHAT_DISABLE_FIX_MODULE_METADATA"] = "1"
import fbchat
del os.environ["_FBCHAT_DISABLE_FIX_MODULE_METADATA"]
# -- Project information -----------------------------------------------------
project = fbchat.__name__
copyright = "Copyright 2015 - 2018 by Taehoon Kim and 2018 - 2020 by Mads Marquart"
author = "Taehoon Kim; Moreels Pieter-Jan; Mads Marquart"
description = fbchat.__doc__.split("\n")[0]
# The short X.Y version
version = fbchat.__version__
# The full version, including alpha/beta/rc tags
release = fbchat.__version__
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
needs_sphinx = "2.0"
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.viewcode",
"sphinx.ext.napoleon",
"sphinxcontrib.spelling",
"sphinx_autodoc_typehints",
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
# The master toctree document.
master_doc = "index"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
rst_prolog = ".. currentmodule:: " + project
# The reST default role (used for this markup: `text`) to use for all
# documents.
#
default_role = "any"
# Make the reference parsing more strict
#
nitpicky = True
# Prefer strict Python highlighting
#
highlight_language = "python3"
# If true, '()' will be appended to :func: etc. cross-reference text.
#
add_function_parentheses = False
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = "alabaster"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
html_theme_options = {
"show_powered_by": False,
"github_user": "carpedm20",
"github_repo": project,
"github_banner": True,
"show_related": False,
}
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
html_sidebars = {"**": ["sidebar.html", "searchbox.html"]}
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#
html_show_sphinx = False
# If true, links to the reST sources are added to the pages.
#
html_show_sourcelink = False
# A shorter title for the navigation bar. Default is the same as html_title.
#
html_short_title = description
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = project + "doc"
# -- Options for LaTeX output ------------------------------------------------
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [(master_doc, project + ".tex", project, author, "manual")]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, project, project, [x.strip() for x in author.split(";")], 1)]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, project, project, author, project, description, "Miscellaneous",)
]
# -- Options for Epub output -------------------------------------------------
# A list of files that should not be packed into the epub file.
epub_exclude_files = ["search.html"]
# -- Extension configuration -------------------------------------------------
# -- Options for autodoc extension ---------------------------------------
autoclass_content = "class"
autodoc_member_order = "bysource"
autodoc_default_options = {"members": True}
# -- Options for intersphinx extension ---------------------------------------
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {"https://docs.python.org/": None}
# -- Options for napoleon extension ----------------------------------------------
# Use Google style docstrings
napoleon_google_docstring = True
napoleon_numpy_docstring = False
# napoleon_use_admonition_for_examples = False
# napoleon_use_admonition_for_notes = False
# napoleon_use_admonition_for_references = False
# -- Options for spelling extension ----------------------------------------------
spelling_word_list_filename = [
"spelling/names.txt",
"spelling/technical.txt",
"spelling/fixes.txt",
]
spelling_ignore_wiki_words = False
# spelling_ignore_acronyms = False
spelling_ignore_python_builtins = False
spelling_ignore_importable_modules = False

View File

@@ -1,55 +0,0 @@
.. _examples:
Examples
========
These are a few examples on how to use ``fbchat``. Remember to swap out ``<email>`` and ``<password>`` for your email and password
Basic example
-------------
This will show basic usage of ``fbchat``
.. literalinclude:: ../examples/basic_usage.py
Interacting with Threads
------------------------
This will interact with the thread in every way ``fbchat`` supports
.. literalinclude:: ../examples/interract.py
Fetching Information
--------------------
This will show the different ways of fetching information about users and threads
.. literalinclude:: ../examples/fetch.py
``Echobot``
-----------
This will reply to any message with the same message
.. literalinclude:: ../examples/echobot.py
Remove Bot
----------
This will remove a user from a group if they write the message ``Remove me!``
.. literalinclude:: ../examples/removebot.py
"Prevent changes"-Bot
---------------------
This will prevent chat color, emoji, nicknames and chat name from being changed.
It will also prevent people from being added and removed
.. literalinclude:: ../examples/keepbot.py

View File

@@ -1,23 +0,0 @@
Frequently Asked Questions
==========================
The new version broke my application
------------------------------------
``fbchat`` follows `Scemantic Versioning <https://semver.org/>`__ quite rigorously!
That means that breaking changes can *only* occur in major versions (e.g. ``v1.9.6`` -> ``v2.0.0``).
If you find that something breaks, and you didn't update to a new major version, then it is a bug, and we would be grateful if you reported it!
In case you're stuck with an old codebase, you can downgrade to a previous version of ``fbchat``, e.g. version ``1.9.6``:
.. code-block:: sh
$ pip install fbchat==1.9.6
Will you be supporting creating posts/events/pages and so on?
-------------------------------------------------------------
We won't be focusing on anything else than chat-related things. This library is called ``fbCHAT``, after all!

View File

@@ -1,22 +0,0 @@
.. See README.rst for explanation of these markers
.. include:: ../README.rst
:end-before: inclusion-marker-intro-end
With that said, let's get started!
.. include:: ../README.rst
:start-after: inclusion-marker-installation-start
:end-before: inclusion-marker-installation-end
Documentation Overview
----------------------
.. toctree::
:maxdepth: 2
intro
examples
faq
api/index

View File

@@ -1,152 +0,0 @@
Introduction
============
Welcome, this page will guide you through the basic concepts of using ``fbchat``.
The hardest, and most error prone part is logging in, and managing your login session, so that is what we will look at first.
Logging In
----------
Everything in ``fbchat`` starts with getting an instance of `Session`. Currently there are two ways of doing that, `Session.login` and `Session.from_cookies`.
The follow example will prompt you for you password, and use it to login::
import getpass
import fbchat
session = fbchat.Session.login("<email/phone number>", getpass.getpass())
# If your account requires a two factor authentication code:
session = fbchat.Session.login(
"<your email/phone number>",
getpass.getpass(),
lambda: getpass.getpass("2FA code"),
)
However, **this is not something you should do often!** Logging in/out all the time *will* get your Facebook account locked!
Instead, you should start by using `Session.login`, and then store the cookies with `Session.get_cookies`, so that they can be used instead the next time your application starts.
Usability-wise, this is also better, since you won't have to re-type your password every time you want to login.
The following, quite lengthy, yet very import example, illustrates a way to do this:
.. literalinclude:: ../examples/session_handling.py
Assuming you have successfully completed the above, congratulations! Using ``fbchat`` should be mostly trouble free from now on!
Understanding Thread Ids
------------------------
At the core of any thread is its unique identifier, its ID.
A thread basically just means "something I can chat with", but more precisely, it can refer to a few things:
- A Messenger group thread (`Group`)
- The conversation between you and a single Facebook user (`User`)
- The conversation between you and a Facebook Page (`Page`)
You can get your own user ID with `Session.user.id`.
Getting the ID of a specific group thread is fairly trivial, you only need to login to `<https://www.messenger.com/>`_, click on the group you want to find the ID of, and then read the id from the address bar.
The URL will look something like this: ``https://www.messenger.com/t/1234567890``, where ``1234567890`` would be the ID of the group.
The same method can be applied to some user accounts, though if they have set a custom URL, then you will have to use a different method.
An image to illustrate the process is shown below:
.. image:: /_static/find-group-id.png
:alt: An image illustrating how to find the ID of a group
Once you have an ID, you can use it to create a `Group` or a `User` instance, which will allow you to do all sorts of things. To do this, you need a valid, logged in session::
group = fbchat.Group(session=session, id="<The id you found>")
# Or for user threads
user = fbchat.User(session=session, id="<The id you found>")
Just like threads, every message, poll, plan, attachment, action etc. you send or do on Facebook has a unique ID.
Below is an example of using such a message ID to get a `Message` instance::
# Provide the thread the message was created in, and it's ID
message = fbchat.Message(thread=user, id="<The message id>")
Fetching Information
--------------------
Managing these ids yourself quickly becomes very cumbersome! Luckily, there are other, easier ways of getting `Group`/`User` instances.
You would start by creating a `Client` instance, which is basically just a helper on top of `Session`, that will allow you to do various things::
client = fbchat.Client(session=session)
Now, you could search for threads using `Client.search_for_threads`, or fetch a list of them using `Client.fetch_threads`::
# Fetch the 5 most likely search results
# Uses Facebook's search functions, you don't have to specify the whole name, first names will usually be enough
threads = list(client.search_for_threads("<name of the thread to search for>", limit=5))
# Fetch the 5 most recent threads in your account
threads = list(client.fetch_threads(limit=5))
Note the `list` statements; this is because the methods actually return `generators <https://wiki.python.org/moin/Generators>`__. If you don't know what that means, don't worry, it is just something you can use to make your code faster later.
The examples above will actually fetch `UserData`/`GroupData`, which are subclasses of `User`/`Group`. These model have extra properties, so you could for example print the names and ids of the fetched threads like this::
for thread in threads:
print(f"{thread.id}: {thread.name}")
Once you have a thread, you can use that to fetch the messages therein::
for message in thread.fetch_messages(limit=20):
print(message.text)
Interacting with Threads
------------------------
Once you have a `User`/`Group` instance, you can do things on them as described in `ThreadABC`, since they are subclasses of that.
Some functionality, like adding users to a `Group`, or blocking a `User`, logically only works the relevant threads, so see the full API documentation for that.
With that out of the way, let's see some examples!
The simplest way of interracting with a thread is by sending a message::
# Send a message to the user
message = user.send_text("test message")
There are many types of messages you can send, see the full API documentation for more.
Notice how we held on to the sent message? The return type i a `Message` instance, so you can interract with it afterwards::
# React to the message with the 😍 emoji
message.react("😍")
Besides sending messages, you can also interract with threads in other ways. An example is to change the thread color::
# Will change the thread color to the default blue
thread.set_color("#0084ff")
Listening & Events
------------------
Now, we are finally at the point we have all been waiting for: Creating an automatic Facebook bot!
To get started, you create the functions you want to call on certain events::
def my_function(event: fbchat.MessageEvent):
print(f"Message from {event.author.id}: {event.message.text}")
Then you create a `fbchat.Listener` object::
listener = fbchat.Listener(session=session, chat_on=False, foreground=False)
Which you can then use to receive events, and send them to your functions::
for event in listener.listen():
if isinstance(event, fbchat.MessageEvent):
my_function(event)
View the :ref:`examples` to see some more examples illustrating the event system.

View File

@@ -1,35 +0,0 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd

View File

@@ -1,3 +0,0 @@
premade
todo
emoji

View File

@@ -1,3 +0,0 @@
Facebook
GraphQL
GitHub

View File

@@ -1,17 +0,0 @@
iterables
iterable
mimetype
timestamp
metadata
spam
spammy
admin
admins
unsend
unsends
unmute
spritemap
online
inbox
subclassing
codebase

View File

@@ -2,7 +2,7 @@ import fbchat
session = fbchat.Session.login("<email>", "<password>")
client = fbchat.Client(session)
client = fbchat.Client(session=session)
# Fetches a list of all users you're currently chatting with, as `User` objects
users = client.fetch_all_users()
@@ -65,5 +65,5 @@ print("thread's name: {}".format(thread.name))
images = list(thread.fetch_images(limit=20))
for image in images:
if isinstance(image, fbchat.ImageAttachment):
url = c.fetch_image_url(image.id)
url = client.fetch_image_url(image.id)
print(url)

View File

@@ -60,7 +60,7 @@ thread.set_color("#0084ff")
# Will change the thread emoji to `👍`
thread.set_emoji("👍")
message = fbchat.Message(session=session, id="<message id>")
message = fbchat.Message(thread=thread, id="<message id>")
# Will react to a message with a 😍 emoji
message.react("😍")

View File

@@ -80,7 +80,7 @@ def on_person_removed(sender, event: fbchat.PersonRemoved):
return
if event.author.id != session.user.id:
print(f"{event.removed.id} got removed. They will be re-added")
event.thread.add_participants([removed.id])
event.thread.add_participants([event.removed.id])
# Login, and start listening for events

View File

@@ -5,7 +5,7 @@ def on_message(event):
# We can only kick people from group chats, so no need to try if it's a user chat
if not isinstance(event.thread, fbchat.Group):
return
if message.text == "Remove me!":
if event.message.text == "Remove me!":
print(f"{event.author.id} will be removed from {event.thread.id}")
event.thread.remove_participant(event.author.id)

View File

@@ -18,7 +18,7 @@ def load_cookies(filename):
def save_cookies(filename, cookies):
with open(filename, "w") as f:
json.dump(f, cookies)
json.dump(cookies, f)
def load_session(cookies):

View File

@@ -65,6 +65,7 @@ from ._models import (
EmojiSize,
Mention,
Message,
MessageSnippet,
MessageData,
)
@@ -74,6 +75,8 @@ from ._events import (
Event,
UnknownEvent,
ThreadEvent,
Connect,
Disconnect,
# _client_payload
ReactionEvent,
UserStatusEvent,
@@ -115,7 +118,7 @@ from ._listen import Listener
from ._client import Client
__version__ = "2.0.0a1"
__version__ = "2.0.0a5"
__all__ = ("Session", "Listener", "Client")

View File

@@ -33,10 +33,10 @@ class Client:
But does not include deactivated, deleted or memorialized users (logically,
since you can't chat with those).
The order these are returned is arbitary.
The order these are returned is arbitrary.
Example:
Get the name of an arbitary user that you're currently chatting with.
Get the name of an arbitrary user that you're currently chatting with.
>>> users = client.fetch_users()
>>> users[0].name
@@ -211,7 +211,7 @@ class Client:
Warning! If someone send a message to a thread that matches the query, while
we're searching, some snippets will get returned twice, and some will be lost.
This is fundamentally unfixable, it's just how the endpoint is implemented.
This is fundamentally not fixable, it's just how the endpoint is implemented.
Args:
query: Text to search for
@@ -419,7 +419,7 @@ class Client:
Warning:
This is not finished, and the API may change at any point!
"""
at = datetime.datetime.utcnow()
at = _util.now()
form = {
"folders[0]": "inbox",
"client": "mercury",
@@ -524,7 +524,9 @@ class Client:
data = {"voice_clip": voice_clip}
j = self.session._payload_post(
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict
"https://upload.messenger.com/ajax/mercury/upload.php",
data,
files=file_dict,
)
if len(j["metadata"]) != len(file_dict):
@@ -556,7 +558,7 @@ class Client:
"shouldSendReadReceipt": "true",
}
for threads in threads:
for thread in threads:
data["ids[{}]".format(thread.id)] = "true" if read else "false"
j = self.session._payload_post("/ajax/mercury/change_read_status.php", data)

View File

@@ -24,8 +24,7 @@ class Typing(ThreadEvent):
return cls(author=author, thread=author, status=status)
@classmethod
def _parse(cls, session, data):
# TODO: Rename this method
def _parse_thread_typing(cls, session, data):
author = _threads.User(session=session, id=str(data["sender_fbid"]))
thread = _threads.Group(session=session, id=str(data["thread"]))
status = data["state"] == 1
@@ -68,6 +67,25 @@ class Presence(Event):
return cls(statuses=statuses, full=data["list_type"] == "full")
@attrs_event
class Connect(Event):
"""The client was connected to Facebook.
This is not guaranteed to be triggered the same amount of times `Disconnect`!
"""
@attrs_event
class Disconnect(Event):
"""The client lost the connection to Facebook.
This is not guaranteed to be triggered the same amount of times `Connect`!
"""
#: The reason / error string for the disconnect
reason = attr.ib(type=str)
def parse_events(session, topic, data):
# See Mqtt._configure_connect_options for information about these topics
try:
@@ -90,7 +108,7 @@ def parse_events(session, topic, data):
) from e
elif topic == "/thread_typing":
yield Typing._parse(session, data)
yield Typing._parse_thread_typing(session, data)
elif topic == "/orca_typing_notifications":
yield Typing._parse_orca(session, data)

View File

@@ -1,5 +1,4 @@
import attr
import abc
from .._common import kw_only
from .. import _exception, _util, _threads
@@ -10,14 +9,9 @@ attrs_event = attr.s(slots=True, kw_only=kw_only, frozen=True)
@attrs_event
class Event(metaclass=abc.ABCMeta):
class Event:
"""Base class for all events."""
@classmethod
@abc.abstractmethod
def _parse(cls, session, data):
raise NotImplementedError
@staticmethod
def _get_thread(session, data):
# TODO: Handle pages? Is it even possible?
@@ -60,3 +54,9 @@ class ThreadEvent(Event):
thread = cls._get_thread(session, metadata)
at = _util.millis_to_datetime(int(metadata["timestamp"]))
return author, thread, at
@classmethod
def _parse_fetch(cls, session, data):
author = _threads.User(session=session, id=data["message_sender"]["id"])
at = _util.millis_to_datetime(int(data["timestamp_precise"]))
return author, at

View File

@@ -54,15 +54,15 @@ class TitleSet(ThreadEvent):
"""Somebody changed a group's title."""
thread = attr.ib(type="_threads.Group") # Set the correct type
#: The new title
title = attr.ib(type=str)
#: The new title. If ``None``, the title was removed
title = attr.ib(type=Optional[str])
#: When the title was set
at = attr.ib(type=datetime.datetime)
@classmethod
def _parse(cls, session, data):
author, thread, at = cls._parse_metadata(session, data)
return cls(author=author, thread=thread, title=data["name"], at=at)
return cls(author=author, thread=thread, title=data["name"] or None, at=at)
@attrs_event

View File

@@ -1,7 +1,7 @@
import attr
import requests
from typing import Any
from typing import Any, Optional
# Not frozen, since that doesn't work in PyPy
@attr.s(slots=True, auto_exc=True)
@@ -20,7 +20,7 @@ class HTTPError(FacebookError):
"""Base class for errors with the HTTP(s) connection to Facebook."""
#: The returned HTTP status code, if relevant
status_code = attr.ib(None, type=int)
status_code = attr.ib(None, type=Optional[int])
def __str__(self):
if not self.status_code:
@@ -58,7 +58,7 @@ class ExternalError(FacebookError):
#: The error message that Facebook returned (Possibly in the user's own language)
description = attr.ib(type=str)
#: The error code that Facebook returned
code = attr.ib(None, type=int)
code = attr.ib(None, type=Optional[int])
def __str__(self):
if self.code:
@@ -73,7 +73,7 @@ class GraphQLError(ExternalError):
# TODO: Handle multiple errors
#: Query debug information
debug_info = attr.ib(None, type=str)
debug_info = attr.ib(None, type=Optional[str])
def __str__(self):
if self.debug_info:
@@ -124,11 +124,11 @@ def handle_graphql_errors(j):
errors = j["errors"]
if errors:
error = errors[0] # TODO: Handle multiple errors
# TODO: Use `severity` and `description`
# TODO: Use `severity`
raise GraphQLError(
# TODO: What data is always available?
message=error.get("summary", "Unknown error"),
description=error.get("message", ""),
description=error.get("message") or error.get("description") or "",
code=error.get("code"),
debug_info=error.get("debug_info"),
)

View File

@@ -5,10 +5,10 @@ import requests
from ._common import log, kw_only
from . import _util, _exception, _session, _graphql, _events
from typing import Iterable, Optional, Mapping
from typing import Iterable, Optional, Mapping, List
HOST = "edge-chat.facebook.com"
HOST = "edge-chat.messenger.com"
TOPICS = [
# Things that happen in chats (e.g. messages)
@@ -118,7 +118,7 @@ class Listener:
_mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client)
_sync_token = attr.ib(None, type=Optional[str])
_sequence_id = attr.ib(None, type=Optional[int])
_tmp_events = attr.ib(factory=list, type=Iterable[_events.Event])
_tmp_events = attr.ib(factory=list, type=List[_events.Event])
def __attrs_post_init__(self):
# Configure callbacks
@@ -152,6 +152,7 @@ class Listener:
"The MQTT listener was disconnected for too long,"
" events may have been lost"
)
# TODO: Find a way to tell the user that they may now be missing events
self._sync_token = None
self._sequence_id = None
return False
@@ -270,10 +271,10 @@ class Listener:
headers = {
"Cookie": get_cookie_header(
self.session._session, "https://edge-chat.facebook.com/chat"
self.session._session, "https://edge-chat.messenger.com/chat"
),
"User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Origin": "https://www.messenger.com",
"Host": HOST,
}
@@ -282,11 +283,12 @@ class Listener:
path="/chat?sid={}".format(session_id), headers=headers
)
def _reconnect(self):
def _reconnect(self) -> bool:
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
return True
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
@@ -296,6 +298,7 @@ class Listener:
log.debug("MQTT reconnection failed: %s", e)
# Wait before reconnecting
self._mqtt._reconnect_wait()
return False
def listen(self) -> Iterable[_events.Event]:
"""Run the listening loop continually.
@@ -315,12 +318,10 @@ class Listener:
self._sequence_id = fetch_sequence_id(self.session)
# Make sure we're connected
while True:
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
self._reconnect()
else:
break
while not self._reconnect():
pass
yield _events.Connect()
while True:
rc = self._mqtt.loop(timeout=1.0)
@@ -339,18 +340,23 @@ class Listener:
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
yield _events.Disconnect(reason="Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
yield _events.Disconnect(reason="Connection error, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
raise _exception.NotLoggedIn("MQTT connection refused")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
reason = "MQTT Error: {}, retrying".format(err)
yield _events.Disconnect(reason=reason)
self._reconnect()
while not self._reconnect():
pass
yield _events.Connect()
if self._tmp_events:
yield from self._tmp_events
@@ -364,7 +370,7 @@ class Listener:
The `Listener` object should not be used after this is called!
Example:
Stop the listener when recieving a message with the text "/stop"
Stop the listener when receiving a message with the text "/stop"
>>> for event in listener.listen():
... if isinstance(event, fbchat.MessageEvent):
@@ -374,7 +380,7 @@ class Listener:
self._mqtt.disconnect()
def set_foreground(self, value: bool) -> None:
"""Set the `foreground` value while listening."""
"""Set the ``foreground`` value while listening."""
# TODO: Document what this actually does!
payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
@@ -383,7 +389,7 @@ class Listener:
# info.wait_for_publish()
def set_chat_on(self, value: bool) -> None:
"""Set the `chat_on` value while listening."""
"""Set the ``chat_on`` value while listening."""
# TODO: Document what this actually does!
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}

View File

@@ -3,7 +3,7 @@ from . import Image
from .._common import attrs_default
from .. import _util
from typing import Sequence
from typing import Optional, Sequence
@attrs_default
@@ -11,7 +11,7 @@ class Attachment:
"""Represents a Facebook attachment."""
#: The attachment ID
id = attr.ib(None, type=str)
id = attr.ib(None, type=Optional[str])
@attrs_default
@@ -24,21 +24,21 @@ class ShareAttachment(Attachment):
"""Represents a shared item (e.g. URL) attachment."""
#: ID of the author of the shared post
author = attr.ib(None, type=str)
author = attr.ib(None, type=Optional[str])
#: Target URL
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: Original URL if Facebook redirects the URL
original_url = attr.ib(None, type=str)
original_url = attr.ib(None, type=Optional[str])
#: Title of the attachment
title = attr.ib(None, type=str)
title = attr.ib(None, type=Optional[str])
#: Description of the attachment
description = attr.ib(None, type=str)
description = attr.ib(None, type=Optional[str])
#: Name of the source
source = attr.ib(None, type=str)
source = attr.ib(None, type=Optional[str])
#: The attached image
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: URL of the original image if Facebook uses ``safe_image``
original_image_url = attr.ib(None, type=str)
original_image_url = attr.ib(None, type=Optional[str])
#: List of additional attachments
attachments = attr.ib(factory=list, type=Sequence[Attachment])

View File

@@ -4,6 +4,8 @@ import enum
from .._common import attrs_default
from .. import _util
from typing import Optional
class ThreadLocation(enum.Enum):
"""Used to specify where a thread is located (inbox, pending, archived, other)."""
@@ -21,11 +23,11 @@ class ThreadLocation(enum.Enum):
@attrs_default
class ActiveStatus:
#: Whether the user is active now
active = attr.ib(None, type=bool)
#: Datetime when the user was last active
last_active = attr.ib(None, type=datetime.datetime)
active = attr.ib(type=bool)
#: When the user was last active
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Whether the user is playing Messenger game now
in_game = attr.ib(None, type=bool)
in_game = attr.ib(None, type=Optional[bool])
@classmethod
def _from_orca_presence(cls, data):
@@ -42,9 +44,9 @@ class Image:
#: URL to the image
url = attr.ib(type=str)
#: Width of the image
width = attr.ib(None, type=int)
width = attr.ib(None, type=Optional[int])
#: Height of the image
height = attr.ib(None, type=int)
height = attr.ib(None, type=Optional[int])
@classmethod
def _from_uri(cls, data):

View File

@@ -4,7 +4,7 @@ from . import Image, Attachment
from .._common import attrs_default
from .. import _util
from typing import Set
from typing import Set, Optional
@attrs_default
@@ -12,13 +12,13 @@ class FileAttachment(Attachment):
"""Represents a file that has been sent as a Facebook attachment."""
#: URL where you can download the file
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: Size of the file in bytes
size = attr.ib(None, type=int)
size = attr.ib(None, type=Optional[int])
#: Name of the file
name = attr.ib(None, type=str)
name = attr.ib(None, type=Optional[str])
#: Whether Facebook determines that this file may be harmful
is_malicious = attr.ib(None, type=bool)
is_malicious = attr.ib(None, type=Optional[bool])
@classmethod
def _from_graphql(cls, data, size=None):
@@ -36,13 +36,13 @@ class AudioAttachment(Attachment):
"""Represents an audio file that has been sent as a Facebook attachment."""
#: Name of the file
filename = attr.ib(None, type=str)
filename = attr.ib(None, type=Optional[str])
#: URL of the audio file
url = attr.ib(None, type=str)
#: Duration of the audio clip as a timedelta
duration = attr.ib(None, type=datetime.timedelta)
url = attr.ib(None, type=Optional[str])
#: Duration of the audio clip
duration = attr.ib(None, type=Optional[datetime.timedelta])
#: Audio type
audio_type = attr.ib(None, type=str)
audio_type = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):
@@ -63,13 +63,13 @@ class ImageAttachment(Attachment):
"""
#: The extension of the original image (e.g. ``png``)
original_extension = attr.ib(None, type=str)
original_extension = attr.ib(None, type=Optional[str])
#: Width of original image
width = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
width = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Height of original image
height = attr.ib(None, converter=lambda x: None if x is None else int(x), type=int)
height = attr.ib(None, converter=_util.int_or_none, type=Optional[int])
#: Whether the image is animated
is_animated = attr.ib(None, type=bool)
is_animated = attr.ib(None, type=Optional[bool])
#: A set, containing variously sized / various types of previews of the image
previews = attr.ib(factory=set, type=Set[Image])
@@ -113,15 +113,15 @@ class VideoAttachment(Attachment):
"""Represents a video that has been sent as a Facebook attachment."""
#: Size of the original video in bytes
size = attr.ib(None, type=int)
size = attr.ib(None, type=Optional[int])
#: Width of original video
width = attr.ib(None, type=int)
width = attr.ib(None, type=Optional[int])
#: Height of original video
height = attr.ib(None, type=int)
#: Length of video as a timedelta
duration = attr.ib(None, type=datetime.timedelta)
height = attr.ib(None, type=Optional[int])
#: Length of video
duration = attr.ib(None, type=Optional[datetime.timedelta])
#: URL to very compressed preview video
preview_url = attr.ib(None, type=str)
preview_url = attr.ib(None, type=Optional[str])
#: A set, containing variously sized previews of the video
previews = attr.ib(factory=set, type=Set[Image])

View File

@@ -1,8 +1,11 @@
import attr
import datetime
from . import Image, Attachment
from .._common import attrs_default
from .. import _util, _exception
from typing import Optional
@attrs_default
class LocationAttachment(Attachment):
@@ -12,15 +15,15 @@ class LocationAttachment(Attachment):
"""
#: Latitude of the location
latitude = attr.ib(None, type=float)
latitude = attr.ib(None, type=Optional[float])
#: Longitude of the location
longitude = attr.ib(None, type=float)
longitude = attr.ib(None, type=Optional[float])
#: Image showing the map of the location
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: URL to Bing maps with the location
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
# Address of the location
address = attr.ib(None, type=str)
address = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):
@@ -51,11 +54,11 @@ class LiveLocationAttachment(LocationAttachment):
"""Represents a live user location."""
#: Name of the location
name = attr.ib(None)
#: Datetime when live location expires
expires_at = attr.ib(None)
name = attr.ib(None, type=Optional[str])
#: When live location expires
expires_at = attr.ib(None, type=Optional[datetime.datetime])
#: True if live location is expired
is_expired = attr.ib(None)
is_expired = attr.ib(None, type=Optional[bool])
@classmethod
def _from_pull(cls, data):

View File

@@ -4,8 +4,8 @@ import enum
from string import Formatter
from . import _attachment, _location, _file, _quick_reply, _sticker
from .._common import log, attrs_default
from .. import _exception, _util, _session, _threads
from typing import Optional, Mapping, Sequence
from .. import _exception, _util
from typing import Optional, Mapping, Sequence, Any
class EmojiSize(enum.Enum):
@@ -85,7 +85,7 @@ class Message:
"""
#: The thread that this message belongs to.
thread = attr.ib(type="_threads.ThreadABC")
thread = attr.ib()
#: The message ID.
id = attr.ib(converter=str, type=str)
@@ -125,20 +125,12 @@ class Message:
def react(self, reaction: Optional[str]):
"""React to the message, or removes reaction.
Currently, you can use "", "😍", "😆", "😮", "😢", "😠", "👍" or "👎". It
should be possible to add support for more, but we haven't figured that out yet.
Args:
reaction: Reaction emoji to use, or if ``None``, removes reaction.
Example:
>>> message.react("😍")
"""
if reaction and reaction not in SENDABLE_REACTIONS:
raise ValueError(
"Invalid reaction! Please use one of: {}".format(SENDABLE_REACTIONS)
)
data = {
"action": "ADD_REACTION" if reaction else "REMOVE_REACTION",
"client_mutation_id": "1",
@@ -224,7 +216,7 @@ class MessageSnippet(Message):
#: ID of the sender
author = attr.ib(type=str)
#: Datetime of when the message was sent
#: When the message was sent
created_at = attr.ib(type=datetime.datetime)
#: The actual message
text = attr.ib(type=str)
@@ -252,34 +244,34 @@ class MessageData(Message):
#: ID of the sender
author = attr.ib(type=str)
#: Datetime of when the message was sent
#: When the message was sent
created_at = attr.ib(type=datetime.datetime)
#: The actual message
text = attr.ib(None, type=str)
text = attr.ib(None, type=Optional[str])
#: A list of `Mention` objects
mentions = attr.ib(factory=list, type=Sequence[Mention])
#: Size of a sent emoji
emoji_size = attr.ib(None, type=EmojiSize)
emoji_size = attr.ib(None, type=Optional[EmojiSize])
#: Whether the message is read
is_read = attr.ib(None, type=bool)
#: A list of people IDs who read the message, works only with `Client.fetch_thread_messages`
is_read = attr.ib(None, type=Optional[bool])
#: People IDs who read the message, only works with `ThreadABC.fetch_messages`
read_by = attr.ib(factory=list, type=bool)
#: A dictionary with user's IDs as keys, and their reaction as values
reactions = attr.ib(factory=dict, type=Mapping[str, str])
#: A `Sticker`
sticker = attr.ib(None, type=_sticker.Sticker)
sticker = attr.ib(None, type=Optional[_sticker.Sticker])
#: A list of attachments
attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment])
#: A list of `QuickReply`
quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply])
#: Whether the message is unsent (deleted for everyone)
unsent = attr.ib(False, type=bool)
unsent = attr.ib(False, type=Optional[bool])
#: Message ID you want to reply to
reply_to_id = attr.ib(None, type=str)
reply_to_id = attr.ib(None, type=Optional[str])
#: Replied message
replied_to = attr.ib(None, type="MessageData")
replied_to = attr.ib(None, type=Optional[Any])
#: Whether the message was forwarded
forwarded = attr.ib(False, type=bool)
forwarded = attr.ib(False, type=Optional[bool])
@staticmethod
def _get_forwarded_from_tags(tags):

View File

@@ -4,7 +4,7 @@ import enum
from .._common import attrs_default
from .. import _exception, _util, _session
from typing import Mapping, Sequence
from typing import Mapping, Sequence, Optional
class GuestStatus(enum.Enum):
@@ -132,13 +132,13 @@ class PlanData(Plan):
#: Plan title
title = attr.ib(type=str)
#: Plan location name
location = attr.ib(None, converter=lambda x: x or "", type=str)
location = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: Plan location ID
location_id = attr.ib(None, converter=lambda x: x or "", type=str)
location_id = attr.ib(None, converter=lambda x: x or "", type=Optional[str])
#: ID of the plan creator
author_id = attr.ib(None, type=str)
author_id = attr.ib(None, type=Optional[str])
#: `User` ids mapped to their `GuestStatus`
guests = attr.ib(None, type=Mapping[str, GuestStatus])
guests = attr.ib(None, type=Optional[Mapping[str, GuestStatus]])
@property
def going(self) -> Sequence[str]:

View File

@@ -2,7 +2,7 @@ import attr
from . import Attachment
from .._common import attrs_default
from typing import Any
from typing import Any, Optional
@attrs_default
@@ -24,9 +24,9 @@ class QuickReplyText(QuickReply):
"""Represents a text quick reply."""
#: Title of the quick reply
title = attr.ib(None, type=str)
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
title = attr.ib(None, type=Optional[str])
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "text"
@@ -43,8 +43,8 @@ class QuickReplyLocation(QuickReply):
class QuickReplyPhoneNumber(QuickReply):
"""Represents a phone number quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "user_phone_number"
@@ -53,8 +53,8 @@ class QuickReplyPhoneNumber(QuickReply):
class QuickReplyEmail(QuickReply):
"""Represents an email quick reply (Doesn't work on mobile)."""
#: URL of the quick reply image (optional)
image_url = attr.ib(None, type=str)
#: URL of the quick reply image
image_url = attr.ib(None, type=Optional[str])
#: Type of the quick reply
_type = "user_email"

View File

@@ -2,34 +2,36 @@ import attr
from . import Image, Attachment
from .._common import attrs_default
from typing import Optional
@attrs_default
class Sticker(Attachment):
"""Represents a Facebook sticker that has been sent to a thread as an attachment."""
#: The sticker-pack's ID
pack = attr.ib(None, type=str)
pack = attr.ib(None, type=Optional[str])
#: Whether the sticker is animated
is_animated = attr.ib(False, type=bool)
# If the sticker is animated, the following should be present
#: URL to a medium spritemap
medium_sprite_image = attr.ib(None, type=str)
medium_sprite_image = attr.ib(None, type=Optional[str])
#: URL to a large spritemap
large_sprite_image = attr.ib(None, type=str)
large_sprite_image = attr.ib(None, type=Optional[str])
#: The amount of frames present in the spritemap pr. row
frames_per_row = attr.ib(None, type=int)
frames_per_row = attr.ib(None, type=Optional[int])
#: The amount of frames present in the spritemap pr. column
frames_per_col = attr.ib(None, type=int)
frames_per_col = attr.ib(None, type=Optional[int])
#: The total amount of frames in the spritemap
frame_count = attr.ib(None, type=int)
frame_count = attr.ib(None, type=Optional[int])
#: The frame rate the spritemap is intended to be played in
frame_rate = attr.ib(None, type=int)
frame_rate = attr.ib(None, type=Optional[int])
#: The sticker's image
image = attr.ib(None, type=Image)
image = attr.ib(None, type=Optional[Image])
#: The sticker's label/name
label = attr.ib(None, type=str)
label = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, data):

View File

@@ -1,17 +1,56 @@
import attr
import bs4
import datetime
import re
import requests
import random
import urllib.parse
import re
import json
# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4
from ._common import log, kw_only
from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, Callable
from typing import Optional, Mapping, Callable, Any
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
SERVER_JS_DEFINE_REGEX = re.compile(
r'(?:"ServerJS".{,100}\.handle\({.*"define":)'
r'|(?:ServerJS.{,100}\.handleWithCustomApplyEach\(ScheduledApplyEach,{.*"define":)'
r'|(?:require\("ServerJSDefine"\)\)?\.handleDefines\()'
r'|(?:"require":\[\["ScheduledServerJS".{,100}"define":)'
)
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# TODO: Extract jsmods "require" and "define" from `bigPipe.onPageletArrive`?
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
if len(define_splits) < 2:
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str:
@@ -32,7 +71,7 @@ def base36encode(number: int) -> str:
def prefix_url(url: str) -> str:
if url.startswith("/"):
return "https://www.facebook.com" + url
return "https://www.messenger.com" + url
return url
@@ -51,97 +90,131 @@ def get_user_id(session: requests.Session) -> str:
return str(rtn)
def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory() -> requests.Session:
from . import __version__
session = requests.session()
session.headers["Referer"] = "https://www.facebook.com"
# TODO: Deprecate setting the user agent manually
session.headers["User-Agent"] = random.choice(_util.USER_AGENTS)
# Override Facebook's locale detection during the login process.
# The locale is only used when giving errors back to the user, so giving the errors
# back in English makes it easier for users to report.
session.cookies = session.cookies = requests.cookies.merge_cookies(
session.cookies, {"locale": "en_US"}
)
session.headers["Referer"] = "https://www.messenger.com/"
# We won't try to set a fake user agent to mask our presence!
# Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service
session.headers["User-Agent"] = "fbchat/{}".format(__version__)
return session
def login_cookies(at: datetime.datetime):
return {"act": "{}/0".format(_util.datetime_to_millis(at))}
def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:]
return hex(int(random.random() * 2**31))[2:]
def is_home(url: str) -> bool:
parts = urllib.parse.urlparse(url)
# Check the urls `/home.php` and `/`
return "home" in parts.path or "/" == parts.path
def find_form_request(html: str):
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
form = soup.form
if not form:
raise _exception.ParseError("Could not find form to submit", data=html)
url = form.get("action")
if not url:
raise _exception.ParseError("Could not find url to submit to", data=form)
# From what I've seen, it'll always do this!
if url.startswith("/"):
url = "https://www.facebook.com" + url
# It's okay to set missing values to something crap, the values are localized, and
# hence are not available in the raw HTML
data = {
x["name"]: x.get("value", "[missing]")
for x in form.find_all(["input", "button"])
}
return url, data
def _2fa_helper(session: requests.Session, code: int, r):
soup = find_input_fields(r.text)
data = dict()
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
url, data = find_form_request(r.content.decode("utf-8"))
url = "https://m.facebook.com/login/checkpoint/"
# You don't have to type a code if your device is already saved
# Repeats if you get the code wrong
while "approvals_code" in data:
data["approvals_code"] = on_2fa_callback()
log.info("Submitting 2FA code")
r = session.post(
url, data=data, allow_redirects=False, cookies=login_cookies(_util.now())
)
log.debug("2FA location: %s", r.headers.get("Location"))
url, data = find_form_request(r.content.decode("utf-8"))
data["approvals_code"] = str(code)
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = "0"
log.info("Submitting 2FA code.")
# TODO: Can be missing if checkup flow was done on another device in the meantime?
if "name_action_selected" in data:
data["name_action_selected"] = "save_device"
log.info("Saving browser")
r = session.post(
url, data=data, allow_redirects=False, cookies=login_cookies(_util.now())
)
log.debug("2FA location: %s", r.headers.get("Location"))
url = r.headers.get("Location")
if url and url.startswith("https://www.messenger.com/login/auth_token/"):
return url
url, data = find_form_request(r.content.decode("utf-8"))
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
log.info("Saving browser.")
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["name_action_selected"]
log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
"""Get error code and message from a request."""
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (TypeError, ValueError):
pass
soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error")
log.info("Starting Facebook checkup flow")
r = session.post(
url, data=data, allow_redirects=False, cookies=login_cookies(_util.now())
)
return code, soup.get_text() or None
log.debug("2FA location: %s", r.headers.get("Location"))
url, data = find_form_request(r.content.decode("utf-8"))
if "verification_method" in data:
raise _exception.NotLoggedIn(
"Your account is locked, and you need to log in using a browser, and verify it there!"
)
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
data["submit[This was me]"] = "[any value]"
del data["submit[This wasn't me]"]
log.info("Verifying login attempt")
r = session.post(
url, data=data, allow_redirects=False, cookies=login_cookies(_util.now())
)
log.debug("2FA location: %s", r.headers.get("Location"))
url, data = find_form_request(r.content.decode("utf-8"))
if "name_action_selected" not in data:
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
data["name_action_selected"] = "save_device"
log.info("Saving device again")
r = session.post(
url, data=data, allow_redirects=False, cookies=login_cookies(_util.now())
)
log.debug("2FA location: %s", r.headers.get("Location"))
return r.headers.get("Location")
def get_error_data(html: str) -> Optional[str]:
"""Get error message from a request."""
soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
)
# Attempt to extract and format the error string
return " ".join(list(soup.stripped_strings)[1:3]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
@@ -157,7 +230,6 @@ class Session:
_session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None, type=str)
@property
def user(self):
@@ -181,6 +253,7 @@ class Session:
"fb_dtsg": self._fb_dtsg,
}
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
@classmethod
def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
@@ -190,65 +263,121 @@ class Session:
Args:
email: Facebook ``email``, ``id`` or ``phone number``
password: Facebook account password
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
This should return the requested 2FA code.
on_2fa_callback: Function that will be called, in case a two factor
authentication code is needed. This should return the requested code.
Tested using SMS and authentication applications. If you have both
enabled, you might not receive an SMS code, and you'll have to use the
authentication application.
Note: Facebook limits the amount of codes they will give you, so if you
don't receive a code, be patient, and try again later!
Example:
>>> import getpass
>>> import fbchat
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass())
>>> import getpass
>>> session = fbchat.Session.login(
... input("Email: "),
... getpass.getpass(),
... on_2fa_callback=lambda: input("2FA Code: ")
... )
Email: abc@gmail.com
Password: ****
2FA Code: 123456
>>> session.user.id
"1234"
"""
session = session_factory()
try:
r = session.get("https://m.facebook.com/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
soup = find_input_fields(r.text)
data = dict(
(elem["name"], elem["value"])
for elem in soup
if elem.has_attr("value") and elem.has_attr("name")
)
data["email"] = email
data["pass"] = password
data["login"] = "Log In"
data = {
# "jazoest": "2754",
# "lsd": "AVqqqRUa",
"initial_request_id": "x", # any, just has to be present
# "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
# "lgnrnd": "044039_RGm9",
"lgnjs": "n",
"email": email,
"pass": password,
"login": "1",
"persistent": "1", # Changes the cookie type to have a long "expires"
"default_persistent": "0",
}
try:
url = "https://m.facebook.com/login.php?login_attempt=1"
r = session.post(url, data=data)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
if not on_2fa_callback:
raise ValueError(
"2FA code required, please add `on_2fa_callback` to .login"
)
code = on_2fa_callback()
try:
r = _2fa_helper(session, code, r)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
try:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
if is_home(r.url):
return cls._from_session(session=session)
else:
code, msg = get_error_data(r.text, r.url)
raise _exception.ExternalError(
"Login failed at url {!r}".format(r.url), msg, code=code
# Should hit a redirect to https://www.messenger.com/
# If this does happen, the session is logged in!
r = session.post(
"https://www.messenger.com/login/password/",
data=data,
allow_redirects=False,
cookies=login_cookies(_util.now()),
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"accept-language": "en-HU,en;q=0.9,hu-HU;q=0.8,hu;q=0.7,en-US;q=0.6",
"cache-control": "max-age=0",
"origin": "https://www.messenger.com",
"referer": "https://www.messenger.com/login/",
"sec-ch-ua": '" Not;A Brand";v="99", "Google Chrome";v="91", "Chromium";v="91"',
"sec-ch-ua-mobile": "?0",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
},
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
url = r.headers.get("Location")
# We weren't redirected, hence the email or password was wrong
if not url:
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn(error)
if "checkpoint" in url:
if not on_2fa_callback:
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
# Get a facebook.com/checkpoint/start url that handles the 2FA flow
# This probably works differently for Messenger-only accounts
url = _util.get_url_parameter(url, "next")
if not url.startswith("https://www.facebook.com/checkpoint/start/"):
raise _exception.ParseError("Failed 2fa flow (1)", data=url)
r = session.get(
url, allow_redirects=False, cookies=login_cookies(_util.now())
)
url = r.headers.get("Location")
if not url or not url.startswith("https://www.facebook.com/checkpoint/"):
raise _exception.ParseError("Failed 2fa flow (2)", data=url)
r = session.get(
url, allow_redirects=False, cookies=login_cookies(_util.now())
)
url = two_factor_helper(session, r, on_2fa_callback)
if not url.startswith("https://www.messenger.com/login/auth_token/"):
raise _exception.ParseError("Failed 2fa flow (3)", data=url)
r = session.get(
url, allow_redirects=False, cookies=login_cookies(_util.now())
)
url = r.headers.get("Location")
if url != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
try:
return cls._from_session(session=session)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status.
@@ -260,12 +389,12 @@ class Session:
>>> assert session.is_logged_in()
"""
# Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
try:
r = self._session.get(url, allow_redirects=False)
r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"])
_exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
def logout(self) -> None:
"""Safely log out the user.
@@ -275,56 +404,51 @@ class Session:
Example:
>>> session.logout()
"""
logout_h = self._logout_h
if not logout_h:
url = prefix_url("/bluebar/modern_settings_menu/")
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = prefix_url("/logout.php")
data = {"fb_dtsg": self._fb_dtsg}
try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h})
r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod
def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try:
r = session.get(prefix_url("/"))
r = session.get(prefix_url("/"), allow_redirects=True)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
soup = find_input_fields(r.text)
define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
if fb_dtsg_element:
fb_dtsg = fb_dtsg_element["value"]
else:
# Fall back to searching with a regex
res = FB_DTSG_REGEX.search(r.text)
if not res:
raise _exception.NotLoggedIn("Could not find fb_dtsg")
fb_dtsg = res.group(1)
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg is None:
raise _exception.ParseError("Could not find fb_dtsg", data=define)
if not fb_dtsg:
# Happens when the client is not actually logged in
raise _exception.NotLoggedIn(
"Found empty fb_dtsg, the session was probably invalid."
)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
logout_h_element = soup.find("input", {"name": "h"})
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
user_id=user_id,
fb_dtsg=fb_dtsg,
revision=revision,
session=session,
logout_h=logout_h,
)
return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`.
@@ -379,10 +503,9 @@ class Session:
# update fb_dtsg token if received in response
if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"])
if "DTSGInitData" in define:
self._fb_dtsg = define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
self._fb_dtsg = define["DTSGInitialData"]["token"]
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg:
self._fb_dtsg = fb_dtsg
try:
return j["payload"]
@@ -400,7 +523,7 @@ class Session:
return self._post("/api/graphqlbatch/", data, as_graphql=True)
def _do_send_request(self, data):
now = datetime.datetime.utcnow()
now = _util.now()
offline_threading_id = _util.generate_offline_threading_id()
data["client"] = "mercury"
data["author"] = "fbid:{}".format(self._user_id)
@@ -425,3 +548,37 @@ class Session:
return message_ids[0]
except (KeyError, IndexError, TypeError) as e:
raise _exception.ParseError("No message IDs could be found", data=j) from e
def _uri_share_data(self, data):
data["image_height"] = 960
data["image_width"] = 960
data["__user"] = self.user.id
j = self._post("/message_share_attachment/fromURI/", data)
return j["payload"]["share_data"]
def to_file(self, filename):
"""Save the session to a file.
Args:
filename: The file to save the session to
Example:
>>> session = fbchat.Session.from_cookies(cookies)
>>> session.to_file("session.json")
"""
with open(filename, "w") as f:
json.dump(self.get_cookies(), f)
@classmethod
def from_file(cls, filename):
"""Load a session from a file.
Args:
filename: The file to load the session from
Example:
>>> session = fbchat.Session.from_file("session.json")
"""
with open(filename, "r") as f:
cookies = json.load(f)
return cls.from_cookies(cookies)

View File

@@ -105,6 +105,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
mentions: Iterable["_models.Mention"] = None,
files: Iterable[Tuple[str, str]] = None,
reply_to_id: str = None,
uri: str = None
) -> str:
"""Send a message to the thread.
@@ -114,10 +115,17 @@ class ThreadABC(metaclass=abc.ABCMeta):
files: Optional tuples, each containing an uploaded file's ID and mimetype.
See `ThreadABC.send_files` for an example.
reply_to_id: Optional message to reply to
uri: Uri to formulate a sharable attachment with
Example:
Send a message with a mention to a thread.
>>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2)
>>> thread.send_text("A message", mentions=[mention])
>>> message_id = thread.send_text("A message", mentions=[mention])
Reply to the message.
>>> thread.send_text("A reply", reply_to_id=message_id)
Returns:
The sent message
@@ -132,6 +140,9 @@ class ThreadABC(metaclass=abc.ABCMeta):
if files:
data["has_attachment"] = True
if uri:
data.update(self._generate_shareable_attachment(uri))
for i, (file_id, mimetype) in enumerate(files or ()):
data["{}s[{}]".format(_util.mimetype_to_key(mimetype), i)] = file_id
@@ -211,7 +222,7 @@ class ThreadABC(metaclass=abc.ABCMeta):
Example:
Send a pinned location in Beijing, China.
>>> thread.send_location(39.9390731, 116.117273)
>>> thread.send_pinned_location(39.9390731, 116.117273)
"""
self._send_location(False, latitude=latitude, longitude=longitude)
@@ -229,7 +240,53 @@ class ThreadABC(metaclass=abc.ABCMeta):
>>> thread.send_files(files)
"""
return self.send_text(text=None, files=files)
def send_uri(self, uri: str, **kwargs):
"""Send a uri preview to a thread.
Args:
uri: uri to preview
"""
if kwargs.get('text') is None:
kwargs['text'] = None
self.send_text(uri=uri, **kwargs)
def _generate_shareable_attachment(self, uri):
"""Send a uri preview to a thread.
Args:
uri: uri to preview
Returns:
:ref:`Message ID <intro_message_ids>` of the sent message
Raises:
FBchatException: If request failed
"""
url_data = self.session._uri_share_data({"uri": uri})
data = self._to_send_data()
data["action_type"] = "ma-type:user-generated-message"
data["shareable_attachment[share_type]"] = url_data["share_type"]
# Most uri params will come back as dict
if isinstance(url_data["share_params"], dict):
data["has_attachment"] = True
for key in url_data["share_params"]:
if isinstance(url_data["share_params"][key], dict):
for key2 in url_data["share_params"][key]:
data[
"shareable_attachment[share_params][{}][{}]".format(
key, key2
)
] = url_data["share_params"][key][key2]
else:
data[
"shareable_attachment[share_params][{}]".format(key)
] = url_data["share_params"][key]
# Some (such as facebook profile pages) will just be a list
else:
data["has_attachment"] = False
for index, val in enumerate(url_data["share_params"]):
data["shareable_attachment[share_params][{}]".format(index)] = val
return data
# xmd = {"quick_replies": []}
# for quick_reply in quick_replies:
# # TODO: Move this to `_quick_reply.py`
@@ -313,13 +370,13 @@ class ThreadABC(metaclass=abc.ABCMeta):
def search_messages(
self, query: str, limit: int
) -> Iterable["_models.MessageSnippet"]:
) -> Iterable[_models.MessageSnippet]:
"""Find and get message IDs by query.
Warning! If someone send a message to the thread that matches the query, while
we're searching, some snippets will get returned twice.
This is fundamentally unfixable, it's just how the endpoint is implemented.
This is fundamentally not fixable, it's just how the endpoint is implemented.
The returned message snippets are ordered by last sent first.

View File

@@ -4,7 +4,8 @@ from ._abc import ThreadABC
from . import _user
from .._common import attrs_default
from .. import _util, _session, _graphql, _models
from typing import Sequence, Iterable, Set, Mapping
from typing import Sequence, Iterable, Set, Mapping, Optional
@attrs_default
@@ -179,31 +180,31 @@ class GroupData(Group):
"""
#: The group's picture
photo = attr.ib(None, type="_models.Image")
photo = attr.ib(None, type=Optional[_models.Image])
#: The name of the group
name = attr.ib(None, type=str)
name = attr.ib(None, type=Optional[str])
#: When the group was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the group
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The group thread's participant user ids
participants = attr.ib(factory=set, type=Set[str])
#: A dictionary, containing user nicknames mapped to their IDs
nicknames = attr.ib(factory=dict, type=Mapping[str, str])
#: The groups's message color
color = attr.ib(None, type=str)
color = attr.ib(None, type=Optional[str])
#: The groups's default emoji
emoji = attr.ib(None, type=str)
emoji = attr.ib(None, type=Optional[str])
# User ids of thread admins
admins = attr.ib(factory=set, type=Set[str])
# True if users need approval to join
approval_mode = attr.ib(None, type=bool)
approval_mode = attr.ib(None, type=Optional[bool])
# Set containing user IDs requesting to join
approval_requests = attr.ib(factory=set, type=Set[str])
# Link for joining group
join_link = attr.ib(None, type=str)
join_link = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import attrs_default
from .. import _session, _models
from typing import Optional
@attrs_default
class Page(ThreadABC):
@@ -35,25 +37,25 @@ class PageData(Page):
"""
#: The page's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the page
name = attr.ib(type=str)
#: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The page's custom URL
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: The name of the page's location city
city = attr.ib(None, type=str)
city = attr.ib(None, type=Optional[str])
#: Amount of likes the page has
likes = attr.ib(None, type=int)
likes = attr.ib(None, type=Optional[int])
#: Some extra information about the page
sub_title = attr.ib(None, type=str)
sub_title = attr.ib(None, type=Optional[str])
#: The page's category
category = attr.ib(None, type=str)
category = attr.ib(None, type=Optional[str])
@classmethod
def _from_graphql(cls, session, data):

View File

@@ -4,6 +4,8 @@ from ._abc import ThreadABC
from .._common import log, attrs_default
from .. import _util, _session, _models
from typing import Optional
GENDERS = {
# For standard requests
@@ -103,7 +105,7 @@ class UserData(User):
"""
#: The user's picture
photo = attr.ib(type="_models.Image")
photo = attr.ib(type=_models.Image)
#: The name of the user
name = attr.ib(type=str)
#: Whether the user and the client are friends
@@ -111,27 +113,27 @@ class UserData(User):
#: The users first name
first_name = attr.ib(type=str)
#: The users last name
last_name = attr.ib(None, type=str)
#: Datetime when the thread was last active / when the last message was sent
last_active = attr.ib(None, type=datetime.datetime)
last_name = attr.ib(None, type=Optional[str])
#: When the thread was last active / when the last message was sent
last_active = attr.ib(None, type=Optional[datetime.datetime])
#: Number of messages in the thread
message_count = attr.ib(None, type=int)
message_count = attr.ib(None, type=Optional[int])
#: Set `Plan`
plan = attr.ib(None, type="_models.PlanData")
plan = attr.ib(None, type=Optional[_models.PlanData])
#: The profile URL. ``None`` for Messenger-only users
url = attr.ib(None, type=str)
url = attr.ib(None, type=Optional[str])
#: The user's gender
gender = attr.ib(None, type=str)
gender = attr.ib(None, type=Optional[str])
#: From 0 to 1. How close the client is to the user
affinity = attr.ib(None, type=float)
affinity = attr.ib(None, type=Optional[float])
#: The user's nickname
nickname = attr.ib(None, type=str)
nickname = attr.ib(None, type=Optional[str])
#: The clients nickname, as seen by the user
own_nickname = attr.ib(None, type=str)
own_nickname = attr.ib(None, type=Optional[str])
#: The message color
color = attr.ib(None, type=str)
color = attr.ib(None, type=Optional[str])
#: The default emoji
emoji = attr.ib(None, type=str)
emoji = attr.ib(None, type=Optional[str])
@staticmethod
def _get_other_user(data):

View File

@@ -9,15 +9,12 @@ from . import _exception
from typing import Iterable, Optional, Any, Mapping, Sequence
#: Default list of user agents
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10",
"Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
]
def int_or_none(inp: Any) -> Optional[int]:
try:
return int(inp)
except Exception:
return None
def get_limits(limit: Optional[int], max_limit: int) -> Iterable[int]:
@@ -59,23 +56,26 @@ def parse_json(text: str) -> Any:
def generate_offline_threading_id():
ret = datetime_to_millis(datetime.datetime.utcnow())
ret = datetime_to_millis(now())
value = int(random.random() * 4294967295)
string = ("0000000000000000000000" + format(value, "b"))[-22:]
msgs = format(ret, "b") + string
return str(int(msgs, 2))
def remove_version_from_module(module):
return module.split("@", 1)[0]
def get_jsmods_require(require) -> Mapping[str, Sequence[Any]]:
rtn = {}
for item in require:
if len(item) == 1:
(module,) = item
rtn[module] = []
rtn[remove_version_from_module(module)] = []
continue
method = "{}.{}".format(item[0], item[1])
requirements = item[2]
arguments = item[3]
module, method, requirements, arguments = item
method = "{}.{}".format(remove_version_from_module(module), method)
rtn[method] = arguments
return rtn
@@ -158,3 +158,11 @@ def timedelta_to_seconds(td: datetime.timedelta) -> int:
The returned seconds will be rounded to the nearest whole number.
"""
return round(td.total_seconds())
def now() -> datetime.datetime:
"""The current time.
Similar to datetime.datetime.now(), but returns a non-naive datetime.
"""
return datetime.datetime.now(tz=datetime.timezone.utc)

View File

@@ -12,7 +12,7 @@ author = "Taehoon Kim"
author-email = "carpedm20@gmail.com"
maintainer = "Mads Marquart"
maintainer-email = "madsmtm@gmail.com"
home-page = "https://github.com/carpedm20/fbchat/"
home-page = "https://git.karaolidis.com/karaolidis/fbchat/"
requires = [
"attrs>=19.1",
"requests~=2.19",
@@ -47,8 +47,7 @@ keywords = "Facebook FB Messenger Library Chat Api Bot"
license = "BSD 3-Clause"
[tool.flit.metadata.urls]
Documentation = "https://fbchat.readthedocs.io/"
Repository = "https://github.com/carpedm20/fbchat/"
Repository = "https://git.karaolidis.com/karaolidis/fbchat/"
[tool.flit.metadata.requires-extra]
test = [

View File

@@ -1,6 +1,10 @@
[pytest]
xfail_strict = true
markers =
online: Online tests, that require a user account set up. Meant to be used \
manually, to check whether Facebook has broken something.
addopts =
--strict
-m "not online"
testpaths = tests
filterwarnings = error

View File

@@ -123,6 +123,37 @@ def test_title_set(session):
) == parse_delta(session, data)
def test_title_removed(session):
data = {
"irisSeqId": "11223344",
"irisTags": ["DeltaThreadName", "is_from_iris_fanout"],
"messageMetadata": {
"actorFbId": "3456",
"adminText": "You removed the group name.",
"folderId": {"systemFolderId": "INBOX"},
"messageId": "mid.$XYZ",
"offlineThreadingId": "1122334455",
"skipBumpThread": False,
"tags": [],
"threadKey": {"threadFbId": "4321"},
"threadReadStateEffect": "KEEP_AS_IS",
"timestamp": "1500000000000",
"unsendType": "deny_log_message",
},
"name": "",
"participants": ["1234", "2345", "3456", "4567"],
"requestContext": {"apiArgs": {}},
"tqSeqId": "1111",
"class": "ThreadName",
}
assert TitleSet(
author=User(session=session, id="3456"),
thread=Group(session=session, id="4321"),
title=None,
at=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
) == parse_delta(session, data)
def test_forced_fetch(session):
data = {
"forceInsert": False,

67
tests/online/conftest.py Normal file
View File

@@ -0,0 +1,67 @@
import fbchat
import pytest
import logging
import getpass
@pytest.fixture(scope="session")
def session(pytestconfig):
session_cookies = pytestconfig.cache.get("session_cookies", None)
try:
session = fbchat.Session.from_cookies(session_cookies)
except fbchat.FacebookError:
logging.exception("Error while logging in with cookies!")
session = fbchat.Session.login(input("Email: "), getpass.getpass("Password: "))
yield session
pytestconfig.cache.set("session_cookies", session.get_cookies())
# TODO: Allow the main session object to be closed - and perhaps used in `with`?
session._session.close()
@pytest.fixture
def client(session):
return fbchat.Client(session=session)
@pytest.fixture(scope="session")
def user(pytestconfig, session):
user_id = pytestconfig.cache.get("user_id", None)
if not user_id:
user_id = input("A user you're chatting with's id: ")
pytestconfig.cache.set("user_id", user_id)
return fbchat.User(session=session, id=user_id)
@pytest.fixture(scope="session")
def group(pytestconfig, session):
group_id = pytestconfig.cache.get("group_id", None)
if not group_id:
group_id = input("A group you're chatting with's id: ")
pytestconfig.cache.set("group_id", group_id)
return fbchat.Group(session=session, id=group_id)
@pytest.fixture(
scope="session",
params=[
"user",
"group",
"self",
pytest.param("invalid", marks=[pytest.mark.xfail()]),
],
)
def any_thread(request, session, user, group):
return {
"user": user,
"group": group,
"self": session.user,
"invalid": fbchat.Thread(session=session, id="0"),
}[request.param]
@pytest.fixture
def listener(session):
return fbchat.Listener(session=session, chat_on=False, foreground=False)

116
tests/online/test_client.py Normal file
View File

@@ -0,0 +1,116 @@
import pytest
import fbchat
import os
pytestmark = pytest.mark.online
def test_fetch(client):
client.fetch_users()
def test_search_for_users(client):
list(client.search_for_users("test", 10))
def test_search_for_pages(client):
list(client.search_for_pages("test", 100))
def test_search_for_groups(client):
list(client.search_for_groups("test", 1000))
def test_search_for_threads(client):
list(client.search_for_threads("test", 1000))
with pytest.raises(fbchat.HTTPError, match="rate limited"):
list(client.search_for_threads("test", 10000))
def test_message_search(client):
list(client.search_messages("test", 500))
def test_fetch_thread_info(client):
list(client.fetch_thread_info(["4"]))[0]
def test_fetch_threads(client):
list(client.fetch_threads(20))
list(client.fetch_threads(200))
def test_undocumented(client):
client.fetch_unread()
client.fetch_unseen()
@pytest.fixture
def open_resource(pytestconfig):
def get_resource_inner(filename):
path = os.path.join(pytestconfig.rootdir, "tests", "resources", filename)
return open(path, "rb")
return get_resource_inner
def test_upload_and_fetch_image_url(client, open_resource):
with open_resource("image.png") as f:
((id, mimetype),) = client.upload([("image.png", f, "image/png")])
assert mimetype == "image/png"
assert client.fetch_image_url(id).startswith("http")
def test_upload_image(client, open_resource):
with open_resource("image.png") as f:
_ = client.upload([("image.png", f, "image/png")])
def test_upload_many(client, open_resource):
with open_resource("image.png") as f_png, open_resource(
"image.jpg"
) as f_jpg, open_resource("image.gif") as f_gif, open_resource(
"file.json"
) as f_json, open_resource(
"file.txt"
) as f_txt, open_resource(
"audio.mp3"
) as f_mp3, open_resource(
"video.mp4"
) as f_mp4:
_ = client.upload(
[
("image.png", f_png, "image/png"),
("image.jpg", f_jpg, "image/jpeg"),
("image.gif", f_gif, "image/gif"),
("file.json", f_json, "application/json"),
("file.txt", f_txt, "text/plain"),
("audio.mp3", f_mp3, "audio/mpeg"),
("video.mp4", f_mp4, "video/mp4"),
]
)
def test_mark_as_read(client, user, group):
client.mark_as_read([user, group], fbchat._util.now())
def test_mark_as_unread(client, user, group):
client.mark_as_unread([user, group], fbchat._util.now())
def test_move_threads(client, user, group):
client.move_threads(fbchat.ThreadLocation.PENDING, [user, group])
client.move_threads(fbchat.ThreadLocation.INBOX, [user, group])
@pytest.mark.skip(reason="need to have threads to delete")
def test_delete_threads():
pass
@pytest.mark.skip(reason="need to have messages to delete")
def test_delete_messages():
pass

42
tests/online/test_send.py Normal file
View File

@@ -0,0 +1,42 @@
import pytest
import fbchat
pytestmark = pytest.mark.online
# TODO: Verify return values
def test_wave(any_thread):
assert any_thread.wave(True)
assert any_thread.wave(False)
def test_send_text(any_thread):
assert any_thread.send_text("Test")
def test_send_text_with_mention(any_thread):
mention = fbchat.Mention(thread_id=any_thread.id, offset=5, length=8)
assert any_thread.send_text("Test @mention", mentions=[mention])
def test_send_emoji(any_thread):
assert any_thread.send_emoji("😀", size=fbchat.EmojiSize.LARGE)
def test_send_sticker(any_thread):
assert any_thread.send_sticker("1889713947839631")
def test_send_location(any_thread):
any_thread.send_location(51.5287718, -0.2416815)
def test_send_pinned_location(any_thread):
any_thread.send_pinned_location(39.9390731, 116.117273)
@pytest.mark.skip(reason="need a way to use the uploaded files from test_client.py")
def test_send_files(any_thread):
pass

View File

@@ -1,15 +1,61 @@
import datetime
import pytest
from fbchat import ParseError, _util
from fbchat._session import (
parse_server_js_define,
base36encode,
prefix_url,
generate_message_id,
session_factory,
client_id_factory,
is_home,
find_form_request,
get_error_data,
)
def test_parse_server_js_define_old():
html = """
some data;require("TimeSliceImpl").guard(function(){(require("ServerJSDefine")).handleDefines([["DTSGInitialData",[],{"token":"123"},100]])
<script>require("TimeSliceImpl").guard(function() {require("ServerJSDefine").handleDefines([["DTSGInitData",[],{"token":"123","async_get_token":"12345"},3333]])
</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": "123"},
"DTSGInitData": {"async_get_token": "12345", "token": "123"},
}
def test_parse_server_js_define_new():
html = """
some data;require("TimeSliceImpl").guard(function(){new (require("ServerJS"))().handle({"define":[["DTSGInitialData",[],{"token":""},100]],"require":[...]});}, "ServerJS define", {"root":true})();
more data
<script><script>require("TimeSliceImpl").guard(function(){var s=new (require("ServerJS"))();s.handle({"define":[["DTSGInitData",[],{"token":"","async_get_token":""},3333]],"require":[...]});require("Run").onAfterLoad(function(){s.cleanup(require("TimeSliceImpl"))});}, "ServerJS define", {"root":true})();</script>
other irrelevant data
"""
define = parse_server_js_define(html)
assert define == {
"DTSGInitialData": {"token": ""},
"DTSGInitData": {"async_get_token": "", "token": ""},
}
def test_parse_server_js_define_error():
with pytest.raises(ParseError, match="Could not find any"):
parse_server_js_define("")
html = 'function(){(require("ServerJSDefine")).handleDefines([{"a": function(){}}])'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
html = 'function(){require("ServerJSDefine").handleDefines({"a": "b"})'
with pytest.raises(ParseError, match="Invalid"):
parse_server_js_define(html + html)
@pytest.mark.parametrize(
"number,expected",
[(1, "1"), (10, "a"), (123, "3f"), (1000, "rs"), (123456789, "21i3v9")],
@@ -19,13 +65,20 @@ def test_base36encode(number, expected):
def test_prefix_url():
assert prefix_url("/") == "https://www.facebook.com/"
assert prefix_url("/abc") == "https://www.facebook.com/abc"
static_url = "https://upload.messenger.com/"
assert prefix_url(static_url) == static_url
assert prefix_url("/") == "https://www.messenger.com/"
assert prefix_url("/abc") == "https://www.messenger.com/abc"
def test_generate_message_id():
# Returns random output, so hard to test more thoroughly
assert generate_message_id(datetime.datetime.utcnow(), "def")
assert generate_message_id(_util.now(), "def")
def test_session_factory():
session = session_factory()
assert session.headers
def test_client_id_factory():
@@ -33,41 +86,105 @@ def test_client_id_factory():
assert client_id_factory()
def test_is_home():
assert not is_home("https://m.facebook.com/login/?...")
assert is_home("https://m.facebook.com/home.php?refsrc=...")
def test_find_form_request():
html = """
<div>
<form action="/checkpoint/?next=https%3A%2F%2Fwww.messenger.com%2F" class="checkpoint" id="u_0_c" method="post" onsubmit="">
<input autocomplete="off" name="jazoest" type="hidden" value="some-number" />
<input autocomplete="off" name="fb_dtsg" type="hidden" value="some-base64" />
<input class="hidden_elem" data-default-submit="true" name="submit[Continue]" type="submit" />
<input autocomplete="off" name="nh" type="hidden" value="some-hex" />
<div class="_4-u2 _5x_7 _p0k _5x_9 _4-u8">
<div class="_2e9n" id="u_0_d">
<strong id="u_0_e">Two factor authentication required</strong>
<div id="u_0_f"></div>
</div>
<div class="_2ph_">
<input autocomplete="off" name="no_fido" type="hidden" value="true" />
<div class="_50f4">You've asked us to require a 6-digit login code when anyone tries to access your account from a new device or browser.</div>
<div class="_3-8y _50f4">Enter the 6-digit code from your Code Generator or 3rd party app below.</div>
<div class="_2pie _2pio">
<span>
<input aria-label="Login code" autocomplete="off" class="inputtext" id="approvals_code" name="approvals_code" placeholder="Login code" tabindex="1" type="text" />
</span>
</div>
</div>
<div class="_5hzs" id="checkpointBottomBar">
<div class="_2s5p">
<button class="_42ft _4jy0 _2kak _4jy4 _4jy1 selected _51sy" id="checkpointSubmitButton" name="submit[Continue]" type="submit" value="Continue">Continue</button>
</div>
<div class="_2s5q">
<div class="_25b6" id="u_0_g">
<a href="#" id="u_0_h" role="button">Need another way to authenticate?</a>
</div>
</div>
</div>
</div>
</form>
</div>
"""
url, data = find_form_request(html)
assert url.startswith("https://www.facebook.com/checkpoint/")
assert {
"jazoest": "some-number",
"fb_dtsg": "some-base64",
"nh": "some-hex",
"no_fido": "true",
"approvals_code": "[missing]",
"submit[Continue]": "Continue",
} == data
def test_find_form_request_error():
with pytest.raises(ParseError, match="Could not find form to submit"):
assert find_form_request("")
with pytest.raises(ParseError, match="Could not find url to submit to"):
assert find_form_request("<form></form>")
@pytest.mark.skip
def test_get_error_data():
html = """<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//WAPFORUM//DTD XHTML Mobile 1.0//EN" "http://www.wapforum.org/DTD/xhtml-mobile10.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
html = """<!DOCTYPE html>
<html lang="da" id="facebook" class="no_js">
<head>
<title>Log in to Facebook | Facebook</title>
<meta name="referrer" content="origin-when-crossorigin" id="meta_referrer" />
<style type="text/css">...</style>
<meta name="description" content="..." />
<link rel="canonical" href="https://www.facebook.com/login/" />
<meta charset="utf-8" />
<title id="pageTitle">Messenger</title>
<meta name="referrer" content="default" id="meta_referrer" />
</head>
<body tabindex="0" class="b c d e f g">
<div class="h"><div id="viewport">...<div id="objects_container"><div class="g" id="root" role="main">
<table class="x" role="presentation"><tbody><tr><td class="y">
<div class="z ba bb" style="" id="login_error">
<div class="bc">
<span>The password you entered is incorrect. <a href="/recover/initiate/?ars=facebook_login_pw_error&amp;email=abc@mail.com&amp;__ccr=XXX" class="bd" aria-label="Have you forgotten your password?">Did you forget your password?</a></span>
<body class="_605a x1 Locale_da_DK" dir="ltr">
<div class="_3v_o" id="XMessengerDotComLoginViewPlaceholder">
<form id="login_form" action="/login/password/" method="post" onsubmit="">
<input type="hidden" name="jazoest" value="2222" autocomplete="off" />
<input type="hidden" name="lsd" value="xyz-abc" autocomplete="off" />
<div class="_3403 _3404">
<div>Type your password again</div>
<div>The password you entered is incorrect. <a href="https://www.facebook.com/recover/initiate?ars=facebook_login_pw_error">Did you forget your password?</a></div>
</div>
<div id="loginform">
<input type="hidden" autocomplete="off" id="initial_request_id" name="initial_request_id" value="xxx" />
<input type="hidden" autocomplete="off" name="timezone" value="" id="u_0_1" />
<input type="hidden" autocomplete="off" name="lgndim" value="" id="u_0_2" />
<input type="hidden" name="lgnrnd" value="aaa" />
<input type="hidden" id="lgnjs" name="lgnjs" value="n" />
<input type="text" class="inputtext _55r1 _43di" id="email" name="email" placeholder="E-mail or phone number" value="some@email.com" tabindex="0" aria-label="E-mail or phone number" />
<input type="password" class="inputtext _55r1 _43di" name="pass" id="pass" tabindex="0" placeholder="Password" aria-label="Password" />
<button value="1" class="_42ft _4jy0 _2m_r _43dh _4jy4 _517h _51sy" id="loginbutton" name="login" tabindex="0" type="submit">Continue</button>
<div class="_43dj">
<div class="uiInputLabel clearfix">
<label class="uiInputLabelInput">
<input type="checkbox" value="1" name="persistent" tabindex="0" class="" id="u_0_0" />
<span class=""></span>
</label>
<label for="u_0_0" class="uiInputLabelLabel">Stay logged in</label>
</div>
<input type="hidden" autocomplete="off" id="default_persistent" name="default_persistent" value="0" />
</div>
</form>
</div>
...
</td></tr></tbody></table>
<div style="display:none"></div><span><img src="https://facebook.com/security/hsts-pixel.gif" width="0" height="0" style="display:none" /></span>
</div></div><div></div></div></div>
</body>
</html>
"""
url = "https://m.facebook.com/login/?email=abc@mail.com&li=XXX&e=1348092"
msg = "The password you entered is incorrect. Did you forget your password?"
assert (1348092, msg) == get_error_data(html)
assert msg == get_error_data(html)

View File

@@ -68,6 +68,17 @@ def test_get_jsmods_require():
}
def test_get_jsmods_require_version_specifier():
data = [
["DimensionTracking@1234"],
["CavalryLoggerImpl@2345", "startInstrumentation", [], []],
]
assert get_jsmods_require(data) == {
"DimensionTracking": [],
"CavalryLoggerImpl.startInstrumentation": [],
}
def test_get_jsmods_require_get_image_url():
data = [
[