Remove old, broken tests
This commit is contained in:
@@ -1,9 +1,4 @@
|
||||
import pytest
|
||||
import json
|
||||
|
||||
from utils import *
|
||||
from contextlib import contextmanager
|
||||
from fbchat import Message, Mention
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@@ -16,120 +11,3 @@ def session():
|
||||
return "<FakeSession>"
|
||||
|
||||
return FakeSession()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(client2):
|
||||
return {"id": client2.id, "type": None}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def group(pytestconfig):
|
||||
return {
|
||||
"id": load_variable("group_id", pytestconfig.cache),
|
||||
"type": None,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="session",
|
||||
params=["user", "group", pytest.param("none", marks=[pytest.mark.xfail()])],
|
||||
)
|
||||
def thread(request, user, group):
|
||||
return {"user": user, "group": group, "none": {"id": "0", "type": None},}[
|
||||
request.param
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client1(pytestconfig):
|
||||
with load_client(1, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client2(pytestconfig):
|
||||
with load_client(2, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def client(client1, thread):
|
||||
# TODO: These are commented out since `setDefaultThread` is removed - But now any
|
||||
# test that use this fixture is likely broken!
|
||||
# client1.setDefaultThread(thread["id"], thread["type"])
|
||||
yield client1
|
||||
# client1.resetDefaultThread()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["client1", "client2"])
|
||||
def client_all(request, client1, client2):
|
||||
return client1 if request.param == "client1" else client2
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def catch_event(client2):
|
||||
t = ClientThread(client2)
|
||||
t.start()
|
||||
|
||||
@contextmanager
|
||||
def inner(method_name):
|
||||
caught = CaughtValue()
|
||||
old_method = getattr(client2, method_name)
|
||||
|
||||
# Will be called by the other thread
|
||||
def catch_value(*args, **kwargs):
|
||||
old_method(*args, **kwargs)
|
||||
# Make sure the `set` is only called once
|
||||
if not caught.is_set():
|
||||
caught.set(kwargs)
|
||||
|
||||
setattr(client2, method_name, catch_value)
|
||||
yield caught
|
||||
caught.wait()
|
||||
if not caught.is_set():
|
||||
raise ValueError("The value could not be caught")
|
||||
setattr(client2, method_name, old_method)
|
||||
|
||||
yield inner
|
||||
|
||||
t.should_stop.set()
|
||||
|
||||
try:
|
||||
# Make the client send a messages to itself, so the blocking pull request will return
|
||||
# This is probably not safe, since the client is making two requests simultaneously
|
||||
client2.send(Message(text=random_hex()), client2.id)
|
||||
finally:
|
||||
t.join()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def compare(client, thread):
|
||||
def inner(caught_event, **kwargs):
|
||||
d = {
|
||||
"author_id": client.id,
|
||||
"thread_id": client.id if thread["type"] == None else thread["id"],
|
||||
"thread_type": thread["type"],
|
||||
}
|
||||
d.update(kwargs)
|
||||
return subset(caught_event.res, **d)
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
@pytest.fixture(params=["me", "other", "me other"])
|
||||
def message_with_mentions(request, client, client2, group):
|
||||
text = "Hi there ["
|
||||
mentions = []
|
||||
if "me" in request.param:
|
||||
mentions.append(Mention(thread_id=client.id, offset=len(text), length=2))
|
||||
text += "me, "
|
||||
if "other" in request.param:
|
||||
mentions.append(Mention(thread_id=client2.id, offset=len(text), length=5))
|
||||
text += "other, "
|
||||
# Unused, because Facebook don't properly support sending mentions with groups as targets
|
||||
if "group" in request.param:
|
||||
mentions.append(Mention(thread_id=group["id"], offset=len(text), length=5))
|
||||
text += "group, "
|
||||
text += "nothing]"
|
||||
return Message(text, mentions=mentions)
|
||||
|
@@ -1,40 +0,0 @@
|
||||
import pytest
|
||||
import py_compile
|
||||
|
||||
from glob import glob
|
||||
from os import path, environ
|
||||
from fbchat import FacebookError, Message, Client
|
||||
|
||||
|
||||
def test_examples():
|
||||
# Compiles the examples, to check for syntax errors
|
||||
for name in glob(path.join(path.dirname(__file__), "../examples", "*.py")):
|
||||
py_compile.compile(name)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.online
|
||||
def test_login(client1):
|
||||
assert client1.is_logged_in()
|
||||
email = client1.email
|
||||
password = client1.password
|
||||
|
||||
client1.logout()
|
||||
|
||||
assert not client1.is_logged_in()
|
||||
|
||||
with pytest.raises(FacebookError):
|
||||
client1.login("<invalid email>", "<invalid password>", max_tries=1)
|
||||
|
||||
client1.login(email, password)
|
||||
|
||||
assert client1.is_logged_in()
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.online
|
||||
def test_sessions(client1):
|
||||
session = client1.get_session()
|
||||
Client("no email needed", "no password needed", session_cookies=session)
|
||||
client1.set_session(session)
|
||||
assert client1.is_logged_in()
|
@@ -1,100 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat import Message, Mention, EmojiSize, Sticker
|
||||
from utils import subset, STICKER_LIST, EMOJI_LIST
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
def test_fetch_all_users(client1):
|
||||
users = client1.fetch_all_users()
|
||||
assert len(users) > 0
|
||||
|
||||
|
||||
def test_fetch_thread_list(client1):
|
||||
threads = client1.fetch_thread_list(limit=2)
|
||||
assert len(threads) == 2
|
||||
|
||||
|
||||
def test_fetch_threads(client1):
|
||||
threads = client1.fetch_threads(limit=2)
|
||||
assert len(threads) == 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
|
||||
def test_fetch_message_emoji(client, emoji, emoji_size):
|
||||
mid = client.send_emoji(emoji, emoji_size)
|
||||
(message,) = client.fetch_thread_messages(limit=1)
|
||||
|
||||
assert subset(
|
||||
vars(message), id=mid, author=client.id, text=emoji, emoji_size=emoji_size
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
|
||||
def test_fetch_message_info_emoji(client, thread, emoji, emoji_size):
|
||||
mid = client.send_emoji(emoji, emoji_size)
|
||||
message = client.fetch_message_info(mid, thread_id=thread["id"])
|
||||
|
||||
assert subset(
|
||||
vars(message), id=mid, author=client.id, text=emoji, emoji_size=emoji_size
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_message_mentions(client, thread, message_with_mentions):
|
||||
mid = client.send(message_with_mentions)
|
||||
(message,) = client.fetch_thread_messages(limit=1)
|
||||
|
||||
assert subset(
|
||||
vars(message), id=mid, author=client.id, text=message_with_mentions.text
|
||||
)
|
||||
# The mentions are not ordered by offset
|
||||
for m in message.mentions:
|
||||
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
|
||||
|
||||
|
||||
def test_fetch_message_info_mentions(client, thread, message_with_mentions):
|
||||
mid = client.send(message_with_mentions)
|
||||
message = client.fetch_message_info(mid, thread_id=thread["id"])
|
||||
|
||||
assert subset(
|
||||
vars(message), id=mid, author=client.id, text=message_with_mentions.text
|
||||
)
|
||||
# The mentions are not ordered by offset
|
||||
for m in message.mentions:
|
||||
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker", STICKER_LIST)
|
||||
def test_fetch_message_sticker(client, sticker):
|
||||
mid = client.send(Message(sticker=sticker))
|
||||
(message,) = client.fetch_thread_messages(limit=1)
|
||||
|
||||
assert subset(vars(message), id=mid, author=client.id)
|
||||
assert subset(vars(message.sticker), id=sticker.id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker", STICKER_LIST)
|
||||
def test_fetch_message_info_sticker(client, thread, sticker):
|
||||
mid = client.send(Message(sticker=sticker))
|
||||
message = client.fetch_message_info(mid, thread_id=thread["id"])
|
||||
|
||||
assert subset(vars(message), id=mid, author=client.id)
|
||||
assert subset(vars(message.sticker), id=sticker.id)
|
||||
|
||||
|
||||
def test_fetch_info(client1, group):
|
||||
info = client1.fetch_user_info("4")["4"]
|
||||
assert info.name == "Mark Zuckerberg"
|
||||
|
||||
info = client1.fetch_group_info(group["id"])[group["id"]]
|
||||
|
||||
|
||||
def test_fetch_image_url(client):
|
||||
client.send_local_files(
|
||||
[path.join(path.dirname(__file__), "resources", "image.png")]
|
||||
)
|
||||
(message,) = client.fetch_thread_messages(limit=1)
|
||||
|
||||
assert client.fetch_image_url(message.attachments[0].id)
|
@@ -1,16 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from fbchat import Message
|
||||
from utils import subset
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
def test_delete_messages(client):
|
||||
text1 = "This message will stay"
|
||||
text2 = "This message will be removed"
|
||||
mid1 = client.send(Message(text=text1))
|
||||
mid2 = client.send(Message(text=text2))
|
||||
client.delete_messages(mid2)
|
||||
(message,) = client.fetch_thread_messages(limit=1)
|
||||
assert subset(vars(message), id=mid1, author=client.id, text=text1)
|
@@ -1,112 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from fbchat import PlanData
|
||||
from utils import random_hex, subset
|
||||
from time import time
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="module",
|
||||
params=[
|
||||
# PlanData(time=int(time()) + 100, title=random_hex()),
|
||||
# pytest.param(
|
||||
# PlanData(time=int(time()), title=random_hex()),
|
||||
# marks=[pytest.mark.xfail(raises=FBchatFacebookError)],
|
||||
# ),
|
||||
# pytest.param(PlanData(time=0, title=None), marks=[pytest.mark.xfail()]),
|
||||
],
|
||||
)
|
||||
def plan_data(request, client, user, thread, catch_event, compare):
|
||||
with catch_event("on_plan_created") as x:
|
||||
client.create_plan(request.param, thread["id"])
|
||||
assert compare(x)
|
||||
assert subset(
|
||||
vars(x.res["plan"]),
|
||||
time=request.param.time,
|
||||
title=request.param.title,
|
||||
author_id=client.id,
|
||||
going=[client.id],
|
||||
declined=[],
|
||||
)
|
||||
plan_id = x.res["plan"]
|
||||
assert user["id"] in x.res["plan"].invited
|
||||
request.param.id = x.res["plan"].id
|
||||
yield x.res, request.param
|
||||
with catch_event("on_plan_deleted") as x:
|
||||
client.delete_plan(plan_id)
|
||||
assert compare(x)
|
||||
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def test_create_delete_plan(plan_data):
|
||||
pass
|
||||
|
||||
|
||||
def test_fetch_plan_info(client, catch_event, plan_data):
|
||||
event, plan = plan_data
|
||||
fetched_plan = client.fetch_plan_info(plan.id)
|
||||
assert subset(
|
||||
vars(fetched_plan), time=plan.time, title=plan.title, author_id=int(client.id)
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("take_part", [False, True])
|
||||
def test_change_plan_participation(
|
||||
client, thread, catch_event, compare, plan_data, take_part
|
||||
):
|
||||
event, plan = plan_data
|
||||
with catch_event("on_plan_participation") as x:
|
||||
client.change_plan_participation(plan, take_part=take_part)
|
||||
assert compare(x, take_part=take_part)
|
||||
assert subset(
|
||||
vars(x.res["plan"]),
|
||||
time=plan.time,
|
||||
title=plan.title,
|
||||
author_id=client.id,
|
||||
going=[client.id] if take_part else [],
|
||||
declined=[client.id] if not take_part else [],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
def test_edit_plan(client, thread, catch_event, compare, plan_data):
|
||||
event, plan = plan_data
|
||||
new_plan = PlanData(plan.time + 100, random_hex())
|
||||
with catch_event("on_plan_edited") as x:
|
||||
client.edit_plan(plan, new_plan)
|
||||
assert compare(x)
|
||||
assert subset(
|
||||
vars(x.res["plan"]),
|
||||
time=new_plan.time,
|
||||
title=new_plan.title,
|
||||
author_id=client.id,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.skip
|
||||
def test_on_plan_ended(client, thread, catch_event, compare):
|
||||
with catch_event("on_plan_ended") as x:
|
||||
client.create_plan(PlanData(int(time()) + 120, "Wait for ending"))
|
||||
x.wait(180)
|
||||
assert subset(
|
||||
x.res,
|
||||
thread_id=client.id if thread["type"] is None else thread["id"],
|
||||
thread_type=thread["type"],
|
||||
)
|
||||
|
||||
|
||||
# create_plan(self, plan, thread_id=None)
|
||||
# edit_plan(self, plan, new_plan)
|
||||
# delete_plan(self, plan)
|
||||
# change_plan_participation(self, plan, take_part=True)
|
||||
|
||||
# on_plan_created(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
|
||||
# on_plan_ended(self, mid=None, plan=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
|
||||
# on_plan_edited(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
|
||||
# on_plan_deleted(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
|
||||
# on_plan_participation(self, mid=None, plan=None, take_part=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
|
||||
|
||||
# fetch_plan_info(self, plan_id)
|
@@ -1,81 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from fbchat import Poll, PollOption
|
||||
from utils import random_hex, subset
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="module",
|
||||
params=[
|
||||
(random_hex(), []),
|
||||
(random_hex(), [(random_hex(), True), (random_hex(), True),],),
|
||||
(random_hex(), [(random_hex(), False), (random_hex(), False),],),
|
||||
(
|
||||
random_hex(),
|
||||
[
|
||||
(random_hex(), True),
|
||||
(random_hex(), True),
|
||||
(random_hex(), False),
|
||||
(random_hex(), False),
|
||||
(random_hex()),
|
||||
(random_hex()),
|
||||
],
|
||||
),
|
||||
pytest.param((None, []), marks=[pytest.mark.xfail(raises=ValueError)]),
|
||||
],
|
||||
)
|
||||
def poll_data(request, client1, group, catch_event):
|
||||
with catch_event("on_poll_created") as x:
|
||||
client1.create_poll(request.param[0], request.param[1], thread_id=group["id"])
|
||||
options = client1.fetch_poll_options(x.res["poll"].id)
|
||||
return x.res, request.param, options
|
||||
|
||||
|
||||
def test_create_poll(client1, group, catch_event, poll_data):
|
||||
event, poll, _ = poll_data
|
||||
assert subset(event, author_id=client1.id, thread=group)
|
||||
assert subset(
|
||||
vars(event["poll"]), title=poll.title, options_count=len(poll.options)
|
||||
)
|
||||
for recv_option in event[
|
||||
"poll"
|
||||
].options: # The recieved options may not be the full list
|
||||
(old_option,) = list(filter(lambda o: o.text == recv_option.text, poll.options))
|
||||
voters = [client1.id] if old_option.vote else []
|
||||
assert subset(
|
||||
vars(recv_option), voters=voters, votes_count=len(voters), vote=False
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_poll_options(client1, group, catch_event, poll_data):
|
||||
_, poll, options = poll_data
|
||||
assert len(options) == len(poll.options)
|
||||
for option in options:
|
||||
assert subset(vars(option))
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
def test_update_poll_vote(client1, group, catch_event, poll_data):
|
||||
event, poll, options = poll_data
|
||||
new_vote_ids = [o.id for o in options[0 : len(options) : 2] if not o.vote]
|
||||
re_vote_ids = [o.id for o in options[0 : len(options) : 2] if o.vote]
|
||||
new_options = [random_hex(), random_hex()]
|
||||
with catch_event("on_poll_voted") as x:
|
||||
client1.update_poll_vote(
|
||||
event["poll"].id,
|
||||
option_ids=new_vote_ids + re_vote_ids,
|
||||
new_options=new_options,
|
||||
)
|
||||
|
||||
assert subset(x.res, author_id=client1.id, thread=group)
|
||||
assert subset(
|
||||
vars(x.res["poll"]), title=poll.title, options_count=len(options + new_options)
|
||||
)
|
||||
for o in new_vote_ids:
|
||||
assert o in x.res["added_options"]
|
||||
assert len(x.res["added_options"]) == len(new_vote_ids) + len(new_options)
|
||||
assert set(x.res["removed_options"]) == set(
|
||||
o.id for o in options if o.vote and o.id not in re_vote_ids
|
||||
)
|
@@ -1,15 +0,0 @@
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
def test_search_for(client1):
|
||||
users = client1.search_for_users("Mark Zuckerberg")
|
||||
assert len(users) > 0
|
||||
|
||||
u = users[0]
|
||||
|
||||
assert u.id == "4"
|
||||
assert u.photo[:4] == "http"
|
||||
assert u.url[:4] == "http"
|
||||
assert u.name == "Mark Zuckerberg"
|
@@ -1,123 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat import Message, Mention
|
||||
from utils import subset, STICKER_LIST, EMOJI_LIST, TEXT_LIST
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
@pytest.mark.parametrize("text", TEXT_LIST)
|
||||
def test_send_text(client, catch_event, compare, text):
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send(Message(text=text))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), id=mid, author=client.id, text=text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
|
||||
def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send_emoji(emoji, emoji_size)
|
||||
|
||||
assert compare(x, mid=mid, message=emoji)
|
||||
assert subset(
|
||||
vars(x.res["message_object"]),
|
||||
id=mid,
|
||||
author=client.id,
|
||||
text=emoji,
|
||||
emoji_size=emoji_size,
|
||||
)
|
||||
|
||||
|
||||
def test_send_mentions(client, catch_event, compare, message_with_mentions):
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send(message_with_mentions)
|
||||
|
||||
assert compare(x, mid=mid, message=message_with_mentions.text)
|
||||
assert subset(
|
||||
vars(x.res["message_object"]),
|
||||
id=mid,
|
||||
author=client.id,
|
||||
text=message_with_mentions.text,
|
||||
)
|
||||
# The mentions are not ordered by offset
|
||||
for m in x.res["message_object"].mentions:
|
||||
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker", STICKER_LIST)
|
||||
def test_send_sticker(client, catch_event, compare, sticker):
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send(Message(sticker=sticker))
|
||||
|
||||
assert compare(x, mid=mid)
|
||||
assert subset(vars(x.res["message_object"]), id=mid, author=client.id)
|
||||
assert subset(vars(x.res["message_object"].sticker), id=sticker.id)
|
||||
|
||||
|
||||
# Kept for backwards compatibility
|
||||
@pytest.mark.parametrize(
|
||||
"method_name, url",
|
||||
[
|
||||
(
|
||||
"sendRemoteImage",
|
||||
"https://github.com/carpedm20/fbchat/raw/master/tests/image.png",
|
||||
),
|
||||
("sendLocalImage", path.join(path.dirname(__file__), "resources", "image.png")),
|
||||
],
|
||||
)
|
||||
def test_send_images(client, catch_event, compare, method_name, url):
|
||||
text = "An image sent with {}".format(method_name)
|
||||
with catch_event("on_message") as x:
|
||||
mid = getattr(client, method_name)(url, Message(text))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), id=mid, author=client.id, text=text)
|
||||
assert x.res["message_object"].attachments[0]
|
||||
|
||||
|
||||
def test_send_local_files(client, catch_event, compare):
|
||||
files = [
|
||||
"image.png",
|
||||
"image.jpg",
|
||||
"image.gif",
|
||||
"file.json",
|
||||
"file.txt",
|
||||
"audio.mp3",
|
||||
"video.mp4",
|
||||
]
|
||||
text = "Files sent locally"
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send_local_files(
|
||||
[path.join(path.dirname(__file__), "resources", f) for f in files],
|
||||
message=Message(text),
|
||||
)
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), id=mid, author=client.id, text=text)
|
||||
assert len(x.res["message_object"].attachments) == len(files)
|
||||
|
||||
|
||||
# To be changed when merged into master
|
||||
def test_send_remote_files(client, catch_event, compare):
|
||||
files = ["image.png", "data.json"]
|
||||
text = "Files sent from remote"
|
||||
with catch_event("on_message") as x:
|
||||
mid = client.send_remote_files(
|
||||
[
|
||||
"https://github.com/carpedm20/fbchat/raw/master/tests/{}".format(f)
|
||||
for f in files
|
||||
],
|
||||
message=Message(text),
|
||||
)
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), id=mid, author=client.id, text=text)
|
||||
assert len(x.res["message_object"].attachments) == len(files)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("wave_first", [True, False])
|
||||
def test_wave(client, wave_first):
|
||||
client.wave(wave_first)
|
@@ -1,10 +0,0 @@
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
def test_catch_event(client2, catch_event):
|
||||
mid = "test"
|
||||
with catch_event("on_message") as x:
|
||||
client2.on_message(mid=mid)
|
||||
assert x.res["mid"] == mid
|
@@ -1,139 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from fbchat import Message, FacebookError
|
||||
from utils import random_hex, subset
|
||||
from os import path
|
||||
|
||||
pytestmark = pytest.mark.online
|
||||
|
||||
|
||||
def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
|
||||
# Test both methods, while ensuring that the user gets added to the group
|
||||
try:
|
||||
with catch_event("on_person_removed") as x:
|
||||
client1.remove_user_from_group(client2.id, group["id"])
|
||||
assert subset(
|
||||
x.res, removed_id=client2.id, author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
finally:
|
||||
with catch_event("on_people_added") as x:
|
||||
client1.add_users_to_group(client2.id, group["id"])
|
||||
assert subset(
|
||||
x.res, added_ids=[client2.id], author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
def test_remove_from_and_add_admins_to_group(client1, client2, group, catch_event):
|
||||
# Test both methods, while ensuring that the user gets added as group admin
|
||||
try:
|
||||
with catch_event("on_admin_removed") as x:
|
||||
client1.remove_group_admins(client2.id, group["id"])
|
||||
assert subset(
|
||||
x.res, removed_id=client2.id, author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
finally:
|
||||
with catch_event("on_admin_added") as x:
|
||||
client1.add_group_admins(client2.id, group["id"])
|
||||
assert subset(
|
||||
x.res, added_id=client2.id, author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
def test_change_title(client1, group, catch_event):
|
||||
title = random_hex()
|
||||
with catch_event("on_title_change") as x:
|
||||
client1.change_thread_title(title, group["id"])
|
||||
assert subset(x.res, author_id=client1.id, new_title=title, thread=group)
|
||||
|
||||
|
||||
def test_change_nickname(client, client_all, catch_event, compare):
|
||||
nickname = random_hex()
|
||||
with catch_event("on_nickname_change") as x:
|
||||
client.change_nickname(nickname, client_all.id)
|
||||
assert compare(x, changed_for=client_all.id, new_nickname=nickname)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji",
|
||||
[
|
||||
"😀",
|
||||
"😂",
|
||||
"😕",
|
||||
"😍",
|
||||
pytest.param("🙃", marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
pytest.param("not an emoji", marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
],
|
||||
)
|
||||
def test_change_emoji(client, catch_event, compare, emoji):
|
||||
with catch_event("on_emoji_change") as x:
|
||||
client.change_thread_emoji(emoji)
|
||||
assert compare(x, new_emoji=emoji)
|
||||
|
||||
|
||||
def test_change_image_local(client1, group, catch_event):
|
||||
url = path.join(path.dirname(__file__), "resources", "image.png")
|
||||
with catch_event("on_image_change") as x:
|
||||
image_id = client1.change_group_image_local(url, group["id"])
|
||||
assert subset(
|
||||
x.res, new_image=image_id, author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
# To be changed when merged into master
|
||||
def test_change_image_remote(client1, group, catch_event):
|
||||
url = "https://github.com/carpedm20/fbchat/raw/master/tests/image.png"
|
||||
with catch_event("on_image_change") as x:
|
||||
image_id = client1.change_group_image_remote(url, group["id"])
|
||||
assert subset(
|
||||
x.res, new_image=image_id, author_id=client1.id, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
def test_change_color(client, catch_event, compare):
|
||||
with catch_event("on_color_change") as x:
|
||||
client.change_thread_color("#44bec7")
|
||||
assert compare(x, new_color="#44bec7")
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FacebookError, reason="Should fail, but doesn't")
|
||||
def test_change_color_invalid(client):
|
||||
class InvalidColor:
|
||||
value = "#0077ff"
|
||||
|
||||
client.change_thread_color(InvalidColor())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", [True, False])
|
||||
def test_typing_status(client, catch_event, compare, status):
|
||||
with catch_event("on_typing") as x:
|
||||
client.set_typing_status(status)
|
||||
assert compare(x, status=status)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("require_admin_approval", [True, False])
|
||||
def test_change_approval_mode(client1, group, catch_event, require_admin_approval):
|
||||
with catch_event("on_approval_mode_change") as x:
|
||||
client1.change_group_approval_mode(require_admin_approval, group["id"])
|
||||
|
||||
assert subset(
|
||||
x.res,
|
||||
approval_mode=require_admin_approval,
|
||||
author_id=client1.id,
|
||||
thread_id=group["id"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("mute_time", [0, 10, 100, 1000, -1])
|
||||
def test_mute_thread(client, mute_time):
|
||||
assert client.mute_thread(mute_time)
|
||||
assert client.unmute_thread()
|
||||
|
||||
|
||||
def test_mute_thread_reactions(client):
|
||||
assert client.mute_thread_reactions()
|
||||
assert client.unmute_thread_reactions()
|
||||
|
||||
|
||||
def test_mute_thread_mentions(client):
|
||||
assert client.mute_thread_mentions()
|
||||
assert client.unmute_thread_mentions()
|
@@ -1,98 +0,0 @@
|
||||
import threading
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
from os import environ
|
||||
from random import randrange
|
||||
from contextlib import contextmanager
|
||||
from fbchat import EmojiSize, FacebookError, Sticker, Client
|
||||
|
||||
log = logging.getLogger("fbchat.tests").addHandler(logging.NullHandler())
|
||||
|
||||
|
||||
EMOJI_LIST = [
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail in `catch_event` because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.param(None, EmojiSize.SMALL, marks=[pytest.mark.xfail()]),
|
||||
pytest.param(None, EmojiSize.MEDIUM, marks=[pytest.mark.xfail()]),
|
||||
pytest.param(None, EmojiSize.LARGE, marks=[pytest.mark.xfail()]),
|
||||
]
|
||||
|
||||
STICKER_LIST = [
|
||||
Sticker(id="767334476626295"),
|
||||
pytest.param(Sticker(id="0"), marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
pytest.param(Sticker(id=None), marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
]
|
||||
|
||||
TEXT_LIST = [
|
||||
"test_send",
|
||||
"😆",
|
||||
"\\\n\t%?&'\"",
|
||||
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
|
||||
"a" * 20000, # Maximum amount of characters you can send
|
||||
pytest.param("a" * 20001, marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
pytest.param(None, marks=[pytest.mark.xfail(raises=FacebookError)]),
|
||||
]
|
||||
|
||||
|
||||
class ClientThread(threading.Thread):
|
||||
# TODO: Refactor this to work with the new listening setup
|
||||
def __init__(self, client, *args, **kwargs):
|
||||
self.client = client
|
||||
self.should_stop = threading.Event()
|
||||
super(ClientThread, self).__init__(*args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
self.client._do_one_listen() # QPrimer, Facebook now knows we're about to start pulling
|
||||
super(ClientThread, self).start()
|
||||
|
||||
def run(self):
|
||||
while not self.should_stop.is_set() and self.client._do_one_listen():
|
||||
pass
|
||||
|
||||
|
||||
class CaughtValue(threading.Event):
|
||||
def set(self, res):
|
||||
self.res = res
|
||||
super(CaughtValue, self).set()
|
||||
|
||||
def wait(self, timeout=3):
|
||||
super(CaughtValue, self).wait(timeout=timeout)
|
||||
|
||||
|
||||
def random_hex(length=20):
|
||||
return "{:X}".format(randrange(16 ** length))
|
||||
|
||||
|
||||
def subset(a, **b):
|
||||
print(a)
|
||||
print(b)
|
||||
return b.items() <= a.items()
|
||||
|
||||
|
||||
def load_variable(name, cache):
|
||||
var = environ.get(name, None)
|
||||
if var is not None:
|
||||
if cache.get(name, None) != var:
|
||||
cache.set(name, var)
|
||||
return var
|
||||
|
||||
var = cache.get(name, None)
|
||||
if var is None:
|
||||
raise ValueError("Variable {!r} neither in environment nor cache".format(name))
|
||||
return var
|
||||
|
||||
|
||||
@contextmanager
|
||||
def load_client(n, cache):
|
||||
client = Client(
|
||||
load_variable("client{}_email".format(n), cache),
|
||||
load_variable("client{}_password".format(n), cache),
|
||||
session_cookies=cache.get("client{}_session".format(n), None),
|
||||
max_tries=1,
|
||||
)
|
||||
yield client
|
||||
cache.set("client{}_session".format(n), client.get_session())
|
Reference in New Issue
Block a user