Merge pull request #318 from kapi2289/master

Bunch of new methods, bunch of fixes, bunch of tests
This commit is contained in:
Mads Marquart
2018-08-29 23:55:17 +02:00
committed by GitHub
20 changed files with 1758 additions and 305 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -128,6 +128,71 @@ def graphql_to_attachment(a):
uid=a.get('legacy_attachment_id')
)
def graphql_to_poll(a):
rtn = Poll(
title=a.get('title') if a.get('title') else a.get("text"),
options=[graphql_to_poll_option(m) for m in a.get('options')]
)
rtn.uid = int(a["id"])
rtn.options_count = a.get("total_count")
return rtn
def graphql_to_poll_option(a):
if a.get('viewer_has_voted') is None:
vote = None
elif isinstance(a['viewer_has_voted'], bool):
vote = a['viewer_has_voted']
else:
vote = a['viewer_has_voted'] == 'true'
rtn = PollOption(
text=a.get('text'),
vote=vote
)
rtn.uid = int(a["id"])
rtn.voters = [m.get('node').get('id') for m in a.get('voters').get('edges')] if isinstance(a.get('voters'), dict) else a.get('voters')
rtn.votes_count = a.get('voters').get('count') if isinstance(a.get('voters'), dict) else a.get('total_count')
return rtn
def graphql_to_plan(a):
if a.get('event_members'):
rtn = Plan(
time=a.get('event_time'),
title=a.get('title'),
location=a.get('location_name')
)
if a.get('location_id') != 0:
rtn.location_id = str(a.get('location_id'))
rtn.uid = a.get('oid')
rtn.author_id = a.get('creator_id')
guests = a.get("event_members")
rtn.going = [uid for uid in guests if guests[uid] == "GOING"]
rtn.declined = [uid for uid in guests if guests[uid] == "DECLINED"]
rtn.invited = [uid for uid in guests if guests[uid] == "INVITED"]
return rtn
elif a.get('id') is None:
rtn = Plan(
time=a.get('event_time'),
title=a.get('event_title'),
location=a.get('event_location_name'),
location_id=a.get('event_location_id')
)
rtn.uid = a.get('event_id')
rtn.author_id = a.get('event_creator_id')
guests = json.loads(a.get('guest_state_list'))
else:
rtn = Plan(
time=a.get('time'),
title=a.get('event_title'),
location=a.get('location_name')
)
rtn.uid = a.get('id')
rtn.author_id = a.get('lightweight_event_creator').get('id')
guests = a.get('event_reminder_members').get('edges')
rtn.going = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "GOING"]
rtn.declined = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "DECLINED"]
rtn.invited = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "INVITED"]
return rtn
def graphql_to_message(message):
if message.get('message_sender') is None:
message['message_sender'] = {}
@@ -155,6 +220,7 @@ def graphql_to_user(user):
if user.get('profile_picture') is None:
user['profile_picture'] = {}
c_info = get_customization_info(user)
plan = graphql_to_plan(user['event_reminders']['nodes'][0]) if user.get('event_reminders', dict()).get('nodes') else None
return User(
user['id'],
url=user.get('url'),
@@ -169,7 +235,8 @@ def graphql_to_user(user):
own_nickname=c_info.get('own_nickname'),
photo=user['profile_picture'].get('uri'),
name=user.get('name'),
message_count=user.get('messages_count')
message_count=user.get('messages_count'),
plan=plan,
)
def graphql_to_thread(thread):
@@ -191,6 +258,8 @@ def graphql_to_thread(thread):
else:
last_name = user.get('name').split(first_name, 1).pop().strip()
plan = graphql_to_plan(thread['event_reminders']['nodes'][0]) if thread.get('event_reminders', dict()).get('nodes') else None
return User(
user['id'],
url=user.get('url'),
@@ -206,7 +275,8 @@ def graphql_to_thread(thread):
own_nickname=c_info.get('own_nickname'),
photo=user['big_image_src'].get('uri'),
message_count=thread.get('messages_count'),
last_message_timestamp=last_message_timestamp
last_message_timestamp=last_message_timestamp,
plan=plan,
)
else:
raise FBchatException('Unknown thread type: {}, with data: {}'.format(thread.get('thread_type'), thread))
@@ -218,6 +288,7 @@ def graphql_to_group(group):
last_message_timestamp = None
if 'last_message' in group:
last_message_timestamp = group['last_message']['nodes'][0]['timestamp_precise']
plan = graphql_to_plan(group['event_reminders']['nodes'][0]) if group.get('event_reminders', dict()).get('nodes') else None
return Group(
group['thread_key']['thread_fbid'],
participants=set([node['messaging_actor']['id'] for node in group['all_participants']['nodes']]),
@@ -227,13 +298,15 @@ def graphql_to_group(group):
photo=group['image'].get('uri'),
name=group.get('name'),
message_count=group.get('messages_count'),
last_message_timestamp=last_message_timestamp
last_message_timestamp=last_message_timestamp,
plan=plan,
)
def graphql_to_room(room):
if room.get('image') is None:
room['image'] = {}
c_info = get_customization_info(room)
plan = graphql_to_plan(room['event_reminders']['nodes'][0]) if room.get('event_reminders', dict()).get('nodes') else None
return Room(
room['thread_key']['thread_fbid'],
participants=set([node['messaging_actor']['id'] for node in room['all_participants']['nodes']]),
@@ -248,6 +321,7 @@ def graphql_to_room(room):
approval_requests = set(node.get('id') for node in room['thread_queue_metadata'].get('approval_requests', {}).get('nodes')),
join_link = room['joinable_mode'].get('link'),
privacy_mode = bool(room.get('privacy_mode')),
plan=plan,
)
def graphql_to_page(page):
@@ -255,6 +329,7 @@ def graphql_to_page(page):
page['profile_picture'] = {}
if page.get('city') is None:
page['city'] = {}
plan = graphql_to_plan(page['event_reminders']['nodes'][0]) if page.get('event_reminders', dict()).get('nodes') else None
return Page(
page['id'],
url=page.get('url'),
@@ -262,7 +337,8 @@ def graphql_to_page(page):
category=page.get('category_type'),
photo=page['profile_picture'].get('uri'),
name=page.get('name'),
message_count=page.get('messages_count')
message_count=page.get('messages_count'),
plan=plan,
)
def graphql_queries_to_json(*queries):

