Last active
November 4, 2018 23:30
-
-
Save polyjitter/8a729058b8dd9d4adaab16aebebb8d27 to your computer and use it in GitHub Desktop.
A Python shell contained in an embed for Discord.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based off repl from RDanny by Danny. | |
# Will fail if the bot does not have embed permissions. | |
# Embed designed and implemented by taciturasa. | |
# Released under the MIT License. | |
# | |
# The MIT License (MIT) | |
# | |
# Copyright (c) 2015 Rapptz | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a | |
# copy of this software and associated documentation files (the "Software"), | |
# to deal in the Software without restriction, including without limitation | |
# the rights to use, copy, modify, merge, publish, distribute, sublicense, | |
# and/or sell copies of the Software, and to permit persons to whom the | |
# Software is furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | |
# DEALINGS IN THE SOFTWARE. | |
import discord | |
from discord.ext import commands | |
import collections | |
import aiohttp | |
import inspect | |
import traceback | |
from contextlib import redirect_stdout | |
import io | |
class EmbedShell(): | |
def __init__(self, bot): | |
self.bot = bot | |
self.repl_sessions = {} | |
self.repl_embeds = {} | |
self.aioclient = aiohttp.ClientSession() | |
def cleanup_code(self, content): | |
'''Automatically removes code blocks from the code.''' | |
# remove ```py\n``` | |
if content.startswith('```') and content.endswith('```'): | |
return '\n'.join(content.split('\n')[1:-1]) | |
# remove `foo` | |
return content.strip('` \n') | |
def get_syntax_error(self, err): | |
'''Returns SyntaxError formatted for repl reply.''' | |
return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format( | |
err, | |
'^', | |
type(err).__name__) | |
async def post_to_hastebin(self, string): | |
'''Posts a string to hastebin.''' | |
url = "https://hastebin.com/documents" | |
data = string.encode('utf-8') | |
async with self.aioclient.post(url=url, data=data) as haste_response: | |
print(haste_response) | |
haste_key = (await haste_response.json())['key'] | |
haste_url = f"http://hastebin.com/{haste_key}" | |
return haste_url | |
@commands.group(name='shell', | |
aliases=['ipython', 'repl', | |
'longexec', 'core', 'overkill'], | |
pass_context=True, | |
invoke_without_command=True) | |
async def repl(self, ctx, *, name: str=None): | |
'''Head on impact with an interactive python shell.''' | |
# TODO Minimize local variables | |
# TODO Minimize branches | |
session = ctx.message.channel.id | |
embed = discord.Embed( | |
description="_Enter code to execute or evaluate. " | |
"`exit()` or `quit` to exit._") | |
embed.set_author( | |
name="Interactive Python Shell", | |
icon_url="https://upload.wikimedia.org/wikipedia/commons/thumb" | |
"/c/c3/Python-logo-notext.svg/1024px-Python-logo-notext.svg.png") | |
embed.set_footer(text="Based on RDanny's repl command by Danny.") | |
if name is not None: | |
embed.title = name.strip(" ") | |
history = collections.OrderedDict() | |
variables = { | |
'ctx': ctx, | |
'bot': self.bot, | |
'message': ctx.message, | |
'server': ctx.message.server, | |
'channel': ctx.message.channel, | |
'author': ctx.message.author, | |
'discord': discord, | |
'twitter': self.twitter, | |
'_': None | |
} | |
if session in self.repl_sessions: | |
error_embed = discord.Embed( | |
color=15746887, | |
description="**Error**: " | |
"_Shell is already running in channel._") | |
await self.bot.say(embed=error_embed) | |
return | |
shell = await self.bot.say(embed=embed) | |
self.repl_sessions[session] = shell | |
self.repl_embeds[shell] = embed | |
while True: | |
response = await self.bot.wait_for_message( | |
author=ctx.message.author, | |
channel=ctx.message.channel, | |
check=lambda m: m.content.startswith('`')) | |
cleaned = self.cleanup_code(response.content) | |
shell = self.repl_sessions[session] | |
# Regular Bot Method | |
# try: | |
# await self.bot.get_message( | |
# ctx.message.channel, | |
# self.repl_sessions[session].id) | |
# except discord.NotFound: | |
# new_shell = await self.bot.say(embed=self.repl_embeds[shell]) | |
# self.repl_sessions[session] = new_shell | |
# | |
# embed = self.repl_embeds[shell] | |
# del self.repl_embeds[shell] | |
# self.repl_embeds[new_shell] = embed | |
# | |
# shell = self.repl_sessions[session] | |
shell_check = discord.utils.get( | |
self.bot.messages, | |
id=self.repl_sessions[session].id) | |
# Self Bot Method | |
if shell_check is None: | |
new_shell = await self.bot.say(embed=self.repl_embeds[shell]) | |
self.repl_sessions[session] = new_shell | |
embed = self.repl_embeds[shell] | |
del self.repl_embeds[shell] | |
self.repl_embeds[new_shell] = embed | |
shell = self.repl_sessions[session] | |
del shell_check | |
try: | |
await self.bot.delete_message(response) | |
except discord.Forbidden: | |
pass | |
if len(self.repl_embeds[shell].fields) >= 7: | |
self.repl_embeds[shell].remove_field(0) | |
if cleaned in ('quit', 'exit', 'exit()'): | |
self.repl_embeds[shell].color = 16426522 | |
if self.repl_embeds[shell].title is not discord.Embed.Empty: | |
history_string = "History for {}\n\n\n".format( | |
self.repl_embeds[shell].title) | |
else: | |
history_string = "History for latest session\n\n\n" | |
for item in history.keys(): | |
history_string += ">>> {}\n{}\n\n".format( | |
item, | |
history[item]) | |
haste_url = await self.post_to_hastebin(history_string) | |
return_msg = "[`Leaving shell session. "\ | |
"History hosted on hastebin.`]({})".format( | |
haste_url) | |
self.repl_embeds[shell].add_field( | |
name="`>>> {}`".format(cleaned), | |
value=return_msg, | |
inline=False) | |
await self.bot.edit_message( | |
self.repl_sessions[session], | |
embed=self.repl_embeds[shell]) | |
del self.repl_embeds[shell] | |
del self.repl_sessions[session] | |
return | |
executor = exec | |
if cleaned.count('\n') == 0: | |
# single statement, potentially 'eval' | |
try: | |
code = compile(cleaned, '<repl session>', 'eval') | |
except SyntaxError: | |
pass | |
else: | |
executor = eval | |
if executor is exec: | |
try: | |
code = compile(cleaned, '<repl session>', 'exec') | |
except SyntaxError as err: | |
self.repl_embeds[shell].color = 15746887 | |
return_msg = self.get_syntax_error(err) | |
history[cleaned] = return_msg | |
if len(cleaned) > 800: | |
cleaned = "<Too big to be printed>" | |
if len(return_msg) > 800: | |
haste_url = await self.post_to_hastebin(return_msg) | |
return_msg = "[`SyntaxError too big to be printed. "\ | |
"Hosted on hastebin.`]({})".format( | |
haste_url) | |
self.repl_embeds[shell].add_field( | |
name="`>>> {}`".format(cleaned), | |
value=return_msg, | |
inline=False) | |
await self.bot.edit_message( | |
self.repl_sessions[session], | |
embed=self.repl_embeds[shell]) | |
continue | |
variables['message'] = response | |
fmt = None | |
stdout = io.StringIO() | |
try: | |
with redirect_stdout(stdout): | |
result = executor(code, variables) | |
if inspect.isawaitable(result): | |
result = await result | |
except Exception as err: | |
self.repl_embeds[shell].color = 15746887 | |
value = stdout.getvalue() | |
fmt = '```py\n{}{}\n```'.format( | |
value, | |
traceback.format_exc()) | |
else: | |
self.repl_embeds[shell].color = 4437377 | |
value = stdout.getvalue() | |
if result is not None: | |
fmt = '```py\n{}{}\n```'.format( | |
value, | |
result) | |
variables['_'] = result | |
elif value: | |
fmt = '```py\n{}\n```'.format(value) | |
history[cleaned] = fmt | |
if len(cleaned) > 800: | |
cleaned = "<Too big to be printed>" | |
try: | |
if fmt is not None: | |
if len(fmt) >= 800: | |
haste_url = await self.post_to_hastebin(fmt) | |
self.repl_embeds[shell].add_field( | |
name="`>>> {}`".format(cleaned), | |
value="[`Content too big to be printed. " | |
"Hosted on hastebin.`]({})".format( | |
haste_url), | |
inline=False) | |
await self.bot.edit_message( | |
self.repl_sessions[session], | |
embed=self.repl_embeds[shell]) | |
else: | |
self.repl_embeds[shell].add_field( | |
name="`>>> {}`".format(cleaned), | |
value=fmt, | |
inline=False) | |
await self.bot.edit_message( | |
self.repl_sessions[session], | |
embed=self.repl_embeds[shell]) | |
else: | |
self.repl_embeds[shell].add_field( | |
name="`>>> {}`".format(cleaned), | |
value="`Empty response, assumed successful.`", | |
inline=False) | |
await self.bot.edit_message( | |
self.repl_sessions[session], | |
embed=self.repl_embeds[shell]) | |
except discord.Forbidden: | |
pass | |
except discord.HTTPException as err: | |
error_embed = discord.Embed( | |
color=15746887, | |
description='**Error**: _{}_'.format(err)) | |
await self.bot.say(embed=error_embed) | |
@repl.command(name='jump', | |
aliases=['hop', 'pull', 'recenter', 'whereditgo'], | |
pass_context=True) | |
async def _repljump(self, ctx): | |
'''Brings the shell back down so you can see it again.''' | |
session = ctx.message.channel.id | |
if session not in self.repl_sessions: | |
error_embed = discord.Embed( | |
color=15746887, | |
description="**Error**: _No shell running in channel._") | |
await self.bot.say(embed=error_embed) | |
return | |
shell = self.repl_sessions[session] | |
embed = self.repl_embeds[shell] | |
await self.bot.delete_message(ctx.message) | |
try: | |
await self.bot.delete_message(shell) | |
except discord.errors.NotFound: | |
pass | |
new_shell = await self.bot.say(embed=embed) | |
self.repl_sessions[session] = new_shell | |
del self.repl_embeds[shell] | |
self.repl_embeds[new_shell] = embed | |
@repl.command(name='clear', | |
aliases=['clean', 'purge', 'cleanup', | |
'ohfuckme', 'deletthis'], | |
pass_context=True) | |
async def _replclear(self, ctx): | |
'''Clears the fields of the shell and resets the color.''' | |
session = ctx.message.channel.id | |
if session not in self.repl_sessions: | |
error_embed = discord.Embed( | |
color=15746887, | |
description="**Error**: _No shell running in channel._") | |
await self.bot.say(embed=error_embed) | |
return | |
shell = self.repl_sessions[session] | |
self.repl_embeds[shell].color = discord.Color.default() | |
self.repl_embeds[shell].clear_fields() | |
await self.bot.delete_message(ctx.message) | |
await self.bot.edit_message( | |
shell, | |
embed=self.repl_embeds[shell]) | |
def setup(bot): | |
bot.add_cog(EmbedShell(bot)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
print("Starting up...\n") | |
import discord | |
from discord.ext import commands | |
print("imports finalized.\n") | |
bot = commands.Bot(command_prefix='embed!', | |
description='Wrapped up EmbedShell', | |
self_bot=True) | |
print("bot created.\n") | |
startup_extensions=['embedshell'] | |
print("globals created.\n") | |
bot.get_command("help").hidden = True | |
@bot.event | |
async def on_ready(): | |
print('Logged in as') | |
print(bot.user.name) | |
print(bot.user.id) | |
print('------') | |
for extension in startup_extensions: | |
try: | |
bot.load_extension(extension) | |
except Exception as e: | |
exc = '{}: {}'.format(type(e).__name__, e) | |
print('Failed to load extension {}\n{}'.format(extension, exc)) | |
print("Logging in...") | |
bot.run('**USERTOKEN**', | |
bot=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
OwO whats dis