Refactor polls and poll options
This commit is contained in:
@@ -3,7 +3,7 @@ import time
|
||||
import requests
|
||||
|
||||
from ._core import log
|
||||
from . import _util, _graphql, _session
|
||||
from . import _util, _graphql, _session, _poll
|
||||
|
||||
from ._exception import FBchatException, FBchatFacebookError
|
||||
from ._thread import ThreadLocation
|
||||
@@ -22,7 +22,6 @@ from ._quick_reply import (
|
||||
QuickReplyPhoneNumber,
|
||||
QuickReplyEmail,
|
||||
)
|
||||
from ._poll import Poll, PollOption
|
||||
from ._plan import PlanData
|
||||
|
||||
|
||||
@@ -615,22 +614,6 @@ class Client:
|
||||
raise FBchatException("Could not fetch image URL from: {}".format(j))
|
||||
return url
|
||||
|
||||
def fetch_poll_options(self, poll_id):
|
||||
"""Fetch list of `PollOption` objects from the poll id.
|
||||
|
||||
Args:
|
||||
poll_id: Poll ID to fetch from
|
||||
|
||||
Returns:
|
||||
list
|
||||
|
||||
Raises:
|
||||
FBchatException: If request failed
|
||||
"""
|
||||
data = {"question_id": poll_id}
|
||||
j = self._payload_post("/ajax/mercury/get_poll_options", data)
|
||||
return [PollOption._from_graphql(m) for m in j]
|
||||
|
||||
def _get_private_data(self):
|
||||
(j,) = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {}))
|
||||
return j["viewer"]
|
||||
@@ -675,40 +658,6 @@ class Client:
|
||||
END FETCH METHODS
|
||||
"""
|
||||
|
||||
"""
|
||||
SEND METHODS
|
||||
"""
|
||||
|
||||
def update_poll_vote(self, poll_id, option_ids=[], new_options=[]):
|
||||
"""Update a poll vote.
|
||||
|
||||
Args:
|
||||
poll_id: ID of the poll to update vote
|
||||
option_ids: List of the option IDs to vote
|
||||
new_options: List of the new option names
|
||||
|
||||
Raises:
|
||||
FBchatException: If request failed
|
||||
"""
|
||||
data = {"question_id": poll_id}
|
||||
|
||||
for i, option_id in enumerate(option_ids):
|
||||
data["selected_options[{}]".format(i)] = option_id
|
||||
|
||||
for i, option_text in enumerate(new_options):
|
||||
data["new_options[{}]".format(i)] = option_text
|
||||
|
||||
j = self._payload_post("/messaging/group_polling/update_vote/?dpr=1", data)
|
||||
if j.get("status") != "success":
|
||||
raise FBchatFacebookError(
|
||||
"Failed updating poll vote: {}".format(j.get("errorTitle")),
|
||||
fb_error_message=j.get("errorMessage"),
|
||||
)
|
||||
|
||||
"""
|
||||
END SEND METHODS
|
||||
"""
|
||||
|
||||
def mark_as_delivered(self, thread_id, message_id):
|
||||
"""Mark a message as delivered.
|
||||
|
||||
@@ -1165,7 +1114,7 @@ class Client:
|
||||
elif delta_type == "group_poll":
|
||||
event_type = delta["untypedData"]["event_type"]
|
||||
poll_json = _util.parse_json(delta["untypedData"]["question_json"])
|
||||
poll = Poll._from_graphql(poll_json)
|
||||
poll = _poll.Poll._from_graphql(self.session, poll_json)
|
||||
if event_type == "question_creation":
|
||||
# User created group poll
|
||||
self.on_poll_created(
|
||||
|
116
fbchat/_poll.py
116
fbchat/_poll.py
@@ -1,49 +1,28 @@
|
||||
import attr
|
||||
from ._core import attrs_default
|
||||
|
||||
|
||||
@attrs_default
|
||||
class Poll:
|
||||
"""Represents a poll."""
|
||||
|
||||
#: Title of the poll
|
||||
title = attr.ib()
|
||||
#: List of `PollOption`, can be fetched with `Client.fetch_poll_options`
|
||||
options = attr.ib()
|
||||
#: Options count
|
||||
options_count = attr.ib(None)
|
||||
#: ID of the poll
|
||||
id = attr.ib(None)
|
||||
|
||||
@classmethod
|
||||
def _from_graphql(cls, data):
|
||||
return cls(
|
||||
id=int(data["id"]),
|
||||
title=data.get("title") if data.get("title") else data.get("text"),
|
||||
options=[PollOption._from_graphql(m) for m in data.get("options")],
|
||||
options_count=data.get("total_count"),
|
||||
)
|
||||
from . import _exception, _session
|
||||
from typing import Iterable, Sequence
|
||||
|
||||
|
||||
@attrs_default
|
||||
class PollOption:
|
||||
"""Represents a poll option."""
|
||||
|
||||
#: Text of the poll option
|
||||
text = attr.ib()
|
||||
#: Whether vote when creating or client voted
|
||||
vote = attr.ib(False)
|
||||
#: ID of the users who voted for this poll option
|
||||
voters = attr.ib(None)
|
||||
#: Votes count
|
||||
votes_count = attr.ib(None)
|
||||
#: ID of the poll option
|
||||
id = attr.ib(None)
|
||||
id = attr.ib(converter=str, type=str)
|
||||
#: Text of the poll option
|
||||
text = attr.ib(type=str)
|
||||
#: Whether vote when creating or client voted
|
||||
vote = attr.ib(type=bool)
|
||||
#: ID of the users who voted for this poll option
|
||||
voters = attr.ib(type=Sequence[str])
|
||||
#: Votes count
|
||||
votes_count = attr.ib(type=int)
|
||||
|
||||
@classmethod
|
||||
def _from_graphql(cls, data):
|
||||
if data.get("viewer_has_voted") is None:
|
||||
vote = None
|
||||
vote = False
|
||||
elif isinstance(data["viewer_has_voted"], bool):
|
||||
vote = data["viewer_has_voted"]
|
||||
else:
|
||||
@@ -53,13 +32,76 @@ class PollOption:
|
||||
text=data.get("text"),
|
||||
vote=vote,
|
||||
voters=(
|
||||
[m.get("node").get("id") for m in data.get("voters").get("edges")]
|
||||
[m["node"]["id"] for m in data["voters"]["edges"]]
|
||||
if isinstance(data.get("voters"), dict)
|
||||
else data.get("voters")
|
||||
else data["voters"]
|
||||
),
|
||||
votes_count=(
|
||||
data.get("voters").get("count")
|
||||
data["voters"]["count"]
|
||||
if isinstance(data.get("voters"), dict)
|
||||
else data.get("total_count")
|
||||
else data["total_count"]
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@attrs_default
|
||||
class Poll:
|
||||
"""Represents a poll."""
|
||||
|
||||
#: ID of the poll
|
||||
session = attr.ib(type=_session.Session)
|
||||
#: ID of the poll
|
||||
id = attr.ib(converter=str, type=str)
|
||||
#: The poll's question
|
||||
question = attr.ib(type=str)
|
||||
#: The poll's top few options. The full list can be fetched with `fetch_options`
|
||||
options = attr.ib(type=Sequence[PollOption])
|
||||
#: Options count
|
||||
options_count = attr.ib(type=int)
|
||||
|
||||
@classmethod
|
||||
def _from_graphql(cls, session, data):
|
||||
return cls(
|
||||
session=session,
|
||||
id=data["id"],
|
||||
question=data["title"] if data.get("title") else data["text"],
|
||||
options=[PollOption._from_graphql(m) for m in data["options"]],
|
||||
options_count=data["total_count"],
|
||||
)
|
||||
|
||||
def fetch_options(self) -> Sequence[PollOption]:
|
||||
"""Fetch full list of `PollOption` objects on the poll."""
|
||||
data = {"question_id": self.id}
|
||||
j = self.session._payload_post("/ajax/mercury/get_poll_options", data)
|
||||
return [PollOption._from_graphql(m) for m in j]
|
||||
|
||||
def set_votes(self, option_ids: Iterable[str], new_options: Iterable[str] = None):
|
||||
"""Update the user's poll vote.
|
||||
|
||||
Args:
|
||||
option_ids: Option ids to vote for / keep voting for
|
||||
new_options: New options to add
|
||||
|
||||
Example:
|
||||
options = poll.fetch_options()
|
||||
# Add option
|
||||
poll.set_votes([o.id for o in options], new_options=["New option"])
|
||||
# Remove vote from option
|
||||
poll.set_votes([o.id for o in options if o.text != "Option 1"])
|
||||
"""
|
||||
data = {"question_id": self.id}
|
||||
|
||||
for i, option_id in enumerate(option_ids or ()):
|
||||
data["selected_options[{}]".format(i)] = option_id
|
||||
|
||||
for i, option_text in enumerate(new_options or ()):
|
||||
data["new_options[{}]".format(i)] = option_text
|
||||
|
||||
j = self.session._payload_post(
|
||||
"/messaging/group_polling/update_vote/?dpr=1", data
|
||||
)
|
||||
if j.get("status") != "success":
|
||||
raise _exception.FBchatFacebookError(
|
||||
"Failed updating poll vote: {}".format(j.get("errorTitle")),
|
||||
fb_error_message=j.get("errorMessage"),
|
||||
)
|
||||
|
@@ -5,7 +5,7 @@ import datetime
|
||||
import enum
|
||||
from ._core import attrs_default, Image
|
||||
from . import _util, _exception, _session, _graphql, _attachment, _file, _plan
|
||||
from typing import MutableMapping, Any, Iterable, Tuple, Optional
|
||||
from typing import MutableMapping, Mapping, Any, Iterable, Tuple, Optional
|
||||
|
||||
|
||||
class ThreadLocation(enum.Enum):
|
||||
@@ -473,10 +473,15 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
return _plan.Plan._create(self, name, at, location_name, location_id)
|
||||
|
||||
def create_poll(self, question: str, options=Iterable[Tuple[str, bool]]):
|
||||
def create_poll(self, question: str, options=Mapping[str, bool]):
|
||||
"""Create poll in a thread.
|
||||
|
||||
# TODO: Arguments
|
||||
Args:
|
||||
question: The question
|
||||
options: Options and whether you want to select the option
|
||||
|
||||
Example:
|
||||
thread.create_poll("Test poll", {"Option 1": True, "Option 2": False})
|
||||
"""
|
||||
# We're using ordered dictionaries, because the Facebook endpoint that parses
|
||||
# the POST parameters is badly implemented, and deals with ordering the options
|
||||
@@ -486,9 +491,9 @@ class ThreadABC(metaclass=abc.ABCMeta):
|
||||
[("question_text", question), ("target_id", self.id)]
|
||||
)
|
||||
|
||||
for i, (text, vote) in enumerate(options):
|
||||
for i, (text, vote) in enumerate(options.items()):
|
||||
data["option_text_array[{}]".format(i)] = text
|
||||
data["option_is_selected_array[{}]".format(i)] = str(int(vote))
|
||||
data["option_is_selected_array[{}]".format(i)] = "1" if vote else "0"
|
||||
|
||||
j = self.session._payload_post(
|
||||
"/messaging/group_polling/create_poll/?dpr=1", data
|
||||
|
@@ -10,7 +10,7 @@ def test_poll_option_from_graphql_unvoted():
|
||||
"voters": [],
|
||||
}
|
||||
assert PollOption(
|
||||
text="abc", vote=False, voters=[], votes_count=0, id=123456789
|
||||
text="abc", vote=False, voters=[], votes_count=0, id="123456789"
|
||||
) == PollOption._from_graphql(data)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ def test_poll_option_from_graphql_voted():
|
||||
"voters": ["1234", "2345"],
|
||||
}
|
||||
assert PollOption(
|
||||
text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id=123456789
|
||||
text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id="123456789"
|
||||
) == PollOption._from_graphql(data)
|
||||
|
||||
|
||||
@@ -39,11 +39,11 @@ def test_poll_option_from_graphql_alternate_format():
|
||||
},
|
||||
}
|
||||
assert PollOption(
|
||||
text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id=123456789
|
||||
text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id="123456789"
|
||||
) == PollOption._from_graphql(data)
|
||||
|
||||
|
||||
def test_poll_from_graphql():
|
||||
def test_poll_from_graphql(session):
|
||||
data = {
|
||||
"id": "123456789",
|
||||
"text": "Some poll",
|
||||
@@ -74,14 +74,21 @@ def test_poll_from_graphql():
|
||||
],
|
||||
}
|
||||
assert Poll(
|
||||
title="Some poll",
|
||||
session=session,
|
||||
question="Some poll",
|
||||
options=[
|
||||
PollOption(text="Abc", vote=True, voters=["1234"], votes_count=1, id=1111),
|
||||
PollOption(
|
||||
text="Def", vote=False, voters=["2345", "3456"], votes_count=2, id=2222
|
||||
text="Abc", vote=True, voters=["1234"], votes_count=1, id="1111"
|
||||
),
|
||||
PollOption(text="Ghi", vote=False, voters=[], votes_count=0, id=3333),
|
||||
PollOption(
|
||||
text="Def",
|
||||
vote=False,
|
||||
voters=["2345", "3456"],
|
||||
votes_count=2,
|
||||
id="2222",
|
||||
),
|
||||
PollOption(text="Ghi", vote=False, voters=[], votes_count=0, id="3333"),
|
||||
],
|
||||
options_count=5,
|
||||
id=123456789,
|
||||
) == Poll._from_graphql(data)
|
||||
) == Poll._from_graphql(session, data)
|
||||
|
@@ -9,40 +9,26 @@ pytestmark = pytest.mark.online
|
||||
@pytest.fixture(
|
||||
scope="module",
|
||||
params=[
|
||||
Poll(title=random_hex(), options=[]),
|
||||
Poll(
|
||||
title=random_hex(),
|
||||
options=[
|
||||
PollOption(text=random_hex(), vote=True),
|
||||
PollOption(text=random_hex(), vote=True),
|
||||
(random_hex(), []),
|
||||
(random_hex(), [(random_hex(), True), (random_hex(), True),],),
|
||||
(random_hex(), [(random_hex(), False), (random_hex(), False),],),
|
||||
(
|
||||
random_hex(),
|
||||
[
|
||||
(random_hex(), True),
|
||||
(random_hex(), True),
|
||||
(random_hex(), False),
|
||||
(random_hex(), False),
|
||||
(random_hex()),
|
||||
(random_hex()),
|
||||
],
|
||||
),
|
||||
Poll(
|
||||
title=random_hex(),
|
||||
options=[
|
||||
PollOption(text=random_hex(), vote=False),
|
||||
PollOption(text=random_hex(), vote=False),
|
||||
],
|
||||
),
|
||||
Poll(
|
||||
title=random_hex(),
|
||||
options=[
|
||||
PollOption(text=random_hex(), vote=True),
|
||||
PollOption(text=random_hex(), vote=True),
|
||||
PollOption(text=random_hex(), vote=False),
|
||||
PollOption(text=random_hex(), vote=False),
|
||||
PollOption(text=random_hex()),
|
||||
PollOption(text=random_hex()),
|
||||
],
|
||||
),
|
||||
pytest.param(
|
||||
Poll(title=None, options=[]), marks=[pytest.mark.xfail(raises=ValueError)]
|
||||
),
|
||||
pytest.param((None, []), marks=[pytest.mark.xfail(raises=ValueError)]),
|
||||
],
|
||||
)
|
||||
def poll_data(request, client1, group, catch_event):
|
||||
with catch_event("on_poll_created") as x:
|
||||
client1.create_poll(request.param, thread_id=group["id"])
|
||||
client1.create_poll(request.param[0], request.param[1], thread_id=group["id"])
|
||||
options = client1.fetch_poll_options(x.res["poll"].id)
|
||||
return x.res, request.param, options
|
||||
|
||||
|
Reference in New Issue
Block a user