diff --git a/.gitignore b/.gitignore index fdc9616..c95f5de 100644 --- a/.gitignore +++ b/.gitignore @@ -152,5 +152,5 @@ cython_debug/ .idea/ # Project-specific ignores -src/data +data/ *.pid diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b3d1412 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,36 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: "v3.4.0" + hooks: + - id: check-merge-conflict + stages: [ commit, push ] + - id: check-ast + stages: [ commit ] + + - repo: local + hooks: + - id: black + name: black + entry: black + language: system + types: [ python ] + stages: [ commit ] + + - id: isort + name: isort + entry: isort + args: + - --profile + - black + - --gitignore + language: system + types: [ python ] + stages: [ commit ] + + - id: flake8 + name: flake8 + entry: flake8 --exclude *venv + language: system + always_run: true + pass_filenames: false + stages: [ commit ] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index d622a49..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,28 +0,0 @@ -# Adding a comic - -Hey fellow comic fan! I see you venturing across this project, and you look like you want to add a comic. Do not worry, I will show you! - -Adding a comic is super easy. It does not even take knowledge in code to do it! There is only 2 steps to make: - -1. Let's start by searching for the comics and its information. You can find most of the information just by visiting the comic page. With your newly acquired knowledge about the comic, you can start filing this form: - ``` - "":{ - "Name": "", - "Author": "The name of the author(s).", - "Web_name": "", - "Main_website": "", - "Working_type": "", - "Description": "", - "Position": , - "First_date": "YEAR, MONTH, DAY", - "Color": "", - "Image": "", - "Aliases": "", - "Helptxt": "" - } - ``` - When you are done, paste the new information in the [configuration file](src/misc/comics_details.json) under the last comic information. -2. Add a new command in [Comics.py](src/Scripts/Comics.py) with the name of the comic under the last command. Just change the aliases to the ones you specified in the config file and set the `comic_name` variable to the name of the first value. -3. All done! Thank you for contributing to BDBot! - -From [this file](src/misc/ADD_COMIC.md). diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 120000 index 0000000..620f4d3 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1 @@ +bdbot/misc/ADD_COMIC.md \ No newline at end of file diff --git a/README.md b/README.md index 0ea561e..73734da 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,6 @@ Discord bot that post bd strips. Simple as that! License of BDBot Last commit indicator -**IMPORTANT: Due to a misconception in how the new discord.py and discord update worked, the bot has had issues since the last day of August. A patch is on the way and will be applied before October 1st.** - -**IMPORTANT: If you cannot send slash commands, make sure to re-invite the bot with the link here:** - -https://discord.com/api/oauth2/authorize?client_id=807780409362481163&permissions=277025647680&scope=bot%20applications.commands - - ## Comics: ### Added: - Garfield https://www.gocomics.com/garfield/ @@ -67,6 +60,7 @@ https://discord.com/api/oauth2/authorize?client_id=807780409362481163&permission ## Related GitHub pages: CalvinBot : https://github.com/wdr1/CalvinBot + Robobert: https://github.com/JTexpo/Robobert ## What to learn from this project? @@ -98,13 +92,13 @@ Robobert: https://github.com/JTexpo/Robobert ## Current state of the project - Functionalities - - '/' Information embed on the requested comic. - - '/help' : Help embed - - '/git' command : Redirects to this GitHub page - - '/invite' command : Generate a link to invite the bot to your server ([or use this link](https://discord.com/api/oauth2/authorize?client_id=807780409362481163&permissions=0&scope=bot)) - - Daily Command: use '/ add/remove' to add or remove a comic from the daily list for the server. - - Use '/remove_all' to remove all comics from the daily list for the server. - - Use '/remove_channel' to remove all comics in the channel. + - `/` Information embed on the requested comic. + - `/help` : Help embed + - `/git` command : Redirects to this GitHub page + - `/invite` command : Generate a link to invite the bot to your server ([or use this link](https://discord.com/api/oauth2/authorize?client_id=807780409362481163&permissions=0&scope=bot)) + - Daily Command: use `/ add/remove` to add or remove a comic from the daily list for the server. + - Use `/remove_all` to remove all comics from the daily list for the server. + - Use `/remove_channel` to remove all comics in the channel. - Tell me if I forgot some commands here! - Bugs @@ -113,13 +107,9 @@ Robobert: https://github.com/JTexpo/Robobert - Anything else to know? - Why can't I go farther than 7 comics on Comics Kingdom? Comics Kingdom use a special premium subscription plan to view all comics. There is no known way to get around it and getting the subscription and after distributing the comic for free could cause some undesirable consequences in the future. - Why is there only 2 images for Webtoons? Webtoons only gives out two images link to the comic in their rss feed. Finding each image link is way more complicated than this and is not in place now. - - The error manager ('Errors.py') might be sometimes commented out because I want to see the errors directly in the terminal. Please tell me if I forget to remove those multi-line comments. - You want to do a pull request to add your favourite comic? - Preferably, Gocomics and Comics Kingdom comics are the easiest to implement, so try to stick with that if your comic is hosted there. - - Steps: - 1. Add a new value in misc/comics_details.json that specifies each value for the comic. (See [this README](src/misc/ADD_COMIC.md)). - 2. Add a command in Scripts/Comics.py under the latest comic with aliases that are the same that were added to the json file and change the value of 'comic_name' to the name of the comic added. - 3. That's it! + - Please see [this README](CONTRIBUTING.md) for complete instructions - If the comic is NOT hosted on GoComics/Comics Kingdom, please open an issue on the git page (https://github.com/BBArikL/BDBot). - Any pull requests that was not approved from another site will be automatically rejected, and you will be asked to follow the procedure cited. - 'Beta' and 'main'? @@ -143,6 +133,6 @@ Robobert: https://github.com/JTexpo/Robobert - Discord discriminator - Date and time of the request - The request - - If you want to delete this information (and the associated requests), use `/request_delete` + - If you want to delete this information (and the associated requests), use `/delete_request` diff --git a/src/Web_requests_manager.py b/bdbot/Web_requests_manager.py similarity index 59% rename from src/Web_requests_manager.py rename to bdbot/Web_requests_manager.py index f543566..28ca35d 100644 --- a/src/Web_requests_manager.py +++ b/bdbot/Web_requests_manager.py @@ -1,49 +1,123 @@ import json +import logging import random - -from urllib.request import urlopen +from datetime import date, datetime, timedelta +from typing import Optional, Union from urllib.error import HTTPError -from datetime import date, timedelta, datetime +from urllib.request import urlopen + from bs4 import BeautifulSoup -from rss_parser import Parser from requests import get -from src import utils -from typing import Optional, Union +from rss_parser import Parser + +from bdbot.utils import ( + COMIC_LATEST_LINKS_PATH, + DETAILS_PATH, + Action, + Date, + ExtendedAction, + check_if_latest_link, + get_link, + get_random_link, + link_cache, + load_json, + save_json, +) +logger = logging.getLogger("discord") # Utilities for web requests to have the fresh comic details -ORIGINAL_DETAILS = {"url": "", "Name": "", "title": "", "author": "", "day": "", "month": "", "year": "", - "sub_img_url": "", "img_url": "", "alt": "", "color": 0, "is_latest": False} +ORIGINAL_DETAILS = { + "url": "", + "Name": "", + "title": "", + "author": "", + "day": "", + "month": "", + "year": "", + "sub_img_url": "", + "img_url": "", + "alt": "", + "color": 0, + "is_latest": False, +} MAX_TRIES = 15 -def get_new_comic_details(strip_details: dict, param: str, comic_date: Optional[Union[datetime, int]] = None, - latest_check: Optional[bool] = False) -> Optional[dict[str, Union[str, int, bool]]]: +def create_link_cache(logger_: logging.Logger) -> None: + """Create a cache of links containing the latest comics links + + :param logger_: The logger to use + """ + logger_.debug("Running link cache...") + comics: dict = load_json(DETAILS_PATH) + for comic in comics: + logger_.debug(f"Getting image link for comic {comic} ...") + comic_url: Optional[dict[str, str]] + try: + comic_url = get_new_comic_details(comics[comic], action=Action.Today) + except (ValueError, AttributeError) as e: + logger_.error(f"An error occurred for comic {comic}: {e}") + comic_url = None + link_cache.update( + { + comics[comic]["Name"]: comic_url["img_url"] + if comic_url is not None + else "" + } + ) + + logger_.debug("Saving comics link...") + save_json(link_cache, COMIC_LATEST_LINKS_PATH) + + +def get_new_comic_details( + strip_details: dict, + action: Union[Action, ExtendedAction], + comic_date: Optional[Union[datetime, int]] = None, + latest_check: Optional[bool] = False, +) -> Optional[dict[str, Union[str, int, bool]]]: """Gets the comics details from the internet :param strip_details: - :param param: + :param action: :param comic_date: :param latest_check: :return: """ working_type = strip_details["Working_type"] comic_details: Optional[dict[str, str]] - if working_type == 'date': # Specific manager for date comics website - comic_details = get_comic_info_date(strip_details, param=param, comic_date=comic_date, - latest_check=latest_check) - elif working_type == 'rss': # Works by rss - comic_details = get_comic_info_rss(strip_details, param=param, comic_date=comic_date, latest_check=latest_check) + if working_type == "date": # Specific manager for date comics website + comic_details = get_comic_info_date( + strip_details, + action=action, + comic_date=comic_date, + latest_check=latest_check, + ) + elif working_type == "rss": # Works by rss + comic_details = get_comic_info_rss( + strip_details, + action=action, + comic_date=comic_date, + latest_check=latest_check, + ) else: # Works by number - comic_details = get_comic_info_number(strip_details, parameters=param, latest_check=latest_check) + comic_details = get_comic_info_number( + strip_details, action=action, latest_check=latest_check + ) return comic_details -def get_comic_info_date(strip_details, param=None, comic_date=None, latest_check=False) -> Optional[dict[str, str]]: +def get_comic_info_date( + strip_details, + action: Union[Action, ExtendedAction] = None, + comic_date: Date = None, + latest_check: bool = False, +) -> Optional[dict[str, str]]: """Get the details of comics which site works by date :param strip_details: - :param param: + :param action: :param comic_date: :param latest_check: :return: @@ -61,52 +135,59 @@ def get_comic_info_date(strip_details, param=None, comic_date=None, latest_check # Gets today date comic_date = date.today() - while i < MAX_TRIES and (details["img_url"] == "" or details["img_url"] is None): + while i < MAX_TRIES and ( + details["img_url"] == "" or details["img_url"] is None + ): i += 1 - if param != "random": + if action != Action.Random: details["day"] = comic_date.strftime("%d") details["month"] = comic_date.strftime("%m") details["year"] = comic_date.strftime("%Y") # Gets today / url - details["url"] = utils.get_link(strip_details, comic_date) + details["url"] = get_link(strip_details, comic_date) else: # Random comic - details["url"], random_date = utils.get_random_link(strip_details) + details["url"], random_date = get_random_link(strip_details) # Get the html of the comic site try: html = urlopen(details["url"]).read() - except HTTPError: + except HTTPError as e: + logger.error(f"An html error occurred: {e}") html = None # Extracts the title of the comic - details["title"] = extract_meta_content(html, 'title') + details["title"] = extract_meta_content(html, "title") # Finds the url of the image - details["img_url"] = extract_meta_content(html, 'image') + details["img_url"] = extract_meta_content(html, "image") # Finds the final url - details["url"] = extract_meta_content(html, 'url') + details["url"] = extract_meta_content(html, "url") if details["img_url"] is None: # Go back one day comic_date = comic_date - timedelta(days=1) - if i >= MAX_TRIES and details["img_url"] is None: # If it hasn't found anything + if i >= MAX_TRIES and details["img_url"] is None: + # If it hasn't found anything return None if details is not None: finalize_comic(strip_details, details, latest_check) # Finds the date of the random comic - if details['day'] == "": + if details["day"] == "": final_date: datetime if random_date is None: # We have to parse the string (Only gocomics) final_date = datetime.strptime( - details["url"].replace(f'https://www.gocomics.com/{strip_details["Web_name"]}/', ""), - "%Y/%m/%d") + details["url"].replace( + f'https://www.gocomics.com/{strip_details["Web_name"]}/', "" + ), + "%Y/%m/%d", + ) else: final_date = random_date @@ -129,21 +210,27 @@ def extract_meta_content(html, content) -> Optional[str]: :return: """ soup = BeautifulSoup(html, "html.parser") - content_meta = soup.find('meta', attrs={'property': f'og:{content}', 'content': True}) + content_meta = soup.find( + "meta", attrs={"property": f"og:{content}", "content": True} + ) if content_meta is not None: # If it finds the meta properties of the image - content_value = content_meta['content'] + content_value = content_meta["content"] return content_value else: return None -def get_comic_info_number(strip_details, parameters=None, latest_check=False): +def get_comic_info_number( + strip_details, + action: Union[Action, ExtendedAction] = None, + latest_check=False, +): """For sites which works by number :param strip_details: - :param parameters: + :param action: :param latest_check: :return: """ @@ -155,40 +242,41 @@ def get_comic_info_number(strip_details, parameters=None, latest_check=False): if strip_details["Name"] is not None: main_website = strip_details["Main_website"] - if strip_details["Name"] == 'xkcd': - if parameters == "random": + if strip_details["Name"] == "xkcd": + if action == Action.Random: # Link for random XKCD comic main_website = "https://c.xkcd.com/random/comic/" html = urlopen(main_website).read() - main_website = extract_meta_content(html, 'url') + main_website = extract_meta_content(html, "url") main_website = main_website + "info.0.json" details["url"] = main_website - elif strip_details["Name"] == 'Cyanide and Happiness': - if parameters == 'random': - main_website += parameters - elif parameters == 'today': - main_website += 'latest' + elif strip_details["Name"] == "Cyanide and Happiness": + if action == Action.Random: + main_website += "random" + elif action == Action.Today: + main_website += "latest" details["url"] = main_website else: html = urlopen(main_website).read() - details["url"] = extract_meta_content(html, 'url') + details["url"] = extract_meta_content(html, "url") # Get the html of the comic site try: html = urlopen(details["url"]).read() - except HTTPError: + except HTTPError as e: + logger.error(f"An html error occurred: {e}") html = None if html is not None: - if strip_details["Name"] == 'Cyanide and Happiness': + if strip_details["Name"] == "Cyanide and Happiness": # Cyanide and Happiness special extractor # heavily inspired by https://github.com/JTexpo/Robobert # Parse the json that is embedded into the end of the page - soup = BeautifulSoup(html, 'html.parser') + soup = BeautifulSoup(html, "html.parser") dat = soup.find("script", id="__NEXT_DATA__").get_text() js = json.loads(dat) @@ -215,19 +303,23 @@ def get_comic_info_number(strip_details, parameters=None, latest_check=False): details["img_url"] = comic_det["comicmgurl"] else: # modern comics - details["img_url"] = comic_det["comicimgstaticbucketurl"]["mediaItemUrl"] + details["img_url"] = comic_det["comicimgstaticbucketurl"][ + "mediaItemUrl" + ] details["author"] = author_det["name"] details["sub_img_url"] = author_det["image"]["mediaItemUrl"] - post_date = datetime.strptime(comdata["date"], "%Y-%m-%dT%H:%M:%S") + post_date = datetime.strptime( + comdata["date"], "%Y-%m-%dT%H:%M:%S" + ) details["day"] = post_date.day details["month"] = post_date.month details["year"] = post_date.year break - elif strip_details["Name"] == 'xkcd': + elif strip_details["Name"] == "xkcd": # XKCD special extractor # We requested a json and not a html json_details = json.loads(html) @@ -242,11 +334,11 @@ def get_comic_info_number(strip_details, parameters=None, latest_check=False): else: # General extractor - details["url"] = extract_meta_content(html, 'url') + details["url"] = extract_meta_content(html, "url") - details["title"] = extract_meta_content(html, 'title') + details["title"] = extract_meta_content(html, "title") - details["img_url"] = extract_meta_content(html, 'image') + details["img_url"] = extract_meta_content(html, "image") else: return None @@ -259,11 +351,16 @@ def get_comic_info_number(strip_details, parameters=None, latest_check=False): return details -def get_comic_info_rss(strip_details, param=None, comic_date=None, latest_check=False) -> Optional[dict[str, str]]: +def get_comic_info_rss( + strip_details, + action: Union[Action, ExtendedAction] = None, + comic_date=None, + latest_check=False, +) -> Optional[dict[str, str]]: """For comics which can only be found by rss :param strip_details: - :param param: + :param action: :param comic_date: :param latest_check: :return: @@ -279,31 +376,31 @@ def get_comic_info_rss(strip_details, param=None, comic_date=None, latest_check= details["author"] = strip_details["Author"] main_website = strip_details["Main_website"] - if main_website == 'https://garfieldminusgarfield.net/': - fall_back_img = 'https://64.media.tumblr.com/avatar_02c53466ae58_64.gif' - rss_site = 'https://garfieldminusgarfield.net/rss' + if main_website == "https://garfieldminusgarfield.net/": + fall_back_img = "https://64.media.tumblr.com/avatar_02c53466ae58_64.gif" + rss_site = "https://garfieldminusgarfield.net/rss" else: main_website += strip_details["Web_name"] fall_back_img = strip_details["Image"] rss_site = main_website.replace("list", "rss") # Gets today date - if param == "today": + if action == Action.Today: # First comic in the rss feed comic_nb = 0 - elif param == 'random': + elif action == Action.Random: comic_nb = random.randint(0, max_entries) details["title"] = strip_details["Name"] - if param == 'Specific_date': + if action == Action.Specific_date: details["img_url"] = fall_back_img - if main_website == 'https://garfieldminusgarfield.net/': + if main_website == "https://garfieldminusgarfield.net/": # Garfield minus Garfield date_formatted = comic_date.strftime("%Y/%m/%d") - main_website += 'day/' + date_formatted + main_website += "day/" + date_formatted details["url"] = main_website details["day"] = comic_date.strftime("%d") details["month"] = comic_date.strftime("%m") @@ -316,7 +413,7 @@ def get_comic_info_rss(strip_details, param=None, comic_date=None, latest_check= # Get information tz: str weekday: str - if strip_details["Main_website"] == 'https://www.webtoons.com/en/': + if strip_details["Main_website"] == "https://www.webtoons.com/en/": if feed.title != "": details["title"] = f"{feed.title}" weekday = "A" @@ -325,7 +422,9 @@ def get_comic_info_rss(strip_details, param=None, comic_date=None, latest_check= weekday = "a" tz = "z" - new_date = datetime.strptime(feed.publish_date, f"%{weekday}, %d %b %Y %H:%M:%S %{tz}") + new_date = datetime.strptime( + feed.publish_date, f"%{weekday}, %d %b %Y %H:%M:%S %{tz}" + ) details["day"] = new_date.strftime("%d") details["month"] = new_date.strftime("%m") details["year"] = new_date.strftime("%Y") @@ -333,7 +432,9 @@ def get_comic_info_rss(strip_details, param=None, comic_date=None, latest_check= details["url"] = feed.link img_index = 0 - if len(feed.description_images) > 1: # general check for a second image to embed + if ( + len(feed.description_images) > 1 + ): # general check for a second image to embed details["sub_img_url"] = feed.description_images[img_index].source img_index += 1 @@ -355,4 +456,4 @@ def finalize_comic(strip_details: dict, details: dict, latest_link: bool) -> Non """ details["color"] = int(strip_details["Color"], 16) if latest_link: - details["is_latest"] = utils.check_if_latest_link(details["Name"], details["img_url"]) + details["is_latest"] = check_if_latest_link(details["Name"], details["img_url"]) diff --git a/bdbot/__init__.py b/bdbot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/bdbot/__main__.py similarity index 53% rename from main.py rename to bdbot/__main__.py index 4eb4c83..c3e567c 100644 --- a/main.py +++ b/bdbot/__main__.py @@ -1,11 +1,14 @@ -import os -import logging import asyncio +import logging +import os +from datetime import datetime + import discord from discord.ext import commands from dotenv import load_dotenv -from datetime import datetime -from src import utils, discord_utils + +from bdbot import discord_utils, utils +from bdbot.utils import LOGS_DIRECTORY_PATH def main(): @@ -13,26 +16,31 @@ def main(): Main entry point for the bot """ os.chdir(os.path.dirname(__file__)) # Force the current working directory - load_dotenv() + load_dotenv(utils.ENV_FILE) intents = discord.Intents.default() - bot: discord.ext.commands.AutoShardedBot = commands.AutoShardedBot( + bot: discord.ext.commands.Bot = commands.Bot( intents=intents, command_prefix="bd!", help_command=None, - description=f"BDBot now supports slash commands! Re-invite the bot with /inv!", - shard_count=4 + description="BDBot now supports slash commands! Re-invite the bot with /inv!", ) - - logger = logging.getLogger('discord') - logger.setLevel(logging.DEBUG if os.getenv('DEBUG') == "True" else logging.INFO) - handler = logging.FileHandler(filename=f'src/data/logs/discord_{datetime.now().strftime("%Y_%m_%d_%H_%M")}.log', - encoding='utf-8', mode='w') - handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) - logger.addHandler(handler) + handler = logging.FileHandler( + filename=f'{LOGS_DIRECTORY_PATH}discord_{datetime.now().strftime("%Y_%m_%d_%H_%M")}.log', + encoding="utf-8", + mode="w", + ) + log_format = logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s") + discord.utils.setup_logging( + handler=handler, + formatter=log_format, + level=logging.DEBUG if os.getenv("DEBUG") == "True" else logging.INFO, + root=False, + ) + logger = logging.getLogger("discord") logger.info("Writing pid file...") - pid_file = "bdbot.pid" + pid_file = utils.PID_FILE try: utils.write_pid(pid_file) logger.info(f"Wrote pid to file {pid_file}") @@ -44,7 +52,7 @@ def main(): asyncio.run(run(bot, logger)) # Runs the bot with the private bot token -async def run(bot: commands.AutoShardedBot, logger: logging.Logger): +async def run(bot: commands.Bot, logger: logging.Logger): """Loads all the cogs and start the bot :param bot: The bot @@ -52,18 +60,16 @@ async def run(bot: commands.AutoShardedBot, logger: logging.Logger): """ logger.info("Setting up private server object") try: - discord_utils.SERVER = discord.Object(id=int(os.getenv("PRIVATE_SERVER_SUPPORT_ID"))) + discord_utils.SERVER = discord.Object( + id=int(os.getenv("PRIVATE_SERVER_SUPPORT_ID")) + ) logger.info("Private server set!") except TypeError: - logger.info("Could not set private server object, please be wary that owner commands are usable everywhere") + logger.info( + "Could not set private server object, please be wary that owner commands are usable everywhere" + ) discord_utils.SERVER = None - for filename in os.listdir('src/Scripts'): - if filename.endswith('py'): - await bot.load_extension(f'src.Scripts.{filename[:-3]}') - - logger.info("Cogs successfully loaded!") - logger.info("Loading comic details...") utils.strip_details = utils.load_json(utils.DETAILS_PATH) logger.info("Loaded comic details!") @@ -74,10 +80,18 @@ async def run(bot: commands.AutoShardedBot, logger: logging.Logger): logger.info("Loading latest comic links...") utils.link_cache = utils.load_json(utils.COMIC_LATEST_LINKS_PATH) + utils.link_cache = utils.fill_cache(utils.strip_details, utils.link_cache) logger.info("Loaded comic links!") + for filename in os.listdir("cogs"): + if filename.endswith("py") and filename != "__init__.py": + await bot.load_extension(f"bdbot.cogs.{filename[:-3]}") + + logger.info("Cogs successfully loaded!") + async with bot: - await bot.start(os.getenv('TOKEN')) + await bot.start(os.getenv("TOKEN")) + if __name__ == "__main__": main() diff --git a/bdbot_manager.py b/bdbot/bdbot_manager.py similarity index 50% rename from bdbot_manager.py rename to bdbot/bdbot_manager.py index e3b4fd1..bf2c553 100644 --- a/bdbot_manager.py +++ b/bdbot/bdbot_manager.py @@ -1,17 +1,33 @@ +import getpass import json import logging import os +import platform import re +import shutil import sys +from typing import Optional, Union from InquirerPy import inquirer -from InquirerPy.prompts import SecretPrompt, ListPrompt -from src.utils import load_json, DETAILS_PATH, REQUEST_FILE_PATH, DATABASE_FILE_PATH, save_json, save_backup, \ - create_link_cache -from typing import Union, Optional - -TEMP_FILE_PATH = "src/misc/comics_not_ready.json" -RETIRED_COMICS_PATH = "src/misc/retired_comics.json" +from InquirerPy.prompts import ListPrompt, SecretPrompt + +from bdbot.utils import ( + BACKUP_FILE_PATH, + BASE_DATA_PATH, + DATABASE_FILE_PATH, + DETAILS_PATH, + ENV_FILE, + FOOTERS_FILE_PATH, + LOGS_DIRECTORY_PATH, + REQUEST_FILE_PATH, + load_json, + save_backup, + save_json, +) +from bdbot.Web_requests_manager import create_link_cache + +TEMP_FILE_PATH = "misc/comics_not_ready.json" +RETIRED_COMICS_PATH = "misc/retired_comics.json" logger = logging.Logger("manager_logger", logging.INFO) @@ -26,8 +42,10 @@ def main(): action = "" while action != "Exit": - action = inquirer.select(message="What do you want to do?", choices=["Manage bot", "Manage comics", - "Exit"]).execute() + action = inquirer.select( + message="What do you want to do?", + choices=["Manage bot", "Manage comics", "Exit"], + ).execute() if action == "Manage bot": manage_bot() @@ -37,67 +55,116 @@ def main(): def manage_bot(): """Manage the bot and its settings""" - action = inquirer.select(message="What do you want to do?", - choices=["Database tools", "Verify requests", "Create image link cache", "Setup Bot"], - mandatory=False).execute() + action = "" + while action != "Return": + action = inquirer.select( + message="What do you want to do?", + choices=[ + "Database tools - Not implemented", + "Verify requests - Not implemented", + "Create image link cache", + "Setup Bot", + "Uninstall Bot", + "Return", + ], + mandatory=False, + ).execute() - if action == "Create image link cache": - logger.info("Running link cache, please wait up to 1-2 minutes...") - create_link_cache(logger) - logger.info("Link cache created!") - if action == "Setup Bot": - setup_bot() + if action == "Create image link cache": + logger.info("Running link cache, please wait up to 1-2 minutes...") + os.makedirs("data", exist_ok=True) + create_link_cache(logger) + logger.info("Link cache created!") + elif action == "Setup Bot": + setup_bot() + elif action == "Uninstall Bot": + conf = inquirer.confirm( + "Are you sure you want to uninstall the bot?" + ).execute() + if conf: + os.rmdir(BASE_DATA_PATH) + else: + logger.info("Canceled bot uninstallation") def setup_bot(): """Sets up the bot to be able to be launched""" + os.makedirs(BASE_DATA_PATH, exist_ok=True) logger.info("Setting up environment variables...") write_env = True - if os.path.exists("./env"): - write_env = inquirer.confirm("The file `.env` already exist. Do you want to overwrite it?") + if os.path.exists(ENV_FILE): + write_env = inquirer.confirm( + "The file `.env` already exist. Do you want to overwrite it?" + ).execute() if write_env: - environment_variables: dict[str, dict[str, Union[str, Union[SecretPrompt, ListPrompt]]]] = { - "TOKEN": {"value": "", "inquiry": inquirer.secret(message="Enter the token (The bot discord token):")}, - "CLIENT_ID": {"value": "", "inquiry": inquirer.secret(message="Enter the client ID (The bot client ID. " - "To get a invite for the bot):")}, + environment_variables: dict[ + str, dict[str, Union[str, Union[SecretPrompt, ListPrompt]]] + ] = { + "TOKEN": { + "value": "", + "inquiry": inquirer.secret( + message="Enter the token (The bot discord token):" + ), + }, + "CLIENT_ID": { + "value": "", + "inquiry": inquirer.secret( + message="Enter the client ID (The bot client ID. " + "To get a invite for the bot):" + ), + }, "PRIVATE_CHANNEL_SUPPORT_ID": { "value": "", "inquiry": inquirer.secret( message="Enter the ID of the private channel (The ID of the channel where the bot can print" - " debugging information):" - ) + " debugging information):" + ), }, "PRIVATE_SERVER_SUPPORT_ID": { "value": "", "inquiry": inquirer.secret( message="Enter the ID of the private server (The ID of the server where the bot can allow owner " - "commands):" - ) + "commands):" + ), + }, + "TOP_GG_TOKEN": { + "value": "", + "inquiry": inquirer.secret( + message="Enter the topgg token (if applicable):" + ), }, "DEBUG": { "value": "", "inquiry": inquirer.select( message="Is the bot used for development purposes? (Should be False if the bot is supposed to" - " serve multiple servers):", - choices=["True", "False"] - ) - } + " serve multiple servers):", + choices=["True", "False"], + ), + }, } for envv in environment_variables: - environment_variables[envv]["value"] = environment_variables[envv]["inquiry"].execute() - - output = "\n".join([f"{envv}={environment_variables[envv]['value']}" for envv in environment_variables]) - usage = "w" if os.path.exists(".env") else "x" - with open(".env", f"{usage}t") as f: + environment_variables[envv]["value"] = environment_variables[envv][ + "inquiry" + ].execute() + + output = "\n".join( + [ + f"{envv}={environment_variables[envv]['value']}" + for envv in environment_variables + ] + ) + usage = "w" if os.path.exists(ENV_FILE) else "x" + with open(ENV_FILE, f"{usage}t") as f: f.write(output) logger.info("Creating folders and files...") - os.makedirs("src/data/backups", exist_ok=True) - os.makedirs("src/data/logs", exist_ok=True) + os.makedirs(os.path.dirname(BACKUP_FILE_PATH), exist_ok=True) + os.makedirs(LOGS_DIRECTORY_PATH, exist_ok=True) + os.makedirs(os.path.dirname(DETAILS_PATH), exist_ok=True) if not os.path.exists(DATABASE_FILE_PATH): with open(DATABASE_FILE_PATH, "xt") as f: @@ -107,10 +174,49 @@ def setup_bot(): with open(REQUEST_FILE_PATH, "xt"): pass + logger.info("Copying details and footer files...") + continue_copy = True + if os.path.exists(DETAILS_PATH): + continue_copy = inquirer.confirm( + "Details file already seem present, do you want to overwrite them?" + ).execute() + + if continue_copy: + shutil.copy("misc/comics_details.json", DETAILS_PATH) + shutil.copy("misc/random-footers.txt", FOOTERS_FILE_PATH) + logger.info("Creating link cache, this might take some time...") create_link_cache(logger) - logger.info("All done! You can start the bot with 'python main.py'!") + if platform.system() == "Linux": + # Tries to install service file and command to run the bot only on Linux + user = getpass.getuser() + local_service_path = "misc/runbdbot.service" + service_path = "/etc/systemd/system/" + dst_service_path = f"/home/{user}" + command = "sudo systemctl daemon-reload && sudo systemctl enable --now runbdbot.service" + install_service = inquirer.confirm( + "Do you want to install the service file which will let you run the bot automatically in the" + f" background?\n The file will need to be copied manually at {service_path} with root privileges and\n" + f" enabled with '{command}'?" + ).execute() + + if install_service: + with open(local_service_path, "rt") as f: + service_file = f.read() + + service_file = service_file.replace("{USER}", user) + service_file = service_file.replace("{PACKAGE_DIRECTORY}", os.getcwd()) + + with open(f"{dst_service_path}/runbdbot.service", "wt") as f: + f.write(service_file) + + logger.info( + f"The service file is now available at {dst_service_path} and is ready to be moved to" + f" {service_path}. Do not forget to enable the service with {command}!" + ) + + logger.info("All done! You can start the bot with 'python -m bdbot'!") def manage_comics(): @@ -123,7 +229,7 @@ def manage_comics(): action = inquirer.select( message="What do you want to do with the comics?", choices=["Add", "Delete", "Modify", "Return"], - mandatory=False + mandatory=False, ).execute() if action == "Delete" or action == "Modify": @@ -137,8 +243,9 @@ def manage_comics(): def choose_comic(action: str, comics: dict): comic = inquirer.fuzzy( message=f"What comic do you want to {action.lower()}?", - choices=[f"{comics[x]['Position']}. {comics[x]['Name']}" for x in comics] + ["Return"], - mandatory=False + choices=[f"{comics[x]['Position']}. {comics[x]['Name']}" for x in comics] + + ["Return"], + mandatory=False, ).execute() if comic is None or comic == "Return": @@ -157,50 +264,87 @@ def add_comic(comics: dict): websites = {"Gocomics": None, "ComicsKingdom": None, "Webtoon": None} socials = ["Website", "Facebook", "Twitter", "Youtube", "Patreon", "About"] first_date = {"Year": 0, "Month": 0, "Day": 0} - name = inquirer.text(message="What is the name of the comic? ", mandatory=False).execute() - author = inquirer.text(message="Who is the creator of the comic? ", mandatory=False).execute() - web_name = inquirer.text(message="Enter the name of the comic as it is written in the link to its main " - "page: ", mandatory=False).execute() - main_website = inquirer.text(message="What is the main website of the comic?", - completer={"Gocomics": None, "ComicsKingdom": None, "Webtoon": None}, - mandatory=False).execute() + name = inquirer.text( + message="What is the name of the comic? ", mandatory=False + ).execute() + author = inquirer.text( + message="Who is the creator of the comic? ", mandatory=False + ).execute() + web_name = inquirer.text( + message="Enter the name of the comic as it is written in the link to its main " + "page: ", + mandatory=False, + ).execute() + main_website = inquirer.text( + message="What is the main website of the comic?", + completer={"Gocomics": None, "ComicsKingdom": None, "Webtoon": None}, + mandatory=False, + ).execute() if main_website not in websites: - working_type = inquirer.select(message="What is the working type of the comic? (For example,are comics " - "accessible by specifying a date, a number or is there a rss " - "available?)\nIf you do not know, please choose other. ", - choices=["date", "number", "rss", "other"], mandatory=False).execute() + working_type = inquirer.select( + message="What is the working type of the comic? (For example,are comics " + "accessible by specifying a date, a number or is there a rss " + "available?)\nIf you do not know, please choose other. ", + choices=["date", "number", "rss", "other"], + mandatory=False, + ).execute() elif main_website == "Gocomics" or main_website == "ComicsKingdom": working_type = "date" else: working_type = "rss" - description = inquirer.text(message="Enter a long description of the comic: ", mandatory=False).execute() + description = inquirer.text( + message="Enter a long description of the comic: ", mandatory=False + ).execute() for social in socials: social_link: str = inquirer.text( message=f"Does this comic has a {social} page? (leave blank if not applicable) ", mandatory=False, - default="").execute() + default="", + ).execute() if social_link.strip(): description += f"\n{social}: {social_link}" if working_type == "date": for date in first_date: - first_date[date] = inquirer.number(message=f"What is the first date of the comic? " - f"Please enter the {date}: ", mandatory=False).execute() + first_date[date] = inquirer.number( + message=f"What is the first date of the comic? " + f"Please enter the {date}: ", + mandatory=False, + ).execute() elif working_type == "number" or working_type == "rss": first_date = "1" else: first_date = "" - color = inquirer.text(message="Enter the hexadecimal code of the most represented color in this comic " - "(without the 0x)", - validate=lambda x: re.match("[\\dA-F]{6}", x) is not None, mandatory=False).execute() - image = inquirer.text(message="Enter the link of a public image that represents well the comic: ").execute() - helptxt = inquirer.text(message="Write in one phrase a description of the comic.", - validate=lambda x: 100 >= len(x), - invalid_message="This short description must be equal or less than 100 characters!", - mandatory=False).execute() - final_comic_dict = process_inputs(name, author, web_name, main_website, working_type, description, len(comics), - first_date, color, image, helptxt) + color = inquirer.text( + message="Enter the hexadecimal code of the most represented color in this comic " + "(without the 0x)", + validate=lambda x: re.match("[\\dA-F]{6}", x) is not None, + mandatory=False, + ).execute() + image = inquirer.text( + message="Enter the link of a public image that represents well the comic: ", + mandatory=False, + ).execute() + help_txt = inquirer.text( + message="Write in one phrase a description of the comic.", + validate=lambda x: 100 >= len(x), + invalid_message="This short description must be equal or less than 100 characters!", + mandatory=False, + ).execute() + final_comic_dict = process_inputs( + name, + author, + web_name, + main_website, + working_type, + description, + len(comics), + first_date, + color, + image, + help_txt, + ) logger.info("Final comic data:") logger.info(json.dumps(final_comic_dict, indent=4)) confirm = inquirer.confirm("Is the data good?").execute() @@ -209,24 +353,30 @@ def add_comic(comics: dict): comics.update(final_comic_dict) save_json(comics, file_path=DETAILS_PATH) logger.info("Update done!") - - # Create a command - logger.info("Creating command...") - command = create_command(final_comic_dict) - logger.info("Here is your command:\n'''") - logger.info(command) - logger.info(f"'''\nAdd it to the end of {os.getcwd()}/src/Scripts/Comic.py to make the comic executable.") else: # Adds the details to a temporary file absolute_path = os.getcwd() + "/" + TEMP_FILE_PATH logger.info(f"Writing dictionary to a temporary location.... ({absolute_path})") temp_comic_data = open_json_if_exist(absolute_path) temp_comic_data.update(final_comic_dict) save_json(temp_comic_data, absolute_path) - logger.info("Wrote the details to the temporary file! You can edit this file manually or with this tool!") - - -def process_inputs(name: str, author: str, web_name: str, main_website: str, working_type: str, description: str, - position: int, first_date: Union[str, dict], color: str, image: str, helptxt: str) -> dict: + logger.info( + "Wrote the details to the temporary file! You can edit this file manually or with this tool!" + ) + + +def process_inputs( + name: str, + author: str, + web_name: str, + main_website: str, + working_type: str, + description: str, + position: int, + first_date: Union[str, dict], + color: str, + image: str, + helptxt: str, +) -> dict: """ :param name: @@ -245,7 +395,7 @@ def process_inputs(name: str, author: str, web_name: str, main_website: str, wor websites = { "Gocomics": "https://www.gocomics.com/", "ComicsKingdom": "https://comicskingdom.com/", - "Webtoon": "https://www.webtoons.com/en/" + "Webtoon": "https://www.webtoons.com/en/", } normalized_name = name.replace(" ", "") return { @@ -253,39 +403,21 @@ def process_inputs(name: str, author: str, web_name: str, main_website: str, wor "Name": name, "Author": author, "Web_name": web_name, - "Main_website": websites[main_website] if main_website in websites else main_website, + "Main_website": websites.get(main_website, main_website), "Working_type": working_type, "Description": description, "Position": position, - "First_date": first_date if type(first_date) is str else f"{first_date['Year']}, {first_date['Month']}, " - f"{first_date['Day']}", + "First_date": first_date + if type(first_date) is str + else f"{first_date['Year']}, {first_date['Month']}, " + f"{first_date['Day']}", "Color": color, "Image": image, - "Helptxt": helptxt + "Helptxt": helptxt, } } -def create_command(cmc: dict) -> str: - """ - - :param cmc: - :return: - """ - comic = cmc.get(list(cmc.keys())[0]) - normalized_name = comic['Name'].replace(" ", "_").lower() - return f""" - @commands.hybrid_command() - async def {normalized_name}(self, ctx: discord.ext.commands.Context, use: str = None, date: str = None, - hour: str = None): - \"\"\"{comic['Name']}\"\"\" - comic_name = '{comic['Name'].replace(" ", "")}' - - # Interprets the parameters given by the user - await utils.parameters_interpreter(ctx, utils.get_strip_details(comic_name), param=use, date=date, hour=hour) - """ - - def delete(comics: dict, comic: str): """ Removes a comic from the main configuration file and move it to a retired configuration file. @@ -295,7 +427,9 @@ def delete(comics: dict, comic: str): """ comic_number, comic_name = comic.split(". ") comic_number = int(comic_number) - confirm = inquirer.confirm(message=f"Are you sure you want to delete {comic_name}?").execute() + confirm = inquirer.confirm( + message=f"Are you sure you want to delete {comic_name}?" + ).execute() if confirm: abs_path = os.getcwd() + "/" + RETIRED_COMICS_PATH @@ -316,8 +450,10 @@ def delete(comics: dict, comic: str): save_json(retired_comics, abs_path) logger.info("Deletion successful in the details file!") - update_database = inquirer.confirm(message="Do you want to update the database as well? (Note: A backup will " - "be made in case this step breaks the database.)").execute() + update_database = inquirer.confirm( + message="Do you want to update the database as well? (Note: A backup will " + "be made in case this step breaks the database.)" + ).execute() if update_database: database_update(comic_number) else: @@ -338,7 +474,7 @@ def open_json_if_exist(absolute_path: str) -> dict: if os.path.exists(absolute_path): return load_json(absolute_path) else: - open(absolute_path, 'x').close() + open(absolute_path, "x").close() return temp_comic_data @@ -390,8 +526,11 @@ def modify(comics: dict, comic: str): comic_dict = comics[comic_dict_key] while comic_property != "Return": - comic_property = inquirer.select(message=f"Which comic_property of the comic {comic_name} do you want to edit?", - choices=[prop for prop in comic_dict] + ["Return"], mandatory=False).execute() + comic_property = inquirer.select( + message=f"Which property of the comic {comic_name} do you want to edit?", + choices=[prop for prop in comic_dict] + ["Return"], + mandatory=False, + ).execute() if comic_property != "" and comic_property != "Return": comic_dict = modify_property(comic_dict, comic_property) @@ -409,20 +548,27 @@ def modify_property(comic_dict: dict, comic_property: str) -> dict: :return: """ property_value = comic_dict[comic_property] - logger.info(f"Current {comic_property!r} value:\n`\n{comic_dict[comic_property]}\n`") + logger.info( + f"Current {comic_property!r} value:\n`\n{comic_dict[comic_property]}\n`" + ) completer: Optional[dict] = None if type(property_value) is str: completer = {word: None for word in property_value} - new_value = inquirer.text(message="What new value do you want to give this property?", mandatory=False, - completer=completer).execute() + new_value = inquirer.text( + message="What new value do you want to give this property?", + mandatory=False, + completer=completer, + ).execute() if new_value == "": logger.info(f"{comic_property!r} has not been changed.") else: - confirm = inquirer.confirm(message=f"Are you sure your want to set " - f"{comic_property!r} to \n`\n{new_value}\n` ?").execute() + confirm = inquirer.confirm( + message=f"Are you sure your want to set " + f"{comic_property!r} to \n`\n{new_value}\n` ?" + ).execute() if confirm: logger.info(f"Updating property {comic_property!r}...") diff --git a/src/Scripts/AutomaticPoster.py b/bdbot/cogs/AutomaticPoster.py similarity index 51% rename from src/Scripts/AutomaticPoster.py rename to bdbot/cogs/AutomaticPoster.py index 0de320b..580a183 100644 --- a/src/Scripts/AutomaticPoster.py +++ b/bdbot/cogs/AutomaticPoster.py @@ -1,9 +1,34 @@ -import discord - from datetime import datetime, timedelta, timezone -from discord.ext import tasks, commands -from src import utils, Web_requests_manager, discord_utils -from typing import Optional +from typing import Iterable, Optional + +import discord +from discord import app_commands +from discord.ext import commands, tasks + +from bdbot.discord_utils import ( + clean_database, + create_embed, + logger, + run_blocking, + send_chan_embed, + SERVER, send_message, is_owner, +) +from bdbot.utils import ( + COMIC_LATEST_LINKS_PATH, + DATABASE_FILE_PATH, + Action, + Date, + date_to_db, + get_hour, + get_today, + link_cache, + load_json, + parse_all, + save_backup, + save_json, + strip_details, restore_backup, +) +from bdbot.Web_requests_manager import get_new_comic_details class PosterHandler(commands.Cog): @@ -11,17 +36,17 @@ class PosterHandler(commands.Cog): Manages automatic posting of hourly comic strips """ - def __init__(self, bot: commands.Bot): + def __init__(self, bot: discord.Client): """ Construct the cog. :param bot: The discord bot """ - self.bot: commands.Bot = bot + self.bot: discord.Client = bot self.do_cleanup: bool = True - @commands.hybrid_command(hidden=True, guilds=discord_utils.SERVER) - @commands.is_owner() + # @app_commands.command(hidden=True, guilds=SERVER) + # @app_commands.is_owner() async def start_hourly(self, ctx: commands.Context): """Starts the PosterHandler loop""" await ctx.send("Hourly loop started! Hourly comics are posted at each hour.") @@ -30,18 +55,25 @@ async def start_hourly(self, ctx: commands.Context): async def wait_for_next_hour(self): """Wait for the time to restart the hourly loop""" - sleep_date = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + sleep_date = datetime.now(timezone.utc).replace( + minute=0, second=0, microsecond=0 + ) + timedelta(hours=1) await discord.utils.sleep_until(sleep_date) await PosterHandler.post_hourly.start(self) - @commands.hybrid_command(hidden=True, guilds=discord_utils.SERVER) + @app_commands.command() + @app_commands.guilds(SERVER.id) @commands.is_owner() - async def force_hourly(self, ctx: commands.Context, hour: Optional[int] = None): + async def force_hourly(self, inter: discord.Interaction, hour: Optional[int] = None): """Force the push of comics to all subscribed servers - :param ctx: The context of the where the command was called. + + :param inter: The context of the where the command was called. + :param hour: The hour to simulate """ - await ctx.send(f'Trying to force the hourly post for hour {utils.get_hour() if not hour else hour}h UTC') + await inter.response.send_message( + f"Trying to force the hourly post for hour {get_hour() if not hour else hour}h UTC" + ) await self.hourly(hour) @tasks.loop(hours=1) @@ -50,24 +82,25 @@ async def post_hourly(self): try: await self.hourly() except Exception as e: - discord_utils.logger.error(str(e)) + logger.error(str(e)) async def hourly(self, hour: Optional[int] = None): """Post hourly comics""" - discord_utils.logger.info("Starting automatic poster...") - strip_details: dict = utils.strip_details - comic_data: dict = utils.load_json(utils.DATABASE_FILE_PATH) + logger.info("Starting automatic poster...") + comic_data: dict = load_json(DATABASE_FILE_PATH) comic_list: dict = {} comic_keys: list[str] = list(strip_details.keys()) - post_days: list[str] = ["D", utils.get_today()] + post_days = [Date.Daily, get_today()] + if not hour: - hour: str = utils.get_hour() + hour = get_hour() + hour = str(hour) if hour == "6": - utils.save_backup(comic_data, discord_utils.logger) + save_backup(comic_data, logger) if self.do_cleanup: - utils.clean_database(data=comic_data, logger=discord_utils.logger) + clean_database(data=comic_data, logger_=logger) # Construct the list of what comics need to be sent for guild in comic_data: @@ -76,11 +109,13 @@ async def hourly(self, hour: Optional[int] = None): await self.check_comics_and_post(comic_list, strip_details, comic_keys) - utils.save_json(utils.link_cache, utils.COMIC_LATEST_LINKS_PATH) # Saves the link cache + save_json(link_cache, COMIC_LATEST_LINKS_PATH) # Saves the link cache - discord_utils.logger.info("Finished automatic poster.") + logger.info("Finished automatic poster.") - def get_comic_info_for_guild(self, guild_data: dict, comic_list: dict, post_days: list[str], hour: str): + def get_comic_info_for_guild( + self, guild_data: dict, comic_list: dict, post_days: Iterable[Date], hour: str + ): """Get the comic info for each server. This method mutate 'comic_list' for each comic. :param guild_data: All the information of the server @@ -94,20 +129,36 @@ def get_comic_info_for_guild(self, guild_data: dict, comic_list: dict, post_days # First check if it wants only the latest comics if "latest" in guild_data["channels"][channel]: latest_comics: list[int] = guild_data["channels"][channel]["latest"] - comic_list: dict = self.set_comic_to_post(guild_data, channel, comic_list, latest_comics, hour) + comic_list: dict = self.set_comic_to_post( + guild_data, channel, comic_list, latest_comics, hour + ) comic_list[channel].update({"latest_comics": latest_comics}) # Then check if the comic is wanted for a specific time for day in post_days: + day = date_to_db(day) if "date" in guild_data["channels"][channel]: if day in guild_data["channels"][channel]["date"]: if hour in guild_data["channels"][channel]["date"][day]: - hour_specific_comics: list[int] = guild_data["channels"][channel]["date"][day][hour] - comic_list: dict = self.set_comic_to_post(guild_data, channel, comic_list, - hour_specific_comics, hour) - - def set_comic_to_post(self, guild_data: dict, channel: str, comic_list: dict, comics_to_add: list[int], - hour: str) -> dict: + hour_specific_comics: list[int] = guild_data[ + "channels" + ][channel]["date"][day][hour] + comic_list: dict = self.set_comic_to_post( + guild_data, + channel, + comic_list, + hour_specific_comics, + hour, + ) + + def set_comic_to_post( + self, + guild_data: dict, + channel: str, + comic_list: dict, + comics_to_add: list[int], + hour: str, + ) -> dict: """Set one comic to post on one channel :param guild_data: All the information of the server @@ -120,36 +171,47 @@ def set_comic_to_post(self, guild_data: dict, channel: str, comic_list: dict, co to_mention = guild_data["mention"] role: Optional[discord.Role] = None - if ('only_daily' in guild_data) and \ - (not guild_data["only_daily"] or hour == "6") and \ - ("role" in guild_data) and to_mention: + if ( + ("only_daily" in guild_data) + and (not guild_data["only_daily"] or hour == "6") + and ("role" in guild_data) + and to_mention + ): # Check if: # - A role is set # - The role can be mentioned anytime, or it is 6 AM UTC # - And the guild wants to be mentioned role = discord.Guild.get_role( - self.bot.get_guild(guild_data["server_id"]), guild_data["role"]) - - comic_list.update({ - channel: { - "channel": channel, - "comics": comics_to_add, - "role": role, - "hasBeenMentioned": False, - "wantMention": to_mention + self.bot.get_guild(guild_data["server_id"]), guild_data["role"] + ) + + comic_list.update( + { + channel: { + "channel": channel, + "comics": comics_to_add, + "role": role, + "hasBeenMentioned": False, + "wantMention": to_mention, + } } - }) + ) else: comic_list[channel]["comics"].extend(comics_to_add) return comic_list - async def check_comics_and_post(self, comic_list: dict, strip_details: dict, comic_keys: list[str], - called_channel: Optional[discord.TextChannel] = None): + async def check_comics_and_post( + self, + comic_list: dict, + comic_details: dict, + comic_keys: list[str], + called_channel: Optional[discord.TextChannel] = None, + ): """Load comics and check if they are the latest ones. Finally, post the comic to the channels. :param comic_list: The information about where to post each comic and how - :param strip_details: The details of the comic strip + :param comic_details: The details of the comic strip :param comic_keys: The name of all the comics :param called_channel: The channel of where the command was sent from (Should be None for the hourly poster and filled when called manually) @@ -158,7 +220,7 @@ async def check_comics_and_post(self, comic_list: dict, strip_details: dict, com not_available_channels = {} nb_of_comics_posted = 0 # Check if any guild want the comic - for i in range(len(strip_details)): + for i in range(len(comic_details)): count = 0 for chan in comic_list: if i in comic_list[chan]["comics"]: @@ -169,18 +231,20 @@ async def check_comics_and_post(self, comic_list: dict, strip_details: dict, com # Get the details of the comic comic_details: Optional[dict] try: - comic_details = await discord_utils.run_blocking( - Web_requests_manager.get_new_comic_details, + comic_details = await run_blocking( + get_new_comic_details, self.bot, strip_details[comic_keys[i]], - "today", latest_check=True + Action.Today, + latest_check=True, ) - except Exception: + except Exception as e: # Anything can happen (connection problem, etc... and the bot will crash if any error # is raised in the poster loop) + logger.error(f"An error occurred while getting a comic: {e}") comic_details = None - embed = discord_utils.create_embed(comic_details) # Creates the embed + embed = create_embed(comic_details) # Creates the embed is_latest: bool if comic_details is not None: @@ -190,24 +254,40 @@ async def check_comics_and_post(self, comic_list: dict, strip_details: dict, com if is_latest and called_channel is None: # Only updates the link cache if it is done during the hourly loop - utils.link_cache[comic_details["Name"]] = comic_details["img_url"] + link_cache[comic_details["Name"]] = comic_details["img_url"] for channel in comic_list: # Finally, sends the comic - nb_of_comics_posted += await self.load_channel_and_send(i, comic_list, channel, embed, is_latest, - available_channels, not_available_channels, - called_channel) + nb_of_comics_posted += await self.load_channel_and_send( + i, + comic_list, + channel, + embed, + is_latest, + available_channels, + not_available_channels, + called_channel, + ) if called_channel is None: # Only logs the hourly loop at the end - discord_utils.logger.info(f"The hourly loop sent {nb_of_comics_posted} comic(s) the " - f"{datetime.now().strftime('%dth of %B %Y at %Hh')}") + logger.info( + f"The hourly loop sent {nb_of_comics_posted} comic(s) the " + f"{datetime.now().strftime('%dth of %B %Y at %Hh')}" + ) if called_channel is not None and nb_of_comics_posted == 0: # If it was called manually ('post' command), and there is no comics to post anywhere in the guild, # it will warn in the channel that no comics needed to be sent, and it will conclude await called_channel.send("No comics to send!") - async def load_channel_and_send(self, comic_number: int, comic_list: dict, channel: str, - embed: discord.Embed, is_latest: bool, available_channels: dict, - not_available_channels: dict, - called_channel: Optional[discord.TextChannel] = None) -> int: + async def load_channel_and_send( + self, + comic_number: int, + comic_list: dict, + channel: str, + embed: discord.Embed, + is_latest: bool, + available_channels: dict, + not_available_channels: dict, + called_channel: Optional[discord.TextChannel] = None, + ) -> int: """Sends the loaded comic to the specified channel :param comic_number: The number of the comic to send @@ -223,25 +303,43 @@ async def load_channel_and_send(self, comic_number: int, comic_list: dict, chann :returns: 1 if it posted a comic, 0 if it could/did not """ # First, check that the comic is the latest and if that channel only wants the latest (for this comic) - latest_comics = comic_list[channel]["latest_comics"] if "latest_comics" in comic_list[channel] else [] - if comic_number in comic_list[channel]["comics"] and (comic_number not in latest_comics - or (comic_number in latest_comics and is_latest)): + latest_comics = ( + comic_list[channel]["latest_comics"] + if "latest_comics" in comic_list[channel] + else [] + ) + if comic_number in comic_list[channel]["comics"] and ( + comic_number not in latest_comics + or (comic_number in latest_comics and is_latest) + ): # Then, gets the channel object by its ID channel_id = int(comic_list[channel]["channel"]) if channel_id not in available_channels: - chan = self.bot.get_channel(channel_id) # Retrieves the channel object by the discord client + chan = self.bot.get_channel( + channel_id + ) # Retrieves the channel object by the discord client # And save it for future use (so it can be looked up later) available_channels.update({channel_id: chan}) else: - chan = available_channels.get(channel_id) # Use the cached channel object - - if chan is not None and channel_id not in not_available_channels and \ - chan.permissions_for(chan.guild.get_member(self.bot.user.id)).send_messages: + chan = available_channels.get( + channel_id + ) # Use the cached channel object + + if ( + chan is not None + and channel_id not in not_available_channels + and chan.permissions_for( + chan.guild.get_member(self.bot.user.id) + ).send_messages + ): # Makes sure that the channel is available (e.g. channel object is not None and the bot # can send messages) try: - if not comic_list[channel]["hasBeenMentioned"] and comic_list[channel]["wantMention"]: + if ( + not comic_list[channel]["hasBeenMentioned"] + and comic_list[channel]["wantMention"] + ): # Checks if the channel want the original mention ('Comics for , UTC @') if comic_list[channel]["role"] is not None: # Checks if there is a role to mention @@ -249,38 +347,51 @@ async def load_channel_and_send(self, comic_number: int, comic_list: dict, chann else: role_mention = "" - await chan.send(f"Comics for " - f"{datetime.now(timezone.utc).strftime('%A %B %dth %Y, %H h UTC')}" - f" {role_mention}") - comic_list[channel]["hasBeenMentioned"] = True # Sets the channel as already mentioned - - await discord_utils.send_embed(chan, None, [embed]) # Sends the comic embed (most important) + await chan.send( + f"Comics for " + f"{datetime.now(timezone.utc).strftime('%A %B %dth %Y, %H h UTC')}" + f" {role_mention}" + ) + comic_list[channel][ + "hasBeenMentioned" + ] = True # Sets the channel as already mentioned + + await send_chan_embed( + chan, embed + ) # Sends the comic embed (most important) return 1 except Exception as e: # There is too many things that can go wrong here, just catch everything error_msg = f"An error occurred in the hourly poster: {e.__class__.__name__}: {e}" - discord_utils.logger.error(error_msg) + logger.error(error_msg) - if called_channel is not None: # Send the error message to the channel too + if called_channel is not None: + # Send the error message to the channel too await called_channel.send(error_msg) else: - not_available_channels.update({channel_id: None}) # Remembers that the channel is not available - if called_channel is not None: # If it can, send a message to the channel if an error occurred + not_available_channels.update( + {channel_id: None} + ) # Remembers that the channel is not available + if ( + called_channel is not None + ): # If it can, send a message to the channel if an error occurred if chan is None: chan = comic_list[channel]["channel"] else: chan = chan.mention - await called_channel.send(f"Could not send message to channel {chan}") + await called_channel.send( + f"Could not send message to channel {chan}" + ) else: # Logs that a channel is not available but still signed up for a comic - discord_utils.logger.warning("A comic could not be posted to a channel.") + logger.warning("A comic could not be posted to a channel.") return 0 # If it encountered an issue or there is no comic to send, return 0 - @commands.hybrid_command() - @commands.has_permissions(manage_guild=True) - @commands.guild_only() + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) + @app_commands.guild_only() async def post(self, ctx: commands.Context, date: str = None, hour: str = None): """Force the comic post for a single server. @@ -288,68 +399,79 @@ async def post(self, ctx: commands.Context, date: str = None, hour: str = None): :param date: The date to simulate :param hour: The hour to simulate """ - strip_details: dict = utils.strip_details - comic_data: dict = utils.load_json(utils.DATABASE_FILE_PATH) + comic_data: dict = load_json(DATABASE_FILE_PATH) comic_list: dict = {} comic_keys: list[str] = list(strip_details.keys()) guild_id: str = str(ctx.guild.id) if guild_id in comic_data: # Gets date and hour of force post - final_date, final_hour = utils.parse_all(date, hour, default_date=utils.get_today(), - default_hour=utils.get_hour()) - await ctx.send(f"Looking for comics to post for date: {utils.match_date[final_date]} at " - f"{final_hour}h UTC") - post_days = ["D", final_date] + final_date, final_hour = parse_all( + date, + hour, + default_date=get_today(), + default_hour=get_hour(), + ) + await ctx.send( + f"Looking for comics to post for date: {final_date} at " + f"{final_hour}h UTC" + ) + post_days = (Date.Daily, final_date) final_hour = str(final_hour) # Gets the comic info for the guild - self.get_comic_info_for_guild(comic_data[guild_id], comic_list, post_days, final_hour) + self.get_comic_info_for_guild( + comic_data[guild_id], comic_list, post_days, final_hour + ) # If there is comic to send if len(comic_list) > 0: - await self.check_comics_and_post(comic_list, strip_details, comic_keys, called_channel=ctx.channel) + await self.check_comics_and_post( + comic_list, strip_details, comic_keys, called_channel=ctx.channel + ) else: await ctx.send("No comics to send!") else: # Warns that no comic are available await ctx.send("This server is not subscribed to any comic!") - @commands.hybrid_command(hidden=True, guilds=discord_utils.SERVER) - @commands.is_owner() - async def update_database_clean(self, ctx: commands.Context): + @app_commands.command() + @app_commands.guilds(SERVER.id) + @app_commands.checks.check(is_owner) + async def update_database_clean(self, inter: discord.Interaction): """Clean the database from servers that don't have any comics saved - :param ctx: The context of the where the command was called. + :param inter: The context of the where the command was called. """ - nb_removed = utils.clean_database(strict=True, logger=discord_utils.logger) + nb_removed = clean_database(strict=True, logger_=logger) - await ctx.send(f'Cleaned the database from {nb_removed} inactive server(s).') + await send_message(inter, f'Cleaned the database from {nb_removed} inactive server(s).') - @commands.hybrid_command(hidden=True, guilds=discord_utils.SERVER) + @app_commands.command() + @app_commands.guilds(SERVER.id) @commands.is_owner() - async def restore_last_backup(self, ctx: commands.Context): + async def restore_last_backup(self, inter: discord.Interaction): """Restore a previous backup - :param ctx: The context of the where the command was called. + :param inter: The context of the where the command was called. """ # Stops the database cleaning and restore the last backup self.do_cleanup = False - utils.restore_backup() + restore_backup() - await ctx.send("Last backup restored! Please reboot the bot to re-enable automatic cleanups!") + await send_message(inter, "Last backup restored! Please reboot the bot to re-enable automatic cleanups!") - @commands.hybrid_command(hidden=True, guilds=discord_utils.SERVER) + @app_commands.command() + @app_commands.guilds(SERVER.id) @commands.is_owner() - async def do_backup(self, ctx: commands.Context): + async def do_backup(self, inter: discord.Interaction): """Force a backup - :param ctx: The context of the where the command was called. - """ + :param inter: The context of the where the command was called. + """ # Force a backup - utils.save_backup(utils.load_json(utils.DATABASE_FILE_PATH), discord_utils.logger) - - await ctx.send("Backup done!") + save_backup(load_json(DATABASE_FILE_PATH), logger) + await send_message(inter, "Backup done!") async def setup(bot: commands.Bot): diff --git a/bdbot/cogs/BDbot.py b/bdbot/cogs/BDbot.py new file mode 100644 index 0000000..bb1207e --- /dev/null +++ b/bdbot/cogs/BDbot.py @@ -0,0 +1,401 @@ +import math +import os +import re +from datetime import datetime, timezone +from typing import Union + +import discord +import topgg +from discord import app_commands, ui +from discord.ext import commands + +from bdbot import discord_utils, utils +from bdbot.cogs.AutomaticPoster import PosterHandler +from bdbot.cogs.Comics import Comic +from bdbot.discord_utils import NextSend, send_message, SERVER, on_error, is_owner +from bdbot.utils import Date + + +class BDBot(commands.Cog): + """Class responsible for main functions of the bot""" + + def __init__(self, bot: commands.Bot): + """Constructor of the cog + + Initialize all the properties of the cog""" + self.strip_details: dict = utils.load_json(utils.DETAILS_PATH) + self.bot: commands.Bot = bot + self.topggpy = None + self.start_time: datetime = datetime.now(timezone.utc) + self.bot.tree.error(on_error) + + @commands.Cog.listener() + async def on_ready(self): + """On start of the bot""" + # Set owner id + app_info = await self.bot.application_info() + discord_utils.OWNER = self.bot.owner_id = app_info.owner.id + + # Change the bot activity + await discord_utils.update_presence(self.bot) + discord_utils.logger.info(f"Logged in as {self.bot.user}") + channel_id: int = int(os.getenv("PRIVATE_CHANNEL_SUPPORT_ID")) + + channel: discord.TextChannel = self.bot.get_channel(channel_id) + + # Sends this message whenever restarting the bot + await channel.send("Bot restarted. I will now try to sync the commands.") + + # Sync the commands + guild: Union[None, discord.Guild] = None + command_tree: discord.app_commands.CommandTree = self.bot.tree + if os.getenv("DEBUG") == "True": + guild = channel.guild + command_tree.copy_global_to(guild=guild) + await channel.send(f"Syncing commands to server {guild.name} ...") + else: + await channel.send("Syncing global commands...") + + await command_tree.sync(guild=guild) + + await channel.send( + "Finished syncing commands. An hour might be needed for global commands to be available!" + ) + + async with self.bot: + await PosterHandler.wait_for_next_hour( + PosterHandler(self.bot) + ) # Wait for daily poster + + @commands.Cog.listener() + async def on_guild_remove(self, guild: discord.Guild): + """When the bot is removed from a server""" + discord_utils.remove_guild(guild, use=utils.ExtendedAction.Auto_remove_guild) + + @commands.Cog.listener() + async def on_guild_channel_delete(self, deleted_channel: discord.abc.GuildChannel): + """When a guild channel is deleted""" + discord_utils.remove_channel( + deleted_channel, use=utils.ExtendedAction.Auto_remove_channel + ) + + @commands.Cog.listener() + async def on_private_channel_delete( + self, deleted_channel: discord.abc.GuildChannel + ): + """When a private channel is deleted""" + discord_utils.remove_channel( + deleted_channel, use=utils.ExtendedAction.Auto_remove_channel + ) + + @commands.Cog.listener() + async def on_connect(self): + discord_utils.logger.info("Bot has been connected!") + + @commands.Cog.listener() + async def on_shard_connect(self, shard_id: int): + discord_utils.logger.info(f"Shard of id {shard_id} has been connected to discord gateway.") + + @commands.Cog.listener() + async def on_disconnect(self): + discord_utils.logger.info("Bot has been disconnected. Retrying to reconnect...") + + @commands.Cog.listener() + async def on_shard_disconnect(self, shard_id: int): + discord_utils.logger.info(f"Shard of id {shard_id} has been disconnected. Retrying to reconnect...") + + @app_commands.command() + async def git(self, inter: discord.Interaction): + """GitHub page""" + await send_message( + inter, "Want to help the bot? Go here: https://github.com/BBArikL/BDBot" + ) + + @app_commands.command() + async def invite(self, inter: discord.Interaction): + """Get a link to invite the bot""" + inv = discord_utils.get_url() + await send_message(inter, f"Share the bot! {inv}") + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) # Only mods can add comics + async def add_all( + self, inter: discord.Interaction, date: Date = None, hour: int = None + ): + """Add all comics to a specific channel. Preferred way to add all comics. Mods only""" + status = discord_utils.add_all(inter, date, hour) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions( + manage_guild=True + ) # Only mods can delete the server from the database + async def remove_all(self, inter: discord.Interaction): + """Remove the guild from the database. Preferred way to remove all comics.Mods only""" + status = discord_utils.remove_guild(inter) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions( + manage_guild=True + ) # Only mods can delete the channel from the database + async def remove_channel(self, inter: discord.Interaction): + """Remove the channel from the database.Mods only""" + status = discord_utils.remove_channel(inter) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) # Only mods can add a role + async def set_role(self, inter: discord.Interaction, role: discord.Role): + """Add a role to be notified. Mods only""" + if discord.Guild.get_role(inter.guild, role.id) is not None: + status = discord_utils.set_role(inter, role) + + await send_message(inter, status) + else: + await send_message(inter, "The role is invalid or not provided!") + + @app_commands.command() + @app_commands.checks.has_permissions( + manage_guild=True + ) # Only mods can delete the role + async def remove_role(self, inter: discord.Interaction): + """Deletes the role mention. Mods only""" + status = discord_utils.remove_role(inter) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) + async def set_mention( + self, inter: discord.Interaction, choice: utils.MentionPolicy + ): + """Set the role mention policy. Mods only""" + status = discord_utils.set_mention(inter, choice == utils.MentionPolicy.Daily) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) + async def get_mention(self, inter: discord.Interaction): + """Get the server's mention policy. Mods only""" + status, mention_policy = discord_utils.get_mention(inter, self.bot) + await send_message(inter, status) + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) + async def post_mention( + self, inter: discord.Interaction, choice: utils.MentionChoice + ): + """Change the mention policy for the server. Mods only""" + status = discord_utils.set_post_mention( + inter, choice == utils.MentionChoice.Enable + ) + await send_message(inter, status) + + @app_commands.command() + async def vote(self, inter: discord.Interaction): + """Vote for the bot!""" + await send_message( + inter, + "Vote for the bot here: https://top.gg/bot/807780409362481163 and / or here : " + "https://discordbotlist.com/bots/bdbot", + ) + + @app_commands.command() + @app_commands.guilds(SERVER.id) + @app_commands.checks.check(is_owner) + async def nb_guild(self, inter: discord.Interaction): + """Gets the number of guilds that the bot is in (for analytics)""" + + await send_message( + inter, + f"The bot is in {len(self.bot.guilds)} servers. Trying to update status on Top.gg.....", + ) + + if self.topggpy is None: + self.topggpy = topgg.DBLClient(self.bot, str(os.getenv("TOP_GG_TOKEN"))) + + try: + await self.topggpy.post_guild_count() + await send_message( + inter, f"Posted server count ({self.topggpy.guild_count})" + ) + except Exception as e: + await send_message( + inter, "Failed to post server count\n{}: {}".format(type(e).__name__, e) + ) + + await send_message(inter, "Updating status...") + await discord_utils.update_presence(self.bot) + + @app_commands.command() + async def request(self, inter: discord.Interaction): + """Request something from the developer!""" + # Adds a request to the database + await inter.response.send_modal(BotRequest()) + + @app_commands.command() + async def delete_requests(self, inter: discord.Interaction): + """Delete requests sent through 'request'""" + author = f"{inter.user.name}#{inter.user.discriminator}" + output = [] + count = 0 + + with open(utils.REQUEST_FILE_PATH, "rt") as rq: + # Removes all lines matching with the username and discriminator + lines = rq.readlines() + + for line in lines: + if not re.match(f'.*".*"[^"]+{author}[^"]+', line): + # Tries to be sure that that request can't be used to delete another user's request + output.append(line) + else: + count += 1 + + if count > 0: + with open(utils.REQUEST_FILE_PATH, "wt") as rq: + rq.writelines("".join(output)) # Rewrites all lines + await send_message(inter, f"Deleted {count} request(s)!") + else: + await send_message(inter, "No requests to delete!") + + @app_commands.command() + @app_commands.checks.has_permissions(manage_guild=True) + async def sub(self, inter: discord.Interaction): + """Checks if the server is subbed to any comic""" + await inter.response.defer() + guild_data = discord_utils.get_specific_guild_data(inter) + max_fields = 5 + hr = "Hour" + if guild_data is not None: + comic_list = [] + comic_values: list[dict] = list(utils.strip_details.values()) + + for channel in guild_data["channels"]: + + if "latest" in guild_data["channels"][channel]: + for comic in guild_data["channels"][channel]["latest"]: + comic_list = discord_utils.add_comic_to_list( + comic_values, comic, self.bot, channel, comic_list + ) + + if "date" in guild_data["channels"][channel]: + for day in guild_data["channels"][channel]["date"]: + for hour in guild_data["channels"][channel]["date"][day]: + for comic in guild_data["channels"][channel]["date"][day][ + hour + ]: + comic_list = discord_utils.add_comic_to_list( + comic_values, + comic, + self.bot, + channel, + comic_list, + hour, + day, + ) + + if len(comic_list) > 0: + nb_fields = 0 + matching_date = utils.match_date + embeds = [discord.Embed(title="This server is subscribed to:")] + for comic in comic_list: + if nb_fields > max_fields: + nb_fields = 0 + embeds.append( + discord.Embed(title="This server is subscribed to:") + ) + + match_date = matching_date[comic["Date"]] + embeds[-1].add_field( + name=comic["Name"], + value=f"{'Each ' if match_date not in [Date.Latest, Date.Daily] else ''}{match_date.name}" + f"{f' at {comic[hr]} h UTC' if match_date not in [Date.Latest] else ''} in " + f"channel {comic['Channel']}", + ) + nb_fields += 1 + + await discord_utils.send_embed(inter, embeds, NextSend.Deferred) + else: + await send_message(inter, "This server is not subscribed to any comic!", next_send=NextSend.Deferred) + else: + await send_message(inter, "This server is not subscribed to any comic!", next_send=NextSend.Deferred) + + @app_commands.command() + async def ping(self, inter: discord.Interaction): + """Get the bot latency with discord API""" + await send_message(inter, "Pong! " + str(round(self.bot.latency * 1000)) + "ms") + + @app_commands.command() + async def uptime(self, inter: discord.Interaction): + """Get the bot uptime""" + delta = datetime.now(timezone.utc) - self.start_time + hours = math.floor(delta.seconds / 3600) + minutes = math.floor((delta.seconds - hours * 3600) / 60) + seconds = math.floor(delta.seconds - ((minutes * 60) + (hours * 3600))) + await send_message( + inter, + f"The bot has been up for {delta.days} days, {hours} hours, {minutes} minutes and {seconds} seconds.", + ) + + # @app_commands.command(hidden=True, server=discord_utils.SERVER) + # @app_commands.checks.check(is_owner) + # async def vrequest(self, inter: discord.Interaction): + # """Verifies the requests""" + # with open(utils.REQUEST_FILE_PATH, 'rt') as f: + # r = f.readlines() + # + # await send_message(inter, "Here are the requests:\n```\n" + "\n".join(r) + "\n```") + + @app_commands.command() + @app_commands.guilds(SERVER.id) + @app_commands.checks.check(is_owner) + async def nb_active(self, inter: discord.Interaction): + """Returns the number of servers using the hourly poster service""" + await send_message(inter, "There is " + str( + len(utils.load_json(utils.DATABASE_FILE_PATH))) + "servers using the hourly " + "poster service.") + + @app_commands.command() + @app_commands.guilds(SERVER.id) + @app_commands.checks.check(is_owner) + async def kill(self, inter: discord.Interaction): + """Close the bot connection""" + await send_message(inter, "Closing bot....") + await self.bot.close() + + @app_commands.command() + @app_commands.guilds(SERVER.id) + @app_commands.checks.check(is_owner) + async def reload(self, inter: discord.Interaction): + """ + Reload comics. + + :param inter: Discord message context. + """ + await send_message(inter, "Reloading comics....") + await self.bot.remove_cog("Comic") + utils.strip_details = utils.load_json(utils.DETAILS_PATH) + await self.bot.add_cog(Comic(self.bot)) + utils.GOCOMICS_EMBED = None + utils.KINGDOM_EMBED = None + utils.WEBTOONS_EMBED = None + await send_message(inter, "Reloaded comics!", next_send=NextSend.Followup) + + # ---- End of commands ----# + # ---- End of BDBot ----# + + +class BotRequest(ui.Modal, title="Request"): + """Request for the bot""" + + request = ui.TextInput(label="Request") + + async def on_submit(self, interaction: discord.Interaction) -> None: + utils.save_request( + self.request.value, interaction.user.name, interaction.user.discriminator + ) + await send_message(interaction, "Request saved! Thank you for using BDBot!", ephemeral=True) + + +async def setup(bot: discord.ext.commands.Bot): # Initialize the cog + await bot.add_cog(BDBot(bot)) diff --git a/bdbot/cogs/Comics.py b/bdbot/cogs/Comics.py new file mode 100644 index 0000000..98e2523 --- /dev/null +++ b/bdbot/cogs/Comics.py @@ -0,0 +1,178 @@ +import random +import re +from typing import Any, Callable, Union + +import discord +from discord import app_commands +from discord.ext import commands +from bdbot.discord_utils import get_possible_hours, parameters_interpreter, NextSend +from bdbot.utils import Action, Date, Month, get_strip_details, get_all_strips + + +def define_comic_callback(comic_strip_details: dict[str, Union[str, int]]): + async def date_comic_callback( + inter: discord.Interaction, + action: Action, + date: Date = None, + hour: int = None, + day: int = None, + month: Month = None, + year: int = None, + ): + # Interprets the parameters given by the user + func, params = parameters_interpreter( + inter, + comic_strip_details, + action=action, + date=date, + hour=hour, + day=day, + month=month, + year=year, + ) + await func(**params) + + async def number_comic_callback( + inter: discord.Interaction, + action: Action, + date: Date = None, + hour: int = None, + comic_number: int = None + ): + # Interprets the parameters given by the user + func, params = parameters_interpreter( + inter, + comic_strip_details, + action=action, + date=date, + hour=hour, + comic_number=comic_number + ) + + await func(**params) + + comic_callback_func: Callable + + if comic_strip_details["Working_type"] == "date" or \ + comic_strip_details["Main_website"] == "https://garfieldminusgarfield.net/": + comic_callback_func = date_comic_callback + else: + comic_callback_func = number_comic_callback + + return comic_callback_func + + +class Comic(commands.Cog): + """Class responsible for sending comics""" + + def __init__(self, bot: commands.Bot): + """Constructor of the cog + + :param bot: The discord Bot + """ + self.bot = bot + + comics_details = get_all_strips() + + for comic in comics_details: + comic_name: str = comics_details[comic]["Name"] + normalized_name = comic_name.lower().replace(" ", "_") + normalized_name = re.sub("[^\\w\\-_]", "", normalized_name) + comic_command = app_commands.Command( + name=normalized_name, + description=comic_name, + callback=define_comic_callback(comics_details[comic])) + # No built-in functions for adding autocomplete choices when creating callbacks in a factory way + comic_command._params.get("hour").choices = get_possible_hours() # noqa: See above + + self.bot.tree.add_command(comic_command) + + async def cog_unload(self) -> None: + comics_details = get_all_strips() + + for comic in comics_details: + comic_name: str = comics_details[comic]["Name"] + normalized_name = comic_name.lower().replace(" ", "_") + normalized_name = re.sub("[^\\w\\-_]", "", normalized_name) + self.bot.tree.remove_command(normalized_name) + + # --- Start of functions --- # + @app_commands.command() + @app_commands.choices(hour=get_possible_hours()) + async def random( + self, + inter: discord.Interaction, + action: Action, + date: Date = None, + hour: int = None, + day: int = None, + month: Month = None, + year: int = None, + comic_number: int = None, + ): + """Random comic""" + func, params = parameters_interpreter( + inter, + get_strip_details(random.choice(list(get_all_strips().keys()))), + action=action, + date=date, + hour=hour, + day=day, + month=month, + year=year, + comic_number=comic_number, + ) + await func(**params) + + @app_commands.command() + @commands.has_permissions(manage_guild=True) + @app_commands.choices(hour=get_possible_hours()) + async def all( + self, + inter: discord.Interaction, + action: Action, + date: Date = None, + hour: int = None, + day: int = None, + month: Month = None, + year: int = None, + comic_number: int = None, + ): + """All comics. Mods only""" + first = True + for com in get_all_strips(): + # Interprets the parameters given by the user + func: Callable + params: dict[str, Any] + func, params = parameters_interpreter( + inter, + get_strip_details(com), + action=action, + date=date, + hour=hour, + day=day, + month=month, + year=year, + comic_number=comic_number, + ) + + if first: + first = False + else: + params.update({"next_send": NextSend.Followup}) + + await func(**params) + + # Special comic commands + + # ---- END OF COMICS PARAMETERS ----# + # --- END of functions that communicate directly with discord ----# + # --- END of cog ----# + + +async def setup(bot: commands.Bot): + """Initialize the cog + + :param bot: The discord bot + """ + await bot.add_cog(Comic(bot)) diff --git a/bdbot/cogs/HelpCommands.py b/bdbot/cogs/HelpCommands.py new file mode 100644 index 0000000..18ac04e --- /dev/null +++ b/bdbot/cogs/HelpCommands.py @@ -0,0 +1,347 @@ +import discord +from discord import app_commands +from discord.ext import commands + +from bdbot import discord_utils, utils + + +class HelpCommands(commands.Cog): + """Class responsible for sending help embeds""" + help_group = app_commands.Group(name="help", description="Help commands") + + def __init__(self, bot: commands.Bot): + """Constructor of the cog + + Initialize all the properties of the cog""" + self.bot: commands.Bot = bot + + @help_group.command() + async def general(self, inter: discord.Interaction): + """HelpCommands for BDBot""" + embed: discord.Embed + if discord_utils.HELP_EMBED is None: + strips = utils.strip_details + embed = discord.Embed(title="BDBot!") + + embed.add_field( + name="Gocomics", + value="Use /help gocomics to get all comics that are supported on the Gocomics " + "website.", + ) + embed.add_field( + name="Comics Kingdom", + value="Use /help comicskingdom to get all comics that are supported on the Comics " + "Kingdom website.", + ) + embed.add_field( + name="Webtoons", + value="Use /help webtoons to get all comics that are supported on the Webtoons " + "website.", + ) + for strip in strips: + if ( + strips[strip]["Main_website"] != "https://www.gocomics.com/" + and strips[strip]["Main_website"] + != "https://comicskingdom.com/" + and strips[strip]["Main_website"] + != "https://www.webtoons.com/en/" + ): + embed.add_field( + name=strips[strip]["Name"], value=strips[strip]["Helptxt"] + ) + embed.add_field( + name="Hourly comics commands.", + value="Use /help hourly to see available commands for daily comics. " + "Post daily at 6:00 AM UTC.", + ) + + embed.add_field( + name="Request", + value="Have a request for the bot? Post your request at " + "https://github.com/BBArikL/BDBot/issues/new?assignees=&labels=enhancement" + "&template=comic-request.md&title=New+Comic+request for maximum visibility or " + "use `/request` to leave a message to the developer!", + ) + embed.add_field( + name="Status", + value="Gives back the status of the bot.\nCommand:\n`/status`", + ) + embed.add_field( + name="Ping", + value="Pong! Gives back the bot latency.\nCommand:\n`/ping`", + ) + embed.add_field( + name="Uptime", value="Gives back the uptime.\nCommand:\n`/up`" + ) + embed.add_field( + name="FAQ", + value="Have any question on the bot? This FAQ (`/faq`) might have the " + "response you need!", + ) + embed.add_field( + name="New commands", + value="See the the newly added commands by using `/new`", + ) + embed.add_field( + name="Git", + value="Gives the link of the git page.\nCommand:\n`/git`", + ) + embed.add_field( + name="Invite", + value="Gives a link to add the bot to your servers!\nCommand:\n`/invite`", + ) + embed.add_field( + name="Vote", value="Vote for the bot on Top.gg!\nCommand:\n`/vote`" + ) + + # Saves the embed for later use + utils.HELP_EMBED = embed + else: + embed = discord_utils.HELP_EMBED # Get the cached value + + await discord_utils.send_embed(inter, [embed]) + + @help_group.command() + async def hourly(self, inter: discord.Interaction): + """HelpCommands for hourly commands""" + embed: discord.Embed + + if discord_utils.HOURLY_EMBED is None: + + embed = discord.Embed( + title="Daily commands!", + description="Date and hour are optional arguments that can specify when the the " + "bot should send the comic. A date should be one of the seven days " + "of the week and the hour a number representing the time in a 24h " + "clock in UTC time. If not specified, defaults to the current time " + "in UTC.", + ) + embed.add_field( + name="Post", + value="The bot did not post the comics or you want to be sure that all comics " + "are correctly set up? Use `/post ` to force the post of " + "comics set at the specified time.", + ) + embed.add_field( + name="Add", + value="Use `/ add ` to add the comic to the daily list of " + "the channel.", + ) + embed.add_field( + name="Add all", + value="Use `/add_all ` to add all the comics to a specific day of the " + "week and hour.", + ) + embed.add_field( + name="Remove", + value="Use `/ remove ` to remove the comic" + " from the daily list.", + ) + embed.add_field( + name="Remove channel", + value="Use `/remove_channel` to unsubscribe your channel from all the comics", + ) + embed.add_field( + name="Remove all", + value="Use `/remove_all` to unsubscribe your server from all the comics" + " in all the channels.", + ) + embed.add_field( + name="Subscriptions", + value="Use `/sub` to view all subscribed comics for this server.", + ) + embed.add_field( + name="Set role mention", + value="Use `/set_role @` to add a role to mention for comics posts. To remove, " + "use `/remove_role`.", + ) + embed.add_field( + name="Mange role mention", + value="Use `/set_mention daily/all` to change the mention policy for the bot in the " + "server. This does not affect daily comics posted at 6h AM UTC. If the mention " + "policy is set to 'all', the bot will mention the role at each comic post, " + "otherwise it will only mention the role at 6h AM UTC daily.", + ) + embed.add_field( + name="Enable/Disable mention", + value="Use `/post_mention` to enable/disable server-wide the mention right before the" + " automatic comic posts.", + ) + embed.add_field( + name="Get mention policy", + value="Use `/get_mention` to get the server's mention policy.", + ) + + utils.HOURLY_EMBED = embed + else: + embed = discord_utils.HOURLY_EMBED + + await discord_utils.send_embed(inter, [embed]) + + @help_group.command() + async def gocomics(self, inter: discord.Interaction): + """Gocomics help""" + website_name = "Gocomics" + website = "https://www.gocomics.com/" + embeds: list[discord.Embed] + + if discord_utils.GOCOMICS_EMBED is None: + embeds = discord_utils.website_specific_embed(website_name, website) + utils.GOCOMICS_EMBED = embeds + else: + embeds = discord_utils.GOCOMICS_EMBED + + await discord_utils.send_embed(inter, embeds) + + @help_group.command() + async def comicskingdom(self, inter: discord.Interaction): + """Comics Kingdom help""" + website_name = "Comics Kingdom" + website = "https://comicskingdom.com/" + embeds: list[discord.Embed] + + if discord_utils.KINGDOM_EMBED is None: + embeds = discord_utils.website_specific_embed(website_name, website) + utils.KINGDOM_EMBED = embeds + else: + embeds = discord_utils.KINGDOM_EMBED + + await discord_utils.send_embed(inter, embeds) + + @help_group.command() + async def webtoons(self, inter: discord.Interaction): + """Webtoons help""" + website_name = "Webtoons" + website = "https://www.webtoons.com/en/" + embeds: list[discord.Embed] + + if discord_utils.WEBTOONS_EMBED is None: + embeds = discord_utils.website_specific_embed(website_name, website) + utils.WEBTOONS_EMBED = embeds + else: + embeds = discord_utils.WEBTOONS_EMBED + + await discord_utils.send_embed(inter, embeds) + + @app_commands.command() + async def new(self, inter: discord.Interaction): + """New features of the bot""" + embed: discord.Embed + + if discord_utils.NEW_EMBED is None: + embed = discord.Embed( + title="New features", + description="Find out what new features have been implemented " + "since the last update!", + ) + embed.add_field( + name="Thanks", + value="First, I want to take a moment to thank all of you who use BDBot to view your " + "favorite comics! It recently got approved by Discord and also has exceeded the 100 " + "server limit which is phenomenal! Thank you again for your trust into this " + "project! :)", + ) + embed.add_field( + name="New comics", + value="The new comics are: Chibird, War and Peas, Humans are stupid," + " Maximumble, Poorly Drawn Lines, Heathcliff, Andy Capp", + ) + embed.add_field( + name="Latest comics", + value="You want to have only the latest comics? Put `latest` in the date parameter of any" + " comic when adding it to the subscription list and the bot will only give you back" + " the latest comics when they are available!", + ) + embed.add_field( + name="Post", + value="Missed your comics or just want to test that the bot can properly send all comics" + " for a given time? Use `/post