Browse Source

ws suspend and resume support

Giuseppe Guerra 2 months ago
5 changed files with 93 additions and 9 deletions
  1. +47
  2. +1
  3. +16
  4. +18
  5. +11

+ 47
- 7 View File

@@ -6,19 +6,17 @@ from import WsEvent
from import Bot
from utils.rippleapi import BanchoClientType
from ws.client import LoginFailedError
from ws.messages import WsSubscribe, WsAuth, WsJoinChatChannel, WsPong, WsChatMessage
from ws.messages import WsSubscribe, WsAuth, WsJoinChatChannel, WsPong, WsChatMessage, WsResume, WsSuspend

bot = Bot()

async def connected():
bot.logger.debug("Ws client started, now logging in")
async def _login():
results = await bot.client.wait("msg:auth_success", "msg:auth_failure")
if "msg:auth_failure" in results:"Login failed")
bot.logger.error("Login failed")
raise LoginFailedError()"Logged in successfully")
except LoginFailedError:
@@ -36,6 +34,32 @@ async def connected():
await bot.run_init_hooks()

async def _resume():
if not bot.suspended:
raise RuntimeError("The bot must be suspended in order to resume")
results = await bot.client.wait("msg:resume_success", "msg:resume_failure")
if "msg:resume_failure" in results:
bot.logger.error("Resume failed! Now disposing")

# We have logged back in!
bot.resume_token = None"Resumed connection. Flushing old queue.")

async def connected():
bot.logger.debug("Ws client started, now logging in")
if not bot.suspended:
await _login()
await _resume()

async def chat_channel_joined(name: str, **kwargs):"Joined {name}")
@@ -109,6 +133,20 @@ async def on_message(sender: Dict[str, Any], recipient: Dict[str, Any], pm: bool
bot.send_message(x, final_recipient)

async def suspend(token: str, **kwargs):"Suspended fun! Closing ws connection.")
bot.resume_token = token
# Cancel just the writer task so we do not send any new messages.
# The server will take care of closing our connection.
# (which will result in cancelling the reader task as well)
# All messages sent in the meantime will end up in the queue
# and will be sent as soon as the new writer task gets scheduled
# once we re-enstablish the connection to the server.
if not bot.client.writer_task.cancelled():

async def on_disconnect(*args, **kwargs):
@@ -131,10 +169,12 @@ async def on_disconnect(*args, **kwargs):
await bot.client.start()
await bot.client.wait("ready")
await bot.client.wait("ready", "resumed")
bot.send_message("Reconnected.", "#admin")

# Reset only if we haven't been disconnected for server recycle
if not bot.suspended:
bot.reconnecting = True
seconds = 5 # todo: backoff?"Disconnected! Starting reconnect loop in {seconds} seconds")

+ 1
- 0
pubsub/handlers/ View File

@@ -6,6 +6,7 @@ from utils.schema import NonEmptyString

bind = Bot().pubsub_binding_manager

@pubsub.schema({"recipient": NonEmptyString, "message": NonEmptyString})
async def handle(data: Dict[str, Any]) -> None:

+ 16
- 1
singletons/ View File

@@ -17,7 +17,7 @@ except ImportError:

import functools
import logging
from typing import Callable, Optional, Dict, Union, List, Tuple, Type, Iterator, Generator, Iterable, Set
from typing import Callable, Optional, Dict, Union, List, Tuple, Iterator, Set

from aiohttp import web
import aioredis
@@ -96,11 +96,26 @@ class Bot:

self.tinydb_path = tinydb_path

self._resume_token: Optional[str] = None

self.login_channels_left: Set[str] = set()
self.joined_channels: Set[str] = set()
self.match_delayed_start_tasks: Dict[int, asyncio.Task] = {}
self.init_hooks: List[InitHook] = []

def suspended(self) -> bool:
return self._resume_token is not None

def resume_token(self) -> Optional[str]:
return self._resume_token

def resume_token(self, v: Optional[str]) -> None:
self._resume_token = v
self.client.suspended = v is not None

def send_message(self, message: str, recipient: str) -> None:
Shorthand to send a message

+ 18
- 0
ws/ View File

@@ -29,12 +29,26 @@ class WsClient:
self.ws_url = ws_url
self._reader_queue = asyncio.Queue()
self._writer_queue = asyncio.Queue()
# Messages sent while the client is reconnecting end up in here
self._old_writer_queue: Optional[asyncio.Queue] = None
self._events: DefaultDict[str, asyncio.Event] = defaultdict(lambda: asyncio.Event())
self._event_handlers: DefaultDict[str, List[Callable]] = defaultdict(list) Optional[aiohttp.ClientWebSocketResponse] = None
self.writer_task: Optional[asyncio.Task] = None
self.reader_task: Optional[asyncio.Task] = None
self.running: bool = False
self.suspended: bool = False

def recycle_queue(self):
self._old_writer_queue = self._writer_queue
self._writer_queue = asyncio.Queue()

def flush_old_queue(self):
if self._old_writer_queue is None:
while not self._old_writer_queue.empty():
self._old_writer_queue = None

def send(self, message: WsMessage) -> None:
@@ -73,6 +87,10 @@ class WsClient:
async with aiohttp.ClientSession() as session:"Connecting to {self.ws_url}")
async with session.ws_connect(self.ws_url) as ws:
if self.suspended:
self.logger.debug("Recycling writer queue")

self.running = True = ws
self.writer_task = asyncio.ensure_future(self.writer())

+ 11
- 1
ws/ View File

@@ -66,4 +66,14 @@ class WsSubscribeMatch(WsSubscribe):

class WsUnsubscribeMatch(WsSubscribe):
def __init__(self, match_id: int):
super(WsUnsubscribeMatch, self).__init__(WsEvent.MULTIPLAYER, {"match_id": match_id})
super(WsUnsubscribeMatch, self).__init__(WsEvent.MULTIPLAYER, {"match_id": match_id})

class WsResume(WsMessage):
def __init__(self, token: str):
super(WsResume, self).__init__("resume", {"token": token})

class WsSuspend(WsMessage):
def __init__(self):
super(WsSuspend, self).__init__("suspend")