View File

@@ -37,7 +37,9 @@ class Thread(object):
last_message_timestamp = None
#: Number of messages in the thread
message_count = None
def __init__(self, _type, uid, photo=None, name=None, last_message_timestamp=None, message_count=None):
#: Set :class:`Plan`
plan = None
def __init__(self, _type, uid, photo=None, name=None, last_message_timestamp=None, message_count=None, plan=None):
"""Represents a Facebook thread"""
self.uid = str(uid)
self.type = _type
@@ -45,6 +47,7 @@ class Thread(object):
self.name = name
self.last_message_timestamp = last_message_timestamp
self.message_count = message_count
self.plan = plan
def __repr__(self):
return self.__unicode__()
@@ -436,6 +439,87 @@ class Mention(object):
def __unicode__(self):
return '<Mention {}: offset={} length={}>'.format(self.thread_id, self.offset, self.length)
class Poll(object):
#: ID of the poll
uid = None
#: Title of the poll
title = None
#: List of :class:`PollOption`, can be fetched with :func:`fbchat.Client.fetchPollOptions`
options = None
#: Options count
options_count = None
def __init__(self, title, options):
"""Represents a poll"""
self.title = title
self.options = options
def __repr__(self):
return self.__unicode__()
def __unicode__(self):
return '<Poll ({}): {} options={}>'.format(self.uid, repr(self.title), self.options)
class PollOption(object):
#: ID of the poll option
uid = None
#: Text of the poll option
text = None
#: Whether vote when creating or client voted
vote = None
#: ID of the users who voted for this poll option
voters = None
#: Votes count
votes_count = None
def __init__(self, text, vote=False):
"""Represents a poll option"""
self.text = text
self.vote = vote
def __repr__(self):
return self.__unicode__()
def __unicode__(self):
return '<PollOption ({}): {} voters={}>'.format(self.uid, repr(self.text), self.voters)
class Plan(object):
#: ID of the plan
uid = None
#: Plan time (unix time stamp), only precise down to the minute
time = None
#: Plan title
title = None
#: Plan location name
location = None
#: Plan location ID
location_id = None
#: ID of the plan creator
author_id = None
#: List of the people IDs who will take part in the plan
going = None
#: List of the people IDs who won't take part in the plan
declined = None
#: List of the people IDs who are invited to the plan
invited = None
def __init__(self, time, title, location=None, location_id=None):
"""Represents a plan"""
self.time = int(time)
self.title = title
self.location = location or ''
self.location_id = location_id or ''
self.author_id = None
self.going = []
self.declined = []
self.invited = []
def __repr__(self):
return self.__unicode__()
def __unicode__(self):
return '<Plan ({}): {} time={}, location={}, location_id={}>'.format(self.uid, repr(self.title), self.time, repr(self.location), repr(self.location_id))
class Enum(enum.Enum):
"""Used internally by fbchat to support enumerations"""
def __repr__(self):

View File

