Merge pull request #515 from carpedm20/refactor-listen-parsing
Refactor listen parsing
This commit is contained in:
@@ -2,11 +2,13 @@ import attr
|
||||
import random
|
||||
import paho.mqtt.client
|
||||
import requests
|
||||
from ._core import log
|
||||
from . import _util, _exception, _graphql
|
||||
from ._core import log, attrs_default
|
||||
from . import _util, _exception, _session, _graphql, _event_common, _event
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def get_cookie_header(session, url):
|
||||
def get_cookie_header(session: requests.Session, url: str) -> str:
|
||||
"""Extract a cookie header from a requests session."""
|
||||
# The cookies are extracted this way to make sure they're escaped correctly
|
||||
return requests.cookies.get_cookie_header(
|
||||
@@ -14,25 +16,34 @@ def get_cookie_header(session, url):
|
||||
)
|
||||
|
||||
|
||||
def generate_session_id():
|
||||
def generate_session_id() -> int:
|
||||
"""Generate a random session ID between 1 and 9007199254740991."""
|
||||
return random.randint(1, 2 ** 53)
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class Mqtt(object):
|
||||
_session = attr.ib()
|
||||
_mqtt = attr.ib()
|
||||
_on_message = attr.ib()
|
||||
_chat_on = attr.ib()
|
||||
_foreground = attr.ib()
|
||||
_sequence_id = attr.ib()
|
||||
_sync_token = attr.ib(None)
|
||||
@attrs_default
|
||||
class Listener:
|
||||
"""Helper, to listen for incoming Facebook events."""
|
||||
|
||||
_session = attr.ib(type=_session.Session)
|
||||
_mqtt = attr.ib(type=paho.mqtt.client.Client)
|
||||
_chat_on = attr.ib(type=bool)
|
||||
_foreground = attr.ib(type=bool)
|
||||
_sequence_id = attr.ib(type=int)
|
||||
_sync_token = attr.ib(None, type=str)
|
||||
_events = attr.ib(None, type=Iterable[_event_common.Event])
|
||||
|
||||
_HOST = "edge-chat.facebook.com"
|
||||
|
||||
@classmethod
|
||||
def connect(cls, session, on_message, chat_on, foreground):
|
||||
def connect(cls, session, chat_on: bool, foreground: bool):
|
||||
"""Initialize a connection to the Facebook MQTT service.
|
||||
|
||||
Args:
|
||||
session: The session to use when making requests.
|
||||
chat_on: Whether ...
|
||||
foreground: Whether ...
|
||||
"""
|
||||
mqtt = paho.mqtt.client.Client(
|
||||
client_id="mqttwsclient",
|
||||
clean_session=True,
|
||||
@@ -50,7 +61,6 @@ class Mqtt(object):
|
||||
self = cls(
|
||||
session=session,
|
||||
mqtt=mqtt,
|
||||
on_message=on_message,
|
||||
chat_on=chat_on,
|
||||
foreground=foreground,
|
||||
sequence_id=cls._fetch_sequence_id(session),
|
||||
@@ -109,11 +119,14 @@ class Mqtt(object):
|
||||
|
||||
log.debug("MQTT payload: %s, %s", message.topic, j)
|
||||
|
||||
# Call the external callback
|
||||
self._on_message(message.topic, j)
|
||||
try:
|
||||
# TODO: Don't handle this in a callback
|
||||
self._events = list(_event.parse_events(self._session, message.topic, j))
|
||||
except _exception.ParseError:
|
||||
log.exception("Failed parsing MQTT data")
|
||||
|
||||
@staticmethod
|
||||
def _fetch_sequence_id(session):
|
||||
def _fetch_sequence_id(session) -> int:
|
||||
"""Fetch sequence ID."""
|
||||
params = {
|
||||
"limit": 1,
|
||||
@@ -258,14 +271,11 @@ class Mqtt(object):
|
||||
path="/chat?sid={}".format(session_id), headers=headers
|
||||
)
|
||||
|
||||
def loop_once(self):
|
||||
"""Run the listening loop once.
|
||||
|
||||
Returns whether to keep listening or not.
|
||||
"""
|
||||
def _loop_once(self) -> bool:
|
||||
rc = self._mqtt.loop(timeout=1.0)
|
||||
|
||||
# If disconnect() has been called
|
||||
# Beware, internal API, may have to change this to something more stable!
|
||||
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
|
||||
return False # Stop listening
|
||||
|
||||
@@ -298,23 +308,45 @@ class Mqtt(object):
|
||||
|
||||
return True # Keep listening
|
||||
|
||||
def disconnect(self):
|
||||
def listen(self) -> Iterable[_event_common.Event]:
|
||||
"""Run the listening loop continually.
|
||||
|
||||
Yields events when they arrive.
|
||||
|
||||
This will automatically reconnect on errors.
|
||||
"""
|
||||
while self._loop_once():
|
||||
if self._events:
|
||||
yield from self._events
|
||||
self._events = None
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Disconnect the MQTT listener.
|
||||
|
||||
Can be called while listening, which will stop the listening loop.
|
||||
|
||||
The `Listener` object should not be used after this is called!
|
||||
"""
|
||||
self._mqtt.disconnect()
|
||||
|
||||
def set_foreground(self, value):
|
||||
def set_foreground(self, value: bool) -> None:
|
||||
"""Set the `foreground` value while listening."""
|
||||
# TODO: Document what this actually does!
|
||||
payload = _util.json_minimal({"foreground": value})
|
||||
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
|
||||
self._foreground = value
|
||||
# TODO: We can't wait for this, since the loop is running with .loop_forever()
|
||||
# TODO: We can't wait for this, since the loop is running within the same thread
|
||||
# info.wait_for_publish()
|
||||
|
||||
def set_chat_on(self, value):
|
||||
def set_chat_on(self, value: bool) -> None:
|
||||
"""Set the `chat_on` value while listening."""
|
||||
# TODO: Document what this actually does!
|
||||
# TODO: Is this the right request to make?
|
||||
data = {"make_user_available_when_in_foreground": value}
|
||||
payload = _util.json_minimal(data)
|
||||
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
|
||||
self._chat_on = value
|
||||
# TODO: We can't wait for this, since the loop is running with .loop_forever()
|
||||
# TODO: We can't wait for this, since the loop is running within the same thread
|
||||
# info.wait_for_publish()
|
||||
|
||||
# def send_additional_contacts(self, additional_contacts):
|
||||
|
Reference in New Issue
Block a user