From 26f99d983e76cc19cfbcf914f6f2eea6954b755a Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Thu, 9 Jan 2020 22:02:09 +0100 Subject: [PATCH] Refactor polls and poll options --- fbchat/_client.py | 55 +-------------------- fbchat/_poll.py | 116 ++++++++++++++++++++++++++++++-------------- fbchat/_thread.py | 15 ++++-- tests/test_poll.py | 25 ++++++---- tests/test_polls.py | 42 ++++++---------- 5 files changed, 121 insertions(+), 132 deletions(-) diff --git a/fbchat/_client.py b/fbchat/_client.py index a15fd5f..245e98f 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -3,7 +3,7 @@ import time import requests from ._core import log -from . import _util, _graphql, _session +from . import _util, _graphql, _session, _poll from ._exception import FBchatException, FBchatFacebookError from ._thread import ThreadLocation @@ -22,7 +22,6 @@ from ._quick_reply import ( QuickReplyPhoneNumber, QuickReplyEmail, ) -from ._poll import Poll, PollOption from ._plan import PlanData @@ -615,22 +614,6 @@ class Client: raise FBchatException("Could not fetch image URL from: {}".format(j)) return url - def fetch_poll_options(self, poll_id): - """Fetch list of `PollOption` objects from the poll id. - - Args: - poll_id: Poll ID to fetch from - - Returns: - list - - Raises: - FBchatException: If request failed - """ - data = {"question_id": poll_id} - j = self._payload_post("/ajax/mercury/get_poll_options", data) - return [PollOption._from_graphql(m) for m in j] - def _get_private_data(self): (j,) = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {})) return j["viewer"] @@ -675,40 +658,6 @@ class Client: END FETCH METHODS """ - """ - SEND METHODS - """ - - def update_poll_vote(self, poll_id, option_ids=[], new_options=[]): - """Update a poll vote. - - Args: - poll_id: ID of the poll to update vote - option_ids: List of the option IDs to vote - new_options: List of the new option names - - Raises: - FBchatException: If request failed - """ - data = {"question_id": poll_id} - - for i, option_id in enumerate(option_ids): - data["selected_options[{}]".format(i)] = option_id - - for i, option_text in enumerate(new_options): - data["new_options[{}]".format(i)] = option_text - - j = self._payload_post("/messaging/group_polling/update_vote/?dpr=1", data) - if j.get("status") != "success": - raise FBchatFacebookError( - "Failed updating poll vote: {}".format(j.get("errorTitle")), - fb_error_message=j.get("errorMessage"), - ) - - """ - END SEND METHODS - """ - def mark_as_delivered(self, thread_id, message_id): """Mark a message as delivered. @@ -1165,7 +1114,7 @@ class Client: elif delta_type == "group_poll": event_type = delta["untypedData"]["event_type"] poll_json = _util.parse_json(delta["untypedData"]["question_json"]) - poll = Poll._from_graphql(poll_json) + poll = _poll.Poll._from_graphql(self.session, poll_json) if event_type == "question_creation": # User created group poll self.on_poll_created( diff --git a/fbchat/_poll.py b/fbchat/_poll.py index 6c6fdc7..957aa49 100644 --- a/fbchat/_poll.py +++ b/fbchat/_poll.py @@ -1,49 +1,28 @@ import attr from ._core import attrs_default - - -@attrs_default -class Poll: - """Represents a poll.""" - - #: Title of the poll - title = attr.ib() - #: List of `PollOption`, can be fetched with `Client.fetch_poll_options` - options = attr.ib() - #: Options count - options_count = attr.ib(None) - #: ID of the poll - id = attr.ib(None) - - @classmethod - def _from_graphql(cls, data): - return cls( - id=int(data["id"]), - title=data.get("title") if data.get("title") else data.get("text"), - options=[PollOption._from_graphql(m) for m in data.get("options")], - options_count=data.get("total_count"), - ) +from . import _exception, _session +from typing import Iterable, Sequence @attrs_default class PollOption: """Represents a poll option.""" - #: Text of the poll option - text = attr.ib() - #: Whether vote when creating or client voted - vote = attr.ib(False) - #: ID of the users who voted for this poll option - voters = attr.ib(None) - #: Votes count - votes_count = attr.ib(None) #: ID of the poll option - id = attr.ib(None) + id = attr.ib(converter=str, type=str) + #: Text of the poll option + text = attr.ib(type=str) + #: Whether vote when creating or client voted + vote = attr.ib(type=bool) + #: ID of the users who voted for this poll option + voters = attr.ib(type=Sequence[str]) + #: Votes count + votes_count = attr.ib(type=int) @classmethod def _from_graphql(cls, data): if data.get("viewer_has_voted") is None: - vote = None + vote = False elif isinstance(data["viewer_has_voted"], bool): vote = data["viewer_has_voted"] else: @@ -53,13 +32,76 @@ class PollOption: text=data.get("text"), vote=vote, voters=( - [m.get("node").get("id") for m in data.get("voters").get("edges")] + [m["node"]["id"] for m in data["voters"]["edges"]] if isinstance(data.get("voters"), dict) - else data.get("voters") + else data["voters"] ), votes_count=( - data.get("voters").get("count") + data["voters"]["count"] if isinstance(data.get("voters"), dict) - else data.get("total_count") + else data["total_count"] ), ) + + +@attrs_default +class Poll: + """Represents a poll.""" + + #: ID of the poll + session = attr.ib(type=_session.Session) + #: ID of the poll + id = attr.ib(converter=str, type=str) + #: The poll's question + question = attr.ib(type=str) + #: The poll's top few options. The full list can be fetched with `fetch_options` + options = attr.ib(type=Sequence[PollOption]) + #: Options count + options_count = attr.ib(type=int) + + @classmethod + def _from_graphql(cls, session, data): + return cls( + session=session, + id=data["id"], + question=data["title"] if data.get("title") else data["text"], + options=[PollOption._from_graphql(m) for m in data["options"]], + options_count=data["total_count"], + ) + + def fetch_options(self) -> Sequence[PollOption]: + """Fetch full list of `PollOption` objects on the poll.""" + data = {"question_id": self.id} + j = self.session._payload_post("/ajax/mercury/get_poll_options", data) + return [PollOption._from_graphql(m) for m in j] + + def set_votes(self, option_ids: Iterable[str], new_options: Iterable[str] = None): + """Update the user's poll vote. + + Args: + option_ids: Option ids to vote for / keep voting for + new_options: New options to add + + Example: + options = poll.fetch_options() + # Add option + poll.set_votes([o.id for o in options], new_options=["New option"]) + # Remove vote from option + poll.set_votes([o.id for o in options if o.text != "Option 1"]) + """ + data = {"question_id": self.id} + + for i, option_id in enumerate(option_ids or ()): + data["selected_options[{}]".format(i)] = option_id + + for i, option_text in enumerate(new_options or ()): + data["new_options[{}]".format(i)] = option_text + + j = self.session._payload_post( + "/messaging/group_polling/update_vote/?dpr=1", data + ) + if j.get("status") != "success": + raise _exception.FBchatFacebookError( + "Failed updating poll vote: {}".format(j.get("errorTitle")), + fb_error_message=j.get("errorMessage"), + ) diff --git a/fbchat/_thread.py b/fbchat/_thread.py index 628cb3e..9b9698a 100644 --- a/fbchat/_thread.py +++ b/fbchat/_thread.py @@ -5,7 +5,7 @@ import datetime import enum from ._core import attrs_default, Image from . import _util, _exception, _session, _graphql, _attachment, _file, _plan -from typing import MutableMapping, Any, Iterable, Tuple, Optional +from typing import MutableMapping, Mapping, Any, Iterable, Tuple, Optional class ThreadLocation(enum.Enum): @@ -473,10 +473,15 @@ class ThreadABC(metaclass=abc.ABCMeta): """ return _plan.Plan._create(self, name, at, location_name, location_id) - def create_poll(self, question: str, options=Iterable[Tuple[str, bool]]): + def create_poll(self, question: str, options=Mapping[str, bool]): """Create poll in a thread. - # TODO: Arguments + Args: + question: The question + options: Options and whether you want to select the option + + Example: + thread.create_poll("Test poll", {"Option 1": True, "Option 2": False}) """ # We're using ordered dictionaries, because the Facebook endpoint that parses # the POST parameters is badly implemented, and deals with ordering the options @@ -486,9 +491,9 @@ class ThreadABC(metaclass=abc.ABCMeta): [("question_text", question), ("target_id", self.id)] ) - for i, (text, vote) in enumerate(options): + for i, (text, vote) in enumerate(options.items()): data["option_text_array[{}]".format(i)] = text - data["option_is_selected_array[{}]".format(i)] = str(int(vote)) + data["option_is_selected_array[{}]".format(i)] = "1" if vote else "0" j = self.session._payload_post( "/messaging/group_polling/create_poll/?dpr=1", data diff --git a/tests/test_poll.py b/tests/test_poll.py index dd12470..fa64bca 100644 --- a/tests/test_poll.py +++ b/tests/test_poll.py @@ -10,7 +10,7 @@ def test_poll_option_from_graphql_unvoted(): "voters": [], } assert PollOption( - text="abc", vote=False, voters=[], votes_count=0, id=123456789 + text="abc", vote=False, voters=[], votes_count=0, id="123456789" ) == PollOption._from_graphql(data) @@ -23,7 +23,7 @@ def test_poll_option_from_graphql_voted(): "voters": ["1234", "2345"], } assert PollOption( - text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id=123456789 + text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id="123456789" ) == PollOption._from_graphql(data) @@ -39,11 +39,11 @@ def test_poll_option_from_graphql_alternate_format(): }, } assert PollOption( - text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id=123456789 + text="abc", vote=True, voters=["1234", "2345"], votes_count=2, id="123456789" ) == PollOption._from_graphql(data) -def test_poll_from_graphql(): +def test_poll_from_graphql(session): data = { "id": "123456789", "text": "Some poll", @@ -74,14 +74,21 @@ def test_poll_from_graphql(): ], } assert Poll( - title="Some poll", + session=session, + question="Some poll", options=[ - PollOption(text="Abc", vote=True, voters=["1234"], votes_count=1, id=1111), PollOption( - text="Def", vote=False, voters=["2345", "3456"], votes_count=2, id=2222 + text="Abc", vote=True, voters=["1234"], votes_count=1, id="1111" ), - PollOption(text="Ghi", vote=False, voters=[], votes_count=0, id=3333), + PollOption( + text="Def", + vote=False, + voters=["2345", "3456"], + votes_count=2, + id="2222", + ), + PollOption(text="Ghi", vote=False, voters=[], votes_count=0, id="3333"), ], options_count=5, id=123456789, - ) == Poll._from_graphql(data) + ) == Poll._from_graphql(session, data) diff --git a/tests/test_polls.py b/tests/test_polls.py index 274f259..820eeb4 100644 --- a/tests/test_polls.py +++ b/tests/test_polls.py @@ -9,40 +9,26 @@ pytestmark = pytest.mark.online @pytest.fixture( scope="module", params=[ - Poll(title=random_hex(), options=[]), - Poll( - title=random_hex(), - options=[ - PollOption(text=random_hex(), vote=True), - PollOption(text=random_hex(), vote=True), + (random_hex(), []), + (random_hex(), [(random_hex(), True), (random_hex(), True),],), + (random_hex(), [(random_hex(), False), (random_hex(), False),],), + ( + random_hex(), + [ + (random_hex(), True), + (random_hex(), True), + (random_hex(), False), + (random_hex(), False), + (random_hex()), + (random_hex()), ], ), - Poll( - title=random_hex(), - options=[ - PollOption(text=random_hex(), vote=False), - PollOption(text=random_hex(), vote=False), - ], - ), - Poll( - title=random_hex(), - options=[ - PollOption(text=random_hex(), vote=True), - PollOption(text=random_hex(), vote=True), - PollOption(text=random_hex(), vote=False), - PollOption(text=random_hex(), vote=False), - PollOption(text=random_hex()), - PollOption(text=random_hex()), - ], - ), - pytest.param( - Poll(title=None, options=[]), marks=[pytest.mark.xfail(raises=ValueError)] - ), + pytest.param((None, []), marks=[pytest.mark.xfail(raises=ValueError)]), ], ) def poll_data(request, client1, group, catch_event): with catch_event("on_poll_created") as x: - client1.create_poll(request.param, thread_id=group["id"]) + client1.create_poll(request.param[0], request.param[1], thread_id=group["id"]) options = client1.fetch_poll_options(x.res["poll"].id) return x.res, request.param, options