added code cog

parent 4151da74
# MIT License
#
# Copyright (c) 2018 AnonymousDapper
#
# Permission is hereby granted
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# MIT License
#
# Copyright (c) 2018 AnonymousDapper
#
# Permission is hereby granted
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import inspect
import sqlalchemy
import subprocess
from functools import partial
from discord.ext import commands
from .utils import checks
from .utils.logger import get_logger
logger = get_logger()
class Debug:
def __init__(self, bot):
self.bot = bot
self.NL = "\n" # for embedding in f-strings
# Strip formatting from codeblocks
def clean(self, code):
if code.startswith("```") and code.endswith("```"):
return "\n".join(code.split("\n")[1:-1])
return code.strip("` \n")
# Utility function for length checking and uploading if required
async def check_length(self, content):
if len(content) > 2000:
url = await self.bot.paste_text(content)
return f"\N{WARNING SIGN} Output too long. View result at <{url}>"
else:
return content
# Run code in eval mode
@commands.command(name="debug", brief="eval mode")
@checks.is_owner()
async def run_debug(self, ctx, *, code:str):
source = self.clean(code)
scope = globals()
scope.update(dict(
bot=self,
message=ctx.message,
guild=ctx.guild,
channel=ctx.channel,
author=ctx.author,
ctx=ctx,
__code=source
))
try:
compiled = compile(source, "<eval>", "eval")
scope["__compiled"] = compiled
result = eval(compiled, scope)
except SyntaxError as e:
await ctx.send(f"```py\n{e.text}\n{'^':>{e.offset}}\n{type(e).__name__}: {e}\n```")
except Exception as e:
await ctx.send(f"```md\n- {type(e).__name__}: {e}\n```")
else:
if inspect.isawaitable(result):
result = await result
await ctx.send(await self.check_length(f"```py\n{str(result)}\n```"))
# Run code in exec mode
@commands.command(name="run", brief="exec mode")
@checks.is_owner()
async def run_exec(self, ctx, *, code:str):
source = "async def __coro():\n " + "\n ".join(self.clean(code).split("\n"))
scope = globals()
scope.update(dict(
bot=self,
message=ctx.message,
guild=ctx.guild,
channel=ctx.channel,
author=ctx.author,
ctx=ctx,
__code=source
))
with self.bot.stdout_wrapper() as stdout, self.bot.stderr_wrapper() as stderr:
try:
compiled = compile(source, "<exec>", "exec")
scope["__compiled"] = compiled
exec(compiled, scope)
await scope["__coro"]()
except SyntaxError as e:
await ctx.send(f"```py\n{e.text}\n{'^':>{e.offset}}\n{type(e).__name__}: {e}\n```")
return
except Exception as e:
await ctx.send(f"```md\n- {type(e).__name__}: {e}\n```")
return
result_out = str(stdout.getvalue())
result_err = str(stderr.getvalue())
result = f"```py\n{result_out}\n{('########## Errors ##########' + self.NL + result_err) if result_err != '' else ''}\n```"
await ctx.send(await self.check_length(result))
# Extension setup
def setup(bot):
bot.add_cog(Debug(bot))
\ No newline at end of file
......@@ -20,10 +20,16 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ["is_owner"]
import discord.utils
from discord.ext import commands
from .logger import get_logger
logger = get_logger()
# Predicate for owner id check
def is_owner_check(ctx):
return ctx.message.author.id in ctx.bot.config["General"]["owners"]
......
......@@ -20,6 +20,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ["paint", "back", "attr"]
from .logger import get_logger
logger = get_logger()
class _Text:
"""
ANSI text color codes for easy formatting
......
# MIT License
#
# Copyright (c) 2018 AnonymousDapper
#
# Permission is hereby granted
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ["ImgurApiError", "PostRequestLimitError", "ApiCreditLimitError", "ImgurAPI"]
import aiohttp
import asyncio
from .logger import get_logger
logger = get_logger()
API_BASE = "https://api.imgur.com/3"
class ImgurApiError(Exception):
pass
class PostRequestLimitError(ImgurApiError):
pass
class ApiCreditLimitError(ImgurApiError):
pass
class ImgurAPI:
def __init__(self, **kwargs):
self.client_id = kwargs.get("client_id", "")
self.auth_headers = {
"Authorization": f"Client-ID {self.client_id}"
}
self.loop = asyncio.get_event_loop()
self.total_credits = 0
self.remain_credits = 0
self.post_limit = 0
self.post_remain = 0
self.loop.create_task(self.client_init())
self.loop.create_task(self._check_credits())
# Start up client session in a coroutine
async def client_init(self):
self.client_session = aiohttp.ClientSession()
# Fetch ratelimit info from headers
def _save_credits(self, response):
self.total_credits = response.headers.get("X-RateLimit-ClientLimit")
self.remain_credits = response.headers.get("X-RateLimit-ClientRemaining")
self.post_limit = response.headers.get("X-Post-Rate-Limit-Limit")
self.post_remain = response.headers.get("X-Post-Rate-Limit-Remaining")
# Base API calling function (POST)
async def _post(self, endpoint, data):
async with self.client_session.post(f"{API_BASE}/{endpoint}", headers=self.auth_headers, data=data) as response:
return response
# Base API calling function (GET)
async def _get(self, endpoint):
async with self.client_session.get(f"{API_BASE}/{endpoint}", headers=self.auth_headers) as response:
return response
# Check API credit tally
async def _check_credits(self):
credit_response = await self._get("credits")
self._save_credits(credit_response)
# Actual API functions
# Upload an image (image: BytesIO)
async def upload_image(self, image):
if self.post_remain == 0:
raise PostRequestLimitError(f"POST request quota reached (used {self.post_limit} requests)")
elif self.remain_credits < 10:
raise ApiCreditLimitError(f"Not enough API credits remain for this (have {self.remain_credits}, need 10)")
image.seek(0)
response = await self._post("image", image)
self._save_credits(response)
if response.status == 200:
data = await response.json()
return data["data"]["link"]
else:
raise ImgurApiError(f"Unknown Error: {response.status}")
......@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ["Tag", "Permission", "User", "Blacklist", "Whitelist", "TagVariable", "Message", "Command", "Prefix", "SQL"]
from contextlib import contextmanager
import traceback
......
......@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__all__ = ["get_elapsed_time", "get_ping_time"]
# Get elapsed time in 'Xy, Xm, Xw, Xd, Xh, Xm, Xs' format
def get_elapsed_time(date1, date2):
delta = abs(date2 - date1)
......
......@@ -41,6 +41,7 @@ from cogs.utils import permissions
from cogs.utils import sql
from cogs.utils import time, checks
from cogs.utils.colors import paint, back, attr
from cogs.utils.imgur import ImgurAPI
from discord.ext import commands
......@@ -87,10 +88,10 @@ class Builtin:
self.bot.load_extension(cog_name)
except Exception as e:
await ctx.send(f"Failed to load `{name}`: [{type(e).__name__}]: {e}")
await ctx.send(f"Failed to load {name}: [{type(e).__name__}]: `{e}`")
return
finally:
else:
await self.bot.post_reaction(ctx.message, success=True)
elif action == "unload":
......@@ -102,10 +103,10 @@ class Builtin:
self.bot.unload_extension(cog_name)
except Exception as e:
await ctx.send(f"Failed to unload `{name}`: [{type(e).__name__}]: {e}")
await ctx.send(f"Failed to unload {name}: [{type(e).__name__}]: `{e}`")
return
finally:
else:
await self.bot.post_reaction(ctx.message, success=True)
elif action == "reload":
......@@ -120,10 +121,10 @@ class Builtin:
except Exception as e:
await ctx.send(f"Failed to reload `{name}`: [{type(e).__name__}]: {e}")
await ctx.send(f"Failed to reload {name}: [{type(e).__name__}]: `{e}`")
return
finally:
else:
await self.bot.post_reaction(ctx.message, success=True)
@manage_cogs.command(name="list", brief="list loaded cogs")
......@@ -146,22 +147,31 @@ class Builtin:
await ctx.send(f"```md\n# Snake Bot Info\n* Discord.py version {discord.version_info.major}.{discord.version_info.minor}.{discord.version_info.micro}\n* Python version {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\n* Latest Commit {result.stdout}\n```")
class SnakeBot(commands.Bot):
# stdout wrapper to capture output
# stderr wrapper
@contextmanager
def output_wrapper(self):
old_out = sys.stdout
def stderr_wrapper(self):
old_err = sys.stderr
new_out = StringIO()
new_err = StringIO()
sys.stderr = new_err
yield new_err
sys.stderr = old_err
# stdout wrapper
@contextmanager
def stdout_wrapper(self):
old_out = sys.stdout
new_out = StringIO()
sys.stdout = new_out
yield new_out, new_err
yield new_out
sys.stdout = old_out
sys.stderr = old_err
# Quick method to read toml configs
def _read_config(self, filename):
......@@ -185,13 +195,15 @@ class SnakeBot(commands.Bot):
credentials = self._read_config("credentials.toml")
self.token = credentials["Discord"]["token"]
self.imgur_api = ImgurAPI(client_id=credentials["Imgur"]["client_id"])
# Init superclass
super().__init__(
*args,
**kwargs,
description="\nHsss!\n",
help_attrs=dict(hidden=True),
command_not_found="\N{WARNING SIGN} Command `{}` doesn't exist!",
command_not_found=None,
command_has_no_subcommand="\N{WARNING SIGN} Command `{1}` doesn't exist in the `{0.name}` group!",
command_prefix=self.get_prefix
)
......@@ -255,7 +267,7 @@ class SnakeBot(commands.Bot):
# Socket data
async def log_socket_data(self, data):
if "t" in data:
t_type = date.get("t")
t_type = data.get("t")
if t_type is not None:
if t_type in self.socket_log:
......@@ -352,6 +364,15 @@ class SnakeBot(commands.Bot):
if not kwargs.get("quiet"):
await message.channel.send(reaction_emoji)
# Upload long text to personal hastebin
async def paste_text(self, content):
async with self.aio_session.post("http://thinking-rock.a-sketchy.site:8000/documents", data=content, headers={"Content-Type": "application/json"}) as response:
if response.status != 200:
return f"Could not upload: ({response.status})"
data = await response.json()
return f"http://thinking-rock.a-sketchy.site:8000/{data['key']}"
# Discord events
# Bot is ready
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment