Browse Source

Code cleaning, removed unused code

master
Giuseppe Guerra 2 months ago
parent
commit
59d3206fd9
11 changed files with 34 additions and 263 deletions
  1. +1
    -1
      constants/events.py
  2. +2
    -2
      plugins/base/__init__.py
  3. +1
    -1
      plugins/base/filters.py
  4. +1
    -1
      plugins/base/utils.py
  5. +24
    -21
      plugins/multiplayer.py
  6. +1
    -3
      singletons/bot.py
  7. +0
    -163
      utils/cache.py
  8. +2
    -1
      utils/general.py
  9. +0
    -69
      utils/privileges_cache.py
  10. +1
    -1
      utils/rippleapi.py
  11. +1
    -0
      ws/messages.py

+ 1
- 1
constants/events.py View File

@@ -5,4 +5,4 @@ class WsEvent(Enum):
CHAT_CHANNELS = "chat_channels"
MULTIPLAYER = "multiplayer"
LOBBY = "lobby"
STATUS_UPDATES = "status_updates"
STATUS_UPDATES = "status_updates"

+ 2
- 2
plugins/base/__init__.py View File

@@ -39,7 +39,6 @@ class CommandAlias(Command):
return f"Alias: {super(CommandAlias, self).__str__()}"



class GenericBotError(Exception):
pass

@@ -151,6 +150,7 @@ def protected(required_privileges: Privileges) -> Callable:
return wrapper
return decorator


