Format strings using black

This commit is contained in:
Mads Marquart
2019-01-31 20:55:22 +01:00
parent d20fc3b9ce
commit e0710a2ec1
16 changed files with 823 additions and 823 deletions

View File

@@ -20,7 +20,7 @@
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
sys.path.insert(0, os.path.abspath(".."))
import fbchat
import tests
@@ -37,27 +37,27 @@ from fbchat import __copyright__, __author__, __version__, __description__
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx.ext.viewcode',
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.viewcode",
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
source_suffix = ".rst"
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = 'fbchat'
title = 'fbchat Documentation'
project = "fbchat"
title = "fbchat Documentation"
copyright = __copyright__
author = __author__
description = __description__
@@ -81,10 +81,10 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True
@@ -96,7 +96,7 @@ todo_include_todos = True
# a list of builtin themes.
#
html_theme = 'alabaster'
html_theme = "alabaster"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
@@ -107,13 +107,13 @@ html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_static_path = ["_static"]
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = project + 'doc'
htmlhelp_basename = project + "doc"
# -- Options for LaTeX output ---------------------------------------------
@@ -136,7 +136,7 @@ latex_elements = {
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [(master_doc, project + '.tex', title, author, 'manual')]
latex_documents = [(master_doc, project + ".tex", title, author, "manual")]
# -- Options for manual page output ---------------------------------------
@@ -152,27 +152,27 @@ man_pages = [(master_doc, project, title, [author], 1)]
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, project, title, author, project, description, 'Miscellaneous')
(master_doc, project, title, author, project, description, "Miscellaneous")
]
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'https://docs.python.org/3/': None}
intersphinx_mapping = {"https://docs.python.org/3/": None}
add_function_parentheses = False
html_theme_options = {
'show_powered_by': False,
'github_user': 'carpedm20',
'github_repo': project,
'github_banner': True,
'show_related': False,
"show_powered_by": False,
"github_user": "carpedm20",
"github_repo": project,
"github_banner": True,
"show_related": False,
}
html_sidebars = {'**': ['sidebar.html', 'searchbox.html']}
html_sidebars = {"**": ["sidebar.html", "searchbox.html"]}
html_show_sphinx = False
html_show_sourcelink = False
autoclass_content = 'init'
autoclass_content = "init"
html_short_title = description

View File

@@ -3,10 +3,10 @@
from fbchat import Client
from fbchat.models import *
client = Client('<email>', '<password>')
client = Client("<email>", "<password>")
print('Own id: {}'.format(client.uid))
print("Own id: {}".format(client.uid))
client.send(Message(text='Hi me!'), thread_id=client.uid, thread_type=ThreadType.USER)
client.send(Message(text="Hi me!"), thread_id=client.uid, thread_type=ThreadType.USER)
client.logout()

View File

@@ -3,7 +3,7 @@
from fbchat import Client
from fbchat.models import *
client = Client('<email>', '<password>')
client = Client("<email>", "<password>")
# Fetches a list of all users you're currently chatting with, as `User` objects
users = client.fetchAllUsers()
@@ -13,9 +13,9 @@ print("users' names: {}".format([user.name for user in users]))
# If we have a user id, we can use `fetchUserInfo` to fetch a `User` object
user = client.fetchUserInfo('<user id>')['<user id>']
user = client.fetchUserInfo("<user id>")["<user id>"]
# We can also query both mutiple users together, which returns list of `User` objects
users = client.fetchUserInfo('<1st user id>', '<2nd user id>', '<3rd user id>')
users = client.fetchUserInfo("<1st user id>", "<2nd user id>", "<3rd user id>")
print("user's name: {}".format(user.name))
print("users' names: {}".format([users[k].name for k in users]))
@@ -23,9 +23,9 @@ print("users' names: {}".format([users[k].name for k in users]))
# `searchForUsers` searches for the user and gives us a list of the results,
# and then we just take the first one, aka. the most likely one:
user = client.searchForUsers('<name of user>')[0]
user = client.searchForUsers("<name of user>")[0]
print('user ID: {}'.format(user.uid))
print("user ID: {}".format(user.uid))
print("user's name: {}".format(user.name))
print("user's photo: {}".format(user.photo))
print("Is user client's friend: {}".format(user.is_friend))
@@ -40,7 +40,7 @@ print("Threads: {}".format(threads))
# Gets the last 10 messages sent to the thread
messages = client.fetchThreadMessages(thread_id='<thread id>', limit=10)
messages = client.fetchThreadMessages(thread_id="<thread id>", limit=10)
# Since the message come in reversed order, reverse them
messages.reverse()
@@ -50,13 +50,13 @@ for message in messages:
# If we have a thread id, we can use `fetchThreadInfo` to fetch a `Thread` object
thread = client.fetchThreadInfo('<thread id>')['<thread id>']
thread = client.fetchThreadInfo("<thread id>")["<thread id>"]
print("thread's name: {}".format(thread.name))
print("thread's type: {}".format(thread.type))
# `searchForThreads` searches works like `searchForUsers`, but gives us a list of threads instead
thread = client.searchForThreads('<name of thread>')[0]
thread = client.searchForThreads("<name of thread>")[0]
print("thread's name: {}".format(thread.name))
print("thread's type: {}".format(thread.type))

View File