@@ -5,8 +5,12 @@ import re
import json
from time import time
from random import random
from contextlib import contextmanager
from mimetypes import guess_type
from os.path import basename
import warnings
import logging
import requests
from .models import *
try:
@@ -48,16 +52,6 @@ LIKES = {
's': EmojiSize.SMALL
}
MessageReactionFix = {
'😍': ('0001f60d', '%F0%9F%98%8D'),
'😆': ('0001f606', '%F0%9F%98%86'),
'😮': ('0001f62e', '%F0%9F%98%AE'),
'😢': ('0001f622', '%F0%9F%98%A2'),
'😠': ('0001f620', '%F0%9F%98%A0'),
'👍': ('0001f44d', '%F0%9F%91%8D'),
'👎': ('0001f44e', '%F0%9F%91%8E')
}
GENDERS = {
# For standard requests
@@ -97,6 +91,9 @@ class ReqUrl(object):
UNREAD_THREADS = "https://www.facebook.com/ajax/mercury/unread_threads.php"
UNSEEN_THREADS = "https://www.facebook.com/mercury/unseen_thread_ids/"
THREADS = "https://www.facebook.com/ajax/mercury/threadlist_info.php"
MOVE_THREAD = "https://www.facebook.com/ajax/mercury/move_thread.php"
ARCHIVED_STATUS = "https://www.facebook.com/ajax/mercury/change_archived_status.php?dpr=1"
PINNED_STATUS = "https://www.facebook.com/ajax/mercury/change_pinned_status.php?dpr=1"
MESSAGES = "https://www.facebook.com/ajax/mercury/thread_info.php"
READ_STATUS = "https://www.facebook.com/ajax/mercury/change_read_status.php"
DELIVERED = "https://www.facebook.com/ajax/mercury/delivery_receipts.php"
@@ -122,10 +119,27 @@ class ReqUrl(object):
TYPING = "https://www.facebook.com/ajax/messaging/typ.php"
GRAPHQL = "https://www.facebook.com/api/graphqlbatch/"
ATTACHMENT_PHOTO = "https://www.facebook.com/mercury/attachments/photo/"
EVENT_REMINDER = "https://www.facebook.com/ajax/eventreminder/create"
PLAN_CREATE = "https://www.facebook.com/ajax/eventreminder/create"
PLAN_INFO = "https://www.facebook.com/ajax/eventreminder"
PLAN_CHANGE = "https://www.facebook.com/ajax/eventreminder/submit"
PLAN_PARTICIPATION = "https://www.facebook.com/ajax/eventreminder/rsvp"
MODERN_SETTINGS_MENU = "https://www.facebook.com/bluebar/modern_settings_menu/"
REMOVE_FRIEND = "https://m.facebook.com/a/removefriend.php"
BLOCK_USER = "https://www.facebook.com/messaging/block_messages/?dpr=1"
UNBLOCK_USER = "https://www.facebook.com/messaging/unblock_messages/?dpr=1"
SAVE_ADMINS = "https://www.facebook.com/messaging/save_admins/?dpr=1"
APPROVAL_MODE = "https://www.facebook.com/messaging/set_approval_mode/?dpr=1"
CREATE_GROUP = "https://m.facebook.com/messages/send/?icm=1"
DELETE_THREAD = "https://www.facebook.com/ajax/mercury/delete_thread.php?dpr=1"
DELETE_MESSAGES = "https://www.facebook.com/ajax/mercury/delete_messages.php?dpr=1"
MUTE_THREAD = "https://www.facebook.com/ajax/mercury/change_mute_thread.php?dpr=1"
MUTE_REACTIONS = "https://www.facebook.com/ajax/mercury/change_reactions_mute_thread/?dpr=1"
MUTE_MENTIONS = "https://www.facebook.com/ajax/mercury/change_mentions_mute_thread/?dpr=1"
CREATE_POLL = "https://www.facebook.com/messaging/group_polling/create_poll/?dpr=1"
UPDATE_VOTE = "https://www.facebook.com/messaging/group_polling/update_vote/?dpr=1"
GET_POLL_OPTIONS = "https://www.facebook.com/ajax/mercury/get_poll_options"
SEARCH_MESSAGES = "https://www.facebook.com/ajax/mercury/search_snippets.php?dpr=1"
MARK_SPAM = "https://www.facebook.com/ajax/mercury/mark_spam.php?dpr=1"
pull_channel = 0
@@ -239,3 +253,47 @@ def get_emojisize_from_tags(tags):
except (KeyError, IndexError):
log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
return None
def require_list(list_):
if isinstance(list_, list):
return set(list_)
else:
return set([list_])
def mimetype_to_key(mimetype):
if not mimetype:
return "file_id"
if mimetype == "image/gif":
return "gif_id"
x = mimetype.split("/")
if x[0] in ["video", "image", "audio"]:
return "%s_id" % x[0]
return "file_id"
def get_files_from_urls(file_urls):
files = []
for file_url in file_urls:
r = requests.get(file_url)
# We could possibly use r.headers.get('Content-Disposition'), see
# https://stackoverflow.com/a/37060758
files.append((
basename(file_url),
r.content,
r.headers.get('Content-Type') or guess_type(file_url)[0],
))
return files
@contextmanager
def get_files_from_paths(filenames):
files = []
for filename in filenames:
files.append((
basename(filename),
open(filename, 'rb'),
guess_type(filename)[0],
))
yield files
for fn, fp, ft in files:
fp.close()

View File

@@ -7,7 +7,7 @@ import json
from utils import *
from contextlib import contextmanager
from fbchat.models import ThreadType
from fbchat.models import ThreadType, Message, Mention
@pytest.fixture(scope="session")
@@ -20,9 +20,13 @@ def group(pytestconfig):
return {"id": load_variable("group_id", pytestconfig.cache), "type": ThreadType.GROUP}
@pytest.fixture(scope="session", params=["user", "group"])
@pytest.fixture(scope="session", params=["user", "group", pytest.mark.xfail("none")])
def thread(request, user, group):
return user if request.param == "user" else group
return {
"user": user,
"group": group,
"none": {"id": "0", "type": ThreadType.GROUP}
}[request.param]
@pytest.fixture(scope="session")
@@ -37,7 +41,7 @@ def client2(pytestconfig):
yield c
@pytest.fixture # (scope="session")
@pytest.fixture(scope="module")
def client(client1, thread):
client1.setDefaultThread(thread["id"], thread["type"])
yield client1
@@ -80,12 +84,12 @@ def catch_event(client2):
try:
# Make the client send a messages to itself, so the blocking pull request will return
# This is probably not safe, since the client is making two requests simultaneously
client2.sendMessage("Shutdown", client2.uid)
client2.sendMessage(random_hex(), client2.uid)
finally:
t.join()
@pytest.fixture # (scope="session")
@pytest.fixture(scope="module")
def compare(client, thread):
def inner(caught_event, **kwargs):
d = {
@@ -99,3 +103,21 @@ def compare(client, thread):
return subset(caught_event.res, **d)
return inner
@pytest.fixture(params=["me", "other", "me other"])
def message_with_mentions(request, client, client2, group):
text = "Hi there ["
mentions = []
if 'me' in request.param:
mentions.append(Mention(thread_id=client.uid, offset=len(text), length=2))
text += "me, "
if 'other' in request.param:
mentions.append(Mention(thread_id=client2.uid, offset=len(text), length=5))
text += "other, "
# Unused, because Facebook don't properly support sending mentions with groups as targets
if 'group' in request.param:
mentions.append(Mention(thread_id=group["id"], offset=len(text), length=5))
text += "group, "
text += "nothing]"
return Message(text, mentions=mentions)

View File

@@ -1,6 +0,0 @@
{
"email": "",
"password": "",
"user_thread_id": "",
"group_thread_id": ""
}

BIN
tests/resources/audio.mp3 Normal file

Binary file not shown.

View File

@@ -0,0 +1,4 @@
{
"some": "data",
"in": "here"
}

1
tests/resources/file.txt Normal file
View File

@@ -0,0 +1 @@
This is just a text file

BIN
tests/resources/image.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

BIN
tests/resources/image.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

View File

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

BIN
tests/resources/video.mp4 Normal file

Binary file not shown.

View File

@@ -6,32 +6,20 @@ import pytest
from os import path
from fbchat.models import ThreadType, Message, Mention, EmojiSize, Sticker
from utils import subset
from utils import subset, STICKER_LIST, EMOJI_LIST
def test_fetch_all_users(client):
users = client.fetchAllUsers()
def test_fetch_all_users(client1):
users = client1.fetchAllUsers()
assert len(users) > 0
def test_fetch_thread_list(client):
threads = client.fetchThreadList(limit=2)
def test_fetch_thread_list(client1):
threads = client1.fetchThreadList(limit=2)
assert len(threads) == 2
@pytest.mark.parametrize(
"emoji, emoji_size",
[
("😆", EmojiSize.SMALL),
("😆", EmojiSize.MEDIUM),
("😆", EmojiSize.LARGE),
# These fail because the emoji is made into a sticker
# This should be fixed
pytest.mark.xfail((None, EmojiSize.SMALL)),
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
pytest.mark.xfail((None, EmojiSize.LARGE)),
],
)
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
def test_fetch_message_emoji(client, emoji, emoji_size):
mid = client.sendEmoji(emoji, emoji_size)
message, = client.fetchThreadMessages(limit=1)
@@ -41,25 +29,52 @@ def test_fetch_message_emoji(client, emoji, emoji_size):
)
def test_fetch_message_mentions(client):
text = "This is a test of fetchThreadMessages"
mentions = [Mention(client.uid, offset=10, length=4)]
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
def test_fetch_message_info_emoji(client, thread, emoji, emoji_size):
mid = client.sendEmoji(emoji, emoji_size)
message = client.fetchMessageInfo(mid, thread_id=thread["id"])
mid = client.send(Message(text, mentions=mentions))
assert subset(
vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size
)
def test_fetch_message_mentions(client, thread, message_with_mentions):
mid = client.send(message_with_mentions)
message, = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid, author=client.uid, text=text)
for i, m in enumerate(mentions):
assert vars(message.mentions[i]) == vars(m)
assert subset(vars(message), uid=mid, author=client.uid, text=message_with_mentions.text)
# The mentions are not ordered by offset
for m in message.mentions:
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
@pytest.mark.parametrize("sticker_id", ["767334476626295"])
def test_fetch_message_sticker(client, sticker_id):
mid = client.send(Message(sticker=Sticker(sticker_id)))
def test_fetch_message_info_mentions(client, thread, message_with_mentions):
mid = client.send(message_with_mentions)
message = client.fetchMessageInfo(mid, thread_id=thread["id"])
assert subset(vars(message), uid=mid, author=client.uid, text=message_with_mentions.text)
# The mentions are not ordered by offset
for m in message.mentions:
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
@pytest.mark.parametrize("sticker", STICKER_LIST)
def test_fetch_message_sticker(client, sticker):
mid = client.send(Message(sticker=sticker))
message, = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid, author=client.uid)
assert subset(vars(message.sticker), uid=sticker_id)
assert subset(vars(message.sticker), uid=sticker.uid)
@pytest.mark.parametrize("sticker", STICKER_LIST)
def test_fetch_message_info_sticker(client, thread, sticker):
mid = client.send(Message(sticker=sticker))
message = client.fetchMessageInfo(mid, thread_id=thread["id"])
assert subset(vars(message), uid=mid, author=client.uid)
assert subset(vars(message.sticker), uid=sticker.uid)
def test_fetch_info(client1, group):
@@ -71,9 +86,7 @@ def test_fetch_info(client1, group):
def test_fetch_image_url(client):
url = path.join(path.dirname(__file__), "image.png")
client.sendLocalImage(url)
client.sendLocalFiles([path.join(path.dirname(__file__), "resources", "image.png")])
message, = client.fetchThreadMessages(limit=1)
assert client.fetchImageUrl(message.attachments[0].uid)