def tournament_staff_or_host(f: Callable) -> Callable:
"""
Allows this command only if the sender is the host of this match or if the sender
@@ -172,7 +172,7 @@ def tournament_staff_or_host(f: Callable) -> Callable:
if not can:
match_info = await singletons.bot.Bot().bancho_api_client.get_match_info(match_id)
can = match_info["host_api_identifier"] == sender["api_identifier"] \
or match_info["api_owner_user_id"] == sender["user_id"]
or match_info["api_owner_user_id"] == sender["user_id"]
if not can:
return "You must be the host of the match to trigger this command."
return await f(sender=sender, match_id=match_id, **kwargs)


+ 1
- 1
plugins/base/filters.py View File

@@ -14,4 +14,4 @@ def is_private(*, pm: bool) -> bool:


def is_public(*, pm: bool) -> bool:
return not pm
return not pm

+ 1
- 1
plugins/base/utils.py View File

@@ -50,4 +50,4 @@ async def username_to_user_id(username: str) -> int:

def required_kwargs_only(f: Callable, **all_kwargs) -> Dict[str, Any]:
f_kwargs_keys = {k for k in inspect.signature(f).parameters.keys()}
return {k: v for k, v in all_kwargs.items() if k in f_kwargs_keys & all_kwargs.keys()}
return {k: v for k, v in all_kwargs.items() if k in f_kwargs_keys & all_kwargs.keys()}

+ 24
- 21
plugins/multiplayer.py View File

@@ -36,10 +36,10 @@ def resolve_mp(f: Callable) -> Callable:
plugins.base.Arg("name", Schema(str)),
plugins.base.Arg("password", Schema(str), default=None, optional=True),
)
async def make(name: str, password: Optional[str]) -> str:
async def make(name: str, psw: Optional[str]) -> str:
match_id = await bot.bancho_api_client.create_match(
name,
password,
psw,
beatmap=BanchoApiBeatmap(0, "a" * 32, "No song")
)
return f"Multiplayer match #{match_id} created!"
@@ -240,8 +240,8 @@ async def map_(match_id: int, beatmap_id: int, game_mode: Optional[GameMode] = N
@plugins.base.arguments(
Arg("password", Schema(str), rest=True)
)
async def password(match_id: int, password: str) -> str:
await bot.bancho_api_client.edit_match(match_id, password=password)
async def password(match_id: int, psw: str) -> str:
await bot.bancho_api_client.edit_match(match_id, password=psw)
return "The password has been changed."


@@ -354,11 +354,11 @@ async def score_v(match_id: int, v: int) -> str:
@bot.command("mp help")
@plugins.base.base
async def help_() -> str:
l = '|'.join(
cmd_list = '|'.join(
k[len("mp "):] + (f" (alias of {v.root_name})" if issubclass(type(v), plugins.base.CommandAlias) else "")
for k, v in bot.get_commands_with_prefix('mp')
)
return f"Supported subcommands: !mp <{l}>"
return f"Supported subcommands: !mp <{cmd_list}>"


@bot.command("mp info")
@@ -367,33 +367,36 @@ async def help_() -> str:
@plugins.base.tournament_staff_or_host
@plugins.base.base
async def info(match_id: int) -> None:
info = await bot.bancho_api_client.get_match_info(match_id)
info_ = await bot.bancho_api_client.get_match_info(match_id)
r = f"#multi_{match_id}"
bot.send_message("✱ MATCH INFO ✱", r)
bot.send_message(f"id: {info['id']}, name: {info['name']}, has password: {info['has_password']}", r)
bot.send_message(f"id: {info_['id']}, name: {info_['name']}, has password: {info_['has_password']}", r)
bot.send_message(
f"in progress: {info['in_progress']}, "
f"game mode: {GameMode(info['game_mode']).name.lower()}, "
f"special: {info['special']}",
f"in progress: {info_['in_progress']}, "
f"game mode: {GameMode(info_['game_mode']).name.lower()}, "
f"special: {info_['special']}",
r
)
bot.send_message(f"owner: {info['api_owner_user_id']}, private history: {info['private_match_history']}", r)
bot.send_message(f"owner: {info_['api_owner_user_id']}, private history: {info_['private_match_history']}", r)
bot.send_message(
f"scoring type: {ScoringType(info['scoring_type']).name}, "
f"team type: {TeamType(info['team_type']).name}, ",
f"scoring type: {ScoringType(info_['scoring_type']).name}, "
f"team type: {TeamType(info_['team_type']).name}, ",
r
)
bot.send_message(
f"free mod: {bool(info['free_mod'])}, "
f"global mods: {str(Mod(info['mods'])) if info['mods'] != Mod.NO_MOD else 'no mod'}",
f"free mod: {bool(info_['free_mod'])}, "
f"global mods: {str(Mod(info_['mods'])) if info_['mods'] != Mod.NO_MOD else 'no mod'}",
r
)
bot.send_message("✱ SLOTS ✱", r)
last_full_slot = next((len(info["slots"]) - i for i, x in enumerate(reversed(info["slots"])) if x['user'] is not None), 0)
if not info['slots']:
last_full_slot = next(
(len(info_["slots"]) - i for i, x in enumerate(reversed(info_["slots"])) if x['user'] is not None),
0
)
if not info_['slots']:
bot.send_message("nobody", r)
else:
for i, slot in enumerate(info["slots"]):
for i, slot in enumerate(info_["slots"]):
if i >= last_full_slot:
break
bot.send_message(
@@ -401,12 +404,12 @@ async def info(match_id: int) -> None:
f"[{i}] "
) + (
f"[{Team(slot.get('team', Team.NEUTRAL)).name.lower()}] "
if info["team_type"] in (TeamType.TAG_TEAM_VS, TeamType.TEAM_VS)
if info_["team_type"] in (TeamType.TAG_TEAM_VS, TeamType.TEAM_VS)
else
""
) + (
f"<{SlotStatus(slot['status']).name.capitalize().replace('_', ' ')}> "
f"{'♛ ' if slot['user'] is not None and slot['user']['api_identifier'] == info['host_api_identifier'] else ''}"
f"{'♛ ' if slot['user'] is not None and slot['user']['api_identifier'] == info_['host_api_identifier'] else ''}"
f"{slot['user']['username'] if slot['user'] is not None else '{empty}'}"
f"{' +' if slot['mods'] != Mod.NO_MOD else ''}{str(Mod(slot['mods']))}"
),


+ 1
- 3
singletons/bot.py View File

@@ -26,15 +26,13 @@ from pubsub import reader
from pubsub.manager import PubSubBindingManager
from utils import singleton
from utils.letsapi import LetsApiClient
from utils.np_storage import NpStorage
from utils.periodic_tasks import periodic_task
from utils.rippleapi import BanchoApiClient, RippleApiClient, CheesegullApiClient
from constants.api_privileges import APIPrivileges


@singleton.singleton
class Bot:
VERSION: str = "2.3.1"
VERSION: str = "2.4.0"

def __init__(
self, *, nickname: str = "FokaBot", wss: bool = True,


+ 0
- 163
utils/cache.py View File

@@ -1,163 +0,0 @@
import datetime
import threading
from typing import Dict, Any, Optional

import logging

from collections import UserDict

from utils import general


class SafeUsernamesDict(UserDict):
"""
A custom dictionary where keys are always converted to safe usernames
"""
def __setitem__(self, key, value):
self.data[general.safefify_username(key)] = value

def __getitem__(self, item):
return self.data[general.safefify_username(item)]

def __contains__(self, item):
return general.safefify_username(item) in self.data


class CacheElement:
def __init__(self, expiration_delta: datetime.timedelta, add_time: Optional[datetime.datetime] = None):
"""
Initializes a new cached element.
It's recommended to subclass this object and call "super(...).__init__(...)"
only after setting the instance values. This is because, after calling this
constructor, all future "__setattr__" calls will update the cache refresh time
and doing so for every attribute while constructing the object is not the best thing.

