Move listen methods out of Client and into MQTT class

Make MQTT class / `Listener` public
This commit is contained in:
Mads Marquart
2020-01-21 01:29:43 +01:00
parent 01f8578dea
commit 9b75db898a
6 changed files with 168 additions and 217 deletions

View File

@@ -1,19 +1,17 @@
import fbchat
# Subclass fbchat.Client and override required methods
class EchoBot(fbchat.Client):
def on_message(self, author_id, message_object, thread, **kwargs):
self.mark_as_delivered(thread.id, message_object.id)
self.mark_as_read(thread.id)
print("{} from {} in {}".format(message_object, thread))
# If you're not the author, echo
if author_id != self.session.user_id:
thread.send_text(message_object.text)
session = fbchat.Session.login("<email>", "<password>")
echo_bot = EchoBot(session)
echo_bot.listen()
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
def on_message(event):
print(f"{event.message.text} from {event.author.id} in {event.thread.id}")
# If you're not the author, echo
if event.author.id != session.user_id:
event.thread.send_text(event.message.text)
for event in listener.listen():
if isinstance(event, fbchat.MessageEvent):
on_message(event)

View File

@@ -14,64 +14,77 @@ old_nicknames = {
"12345678904": "User nr. 4's nickname",
}
class KeepBot(fbchat.Client):
def on_color_change(self, author_id, new_color, thread, **kwargs):
if old_thread_id == thread.id and old_color != new_color:
print(
"{} changed the thread color. It will be changed back".format(author_id)
)
thread.set_color(old_color)
def on_emoji_change(self, author_id, new_emoji, thread, **kwargs):
if old_thread_id == thread.id and new_emoji != old_emoji:
print(
"{} changed the thread emoji. It will be changed back".format(author_id)
)
thread.set_emoji(old_emoji)
def on_people_added(self, added_ids, author_id, thread, **kwargs):
if old_thread_id == thread.id and author_id != self.session.user_id:
print("{} got added. They will be removed".format(added_ids))
for added_id in added_ids:
thread.remove_participant(added_id)
def on_person_removed(self, removed_id, author_id, thread, **kwargs):
# No point in trying to add ourself
if (
old_thread_id == thread.id
and removed_id != self.session.user_id
and author_id != self.session.user_id
):
print("{} got removed. They will be re-added".format(removed_id))
thread.add_participants(removed_id)
def on_title_change(self, author_id, new_title, thread, **kwargs):
if old_thread_id == thread.id and old_title != new_title:
print(
"{} changed the thread title. It will be changed back".format(author_id)
)
thread.set_title(old_title)
def on_nickname_change(
self, author_id, changed_for, new_nickname, thread, **kwargs
):
if (
old_thread_id == thread.id
and changed_for in old_nicknames
and old_nicknames[changed_for] != new_nickname
):
print(
"{} changed {}'s' nickname. It will be changed back".format(
author_id, changed_for
)
)
thread.set_nickname(
changed_for, old_nicknames[changed_for],
)
session = fbchat.Session.login("<email>", "<password>")
keep_bot = KeepBot(session)
keep_bot.listen()
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
def on_color_set(event: fbchat.ColorSet):
if old_thread_id != event.thread.id:
return
if old_color != event.color:
print(f"{event.author.id} changed the thread color. It will be changed back")
event.thread.set_color(old_color)
def on_emoji_set(event: fbchat.EmojiSet):
if old_thread_id != event.thread.id:
return
if old_emoji != event.emoji:
print(f"{event.author.id} changed the thread emoji. It will be changed back")
event.thread.set_emoji(old_emoji)
def on_title_set(event: fbchat.TitleSet):
if old_thread_id != event.thread.id:
return
if old_title != event.title:
print(f"{event.author.id} changed the thread title. It will be changed back")
event.thread.set_title(old_title)
def on_nickname_set(event: fbchat.NicknameSet):
if old_thread_id != event.thread.id:
return
old_nickname = old_nicknames.get(event.subject.id)
if old_nickname != event.nickname:
print(
f"{event.author.id} changed {event.subject.id}'s' nickname."
" It will be changed back"
)
event.thread.set_nickname(event.subject.id, old_nickname)
def on_people_added(event: fbchat.PeopleAdded):
if old_thread_id != event.thread.id:
return
if event.author.id != session.user_id:
print(f"{', '.join(x.id for x in event.added)} got added. They will be removed")
for added in event.added:
event.thread.remove_participant(added.id)
def on_person_removed(event: fbchat.PersonRemoved):
if old_thread_id != event.thread.id:
return
# No point in trying to add ourself
if event.removed.id == session.user_id:
return
if event.author.id != session.user_id:
print(f"{event.removed.id} got removed. They will be re-added")
event.thread.add_participants([removed.id])
for event in listener.listen():
if isinstance(event, fbchat.ColorSet):
on_color_set(event)
elif isinstance(event, fbchat.EmojiSet):
on_emoji_set(event)
elif isinstance(event, fbchat.TitleSet):
on_title_set(event)
elif isinstance(event, fbchat.NicknameSet):
on_nickname_set(event)
elif isinstance(event, fbchat.PeopleAdded):
on_people_added(event)
elif isinstance(event, fbchat.PersonRemoved):
on_person_removed(event)

