Add top level MQTT topic parsing to a separate file

This commit is contained in:
Mads Marquart
2020-01-20 16:46:26 +01:00
parent 0a6bf221e6
commit 01f8578dea
11 changed files with 281 additions and 178 deletions

View File

@@ -42,6 +42,7 @@ from ._quick_reply import (
from ._poll import Poll, PollOption
from ._plan import GuestStatus, Plan, PlanData
# Listen events
from ._event_common import Event, UnknownEvent, ThreadEvent
from ._client_payload import (
ReactionEvent,
@@ -77,6 +78,7 @@ from ._delta_type import (
PlanDeleted,
PlanResponded,
)
from ._event import Typing, FriendRequest, Presence
from ._client import Client

View File

@@ -14,9 +14,7 @@ from . import (
_thread,
_message,
_event_common,
_client_payload,
_delta_class,
_delta_type,
_event,
)
from ._thread import ThreadLocation
@@ -55,7 +53,6 @@ class Client:
session: The session to use when making requests.
"""
self._mark_alive = True
self._buddylist = dict()
self._session = session
self._mqtt = None
@@ -463,22 +460,6 @@ class Client:
data = self._get_private_data()
return [j["display_email"] for j in data["all_emails"]]
def get_user_active_status(self, user_id):
"""Fetch friend active status as an `ActiveStatus` object.
Return ``None`` if status isn't known.
Warning:
Only works when listening.
Args:
user_id: ID of the user
Returns:
ActiveStatus: Given user active status
"""
return self._buddylist.get(str(user_id))
def mark_as_delivered(self, thread_id, message_id):
"""Mark a message as delivered.
@@ -609,88 +590,12 @@ class Client:
LISTEN METHODS
"""
def _parse_delta(self, delta):
# Client payload (that weird numbers)
if delta.get("class") == "ClientPayload":
for event in _client_payload.parse_client_payloads(self.session, delta):
self.on_event(event)
elif delta.get("class"):
event = _delta_class.parse_delta(self.session, delta)
if event:
self.on_event(event)
elif delta.get("type"):
self.on_event(_delta_type.parse_delta(self.session, delta))
# Unknown message type
else:
self.on_unknown_messsage_type(msg=delta)
def _parse_payload(self, topic, m):
# Things that directly change chat
if topic == "/t_ms":
if "deltas" not in m:
return
for delta in m["deltas"]:
self._parse_delta(delta)
# TODO: Remove old parsing below
# Inbox
elif topic == "inbox":
self.on_inbox(
unseen=m["unseen"],
unread=m["unread"],
recent_unread=m["recent_unread"],
)
# Typing
# /thread_typing {'sender_fbid': X, 'state': 1, 'type': 'typ', 'thread': 'Y'}
# /orca_typing_notifications {'type': 'typ', 'sender_fbid': X, 'state': 0}
elif topic in ("/thread_typing", "/orca_typing_notifications"):
author_id = str(m["sender_fbid"])
thread_id = m.get("thread")
if thread_id:
thread = _group.Group(session=self.session, id=str(thread_id))
else:
thread = _user.User(session=self.session, id=author_id)
self.on_typing(
author_id=author_id, status=m["state"] == 1, thread=thread,
)
# Other notifications
elif topic == "/legacy_web":
# Friend request
if m["type"] == "jewel_requests_add":
self.on_friend_request(from_id=str(m["from"]))
else:
self.on_unknown_messsage_type(msg=m)
# Chat timestamp / Buddylist overlay
elif topic == "/orca_presence":
if m["list_type"] == "full":
self._buddylist = {} # Refresh internal list
statuses = dict()
for data in m["list"]:
user_id = str(data["u"])
statuses[user_id] = ActiveStatus._from_orca_presence(data)
self._buddylist[user_id] = statuses[user_id]
# TODO: Which one should we call?
self.on_chat_timestamp(buddylist=statuses)
self.on_buddylist_overlay(statuses=statuses)
# Unknown message type
else:
self.on_unknown_messsage_type(msg=m)
def _parse_message(self, topic, data):
try:
self._parse_payload(topic, data)
except Exception as e:
self.on_message_error(exception=e, msg=data)
for event in _event.parse_events(self.session, topic, data):
self.on_event(event)
except _exception.ParseError:
log.exception("Failed parsing MQTT data")
def _start_listening(self):
if not self._mqtt:
@@ -740,78 +645,6 @@ class Client:
"""
self._mark_alive = markAlive
"""
END LISTEN METHODS
"""
"""
EVENTS
"""
def on_event(self, event: _event_common.Event):
"""Called when the client is listening, and an event happens."""
log.info("Got event: %s", event)
def on_friend_request(self, from_id=None):
"""Called when the client is listening, and somebody sends a friend request.
Args:
from_id: The ID of the person that sent the request
"""
log.info("Friend request from {}".format(from_id))
def on_inbox(self, unseen=None, unread=None, recent_unread=None):
"""
Todo:
Documenting this
Args:
unseen: --
unread: --
recent_unread: --
"""
log.info("Inbox event: {}, {}, {}".format(unseen, unread, recent_unread))
def on_typing(self, author_id=None, status=None, thread=None):
"""Called when the client is listening, and somebody starts or stops typing into a chat.
Args:
author_id: The ID of the person who sent the action
is_typing: ``True`` if the user started typing, ``False`` if they stopped.
thread: Thread that the action was sent to. See :ref:`intro_threads`
"""
pass
def on_chat_timestamp(self, buddylist=None):
"""Called when the client receives chat online presence update.
Args:
buddylist: A list of dictionaries with friend id and last seen timestamp
"""
log.debug("Chat Timestamps received: {}".format(buddylist))
def on_buddylist_overlay(self, statuses=None):
"""Called when the client is listening and client receives information about friend active status.
Args:
statuses (dict): Dictionary with user IDs as keys and `ActiveStatus` as values
"""
def on_unknown_messsage_type(self, msg=None):
"""Called when the client is listening, and some unknown data was received.
Args:
"""
log.debug("Unknown message received: {}".format(msg))
def on_message_error(self, exception=None, msg=None):
"""Called when an error was encountered while parsing received data.
Args:
exception: The exception that was encountered
"""
log.exception("Exception in parsing of {}".format(msg))
"""
END EVENTS
"""

View File

@@ -121,7 +121,7 @@ def parse_client_delta(session, data):
return UnsendEvent._parse(session, data["deltaRecallMessageData"])
elif "deltaMessageReply" in data:
return MessageReplyEvent._parse(session, data["deltaMessageReply"])
return UnknownEvent(data=data)
return UnknownEvent(source="client payload", data=data)
def parse_client_payloads(session, data):

View File

@@ -184,4 +184,4 @@ def parse_delta(session, data):
return X._parse(session, data)
elif class_ == "NewMessage":
return MessageEvent._parse(session, data)
return UnknownEvent(data=data)
return UnknownEvent(source="Delta class", data=data)

View File

@@ -326,4 +326,4 @@ def parse_delta(session, data):
return PlanDeleted._parse(session, data)
elif type_ == "lightweight_event_rsvp":
return PlanResponded._parse(session, data)
return UnknownEvent(data=data)
return UnknownEvent(source="Delta type", data=data)

123
fbchat/_event.py Normal file
View File

@@ -0,0 +1,123 @@
import attr
import datetime
from ._event_common import attrs_event, Event, UnknownEvent, ThreadEvent
from . import (
_exception,
_util,
_user,
_group,
_thread,
_client_payload,
_delta_class,
_delta_type,
)
from typing import Mapping
@attrs_event
class Typing(ThreadEvent):
"""Somebody started/stopped typing in a thread."""
#: ``True`` if the user started typing, ``False`` if they stopped
status = attr.ib(type=bool)
@classmethod
def _parse_orca(cls, session, data):
author = _user.User(session=session, id=str(data["sender_fbid"]))
status = data["state"] == 1
return cls(author=author, thread=author, status=status)
@classmethod
def _parse(cls, session, data):
# TODO: Rename this method
author = _user.User(session=session, id=str(data["sender_fbid"]))
thread = _group.Group(session=session, id=str(data["thread"]))
status = data["state"] == 1
return cls(author=author, thread=thread, status=status)
@attrs_event
class FriendRequest(Event):
"""Somebody sent a friend request."""
#: The user that sent the request
author = attr.ib(type=_user.User)
@classmethod
def _parse(cls, session, data):
author = _user.User(session=session, id=str(data["from"]))
return cls(author=author)
@attrs_event
class Presence(Event):
"""The list of active statuses was updated.
Chat online presence update.
"""
# TODO: Document this better!
#: User ids mapped to their active status
statuses = attr.ib(type=Mapping[str, _user.ActiveStatus])
#: ``True`` if the list is fully updated and ``False`` if it's partially updated
full = attr.ib(type=bool)
@classmethod
def _parse(cls, session, data):
statuses = {
str(d["u"]): _user.ActiveStatus._from_orca_presence(d) for d in data["list"]
}
return cls(statuses=statuses, full=data["list_type"] == "full")
def parse_delta(session, data):
try:
class_ = data.get("class")
if class_ == "ClientPayload":
yield from _client_payload.parse_client_payloads(session, data)
elif class_ == "AdminTextMessage":
yield _delta_type.parse_delta(session, data)
else:
event = _delta_class.parse_delta(session, data)
if event: # Skip `None`
yield event
except _exception.ParseError:
raise
except Exception as e:
raise _exception.ParseError("Error parsing delta", data=data) from e
def parse_events(session, topic, data):
# See Mqtt._configure_connect_options for information about these topics
try:
if topic == "/t_ms":
if "deltas" not in data:
return
for delta in data["deltas"]:
yield from parse_delta(session, delta)
elif topic == "/thread_typing":
yield Typing._parse(session, data)
elif topic == "/orca_typing_notifications":
yield Typing._parse_orca(session, data)
elif topic == "/legacy_web":
if data.get("type") == "jewel_requests_add":
yield FriendRequest._parse(session, data)
else:
yield UnknownEvent(source="/legacy_web", data=data)
elif topic == "/orca_presence":
yield Presence._parse(session, data)
else:
yield UnknownEvent(source=topic, data=data)
except _exception.ParseError:
raise
except Exception as e:
raise _exception.ParseError(
"Error parsing MQTT topic {}".format(topic), data=data
) from e

View File

@@ -21,6 +21,8 @@ class Event(metaclass=abc.ABCMeta):
class UnknownEvent(Event):
"""Represent an unknown event."""
#: Some data describing the unknown event's origin
source = attr.ib()
#: The unknown data. This cannot be relied on, it's only for debugging purposes.
data = attr.ib()

View File

@@ -153,7 +153,9 @@ def test_message_reply(session):
def test_parse_client_delta_unknown(session):
assert UnknownEvent(data={"abc": 10}) == parse_client_delta(session, {"abc": 10})
assert UnknownEvent(
source="client payload", data={"abc": 10}
) == parse_client_delta(session, {"abc": 10})
def test_parse_client_payloads_empty(session):

View File

@@ -272,4 +272,6 @@ def test_noop(session):
def test_parse_delta_unknown(session):
assert UnknownEvent(data={"abc": 10}) == parse_delta(session, {"abc": 10})
assert UnknownEvent(source="Delta class", data={"abc": 10}) == parse_delta(
session, {"abc": 10}
)

View File

@@ -951,4 +951,6 @@ def test_plan_participation(session):
def test_parse_delta_unknown(session):
assert UnknownEvent(data={"abc": 10}) == parse_delta(session, {"abc": 10})
assert UnknownEvent(source="Delta type", data={"abc": 10}) == parse_delta(
session, {"abc": 10}
)

137
tests/test_event.py Normal file
View File

@@ -0,0 +1,137 @@
import datetime
from fbchat import (
_util,
User,
Group,
Message,
ParseError,
UnknownEvent,
Typing,
FriendRequest,
Presence,
ReactionEvent,
UnfetchedThreadEvent,
ActiveStatus,
)
from fbchat._event import parse_delta, parse_events
def test_t_ms_full(session):
"""A full example of parsing of data in /t_ms."""
payload = {
"deltas": [
{
"deltaMessageReaction": {
"threadKey": {"threadFbId": 4321},
"messageId": "mid.$XYZ",
"action": 0,
"userId": 1234,
"reaction": "😢",
"senderId": 1234,
"offlineThreadingId": "1122334455",
}
}
]
}
data = {
"deltas": [
{
"payload": [ord(x) for x in _util.json_minimal(payload)],
"class": "ClientPayload",
},
{"class": "NoOp",},
{
"forceInsert": False,
"messageId": "mid.$ABC",
"threadKey": {"threadFbId": "4321"},
"class": "ForcedFetch",
},
],
"firstDeltaSeqId": 111111,
"lastIssuedSeqId": 111113,
"queueEntityId": 1234,
}
thread = Group(session=session, id="4321")
assert [
ReactionEvent(
author=User(session=session, id="1234"),
thread=thread,
message=Message(thread=thread, id="mid.$XYZ"),
reaction="😢",
),
UnfetchedThreadEvent(
thread=thread, message=Message(thread=thread, id="mid.$ABC"),
),
] == list(parse_events(session, "/t_ms", data))
def test_thread_typing(session):
data = {"sender_fbid": 1234, "state": 0, "type": "typ", "thread": "4321"}
(event,) = parse_events(session, "/thread_typing", data)
assert event == Typing(
author=User(session=session, id="1234"),
thread=Group(session=session, id="4321"),
status=False,
)
def test_orca_typing_notifications(session):
data = {"type": "typ", "sender_fbid": 1234, "state": 1}
(event,) = parse_events(session, "/orca_typing_notifications", data)
assert event == Typing(
author=User(session=session, id="1234"),
thread=User(session=session, id="1234"),
status=True,
)
def test_friend_request(session):
data = {"type": "jewel_requests_add", "from": "1234"}
(event,) = parse_events(session, "/legacy_web", data)
assert event == FriendRequest(author=User(session=session, id="1234"))
def test_orca_presence_inc(session):
data = {
"list_type": "inc",
"list": [
{"u": 1234, "p": 0, "l": 1500000000, "vc": 74},
{"u": 2345, "p": 2, "c": 9969664, "vc": 10},
],
}
(event,) = parse_events(session, "/orca_presence", data)
la = datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc)
assert event == Presence(
statuses={
"1234": ActiveStatus(active=False, last_active=la),
"2345": ActiveStatus(active=True),
},
full=False,
)
def test_orca_presence_full(session):
data = {
"list_type": "full",
"list": [
{"u": 1234, "p": 2, "c": 5767242},
{"u": 2345, "p": 2, "l": 1500000000},
{"u": 3456, "p": 2, "c": 9961482},
{"u": 4567, "p": 0, "l": 1500000000},
{"u": 5678, "p": 0},
{"u": 6789, "p": 2, "c": 14168154},
],
}
(event,) = parse_events(session, "/orca_presence", data)
la = datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc)
assert event == Presence(
statuses={
"1234": ActiveStatus(active=True),
"2345": ActiveStatus(active=True, last_active=la),
"3456": ActiveStatus(active=True),
"4567": ActiveStatus(active=False, last_active=la),
"5678": ActiveStatus(active=False),
"6789": ActiveStatus(active=True),
},
full=True,
)