:param expiration_delta: datetime.timedelta that explains for how long this object should be considered valid
:param add_time: datetime.datetime of when the object should be considered as "created".
Leave to "None" to use datetime.datetime.now()
"""
if add_time is None:
add_time = datetime.datetime.now()
self._cache__refresh_time = add_time
self._cache__expiration_delta = expiration_delta

@property
def expiration_datetime(self) -> datetime.datetime:
"""
Returns the datetime of when this cached element will expire

:return:
"""
return self._cache__refresh_time + self._cache__expiration_delta

@property
def is_expired(self) -> bool:
"""
Whether this cached element is expired or not

:return: True if it's expired, False otherwise
"""
return datetime.datetime.now() > self.expiration_datetime

def __setattr__(self, key: str, value: Any) -> None:
"""
Sets an attribute and updates the cache refresh time.
If the cache refresh time is not present (pre-initialization),
it'll simply return the attribute value.

:param key:
:param value:
:return:
"""
# "hasattr" is needed so we can bulk set subclass attributes before calling "super()"
# without updating _cache__refresh_time for each attribute
if not key.startswith("_cache__") and hasattr(self, "_cache__refresh_time"):
self._cache__refresh_time = datetime.datetime.now()
super(CacheElement, self).__setattr__(key, value)


class CacheStorage:
logger = logging.getLogger("general_cache_storage")

def __init__(self):
"""
Initializes a new CacheStorage object
"""
self._data: Dict[Any, CacheElement] = {}
self._lock: threading.Lock = threading.Lock()

def purge(self) -> None:
"""
Deletes all elements marked as "expired" from the storage

:return:
"""
with self._lock:
self.logger.debug(f"Deleting expired cached elements")
# Keep a counter to save some memory
c = 0
for k in (x for x, v in self._data.items() if v.is_expired):
del self._data[k]
c += 1
self.logger.debug(f"Deleted {c} cached elements.")

def __len__(self) -> int:
"""
Returns the number of elements (both valid and expired) in the storage

:return: number of total elements in the storage
"""
with self._lock:
return len(self._data)

def __contains__(self, key: Any) -> bool:
"""
Returns True if "key" is in the storage and is valid.

:param key: cached element key
:return: True if self[key] exists and has not expired yet, otherwise False
"""
with self._lock:
return key in self._data and not self._data[key].is_expired

def __getitem__(self, item: Any) -> Optional[CacheElement]:
"""
Returns an element from the storage.
Raises KeyError is there's no such element.
If the element has expired, it gets deleted and a KeyError is raised

:param item: cached element key
:return: the element
:raises KeyError: if there's no element or it has expired
"""
with self._lock:
if self._data[item].is_expired:
del self[item]
# Recursively call __getitem__ to raise a KeyError
return self[item]
return self._data[item]

def __setitem__(self, key: Any, value: CacheElement) -> None:
"""
Sets an element in the storage

:param key: cached element key
:param value: cached element
:return:
"""
with self._lock:
self._data[key] = value

def __delitem__(self, key: Any) -> None:
"""
Deletes an element.
Does nothing if there's no element with such key

