TravisCI integration and updated test suite (#296)
* Make TravisCI setup * Use pytest, move tests to seperate files * Added system to check if `onX` events were successfully executed
This commit is contained in:
101
tests/conftest.py
Normal file
101
tests/conftest.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import json
|
||||
|
||||
from utils import *
|
||||
from contextlib import contextmanager
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(client2):
|
||||
return {"id": client2.uid, "type": ThreadType.USER}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def group(pytestconfig):
|
||||
return {"id": load_variable("group_id", pytestconfig.cache), "type": ThreadType.GROUP}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["user", "group"])
|
||||
def thread(request, user, group):
|
||||
return user if request.param == "user" else group
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client1(pytestconfig):
|
||||
with load_client(1, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client2(pytestconfig):
|
||||
with load_client(2, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def client(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
yield client1
|
||||
client1.resetDefaultThread()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["client1", "client2"])
|
||||
def client_all(request, client1, client2):
|
||||
return client1 if request.param == "client1" else client2
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def catch_event(client2):
|
||||
t = ClientThread(client2)
|
||||
t.start()
|
||||
|
||||
@contextmanager
|
||||
def inner(method_name):
|
||||
caught = CaughtValue()
|
||||
old_method = getattr(client2, method_name)
|
||||
|
||||
# Will be called by the other thread
|
||||
def catch_value(*args, **kwargs):
|
||||
old_method(*args, **kwargs)
|
||||
# Make sure the `set` is only called once
|
||||
if not caught.is_set():
|
||||
caught.set(kwargs)
|
||||
|
||||
setattr(client2, method_name, catch_value)
|
||||
yield caught
|
||||
caught.wait()
|
||||
if not caught.is_set():
|
||||
raise ValueError("The value could not be caught")
|
||||
setattr(client2, method_name, old_method)
|
||||
|
||||
yield inner
|
||||
|
||||
t.should_stop.set()
|
||||
|
||||
try:
|
||||
# Make the client send a messages to itself, so the blocking pull request will return
|
||||
# This is probably not safe, since the client is making two requests simultaneously
|
||||
client2.sendMessage("Shutdown", client2.uid)
|
||||
finally:
|
||||
t.join()
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def compare(client, thread):
|
||||
def inner(caught_event, **kwargs):
|
||||
d = {
|
||||
"author_id": client.uid,
|
||||
"thread_id": client.uid
|
||||
if thread["type"] == ThreadType.USER
|
||||
else thread["id"],
|
||||
"thread_type": thread["type"],
|
||||
}
|
||||
d.update(kwargs)
|
||||
return subset(caught_event.res, **d)
|
||||
|
||||
return inner
|
55
tests/test_base.py
Normal file
55
tests/test_base.py
Normal file
@@ -0,0 +1,55 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import py_compile
|
||||
|
||||
from glob import glob
|
||||
from os import path, environ
|
||||
from fbchat import Client
|
||||
from fbchat.models import FBchatUserError, Message
|
||||
|
||||
|
||||
@pytest.mark.offline
|
||||
def test_examples():
|
||||
# Compiles the examples, to check for syntax errors
|
||||
for name in glob(path.join(path.dirname(__file__), "../examples", "*.py")):
|
||||
py_compile.compile(name)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.expensive
|
||||
def test_login(client1):
|
||||
assert client1.isLoggedIn()
|
||||
email = client1.email
|
||||
password = client1.password
|
||||
|
||||
client1.logout()
|
||||
|
||||
assert not client1.isLoggedIn()
|
||||
|
||||
with pytest.raises(FBchatUserError):
|
||||
client1.login("<invalid email>", "<invalid password>", max_tries=1)
|
||||
|
||||
client1.login(email, password)
|
||||
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
def test_sessions(client1):
|
||||
session = client1.getSession()
|
||||
Client("no email needed", "no password needed", session_cookies=session)
|
||||
client1.setSession(session)
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def test_default_thread(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
assert client1.send(Message(text="Sent to the specified thread"))
|
||||
|
||||
client1.resetDefaultThread()
|
||||
with pytest.raises(ValueError):
|
||||
client1.send(Message(text="Should not be sent"))
|
79
tests/test_fetch.py
Normal file
79
tests/test_fetch.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import ThreadType, Message, Mention, EmojiSize, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
def test_fetch_all_users(client):
|
||||
users = client.fetchAllUsers()
|
||||
assert len(users) > 0
|
||||
|
||||
|
||||
def test_fetch_thread_list(client):
|
||||
threads = client.fetchThreadList(limit=2)
|
||||
assert len(threads) == 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_fetch_message_emoji(client, emoji, emoji_size):
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(
|
||||
vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_message_mentions(client):
|
||||
text = "This is a test of fetchThreadMessages"
|
||||
mentions = [Mention(client.uid, offset=10, length=4)]
|
||||
|
||||
mid = client.send(Message(text, mentions=mentions))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid, text=text)
|
||||
for i, m in enumerate(mentions):
|
||||
assert vars(message.mentions[i]) == vars(m)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker_id", ["767334476626295"])
|
||||
def test_fetch_message_sticker(client, sticker_id):
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid)
|
||||
assert subset(vars(message.sticker), uid=sticker_id)
|
||||
|
||||
|
||||
def test_fetch_info(client1, group):
|
||||
info = client1.fetchUserInfo("4")["4"]
|
||||
assert info.name == "Mark Zuckerberg"
|
||||
|
||||
info = client1.fetchGroupInfo(group["id"])[group["id"]]
|
||||
assert info.type == ThreadType.GROUP
|
||||
|
||||
|
||||
def test_fetch_image_url(client):
|
||||
url = path.join(path.dirname(__file__), "image.png")
|
||||
|
||||
client.sendLocalImage(url)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert client.fetchImageUrl(message.attachments[0].uid)
|
12
tests/test_message_management.py
Normal file
12
tests/test_message_management.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import Message, MessageReaction
|
||||
|
||||
|
||||
def test_set_reaction(client):
|
||||
mid = client.send(Message(text="This message will be reacted to"))
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
18
tests/test_search.py
Normal file
18
tests/test_search.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
def test_search_for(client1):
|
||||
users = client1.searchForUsers("Mark Zuckerberg")
|
||||
assert len(users) > 0
|
||||
|
||||
u = users[0]
|
||||
|
||||
assert u.uid == "4"
|
||||
assert u.type == ThreadType.USER
|
||||
assert u.photo[:4] == "http"
|
||||
assert u.url[:4] == "http"
|
||||
assert u.name == "Mark Zuckerberg"
|
110
tests/test_send.py
Normal file
110
tests/test_send.py
Normal file
@@ -0,0 +1,110 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import Message, Mention, EmojiSize, FBchatFacebookError, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"text",
|
||||
[
|
||||
"test_send",
|
||||
"😆",
|
||||
"\\\n\t%?&'\"",
|
||||
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
|
||||
"a" * 20000, # Maximum amount of characters you can send
|
||||
],
|
||||
)
|
||||
def test_send_text(client, catch_event, compare, text):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendMessage(text)
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
|
||||
assert compare(x, mid=mid, message=emoji)
|
||||
assert subset(
|
||||
vars(x.res["message_object"]),
|
||||
uid=mid,
|
||||
author=client.uid,
|
||||
text=emoji,
|
||||
emoji_size=emoji_size,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("message", [Message("a" * 20001)])
|
||||
def test_send_invalid(client, message):
|
||||
client.send(message)
|
||||
|
||||
|
||||
def test_send_mentions(client, client2, thread, catch_event, compare):
|
||||
text = "Hi there @me, @other and @thread"
|
||||
mentions = [
|
||||
dict(thread_id=client.uid, offset=9, length=3),
|
||||
dict(thread_id=client2.uid, offset=14, length=6),
|
||||
dict(thread_id=thread["id"], offset=26, length=7),
|
||||
]
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(text, mentions=[Mention(**d) for d in mentions]))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
# The mentions are not ordered by offset
|
||||
for m in x.res["message_object"].mentions:
|
||||
assert vars(m) in mentions
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sticker_id",
|
||||
["767334476626295", pytest.mark.xfail("0", raises=FBchatFacebookError)],
|
||||
)
|
||||
def test_send_sticker(client, catch_event, compare, sticker_id):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
|
||||
assert compare(x, mid=mid)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid)
|
||||
assert subset(vars(x.res["message_object"].sticker), uid=sticker_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"method_name, url",
|
||||
[
|
||||
(
|
||||
"sendRemoteImage",
|
||||
"https://github.com/carpedm20/fbchat/raw/master/tests/image.png",
|
||||
),
|
||||
("sendLocalImage", path.join(path.dirname(__file__), "image.png")),
|
||||
],
|
||||
)
|
||||
def test_send_images(client, catch_event, compare, method_name, url):
|
||||
text = "An image sent with {}".format(method_name)
|
||||
with catch_event("onMessage") as x:
|
||||
mid = getattr(client, method_name)(url, Message(text))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
assert x.res["message_object"].attachments[0]
|
12
tests/test_tests.py
Normal file
12
tests/test_tests.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_catch_event(client2, catch_event):
|
||||
mid = "test"
|
||||
with catch_event("onMessage") as x:
|
||||
client2.onMessage(mid=mid)
|
||||
assert x.res['mid'] == mid
|
112
tests/test_thread_interraction.py
Normal file
112
tests/test_thread_interraction.py
Normal file
@@ -0,0 +1,112 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import (
|
||||
Message,
|
||||
ThreadType,
|
||||
FBchatFacebookError,
|
||||
TypingStatus,
|
||||
ThreadColor,
|
||||
)
|
||||
from utils import random_hex, subset
|
||||
from os import environ
|
||||
|
||||
|
||||
def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
|
||||
# Test both methods, while ensuring that the user gets added to the group
|
||||
try:
|
||||
with catch_event("onPersonRemoved") as x:
|
||||
client1.removeUserFromGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res, removed_id=client2.uid, author_id=client1.uid, thread_id=group["id"]
|
||||
)
|
||||
finally:
|
||||
with catch_event("onPeopleAdded") as x:
|
||||
mid = client1.addUsersToGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res,
|
||||
mid=mid,
|
||||
added_ids=[client2.uid],
|
||||
author_id=client1.uid,
|
||||
thread_id=group["id"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, reason="Apparently changeThreadTitle is broken"
|
||||
)
|
||||
def test_change_title(client1, catch_event, group):
|
||||
title = random_hex()
|
||||
with catch_event("onTitleChange") as x:
|
||||
mid = client1.changeThreadTitle(title, group["id"])
|
||||
assert subset(
|
||||
x.res,
|
||||
mid=mid,
|
||||
author_id=client1.uid,
|
||||
new_title=title,
|
||||
thread_id=group["id"],
|
||||
thread_type=ThreadType.GROUP,
|
||||
)
|
||||
|
||||
|
||||
def test_change_nickname(client, client_all, catch_event, compare):
|
||||
nickname = random_hex()
|
||||
with catch_event("onNicknameChange") as x:
|
||||
client.changeNickname(nickname, client_all.uid)
|
||||
assert compare(x, changed_for=client_all.uid, new_nickname=nickname)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji", ["😀", "😂", "😕", "😍"])
|
||||
def test_change_emoji(client, catch_event, compare, emoji):
|
||||
with catch_event("onEmojiChange") as x:
|
||||
client.changeThreadEmoji(emoji)
|
||||
assert compare(x, new_emoji=emoji)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("emoji", ["🙃", "not an emoji"])
|
||||
def test_change_emoji_invalid(client, emoji):
|
||||
client.changeThreadEmoji(emoji)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"color",
|
||||
[
|
||||
pytest.mark.xfail(
|
||||
x, reason="Apparently changing ThreadColor.MESSENGER_BLUE is broken"
|
||||
)
|
||||
if x == ThreadColor.MESSENGER_BLUE
|
||||
else x
|
||||
if x == ThreadColor.PUMPKIN
|
||||
else pytest.mark.expensive(x)
|
||||
for x in ThreadColor
|
||||
],
|
||||
)
|
||||
def test_change_color(client, catch_event, compare, color):
|
||||
if color == ThreadColor.MESSENGER_BLUE:
|
||||
pytest.xfail(reason="Apparently changing ThreadColor.MESSENGER_BLUE is broken")
|
||||
with catch_event("onColorChange") as x:
|
||||
client.changeThreadColor(color)
|
||||
assert compare(x, new_color=color)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, strict=False, reason="Should fail, but doesn't"
|
||||
)
|
||||
def test_change_color_invalid(client):
|
||||
class InvalidColor:
|
||||
value = "#0077ff"
|
||||
|
||||
client.changeThreadColor(InvalidColor())
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="Apparently onTyping is broken")
|
||||
@pytest.mark.parametrize("status", TypingStatus)
|
||||
def test_typing_status(client, catch_event, compare, status):
|
||||
with catch_event("onTyping") as x:
|
||||
client.setTypingStatus(status)
|
||||
# x.wait(40)
|
||||
assert compare(x, status=status)
|
83
tests/utils.py
Normal file
83
tests/utils.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import threading
|
||||
import logging
|
||||
import six
|
||||
|
||||
from os import environ
|
||||
from random import randrange
|
||||
from contextlib import contextmanager
|
||||
from six import viewitems
|
||||
from fbchat import Client
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
log = logging.getLogger("fbchat.tests").addHandler(logging.NullHandler())
|
||||
|
||||
|
||||
class ClientThread(threading.Thread):
|
||||
def __init__(self, client, *args, **kwargs):
|
||||
self.client = client
|
||||
self.should_stop = threading.Event()
|
||||
super(ClientThread, self).__init__(*args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
self.client.startListening()
|
||||
self.client.doOneListen() # QPrimer, Facebook now knows we're about to start pulling
|
||||
super(ClientThread, self).start()
|
||||
|
||||
def run(self):
|
||||
while not self.should_stop.is_set() and self.client.doOneListen():
|
||||
pass
|
||||
|
||||
self.client.stopListening()
|
||||
|
||||
|
||||
if six.PY2:
|
||||
event_class = threading._Event
|
||||
else:
|
||||
event_class = threading.Event
|
||||
|
||||
|
||||
class CaughtValue(event_class):
|
||||
def set(self, res):
|
||||
self.res = res
|
||||
super(CaughtValue, self).set()
|
||||
|
||||
def wait(self, timeout=3):
|
||||
super(CaughtValue, self).wait(timeout=timeout)
|
||||
|
||||
|
||||
def random_hex(length=20):
|
||||
return "{:X}".format(randrange(16 ** length))
|
||||
|
||||
|
||||
def subset(a, **b):
|
||||
print(a)
|
||||
print(b)
|
||||
return viewitems(b) <= viewitems(a)
|
||||
|
||||
|
||||
def load_variable(name, cache):
|
||||
var = environ.get(name, None)
|
||||
if var is not None:
|
||||
if cache.get(name, None) != var:
|
||||
cache.set(name, var)
|
||||
return var
|
||||
|
||||
var = cache.get(name, None)
|
||||
if var is None:
|
||||
raise ValueError("Variable {!r} neither in environment nor cache".format(name))
|
||||
return var
|
||||
|
||||
|
||||
@contextmanager
|
||||
def load_client(n, cache):
|
||||
client = Client(
|
||||
load_variable("client{}_email".format(n), cache),
|
||||
load_variable("client{}_password".format(n), cache),
|
||||
session_cookies=cache.get("client{}_session".format(n), None),
|
||||
)
|
||||
yield client
|
||||
cache.set("client{}_session".format(n), client.getSession())
|
Reference in New Issue
Block a user