Format using black (without string normalization)

This commit is contained in:
Mads Marquart
2019-01-31 20:54:32 +01:00
parent f25faec108
commit d20fc3b9ce
17 changed files with 2150 additions and 720 deletions

View File

@@ -19,6 +19,7 @@
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
import fbchat
@@ -39,7 +40,7 @@ extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.viewcode'
'sphinx.ext.viewcode',
]
# Add any paths that contain templates here, relative to this directory.
@@ -121,15 +122,12 @@ latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
@@ -138,20 +136,14 @@ latex_elements = {
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, project + '.tex', title,
author, 'manual'),
]
latex_documents = [(master_doc, project + '.tex', title, author, 'manual')]
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, project, title,
[author], 1)
]
man_pages = [(master_doc, project, title, [author], 1)]
# -- Options for Texinfo output -------------------------------------------
@@ -160,9 +152,7 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, project, title,
author, project, description,
'Miscellaneous'),
(master_doc, project, title, author, project, description, 'Miscellaneous')
]
@@ -170,7 +160,6 @@ texinfo_documents = [
intersphinx_mapping = {'https://docs.python.org/3/': None}
add_function_parentheses = False
html_theme_options = {
@@ -178,12 +167,10 @@ html_theme_options = {
'github_user': 'carpedm20',
'github_repo': project,
'github_banner': True,
'show_related': False
'show_related': False,
}
html_sidebars = {
'**': ['sidebar.html', 'searchbox.html']
}
html_sidebars = {'**': ['sidebar.html', 'searchbox.html']}
html_show_sphinx = False
html_show_sourcelink = False

View File

@@ -14,5 +14,6 @@ class EchoBot(Client):
if author_id != self.uid:
self.send(message_object, thread_id=thread_id, thread_type=thread_type)
client = EchoBot("<email>", "<password>")
client.listen()

View File

@@ -12,22 +12,48 @@ thread_type = ThreadType.GROUP
client.send(Message(text='<message>'), thread_id=thread_id, thread_type=thread_type)
# Will send the default `like` emoji
client.send(Message(emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
client.send(
Message(emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type
)
# Will send the emoji `👍`
client.send(Message(text='👍', emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
client.send(
Message(text='👍', emoji_size=EmojiSize.LARGE),
thread_id=thread_id,
thread_type=thread_type,
)
# Will send the sticker with ID `767334476626295`
client.send(Message(sticker=Sticker('767334476626295')), thread_id=thread_id, thread_type=thread_type)
client.send(
Message(sticker=Sticker('767334476626295')),
thread_id=thread_id,
thread_type=thread_type,
)
# Will send a message with a mention
client.send(Message(text='This is a @mention', mentions=[Mention(thread_id, offset=10, length=8)]), thread_id=thread_id, thread_type=thread_type)
client.send(
Message(
text='This is a @mention', mentions=[Mention(thread_id, offset=10, length=8)]
),
thread_id=thread_id,
thread_type=thread_type,
)
# Will send the image located at `<image path>`
client.sendLocalImage('<image path>', message=Message(text='This is a local image'), thread_id=thread_id, thread_type=thread_type)
client.sendLocalImage(
'<image path>',
message=Message(text='This is a local image'),
thread_id=thread_id,
thread_type=thread_type,
)
# Will download the image at the url `<image url>`, and then send it
client.sendRemoteImage('<image url>', message=Message(text='This is a remote image'), thread_id=thread_id, thread_type=thread_type)
client.sendRemoteImage(
'<image url>',
message=Message(text='This is a remote image'),
thread_id=thread_id,
thread_type=thread_type,
)
# Only do these actions if the thread is a group
@@ -39,17 +65,23 @@ if thread_type == ThreadType.GROUP:
client.addUsersToGroup('<user id>', thread_id=thread_id)
# Will add the users with IDs `<1st user id>`, `<2nd user id>` and `<3th user id>` to the thread
client.addUsersToGroup(['<1st user id>', '<2nd user id>', '<3rd user id>'], thread_id=thread_id)
client.addUsersToGroup(
['<1st user id>', '<2nd user id>', '<3rd user id>'], thread_id=thread_id
)
# Will change the nickname of the user `<user_id>` to `<new nickname>`
client.changeNickname('<new nickname>', '<user id>', thread_id=thread_id, thread_type=thread_type)
client.changeNickname(
'<new nickname>', '<user id>', thread_id=thread_id, thread_type=thread_type
)
# Will change the title of the thread to `<title>`
client.changeThreadTitle('<title>', thread_id=thread_id, thread_type=thread_type)
# Will set the typing status of the thread to `TYPING`
client.setTypingStatus(TypingStatus.TYPING, thread_id=thread_id, thread_type=thread_type)
client.setTypingStatus(
TypingStatus.TYPING, thread_id=thread_id, thread_type=thread_type
)
# Will change the thread color to `MESSENGER_BLUE`
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, thread_id=thread_id)

View File

@@ -14,18 +14,23 @@ old_nicknames = {
'12345678901': "User nr. 1's nickname",
'12345678902': "User nr. 2's nickname",
'12345678903': "User nr. 3's nickname",
'12345678904': "User nr. 4's nickname"
'12345678904': "User nr. 4's nickname",
}
class KeepBot(Client):
def onColorChange(self, author_id, new_color, thread_id, thread_type, **kwargs):
if old_thread_id == thread_id and old_color != new_color:
log.info("{} changed the thread color. It will be changed back".format(author_id))
log.info(
"{} changed the thread color. It will be changed back".format(author_id)
)
self.changeThreadColor(old_color, thread_id=thread_id)
def onEmojiChange(self, author_id, new_emoji, thread_id, thread_type, **kwargs):
if old_thread_id == thread_id and new_emoji != old_emoji:
log.info("{} changed the thread emoji. It will be changed back".format(author_id))
log.info(
"{} changed the thread emoji. It will be changed back".format(author_id)
)
self.changeThreadEmoji(old_emoji, thread_id=thread_id)
def onPeopleAdded(self, added_ids, author_id, thread_id, **kwargs):
@@ -36,19 +41,43 @@ class KeepBot(Client):
def onPersonRemoved(self, removed_id, author_id, thread_id, **kwargs):
# No point in trying to add ourself
if old_thread_id == thread_id and removed_id != self.uid and author_id != self.uid:
if (
old_thread_id == thread_id
and removed_id != self.uid
and author_id != self.uid
):
log.info("{} got removed. They will be re-added".format(removed_id))
self.addUsersToGroup(removed_id, thread_id=thread_id)
def onTitleChange(self, author_id, new_title, thread_id, thread_type, **kwargs):
if old_thread_id == thread_id and old_title != new_title:
log.info("{} changed the thread title. It will be changed back".format(author_id))
self.changeThreadTitle(old_title, thread_id=thread_id, thread_type=thread_type)
log.info(
"{} changed the thread title. It will be changed back".format(author_id)
)
self.changeThreadTitle(
old_title, thread_id=thread_id, thread_type=thread_type
)
def onNicknameChange(
self, author_id, changed_for, new_nickname, thread_id, thread_type, **kwargs
):
if (
old_thread_id == thread_id
and changed_for in old_nicknames
and old_nicknames[changed_for] != new_nickname
):
log.info(
"{} changed {}'s' nickname. It will be changed back".format(
author_id, changed_for
)
)
self.changeNickname(
old_nicknames[changed_for],
changed_for,
thread_id=thread_id,
thread_type=thread_type,
)
def onNicknameChange(self, author_id, changed_for, new_nickname, thread_id, thread_type, **kwargs):
if old_thread_id == thread_id and changed_for in old_nicknames and old_nicknames[changed_for] != new_nickname:
log.info("{} changed {}'s' nickname. It will be changed back".format(author_id, changed_for))
self.changeNickname(old_nicknames[changed_for], changed_for, thread_id=thread_id, thread_type=thread_type)
client = KeepBot("<email>", "<password>")
client.listen()

View File

@@ -3,6 +3,7 @@
from fbchat import log, Client
from fbchat.models import *
class RemoveBot(Client):
def onMessage(self, author_id, message_object, thread_id, thread_type, **kwargs):
# We can only kick people from group chats, so no need to try if it's a user chat
@@ -11,7 +12,14 @@ class RemoveBot(Client):
self.removeUserFromGroup(author_id, thread_id=thread_id)
else:
# Sends the data to the inherited onMessage, so that we can still see when a message is recieved
super(RemoveBot, self).onMessage(author_id=author_id, message_object=message_object, thread_id=thread_id, thread_type=thread_type, **kwargs)
super(RemoveBot, self).onMessage(
author_id=author_id,
message_object=message_object,
thread_id=thread_id,
thread_type=thread_type,
**kwargs
)
client = RemoveBot("<email>", "<password>")
client.listen()

View File

@@ -19,6 +19,4 @@ __license__ = 'BSD 3-Clause'
__author__ = 'Taehoon Kim; Moreels Pieter-Jan; Mads Marquart'
__email__ = 'carpedm20@gmail.com'
__all__ = [
'Client',
]
__all__ = ['Client']

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@ from .utils import *
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
class ConcatJSONDecoder(json.JSONDecoder):
def decode(self, s, _w=WHITESPACE.match):
s_len = len(s)
@@ -21,8 +22,11 @@ class ConcatJSONDecoder(json.JSONDecoder):
end = _w(s, end).end()
objs.append(obj)
return objs
# End shameless copy
def graphql_color_to_enum(color):
if color is None:
return None
@@ -32,6 +36,7 @@ def graphql_color_to_enum(color):
color_value = '#{}'.format(color.lower())
return enum_extend_if_invalid(ThreadColor, color_value)
def get_customization_info(thread):
if thread is None or thread.get('customization_info') is None:
return {}
@@ -39,9 +44,13 @@ def get_customization_info(thread):
rtn = {
'emoji': info.get('emoji'),
'color': graphql_color_to_enum(info.get('outgoing_bubble_color'))
'color': graphql_color_to_enum(info.get('outgoing_bubble_color')),
}
if thread.get('thread_type') == 'GROUP' or thread.get('is_group_thread') or thread.get('thread_key', {}).get('thread_fbid'):
if (
thread.get('thread_type') == 'GROUP'
or thread.get('is_group_thread')
or thread.get('thread_key', {}).get('thread_fbid')
):
rtn['nicknames'] = {}
for k in info.get('participant_customizations', []):
rtn['nicknames'][k['participant_id']] = k.get('nickname')
@@ -64,9 +73,7 @@ def get_customization_info(thread):
def graphql_to_sticker(s):
if not s:
return None
sticker = Sticker(
uid=s['id']
)
sticker = Sticker(uid=s['id'])
if s.get('pack'):
sticker.pack = s['pack'].get('id')
if s.get('sprite_image'):
@@ -83,19 +90,21 @@ def graphql_to_sticker(s):
sticker.label = s['label']
return sticker
def graphql_to_attachment(a):
_type = a['__typename']
if _type in ['MessageImage', 'MessageAnimatedImage']:
return ImageAttachment(
original_extension=a.get('original_extension') or (a['filename'].split('-')[0] if a.get('filename') else None),
original_extension=a.get('original_extension')
or (a['filename'].split('-')[0] if a.get('filename') else None),
width=a.get('original_dimensions', {}).get('width'),
height=a.get('original_dimensions', {}).get('height'),
is_animated=_type=='MessageAnimatedImage',
is_animated=_type == 'MessageAnimatedImage',
thumbnail_url=a.get('thumbnail', {}).get('uri'),
preview=a.get('preview') or a.get('preview_image'),
large_preview=a.get('large_preview'),
animated_preview=a.get('animated_image'),
uid=a.get('legacy_attachment_id')
uid=a.get('legacy_attachment_id'),
)
elif _type == 'MessageVideo':
return VideoAttachment(
@@ -106,26 +115,25 @@ def graphql_to_attachment(a):
small_image=a.get('chat_image'),
medium_image=a.get('inbox_image'),
large_image=a.get('large_image'),
uid=a.get('legacy_attachment_id')
uid=a.get('legacy_attachment_id'),
)
elif _type == 'MessageAudio':
return AudioAttachment(
filename=a.get('filename'),
url=a.get('playable_url'),
duration=a.get('playable_duration_in_ms'),
audio_type=a.get('audio_type')
audio_type=a.get('audio_type'),
)
elif _type == 'MessageFile':
return FileAttachment(
url=a.get('url'),
name=a.get('filename'),
is_malicious=a.get('is_malicious'),
uid=a.get('message_file_fbid')
uid=a.get('message_file_fbid'),
)
else:
return Attachment(
uid=a.get('legacy_attachment_id')
)
return Attachment(uid=a.get('legacy_attachment_id'))
def graphql_to_extensible_attachment(a):
story = a.get('story_attachment')
@@ -134,7 +142,9 @@ def graphql_to_extensible_attachment(a):
if target:
_type = target['__typename']
if _type == 'MessageLocation':
latitude, longitude = get_url_parameter(get_url_parameter(story['url'], 'u'), 'where1').split(", ")
latitude, longitude = get_url_parameter(
get_url_parameter(story['url'], 'u'), 'where1'
).split(", ")
rtn = LocationAttachment(
uid=int(story['deduplication_key']),
latitude=float(latitude),
@@ -149,10 +159,16 @@ def graphql_to_extensible_attachment(a):
elif _type == 'MessageLiveLocation':
rtn = LiveLocationAttachment(
uid=int(story['target']['live_location_id']),
latitude=story['target']['coordinate']['latitude'] if story['target'].get('coordinate') else None,
longitude=story['target']['coordinate']['longitude'] if story['target'].get('coordinate') else None,
latitude=story['target']['coordinate']['latitude']
if story['target'].get('coordinate')
else None,
longitude=story['target']['coordinate']['longitude']
if story['target'].get('coordinate')
else None,
name=story['title_with_entities']['text'],
expiration_time=story['target']['expiration_time'] if story['target'].get('expiration_time') else None,
expiration_time=story['target']['expiration_time']
if story['target'].get('expiration_time')
else None,
is_expired=story['target']['is_expired'],
)
if story['media']:
@@ -164,22 +180,41 @@ def graphql_to_extensible_attachment(a):
elif _type in ['ExternalUrl', 'Story']:
return ShareAttachment(
uid=a.get('legacy_attachment_id'),
author=story['target']['actors'][0]['id'] if story['target'].get('actors') else None,
author=story['target']['actors'][0]['id']
if story['target'].get('actors')
else None,
url=story['url'],
original_url=get_url_parameter(story['url'], 'u') if "/l.php?u=" in story['url'] else story['url'],
original_url=get_url_parameter(story['url'], 'u')
if "/l.php?u=" in story['url']
else story['url'],
title=story['title_with_entities'].get('text'),
description=story['description'].get('text') if story.get('description') else None,
description=story['description'].get('text')
if story.get('description')
else None,
source=story['source']['text'],
image_url=story['media']['image']['uri'] if story.get('media') else None,
original_image_url=(get_url_parameter(story['media']['image']['uri'], 'url') if "/safe_image.php" in story['media']['image']['uri'] else story['media']['image']['uri']) if story.get('media') else None,
image_width=story['media']['image']['width'] if story.get('media') else None,
image_height=story['media']['image']['height'] if story.get('media') else None,
attachments=[graphql_to_subattachment(attachment) for attachment in story.get('subattachments')],
image_url=story['media']['image']['uri']
if story.get('media')
else None,
original_image_url=(
get_url_parameter(story['media']['image']['uri'], 'url')
if "/safe_image.php" in story['media']['image']['uri']
else story['media']['image']['uri']
)
if story.get('media')
else None,
image_width=story['media']['image']['width']
if story.get('media')
else None,
image_height=story['media']['image']['height']
if story.get('media')
else None,
attachments=[
graphql_to_subattachment(attachment)
for attachment in story.get('subattachments')
],
)
else:
return UnsentMessage(
uid=a.get('legacy_attachment_id'),
)
return UnsentMessage(uid=a.get('legacy_attachment_id'))
def graphql_to_subattachment(a):
@@ -192,25 +227,32 @@ def graphql_to_subattachment(a):
uid=a['target'].get('video_id'),
)
def graphql_to_live_location(a):
return LiveLocationAttachment(
uid=a['id'],
latitude=a['coordinate']['latitude'] / (10 ** 8) if not a.get('stopReason') else None,
longitude=a['coordinate']['longitude'] / (10 ** 8) if not a.get('stopReason') else None,
latitude=a['coordinate']['latitude'] / (10 ** 8)
if not a.get('stopReason')
else None,
longitude=a['coordinate']['longitude'] / (10 ** 8)
if not a.get('stopReason')
else None,
name=a.get('locationTitle'),
expiration_time=a['expirationTime'],
is_expired=bool(a.get('stopReason')),
)
def graphql_to_poll(a):
rtn = Poll(
title=a.get('title') if a.get('title') else a.get('text'),
options=[graphql_to_poll_option(m) for m in a.get('options')]
options=[graphql_to_poll_option(m) for m in a.get('options')],
)
rtn.uid = int(a["id"])
rtn.options_count = a.get("total_count")
return rtn
def graphql_to_poll_option(a):
if a.get('viewer_has_voted') is None:
vote = None
@@ -218,21 +260,27 @@ def graphql_to_poll_option(a):
vote = a['viewer_has_voted']
else:
vote = a['viewer_has_voted'] == 'true'
rtn = PollOption(
text=a.get('text'),
vote=vote
)
rtn = PollOption(text=a.get('text'), vote=vote)
rtn.uid = int(a["id"])
rtn.voters = [m.get('node').get('id') for m in a.get('voters').get('edges')] if isinstance(a.get('voters'), dict) else a.get('voters')
rtn.votes_count = a.get('voters').get('count') if isinstance(a.get('voters'), dict) else a.get('total_count')
rtn.voters = (
[m.get('node').get('id') for m in a.get('voters').get('edges')]
if isinstance(a.get('voters'), dict)
else a.get('voters')
)
rtn.votes_count = (
a.get('voters').get('count')
if isinstance(a.get('voters'), dict)
else a.get('total_count')
)
return rtn
def graphql_to_plan(a):
if a.get('event_members'):
rtn = Plan(
time=a.get('event_time'),
title=a.get('title'),
location=a.get('location_name')
location=a.get('location_name'),
)
if a.get('location_id') != 0:
rtn.location_id = str(a.get('location_id'))
@@ -248,7 +296,7 @@ def graphql_to_plan(a):
time=a.get('event_time'),
title=a.get('event_title'),
location=a.get('event_location_name'),
location_id=a.get('event_location_id')
location_id=a.get('event_location_id'),
)
rtn.uid = a.get('event_id')
rtn.author_id = a.get('event_creator_id')
@@ -257,25 +305,40 @@ def graphql_to_plan(a):
rtn = Plan(
time=a.get('time'),
title=a.get('event_title'),
location=a.get('location_name')
location=a.get('location_name'),
)
rtn.uid = a.get('id')
rtn.author_id = a.get('lightweight_event_creator').get('id')
guests = a.get('event_reminder_members').get('edges')
rtn.going = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "GOING"]
rtn.declined = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "DECLINED"]
rtn.invited = [m.get('node').get('id') for m in guests if m.get('guest_list_state') == "INVITED"]
rtn.going = [
m.get('node').get('id') for m in guests if m.get('guest_list_state') == "GOING"
]
rtn.declined = [
m.get('node').get('id')
for m in guests
if m.get('guest_list_state') == "DECLINED"
]
rtn.invited = [
m.get('node').get('id')
for m in guests
if m.get('guest_list_state') == "INVITED"
]
return rtn
def graphql_to_quick_reply(q, is_response=False):
data = dict()
_type = q.get('content_type').lower()
if q.get('payload'): data["payload"] = q["payload"]
if q.get('data'): data["data"] = q["data"]
if q.get('image_url') and _type is not QuickReplyLocation._type: data["image_url"] = q["image_url"]
if q.get('payload'):
data["payload"] = q["payload"]
if q.get('data'):
data["data"] = q["data"]
if q.get('image_url') and _type is not QuickReplyLocation._type:
data["image_url"] = q["image_url"]
data["is_response"] = is_response
if _type == QuickReplyText._type:
if q.get('title') is not None: data["title"] = q["title"]
if q.get('title') is not None:
data["title"] = q["title"]
rtn = QuickReplyText(**data)
elif _type == QuickReplyLocation._type:
rtn = QuickReplyLocation(**data)
@@ -285,6 +348,7 @@ def graphql_to_quick_reply(q, is_response=False):
rtn = QuickReplyEmail(**data)
return rtn
def graphql_to_message(message):
if message.get('message_sender') is None:
message['message_sender'] = {}
@@ -292,9 +356,16 @@ def graphql_to_message(message):
message['message'] = {}
rtn = Message(
text=message.get('message').get('text'),
mentions=[Mention(m.get('entity', {}).get('id'), offset=m.get('offset'), length=m.get('length')) for m in message.get('message').get('ranges', [])],
mentions=[
Mention(
m.get('entity', {}).get('id'),
offset=m.get('offset'),
length=m.get('length'),
)
for m in message.get('message').get('ranges', [])
],
emoji_size=get_emojisize_from_tags(message.get('tags_list')),
sticker=graphql_to_sticker(message.get('sticker'))
sticker=graphql_to_sticker(message.get('sticker')),
)
rtn.uid = str(message.get('message_id'))
rtn.author = str(message.get('message_sender').get('id'))
@@ -307,13 +378,18 @@ def graphql_to_message(message):
for r in message.get('message_reactions')
}
if message.get('blob_attachments') is not None:
rtn.attachments = [graphql_to_attachment(attachment) for attachment in message['blob_attachments']]
rtn.attachments = [
graphql_to_attachment(attachment)
for attachment in message['blob_attachments']
]
if message.get('platform_xmd_encoded'):
quick_replies = json.loads(message['platform_xmd_encoded']).get('quick_replies')
if isinstance(quick_replies, list):
rtn.quick_replies = [graphql_to_quick_reply(q) for q in quick_replies]
elif isinstance(quick_replies, dict):
rtn.quick_replies = [graphql_to_quick_reply(quick_replies, is_response=True)]
rtn.quick_replies = [
graphql_to_quick_reply(quick_replies, is_response=True)
]
if message.get('extensible_attachment') is not None:
attachment = graphql_to_extensible_attachment(message['extensible_attachment'])
if isinstance(attachment, UnsentMessage):
@@ -322,13 +398,18 @@ def graphql_to_message(message):
rtn.attachments.append(attachment)
return rtn
def graphql_to_user(user):
if user.get('profile_picture') is None:
user['profile_picture'] = {}
c_info = get_customization_info(user)
plan = None
if user.get('event_reminders'):
plan = graphql_to_plan(user['event_reminders']['nodes'][0]) if user['event_reminders'].get('nodes') else None
plan = (
graphql_to_plan(user['event_reminders']['nodes'][0])
if user['event_reminders'].get('nodes')
else None
)
return User(
user['id'],
url=user.get('url'),
@@ -347,6 +428,7 @@ def graphql_to_user(user):
plan=plan,
)
def graphql_to_thread(thread):
if thread['thread_type'] == 'GROUP':
return graphql_to_group(thread)
@@ -354,11 +436,17 @@ def graphql_to_thread(thread):
if thread.get('big_image_src') is None:
thread['big_image_src'] = {}
c_info = get_customization_info(thread)
participants = [node['messaging_actor'] for node in thread['all_participants']['nodes']]
user = next(p for p in participants if p['id'] == thread['thread_key']['other_user_id'])
participants = [
node['messaging_actor'] for node in thread['all_participants']['nodes']
]
user = next(
p for p in participants if p['id'] == thread['thread_key']['other_user_id']
)
last_message_timestamp = None
if 'last_message' in thread:
last_message_timestamp = thread['last_message']['nodes'][0]['timestamp_precise']
last_message_timestamp = thread['last_message']['nodes'][0][
'timestamp_precise'
]
first_name = user.get('short_name')
if first_name is None:
@@ -368,7 +456,11 @@ def graphql_to_thread(thread):
plan = None
if thread.get('event_reminders'):
plan = graphql_to_plan(thread['event_reminders']['nodes'][0]) if thread['event_reminders'].get('nodes') else None
plan = (
graphql_to_plan(thread['event_reminders']['nodes'][0])
if thread['event_reminders'].get('nodes')
else None
)
return User(
user['id'],
@@ -389,7 +481,12 @@ def graphql_to_thread(thread):
plan=plan,
)
else:
raise FBchatException('Unknown thread type: {}, with data: {}'.format(thread.get('thread_type'), thread))
raise FBchatException(
'Unknown thread type: {}, with data: {}'.format(
thread.get('thread_type'), thread
)
)
def graphql_to_group(group):
if group.get('image') is None:
@@ -400,17 +497,32 @@ def graphql_to_group(group):
last_message_timestamp = group['last_message']['nodes'][0]['timestamp_precise']
plan = None
if group.get('event_reminders'):
plan = graphql_to_plan(group['event_reminders']['nodes'][0]) if group['event_reminders'].get('nodes') else None
plan = (
graphql_to_plan(group['event_reminders']['nodes'][0])
if group['event_reminders'].get('nodes')
else None
)
return Group(
group['thread_key']['thread_fbid'],
participants=set([node['messaging_actor']['id'] for node in group['all_participants']['nodes']]),
participants=set(
[
node['messaging_actor']['id']
for node in group['all_participants']['nodes']
]
),
nicknames=c_info.get('nicknames'),
color=c_info.get('color'),
emoji=c_info.get('emoji'),
admins = set([node.get('id') for node in group.get('thread_admins')]),
approval_mode = bool(group.get('approval_mode')) if group.get('approval_mode') is not None else None,
approval_requests = set(node["requester"]['id'] for node in group['group_approval_queue']['nodes']) if group.get('group_approval_queue') else None,
join_link = group['joinable_mode'].get('link'),
admins=set([node.get('id') for node in group.get('thread_admins')]),
approval_mode=bool(group.get('approval_mode'))
if group.get('approval_mode') is not None
else None,
approval_requests=set(
node["requester"]['id'] for node in group['group_approval_queue']['nodes']
)
if group.get('group_approval_queue')
else None,
join_link=group['joinable_mode'].get('link'),
photo=group['image'].get('uri'),
name=group.get('name'),
message_count=group.get('messages_count'),
@@ -418,6 +530,7 @@ def graphql_to_group(group):
plan=plan,
)
def graphql_to_page(page):
if page.get('profile_picture') is None:
page['profile_picture'] = {}
@@ -425,7 +538,11 @@ def graphql_to_page(page):
page['city'] = {}
plan = None
if page.get('event_reminders'):
plan = graphql_to_plan(page['event_reminders']['nodes'][0]) if page['event_reminders'].get('nodes') else None
plan = (
graphql_to_plan(page['event_reminders']['nodes'][0])
if page['event_reminders'].get('nodes')
else None
)
return Page(
page['id'],
url=page.get('url'),
@@ -437,6 +554,7 @@ def graphql_to_page(page):
plan=plan,
)
def graphql_queries_to_json(*queries):
"""
Queries should be a list of GraphQL objects
@@ -446,14 +564,15 @@ def graphql_queries_to_json(*queries):
rtn['q{}'.format(i)] = query.value
return json.dumps(rtn)
def graphql_response_to_json(content):
content = strip_to_json(content) # Usually only needed in some error cases
content = strip_to_json(content) # Usually only needed in some error cases
try:
j = json.loads(content, cls=ConcatJSONDecoder)
except Exception:
raise FBchatException('Error while parsing JSON: {}'.format(repr(content)))
rtn = [None]*(len(j))
rtn = [None] * (len(j))
for x in j:
if 'error_results' in x:
del rtn[-1]
@@ -470,25 +589,18 @@ def graphql_response_to_json(content):
return rtn
class GraphQL(object):
def __init__(self, query=None, doc_id=None, params=None):
if params is None:
params = {}
if query is not None:
self.value = {
'priority': 0,
'q': query,
'query_params': params
}
self.value = {'priority': 0, 'q': query, 'query_params': params}
elif doc_id is not None:
self.value = {
'doc_id': doc_id,
'query_params': params
}
self.value = {'doc_id': doc_id, 'query_params': params}
else:
raise FBchatUserError('A query or doc_id must be specified')
FRAGMENT_USER = """
QueryFragment User: User {
id,
@@ -582,7 +694,8 @@ class GraphQL(object):
}
"""
SEARCH_USER = """
SEARCH_USER = (
"""
Query SearchUser(<search> = '', <limit> = 10) {
entities_named(<search>) {
search_results.of_type(user).first(<limit>) as users {
@@ -592,9 +705,12 @@ class GraphQL(object):
}
}
}
""" + FRAGMENT_USER
"""
+ FRAGMENT_USER
)
SEARCH_GROUP = """
SEARCH_GROUP = (
"""
Query SearchGroup(<search> = '', <limit> = 10, <pic_size> = 32) {
viewer() {
message_threads.with_thread_name(<search>).last(<limit>) as groups {
@@ -604,9 +720,12 @@ class GraphQL(object):
}
}
}
""" + FRAGMENT_GROUP
"""
+ FRAGMENT_GROUP
)
SEARCH_PAGE = """
SEARCH_PAGE = (
"""
Query SearchPage(<search> = '', <limit> = 10) {
entities_named(<search>) {
search_results.of_type(page).first(<limit>) as pages {
@@ -616,9 +735,12 @@ class GraphQL(object):
}
}
}
""" + FRAGMENT_PAGE
"""
+ FRAGMENT_PAGE
)
SEARCH_THREAD = """
SEARCH_THREAD = (
"""
Query SearchThread(<search> = '', <limit> = 10) {
entities_named(<search>) {
search_results.first(<limit>) as threads {
@@ -631,4 +753,8 @@ class GraphQL(object):
}
}
}
""" + FRAGMENT_USER + FRAGMENT_GROUP + FRAGMENT_PAGE
"""
+ FRAGMENT_USER
+ FRAGMENT_GROUP
+ FRAGMENT_PAGE
)

View File

@@ -8,6 +8,7 @@ from string import Formatter
class FBchatException(Exception):
"""Custom exception thrown by fbchat. All exceptions in the fbchat module inherits this"""
class FBchatFacebookError(FBchatException):
#: The error code that Facebook returned
fb_error_code = None
@@ -15,16 +16,25 @@ class FBchatFacebookError(FBchatException):
fb_error_message = None
#: The status code that was sent in the http response (eg. 404) (Usually only set if not successful, aka. not 200)
request_status_code = None
def __init__(self, message, fb_error_code=None, fb_error_message=None, request_status_code=None):
def __init__(
self,
message,
fb_error_code=None,
fb_error_message=None,
request_status_code=None,
):
super(FBchatFacebookError, self).__init__(message)
"""Thrown by fbchat when Facebook returns an error"""
self.fb_error_code = str(fb_error_code)
self.fb_error_message = fb_error_message
self.request_status_code = request_status_code
class FBchatUserError(FBchatException):
"""Thrown by fbchat when wrong values are entered"""
class Thread(object):
#: The unique identifier of the thread. Can be used a `thread_id`. See :ref:`intro_threads` for more info
uid = None
@@ -40,7 +50,17 @@ class Thread(object):
message_count = None
#: Set :class:`Plan`
plan = None
def __init__(self, _type, uid, photo=None, name=None, last_message_timestamp=None, message_count=None, plan=None):
def __init__(
self,
_type,
uid,
photo=None,
name=None,
last_message_timestamp=None,
message_count=None,
plan=None,
):
"""Represents a Facebook thread"""
self.uid = str(uid)
self.type = _type
@@ -79,7 +99,21 @@ class User(Thread):
#: The default emoji
emoji = None
def __init__(self, uid, url=None, first_name=None, last_name=None, is_friend=None, gender=None, affinity=None, nickname=None, own_nickname=None, color=None, emoji=None, **kwargs):
def __init__(
self,
uid,
url=None,
first_name=None,
last_name=None,
is_friend=None,
gender=None,
affinity=None,
nickname=None,
own_nickname=None,
color=None,
emoji=None,
**kwargs
):
"""Represents a Facebook user. Inherits `Thread`"""
super(User, self).__init__(ThreadType.USER, uid, **kwargs)
self.url = url
@@ -112,7 +146,20 @@ class Group(Thread):
# Link for joining group
join_link = None
def __init__(self, uid, participants=None, nicknames=None, color=None, emoji=None, admins=None, approval_mode=None, approval_requests=None, join_link=None, privacy_mode=None, **kwargs):
def __init__(
self,
uid,
participants=None,
nicknames=None,
color=None,
emoji=None,
admins=None,
approval_mode=None,
approval_requests=None,
join_link=None,
privacy_mode=None,
**kwargs
):
"""Represents a Facebook group. Inherits `Thread`"""
super(Group, self).__init__(ThreadType.GROUP, uid, **kwargs)
if participants is None:
@@ -156,7 +203,16 @@ class Page(Thread):
#: The page's category
category = None
def __init__(self, uid, url=None, city=None, likes=None, sub_title=None, category=None, **kwargs):
def __init__(
self,
uid,
url=None,
city=None,
likes=None,
sub_title=None,
category=None,
**kwargs
):
"""Represents a Facebook page. Inherits `Thread`"""
super(Page, self).__init__(ThreadType.PAGE, uid, **kwargs)
self.url = url
@@ -196,7 +252,15 @@ class Message(object):
#: Whether the message is unsent (deleted for everyone)
unsent = None
def __init__(self, text=None, mentions=None, emoji_size=None, sticker=None, attachments=None, quick_replies=None):
def __init__(
self,
text=None,
mentions=None,
emoji_size=None,
sticker=None,
attachments=None,
quick_replies=None,
):
"""Represents a Facebook message"""
self.text = text
if mentions is None:
@@ -218,7 +282,9 @@ class Message(object):
return self.__unicode__()
def __unicode__(self):
return '<Message ({}): {}, mentions={} emoji_size={} attachments={}>'.format(self.uid, repr(self.text), self.mentions, self.emoji_size, self.attachments)
return '<Message ({}): {}, mentions={} emoji_size={} attachments={}>'.format(
self.uid, repr(self.text), self.mentions, self.emoji_size, self.attachments
)
@classmethod
def formatMentions(cls, text, *args, **kwargs):
@@ -246,26 +312,34 @@ class Message(object):
offset += len(literal_text)
result += literal_text
if field_name is None: continue
if field_name is None:
continue
if field_name == '':
field_name = str(i)
i += 1
elif automatic and field_name.isdigit():
raise ValueError("cannot switch from automatic field numbering to manual field specification")
raise ValueError(
"cannot switch from automatic field numbering to manual field specification"
)
thread_id, name = f.get_field(field_name, args, kwargs)[0]
if format_spec: name = f.format_field(name, format_spec)
if conversion: name = f.convert_field(name, conversion)
if format_spec:
name = f.format_field(name, format_spec)
if conversion:
name = f.convert_field(name, conversion)
result += name
mentions.append(Mention(thread_id=thread_id, offset=offset, length=len(name)))
mentions.append(
Mention(thread_id=thread_id, offset=offset, length=len(name))
)
offset += len(name)
message = cls(text=result, mentions=mentions)
return message
class Attachment(object):
#: The attachment ID
uid = None
@@ -274,12 +348,13 @@ class Attachment(object):
"""Represents a Facebook attachment"""
self.uid = uid
class UnsentMessage(Attachment):
class UnsentMessage(Attachment):
def __init__(self, *args, **kwargs):
"""Represents an unsent message attachment"""
super(UnsentMessage, self).__init__(*args, **kwargs)
class Sticker(Attachment):
#: The sticker-pack's ID
pack = None
@@ -311,6 +386,7 @@ class Sticker(Attachment):
"""Represents a Facebook sticker that has been sent to a Facebook thread as an attachment"""
super(Sticker, self).__init__(*args, **kwargs)
class ShareAttachment(Attachment):
#: ID of the author of the shared post
author = None
@@ -335,7 +411,21 @@ class ShareAttachment(Attachment):
#: List of additional attachments
attachments = None
def __init__(self, author=None, url=None, original_url=None, title=None, description=None, source=None, image_url=None, original_image_url=None, image_width=None, image_height=None, attachments=None, **kwargs):
def __init__(
self,
author=None,
url=None,
original_url=None,
title=None,
description=None,
source=None,
image_url=None,
original_image_url=None,
image_width=None,
image_height=None,
attachments=None,
**kwargs
):
"""Represents a shared item (eg. URL) that has been sent as a Facebook attachment"""
super(ShareAttachment, self).__init__(**kwargs)
self.author = author
@@ -352,6 +442,7 @@ class ShareAttachment(Attachment):
attachments = []
self.attachments = attachments
class LocationAttachment(Attachment):
#: Latidute of the location
latitude = None
@@ -372,6 +463,7 @@ class LocationAttachment(Attachment):
self.latitude = latitude
self.longitude = longitude
class LiveLocationAttachment(LocationAttachment):
#: Name of the location
name = None
@@ -386,6 +478,7 @@ class LiveLocationAttachment(LocationAttachment):
self.expiration_time = expiration_time
self.is_expired = is_expired
class FileAttachment(Attachment):
#: Url where you can download the file
url = None
@@ -404,6 +497,7 @@ class FileAttachment(Attachment):
self.name = name
self.is_malicious = is_malicious
class AudioAttachment(Attachment):
#: Name of the file
filename = None
@@ -414,7 +508,9 @@ class AudioAttachment(Attachment):
#: Audio type
audio_type = None
def __init__(self, filename=None, url=None, duration=None, audio_type=None, **kwargs):
def __init__(
self, filename=None, url=None, duration=None, audio_type=None, **kwargs
):
"""Represents an audio file that has been sent as a Facebook attachment"""
super(AudioAttachment, self).__init__(**kwargs)
self.filename = filename
@@ -422,6 +518,7 @@ class AudioAttachment(Attachment):
self.duration = duration
self.audio_type = audio_type
class ImageAttachment(Attachment):
#: The extension of the original image (eg. 'png')
original_extension = None
@@ -457,7 +554,18 @@ class ImageAttachment(Attachment):
#: Height of the animated preview image
animated_preview_height = None
def __init__(self, original_extension=None, width=None, height=None, is_animated=None, thumbnail_url=None, preview=None, large_preview=None, animated_preview=None, **kwargs):
def __init__(
self,
original_extension=None,
width=None,
height=None,
is_animated=None,
thumbnail_url=None,
preview=None,
large_preview=None,
animated_preview=None,
**kwargs
):
"""
Represents an image that has been sent as a Facebook attachment
To retrieve the full image url, use: :func:`fbchat.Client.fetchImageUrl`,
@@ -492,6 +600,7 @@ class ImageAttachment(Attachment):
self.animated_preview_width = animated_preview.get('width')
self.animated_preview_height = animated_preview.get('height')
class VideoAttachment(Attachment):
#: Size of the original video in bytes
size = None
@@ -525,7 +634,18 @@ class VideoAttachment(Attachment):
#: Height of the large preview image
large_image_height = None
def __init__(self, size=None, width=None, height=None, duration=None, preview_url=None, small_image=None, medium_image=None, large_image=None, **kwargs):
def __init__(
self,
size=None,
width=None,
height=None,
duration=None,
preview_url=None,
small_image=None,
medium_image=None,
large_image=None,
**kwargs
):
"""Represents a video that has been sent as a Facebook attachment"""
super(VideoAttachment, self).__init__(**kwargs)
self.size = size
@@ -571,7 +691,10 @@ class Mention(object):
return self.__unicode__()
def __unicode__(self):
return '<Mention {}: offset={} length={}>'.format(self.thread_id, self.offset, self.length)
return '<Mention {}: offset={} length={}>'.format(
self.thread_id, self.offset, self.length
)
class QuickReply(object):
#: Payload of the quick reply
@@ -595,6 +718,7 @@ class QuickReply(object):
def __unicode__(self):
return '<{}: payload={!r}>'.format(self.__class__.__name__, self.payload)
class QuickReplyText(QuickReply):
#: Title of the quick reply
title = None
@@ -609,6 +733,7 @@ class QuickReplyText(QuickReply):
self.title = title
self.image_url = image_url
class QuickReplyLocation(QuickReply):
#: Type of the quick reply
_type = "location"
@@ -618,6 +743,7 @@ class QuickReplyLocation(QuickReply):
super(QuickReplyLocation, self).__init__(**kwargs)
self.is_response = False
class QuickReplyPhoneNumber(QuickReply):
#: URL of the quick reply image (optional)
image_url = None
@@ -629,6 +755,7 @@ class QuickReplyPhoneNumber(QuickReply):
super(QuickReplyPhoneNumber, self).__init__(**kwargs)
self.image_url = image_url
class QuickReplyEmail(QuickReply):
#: URL of the quick reply image (optional)
image_url = None
@@ -640,6 +767,7 @@ class QuickReplyEmail(QuickReply):
super(QuickReplyEmail, self).__init__(**kwargs)
self.image_url = image_url
class Poll(object):
#: ID of the poll
uid = None
@@ -659,7 +787,10 @@ class Poll(object):
return self.__unicode__()
def __unicode__(self):
return '<Poll ({}): {} options={}>'.format(self.uid, repr(self.title), self.options)
return '<Poll ({}): {} options={}>'.format(
self.uid, repr(self.title), self.options
)
class PollOption(object):
#: ID of the poll option
@@ -682,7 +813,10 @@ class PollOption(object):
return self.__unicode__()
def __unicode__(self):
return '<PollOption ({}): {} voters={}>'.format(self.uid, repr(self.text), self.voters)
return '<PollOption ({}): {} voters={}>'.format(
self.uid, repr(self.text), self.voters
)
class Plan(object):
#: ID of the plan
@@ -719,7 +853,14 @@ class Plan(object):
return self.__unicode__()
def __unicode__(self):
return '<Plan ({}): {} time={}, location={}, location_id={}>'.format(self.uid, repr(self.title), self.time, repr(self.location), repr(self.location_id))
return '<Plan ({}): {} time={}, location={}, location_id={}>'.format(
self.uid,
repr(self.title),
self.time,
repr(self.location),
repr(self.location_id),
)
class ActiveStatus(object):
#: Whether the user is active now
@@ -738,41 +879,55 @@ class ActiveStatus(object):
return self.__unicode__()
def __unicode__(self):
return '<ActiveStatus: active={} last_active={} in_game={}>'.format(self.active, self.last_active, self.in_game)
return '<ActiveStatus: active={} last_active={} in_game={}>'.format(
self.active, self.last_active, self.in_game
)
class Enum(aenum.Enum):
"""Used internally by fbchat to support enumerations"""
def __repr__(self):
# For documentation:
return '{}.{}'.format(type(self).__name__, self.name)
class ThreadType(Enum):
"""Used to specify what type of Facebook thread is being used. See :ref:`intro_threads` for more info"""
USER = 1
GROUP = 2
ROOM = 2
PAGE = 3
class ThreadLocation(Enum):
"""Used to specify where a thread is located (inbox, pending, archived, other)."""
INBOX = 'INBOX'
PENDING = 'PENDING'
ARCHIVED = 'ARCHIVED'
OTHER = 'OTHER'
class TypingStatus(Enum):
"""Used to specify whether the user is typing or has stopped typing"""
STOPPED = 0
TYPING = 1
class EmojiSize(Enum):
"""Used to specify the size of a sent emoji"""
LARGE = '369239383222810'
MEDIUM = '369239343222814'
SMALL = '369239263222822'
class ThreadColor(Enum):
"""Used to specify a thread colors"""
MESSENGER_BLUE = '#0084ff'
VIKING = '#44bec7'
GOLDEN_POPPY = '#ffc300'
@@ -789,8 +944,10 @@ class ThreadColor(Enum):
BRILLIANT_ROSE = '#ff5ca1'
BILOBA_FLOWER = '#a695c7'
class MessageReaction(Enum):
"""Used to specify a message reaction"""
LOVE = '😍'
SMILE = '😆'
WOW = '😮'

View File

@@ -16,10 +16,12 @@ from .models import *
try:
from urllib.parse import urlencode, parse_qs, urlparse
basestring = (str, bytes)
except ImportError:
from urllib import urlencode
from urlparse import parse_qs, urlparse
basestring = basestring
# Python 2's `input` executes the input, whereas `raw_input` just returns the input
@@ -42,7 +44,7 @@ USER_AGENTS = [
"Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6"
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
]
LIKES = {
@@ -51,7 +53,7 @@ LIKES = {
'small': EmojiSize.SMALL,
'l': EmojiSize.LARGE,
'm': EmojiSize.MEDIUM,
's': EmojiSize.SMALL
's': EmojiSize.SMALL,
}
@@ -69,24 +71,25 @@ GENDERS = {
9: 'male_plural',
10: 'neuter_plural',
11: 'unknown_plural',
# For graphql requests
'UNKNOWN': 'unknown',
'FEMALE': 'female_singular',
'MALE': 'male_singular',
#'': 'female_singular_guess',
#'': 'male_singular_guess',
#'': 'mixed',
# '': 'female_singular_guess',
# '': 'male_singular_guess',
# '': 'mixed',
'NEUTER': 'neuter_singular',
#'': 'unknown_singular',
#'': 'female_plural',
#'': 'male_plural',
#'': 'neuter_plural',
#'': 'unknown_plural',
# '': 'unknown_singular',
# '': 'female_plural',
# '': 'male_plural',
# '': 'neuter_plural',
# '': 'unknown_plural',
}
class ReqUrl(object):
"""A class containing all urls used by `fbchat`"""
SEARCH = "https://www.facebook.com/ajax/typeahead/search.php"
LOGIN = "https://m.facebook.com/login.php?login_attempt=1"
SEND = "https://www.facebook.com/messaging/send/"
@@ -94,8 +97,12 @@ class ReqUrl(object):
UNSEEN_THREADS = "https://www.facebook.com/mercury/unseen_thread_ids/"
THREADS = "https://www.facebook.com/ajax/mercury/threadlist_info.php"
MOVE_THREAD = "https://www.facebook.com/ajax/mercury/move_thread.php"
ARCHIVED_STATUS = "https://www.facebook.com/ajax/mercury/change_archived_status.php?dpr=1"
PINNED_STATUS = "https://www.facebook.com/ajax/mercury/change_pinned_status.php?dpr=1"
ARCHIVED_STATUS = (
"https://www.facebook.com/ajax/mercury/change_archived_status.php?dpr=1"
)
PINNED_STATUS = (
"https://www.facebook.com/ajax/mercury/change_pinned_status.php?dpr=1"
)
MESSAGES = "https://www.facebook.com/ajax/mercury/thread_info.php"
READ_STATUS = "https://www.facebook.com/ajax/mercury/change_read_status.php"
DELIVERED = "https://www.facebook.com/ajax/mercury/delivery_receipts.php"
@@ -135,8 +142,12 @@ class ReqUrl(object):
DELETE_THREAD = "https://www.facebook.com/ajax/mercury/delete_thread.php?dpr=1"
DELETE_MESSAGES = "https://www.facebook.com/ajax/mercury/delete_messages.php?dpr=1"
MUTE_THREAD = "https://www.facebook.com/ajax/mercury/change_mute_thread.php?dpr=1"
MUTE_REACTIONS = "https://www.facebook.com/ajax/mercury/change_reactions_mute_thread/?dpr=1"
MUTE_MENTIONS = "https://www.facebook.com/ajax/mercury/change_mentions_mute_thread/?dpr=1"
MUTE_REACTIONS = (
"https://www.facebook.com/ajax/mercury/change_reactions_mute_thread/?dpr=1"
)
MUTE_MENTIONS = (
"https://www.facebook.com/ajax/mercury/change_mentions_mute_thread/?dpr=1"
)
CREATE_POLL = "https://www.facebook.com/messaging/group_polling/create_poll/?dpr=1"
UPDATE_VOTE = "https://www.facebook.com/messaging/group_polling/update_vote/?dpr=1"
GET_POLL_OPTIONS = "https://www.facebook.com/ajax/mercury/get_poll_options"
@@ -148,41 +159,51 @@ class ReqUrl(object):
def change_pull_channel(self, channel=None):
if channel is None:
self.pull_channel = (self.pull_channel + 1) % 5 # Pull channel will be 0-4
self.pull_channel = (self.pull_channel + 1) % 5 # Pull channel will be 0-4
else:
self.pull_channel = channel
self.STICKY = "https://{}-edge-chat.facebook.com/pull".format(self.pull_channel)
self.PING = "https://{}-edge-chat.facebook.com/active_ping".format(self.pull_channel)
self.PING = "https://{}-edge-chat.facebook.com/active_ping".format(
self.pull_channel
)
facebookEncoding = 'UTF-8'
def now():
return int(time()*1000)
return int(time() * 1000)
def strip_to_json(text):
try:
return text[text.index('{'):]
return text[text.index('{') :]
except ValueError:
raise FBchatException('No JSON object found: {!r}'.format(text))
def get_decoded_r(r):
return get_decoded(r._content)
def get_decoded(content):
return content.decode(facebookEncoding)
def parse_json(content):
return json.loads(content)
def get_json(r):
return json.loads(strip_to_json(get_decoded_r(r)))
def digitToChar(digit):
if digit < 10:
return str(digit)
return chr(ord('a') + digit - 10)
def str_base(number, base):
if number < 0:
return '-' + str_base(-number, base)
@@ -191,14 +212,17 @@ def str_base(number, base):
return str_base(d, base) + digitToChar(m)
return digitToChar(m)
def generateMessageID(client_id=None):
k = now()
l = int(random() * 4294967295)
return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id)
def getSignatureID():
return hex(int(random() * 2147483648))
def generateOfflineThreadingID():
ret = now()
value = int(random() * 4294967295)
@@ -206,20 +230,39 @@ def generateOfflineThreadingID():
msgs = format(ret, 'b') + string
return str(int(msgs, 2))
def check_json(j):
if j.get('error') is None:
return
if 'errorDescription' in j:
# 'errorDescription' is in the users own language!
raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error'], j['errorDescription']), fb_error_code=j['error'], fb_error_message=j['errorDescription'])
raise FBchatFacebookError(
'Error #{} when sending request: {}'.format(
j['error'], j['errorDescription']
),
fb_error_code=j['error'],
fb_error_message=j['errorDescription'],
)
elif 'debug_info' in j['error'] and 'code' in j['error']:
raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error']['code'], repr(j['error']['debug_info'])), fb_error_code=j['error']['code'], fb_error_message=j['error']['debug_info'])
raise FBchatFacebookError(
'Error #{} when sending request: {}'.format(
j['error']['code'], repr(j['error']['debug_info'])
),
fb_error_code=j['error']['code'],
fb_error_message=j['error']['debug_info'],
)
else:
raise FBchatFacebookError('Error {} when sending request'.format(j['error']), fb_error_code=j['error'])
raise FBchatFacebookError(
'Error {} when sending request'.format(j['error']), fb_error_code=j['error']
)
def check_request(r, as_json=True):
if not r.ok:
raise FBchatFacebookError('Error when sending request: Got {} response'.format(r.status_code), request_status_code=r.status_code)
raise FBchatFacebookError(
'Error when sending request: Got {} response'.format(r.status_code),
request_status_code=r.status_code,
)
content = get_decoded_r(r)
@@ -238,14 +281,20 @@ def check_request(r, as_json=True):
else:
return content
def get_jsmods_require(j, index):
if j.get('jsmods') and j['jsmods'].get('require'):
try:
return j['jsmods']['require'][0][index][0]
except (KeyError, IndexError) as e:
log.warning('Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(j))
log.warning(
'Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(
j
)
)
return None
def get_emojisize_from_tags(tags):
if tags is None:
return None
@@ -254,15 +303,19 @@ def get_emojisize_from_tags(tags):
try:
return LIKES[tmp[0].split(':')[1]]
except (KeyError, IndexError):
log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
log.exception(
'Could not determine emoji size from {} - {}'.format(tags, tmp)
)
return None
def require_list(list_):
if isinstance(list_, list):
return set(list_)
else:
return set([list_])
def mimetype_to_key(mimetype):
if not mimetype:
return "file_id"
@@ -280,11 +333,13 @@ def get_files_from_urls(file_urls):
r = requests.get(file_url)
# We could possibly use r.headers.get('Content-Disposition'), see
# https://stackoverflow.com/a/37060758
files.append((
basename(file_url),
r.content,
r.headers.get('Content-Type') or guess_type(file_url)[0],
))
files.append(
(
basename(file_url),
r.content,
r.headers.get('Content-Type') or guess_type(file_url)[0],
)
)
return files
@@ -292,26 +347,31 @@ def get_files_from_urls(file_urls):
def get_files_from_paths(filenames):
files = []
for filename in filenames:
files.append((
basename(filename),
open(filename, 'rb'),
guess_type(filename)[0],
))
files.append(
(basename(filename), open(filename, 'rb'), guess_type(filename)[0])
)
yield files
for fn, fp, ft in files:
fp.close()
def enum_extend_if_invalid(enumeration, value):
try:
return enumeration(value)
except ValueError:
log.warning("Failed parsing {.__name__}({!r}). Extending enum.".format(enumeration, value))
log.warning(
"Failed parsing {.__name__}({!r}). Extending enum.".format(
enumeration, value
)
)
aenum.extend_enum(enumeration, "UNKNOWN_{}".format(value).upper(), value)
return enumeration(value)
def get_url_parameters(url, *args):
params = parse_qs(urlparse(url).query)
return [params[arg][0] for arg in args if params.get(arg)]
def get_url_parameter(url, param):
return get_url_parameters(url, param)[0]

View File

@@ -1,3 +1,15 @@
[tool.black]
line-length = 88
exclude = '''
/(
\.git
| \.pytest_cache
| build
| dist
| venv
)/
'''
[build-system]
requires = ["flit"]
build-backend = "flit.buildapi"

View File

@@ -17,17 +17,21 @@ def user(client2):
@pytest.fixture(scope="session")
def group(pytestconfig):
return {"id": load_variable("group_id", pytestconfig.cache), "type": ThreadType.GROUP}
return {
"id": load_variable("group_id", pytestconfig.cache),
"type": ThreadType.GROUP,
}
@pytest.fixture(scope="session", params=[
"user", "group", pytest.param("none", marks=[pytest.mark.xfail()])
])
@pytest.fixture(
scope="session",
params=["user", "group", pytest.param("none", marks=[pytest.mark.xfail()])],
)
def thread(request, user, group):
return {
"user": user,
"group": group,
"none": {"id": "0", "type": ThreadType.GROUP}
"none": {"id": "0", "type": ThreadType.GROUP},
}[request.param]

View File

@@ -48,7 +48,9 @@ def test_fetch_message_mentions(client, thread, message_with_mentions):
mid = client.send(message_with_mentions)
message, = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid, author=client.uid, text=message_with_mentions.text)
assert subset(
vars(message), uid=mid, author=client.uid, text=message_with_mentions.text
)
# The mentions are not ordered by offset
for m in message.mentions:
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
@@ -58,7 +60,9 @@ def test_fetch_message_info_mentions(client, thread, message_with_mentions):
mid = client.send(message_with_mentions)
message = client.fetchMessageInfo(mid, thread_id=thread["id"])
assert subset(vars(message), uid=mid, author=client.uid, text=message_with_mentions.text)
assert subset(
vars(message), uid=mid, author=client.uid, text=message_with_mentions.text
)
# The mentions are not ordered by offset
for m in message.mentions:
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]

View File

@@ -9,17 +9,17 @@ from utils import random_hex, subset
from time import time
@pytest.fixture(scope="module", params=[
Plan(int(time()) + 100, random_hex()),
pytest.param(
Plan(int(time()), random_hex()),
marks=[pytest.mark.xfail(raises=FBchatFacebookError)]
),
pytest.param(
Plan(0, None),
marks=[pytest.mark.xfail()],
),
])
@pytest.fixture(
scope="module",
params=[
Plan(int(time()) + 100, random_hex()),
pytest.param(
Plan(int(time()), random_hex()),
marks=[pytest.mark.xfail(raises=FBchatFacebookError)],
),
pytest.param(Plan(0, None), marks=[pytest.mark.xfail()]),
],
)
def plan_data(request, client, user, thread, catch_event, compare):
with catch_event("onPlanCreated") as x:
client.createPlan(request.param, thread["id"])
@@ -50,15 +50,14 @@ def test_fetch_plan_info(client, catch_event, plan_data):
event, plan = plan_data
fetched_plan = client.fetchPlanInfo(plan.uid)
assert subset(
vars(fetched_plan),
time=plan.time,
title=plan.title,
author_id=int(client.uid),
vars(fetched_plan), time=plan.time, title=plan.title, author_id=int(client.uid)
)
@pytest.mark.parametrize("take_part", [False, True])
def test_change_plan_participation(client, thread, catch_event, compare, plan_data, take_part):
def test_change_plan_participation(
client, thread, catch_event, compare, plan_data, take_part
):
event, plan = plan_data
with catch_event("onPlanParticipation") as x:
client.changePlanParticipation(plan, take_part=take_part)
@@ -94,18 +93,22 @@ def test_on_plan_ended(client, thread, catch_event, compare):
with catch_event("onPlanEnded") as x:
client.createPlan(Plan(int(time()) + 120, "Wait for ending"))
x.wait(180)
assert subset(x.res, thread_id=client.uid if thread["type"] == ThreadType.USER else thread["id"], thread_type=thread["type"])
assert subset(
x.res,
thread_id=client.uid if thread["type"] == ThreadType.USER else thread["id"],
thread_type=thread["type"],
)
#createPlan(self, plan, thread_id=None)
#editPlan(self, plan, new_plan)
#deletePlan(self, plan)
#changePlanParticipation(self, plan, take_part=True)
# createPlan(self, plan, thread_id=None)
# editPlan(self, plan, new_plan)
# deletePlan(self, plan)
# changePlanParticipation(self, plan, take_part=True)
#onPlanCreated(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanEnded(self, mid=None, plan=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanEdited(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanDeleted(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#onPlanParticipation(self, mid=None, plan=None, take_part=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
# onPlanCreated(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
# onPlanEnded(self, mid=None, plan=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
# onPlanEdited(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
# onPlanDeleted(self, mid=None, plan=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
# onPlanParticipation(self, mid=None, plan=None, take_part=None, author_id=None, thread_id=None, thread_type=None, ts=None, metadata=None, msg=None)
#fetchPlanInfo(self, plan_id)
# fetchPlanInfo(self, plan_id)

View File

@@ -8,28 +8,40 @@ from fbchat.models import Poll, PollOption, ThreadType
from utils import random_hex, subset
@pytest.fixture(scope="module", params=[
Poll(title=random_hex(), options=[]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
]),
Poll(title=random_hex(), options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
PollOption(random_hex()),
PollOption(random_hex()),
]),
pytest.param(
Poll(title=None, options=[]), marks=[pytest.mark.xfail(raises=ValueError)]
),
])
@pytest.fixture(
scope="module",
params=[
Poll(title=random_hex(), options=[]),
Poll(
title=random_hex(),
options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
],
),
Poll(
title=random_hex(),
options=[
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
],
),
Poll(
title=random_hex(),
options=[
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=True),
PollOption(random_hex(), vote=False),
PollOption(random_hex(), vote=False),
PollOption(random_hex()),
PollOption(random_hex()),
],
),
pytest.param(
Poll(title=None, options=[]), marks=[pytest.mark.xfail(raises=ValueError)]
),
],
)
def poll_data(request, client1, group, catch_event):
with catch_event("onPollCreated") as x:
client1.createPoll(request.param, thread_id=group["id"])
@@ -45,11 +57,17 @@ def test_create_poll(client1, group, catch_event, poll_data):
thread_id=group["id"],
thread_type=ThreadType.GROUP,
)
assert subset(vars(event["poll"]), title=poll.title, options_count=len(poll.options))
for recv_option in event["poll"].options: # The recieved options may not be the full list
assert subset(
vars(event["poll"]), title=poll.title, options_count=len(poll.options)
)
for recv_option in event[
"poll"
].options: # The recieved options may not be the full list
old_option, = list(filter(lambda o: o.text == recv_option.text, poll.options))
voters = [client1.uid] if old_option.vote else []
assert subset(vars(recv_option), voters=voters, votes_count=len(voters), vote=False)
assert subset(
vars(recv_option), voters=voters, votes_count=len(voters), vote=False
)
def test_fetch_poll_options(client1, group, catch_event, poll_data):
@@ -62,11 +80,15 @@ def test_fetch_poll_options(client1, group, catch_event, poll_data):
@pytest.mark.trylast
def test_update_poll_vote(client1, group, catch_event, poll_data):
event, poll, options = poll_data
new_vote_ids = [o.uid for o in options[0:len(options):2] if not o.vote]
re_vote_ids = [o.uid for o in options[0:len(options):2] if o.vote]
new_vote_ids = [o.uid for o in options[0 : len(options) : 2] if not o.vote]
re_vote_ids = [o.uid for o in options[0 : len(options) : 2] if o.vote]
new_options = [random_hex(), random_hex()]
with catch_event("onPollVoted") as x:
client1.updatePollVote(event["poll"].uid, option_ids=new_vote_ids + re_vote_ids, new_options=new_options)
client1.updatePollVote(
event["poll"].uid,
option_ids=new_vote_ids + re_vote_ids,
new_options=new_options,
)
assert subset(
x.res,
@@ -74,8 +96,12 @@ def test_update_poll_vote(client1, group, catch_event, poll_data):
thread_id=group["id"],
thread_type=ThreadType.GROUP,
)
assert subset(vars(x.res["poll"]), title=poll.title, options_count=len(options + new_options))
assert subset(
vars(x.res["poll"]), title=poll.title, options_count=len(options + new_options)
)
for o in new_vote_ids:
assert o in x.res["added_options"]
assert len(x.res["added_options"]) == len(new_vote_ids) + len(new_options)
assert set(x.res["removed_options"]) == set(o.uid for o in options if o.vote and o.uid not in re_vote_ids)
assert set(x.res["removed_options"]) == set(
o.uid for o in options if o.vote and o.uid not in re_vote_ids
)