View File

@@ -1,23 +1,19 @@
import fbchat
class RemoveBot(fbchat.Client):
def on_message(self, author_id, message_object, thread, **kwargs):
# We can only kick people from group chats, so no need to try if it's a user chat
if message_object.text == "Remove me!" and isinstance(thread, fbchat.Group):
print("{} will be removed from {}".format(author_id, thread))
thread.remove_participant(author_id)
else:
# Sends the data to the inherited on_message, so that we can still see when a message is recieved
super(RemoveBot, self).on_message(
author_id=author_id,
message_object=message_object,
thread=thread,
**kwargs,
)
session = fbchat.Session.login("<email>", "<password>")
remove_bot = RemoveBot(session)
remove_bot.listen()
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
def on_message(event):
# We can only kick people from group chats, so no need to try if it's a user chat
if not isinstance(event.thread, fbchat.Group):
return
if message.text == "Remove me!":
print(f"{event.author.id} will be removed from {event.thread.id}")
event.thread.remove_participant(event.author.id)
for event in listener.listen():
if isinstance(event, fbchat.MessageEvent):
on_message(event)

View File

@@ -79,6 +79,7 @@ from ._delta_type import (
PlanResponded,
)
from ._event import Typing, FriendRequest, Presence
from ._mqtt import Listener
from ._client import Client
@@ -92,7 +93,7 @@ __license__ = "BSD 3-Clause"
__author__ = "Taehoon Kim; Moreels Pieter-Jan; Mads Marquart"
__email__ = "carpedm20@gmail.com"
__all__ = ("Session", "Client")
__all__ = ("Session", "Listener", "Client")
# Everything below is taken from the excellent trio project:

View File