View File

@@ -5,8 +5,19 @@ from __future__ import unicode_literals
import pytest
from fbchat.models import Message, MessageReaction
from utils import subset
def test_set_reaction(client):
mid = client.send(Message(text="This message will be reacted to"))
client.reactToMessage(mid, MessageReaction.LOVE)
def test_delete_messages(client):
text1 = "This message will stay"
text2 = "This message will be removed"
mid1 = client.sendMessage(text1)
mid2 = client.sendMessage(text2)
client.deleteMessages(mid2)
message, = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid1, author=client.uid, text=text1)

105
tests/test_plans.py Normal file
View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import pytest
from fbchat.models import Plan, FBchatFacebookError, ThreadType
from utils import random_hex, subset
from time import time
@pytest.fixture(scope="module", params=[
Plan(int(time()) + 100, random_hex()),
pytest.mark.xfail(Plan(int(time()), random_hex()), raises=FBchatFacebookError),
pytest.mark.xfail(Plan(0, None)),
])
def plan_data(request, client, user, thread, catch_event, compare):
with catch_event("onPlanCreated") as x:
client.createPlan(request.param, thread["id"])
assert compare(x)
assert subset(
vars(x.res["plan"]),
time=request.param.time,
title=request.param.title,
author_id=client.uid,
going=[client.uid],
declined=[],
)
plan_id = x.res["plan"]
assert user["id"] in x.res["plan"].invited
request.param.uid = x.res["plan"].uid
yield x.res, request.param
with catch_event("onPlanDeleted") as x:
client.deletePlan(plan_id)
assert compare(x)
@pytest.mark.tryfirst
def test_create_delete_plan(plan_data):
pass
def test_fetch_plan_info(client, catch_event, plan_data):
event, plan = plan_data
fetched_plan = client.fetchPlanInfo(plan.uid)
assert subset(
vars(fetched_plan),
time=plan.time,
title=plan.title,
author_id=int(client.uid),
)
@pytest.mark.parametrize("take_part", [False, True])
def test_change_plan_participation(client, thread, catch_event, compare, plan_data, take_part):
event, plan = plan_data
with catch_event("onPlanParticipation") as x:
client.changePlanParticipation(plan, take_part=take_part)
assert compare(x, take_part=take_part)
assert subset(
vars(x.res["plan"]),
time=plan.time,
title=plan.title,
author_id=client.uid,
going=[client.uid] if take_part else [],
declined=[client.uid] if not take_part else [],
)
@pytest.mark.trylast
def test_edit_plan(client, thread, catch_event, compare, plan_data):
event, plan = plan_data
new_plan = Plan(plan.time + 100, random_hex())
with catch_event("onPlanEdited") as x:
client.editPlan(plan, new_plan)
assert compare(x)
assert subset(
vars(x.res["plan"]),
time=new_plan.time,
title=new_plan.title,
author_id=client.uid,
)
@pytest.mark.trylast
@pytest.mark.expensive
def test_on_plan_ended(client, thread, catch_event, compare):
with catch_event("onPlanEnded") as x:
client.createPlan(Plan(int(time()) + 120, "Wait for ending"))
x.wait(180)
assert subset(x.res, thread_id=client.uid if thread["type"] == ThreadType.USER else thread["id"], thread_type=thread["type"])
#createPlan(self, plan, thread_id=None)
#editPlan(self, plan, new_plan)
#deletePlan(self, plan)
#changePlanParticipation(self, plan, take_part=True)
#onPlanCreated(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanEnded(self, mid=None, plan=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanEdited(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanDeleted(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanParticipation(self, mid=None, plan=None, take_part=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#fetchPlanInfo(self, plan_id)

