Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
6116bc9ca4 | ||
|
7bf6a9fadc | ||
|
4490360e11 | ||
|
a4dfe0d279 | ||
|
47679d1d3b | ||
|
62e17daf78 | ||
|
1f359f2a72 | ||
|
cebe7a28c0 | ||
|
e614800d5f | ||
|
151a114235 | ||
|
38f66147cb | ||
|
ffa26c20b5 | ||
|
430ada7f84 | ||
|
988e37eb42 | ||
|
1938b90bce | ||
|
f61d1403f3 | ||
|
d228f34f64 | ||
|
97049556ed | ||
|
b64c6a94cc | ||
|
edc655bae7 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -29,3 +29,7 @@ my_tests.py
|
||||
my_test_data.json
|
||||
my_data.json
|
||||
tests.data
|
||||
.pytest_cache
|
||||
|
||||
# Virtual environment
|
||||
venv/
|
||||
|
35
.travis.yml
Normal file
35
.travis.yml
Normal file
@@ -0,0 +1,35 @@
|
||||
sudo: false
|
||||
language: python
|
||||
python:
|
||||
- 2.7
|
||||
- 3.4
|
||||
- 3.5
|
||||
- 3.6
|
||||
|
||||
env:
|
||||
global:
|
||||
# These two accounts are made specifically for this purpose, and the passwords are really only encrypted for obscurity
|
||||
# In reality, you could probably still login with the accounts by looking at log output from travis-ci.org
|
||||
- client1_email=travis.fbchat1@gmail.com # Facebook ID: 100023782141139
|
||||
- secure: "W1NON6qaLnvYIOVoC93MXkmbAIkUkHcGREBwN0BSVM3cLuMduk4VVkz6PY2T8bnntGYVwicXwcn5aNJ6pDue17TBZqGPk/tdpws8mnAZUtBYhpkIFTTlyh5kJSZejx9fd5s4nceGpH6ofCCnNxPp2PdHKU8piqnQYZVQ4cFNNDE=" # client1_password
|
||||
- client2_email=fbchat.travis2@gmail.com # Facebook ID: 100026538491708
|
||||
- secure: "V7RB3go2Tc/DdW1x9DkMI+vCfnOgiS3ygmFCABs/GjfPZjZL7VLMJgYGlx0cjeeeN+Oxa2GrhczRAKeMdGB6Ss2lGGAVs6cjJ56ODuBHWT6/FNzLjtDkTnjD+Kfh0l8ZOdxTF3MQ6M/9hU6z5ek+XYGr7u+/7wOYZ5L2cK5MaQ0=" # client2_password
|
||||
- group_id=1463789480385605
|
||||
|
||||
before_script:
|
||||
- if [[ "$TRAVIS_PYTHON_VERSION" = "2.7" ]]; then export PYTEST_ADDOPTS='-m ""'; fi; # expensive tests (otherwise disabled in pytest.ini)
|
||||
- if [[ "$TRAVIS_PULL_REQUEST" != false ]]; then export PYTEST_ADDOPTS='-m offline'; fi; # offline tests only
|
||||
|
||||
script: python -m pytest || python -m pytest --lf; # Run failed tests twice
|
||||
|
||||
cache: pip
|
||||
|
||||
deploy:
|
||||
provider: pypi
|
||||
user: madsmtm
|
||||
password:
|
||||
secure: "VA0MLSrwIW/T2KjMwjLZCzrLHw8pJT6tAvb48t7qpBdm8x192hax61pz1TaBZoJvlzyBPFKvluftuclTc7yEFwzXe7Gjqgd/ODKZl/wXDr36hQ7BBOLPZujdwmWLvTzMh3eJZlvkgcLCzrvK3j2oW8cM/+FZeVi/5/FhVuJ4ofs="
|
||||
on:
|
||||
python: 3.6
|
||||
branch: master
|
||||
tags: true
|
@@ -17,7 +17,7 @@ from .client import *
|
||||
|
||||
|
||||
__copyright__ = 'Copyright 2015 - {} by Taehoon Kim'.format(datetime.now().year)
|
||||
__version__ = '1.3.6'
|
||||
__version__ = '1.3.8'
|
||||
__license__ = 'BSD'
|
||||
__author__ = 'Taehoon Kim; Moreels Pieter-Jan; Mads Marquart'
|
||||
__email__ = 'carpedm20@gmail.com'
|
||||
|
@@ -138,8 +138,9 @@ class Client(object):
|
||||
return self._graphql(payload, error_retries=error_retries-1)
|
||||
raise e
|
||||
|
||||
def _cleanGet(self, url, query=None, timeout=30):
|
||||
return self._session.get(url, headers=self._header, params=query, timeout=timeout, verify=self.ssl_verify)
|
||||
def _cleanGet(self, url, query=None, timeout=30, allow_redirects=True):
|
||||
return self._session.get(url, headers=self._header, params=query, timeout=timeout, verify=self.ssl_verify,
|
||||
allow_redirects=allow_redirects)
|
||||
|
||||
def _cleanPost(self, url, query=None, timeout=30):
|
||||
self.req_counter += 1
|
||||
@@ -209,8 +210,18 @@ class Client(object):
|
||||
|
||||
r = self._get(self.req_url.BASE)
|
||||
soup = bs(r.text, "lxml")
|
||||
self.fb_dtsg = soup.find("input", {'name':'fb_dtsg'})['value']
|
||||
self.fb_h = soup.find("input", {'name':'h'})['value']
|
||||
|
||||
fb_dtsg_element = soup.find("input", {'name': 'fb_dtsg'})
|
||||
if fb_dtsg_element:
|
||||
self.fb_dtsg = fb_dtsg_element['value']
|
||||
else:
|
||||
self.fb_dtsg = re.search(r'name="fb_dtsg" value="(.*?)"', r.text).group(1)
|
||||
|
||||
|
||||
fb_h_element = soup.find("input", {'name':'h'})
|
||||
if fb_h_element:
|
||||
self.fb_h = fb_h_element['value']
|
||||
|
||||
for i in self.fb_dtsg:
|
||||
self.ttstamp += str(ord(i))
|
||||
self.ttstamp += '2'
|
||||
@@ -325,8 +336,8 @@ class Client(object):
|
||||
:rtype: bool
|
||||
"""
|
||||
# Send a request to the login url, to see if we're directed to the home page
|
||||
r = self._cleanGet(self.req_url.LOGIN)
|
||||
return 'home' in r.url
|
||||
r = self._cleanGet(self.req_url.LOGIN, allow_redirects=False)
|
||||
return 'Location' in r.headers and 'home' in r.headers['Location']
|
||||
|
||||
def getSession(self):
|
||||
"""Retrieves session cookies
|
||||
@@ -400,6 +411,11 @@ class Client(object):
|
||||
:return: True if the action was successful
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
if not hasattr(self, 'fb_h'):
|
||||
h_r = self._post(self.req_url.MODERN_SETTINGS_MENU, {'pmid': '4'})
|
||||
self.fb_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
|
||||
|
||||
data = {
|
||||
'ref': "mb",
|
||||
'h': self.fb_h
|
||||
@@ -1022,7 +1038,6 @@ class Client(object):
|
||||
:param user_ids: One or more user IDs to add
|
||||
:param thread_id: Group ID to add people to. See :ref:`intro_threads`
|
||||
:type user_ids: list
|
||||
:return: :ref:`Message ID <intro_message_ids>` of the executed action
|
||||
:raises: FBchatException if request failed
|
||||
"""
|
||||
thread_id, thread_type = self._getThread(thread_id, None)
|
||||
@@ -1122,7 +1137,7 @@ class Client(object):
|
||||
thread_id, thread_type = self._getThread(thread_id, None)
|
||||
|
||||
data = {
|
||||
'color_choice': color.value,
|
||||
'color_choice': color.value if color != ThreadColor.MESSENGER_BLUE else '',
|
||||
'thread_or_other_fbid': thread_id
|
||||
}
|
||||
|
||||
@@ -1513,13 +1528,18 @@ class Client(object):
|
||||
self.onInbox(unseen=m["unseen"], unread=m["unread"], recent_unread=m["recent_unread"], msg=m)
|
||||
|
||||
# Typing
|
||||
elif mtype == "typ":
|
||||
elif mtype == "typ" or mtype == "ttyp":
|
||||
author_id = str(m.get("from"))
|
||||
thread_id = str(m.get("to"))
|
||||
if thread_id == self.uid:
|
||||
thread_type = ThreadType.USER
|
||||
else:
|
||||
thread_id = m.get("thread_fbid")
|
||||
if thread_id:
|
||||
thread_type = ThreadType.GROUP
|
||||
thread_id = str(thread_id)
|
||||
else:
|
||||
thread_type = ThreadType.USER
|
||||
if author_id == self.uid:
|
||||
thread_id = m.get("to")
|
||||
else:
|
||||
thread_id = author_id
|
||||
typing_status = TypingStatus(m.get("st"))
|
||||
self.onTyping(author_id=author_id, status=typing_status, thread_id=thread_id, thread_type=thread_type, msg=m)
|
||||
|
||||
|
@@ -190,7 +190,7 @@ def graphql_to_thread(thread):
|
||||
url=user.get('url'),
|
||||
name=user.get('name'),
|
||||
first_name=user.get('short_name'),
|
||||
last_name=user.get('name').split(user.get('short_name'),1)[1].strip(),
|
||||
last_name=user.get('name').split(user.get('short_name'),1).pop().strip(),
|
||||
is_friend=user.get('is_viewer_friend'),
|
||||
gender=GENDERS.get(user.get('gender')),
|
||||
affinity=user.get('affinity'),
|
||||
|
@@ -469,7 +469,7 @@ class EmojiSize(Enum):
|
||||
|
||||
class ThreadColor(Enum):
|
||||
"""Used to specify a thread colors"""
|
||||
MESSENGER_BLUE = ''
|
||||
MESSENGER_BLUE = '#0084ff'
|
||||
VIKING = '#44bec7'
|
||||
GOLDEN_POPPY = '#ffc300'
|
||||
RADICAL_RED = '#fa3c4c'
|
||||
|
@@ -121,6 +121,7 @@ class ReqUrl(object):
|
||||
GRAPHQL = "https://www.facebook.com/api/graphqlbatch/"
|
||||
ATTACHMENT_PHOTO = "https://www.facebook.com/mercury/attachments/photo/"
|
||||
EVENT_REMINDER = "https://www.facebook.com/ajax/eventreminder/create"
|
||||
MODERN_SETTINGS_MENU = "https://www.facebook.com/bluebar/modern_settings_menu/"
|
||||
|
||||
pull_channel = 0
|
||||
|
||||
|
6
pytest.ini
Normal file
6
pytest.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[pytest]
|
||||
xfail_strict=true
|
||||
markers =
|
||||
offline: Offline tests, aka. tests that can be executed without the need of a client
|
||||
expensive: Expensive tests, which should be executed sparingly
|
||||
addopts = -m "not expensive"
|
@@ -2,3 +2,4 @@ requests
|
||||
lxml
|
||||
beautifulsoup4
|
||||
enum34; python_version < '3.4'
|
||||
six
|
||||
|
261
tests.py
261
tests.py
@@ -1,261 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
import json
|
||||
import logging
|
||||
import unittest
|
||||
from getpass import getpass
|
||||
from sys import argv
|
||||
from os import path, chdir
|
||||
from glob import glob
|
||||
from fbchat import Client
|
||||
from fbchat.models import *
|
||||
import py_compile
|
||||
|
||||
logging_level = logging.ERROR
|
||||
|
||||
"""
|
||||
|
||||
Testing script for `fbchat`.
|
||||
Full documentation on https://fbchat.readthedocs.io/
|
||||
|
||||
"""
|
||||
|
||||
test_sticker_id = '767334476626295'
|
||||
|
||||
class CustomClient(Client):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.got_qprimer = False
|
||||
super(type(self), self).__init__(*args, **kwargs)
|
||||
|
||||
def onQprimer(self, msg, **kwargs):
|
||||
self.got_qprimer = True
|
||||
|
||||
class TestFbchat(unittest.TestCase):
|
||||
def test_examples(self):
|
||||
# Checks for syntax errors in the examples
|
||||
chdir('examples')
|
||||
for f in glob('*.txt'):
|
||||
print(f)
|
||||
with self.assertRaises(py_compile.PyCompileError):
|
||||
py_compile.compile(f)
|
||||
|
||||
chdir('..')
|
||||
|
||||
def test_loginFunctions(self):
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
client.logout()
|
||||
|
||||
self.assertFalse(client.isLoggedIn())
|
||||
|
||||
with self.assertRaises(Exception):
|
||||
client.login('<email>', '<password>', max_tries=1)
|
||||
|
||||
client.login(email, password)
|
||||
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
def test_sessions(self):
|
||||
global client
|
||||
session_cookies = client.getSession()
|
||||
client = CustomClient(email, password, session_cookies=session_cookies, logging_level=logging_level)
|
||||
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
def test_defaultThread(self):
|
||||
# setDefaultThread
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
self.assertTrue(client.send(Message(text='test_default_recipient★')))
|
||||
|
||||
# resetDefaultThread
|
||||
client.resetDefaultThread()
|
||||
with self.assertRaises(ValueError):
|
||||
client.send(Message(text='should_not_send'))
|
||||
|
||||
def test_fetchAllUsers(self):
|
||||
users = client.fetchAllUsers()
|
||||
self.assertGreater(len(users), 0)
|
||||
|
||||
def test_searchFor(self):
|
||||
users = client.searchForUsers('Mark Zuckerberg')
|
||||
self.assertGreater(len(users), 0)
|
||||
|
||||
u = users[0]
|
||||
|
||||
# Test if values are set correctly
|
||||
self.assertEqual(u.uid, '4')
|
||||
self.assertEqual(u.type, ThreadType.USER)
|
||||
self.assertEqual(u.photo[:4], 'http')
|
||||
self.assertEqual(u.url[:4], 'http')
|
||||
self.assertEqual(u.name, 'Mark Zuckerberg')
|
||||
|
||||
group_name = client.changeThreadTitle('tést_searchFor', thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
groups = client.searchForGroups('té')
|
||||
self.assertGreater(len(groups), 0)
|
||||
|
||||
def test_send(self):
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
|
||||
self.assertIsNotNone(client.send(Message(emoji_size=EmojiSize.SMALL)))
|
||||
self.assertIsNotNone(client.send(Message(emoji_size=EmojiSize.MEDIUM)))
|
||||
self.assertIsNotNone(client.send(Message(text='😆', emoji_size=EmojiSize.LARGE)))
|
||||
|
||||
self.assertIsNotNone(client.send(Message(text='test_send★')))
|
||||
with self.assertRaises(FBchatFacebookError):
|
||||
self.assertIsNotNone(client.send(Message(text='test_send_should_fail★'), thread_id=thread['id'], thread_type=(ThreadType.GROUP if thread['type'] == ThreadType.USER else ThreadType.USER)))
|
||||
|
||||
self.assertIsNotNone(client.send(Message(text='Hi there @user', mentions=[Mention(user_id, offset=9, length=5)])))
|
||||
self.assertIsNotNone(client.send(Message(text='Hi there @group', mentions=[Mention(group_id, offset=9, length=6)])))
|
||||
|
||||
self.assertIsNotNone(client.send(Message(sticker=Sticker(test_sticker_id))))
|
||||
|
||||
def test_sendImages(self):
|
||||
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
|
||||
image_local_url = path.join(path.dirname(__file__), 'tests/image.png')
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
mentions = [Mention(thread['id'], offset=26, length=4)]
|
||||
self.assertTrue(client.sendRemoteImage(image_url, Message(text='test_send_image_remote_to_@you★', mentions=mentions)))
|
||||
self.assertTrue(client.sendLocalImage(image_local_url, Message(text='test_send_image_local_to__@you★', mentions=mentions)))
|
||||
|
||||
def test_fetchThreadList(self):
|
||||
threads = client.fetchThreadList(limit=2)
|
||||
self.assertEqual(len(threads), 2)
|
||||
|
||||
def test_fetchThreadMessages(self):
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
client.send(Message(text='test_getThreadInfo★'))
|
||||
|
||||
messages = client.fetchThreadMessages(limit=1)
|
||||
self.assertEqual(messages[0].author, client.uid)
|
||||
self.assertEqual(messages[0].text, 'test_getThreadInfo★')
|
||||
|
||||
def test_listen(self):
|
||||
client.startListening()
|
||||
client.doOneListen()
|
||||
client.stopListening()
|
||||
|
||||
self.assertTrue(client.got_qprimer)
|
||||
|
||||
def test_fetchInfo(self):
|
||||
info = client.fetchUserInfo('4')['4']
|
||||
self.assertEqual(info.name, 'Mark Zuckerberg')
|
||||
|
||||
info = client.fetchGroupInfo(group_id)[group_id]
|
||||
self.assertEqual(info.type, ThreadType.GROUP)
|
||||
|
||||
def test_removeAddFromGroup(self):
|
||||
client.removeUserFromGroup(user_id, thread_id=group_id)
|
||||
client.addUsersToGroup(user_id, thread_id=group_id)
|
||||
|
||||
def test_changeThreadTitle(self):
|
||||
for thread in threads:
|
||||
client.changeThreadTitle('test_changeThreadTitle★', thread_id=thread['id'], thread_type=thread['type'])
|
||||
|
||||
def test_changeNickname(self):
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
client.changeNickname('test_changeNicknameSelf★', client.uid)
|
||||
client.changeNickname('test_changeNicknameOther★', user_id)
|
||||
|
||||
def test_changeThreadEmoji(self):
|
||||
for thread in threads:
|
||||
client.changeThreadEmoji('😀', thread_id=thread['id'])
|
||||
client.changeThreadEmoji('😀', thread_id=thread['id'])
|
||||
|
||||
def test_changeThreadColor(self):
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE)
|
||||
client.changeThreadColor(ThreadColor.MESSENGER_BLUE)
|
||||
|
||||
def test_reactToMessage(self):
|
||||
for thread in threads:
|
||||
mid = client.send(Message(text='test_reactToMessage★'), thread_id=thread['id'], thread_type=thread['type'])
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
||||
|
||||
def test_setTypingStatus(self):
|
||||
for thread in threads:
|
||||
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
|
||||
client.setTypingStatus(TypingStatus.TYPING)
|
||||
client.setTypingStatus(TypingStatus.STOPPED)
|
||||
|
||||
|
||||
def start_test(param_client, param_group_id, param_user_id, param_threads, tests=[]):
|
||||
global client
|
||||
global group_id
|
||||
global user_id
|
||||
global threads
|
||||
|
||||
client = param_client
|
||||
group_id = param_group_id
|
||||
user_id = param_user_id
|
||||
threads = param_threads
|
||||
|
||||
tests = ['test_' + test if 'test_' != test[:5] else test for test in tests]
|
||||
|
||||
if len(tests) == 0:
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat)
|
||||
else:
|
||||
suite = unittest.TestSuite(map(TestFbchat, tests))
|
||||
print('Starting test(s)')
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
||||
|
||||
client = None
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Python 3 does not use raw_input, whereas Python 2 does
|
||||
try:
|
||||
input = raw_input
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
try:
|
||||
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'r') as f:
|
||||
j = json.load(f)
|
||||
email = j['email']
|
||||
password = j['password']
|
||||
user_id = j['user_thread_id']
|
||||
group_id = j['group_thread_id']
|
||||
session = j.get('session')
|
||||
except (IOError, IndexError) as e:
|
||||
email = input('Email: ')
|
||||
password = getpass()
|
||||
group_id = input('Please enter a group thread id (To test group functionality): ')
|
||||
user_id = input('Please enter a user thread id (To test kicking/adding functionality): ')
|
||||
threads = [
|
||||
{
|
||||
'id': user_id,
|
||||
'type': ThreadType.USER
|
||||
},
|
||||
{
|
||||
'id': group_id,
|
||||
'type': ThreadType.GROUP
|
||||
}
|
||||
]
|
||||
|
||||
print('Logging in...')
|
||||
client = CustomClient(email, password, logging_level=logging_level, session_cookies=session)
|
||||
|
||||
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!
|
||||
start_test(client, group_id, user_id, threads, argv[1:])
|
||||
|
||||
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'w') as f:
|
||||
session = None
|
||||
try:
|
||||
session = client.getSession()
|
||||
except Exception:
|
||||
print('Unable to fetch client session!')
|
||||
json.dump({
|
||||
'email': email,
|
||||
'password': password,
|
||||
'user_thread_id': user_id,
|
||||
'group_thread_id': group_id,
|
||||
'session': session
|
||||
}, f)
|
101
tests/conftest.py
Normal file
101
tests/conftest.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import json
|
||||
|
||||
from utils import *
|
||||
from contextlib import contextmanager
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(client2):
|
||||
return {"id": client2.uid, "type": ThreadType.USER}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def group(pytestconfig):
|
||||
return {"id": load_variable("group_id", pytestconfig.cache), "type": ThreadType.GROUP}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["user", "group"])
|
||||
def thread(request, user, group):
|
||||
return user if request.param == "user" else group
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client1(pytestconfig):
|
||||
with load_client(1, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client2(pytestconfig):
|
||||
with load_client(2, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def client(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
yield client1
|
||||
client1.resetDefaultThread()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["client1", "client2"])
|
||||
def client_all(request, client1, client2):
|
||||
return client1 if request.param == "client1" else client2
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def catch_event(client2):
|
||||
t = ClientThread(client2)
|
||||
t.start()
|
||||
|
||||
@contextmanager
|
||||
def inner(method_name):
|
||||
caught = CaughtValue()
|
||||
old_method = getattr(client2, method_name)
|
||||
|
||||
# Will be called by the other thread
|
||||
def catch_value(*args, **kwargs):
|
||||
old_method(*args, **kwargs)
|
||||
# Make sure the `set` is only called once
|
||||
if not caught.is_set():
|
||||
caught.set(kwargs)
|
||||
|
||||
setattr(client2, method_name, catch_value)
|
||||
yield caught
|
||||
caught.wait()
|
||||
if not caught.is_set():
|
||||
raise ValueError("The value could not be caught")
|
||||
setattr(client2, method_name, old_method)
|
||||
|
||||
yield inner
|
||||
|
||||
t.should_stop.set()
|
||||
|
||||
try:
|
||||
# Make the client send a messages to itself, so the blocking pull request will return
|
||||
# This is probably not safe, since the client is making two requests simultaneously
|
||||
client2.sendMessage("Shutdown", client2.uid)
|
||||
finally:
|
||||
t.join()
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def compare(client, thread):
|
||||
def inner(caught_event, **kwargs):
|
||||
d = {
|
||||
"author_id": client.uid,
|
||||
"thread_id": client.uid
|
||||
if thread["type"] == ThreadType.USER
|
||||
else thread["id"],
|
||||
"thread_type": thread["type"],
|
||||
}
|
||||
d.update(kwargs)
|
||||
return subset(caught_event.res, **d)
|
||||
|
||||
return inner
|
55
tests/test_base.py
Normal file
55
tests/test_base.py
Normal file
@@ -0,0 +1,55 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import py_compile
|
||||
|
||||
from glob import glob
|
||||
from os import path, environ
|
||||
from fbchat import Client
|
||||
from fbchat.models import FBchatUserError, Message
|
||||
|
||||
|
||||
@pytest.mark.offline
|
||||
def test_examples():
|
||||
# Compiles the examples, to check for syntax errors
|
||||
for name in glob(path.join(path.dirname(__file__), "../examples", "*.py")):
|
||||
py_compile.compile(name)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.expensive
|
||||
def test_login(client1):
|
||||
assert client1.isLoggedIn()
|
||||
email = client1.email
|
||||
password = client1.password
|
||||
|
||||
client1.logout()
|
||||
|
||||
assert not client1.isLoggedIn()
|
||||
|
||||
with pytest.raises(FBchatUserError):
|
||||
client1.login("<invalid email>", "<invalid password>", max_tries=1)
|
||||
|
||||
client1.login(email, password)
|
||||
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
def test_sessions(client1):
|
||||
session = client1.getSession()
|
||||
Client("no email needed", "no password needed", session_cookies=session)
|
||||
client1.setSession(session)
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def test_default_thread(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
assert client1.send(Message(text="Sent to the specified thread"))
|
||||
|
||||
client1.resetDefaultThread()
|
||||
with pytest.raises(ValueError):
|
||||
client1.send(Message(text="Should not be sent"))
|
79
tests/test_fetch.py
Normal file
79
tests/test_fetch.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import ThreadType, Message, Mention, EmojiSize, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
def test_fetch_all_users(client):
|
||||
users = client.fetchAllUsers()
|
||||
assert len(users) > 0
|
||||
|
||||
|
||||
def test_fetch_thread_list(client):
|
||||
threads = client.fetchThreadList(limit=2)
|
||||
assert len(threads) == 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_fetch_message_emoji(client, emoji, emoji_size):
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(
|
||||
vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_message_mentions(client):
|
||||
text = "This is a test of fetchThreadMessages"
|
||||
mentions = [Mention(client.uid, offset=10, length=4)]
|
||||
|
||||
mid = client.send(Message(text, mentions=mentions))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid, text=text)
|
||||
for i, m in enumerate(mentions):
|
||||
assert vars(message.mentions[i]) == vars(m)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker_id", ["767334476626295"])
|
||||
def test_fetch_message_sticker(client, sticker_id):
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid)
|
||||
assert subset(vars(message.sticker), uid=sticker_id)
|
||||
|
||||
|
||||
def test_fetch_info(client1, group):
|
||||
info = client1.fetchUserInfo("4")["4"]
|
||||
assert info.name == "Mark Zuckerberg"
|
||||
|
||||
info = client1.fetchGroupInfo(group["id"])[group["id"]]
|
||||
assert info.type == ThreadType.GROUP
|
||||
|
||||
|
||||
def test_fetch_image_url(client):
|
||||
url = path.join(path.dirname(__file__), "image.png")
|
||||
|
||||
client.sendLocalImage(url)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert client.fetchImageUrl(message.attachments[0].uid)
|
12
tests/test_message_management.py
Normal file
12
tests/test_message_management.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import Message, MessageReaction
|
||||
|
||||
|
||||
def test_set_reaction(client):
|
||||
mid = client.send(Message(text="This message will be reacted to"))
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
18
tests/test_search.py
Normal file
18
tests/test_search.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
def test_search_for(client1):
|
||||
users = client1.searchForUsers("Mark Zuckerberg")
|
||||
assert len(users) > 0
|
||||
|
||||
u = users[0]
|
||||
|
||||
assert u.uid == "4"
|
||||
assert u.type == ThreadType.USER
|
||||
assert u.photo[:4] == "http"
|
||||
assert u.url[:4] == "http"
|
||||
assert u.name == "Mark Zuckerberg"
|
110
tests/test_send.py
Normal file
110
tests/test_send.py
Normal file
@@ -0,0 +1,110 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import Message, Mention, EmojiSize, FBchatFacebookError, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"text",
|
||||
[
|
||||
"test_send",
|
||||
"😆",
|
||||
"\\\n\t%?&'\"",
|
||||
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
|
||||
"a" * 20000, # Maximum amount of characters you can send
|
||||
],
|
||||
)
|
||||
def test_send_text(client, catch_event, compare, text):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendMessage(text)
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
|
||||
assert compare(x, mid=mid, message=emoji)
|
||||
assert subset(
|
||||
vars(x.res["message_object"]),
|
||||
uid=mid,
|
||||
author=client.uid,
|
||||
text=emoji,
|
||||
emoji_size=emoji_size,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("message", [Message("a" * 20001)])
|
||||
def test_send_invalid(client, message):
|
||||
client.send(message)
|
||||
|
||||
|
||||
def test_send_mentions(client, client2, thread, catch_event, compare):
|
||||
text = "Hi there @me, @other and @thread"
|
||||
mentions = [
|
||||
dict(thread_id=client.uid, offset=9, length=3),
|
||||
dict(thread_id=client2.uid, offset=14, length=6),
|
||||
dict(thread_id=thread["id"], offset=26, length=7),
|
||||
]
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(text, mentions=[Mention(**d) for d in mentions]))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
# The mentions are not ordered by offset
|
||||
for m in x.res["message_object"].mentions:
|
||||
assert vars(m) in mentions
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sticker_id",
|
||||
["767334476626295", pytest.mark.xfail("0", raises=FBchatFacebookError)],
|
||||
)
|
||||
def test_send_sticker(client, catch_event, compare, sticker_id):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
|
||||
assert compare(x, mid=mid)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid)
|
||||
assert subset(vars(x.res["message_object"].sticker), uid=sticker_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"method_name, url",
|
||||
[
|
||||
(
|
||||
"sendRemoteImage",
|
||||
"https://github.com/carpedm20/fbchat/raw/master/tests/image.png",
|
||||
),
|
||||
("sendLocalImage", path.join(path.dirname(__file__), "image.png")),
|
||||
],
|
||||
)
|
||||
def test_send_images(client, catch_event, compare, method_name, url):
|
||||
text = "An image sent with {}".format(method_name)
|
||||
with catch_event("onMessage") as x:
|
||||
mid = getattr(client, method_name)(url, Message(text))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
assert x.res["message_object"].attachments[0]
|
12
tests/test_tests.py
Normal file
12
tests/test_tests.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_catch_event(client2, catch_event):
|
||||
mid = "test"
|
||||
with catch_event("onMessage") as x:
|
||||
client2.onMessage(mid=mid)
|
||||
assert x.res['mid'] == mid
|
100
tests/test_thread_interraction.py
Normal file
100
tests/test_thread_interraction.py
Normal file
@@ -0,0 +1,100 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import (
|
||||
Message,
|
||||
ThreadType,
|
||||
FBchatFacebookError,
|
||||
TypingStatus,
|
||||
ThreadColor,
|
||||
)
|
||||
from utils import random_hex, subset
|
||||
from os import environ
|
||||
|
||||
|
||||
def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
|
||||
# Test both methods, while ensuring that the user gets added to the group
|
||||
try:
|
||||
with catch_event("onPersonRemoved") as x:
|
||||
client1.removeUserFromGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res, removed_id=client2.uid, author_id=client1.uid, thread_id=group["id"]
|
||||
)
|
||||
finally:
|
||||
with catch_event("onPeopleAdded") as x:
|
||||
client1.addUsersToGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res, added_ids=[client2.uid], author_id=client1.uid, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, reason="Apparently changeThreadTitle is broken"
|
||||
)
|
||||
def test_change_title(client1, catch_event, group):
|
||||
title = random_hex()
|
||||
with catch_event("onTitleChange") as x:
|
||||
mid = client1.changeThreadTitle(title, group["id"])
|
||||
assert subset(
|
||||
x.res,
|
||||
mid=mid,
|
||||
author_id=client1.uid,
|
||||
new_title=title,
|
||||
thread_id=group["id"],
|
||||
thread_type=ThreadType.GROUP,
|
||||
)
|
||||
|
||||
|
||||
def test_change_nickname(client, client_all, catch_event, compare):
|
||||
nickname = random_hex()
|
||||
with catch_event("onNicknameChange") as x:
|
||||
client.changeNickname(nickname, client_all.uid)
|
||||
assert compare(x, changed_for=client_all.uid, new_nickname=nickname)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji", ["😀", "😂", "😕", "😍"])
|
||||
def test_change_emoji(client, catch_event, compare, emoji):
|
||||
with catch_event("onEmojiChange") as x:
|
||||
client.changeThreadEmoji(emoji)
|
||||
assert compare(x, new_emoji=emoji)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("emoji", ["🙃", "not an emoji"])
|
||||
def test_change_emoji_invalid(client, emoji):
|
||||
client.changeThreadEmoji(emoji)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"color",
|
||||
[
|
||||
x
|
||||
if x in [ThreadColor.MESSENGER_BLUE, ThreadColor.PUMPKIN]
|
||||
else pytest.mark.expensive(x)
|
||||
for x in ThreadColor
|
||||
],
|
||||
)
|
||||
def test_change_color(client, catch_event, compare, color):
|
||||
with catch_event("onColorChange") as x:
|
||||
client.changeThreadColor(color)
|
||||
assert compare(x, new_color=color)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, strict=False, reason="Should fail, but doesn't"
|
||||
)
|
||||
def test_change_color_invalid(client):
|
||||
class InvalidColor:
|
||||
value = "#0077ff"
|
||||
|
||||
client.changeThreadColor(InvalidColor())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", TypingStatus)
|
||||
def test_typing_status(client, catch_event, compare, status):
|
||||
with catch_event("onTyping") as x:
|
||||
client.setTypingStatus(status)
|
||||
assert compare(x, status=status)
|
83
tests/utils.py
Normal file
83
tests/utils.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import threading
|
||||
import logging
|
||||
import six
|
||||
|
||||
from os import environ
|
||||
from random import randrange
|
||||
from contextlib import contextmanager
|
||||
from six import viewitems
|
||||
from fbchat import Client
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
log = logging.getLogger("fbchat.tests").addHandler(logging.NullHandler())
|
||||
|
||||
|
||||
class ClientThread(threading.Thread):
|
||||
def __init__(self, client, *args, **kwargs):
|
||||
self.client = client
|
||||
self.should_stop = threading.Event()
|
||||
super(ClientThread, self).__init__(*args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
self.client.startListening()
|
||||
self.client.doOneListen() # QPrimer, Facebook now knows we're about to start pulling
|
||||
super(ClientThread, self).start()
|
||||
|
||||
def run(self):
|
||||
while not self.should_stop.is_set() and self.client.doOneListen():
|
||||
pass
|
||||
|
||||
self.client.stopListening()
|
||||
|
||||
|
||||
if six.PY2:
|
||||
event_class = threading._Event
|
||||
else:
|
||||
event_class = threading.Event
|
||||
|
||||
|
||||
class CaughtValue(event_class):
|
||||
def set(self, res):
|
||||
self.res = res
|
||||
super(CaughtValue, self).set()
|
||||
|
||||
def wait(self, timeout=3):
|
||||
super(CaughtValue, self).wait(timeout=timeout)
|
||||
|
||||
|
||||
def random_hex(length=20):
|
||||
return "{:X}".format(randrange(16 ** length))
|
||||
|
||||
|
||||
def subset(a, **b):
|
||||
print(a)
|
||||
print(b)
|
||||
return viewitems(b) <= viewitems(a)
|
||||
|
||||
|
||||
def load_variable(name, cache):
|
||||
var = environ.get(name, None)
|
||||
if var is not None:
|
||||
if cache.get(name, None) != var:
|
||||
cache.set(name, var)
|
||||
return var
|
||||
|
||||
var = cache.get(name, None)
|
||||
if var is None:
|
||||
raise ValueError("Variable {!r} neither in environment nor cache".format(name))
|
||||
return var
|
||||
|
||||
|
||||
@contextmanager
|
||||
def load_client(n, cache):
|
||||
client = Client(
|
||||
load_variable("client{}_email".format(n), cache),
|
||||
load_variable("client{}_password".format(n), cache),
|
||||
session_cookies=cache.get("client{}_session".format(n), None),
|
||||
)
|
||||
yield client
|
||||
cache.set("client{}_session".format(n), client.getSession())
|
Reference in New Issue
Block a user