@@ -1,43 +1,27 @@
import attr
import datetime
from ._core import log
from ._core import log, attrs_default
from . import (
_exception,
_util,
_graphql,
_mqtt,
_session,
_poll,
_user,
_page,
_group,
_thread,
_message,
_event_common,
_event,
)
from ._thread import ThreadLocation
from ._user import User, UserData, ActiveStatus
from ._user import User, UserData
from ._group import Group, GroupData
from ._page import Page, PageData
from ._message import EmojiSize, Mention, Message
from ._attachment import Attachment
from ._sticker import Sticker
from ._location import LocationAttachment, LiveLocationAttachment
from ._file import ImageAttachment, VideoAttachment
from ._quick_reply import (
QuickReply,
QuickReplyText,
QuickReplyLocation,
QuickReplyPhoneNumber,
QuickReplyEmail,
)
from ._plan import PlanData
from typing import Sequence, Iterable, Tuple, Optional
@attrs_default
class Client:
"""A client for the Facebook Chat (Messenger).
@@ -46,24 +30,14 @@ class Client:
useful while listening).
"""
def __init__(self, session):
"""Initialize the client model.
Args:
session: The session to use when making requests.
"""
self._mark_alive = True
self._session = session
self._mqtt = None
#: The session to use when making requests.
_session = attr.ib(type=_session.Session)
@property
def session(self):
"""The session that's used when making requests."""
return self._session
def __repr__(self):
return "Client(session={!r})".format(self._session)
def fetch_users(self) -> Sequence[_user.UserData]:
"""Fetch users the client is currently chatting with.
@@ -585,66 +559,3 @@ class Client:
data["message_ids[{}]".format(i)] = message_id
j = self.session._payload_post("/ajax/mercury/delete_messages.php?dpr=1", data)
return True
"""
LISTEN METHODS
"""
def _parse_message(self, topic, data):
try:
for event in _event.parse_events(self.session, topic, data):
self.on_event(event)
except _exception.ParseError:
log.exception("Failed parsing MQTT data")
def _start_listening(self):
if not self._mqtt:
self._mqtt = _mqtt.Mqtt.connect(
session=self.session,
on_message=self._parse_message,
chat_on=self._mark_alive,
foreground=True,
)
def _do_one_listen(self):
# TODO: Remove this wierd check, and let the user handle the chat_on parameter
if self._mark_alive != self._mqtt._chat_on:
self._mqtt.set_chat_on(self._mark_alive)
return self._mqtt.loop_once()
def _stop_listening(self):
if not self._mqtt:
return
self._mqtt.disconnect()
# TODO: Preserve the _mqtt object
# Currently, there's some issues when disconnecting
self._mqtt = None
def listen(self, markAlive=None):
"""Initialize and runs the listening loop continually.
Args:
markAlive (bool): Whether this should ping the Facebook server each time the loop runs
"""
if markAlive is not None:
self.set_active_status(markAlive)
self._start_listening()
while self._do_one_listen():
pass
self._stop_listening()
def set_active_status(self, markAlive):
"""Change active status while listening.
Args:
markAlive (bool): Whether to show if client is active
"""
self._mark_alive = markAlive
def on_event(self, event: _event_common.Event):
"""Called when the client is listening, and an event happens."""
log.info("Got event: %s", event)

View File

@@ -2,11 +2,13 @@ import attr
import random
import paho.mqtt.client
import requests
from ._core import log
from . import _util, _exception, _graphql
from ._core import log, attrs_default
from . import _util, _exception, _session, _graphql, _event_common, _event
from typing import Iterable
def get_cookie_header(session, url):
def get_cookie_header(session: requests.Session, url: str) -> str:
"""Extract a cookie header from a requests session."""
# The cookies are extracted this way to make sure they're escaped correctly
return requests.cookies.get_cookie_header(
@@ -14,25 +16,34 @@ def get_cookie_header(session, url):
)
def generate_session_id():
def generate_session_id() -> int:
"""Generate a random session ID between 1 and 9007199254740991."""
return random.randint(1, 2 ** 53)
@attr.s(slots=True)
class Mqtt(object):
_session = attr.ib()
_mqtt = attr.ib()
_on_message = attr.ib()
_chat_on = attr.ib()
_foreground = attr.ib()
_sequence_id = attr.ib()
_sync_token = attr.ib(None)
@attrs_default
class Listener:
"""Helper, to listen for incoming Facebook events."""
_session = attr.ib(type=_session.Session)
_mqtt = attr.ib(type=paho.mqtt.client.Client)
_chat_on = attr.ib(type=bool)
_foreground = attr.ib(type=bool)
_sequence_id = attr.ib(type=int)
_sync_token = attr.ib(None, type=str)
_events = attr.ib(None, type=Iterable[_event_common.Event])
_HOST = "edge-chat.facebook.com"
@classmethod
def connect(cls, session, on_message, chat_on, foreground):
def connect(cls, session, chat_on: bool, foreground: bool):
"""Initialize a connection to the Facebook MQTT service.
Args:
session: The session to use when making requests.
chat_on: Whether ...
foreground: Whether ...
"""
mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient",
clean_session=True,
@@ -50,7 +61,6 @@ class Mqtt(object):
self = cls(
session=session,
mqtt=mqtt,
on_message=on_message,
chat_on=chat_on,
foreground=foreground,
sequence_id=cls._fetch_sequence_id(session),
@@ -108,11 +118,14 @@ class Mqtt(object):
log.debug("MQTT payload: %s, %s", message.topic, j)
# Call the external callback
self._on_message(message.topic, j)
try:
# TODO: Don't handle this in a callback
self._events = list(_event.parse_events(self._session, message.topic, j))
except _exception.ParseError:
log.exception("Failed parsing MQTT data")
@staticmethod
def _fetch_sequence_id(session):
def _fetch_sequence_id(session) -> int:
"""Fetch sequence ID."""
params = {
"limit": 1,
@@ -258,14 +271,11 @@ class Mqtt(object):
path="/chat?sid={}".format(session_id), headers=headers
)
def loop_once(self):
"""Run the listening loop once.
Returns whether to keep listening or not.
"""
def _loop_once(self) -> bool:
rc = self._mqtt.loop(timeout=1.0)
# If disconnect() has been called
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
return False # Stop listening
@@ -298,23 +308,45 @@ class Mqtt(object):
return True # Keep listening
def disconnect(self):
def listen(self) -> Iterable[_event_common.Event]:
"""Run the listening loop continually.
Yields events when they arrive.
This will automatically reconnect on errors.
"""
while self._loop_once():
if self._events:
yield from self._events
self._events = None
def disconnect(self) -> None:
"""Disconnect the MQTT listener.
Can be called while listening, which will stop the listening loop.
The `Listener` object should not be used after this is called!
"""
self._mqtt.disconnect()
def set_foreground(self, value):
def set_foreground(self, value: bool) -> None:
"""Set the `foreground` value while listening."""
# TODO: Document what this actually does!
payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
self._foreground = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# TODO: We can't wait for this, since the loop is running within the same thread
# info.wait_for_publish()
def set_chat_on(self, value):
def set_chat_on(self, value: bool) -> None:
"""Set the `chat_on` value while listening."""
# TODO: Document what this actually does!
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}
payload = _util.json_minimal(data)
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
self._chat_on = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# TODO: We can't wait for this, since the loop is running within the same thread
# info.wait_for_publish()
# def send_additional_contacts(self, additional_contacts):