79
tests/test_polls.py Normal file
View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import pytest
from fbchat.models import Poll, PollOption, ThreadType
from utils import random_hex, subset
@pytest.fixture(scope="module", params=[
Poll(title=random_hex(), options=[]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
PollOption(random_hex()),
PollOption(random_hex()),
]),
pytest.mark.xfail(Poll(title=None, options=[]), raises=ValueError),
])
def poll_data(request, client1, group, catch_event):
with catch_event("onPollCreated") as x:
client1.createPoll(request.param, thread_id=group["id"])
options = client1.fetchPollOptions(x.res["poll"].uid)
return x.res, request.param, options
def test_create_poll(client1, group, catch_event, poll_data):
event, poll, _ = poll_data
assert subset(
event,
author_id=client1.uid,
thread_id=group["id"],
thread_type=ThreadType.GROUP,
)
assert subset(vars(event["poll"]), title=poll.title, options_count=len(poll.options))
for recv_option in event["poll"].options: # The recieved options may not be the full list
old_option, = list(filter(lambda o: o.text == recv_option.text, poll.options))
voters = [client1.uid] if old_option.vote else []
assert subset(vars(recv_option), voters=voters, votes_count=len(voters), vote=False)
def test_fetch_poll_options(client1, group, catch_event, poll_data):
_, poll, options = poll_data
assert len(options) == len(poll.options)
for option in options:
assert subset(vars(option))
@pytest.mark.trylast
def test_update_poll_vote(client1, group, catch_event, poll_data):
event, poll, options = poll_data
new_vote_ids = [o.uid for o in options[0:len(options):2] if not o.vote]
re_vote_ids = [o.uid for o in options[0:len(options):2] if o.vote]
new_options = [random_hex(), random_hex()]
with catch_event("onPollVoted") as x:
client1.updatePollVote(event["poll"].uid, option_ids=new_vote_ids + re_vote_ids, new_options=new_options)
assert subset(
x.res,
author_id=client1.uid,
thread_id=group["id"],
thread_type=ThreadType.GROUP,
)
assert subset(vars(x.res["poll"]), title=poll.title, options_count=len(options + new_options))
for o in new_vote_ids:
assert o in x.res["added_options"]
assert len(x.res["added_options"]) == len(new_vote_ids) + len(new_options)
assert set(x.res["removed_options"]) == set(o.uid for o in options if o.vote and o.uid not in re_vote_ids)