:param key: cached element key
:return:
"""
with self._lock:
try:
del self._data[key]
except KeyError:
pass

+ 2
- 1
utils/general.py View File

@@ -5,6 +5,7 @@ from collections import Sequence

ALPHABET = string.ascii_letters + string.digits


def safefify_username(username: str) -> str:
"""
Returns the safe username from a normal username
@@ -26,4 +27,4 @@ def random_secure_string(length: int, alphabet: Sequence = None) -> str:
"""
if alphabet is None:
alphabet = ALPHABET
return "".join(secrets.choice(alphabet) for _ in range(length))
return "".join(secrets.choice(alphabet) for _ in range(length))

+ 0
- 69
utils/privileges_cache.py View File

@@ -1,69 +0,0 @@
import logging
import datetime
from typing import Dict, Optional, TypeVar, Any

from constants.privileges import Privileges
from utils.rippleapi import RippleApiClient
from utils.cache import CacheElement, CacheStorage, SafeUsernamesDict


class CachedPrivileges(CacheElement):
def __init__(self, privileges: Privileges, expiration_delta: datetime.timedelta = datetime.timedelta(minutes=30)):
super(CachedPrivileges, self).__init__(expiration_delta=expiration_delta)
self.privileges = privileges

@classmethod
def api_user_factory(cls, user: Dict[str, Any]) -> "CachedPrivilegesT":
return cls(Privileges(user["privileges"]))

def __str__(self) -> str:
return f"<{self.__class__.__name__}> " + \
(
", ".join(
f"{k}={getattr(self, k)}"
for k in dir(self)
if not k.startswith("__") and not callable(getattr(self, k))
)
)


CachedPrivilegesT = TypeVar("CachedPrivilegesT", bound=CachedPrivileges)


class PrivilegesCache(CacheStorage):
logger = logging.getLogger("privileges_cache")

def __init__(self, ripple_api_client: RippleApiClient):
super(PrivilegesCache, self).__init__()
self._data: SafeUsernamesDict[str, CachedPrivileges] = SafeUsernamesDict()
self._ripple_api_client: RippleApiClient = ripple_api_client

async def get(self, username: str) -> Optional[Privileges]:
await self._cache_privileges(username)
cached_privileges = self._data.get(username, None)
self.logger.debug(f"Cached privileges for user {username}: {cached_privileges}")
if cached_privileges is None:
return None
return cached_privileges.privileges

async def _cache_privileges(self, username: str, force: bool = False):
if not force:
cached_privileges = self._data.get(username, None)
if cached_privileges is not None and not cached_privileges.is_expired:
self.logger.debug(f"Privileges for user {username} are in cache and haven't expired yet.")
return
self.logger.debug(f"Caching privileges for user {username}")
users = await self._ripple_api_client.get_user(username)
if not users:
self.logger.error(f"Cannot cache privileged for user {users}! The user does not exist.")
return
self._data[username] = CachedPrivileges.api_user_factory(users[0])

def __getitem__(self, item: int = None) -> None:
raise RuntimeError(
"This cache is async and does not support __getitem__ and __setitem__. "
"Please use the get() coroutine instead."
)

def __setitem__(self, key: int, value: CachedPrivileges) -> None:
self.__getitem__()

+ 1
- 1
utils/rippleapi.py View File

@@ -393,7 +393,7 @@ class BanchoApiClient(RippleApiBaseClient):
async def lock(
self, match_id: int, slot: Optional[int] = None, slots: Union[None, int, List[Dict[str, Any]]] = None
) -> None:
if slots is None == slot is None:
if (slots is None) == (slot is None):
raise ValueError("You must provide either slot or slots, not neither or both.")
await self._request(f"multiplayer/{match_id}/lock", "POST", self.remove_none({
"slot": slot,


+ 1
- 0
ws/messages.py View File

@@ -11,6 +11,7 @@ class WsMessage:
if type(data) is dict:
data = dict(data)
elif callable(getattr(data, "__dict__", None)):
# lol pycharm
data = data.__dict__()
else:
raise ValueError(f"Non-serializable object in ws message data: {data} (type: {type(data)})")


Loading…
Cancel
Save