Handle connecting in Listener.listen

This commit is contained in:
Mads Marquart
2020-01-25 14:50:15 +01:00
parent 2da8369c70
commit f8e110f180

View File

@@ -61,6 +61,24 @@ def generate_session_id() -> int:
return random.randint(1, 2 ** 53)
def mqtt_factory() -> paho.mqtt.client.Client:
# Configure internal MQTT handler
mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient",
clean_session=True,
protocol=paho.mqtt.client.MQTTv31,
transport="websockets",
)
mqtt.enable_logger()
# mqtt.max_inflight_messages_set(20) # The rest will get queued
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
mqtt.tls_set()
mqtt.connect_async(HOST, 443, keepalive=10)
return mqtt
def fetch_sequence_id(session: _session.Session) -> int:
"""Fetch sequence ID."""
params = {
@@ -84,10 +102,10 @@ class Listener:
"""Helper, to listen for incoming Facebook events."""
session = attr.ib(type=_session.Session)
_mqtt = attr.ib(type=paho.mqtt.client.Client)
_chat_on = attr.ib(type=bool)
_foreground = attr.ib(type=bool)
_sequence_id = attr.ib(type=int)
_mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client)
_sync_token = attr.ib(None, type=str)
_tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]])
@@ -97,6 +115,11 @@ class Listener:
self.session, self._chat_on, self._foreground
)
def __attrs_post_init__(self):
# Configure callbacks
self._mqtt.on_message = self._on_message_handler
self._mqtt.on_connect = self._on_connect_handler
@classmethod
def connect(cls, session: _session.Session, chat_on: bool, foreground: bool):
"""Initialize a connection to the Facebook MQTT service.
@@ -109,51 +132,13 @@ class Listener:
Example:
>>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True)
"""
mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient",
clean_session=True,
protocol=paho.mqtt.client.MQTTv31,
transport="websockets",
)
mqtt.enable_logger()
# mqtt.max_inflight_messages_set(20) # The rest will get queued
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
mqtt.tls_set()
self = cls(
return cls(
session=session,
mqtt=mqtt,
chat_on=chat_on,
foreground=foreground,
sequence_id=fetch_sequence_id(session),
)
# Configure callbacks
mqtt.on_message = self._on_message_handler
mqtt.on_connect = self._on_connect_handler
self._configure_connect_options()
# Attempt to connect
try:
rc = mqtt.connect(HOST, 443, keepalive=10)
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
raise _exception.FacebookError("MQTT connection failed") from e
# Raise error if connecting failed
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
err = paho.mqtt.client.error_string(rc)
raise _exception.FacebookError("MQTT connection failed: {}".format(err))
return self
def _handle_ms(self, j):
"""Handle /t_ms special logic.
@@ -206,7 +191,7 @@ class Listener:
log.debug("MQTT payload: %s, %s", message.topic, j)
if message.topic == "/t_ms":
if not _handle_ms(j):
if not self._handle_ms(j):
return
try:
@@ -313,45 +298,21 @@ class Listener:
path="/chat?sid={}".format(session_id), headers=headers
)
def _loop_once(self) -> bool:
rc = self._mqtt.loop(timeout=1.0)
# If disconnect() has been called
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
return False # Stop listening
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
raise _exception.NotLoggedIn("MQTT connection refused")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
def _reconnect(self):
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
log.debug("MQTT reconnection failed: %s", e)
# Wait before reconnecting
self._mqtt._reconnect_wait()
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
log.debug("MQTT reconnection failed: %s", e)
return True # Keep listening
def listen(self) -> Iterable[_events.Event]:
"""Run the listening loop continually.
@@ -365,10 +326,41 @@ class Listener:
>>> for event in listener.listen():
... print(event)
"""
while self._loop_once():
# Make sure we're connected
while True:
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
self._reconnect()
else:
break
while True:
rc = self._mqtt.loop(timeout=1.0)
# If disconnect() has been called
# Beware, internal API, may have to change this to something more stable!
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
break # Stop listening
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
raise _exception.NotLoggedIn("MQTT connection refused")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
self._reconnect()
if self._tmp_events:
yield from self._tmp_events
self._tmp_events = None
self._tmp_events = None
def disconnect(self) -> None:
"""Disconnect the MQTT listener.