View File

@@ -5,20 +5,11 @@ from __future__ import unicode_literals
import pytest
from os import path
from fbchat.models import Message, Mention, EmojiSize, FBchatFacebookError, Sticker
from utils import subset
from fbchat.models import FBchatFacebookError, Message, Mention
from utils import subset, STICKER_LIST, EMOJI_LIST, TEXT_LIST
@pytest.mark.parametrize(
"text",
[
"test_send",
"😆",
"\\\n\t%?&'\"",
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
"a" * 20000, # Maximum amount of characters you can send
],
)
@pytest.mark.parametrize("text", TEXT_LIST)
def test_send_text(client, catch_event, compare, text):
with catch_event("onMessage") as x:
mid = client.sendMessage(text)
@@ -27,19 +18,7 @@ def test_send_text(client, catch_event, compare, text):
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
@pytest.mark.parametrize(
"emoji, emoji_size",
[
("😆", EmojiSize.SMALL),
("😆", EmojiSize.MEDIUM),
("😆", EmojiSize.LARGE),
# These fail because the emoji is made into a sticker
# This should be fixed
pytest.mark.xfail((None, EmojiSize.SMALL)),
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
pytest.mark.xfail((None, EmojiSize.LARGE)),
],
)
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
with catch_event("onMessage") as x:
mid = client.sendEmoji(emoji, emoji_size)
@@ -54,42 +33,28 @@ def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
)
@pytest.mark.xfail(raises=FBchatFacebookError)
@pytest.mark.parametrize("message", [Message("a" * 20001)])
def test_send_invalid(client, message):
client.send(message)
def test_send_mentions(client, client2, thread, catch_event, compare):
text = "Hi there @me, @other and @thread"
mentions = [
dict(thread_id=client.uid, offset=9, length=3),
dict(thread_id=client2.uid, offset=14, length=6),
dict(thread_id=thread["id"], offset=26, length=7),
]
def test_send_mentions(client, catch_event, compare, message_with_mentions):
with catch_event("onMessage") as x:
mid = client.send(Message(text, mentions=[Mention(**d) for d in mentions]))
mid = client.send(message_with_mentions)
assert compare(x, mid=mid, message=text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
assert compare(x, mid=mid, message=message_with_mentions.text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=message_with_mentions.text)
# The mentions are not ordered by offset
for m in x.res["message_object"].mentions:
assert vars(m) in mentions
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
@pytest.mark.parametrize(
"sticker_id",
["767334476626295", pytest.mark.xfail("0", raises=FBchatFacebookError)],
)
def test_send_sticker(client, catch_event, compare, sticker_id):
@pytest.mark.parametrize("sticker", STICKER_LIST)
def test_send_sticker(client, catch_event, compare, sticker):
with catch_event("onMessage") as x:
mid = client.send(Message(sticker=Sticker(sticker_id)))
mid = client.send(Message(sticker=sticker))
assert compare(x, mid=mid)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid)
assert subset(vars(x.res["message_object"].sticker), uid=sticker_id)
assert subset(vars(x.res["message_object"].sticker), uid=sticker.uid)
# Kept for backwards compatibility
@pytest.mark.parametrize(
"method_name, url",
[
@@ -97,7 +62,7 @@ def test_send_sticker(client, catch_event, compare, sticker_id):
"sendRemoteImage",
"https://github.com/carpedm20/fbchat/raw/master/tests/image.png",
),
("sendLocalImage", path.join(path.dirname(__file__), "image.png")),
("sendLocalImage", path.join(path.dirname(__file__), "resources", "image.png")),
],
)
def test_send_images(client, catch_event, compare, method_name, url):
@@ -108,3 +73,37 @@ def test_send_images(client, catch_event, compare, method_name, url):
assert compare(x, mid=mid, message=text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
assert x.res["message_object"].attachments[0]
def test_send_local_files(client, catch_event, compare):
files = ["image.png", "image.jpg", "image.gif", "file.json", "file.txt", "audio.mp3", "video.mp4"]
text = "Files sent locally"
with catch_event("onMessage") as x:
mid = client.sendLocalFiles(
[path.join(path.dirname(__file__), "resources", f) for f in files],
message=Message(text),
)
assert compare(x, mid=mid, message=text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
assert len(x.res["message_object"].attachments) == len(files)
# To be changed when merged into master
def test_send_remote_files(client, catch_event, compare):
files = ["image.png", "data.json"]
text = "Files sent from remote"
with catch_event("onMessage") as x:
mid = client.sendRemoteFiles(
["https://github.com/carpedm20/fbchat/raw/master/tests/{}".format(f) for f in files],
message=Message(text),
)
assert compare(x, mid=mid, message=text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
assert len(x.res["message_object"].attachments) == len(files)
@pytest.mark.parametrize('wave_first', [True, False])
def test_wave(client, wave_first):
client.wave(wave_first)

View File

@@ -12,7 +12,7 @@ from fbchat.models import (
ThreadColor,
)
from utils import random_hex, subset
from os import environ
from os import path
def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
@@ -31,16 +31,28 @@ def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
)
@pytest.mark.xfail(
raises=FBchatFacebookError, reason="Apparently changeThreadTitle is broken"
)
def test_change_title(client1, catch_event, group):
def test_remove_from_and_add_admins_to_group(client1, client2, group, catch_event):
# Test both methods, while ensuring that the user gets added as group admin
try:
with catch_event("onAdminRemoved") as x:
client1.removeGroupAdmins(client2.uid, group["id"])
assert subset(
x.res, removed_id=client2.uid, author_id=client1.uid, thread_id=group["id"]
)
finally:
with catch_event("onAdminAdded") as x:
client1.addGroupAdmins(client2.uid, group["id"])
assert subset(
x.res, added_id=client2.uid, author_id=client1.uid, thread_id=group["id"]
)
def test_change_title(client1, group, catch_event):
title = random_hex()
with catch_event("onTitleChange") as x:
mid = client1.changeThreadTitle(title, group["id"])
client1.changeThreadTitle(title, group["id"], thread_type=ThreadType.GROUP)
assert subset(
x.res,
mid=mid,
author_id=client1.uid,
new_title=title,
thread_id=group["id"],
@@ -55,17 +67,33 @@ def test_change_nickname(client, client_all, catch_event, compare):
assert compare(x, changed_for=client_all.uid, new_nickname=nickname)
@pytest.mark.parametrize("emoji", ["😀", "😂", "😕", "😍"])
@pytest.mark.parametrize("emoji", [
"😀",
"😂",
"😕",
"😍",
pytest.mark.xfail("🙃", raises=FBchatFacebookError),
pytest.mark.xfail("not an emoji", raises=FBchatFacebookError)
])
def test_change_emoji(client, catch_event, compare, emoji):
with catch_event("onEmojiChange") as x:
client.changeThreadEmoji(emoji)
assert compare(x, new_emoji=emoji)
@pytest.mark.xfail(raises=FBchatFacebookError)
@pytest.mark.parametrize("emoji", ["🙃", "not an emoji"])
def test_change_emoji_invalid(client, emoji):
client.changeThreadEmoji(emoji)
def test_change_image_local(client1, group, catch_event):
url = path.join(path.dirname(__file__), "resources", "image.png")
with catch_event("onImageChange") as x:
image_id = client1.changeGroupImageLocal(url, group["id"])
assert subset(x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"])
# To be changed when merged into master
def test_change_image_remote(client1, group, catch_event):
url = "https://github.com/carpedm20/fbchat/raw/master/tests/image.png"
with catch_event("onImageChange") as x:
image_id = client1.changeGroupImageRemote(url, group["id"])
assert subset(x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"])
@pytest.mark.parametrize(
@@ -83,9 +111,7 @@ def test_change_color(client, catch_event, compare, color):
assert compare(x, new_color=color)
@pytest.mark.xfail(
raises=FBchatFacebookError, strict=False, reason="Should fail, but doesn't"
)
@pytest.mark.xfail(raises=FBchatFacebookError, reason="Should fail, but doesn't")
def test_change_color_invalid(client):
class InvalidColor:
value = "#0077ff"
@@ -98,3 +124,31 @@ def test_typing_status(client, catch_event, compare, status):
with catch_event("onTyping") as x:
client.setTypingStatus(status)
assert compare(x, status=status)
@pytest.mark.parametrize('require_admin_approval', [True, False])
def test_change_approval_mode(client1, group, catch_event, require_admin_approval):
with catch_event("onApprovalModeChange") as x:
client1.changeGroupApprovalMode(require_admin_approval, group["id"])
assert subset(
x.res,
approval_mode=require_admin_approval,
author_id=client1.uid,
thread_id=group["id"],
)
@pytest.mark.parametrize("mute_time", [0, 10, 100, 1000, -1])
def test_mute_thread(client, mute_time):
assert client.muteThread(mute_time)
assert client.unmuteThread()
def test_mute_thread_reactions(client):
assert client.muteThreadReactions()
assert client.unmuteThreadReactions()
def test_mute_thread_mentions(client):
assert client.muteThreadMentions()
assert client.unmuteThreadMentions()

View File

@@ -5,17 +5,46 @@ from __future__ import unicode_literals
import threading
import logging
import six
import pytest
from os import environ
from random import randrange
from contextlib import contextmanager
from six import viewitems
from fbchat import Client
from fbchat.models import ThreadType
from fbchat.models import ThreadType, EmojiSize, FBchatFacebookError, Sticker
log = logging.getLogger("fbchat.tests").addHandler(logging.NullHandler())
EMOJI_LIST = [
("😆", EmojiSize.SMALL),
("😆", EmojiSize.MEDIUM),
("😆", EmojiSize.LARGE),
# These fail in `catch_event` because the emoji is made into a sticker
# This should be fixed
pytest.mark.xfail((None, EmojiSize.SMALL)),
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
pytest.mark.xfail((None, EmojiSize.LARGE)),
]
STICKER_LIST = [
Sticker("767334476626295"),
pytest.mark.xfail(Sticker("0"), raises=FBchatFacebookError),
pytest.mark.xfail(Sticker(None), raises=FBchatFacebookError),
]
TEXT_LIST = [
"test_send",
"😆",
"\\\n\t%?&'\"",
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
"a" * 20000, # Maximum amount of characters you can send
pytest.mark.xfail("a" * 20001, raises=FBchatFacebookError),
pytest.mark.xfail(None, raises=FBchatFacebookError),
]
class ClientThread(threading.Thread):
def __init__(self, client, *args, **kwargs):
self.client = client
@@ -79,6 +108,7 @@ def load_client(n, cache):
load_variable("client{}_password".format(n), cache),
user_agent='Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36',
session_cookies=cache.get("client{}_session".format(n), None),
max_tries=1,
)
yield client
cache.set("client{}_session".format(n), client.getSession())