Refactor MQTT to do proper reconnecting

This commit is contained in:
Mads Marquart
2020-01-05 14:56:01 +01:00
parent 766b0125fb
commit a298e0cf16

View File

@@ -2,7 +2,7 @@ import attr
import random import random
import paho.mqtt.client import paho.mqtt.client
from ._core import log from ._core import log
from . import _util, _graphql from . import _util, _exception, _graphql
def generate_session_id(): def generate_session_id():
@@ -17,7 +17,7 @@ class Mqtt:
_on_message = attr.ib() _on_message = attr.ib()
_chat_on = attr.ib() _chat_on = attr.ib()
_foreground = attr.ib() _foreground = attr.ib()
_session_id = attr.ib(factory=generate_session_id) _sequence_id = attr.ib()
_sync_token = attr.ib(None) _sync_token = attr.ib(None)
_HOST = "edge-chat.facebook.com" _HOST = "edge-chat.facebook.com"
@@ -30,15 +30,6 @@ class Mqtt:
protocol=paho.mqtt.client.MQTTv31, protocol=paho.mqtt.client.MQTTv31,
transport="websockets", transport="websockets",
) )
self = cls(
state=state,
mqtt=mqtt,
on_message=on_message,
chat_on=chat_on,
foreground=foreground,
)
mqtt.enable_logger() mqtt.enable_logger()
# mqtt.max_inflight_messages_set(20) # mqtt.max_inflight_messages_set(20)
# mqtt.max_queued_messages_set(0) # unlimited # mqtt.max_queued_messages_set(0) # unlimited
@@ -46,46 +37,46 @@ class Mqtt:
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120) # mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
# TODO: Is region (lla | atn | odn | others?) important? # TODO: Is region (lla | atn | odn | others?) important?
mqtt.tls_set() mqtt.tls_set()
mqtt.ws_set_options(
path="/chat?sid={}".format(session_id), headers=self._create_headers
)
mqtt.on_message = self._on_message_handler
sequence_id = self._fetch_sequence_id(self._state) self = cls(
# Set connect/reconnect data with an empty sync token and an newly fetched state=state,
# sequence id initially mqtt=mqtt,
self._set_reconnect_data(self._sync_token, sequence_id) on_message=on_message,
chat_on=chat_on,
foreground=foreground,
sequence_id=cls._fetch_sequence_id(state),
)
# Configure callbacks
mqtt.on_message = self._on_message_handler
mqtt.on_connect = self._on_connect_handler
self._configure_connect_options()
# TODO: Handle response code # TODO: Handle response code
response_code = mqtt.connect(self._HOST, 443, keepalive=10) response_code = mqtt.connect(self._HOST, 443, keepalive=10)
def _create_headers(self, headers): return self
log.debug("Fetching MQTT headers")
# TODO: Make this access thread safe
headers["Cookie"] = _util.get_cookie_header(self._state._session, self._HOST)
headers["User-Agent"] = self._state._session.headers["User-Agent"]
headers["Origin"] = "https://www.facebook.com"
headers["Host"] = self._HOST
return headers
def _on_message_handler(self, client, userdata, message): def _on_message_handler(self, client, userdata, message):
j = _util.parse_json(message.payload) # Parse payload JSON
if message.topic == "/t_ms": try:
sequence_id = None j = _util.parse_json(message.payload)
except _exception.FBchatFacebookError:
log.exception("Failed parsing MQTT data as JSON: %r", message.payload)
return
if message.topic == "/t_ms":
# Update sync_token when received # Update sync_token when received
# This is received in the first message after we've created a messenger # This is received in the first message after we've created a messenger
# sync queue. # sync queue.
if "syncToken" in j and "firstDeltaSeqId" in j: if "syncToken" in j and "firstDeltaSeqId" in j:
self._sync_token = j["syncToken"] self._sync_token = j["syncToken"]
sequence_id = j["firstDeltaSeqId"] self._sequence_id = j["firstDeltaSeqId"]
# Update last sequence id when received # Update last sequence id when received
if "lastIssuedSeqId" in j: if "lastIssuedSeqId" in j:
sequence_id = j["lastIssuedSeqId"] self._sequence_id = j["lastIssuedSeqId"]
if sequence_id is not None:
self._set_reconnect_data(self._sync_token, sequence_id)
# Call the external callback # Call the external callback
self._on_message(message.topic, j) self._on_message(message.topic, j)
@@ -109,40 +100,39 @@ class Mqtt:
# TODO: Proper exceptions # TODO: Proper exceptions
raise raise
@staticmethod def _on_connect_handler(self, client, userdata, flags, rc):
def _get_messenger_sync(state, sync_token, sequence_id): # configure receiving messages.
"""Get the data to configure receiving messages."""
payload = { payload = {
"sync_api_version": 10, "sync_api_version": 10,
"max_deltas_able_to_process": 1000, "max_deltas_able_to_process": 1000,
"delta_batch_size": 500, "delta_batch_size": 500,
"encoding": "JSON", "encoding": "JSON",
"entity_fbid": state.user_id, "entity_fbid": self._state.user_id,
} }
# If we don't have a sync_token, create a new messenger queue # If we don't have a sync_token, create a new messenger queue
# This is done so that across reconnects, if we've received a sync token, we # This is done so that across reconnects, if we've received a sync token, we
# SHOULD receive a piece of data in /t_ms exactly once! # SHOULD receive a piece of data in /t_ms exactly once!
if sync_token is None: if self._sync_token is None:
topic = "/messenger_sync_create_queue" topic = "/messenger_sync_create_queue"
payload["initial_titan_sequence_id"] = str(sequence_id) payload["initial_titan_sequence_id"] = str(self._sequence_id)
payload["device_params"] = None payload["device_params"] = None
else: else:
topic = "/messenger_sync_get_diffs" topic = "/messenger_sync_get_diffs"
payload["last_seq_id"] = str(sequence_id) payload["last_seq_id"] = str(self._sequence_id)
payload["sync_token"] = sync_token payload["sync_token"] = self._sync_token
return topic, payload self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
def _set_reconnect_data(self, sync_token, sequence_id): def _configure_connect_options(self):
log.debug("Setting MQTT reconnect data: %s/%s", sync_token, sequence_id) # Generate a new session ID on each reconnect
topic, payload = self._get_messenger_sync(self._state, sync_token, sequence_id) session_id = generate_session_id()
username = { username = {
# The user ID # The user ID
"u": self._state.user_id, "u": self._state.user_id,
# Session ID # Session ID
"s": self._session_id, "s": session_id,
# Active status setting # Active status setting
"chat_on": self._chat_on, "chat_on": self._chat_on,
# foreground_state - Whether the window is focused # foreground_state - Whether the window is focused
@@ -178,18 +168,18 @@ class Mqtt:
"/sr_res", "/sr_res",
], ],
# MQTT extension by FB, allows making a PUBLISH while CONNECTing # MQTT extension by FB, allows making a PUBLISH while CONNECTing
# Using this is more efficient, but the same can be acheived with:
# def on_connect(*args):
# mqtt.publish(topic, payload, qos=1)
# mqtt.on_connect = on_connect
# TODO: For some reason this doesn't work!
"pm": [ "pm": [
{ # {
"topic": topic, # "topic": topic,
"payload": _util.json_minimal(payload), # "payload": payload,
"qos": 1, # "qos": 1,
"messageId": 65536, # "messageId": 65536,
} # }
# The above is more efficient, but the same effect could have been
# acheived with:
# def on_connect(*args):
# mqtt.publish(topic, payload=..., qos=1)
# mqtt.on_connect = on_connect
], ],
# Unknown parameters # Unknown parameters
"cp": 3, "cp": 3,
@@ -205,8 +195,44 @@ class Mqtt:
# TODO: Make this thread safe # TODO: Make this thread safe
self._mqtt.username_pw_set(_util.json_minimal(username)) self._mqtt.username_pw_set(_util.json_minimal(username))
headers = {
# TODO: Make this access thread safe
"Cookie": _util.get_cookie_header(self._state._session, self._HOST),
"User-Agent": self._state._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Host": self._HOST,
}
self._mqtt.ws_set_options(
path="/chat?sid={}".format(session_id), headers=headers
)
def listen(self): def listen(self):
self._mqtt.loop_forever() # TODO: retry_first_connection=True? while True:
rc = self._mqtt.loop(timeout=1.0)
if rc == paho.mqtt.client.MQTT_ERR_SUCCESS:
continue # No errors
# If disconnect() has been called
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
break
# Wait before reconnecting
self._mqtt._reconnect_wait()
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
):
log.debug("MQTT connection failed")
# self._mqtt.loop_forever() # TODO: retry_first_connection=True?
def disconnect(self): def disconnect(self):
self._mqtt.disconnect() self._mqtt.disconnect()