Refactor and improve MQTT listener a bit
This commit is contained in:
422
fbchat/_mqtt.py
422
fbchat/_mqtt.py
@@ -8,210 +8,9 @@ from . import _util, _exception, _session, _graphql, _events
|
||||
from typing import Iterable, Optional
|
||||
|
||||
|
||||
def get_cookie_header(session: requests.Session, url: str) -> str:
|
||||
"""Extract a cookie header from a requests session."""
|
||||
# The cookies are extracted this way to make sure they're escaped correctly
|
||||
return requests.cookies.get_cookie_header(
|
||||
session.cookies, requests.Request("GET", url),
|
||||
)
|
||||
HOST = "edge-chat.facebook.com"
|
||||
|
||||
|
||||
def generate_session_id() -> int:
|
||||
"""Generate a random session ID between 1 and 9007199254740991."""
|
||||
return random.randint(1, 2 ** 53)
|
||||
|
||||
|
||||
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
|
||||
class Listener:
|
||||
"""Helper, to listen for incoming Facebook events."""
|
||||
|
||||
session = attr.ib(type=_session.Session)
|
||||
_mqtt = attr.ib(type=paho.mqtt.client.Client)
|
||||
_chat_on = attr.ib(type=bool)
|
||||
_foreground = attr.ib(type=bool)
|
||||
_sequence_id = attr.ib(type=int)
|
||||
_sync_token = attr.ib(None, type=str)
|
||||
_tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]])
|
||||
|
||||
_HOST = "edge-chat.facebook.com"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
# An alternative repr, to illustrate that you can't create the class directly
|
||||
return "<fbchat.Listener session={} chat_on={} foreground={}>".format(
|
||||
self.session, self._chat_on, self._foreground
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def connect(cls, session: _session.Session, chat_on: bool, foreground: bool):
|
||||
"""Initialize a connection to the Facebook MQTT service.
|
||||
|
||||
Args:
|
||||
session: The session to use when making requests.
|
||||
chat_on: Whether ...
|
||||
foreground: Whether ...
|
||||
|
||||
Example:
|
||||
>>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True)
|
||||
"""
|
||||
mqtt = paho.mqtt.client.Client(
|
||||
client_id="mqttwsclient",
|
||||
clean_session=True,
|
||||
protocol=paho.mqtt.client.MQTTv31,
|
||||
transport="websockets",
|
||||
)
|
||||
mqtt.enable_logger()
|
||||
# mqtt.max_inflight_messages_set(20) # The rest will get queued
|
||||
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
|
||||
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
|
||||
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
|
||||
# TODO: Is region (lla | atn | odn | others?) important?
|
||||
mqtt.tls_set()
|
||||
|
||||
self = cls(
|
||||
session=session,
|
||||
mqtt=mqtt,
|
||||
chat_on=chat_on,
|
||||
foreground=foreground,
|
||||
sequence_id=cls._fetch_sequence_id(session),
|
||||
)
|
||||
|
||||
# Configure callbacks
|
||||
mqtt.on_message = self._on_message_handler
|
||||
mqtt.on_connect = self._on_connect_handler
|
||||
|
||||
self._configure_connect_options()
|
||||
|
||||
# Attempt to connect
|
||||
try:
|
||||
rc = mqtt.connect(self._HOST, 443, keepalive=10)
|
||||
except (
|
||||
# Taken from .loop_forever
|
||||
paho.mqtt.client.socket.error,
|
||||
OSError,
|
||||
paho.mqtt.client.WebsocketConnectionError,
|
||||
) as e:
|
||||
raise _exception.FacebookError("MQTT connection failed") from e
|
||||
|
||||
# Raise error if connecting failed
|
||||
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
||||
err = paho.mqtt.client.error_string(rc)
|
||||
raise _exception.FacebookError("MQTT connection failed: {}".format(err))
|
||||
|
||||
return self
|
||||
|
||||
def _on_message_handler(self, client, userdata, message):
|
||||
# Parse payload JSON
|
||||
try:
|
||||
j = _util.parse_json(message.payload.decode("utf-8"))
|
||||
except (_exception.FacebookError, UnicodeDecodeError):
|
||||
log.debug(message.payload)
|
||||
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
||||
return
|
||||
|
||||
log.debug("MQTT payload: %s, %s", message.topic, j)
|
||||
|
||||
if message.topic == "/t_ms":
|
||||
# Update sync_token when received
|
||||
# This is received in the first message after we've created a messenger
|
||||
# sync queue.
|
||||
if "syncToken" in j and "firstDeltaSeqId" in j:
|
||||
self._sync_token = j["syncToken"]
|
||||
self._sequence_id = j["firstDeltaSeqId"]
|
||||
return
|
||||
|
||||
if "errorCode" in j:
|
||||
error = j["errorCode"]
|
||||
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
||||
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
||||
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
||||
# much time passed, or that it was simply missing
|
||||
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
||||
# the desired events could not be retrieved
|
||||
log.error(
|
||||
"The MQTT listener was disconnected for too long,"
|
||||
" events may have been lost"
|
||||
)
|
||||
self._sync_token = None
|
||||
self._sequence_id = self._fetch_sequence_id(self.session)
|
||||
self._messenger_queue_publish()
|
||||
# TODO: Signal to the user that they should reload their data!
|
||||
return
|
||||
log.error("MQTT error code %s received", error)
|
||||
return
|
||||
|
||||
# Update last sequence id when received
|
||||
if "lastIssuedSeqId" in j:
|
||||
self._sequence_id = j["lastIssuedSeqId"]
|
||||
else:
|
||||
log.error("Missing last sequence id: %s", j)
|
||||
|
||||
try:
|
||||
# TODO: Don't handle this in a callback
|
||||
self._tmp_events = list(
|
||||
_events.parse_events(self.session, message.topic, j)
|
||||
)
|
||||
except _exception.ParseError:
|
||||
log.exception("Failed parsing MQTT data")
|
||||
|
||||
@staticmethod
|
||||
def _fetch_sequence_id(session: _session.Session) -> int:
|
||||
"""Fetch sequence ID."""
|
||||
params = {
|
||||
"limit": 0,
|
||||
"tags": ["INBOX"],
|
||||
"before": None,
|
||||
"includeDeliveryReceipts": False,
|
||||
"includeSeqID": True,
|
||||
}
|
||||
log.debug("Fetching MQTT sequence ID")
|
||||
# Same request as in `Client.fetch_threads`
|
||||
(j,) = session._graphql_requests(
|
||||
_graphql.from_doc_id("1349387578499440", params)
|
||||
)
|
||||
sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"]
|
||||
if not sequence_id:
|
||||
raise _exception.NotLoggedIn("Failed fetching sequence id")
|
||||
return int(sequence_id)
|
||||
|
||||
def _on_connect_handler(self, client, userdata, flags, rc):
|
||||
if rc == 21:
|
||||
raise _exception.FacebookError(
|
||||
"Failed connecting. Maybe your cookies are wrong?"
|
||||
)
|
||||
if rc != 0:
|
||||
return # Don't try to send publish if the connection failed
|
||||
|
||||
self._messenger_queue_publish()
|
||||
|
||||
def _messenger_queue_publish(self):
|
||||
# configure receiving messages.
|
||||
payload = {
|
||||
"sync_api_version": 10,
|
||||
"max_deltas_able_to_process": 1000,
|
||||
"delta_batch_size": 500,
|
||||
"encoding": "JSON",
|
||||
"entity_fbid": self.session.user.id,
|
||||
}
|
||||
|
||||
# If we don't have a sync_token, create a new messenger queue
|
||||
# This is done so that across reconnects, if we've received a sync token, we
|
||||
# SHOULD receive a piece of data in /t_ms exactly once!
|
||||
if self._sync_token is None:
|
||||
topic = "/messenger_sync_create_queue"
|
||||
payload["initial_titan_sequence_id"] = str(self._sequence_id)
|
||||
payload["device_params"] = None
|
||||
else:
|
||||
topic = "/messenger_sync_get_diffs"
|
||||
payload["last_seq_id"] = str(self._sequence_id)
|
||||
payload["sync_token"] = self._sync_token
|
||||
|
||||
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
|
||||
|
||||
def _configure_connect_options(self):
|
||||
# Generate a new session ID on each reconnect
|
||||
session_id = generate_session_id()
|
||||
|
||||
topics = [
|
||||
TOPICS = [
|
||||
# Things that happen in chats (e.g. messages)
|
||||
"/t_ms",
|
||||
# Group typing notifications
|
||||
@@ -248,6 +47,216 @@ class Listener:
|
||||
"/webrtc_response",
|
||||
]
|
||||
|
||||
|
||||
def get_cookie_header(session: requests.Session, url: str) -> str:
|
||||
"""Extract a cookie header from a requests session."""
|
||||
# The cookies are extracted this way to make sure they're escaped correctly
|
||||
return requests.cookies.get_cookie_header(
|
||||
session.cookies, requests.Request("GET", url),
|
||||
)
|
||||
|
||||
|
||||
def generate_session_id() -> int:
|
||||
"""Generate a random session ID between 1 and 9007199254740991."""
|
||||
return random.randint(1, 2 ** 53)
|
||||
|
||||
|
||||
def fetch_sequence_id(session: _session.Session) -> int:
|
||||
"""Fetch sequence ID."""
|
||||
params = {
|
||||
"limit": 0,
|
||||
"tags": ["INBOX"],
|
||||
"before": None,
|
||||
"includeDeliveryReceipts": False,
|
||||
"includeSeqID": True,
|
||||
}
|
||||
log.debug("Fetching MQTT sequence ID")
|
||||
# Same doc id as in `Client.fetch_threads`
|
||||
(j,) = session._graphql_requests(_graphql.from_doc_id("1349387578499440", params))
|
||||
sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"]
|
||||
if not sequence_id:
|
||||
raise _exception.NotLoggedIn("Failed fetching sequence id")
|
||||
return int(sequence_id)
|
||||
|
||||
|
||||
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
|
||||
class Listener:
|
||||
"""Helper, to listen for incoming Facebook events."""
|
||||
|
||||
session = attr.ib(type=_session.Session)
|
||||
_mqtt = attr.ib(type=paho.mqtt.client.Client)
|
||||
_chat_on = attr.ib(type=bool)
|
||||
_foreground = attr.ib(type=bool)
|
||||
_sequence_id = attr.ib(type=int)
|
||||
_sync_token = attr.ib(None, type=str)
|
||||
_tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]])
|
||||
|
||||
def __repr__(self) -> str:
|
||||
# An alternative repr, to illustrate that you can't create the class directly
|
||||
return "<fbchat.Listener session={} chat_on={} foreground={}>".format(
|
||||
self.session, self._chat_on, self._foreground
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def connect(cls, session: _session.Session, chat_on: bool, foreground: bool):
|
||||
"""Initialize a connection to the Facebook MQTT service.
|
||||
|
||||
Args:
|
||||
session: The session to use when making requests.
|
||||
chat_on: Whether ...
|
||||
foreground: Whether ...
|
||||
|
||||
Example:
|
||||
>>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True)
|
||||
"""
|
||||
mqtt = paho.mqtt.client.Client(
|
||||
client_id="mqttwsclient",
|
||||
clean_session=True,
|
||||
protocol=paho.mqtt.client.MQTTv31,
|
||||
transport="websockets",
|
||||
)
|
||||
mqtt.enable_logger()
|
||||
# mqtt.max_inflight_messages_set(20) # The rest will get queued
|
||||
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
|
||||
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
|
||||
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
|
||||
mqtt.tls_set()
|
||||
|
||||
self = cls(
|
||||
session=session,
|
||||
mqtt=mqtt,
|
||||
chat_on=chat_on,
|
||||
foreground=foreground,
|
||||
sequence_id=fetch_sequence_id(session),
|
||||
)
|
||||
|
||||
# Configure callbacks
|
||||
mqtt.on_message = self._on_message_handler
|
||||
mqtt.on_connect = self._on_connect_handler
|
||||
|
||||
self._configure_connect_options()
|
||||
|
||||
# Attempt to connect
|
||||
try:
|
||||
rc = mqtt.connect(HOST, 443, keepalive=10)
|
||||
except (
|
||||
# Taken from .loop_forever
|
||||
paho.mqtt.client.socket.error,
|
||||
OSError,
|
||||
paho.mqtt.client.WebsocketConnectionError,
|
||||
) as e:
|
||||
raise _exception.FacebookError("MQTT connection failed") from e
|
||||
|
||||
# Raise error if connecting failed
|
||||
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
||||
err = paho.mqtt.client.error_string(rc)
|
||||
raise _exception.FacebookError("MQTT connection failed: {}".format(err))
|
||||
|
||||
return self
|
||||
|
||||
def _handle_ms(self, j):
|
||||
"""Handle /t_ms special logic.
|
||||
|
||||
Returns whether to continue parsing the message.
|
||||
"""
|
||||
# TODO: Merge this with the parsing in _events
|
||||
|
||||
# Update sync_token when received
|
||||
# This is received in the first message after we've created a messenger
|
||||
# sync queue.
|
||||
if "syncToken" in j and "firstDeltaSeqId" in j:
|
||||
self._sync_token = j["syncToken"]
|
||||
self._sequence_id = j["firstDeltaSeqId"]
|
||||
return False
|
||||
|
||||
if "errorCode" in j:
|
||||
error = j["errorCode"]
|
||||
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
||||
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
||||
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
||||
# much time passed, or that it was simply missing
|
||||
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
||||
# the desired events could not be retrieved
|
||||
log.error(
|
||||
"The MQTT listener was disconnected for too long,"
|
||||
" events may have been lost"
|
||||
)
|
||||
self._sync_token = None
|
||||
self._sequence_id = fetch_sequence_id(self.session)
|
||||
self._messenger_queue_publish()
|
||||
# TODO: Signal to the user that they should reload their data!
|
||||
return False
|
||||
log.error("MQTT error code %s received", error)
|
||||
return False
|
||||
|
||||
# Update last sequence id
|
||||
# Except for the two cases above, this is always received
|
||||
self._sequence_id = j["lastIssuedSeqId"]
|
||||
return True
|
||||
|
||||
def _on_message_handler(self, client, userdata, message):
|
||||
# Parse payload JSON
|
||||
try:
|
||||
j = _util.parse_json(message.payload.decode("utf-8"))
|
||||
except (_exception.FacebookError, UnicodeDecodeError):
|
||||
log.debug(message.payload)
|
||||
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
||||
return
|
||||
|
||||
log.debug("MQTT payload: %s, %s", message.topic, j)
|
||||
|
||||
if message.topic == "/t_ms":
|
||||
if not _handle_ms(j):
|
||||
return
|
||||
|
||||
try:
|
||||
# TODO: Don't handle this in a callback
|
||||
self._tmp_events = list(
|
||||
_events.parse_events(self.session, message.topic, j)
|
||||
)
|
||||
except _exception.ParseError:
|
||||
log.exception("Failed parsing MQTT data")
|
||||
|
||||
def _on_connect_handler(self, client, userdata, flags, rc):
|
||||
if rc == 21:
|
||||
raise _exception.FacebookError(
|
||||
"Failed connecting. Maybe your cookies are wrong?"
|
||||
)
|
||||
if rc != 0:
|
||||
err = paho.mqtt.client.connack_string(rc)
|
||||
log.error("MQTT Connection Error: %s", err)
|
||||
return # Don't try to send publish if the connection failed
|
||||
|
||||
self._messenger_queue_publish()
|
||||
|
||||
def _messenger_queue_publish(self):
|
||||
# configure receiving messages.
|
||||
payload = {
|
||||
"sync_api_version": 10,
|
||||
"max_deltas_able_to_process": 1000,
|
||||
"delta_batch_size": 500,
|
||||
"encoding": "JSON",
|
||||
"entity_fbid": self.session.user.id,
|
||||
}
|
||||
|
||||
# If we don't have a sync_token, create a new messenger queue
|
||||
# This is done so that across reconnects, if we've received a sync token, we
|
||||
# SHOULD receive a piece of data in /t_ms exactly once!
|
||||
if self._sync_token is None:
|
||||
topic = "/messenger_sync_create_queue"
|
||||
payload["initial_titan_sequence_id"] = str(self._sequence_id)
|
||||
payload["device_params"] = None
|
||||
else:
|
||||
topic = "/messenger_sync_get_diffs"
|
||||
payload["last_seq_id"] = str(self._sequence_id)
|
||||
payload["sync_token"] = self._sync_token
|
||||
|
||||
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
|
||||
|
||||
def _configure_connect_options(self):
|
||||
# Generate a new session ID on each reconnect
|
||||
session_id = generate_session_id()
|
||||
|
||||
username = {
|
||||
# The user ID
|
||||
"u": self.session.user.id,
|
||||
@@ -262,7 +271,7 @@ class Listener:
|
||||
# Application ID, taken from facebook.com
|
||||
"aid": 219994525426954,
|
||||
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
|
||||
"st": topics,
|
||||
"st": TOPICS,
|
||||
# MQTT extension by FB, allows making a PUBLISH while CONNECTing
|
||||
# Using this is more efficient, but the same can be acheived with:
|
||||
# def on_connect(*args):
|
||||
@@ -288,19 +297,18 @@ class Listener:
|
||||
"pack": [],
|
||||
}
|
||||
|
||||
# TODO: Make this thread safe
|
||||
self._mqtt.username_pw_set(_util.json_minimal(username))
|
||||
|
||||
headers = {
|
||||
# TODO: Make this access thread safe
|
||||
"Cookie": get_cookie_header(
|
||||
self.session._session, "https://edge-chat.facebook.com/chat"
|
||||
),
|
||||
"User-Agent": self.session._session.headers["User-Agent"],
|
||||
"Origin": "https://www.facebook.com",
|
||||
"Host": self._HOST,
|
||||
"Host": HOST,
|
||||
}
|
||||
|
||||
# TODO: Is region (lla | atn | odn | others?) important?
|
||||
self._mqtt.ws_set_options(
|
||||
path="/chat?sid={}".format(session_id), headers=headers
|
||||
)
|
||||
|
Reference in New Issue
Block a user