Replace pull channel with MQTT setup

This commit is contained in:
Mads Marquart
2020-01-05 19:42:07 +01:00
parent 803bfa7084
commit a1b80a7abb
2 changed files with 52 additions and 106 deletions

View File

@@ -12,6 +12,7 @@ from ._util import *
from .models import *
from . import _graphql
from ._state import State
from ._mqtt import Mqtt
import time
import json
@@ -85,13 +86,11 @@ class Client(object):
Raises:
FBchatException: On failed login
"""
self._sticky, self._pool = (None, None)
self._seq = "0"
self._default_thread_id = None
self._default_thread_type = None
self._pull_channel = 0
self._markAlive = True
self._buddylist = dict()
self._mqtt = None
handler.setLevel(logging_level)
@@ -2169,38 +2168,6 @@ class Client(object):
LISTEN METHODS
"""
def _ping(self):
data = {
"seq": self._seq,
"channel": "p_" + self._uid,
"clientid": self._state._client_id,
"partition": -2,
"cap": 0,
"uid": self._uid,
"sticky_token": self._sticky,
"sticky_pool": self._pool,
"viewer_uid": self._uid,
"state": "active",
}
j = self._get(
"https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel),
data,
)
def _pullMessage(self):
"""Call pull api to fetch message data."""
data = {
"seq": self._seq,
"msgs_recv": 0,
"sticky_token": self._sticky,
"sticky_pool": self._pool,
"clientid": self._state._client_id,
"state": "active" if self._markAlive else "offline",
}
return self._get(
"https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data
)
def _parseDelta(self, m):
def getThreadIdAndThreadType(msg_metadata):
"""Return a tuple consisting of thread ID and thread type."""
@@ -2751,13 +2718,15 @@ class Client(object):
else:
self.onUnknownMesssageType(msg=m)
def _parse_payload(self, m):
mtype = m.get("type")
def _parse_payload(self, topic, m):
# Things that directly change chat
if mtype == "delta":
if topic == "delta":
self._parseDelta(m)
# TODO: Remove old parsing below
# Inbox
elif mtype == "inbox":
elif topic == "inbox":
self.onInbox(
unseen=m["unseen"],
unread=m["unread"],
@@ -2766,7 +2735,7 @@ class Client(object):
)
# Typing
elif mtype == "typ" or mtype == "ttyp":
elif topic in ["typ", "ttyp"]:
author_id = str(m.get("from"))
thread_id = m.get("thread_fbid")
if thread_id:
@@ -2794,20 +2763,12 @@ class Client(object):
#
# self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time'))
elif mtype in ["jewel_requests_add"]:
elif topic == "jewel_requests_add":
from_id = m["from"]
self.onFriendRequest(from_id=from_id, msg=m)
# Happens on every login
elif mtype == "qprimer":
self.onQprimer(ts=m.get("made"), msg=m)
# Is sent before any other message
elif mtype == "deltaflow":
pass
# Chat timestamp
elif mtype == "chatproxy-presence":
elif topic == "chatproxy-presence":
statuses = dict()
for id_, data in m.get("buddyList", {}).items():
statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data)
@@ -2816,7 +2777,7 @@ class Client(object):
self.onChatTimestamp(buddylist=statuses, msg=m)
# Buddylist overlay
elif mtype == "buddylist_overlay":
elif topic == "buddylist_overlay":
statuses = dict()
for id_, data in m.get("overlay", {}).items():
old_in_game = None
@@ -2832,29 +2793,11 @@ class Client(object):
else:
self.onUnknownMesssageType(msg=m)
def _parseMessage(self, content):
"""Get message and author name from content.
May contain multiple messages in the content.
"""
self._seq = content.get("seq", "0")
if "lb_info" in content:
self._sticky = content["lb_info"]["sticky"]
self._pool = content["lb_info"]["pool"]
if "batches" in content:
for batch in content["batches"]:
self._parseMessage(batch)
if "ms" not in content:
return
for m in content["ms"]:
try:
self._parse_payload(m)
except Exception as e:
self.onMessageError(exception=e, msg=m)
def _parse_message(self, topic, data):
try:
self._parse_payload(topic, data)
except Exception as e:
self.onMessageError(exception=e, msg=data)
def startListening(self):
"""Start listening from an external event loop.
@@ -2862,6 +2805,15 @@ class Client(object):
Raises:
FBchatException: If request failed
"""
if not self._mqtt:
self._mqtt = Mqtt.connect(
state=self._state,
on_message=self._parse_message,
chat_on=self._markAlive,
foreground=True,
)
# Backwards compat
self.onQprimer(ts=now(), msg=None)
self.listening = True
def doOneListen(self, markAlive=None):
@@ -2879,36 +2831,20 @@ class Client(object):
"""
if markAlive is not None:
self._markAlive = markAlive
try:
if self._markAlive:
self._ping()
content = self._pullMessage()
if content:
self._parseMessage(content)
except KeyboardInterrupt:
return False
except requests.Timeout:
pass
except requests.ConnectionError:
# If the client has lost their internet connection, keep trying every 30 seconds
time.sleep(30)
except FBchatFacebookError as e:
# Fix 502 and 503 pull errors
if e.request_status_code in [502, 503]:
# Bump pull channel, while contraining withing 0-4
self._pull_channel = (self._pull_channel + 1) % 5
self.startListening()
else:
raise e
except Exception as e:
return self.onListenError(exception=e)
return True
# TODO: Remove this wierd check, and let the user handle the chat_on parameter
if self._markAlive != self._mqtt._chat_on:
self._mqtt.set_chat_on(self._markAlive)
# TODO: Remove on_error param
return self._mqtt.loop_once(on_error=self.onListenError)
def stopListening(self):
"""Clean up the variables from `Client.startListening`."""
"""Stop the listening loop."""
if not self._mqtt:
raise ValueError("Not listening")
self._mqtt.disconnect()
self.listening = False
self._sticky, self._pool = (None, None)
def listen(self, markAlive=None):
"""Initialize and runs the listening loop continually.

View File

@@ -222,7 +222,7 @@ class Mqtt:
path="/chat?sid={}".format(session_id), headers=headers
)
def loop_once(self):
def loop_once(self, on_error=None):
"""Run the listening loop once.
Returns whether to keep listening or not.
@@ -236,6 +236,12 @@ class Mqtt:
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
err = paho.mqtt.client.error_string(rc)
log.warning("MQTT Error: %s", err)
if on_error:
# Temporary to support on_error param
try:
raise _exception.FBchatException("MQTT Error: {}".format(err))
except _exception.FBchatException as e:
on_error(exception=e)
# Wait before reconnecting
self._mqtt._reconnect_wait()
@@ -264,11 +270,15 @@ class Mqtt:
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
# def set_client_settings(self, available_when_in_foreground: bool):
# data = {"make_user_available_when_in_foreground": available_when_in_foreground}
# payload = _util.json_minimal(data)
# info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
#
def set_chat_on(self, value):
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}
payload = _util.json_minimal(data)
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
self._chat_on = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
# def send_additional_contacts(self, additional_contacts):
# payload = _util.json_minimal({"additional_contacts": additional_contacts})
# info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)