Asynchronous Ripple chat bot, delta-compatible
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
7.2KB

  1. from typing import Dict, Any
  2. import asyncio
  3. from constants.events import WsEvent
  4. from plugins.base import Command
  5. from singletons.bot import Bot
  6. from utils.rippleapi import BanchoClientType
  7. from ws.client import LoginFailedError
  8. from ws.messages import WsSubscribe, WsAuth, WsJoinChatChannel, WsPong, WsChatMessage, WsResume, WsSuspend
  9. bot = Bot()
  10. async def _login():
  11. try:
  12. bot.client.send(WsAuth(bot.bancho_api_client.token))
  13. results = await bot.client.wait("msg:auth_success", "msg:auth_failure")
  14. if "msg:auth_failure" in results:
  15. bot.logger.error("Login failed")
  16. raise LoginFailedError()
  17. bot.logger.info("Logged in successfully")
  18. except LoginFailedError:
  19. bot.logger.error("Login failed! Now disposing.")
  20. bot.loop.stop()
  21. else:
  22. bot.client.send(WsSubscribe(WsEvent.CHAT_CHANNELS))
  23. await bot.client.wait("msg:subscribed")
  24. bot.logger.debug("Subscribed to chat channel events. Now joining channels")
  25. channels = await bot.bancho_api_client.get_all_channels()
  26. bot.login_channels_left |= {x["name"].lower() for x in channels}
  27. for channel in channels:
  28. bot.logger.debug(f"Joining {channel['name']}")
  29. bot.client.send(WsJoinChatChannel(channel["name"]))
  30. await bot.run_init_hooks()
  31. async def _resume():
  32. if not bot.suspended:
  33. raise RuntimeError("The bot must be suspended in order to resume")
  34. bot.client.send(WsResume(bot.resume_token))
  35. results = await bot.client.wait("msg:resume_success", "msg:resume_failure")
  36. if "msg:resume_failure" in results:
  37. bot.logger.error("Resume failed! Now disposing")
  38. bot.loop.stop()
  39. return
  40. # We have logged back in!
  41. bot.resume_token = None
  42. bot.logger.info("Resumed connection. Flushing old queue.")
  43. bot.client.flush_old_queue()
  44. bot.client.trigger("resumed")
  45. @bot.client.on("connected")
  46. async def connected():
  47. bot.logger.debug("Ws client started, now logging in")
  48. if not bot.suspended:
  49. await _login()
  50. else:
  51. await _resume()
  52. @bot.client.on("msg:chat_channel_joined")
  53. async def chat_channel_joined(name: str, **kwargs):
  54. bot.logger.info(f"Joined {name}")
  55. bot.joined_channels.add(name.lower())
  56. if not bot.ready:
  57. bot.login_channels_left.remove(name.lower())
  58. if not bot.login_channels_left:
  59. bot.ready = True
  60. bot.client.trigger("ready")
  61. bot.logger.info("Bot ready!")
  62. @bot.client.on("msg:chat_channel_added")
  63. async def chat_channel_added(name: str, **kwargs):
  64. bot.logger.debug(f"Channel {name} added")
  65. bot.client.send(WsJoinChatChannel(name))
  66. @bot.client.on("msg:chat_channel_removed")
  67. async def chat_channel_removed(name: str, **kwargs):
  68. bot.logger.debug(f"Channel {name} removed")
  69. try:
  70. bot.joined_channels.remove(name)
  71. except KeyError:
  72. pass
  73. @bot.client.on("msg:chat_channel_left")
  74. async def chat_channel_left(name: str, **kwargs):
  75. bot.logger.info(f"Left {name}")
  76. try:
  77. bot.joined_channels.remove(name)
  78. except KeyError:
  79. pass
  80. @bot.client.on("msg:ping")
  81. async def ping():
  82. bot.logger.debug("Got PINGed by the server. Answering.")
  83. bot.client.send(WsPong())
  84. @bot.client.on("msg:chat_message")
  85. async def on_message(sender: Dict[str, Any], recipient: Dict[str, Any], pm: bool, message: str, **kwargs) -> None:
  86. message = message.strip()
  87. is_command = message.startswith(bot.command_prefix)
  88. is_action = message.startswith("\x01ACTION")
  89. if sender["type"] == BanchoClientType.FAKE:
  90. # Do not process messages by fake Foka
  91. return
  92. bot.logger.debug(f"{sender['username']}{sender['api_identifier']}: {message} (cmd:{is_command}, act:{is_action})")
  93. # nick = sender["username"]
  94. if sender["username"].lower() == bot.nickname.lower() or (not is_command and not is_action):
  95. return
  96. if pm:
  97. final_recipient = sender["username"]
  98. else:
  99. final_recipient = recipient["name"]
  100. raw_message = message[len(bot.command_prefix if is_command else "\x01ACTION"):].lower().strip()
  101. dispatcher = bot.command_handlers if is_command else bot.action_handlers
  102. parts = raw_message.split(" ")
  103. for i, part in enumerate(parts):
  104. if part not in dispatcher:
  105. # Nothing to do
  106. return
  107. if issubclass(type(dispatcher[part]), Command):
  108. # Handler found
  109. k = " ".join(parts[:i+1])
  110. bot.logger.debug(f"Triggered {dispatcher[part]} ({k}) [{'command' if is_command else 'action'}]")
  111. command_name_length = len(k.split(" "))
  112. result = await dispatcher[part].handler(
  113. sender=sender, recipient=recipient, pm=pm, message=message,
  114. parts=message.split(" ")[command_name_length:], command_name=k
  115. )
  116. if result is not None:
  117. if type(result) not in (tuple, list):
  118. result = (result,)
  119. for x in result:
  120. bot.send_message(x, final_recipient)
  121. # Trigger only one command
  122. break
  123. else:
  124. # Nested
  125. dispatcher = dispatcher[part]
  126. @bot.client.on("msg:suspend")
  127. async def suspend(token: str, **kwargs):
  128. bot.logger.info(f"Suspended fun! Closing ws connection.")
  129. bot.resume_token = token
  130. # Cancel just the writer task so we do not send any new messages.
  131. # The server will take care of closing our connection.
  132. # (which will result in cancelling the reader task as well)
  133. # All messages sent in the meantime will end up in the queue
  134. # and will be sent as soon as the new writer task gets scheduled
  135. # once we re-enstablish the connection to the server.
  136. if not bot.client.writer_task.cancelled():
  137. bot.client.writer_task.cancel()
  138. @bot.client.on("disconnected")
  139. async def on_disconnect(*args, **kwargs):
  140. """
  141. Called when the client is disconnected.
  142. Tries to reconnect to the server.
  143. :param kwargs:
  144. :return:
  145. """
  146. if bot.disposing:
  147. return
  148. if bot.reconnecting:
  149. bot.logger.warning("Got 'disconnect', but the bot is already reconnecting.")
  150. return
  151. async def reconnect():
  152. """
  153. Performs the actual reconnection, wait for the 'ready' event and notifies '#admin'
  154. :return:
  155. """
  156. await bot.client.start()
  157. await bot.client.wait("ready", "resumed")
  158. bot.send_message("Reconnected.", "#admin")
  159. # Reset only if we haven't been disconnected for server recycle
  160. if not bot.suspended:
  161. bot.reset()
  162. bot.reconnecting = True
  163. seconds = 5 # todo: backoff?
  164. bot.logger.info(f"Disconnected! Starting reconnect loop in {seconds} seconds")
  165. await asyncio.sleep(seconds)
  166. while True:
  167. try:
  168. bot.logger.info(f"Trying to reconnect. Max timeout is {seconds} seconds.")
  169. await asyncio.wait_for(reconnect(), timeout=seconds)
  170. break
  171. except ConnectionError:
  172. bot.logger.warning(f"Connection failed! Retrying in {seconds} seconds")
  173. await asyncio.sleep(seconds)
  174. except asyncio.TimeoutError:
  175. bot.logger.warning("Server timeout")
  176. bot.reconnecting = False
  177. bot.logger.info("Reconnected!")