@@ -5,11 +5,11 @@ from fbchat.models import *
client = Client("<email>", "<password>")
thread_id = '1234567890'
thread_id = "1234567890"
thread_type = ThreadType.GROUP
# Will send a message to the thread
client.send(Message(text='<message>'), thread_id=thread_id, thread_type=thread_type)
client.send(Message(text="<message>"), thread_id=thread_id, thread_type=thread_type)
# Will send the default `like` emoji
client.send(
@@ -18,14 +18,14 @@ client.send(
# Will send the emoji `👍`
client.send(
Message(text='👍', emoji_size=EmojiSize.LARGE),
Message(text="👍", emoji_size=EmojiSize.LARGE),
thread_id=thread_id,
thread_type=thread_type,
)
# Will send the sticker with ID `767334476626295`
client.send(
Message(sticker=Sticker('767334476626295')),
Message(sticker=Sticker("767334476626295")),
thread_id=thread_id,
thread_type=thread_type,
)
@@ -33,7 +33,7 @@ client.send(
# Will send a message with a mention
client.send(
Message(
text='This is a @mention', mentions=[Mention(thread_id, offset=10, length=8)]
text="This is a @mention", mentions=[Mention(thread_id, offset=10, length=8)]
),
thread_id=thread_id,
thread_type=thread_type,
@@ -41,16 +41,16 @@ client.send(
# Will send the image located at `<image path>`
client.sendLocalImage(
'<image path>',
message=Message(text='This is a local image'),
"<image path>",
message=Message(text="This is a local image"),
thread_id=thread_id,
thread_type=thread_type,
)
# Will download the image at the url `<image url>`, and then send it
client.sendRemoteImage(
'<image url>',
message=Message(text='This is a remote image'),
"<image url>",
message=Message(text="This is a remote image"),
thread_id=thread_id,
thread_type=thread_type,
)
@@ -59,24 +59,24 @@ client.sendRemoteImage(
# Only do these actions if the thread is a group
if thread_type == ThreadType.GROUP:
# Will remove the user with ID `<user id>` from the thread
client.removeUserFromGroup('<user id>', thread_id=thread_id)
client.removeUserFromGroup("<user id>", thread_id=thread_id)
# Will add the user with ID `<user id>` to the thread
client.addUsersToGroup('<user id>', thread_id=thread_id)
client.addUsersToGroup("<user id>", thread_id=thread_id)
# Will add the users with IDs `<1st user id>`, `<2nd user id>` and `<3th user id>` to the thread
client.addUsersToGroup(
['<1st user id>', '<2nd user id>', '<3rd user id>'], thread_id=thread_id
["<1st user id>", "<2nd user id>", "<3rd user id>"], thread_id=thread_id
)
# Will change the nickname of the user `<user_id>` to `<new nickname>`
client.changeNickname(
'<new nickname>', '<user id>', thread_id=thread_id, thread_type=thread_type
"<new nickname>", "<user id>", thread_id=thread_id, thread_type=thread_type
)
# Will change the title of the thread to `<title>`
client.changeThreadTitle('<title>', thread_id=thread_id, thread_type=thread_type)
client.changeThreadTitle("<title>", thread_id=thread_id, thread_type=thread_type)
# Will set the typing status of the thread to `TYPING`
client.setTypingStatus(
@@ -87,7 +87,7 @@ client.setTypingStatus(
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, thread_id=thread_id)
# Will change the thread emoji to `👍`
client.changeThreadEmoji('👍', thread_id=thread_id)
client.changeThreadEmoji("👍", thread_id=thread_id)
# Will react to a message with a 😍 emoji
client.reactToMessage('<message id>', MessageReaction.LOVE)
client.reactToMessage("<message id>", MessageReaction.LOVE)

View File

@@ -4,17 +4,17 @@ from fbchat import log, Client
from fbchat.models import *
# Change this to your group id
old_thread_id = '1234567890'
old_thread_id = "1234567890"
# Change these to match your liking
old_color = ThreadColor.MESSENGER_BLUE
old_emoji = '👍'
old_title = 'Old group chat name'
old_emoji = "👍"
old_title = "Old group chat name"
old_nicknames = {
'12345678901': "User nr. 1's nickname",
'12345678902': "User nr. 2's nickname",
'12345678903': "User nr. 3's nickname",
'12345678904': "User nr. 4's nickname",
"12345678901": "User nr. 1's nickname",
"12345678902": "User nr. 2's nickname",
"12345678903": "User nr. 3's nickname",
"12345678904": "User nr. 4's nickname",
}

View File

@@ -7,8 +7,8 @@ from fbchat.models import *
class RemoveBot(Client):
def onMessage(self, author_id, message_object, thread_id, thread_type, **kwargs):
# We can only kick people from group chats, so no need to try if it's a user chat
if message_object.text == 'Remove me!' and thread_type == ThreadType.GROUP:
log.info('{} will be removed from {}'.format(author_id, thread_id))
if message_object.text == "Remove me!" and thread_type == ThreadType.GROUP:
log.info("{} will be removed from {}".format(author_id, thread_id))
self.removeUserFromGroup(author_id, thread_id=thread_id)
else:
# Sends the data to the inherited onMessage, so that we can still see when a message is recieved

View File

@@ -9,14 +9,14 @@ from __future__ import unicode_literals
from .client import *
__title__ = 'fbchat'
__version__ = '1.6.1'
__description__ = 'Facebook Chat (Messenger) for Python'
__title__ = "fbchat"
__version__ = "1.6.1"
__description__ = "Facebook Chat (Messenger) for Python"
__copyright__ = 'Copyright 2015 - 2019 by Taehoon Kim'
__license__ = 'BSD 3-Clause'
__copyright__ = "Copyright 2015 - 2019 by Taehoon Kim"
__license__ = "BSD 3-Clause"
__author__ = 'Taehoon Kim; Moreels Pieter-Jan; Mads Marquart'
__email__ = 'carpedm20@gmail.com'
__author__ = "Taehoon Kim; Moreels Pieter-Jan; Mads Marquart"
__email__ = "carpedm20@gmail.com"
__all__ = ['Client']
__all__ = ["Client"]

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ from .utils import *
# Shameless copy from https://stackoverflow.com/a/8730674
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
WHITESPACE = re.compile(r"[ \t\n\r]*", FLAGS)
class ConcatJSONDecoder(json.JSONDecoder):
@@ -33,220 +33,220 @@ def graphql_color_to_enum(color):
if not color:
return ThreadColor.MESSENGER_BLUE
color = color[2:] # Strip the alpha value
color_value = '#{}'.format(color.lower())
color_value = "#{}".format(color.lower())
return enum_extend_if_invalid(ThreadColor, color_value)
def get_customization_info(thread):
if thread is None or thread.get('customization_info') is None:
if thread is None or thread.get("customization_info") is None:
return {}
info = thread['customization_info']
info = thread["customization_info"]
rtn = {
'emoji': info.get('emoji'),
'color': graphql_color_to_enum(info.get('outgoing_bubble_color')),
"emoji": info.get("emoji"),
"color": graphql_color_to_enum(info.get("outgoing_bubble_color")),
}
if (
thread.get('thread_type') == 'GROUP'
or thread.get('is_group_thread')
or thread.get('thread_key', {}).get('thread_fbid')
thread.get("thread_type") == "GROUP"
or thread.get("is_group_thread")
or thread.get("thread_key", {}).get("thread_fbid")
):
rtn['nicknames'] = {}
for k in info.get('participant_customizations', []):
rtn['nicknames'][k['participant_id']] = k.get('nickname')
elif info.get('participant_customizations'):
uid = thread.get('thread_key', {}).get('other_user_id') or thread.get('id')
pc = info['participant_customizations']
rtn["nicknames"] = {}
for k in info.get("participant_customizations", []):
rtn["nicknames"][k["participant_id"]] = k.get("nickname")
elif info.get("participant_customizations"):
uid = thread.get("thread_key", {}).get("other_user_id") or thread.get("id")
pc = info["participant_customizations"]
if len(pc) > 0:
if pc[0].get('participant_id') == uid:
rtn['nickname'] = pc[0].get('nickname')
if pc[0].get("participant_id") == uid:
rtn["nickname"] = pc[0].get("nickname")
else:
rtn['own_nickname'] = pc[0].get('nickname')
rtn["own_nickname"] = pc[0].get("nickname")
if len(pc) > 1:
if pc[1].get('participant_id') == uid:
rtn['nickname'] = pc[1].get('nickname')
if pc[1].get("participant_id") == uid:
rtn["nickname"] = pc[1].get("nickname")
else:
rtn['own_nickname'] = pc[1].get('nickname')
rtn["own_nickname"] = pc[1].get("nickname")
return rtn
def graphql_to_sticker(s):
if not s:
return None
sticker = Sticker(uid=s['id'])
if s.get('pack'):
sticker.pack = s['pack'].get('id')
if s.get('sprite_image'):
sticker = Sticker(uid=s["id"])
if s.get("pack"):
sticker.pack = s["pack"].get("id")
if s.get("sprite_image"):
sticker.is_animated = True
sticker.medium_sprite_image = s['sprite_image'].get('uri')
sticker.large_sprite_image = s['sprite_image_2x'].get('uri')
sticker.frames_per_row = s.get('frames_per_row')
sticker.frames_per_col = s.get('frames_per_column')
sticker.frame_rate = s.get('frame_rate')
sticker.url = s.get('url')
sticker.width = s.get('width')
sticker.height = s.get('height')
if s.get('label'):
sticker.label = s['label']
sticker.medium_sprite_image = s["sprite_image"].get("uri")
sticker.large_sprite_image = s["sprite_image_2x"].get("uri")
sticker.frames_per_row = s.get("frames_per_row")
sticker.frames_per_col = s.get("frames_per_column")
sticker.frame_rate = s.get("frame_rate")
sticker.url = s.get("url")
sticker.width = s.get("width")
sticker.height = s.get("height")
if s.get("label"):
sticker.label = s["label"]
return sticker
def graphql_to_attachment(a):
_type = a['__typename']
if _type in ['MessageImage', 'MessageAnimatedImage']:
_type = a["__typename"]
if _type in ["MessageImage", "MessageAnimatedImage"]:
return ImageAttachment(
original_extension=a.get('original_extension')
or (a['filename'].split('-')[0] if a.get('filename') else None),
width=a.get('original_dimensions', {}).get('width'),
height=a.get('original_dimensions', {}).get('height'),
is_animated=_type == 'MessageAnimatedImage',
thumbnail_url=a.get('thumbnail', {}).get('uri'),
preview=a.get('preview') or a.get('preview_image'),
large_preview=a.get('large_preview'),
animated_preview=a.get('animated_image'),
uid=a.get('legacy_attachment_id'),
original_extension=a.get("original_extension")
or (a["filename"].split("-")[0] if a.get("filename") else None),
width=a.get("original_dimensions", {}).get("width"),
height=a.get("original_dimensions", {}).get("height"),
is_animated=_type == "MessageAnimatedImage",
thumbnail_url=a.get("thumbnail", {}).get("uri"),
preview=a.get("preview") or a.get("preview_image"),
large_preview=a.get("large_preview"),
animated_preview=a.get("animated_image"),
uid=a.get("legacy_attachment_id"),
)
elif _type == 'MessageVideo':
elif _type == "MessageVideo":
return VideoAttachment(
width=a.get('original_dimensions', {}).get('width'),
height=a.get('original_dimensions', {}).get('height'),
duration=a.get('playable_duration_in_ms'),
preview_url=a.get('playable_url'),
small_image=a.get('chat_image'),
medium_image=a.get('inbox_image'),
large_image=a.get('large_image'),
uid=a.get('legacy_attachment_id'),
width=a.get("original_dimensions", {}).get("width"),
height=a.get("original_dimensions", {}).get("height"),
duration=a.get("playable_duration_in_ms"),
preview_url=a.get("playable_url"),
small_image=a.get("chat_image"),
medium_image=a.get("inbox_image"),
large_image=a.get("large_image"),
uid=a.get("legacy_attachment_id"),
)
elif _type == 'MessageAudio':
elif _type == "MessageAudio":
return AudioAttachment(
filename=a.get('filename'),
url=a.get('playable_url'),
duration=a.get('playable_duration_in_ms'),
audio_type=a.get('audio_type'),
filename=a.get("filename"),
url=a.get("playable_url"),
duration=a.get("playable_duration_in_ms"),
audio_type=a.get("audio_type"),
)
elif _type == 'MessageFile':
elif _type == "MessageFile":
return FileAttachment(
url=a.get('url'),
name=a.get('filename'),
is_malicious=a.get('is_malicious'),
uid=a.get('message_file_fbid'),
url=a.get("url"),
name=a.get("filename"),
is_malicious=a.get("is_malicious"),
uid=a.get("message_file_fbid"),
)
else:
return Attachment(uid=a.get('legacy_attachment_id'))
return Attachment(uid=a.get("legacy_attachment_id"))
def graphql_to_extensible_attachment(a):
story = a.get('story_attachment')
story = a.get("story_attachment")
if story:
target = story.get('target')
target = story.get("target")
if target:
_type = target['__typename']
if _type == 'MessageLocation':
_type = target["__typename"]
if _type == "MessageLocation":
latitude, longitude = get_url_parameter(
get_url_parameter(story['url'], 'u'), 'where1'
get_url_parameter(story["url"], "u"), "where1"
).split(", ")
rtn = LocationAttachment(
uid=int(story['deduplication_key']),
uid=int(story["deduplication_key"]),
latitude=float(latitude),
longitude=float(longitude),
)
if story['media']:
rtn.image_url = story['media']['image']['uri']
rtn.image_width = story['media']['image']['width']
rtn.image_height = story['media']['image']['height']
rtn.url = story['url']
if story["media"]:
rtn.image_url = story["media"]["image"]["uri"]
rtn.image_width = story["media"]["image"]["width"]
rtn.image_height = story["media"]["image"]["height"]
rtn.url = story["url"]
return rtn
elif _type == 'MessageLiveLocation':
elif _type == "MessageLiveLocation":
rtn = LiveLocationAttachment(
uid=int(story['target']['live_location_id']),
latitude=story['target']['coordinate']['latitude']
if story['target'].get('coordinate')
uid=int(story["target"]["live_location_id"]),
latitude=story["target"]["coordinate"]["latitude"]
if story["target"].get("coordinate")
else None,
longitude=story['target']['coordinate']['longitude']
if story['target'].get('coordinate')
longitude=story["target"]["coordinate"]["longitude"]
if story["target"].get("coordinate")
else None,
name=story['title_with_entities']['text'],
expiration_time=story['target']['expiration_time']
if story['target'].get('expiration_time')
name=story["title_with_entities"]["text"],
expiration_time=story["target"]["expiration_time"]
if story["target"].get("expiration_time")
else None,
is_expired=story['target']['is_expired'],
is_expired=story["target"]["is_expired"],
)
if story['media']:
rtn.image_url = story['media']['image']['uri']
rtn.image_width = story['media']['image']['width']
rtn.image_height = story['media']['image']['height']
rtn.url = story['url']
if story["media"]:
rtn.image_url = story["media"]["image"]["uri"]
rtn.image_width = story["media"]["image"]["width"]
rtn.image_height = story["media"]["image"]["height"]
rtn.url = story["url"]
return rtn
elif _type in ['ExternalUrl', 'Story']:
elif _type in ["ExternalUrl", "Story"]:
return ShareAttachment(
uid=a.get('legacy_attachment_id'),
author=story['target']['actors'][0]['id']
if story['target'].get('actors')
uid=a.get("legacy_attachment_id"),
author=story["target"]["actors"][0]["id"]
if story["target"].get("actors")
else None,
url=story['url'],
original_url=get_url_parameter(story['url'], 'u')
if "/l.php?u=" in story['url']
else story['url'],
title=story['title_with_entities'].get('text'),
description=story['description'].get('text')
if story.get('description')
url=story["url"],
original_url=get_url_parameter(story["url"], "u")
if "/l.php?u=" in story["url"]
else story["url"],
title=story["title_with_entities"].get("text"),
description=story["description"].get("text")
if story.get("description")
else None,
source=story['source']['text'],
image_url=story['media']['image']['uri']
if story.get('media')
source=story["source"]["text"],
image_url=story["media"]["image"]["uri"]
if story.get("media")
else None,
original_image_url=(
get_url_parameter(story['media']['image']['uri'], 'url')
if "/safe_image.php" in story['media']['image']['uri']
else story['media']['image']['uri']
get_url_parameter(story["media"]["image"]["uri"], "url")
if "/safe_image.php" in story["media"]["image"]["uri"]
else story["media"]["image"]["uri"]
)
if story.get('media')
if story.get("media")
else None,
image_width=story['media']['image']['width']
if story.get('media')
image_width=story["media"]["image"]["width"]
if story.get("media")
else None,
image_height=story['media']['image']['height']
if story.get('media')
image_height=story["media"]["image"]["height"]
if story.get("media")
else None,
attachments=[
graphql_to_subattachment(attachment)
for attachment in story.get('subattachments')
for attachment in story.get("subattachments")
],
)
else:
return UnsentMessage(uid=a.get('legacy_attachment_id'))
return UnsentMessage(uid=a.get("legacy_attachment_id"))
def graphql_to_subattachment(a):
_type = a['target']['__typename']
if _type == 'Video':
_type = a["target"]["__typename"]
if _type == "Video":
return VideoAttachment(
duration=a['media'].get('playable_duration_in_ms'),
preview_url=a['media'].get('playable_url'),
medium_image=a['media'].get('image'),
uid=a['target'].get('video_id'),
duration=a["media"].get("playable_duration_in_ms"),
preview_url=a["media"].get("playable_url"),
medium_image=a["media"].get("image"),
uid=a["target"].get("video_id"),
)
def graphql_to_live_location(a):
return LiveLocationAttachment(
uid=a['id'],
latitude=a['coordinate']['latitude'] / (10 ** 8)
if not a.get('stopReason')
uid=a["id"],
latitude=a["coordinate"]["latitude"] / (10 ** 8)
if not a.get("stopReason")
else None,
longitude=a['coordinate']['longitude'] / (10 ** 8)
if not a.get('stopReason')
longitude=a["coordinate"]["longitude"] / (10 ** 8)
if not a.get("stopReason")
else None,
name=a.get('locationTitle'),
expiration_time=a['expirationTime'],
is_expired=bool(a.get('stopReason')),
name=a.get("locationTitle"),
expiration_time=a["expirationTime"],
is_expired=bool(a.get("stopReason")),
)
def graphql_to_poll(a):
rtn = Poll(
title=a.get('title') if a.get('title') else a.get('text'),
options=[graphql_to_poll_option(m) for m in a.get('options')],
title=a.get("title") if a.get("title") else a.get("text"),
options=[graphql_to_poll_option(m) for m in a.get("options")],
)
rtn.uid = int(a["id"])
rtn.options_count = a.get("total_count")
@@ -254,90 +254,90 @@ def graphql_to_poll(a):
def graphql_to_poll_option(a):
if a.get('viewer_has_voted') is None:
if a.get("viewer_has_voted") is None:
vote = None
elif isinstance(a['viewer_has_voted'], bool):
vote = a['viewer_has_voted']
elif isinstance(a["viewer_has_voted"], bool):
vote = a["viewer_has_voted"]
else:
vote = a['viewer_has_voted'] == 'true'
rtn = PollOption(text=a.get('text'), vote=vote)
vote = a["viewer_has_voted"] == "true"
rtn = PollOption(text=a.get("text"), vote=vote)
rtn.uid = int(a["id"])
rtn.voters = (
[m.get('node').get('id') for m in a.get('voters').get('edges')]
if isinstance(a.get('voters'), dict)
else a.get('voters')
[m.get("node").get("id") for m in a.get("voters").get("edges")]
if isinstance(a.get("voters"), dict)
else a.get("voters")
)
rtn.votes_count = (
a.get('voters').get('count')
if isinstance(a.get('voters'), dict)
else a.get('total_count')
a.get("voters").get("count")
if isinstance(a.get("voters"), dict)
else a.get("total_count")
)
return rtn
def graphql_to_plan(a):
if a.get('event_members'):
if a.get("event_members"):
rtn = Plan(
time=a.get('event_time'),
title=a.get('title'),
location=a.get('location_name'),
time=a.get("event_time"),
title=a.get("title"),
location=a.get("location_name"),
)
if a.get('location_id') != 0:
rtn.location_id = str(a.get('location_id'))
rtn.uid = a.get('oid')
rtn.author_id = a.get('creator_id')
if a.get("location_id") != 0:
rtn.location_id = str(a.get("location_id"))
rtn.uid = a.get("oid")
rtn.author_id = a.get("creator_id")
guests = a.get("event_members")
rtn.going = [uid for uid in guests if guests[uid] == "GOING"]
rtn.declined = [uid for uid in guests if guests[uid] == "DECLINED"]
rtn.invited = [uid for uid in guests if guests[uid] == "INVITED"]
return rtn
elif a.get('id') is None:
elif a.get("id") is None:
rtn = Plan(
time=a.get('event_time'),
title=a.get('event_title'),
location=a.get('event_location_name'),
location_id=a.get('event_location_id'),
time=a.get("event_time"),
title=a.get("event_title"),
location=a.get("event_location_name"),
location_id=a.get("event_location_id"),
)
rtn.uid = a.get('event_id')
rtn.author_id = a.get('event_creator_id')
guests = json.loads(a.get('guest_state_list'))
rtn.uid = a.get("event_id")
rtn.author_id = a.get("event_creator_id")
guests = json.loads(a.get("guest_state_list"))
else:
rtn = Plan(
time=a.get('time'),
title=a.get('event_title'),
location=a.get('location_name'),
time=a.get("time"),
title=a.get("event_title"),
location=a.get("location_name"),
)
rtn.uid = a.get('id')
rtn.author_id = a.get('lightweight_event_creator').get('id')
guests = a.get('event_reminder_members').get('edges')
rtn.uid = a.get("id")
rtn.author_id = a.get("lightweight_event_creator").get("id")
guests = a.get("event_reminder_members").get("edges")
rtn.going = [
m.get('node').get('id') for m in guests if m.get('guest_list_state') == "GOING"
m.get("node").get("id") for m in guests if m.get("guest_list_state") == "GOING"
]
rtn.declined = [
m.get('node').get('id')
m.get("node").get("id")
for m in guests
if m.get('guest_list_state') == "DECLINED"
if m.get("guest_list_state") == "DECLINED"
]
rtn.invited = [
m.get('node').get('id')
m.get("node").get("id")
for m in guests
if m.get('guest_list_state') == "INVITED"
if m.get("guest_list_state") == "INVITED"
]
return rtn
def graphql_to_quick_reply(q, is_response=False):
data = dict()
_type = q.get('content_type').lower()
if q.get('payload'):
_type = q.get("content_type").lower()
if q.get("payload"):
data["payload"] = q["payload"]
if q.get('data'):
if q.get("data"):
data["data"] = q["data"]
if q.get('image_url') and _type is not QuickReplyLocation._type:
if q.get("image_url") and _type is not QuickReplyLocation._type:
data["image_url"] = q["image_url"]
data["is_response"] = is_response
if _type == QuickReplyText._type:
if q.get('title') is not None:
if q.get("title") is not None:
data["title"] = q["title"]
rtn = QuickReplyText(**data)
elif _type == QuickReplyLocation._type:
@@ -350,48 +350,48 @@ def graphql_to_quick_reply(q, is_response=False):
def graphql_to_message(message):
if message.get('message_sender') is None:
message['message_sender'] = {}
if message.get('message') is None:
message['message'] = {}
if message.get("message_sender") is None:
message["message_sender"] = {}
if message.get("message") is None:
message["message"] = {}
rtn = Message(
text=message.get('message').get('text'),
text=message.get("message").get("text"),
mentions=[
Mention(
m.get('entity', {}).get('id'),
offset=m.get('offset'),
length=m.get('length'),
m.get("entity", {}).get("id"),
offset=m.get("offset"),
length=m.get("length"),
)
for m in message.get('message').get('ranges', [])
for m in message.get("message").get("ranges", [])
],
emoji_size=get_emojisize_from_tags(message.get('tags_list')),
sticker=graphql_to_sticker(message.get('sticker')),
emoji_size=get_emojisize_from_tags(message.get("tags_list")),
sticker=graphql_to_sticker(message.get("sticker")),
)
rtn.uid = str(message.get('message_id'))
rtn.author = str(message.get('message_sender').get('id'))
rtn.timestamp = message.get('timestamp_precise')
rtn.uid = str(message.get("message_id"))
rtn.author = str(message.get("message_sender").get("id"))
rtn.timestamp = message.get("timestamp_precise")
rtn.unsent = False
if message.get('unread') is not None:
rtn.is_read = not message['unread']
if message.get("unread") is not None:
rtn.is_read = not message["unread"]
rtn.reactions = {
str(r['user']['id']): enum_extend_if_invalid(MessageReaction, r['reaction'])
for r in message.get('message_reactions')
str(r["user"]["id"]): enum_extend_if_invalid(MessageReaction, r["reaction"])
for r in message.get("message_reactions")
}
if message.get('blob_attachments') is not None:
if message.get("blob_attachments") is not None:
rtn.attachments = [
graphql_to_attachment(attachment)
for attachment in message['blob_attachments']
for attachment in message["blob_attachments"]
]
if message.get('platform_xmd_encoded'):
quick_replies = json.loads(message['platform_xmd_encoded']).get('quick_replies')
if message.get("platform_xmd_encoded"):
quick_replies = json.loads(message["platform_xmd_encoded"]).get("quick_replies")
if isinstance(quick_replies, list):
rtn.quick_replies = [graphql_to_quick_reply(q) for q in quick_replies]
elif isinstance(quick_replies, dict):
rtn.quick_replies = [
graphql_to_quick_reply(quick_replies, is_response=True)
]
if message.get('extensible_attachment') is not None:
attachment = graphql_to_extensible_attachment(message['extensible_attachment'])
if message.get("extensible_attachment") is not None:
attachment = graphql_to_extensible_attachment(message["extensible_attachment"])
if isinstance(attachment, UnsentMessage):
rtn.unsent = True
elif attachment:
@@ -400,157 +400,157 @@ def graphql_to_message(message):
def graphql_to_user(user):
if user.get('profile_picture') is None:
user['profile_picture'] = {}
if user.get("profile_picture") is None:
user["profile_picture"] = {}
c_info = get_customization_info(user)
plan = None
if user.get('event_reminders'):
if user.get("event_reminders"):
plan = (
graphql_to_plan(user['event_reminders']['nodes'][0])
if user['event_reminders'].get('nodes')
graphql_to_plan(user["event_reminders"]["nodes"][0])
if user["event_reminders"].get("nodes")
else None
)
return User(
user['id'],
url=user.get('url'),
first_name=user.get('first_name'),
last_name=user.get('last_name'),
is_friend=user.get('is_viewer_friend'),
gender=GENDERS.get(user.get('gender')),
affinity=user.get('affinity'),
nickname=c_info.get('nickname'),
color=c_info.get('color'),
emoji=c_info.get('emoji'),
own_nickname=c_info.get('own_nickname'),
photo=user['profile_picture'].get('uri'),
name=user.get('name'),
message_count=user.get('messages_count'),
user["id"],
url=user.get("url"),
first_name=user.get("first_name"),
last_name=user.get("last_name"),
is_friend=user.get("is_viewer_friend"),
gender=GENDERS.get(user.get("gender")),
affinity=user.get("affinity"),
nickname=c_info.get("nickname"),
color=c_info.get("color"),
emoji=c_info.get("emoji"),
own_nickname=c_info.get("own_nickname"),
photo=user["profile_picture"].get("uri"),
name=user.get("name"),
message_count=user.get("messages_count"),
plan=plan,
)
def graphql_to_thread(thread):
if thread['thread_type'] == 'GROUP':
if thread["thread_type"] == "GROUP":
return graphql_to_group(thread)
elif thread['thread_type'] == 'ONE_TO_ONE':
if thread.get('big_image_src') is None:
thread['big_image_src'] = {}
elif thread["thread_type"] == "ONE_TO_ONE":
if thread.get("big_image_src") is None:
thread["big_image_src"] = {}
c_info = get_customization_info(thread)
participants = [
node['messaging_actor'] for node in thread['all_participants']['nodes']
node["messaging_actor"] for node in thread["all_participants"]["nodes"]
]
user = next(
p for p in participants if p['id'] == thread['thread_key']['other_user_id']
p for p in participants if p["id"] == thread["thread_key"]["other_user_id"]
)
last_message_timestamp = None
if 'last_message' in thread:
last_message_timestamp = thread['last_message']['nodes'][0][
'timestamp_precise'
if "last_message" in thread:
last_message_timestamp = thread["last_message"]["nodes"][0][
"timestamp_precise"
]
first_name = user.get('short_name')
first_name = user.get("short_name")
if first_name is None:
last_name = None
else:
last_name = user.get('name').split(first_name, 1).pop().strip()
last_name = user.get("name").split(first_name, 1).pop().strip()
plan = None
if thread.get('event_reminders'):
if thread.get("event_reminders"):
plan = (
graphql_to_plan(thread['event_reminders']['nodes'][0])
if thread['event_reminders'].get('nodes')
graphql_to_plan(thread["event_reminders"]["nodes"][0])
if thread["event_reminders"].get("nodes")
else None
)
return User(
user['id'],
url=user.get('url'),
name=user.get('name'),
user["id"],
url=user.get("url"),
name=user.get("name"),
first_name=first_name,
last_name=last_name,
is_friend=user.get('is_viewer_friend'),
gender=GENDERS.get(user.get('gender')),
affinity=user.get('affinity'),
nickname=c_info.get('nickname'),
color=c_info.get('color'),
emoji=c_info.get('emoji'),
own_nickname=c_info.get('own_nickname'),
photo=user['big_image_src'].get('uri'),
message_count=thread.get('messages_count'),
is_friend=user.get("is_viewer_friend"),
gender=GENDERS.get(user.get("gender")),
affinity=user.get("affinity"),
nickname=c_info.get("nickname"),
color=c_info.get("color"),
emoji=c_info.get("emoji"),
own_nickname=c_info.get("own_nickname"),
photo=user["big_image_src"].get("uri"),
message_count=thread.get("messages_count"),
last_message_timestamp=last_message_timestamp,
plan=plan,
)
else:
raise FBchatException(
'Unknown thread type: {}, with data: {}'.format(
thread.get('thread_type'), thread
"Unknown thread type: {}, with data: {}".format(
thread.get("thread_type"), thread
)
)
def graphql_to_group(group):
if group.get('image') is None:
group['image'] = {}
if group.get("image") is None:
group["image"] = {}
c_info = get_customization_info(group)
last_message_timestamp = None
if 'last_message' in group:
last_message_timestamp = group['last_message']['nodes'][0]['timestamp_precise']
if "last_message" in group:
last_message_timestamp = group["last_message"]["nodes"][0]["timestamp_precise"]
plan = None
if group.get('event_reminders'):
if group.get("event_reminders"):
plan = (
graphql_to_plan(group['event_reminders']['nodes'][0])
if group['event_reminders'].get('nodes')
graphql_to_plan(group["event_reminders"]["nodes"][0])
if group["event_reminders"].get("nodes")
else None
)
return Group(
group['thread_key']['thread_fbid'],
group["thread_key"]["thread_fbid"],
participants=set(
[
node['messaging_actor']['id']
for node in group['all_participants']['nodes']
node["messaging_actor"]["id"]
for node in group["all_participants"]["nodes"]
]
),
nicknames=c_info.get('nicknames'),
color=c_info.get('color'),
emoji=c_info.get('emoji'),
admins=set([node.get('id') for node in group.get('thread_admins')]),
approval_mode=bool(group.get('approval_mode'))
if group.get('approval_mode') is not None
nicknames=c_info.get("nicknames"),
color=c_info.get("color"),
emoji=c_info.get("emoji"),
admins=set([node.get("id") for node in group.get("thread_admins")]),
approval_mode=bool(group.get("approval_mode"))
if group.get("approval_mode") is not None
else None,
approval_requests=set(
node["requester"]['id'] for node in group['group_approval_queue']['nodes']
node["requester"]["id"] for node in group["group_approval_queue"]["nodes"]
)
if group.get('group_approval_queue')
if group.get("group_approval_queue")
else None,
join_link=group['joinable_mode'].get('link'),
photo=group['image'].get('uri'),
name=group.get('name'),
message_count=group.get('messages_count'),
join_link=group["joinable_mode"].get("link"),
photo=group["image"].get("uri"),
name=group.get("name"),
message_count=group.get("messages_count"),
last_message_timestamp=last_message_timestamp,
plan=plan,
)
def graphql_to_page(page):
if page.get('profile_picture') is None:
page['profile_picture'] = {}
if page.get('city') is None:
page['city'] = {}
if page.get("profile_picture") is None:
page["profile_picture"] = {}
if page.get("city") is None:
page["city"] = {}
plan = None
if page.get('event_reminders'):
if page.get("event_reminders"):
plan = (
graphql_to_plan(page['event_reminders']['nodes'][0])
if page['event_reminders'].get('nodes')
graphql_to_plan(page["event_reminders"]["nodes"][0])
if page["event_reminders"].get("nodes")
else None
)
return Page(
page['id'],
url=page.get('url'),
city=page.get('city').get('name'),
category=page.get('category_type'),
photo=page['profile_picture'].get('uri'),
name=page.get('name'),
message_count=page.get('messages_count'),
page["id"],
url=page.get("url"),
city=page.get("city").get("name"),
category=page.get("category_type"),
photo=page["profile_picture"].get("uri"),
name=page.get("name"),
message_count=page.get("messages_count"),
plan=plan,
)
@@ -561,7 +561,7 @@ def graphql_queries_to_json(*queries):
"""
rtn = {}
for i, query in enumerate(queries):
rtn['q{}'.format(i)] = query.value
rtn["q{}".format(i)] = query.value
return json.dumps(rtn)
@@ -570,20 +570,20 @@ def graphql_response_to_json(content):
try:
j = json.loads(content, cls=ConcatJSONDecoder)
except Exception:
raise FBchatException('Error while parsing JSON: {}'.format(repr(content)))
raise FBchatException("Error while parsing JSON: {}".format(repr(content)))
rtn = [None] * (len(j))
for x in j:
if 'error_results' in x:
if "error_results" in x:
del rtn[-1]
continue
check_json(x)
[(key, value)] = x.items()
check_json(value)
if 'response' in value:
rtn[int(key[1:])] = value['response']
if "response" in value:
rtn[int(key[1:])] = value["response"]
else:
rtn[int(key[1:])] = value['data']
rtn[int(key[1:])] = value["data"]
log.debug(rtn)
@@ -595,11 +595,11 @@ class GraphQL(object):
if params is None:
params = {}
if query is not None:
self.value = {'priority': 0, 'q': query, 'query_params': params}
self.value = {"priority": 0, "q": query, "query_params": params}
elif doc_id is not None:
self.value = {'doc_id': doc_id, 'query_params': params}
self.value = {"doc_id": doc_id, "query_params": params}
else:
raise FBchatUserError('A query or doc_id must be specified')
raise FBchatUserError("A query or doc_id must be specified")
FRAGMENT_USER = """
QueryFragment User: User {

View File

@@ -74,7 +74,7 @@ class Thread(object):
return self.__unicode__()
def __unicode__(self):
return '<{} {} ({})>'.format(self.type.name, self.name, self.uid)
return "<{} {} ({})>".format(self.type.name, self.name, self.uid)
class User(Thread):
@@ -282,7 +282,7 @@ class Message(object):
return self.__unicode__()
def __unicode__(self):
return '<Message ({}): {}, mentions={} emoji_size={} attachments={}>'.format(
return "<Message ({}): {}, mentions={} emoji_size={} attachments={}>".format(
self.uid, repr(self.text), self.mentions, self.emoji_size, self.attachments
)
@@ -305,7 +305,7 @@ class Message(object):
offset = 0
f = Formatter()
field_names = [field_name[1] for field_name in f.parse(text)]
automatic = '' in field_names
automatic = "" in field_names
i = 0
for (literal_text, field_name, format_spec, conversion) in f.parse(text):
@@ -315,7 +315,7 @@ class Message(object):
if field_name is None:
continue
if field_name == '':
if field_name == "":
field_name = str(i)
i += 1
elif automatic and field_name.isdigit():
@@ -584,21 +584,21 @@ class ImageAttachment(Attachment):
if preview is None:
preview = {}
self.preview_url = preview.get('uri')
self.preview_width = preview.get('width')
self.preview_height = preview.get('height')
self.preview_url = preview.get("uri")
self.preview_width = preview.get("width")
self.preview_height = preview.get("height")
if large_preview is None:
large_preview = {}
self.large_preview_url = large_preview.get('uri')
self.large_preview_width = large_preview.get('width')
self.large_preview_height = large_preview.get('height')
self.large_preview_url = large_preview.get("uri")
self.large_preview_width = large_preview.get("width")
self.large_preview_height = large_preview.get("height")
if animated_preview is None:
animated_preview = {}
self.animated_preview_url = animated_preview.get('uri')
self.animated_preview_width = animated_preview.get('width')
self.animated_preview_height = animated_preview.get('height')
self.animated_preview_url = animated_preview.get("uri")
self.animated_preview_width = animated_preview.get("width")
self.animated_preview_height = animated_preview.get("height")
class VideoAttachment(Attachment):
@@ -656,21 +656,21 @@ class VideoAttachment(Attachment):
if small_image is None:
small_image = {}
self.small_image_url = small_image.get('uri')
self.small_image_width = small_image.get('width')
self.small_image_height = small_image.get('height')
self.small_image_url = small_image.get("uri")
self.small_image_width = small_image.get("width")
self.small_image_height = small_image.get("height")
if medium_image is None:
medium_image = {}
self.medium_image_url = medium_image.get('uri')
self.medium_image_width = medium_image.get('width')
self.medium_image_height = medium_image.get('height')
self.medium_image_url = medium_image.get("uri")
self.medium_image_width = medium_image.get("width")
self.medium_image_height = medium_image.get("height")
if large_image is None:
large_image = {}
self.large_image_url = large_image.get('uri')
self.large_image_width = large_image.get('width')
self.large_image_height = large_image.get('height')
self.large_image_url = large_image.get("uri")
self.large_image_width = large_image.get("width")
self.large_image_height = large_image.get("height")
class Mention(object):
@@ -691,7 +691,7 @@ class Mention(object):
return self.__unicode__()
def __unicode__(self):
return '<Mention {}: offset={} length={}>'.format(
return "<Mention {}: offset={} length={}>".format(
self.thread_id, self.offset, self.length
)
@@ -716,7 +716,7 @@ class QuickReply(object):
return self.__unicode__()
def __unicode__(self):
return '<{}: payload={!r}>'.format(self.__class__.__name__, self.payload)
return "<{}: payload={!r}>".format(self.__class__.__name__, self.payload)
class QuickReplyText(QuickReply):
@@ -787,7 +787,7 @@ class Poll(object):
return self.__unicode__()
def __unicode__(self):
return '<Poll ({}): {} options={}>'.format(
return "<Poll ({}): {} options={}>".format(
self.uid, repr(self.title), self.options
)
@@ -813,7 +813,7 @@ class PollOption(object):
return self.__unicode__()
def __unicode__(self):
return '<PollOption ({}): {} voters={}>'.format(
return "<PollOption ({}): {} voters={}>".format(
self.uid, repr(self.text), self.voters
)
@@ -842,8 +842,8 @@ class Plan(object):
"""Represents a plan"""
self.time = int(time)
self.title = title
self.location = location or ''
self.location_id = location_id or ''
self.location = location or ""
self.location_id = location_id or ""
self.author_id = None
self.going = []
self.declined = []
@@ -853,7 +853,7 @@ class Plan(object):
return self.__unicode__()
def __unicode__(self):
return '<Plan ({}): {} time={}, location={}, location_id={}>'.format(
return "<Plan ({}): {} time={}, location={}, location_id={}>".format(
self.uid,
repr(self.title),
self.time,
@@ -879,7 +879,7 @@ class ActiveStatus(object):
return self.__unicode__()
def __unicode__(self):
return '<ActiveStatus: active={} last_active={} in_game={}>'.format(
return "<ActiveStatus: active={} last_active={} in_game={}>".format(
self.active, self.last_active, self.in_game
)
@@ -889,7 +889,7 @@ class Enum(aenum.Enum):
def __repr__(self):
# For documentation:
return '{}.{}'.format(type(self).__name__, self.name)
return "{}.{}".format(type(self).__name__, self.name)
class ThreadType(Enum):
@@ -904,10 +904,10 @@ class ThreadType(Enum):
class ThreadLocation(Enum):
"""Used to specify where a thread is located (inbox, pending, archived, other)."""
INBOX = 'INBOX'
PENDING = 'PENDING'
ARCHIVED = 'ARCHIVED'
OTHER = 'OTHER'
INBOX = "INBOX"
PENDING = "PENDING"
ARCHIVED = "ARCHIVED"
OTHER = "OTHER"
class TypingStatus(Enum):
@@ -920,38 +920,38 @@ class TypingStatus(Enum):
class EmojiSize(Enum):
"""Used to specify the size of a sent emoji"""
LARGE = '369239383222810'
MEDIUM = '369239343222814'
SMALL = '369239263222822'
LARGE = "369239383222810"
MEDIUM = "369239343222814"
SMALL = "369239263222822"
class ThreadColor(Enum):
"""Used to specify a thread colors"""
MESSENGER_BLUE = '#0084ff'
VIKING = '#44bec7'
GOLDEN_POPPY = '#ffc300'
RADICAL_RED = '#fa3c4c'
SHOCKING = '#d696bb'
PICTON_BLUE = '#6699cc'
FREE_SPEECH_GREEN = '#13cf13'
PUMPKIN = '#ff7e29'
LIGHT_CORAL = '#e68585'
MEDIUM_SLATE_BLUE = '#7646ff'
DEEP_SKY_BLUE = '#20cef5'
FERN = '#67b868'
CAMEO = '#d4a88c'
BRILLIANT_ROSE = '#ff5ca1'
BILOBA_FLOWER = '#a695c7'
MESSENGER_BLUE = "#0084ff"
VIKING = "#44bec7"
GOLDEN_POPPY = "#ffc300"
RADICAL_RED = "#fa3c4c"
SHOCKING = "#d696bb"
PICTON_BLUE = "#6699cc"
FREE_SPEECH_GREEN = "#13cf13"
PUMPKIN = "#ff7e29"
LIGHT_CORAL = "#e68585"
MEDIUM_SLATE_BLUE = "#7646ff"
DEEP_SKY_BLUE = "#20cef5"
FERN = "#67b868"
CAMEO = "#d4a88c"
BRILLIANT_ROSE = "#ff5ca1"
BILOBA_FLOWER = "#a695c7"
class MessageReaction(Enum):
"""Used to specify a message reaction"""
LOVE = '😍'
SMILE = '😆'
WOW = '😮'
SAD = '😢'
ANGRY = '😠'
YES = '👍'
NO = '👎'
LOVE = "😍"
SMILE = "😆"
WOW = "😮"
SAD = "😢"
ANGRY = "😠"
YES = "👍"
NO = "👎"

View File

@@ -48,37 +48,37 @@ USER_AGENTS = [
]
LIKES = {
'large': EmojiSize.LARGE,
'medium': EmojiSize.MEDIUM,
'small': EmojiSize.SMALL,
'l': EmojiSize.LARGE,
'm': EmojiSize.MEDIUM,
's': EmojiSize.SMALL,
"large": EmojiSize.LARGE,
"medium": EmojiSize.MEDIUM,
"small": EmojiSize.SMALL,
"l": EmojiSize.LARGE,
"m": EmojiSize.MEDIUM,
"s": EmojiSize.SMALL,
}
GENDERS = {
# For standard requests
0: 'unknown',
1: 'female_singular',
2: 'male_singular',
3: 'female_singular_guess',
4: 'male_singular_guess',
5: 'mixed',
6: 'neuter_singular',
7: 'unknown_singular',
8: 'female_plural',
9: 'male_plural',
10: 'neuter_plural',
11: 'unknown_plural',
0: "unknown",
1: "female_singular",
2: "male_singular",
3: "female_singular_guess",
4: "male_singular_guess",
5: "mixed",
6: "neuter_singular",
7: "unknown_singular",
8: "female_plural",
9: "male_plural",
10: "neuter_plural",
11: "unknown_plural",
# For graphql requests
'UNKNOWN': 'unknown',
'FEMALE': 'female_singular',
'MALE': 'male_singular',
"UNKNOWN": "unknown",
"FEMALE": "female_singular",
"MALE": "male_singular",
# '': 'female_singular_guess',
# '': 'male_singular_guess',
# '': 'mixed',
'NEUTER': 'neuter_singular',
"NEUTER": "neuter_singular",
# '': 'unknown_singular',
# '': 'female_plural',
# '': 'male_plural',
@@ -168,7 +168,7 @@ class ReqUrl(object):
)
facebookEncoding = 'UTF-8'
facebookEncoding = "UTF-8"
def now():
@@ -177,9 +177,9 @@ def now():
def strip_to_json(text):
try:
return text[text.index('{') :]
return text[text.index("{") :]
except ValueError:
raise FBchatException('No JSON object found: {!r}'.format(text))
raise FBchatException("No JSON object found: {!r}".format(text))
def get_decoded_r(r):
@@ -201,12 +201,12 @@ def get_json(r):
def digitToChar(digit):
if digit < 10:
return str(digit)
return chr(ord('a') + digit - 10)
return chr(ord("a") + digit - 10)
def str_base(number, base):
if number < 0:
return '-' + str_base(-number, base)
return "-" + str_base(-number, base)
(d, m) = divmod(number, base)
if d > 0:
return str_base(d, base) + digitToChar(m)
@@ -226,55 +226,55 @@ def getSignatureID():
def generateOfflineThreadingID():
ret = now()
value = int(random() * 4294967295)
string = ("0000000000000000000000" + format(value, 'b'))[-22:]
msgs = format(ret, 'b') + string
string = ("0000000000000000000000" + format(value, "b"))[-22:]
msgs = format(ret, "b") + string
return str(int(msgs, 2))
def check_json(j):
if j.get('error') is None:
if j.get("error") is None:
return
if 'errorDescription' in j:
if "errorDescription" in j:
# 'errorDescription' is in the users own language!
raise FBchatFacebookError(
'Error #{} when sending request: {}'.format(
j['error'], j['errorDescription']
"Error #{} when sending request: {}".format(
j["error"], j["errorDescription"]
),
fb_error_code=j['error'],
fb_error_message=j['errorDescription'],
fb_error_code=j["error"],
fb_error_message=j["errorDescription"],
)
elif 'debug_info' in j['error'] and 'code' in j['error']:
elif "debug_info" in j["error"] and "code" in j["error"]:
raise FBchatFacebookError(
'Error #{} when sending request: {}'.format(
j['error']['code'], repr(j['error']['debug_info'])
"Error #{} when sending request: {}".format(
j["error"]["code"], repr(j["error"]["debug_info"])
),
fb_error_code=j['error']['code'],
fb_error_message=j['error']['debug_info'],
fb_error_code=j["error"]["code"],
fb_error_message=j["error"]["debug_info"],
)
else:
raise FBchatFacebookError(
'Error {} when sending request'.format(j['error']), fb_error_code=j['error']
"Error {} when sending request".format(j["error"]), fb_error_code=j["error"]
)
def check_request(r, as_json=True):
if not r.ok:
raise FBchatFacebookError(
'Error when sending request: Got {} response'.format(r.status_code),
"Error when sending request: Got {} response".format(r.status_code),
request_status_code=r.status_code,
)
content = get_decoded_r(r)
if content is None or len(content) == 0:
raise FBchatFacebookError('Error when sending request: Got empty response')
raise FBchatFacebookError("Error when sending request: Got empty response")
if as_json:
content = strip_to_json(content)
try:
j = json.loads(content)
except ValueError:
raise FBchatFacebookError('Error while parsing JSON: {!r}'.format(content))
raise FBchatFacebookError("Error while parsing JSON: {!r}".format(content))
check_json(j)
log.debug(j)
return j
@@ -283,12 +283,12 @@ def check_request(r, as_json=True):
def get_jsmods_require(j, index):
if j.get('jsmods') and j['jsmods'].get('require'):
if j.get("jsmods") and j["jsmods"].get("require"):
try:
return j['jsmods']['require'][0][index][0]
return j["jsmods"]["require"][0][index][0]
except (KeyError, IndexError) as e:
log.warning(
'Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(
"Error when getting jsmods_require: {}. Facebook might have changed protocol".format(
j
)
)
@@ -298,13 +298,13 @@ def get_jsmods_require(j, index):
def get_emojisize_from_tags(tags):
if tags is None:
return None
tmp = [tag for tag in tags if tag.startswith('hot_emoji_size:')]
tmp = [tag for tag in tags if tag.startswith("hot_emoji_size:")]
if len(tmp) > 0:
try:
return LIKES[tmp[0].split(':')[1]]
return LIKES[tmp[0].split(":")[1]]
except (KeyError, IndexError):
log.exception(
'Could not determine emoji size from {} - {}'.format(tags, tmp)
"Could not determine emoji size from {} - {}".format(tags, tmp)
)
return None
@@ -337,7 +337,7 @@ def get_files_from_urls(file_urls):
(
basename(file_url),
r.content,
r.headers.get('Content-Type') or guess_type(file_url)[0],
r.headers.get("Content-Type") or guess_type(file_url)[0],
)
)
return files
@@ -348,7 +348,7 @@ def get_files_from_paths(filenames):
files = []
for filename in filenames:
files.append(
(basename(filename), open(filename, 'rb'), guess_type(filename)[0])
(basename(filename), open(filename, "rb"), guess_type(filename)[0])
)
yield files
for fn, fp, ft in files:

View File

@@ -115,14 +115,14 @@ def compare(client, thread):
def message_with_mentions(request, client, client2, group):
text = "Hi there ["
mentions = []
if 'me' in request.param:
if "me" in request.param:
mentions.append(Mention(thread_id=client.uid, offset=len(text), length=2))
text += "me, "
if 'other' in request.param:
if "other" in request.param:
mentions.append(Mention(thread_id=client2.uid, offset=len(text), length=5))
text += "other, "
# Unused, because Facebook don't properly support sending mentions with groups as targets
if 'group' in request.param:
if "group" in request.param:
mentions.append(Mention(thread_id=group["id"], offset=len(text), length=5))
text += "group, "
text += "nothing]"

View File

@@ -120,6 +120,6 @@ def test_send_remote_files(client, catch_event, compare):
assert len(x.res["message_object"].attachments) == len(files)
@pytest.mark.parametrize('wave_first', [True, False])
@pytest.mark.parametrize("wave_first", [True, False])
def test_wave(client, wave_first):
client.wave(wave_first)

View File

@@ -9,4 +9,4 @@ def test_catch_event(client2, catch_event):
mid = "test"
with catch_event("onMessage") as x:
client2.onMessage(mid=mid)
assert x.res['mid'] == mid
assert x.res["mid"] == mid

View File

@@ -135,7 +135,7 @@ def test_typing_status(client, catch_event, compare, status):
assert compare(x, status=status)
@pytest.mark.parametrize('require_admin_approval', [True, False])
@pytest.mark.parametrize("require_admin_approval", [True, False])
def test_change_approval_mode(client1, group, catch_event, require_admin_approval):
with catch_event("onApprovalModeChange") as x:
client1.changeGroupApprovalMode(require_admin_approval, group["id"])

View File

@@ -106,7 +106,7 @@ def load_client(n, cache):
client = Client(
load_variable("client{}_email".format(n), cache),
load_variable("client{}_password".format(n), cache),
user_agent='Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36',
user_agent="Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
session_cookies=cache.get("client{}_session".format(n), None),
max_tries=1,
)