View File

@@ -38,7 +38,12 @@ def test_send_mentions(client, catch_event, compare, message_with_mentions):
mid = client.send(message_with_mentions)
assert compare(x, mid=mid, message=message_with_mentions.text)
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=message_with_mentions.text)
assert subset(
vars(x.res["message_object"]),
uid=mid,
author=client.uid,
text=message_with_mentions.text,
)
# The mentions are not ordered by offset
for m in x.res["message_object"].mentions:
assert vars(m) in [vars(x) for x in message_with_mentions.mentions]
@@ -76,7 +81,15 @@ def test_send_images(client, catch_event, compare, method_name, url):
def test_send_local_files(client, catch_event, compare):
files = ["image.png", "image.jpg", "image.gif", "file.json", "file.txt", "audio.mp3", "video.mp4"]
files = [
"image.png",
"image.jpg",
"image.gif",
"file.json",
"file.txt",
"audio.mp3",
"video.mp4",
]
text = "Files sent locally"
with catch_event("onMessage") as x:
mid = client.sendLocalFiles(
@@ -95,7 +108,10 @@ def test_send_remote_files(client, catch_event, compare):
text = "Files sent from remote"
with catch_event("onMessage") as x:
mid = client.sendRemoteFiles(
["https://github.com/carpedm20/fbchat/raw/master/tests/{}".format(f) for f in files],
[
"https://github.com/carpedm20/fbchat/raw/master/tests/{}".format(f)
for f in files
],
message=Message(text),
)

View File

@@ -67,14 +67,19 @@ def test_change_nickname(client, client_all, catch_event, compare):
assert compare(x, changed_for=client_all.uid, new_nickname=nickname)
@pytest.mark.parametrize("emoji", [
"😀",
"😂",
"😕",
"😍",
pytest.param("🙃", marks=[pytest.mark.xfail(raises=FBchatFacebookError)]),
pytest.param("not an emoji", marks=[pytest.mark.xfail(raises=FBchatFacebookError)]),
])
@pytest.mark.parametrize(
"emoji",
[
"😀",
"😂",
"😕",
"😍",
pytest.param("🙃", marks=[pytest.mark.xfail(raises=FBchatFacebookError)]),
pytest.param(
"not an emoji", marks=[pytest.mark.xfail(raises=FBchatFacebookError)]
),
],
)
def test_change_emoji(client, catch_event, compare, emoji):
with catch_event("onEmojiChange") as x:
client.changeThreadEmoji(emoji)
@@ -85,7 +90,9 @@ def test_change_image_local(client1, group, catch_event):
url = path.join(path.dirname(__file__), "resources", "image.png")
with catch_event("onImageChange") as x:
image_id = client1.changeGroupImageLocal(url, group["id"])
assert subset(x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"])
assert subset(
x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"]
)
# To be changed when merged into master
@@ -93,7 +100,9 @@ def test_change_image_remote(client1, group, catch_event):
url = "https://github.com/carpedm20/fbchat/raw/master/tests/image.png"
with catch_event("onImageChange") as x:
image_id = client1.changeGroupImageRemote(url, group["id"])
assert subset(x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"])
assert subset(
x.res, new_image=image_id, author_id=client1.uid, thread_id=group["id"]
)
@pytest.mark.parametrize(
@@ -138,6 +147,7 @@ def test_change_approval_mode(client1, group, catch_event, require_admin_approva
thread_id=group["id"],
)
@pytest.mark.parametrize("mute_time", [0, 10, 100, 1000, -1])
def test_mute_thread(client, mute_time):
assert client.muteThread(mute_time)