Created
August 10, 2012 16:34
-
-
Save infogulch/3315405 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) Mathias Kaerlev 2011-2012. | |
# This file is part of pyspades. | |
# pyspades is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# pyspades is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# You should have received a copy of the GNU General Public License | |
# along with pyspades. If not, see <http://www.gnu.org/licenses/>. | |
import math | |
from random import choice | |
from pyspades.constants import * | |
from pyspades.common import prettify_timespan | |
from pyspades.server import parse_command | |
from twisted.internet import reactor | |
from map import check_rotation | |
import inspect | |
# Command name -> callable; filled in by add().
commands = {}
# Lower-cased alias -> command name; filled in by add().
aliases = {}
# User type -> set of command names that type may run; filled in by
# add_rights().
rights = {}
class InvalidPlayer(Exception):
    """
    Raised when a player lookup does not match any player
    """
class InvalidSpectator(InvalidPlayer):
    """
    Raised by get_player when the matched player has no world object and
    the caller asked for spectators to be excluded
    """
class InvalidTeam(Exception):
    """
    Raised by get_team when the team name is not recognised
    """
def add_rights(func_name, *user_types):
    """
    Allow every listed user type to run the command called func_name.

    Updates the module-level ``rights`` mapping in place.
    """
    for kind in user_types:
        # Create the per-type set on first use, then record the command.
        rights.setdefault(kind, set()).add(func_name)
def restrict(func, *user_types):
    """
    Wrap a command so that it is only available to the given user types.

    The returned wrapper forwards every call to ``func`` unchanged and
    carries ``func_name``, ``user_types`` (read by has_rights and add) and
    the original docstring, so documentation written on a restricted
    command is not lost by the wrapping.
    """
    def new_func(connection, *arg, **kw):
        return func(connection, *arg, **kw)
    new_func.func_name = func.func_name
    new_func.user_types = user_types
    # Keep the command's own documentation visible on the wrapper.
    new_func.__doc__ = func.__doc__
    return new_func
def has_rights(f, connection):
    """
    Tell whether the connection may run command f.

    Commands without a ``user_types`` attribute are open to everyone;
    otherwise the command name must be listed in ``connection.rights``.
    """
    if not hasattr(f, 'user_types'):
        return True
    return f.func_name in connection.rights
def admin(func):
    """
    Decorator restricting a command to the 'admin' user type
    """
    admin_only = restrict(func, 'admin')
    return admin_only
def name(name):
    """
    Decorator factory that overrides the name a command is known by.

    The decorated function is returned unchanged apart from its
    ``func_name`` attribute being set to ``name``.
    """
    def rename(func):
        func.func_name = name
        return func
    return rename
def alias(name):
    """
    Decorator factory that records an extra name for a command.

    Aliases accumulate in the ``aliases`` list attribute of the decorated
    function, in the order the decorators are applied.
    """
    def remember(func):
        if hasattr(func, 'aliases'):
            func.aliases.append(name)
        else:
            # First alias for this function: start the list.
            func.aliases = [name]
        return func
    return remember
def get_player(protocol, value, spectators = True):
    """
    Find a player from a user-supplied identifier.

    ``value`` is tried, in order, as:
      * ``#<id>`` - looked up by integer key in ``protocol.players``
      * a direct key of ``protocol.players``
      * a case-insensitive exact name
      * a case-insensitive substring of a name (the last match in
        iteration order wins)

    Raises InvalidPlayer when nothing matches, and InvalidSpectator when
    ``spectators`` is false and the matched player has no world object.
    """
    ret = None
    try:
        if value.startswith('#'):
            ret = protocol.players[int(value[1:])]
        else:
            players = protocol.players
            try:
                ret = players[value]
            except KeyError:
                value = value.lower()
                for player in players.values():
                    player_name = player.name.lower()
                    if player_name == value:
                        # An exact match beats any partial match. Break
                        # rather than return so the spectator check below
                        # still applies to this player.
                        ret = player
                        break
                    if value in player_name:
                        ret = player
    except (KeyError, IndexError, ValueError):
        # Unknown id or non-numeric '#...' value: report as "no such player".
        pass
    if ret is None:
        raise InvalidPlayer()
    elif not spectators and ret.world_object is None:
        raise InvalidSpectator()
    return ret
def get_team(connection, value):
    """
    Translate a team name into the matching team object.

    Accepts 'blue', 'green' or 'spectator' in any letter case and returns
    the corresponding attribute of ``connection.protocol``; any other
    name raises InvalidTeam.
    """
    team_attributes = {
        'blue': 'blue_team',
        'green': 'green_team',
        'spectator': 'spectator_team',
    }
    key = value.lower()
    if key not in team_attributes:
        raise InvalidTeam()
    return getattr(connection.protocol, team_attributes[key])
def join_arguments(arg, default = None):
    """
    Join the argument words with single spaces.

    Returns ``default`` when no words were given.
    """
    return ' '.join(arg) if arg else default
def parse_maps(pre_maps):
    """
    Merge '#...' arguments into the map name that precedes them.

    A word starting with '#' is appended, after a space, to the previous
    entry; a leading '#' word with nothing before it stays its own entry.
    Empty words are kept as separate entries instead of raising.

    Returns a tuple ``(maps, text)`` where ``text`` is the entries joined
    with ', '.
    """
    maps = []
    for word in pre_maps:
        # startswith() instead of word[0] so an empty argument cannot
        # raise IndexError.
        if word.startswith("#") and maps:
            maps[-1] += " " + word
        else:
            maps.append(word)
    return maps, ', '.join(maps)
@admin
def kick(connection, value, *arg):
    """
    Kick the player identified by value; the remaining words form the
    reason
    """
    target = get_player(connection.protocol, value)
    target.kick(join_arguments(arg))
def get_ban_arguments(connection, arg):
    """
    Split ban command words into ``(duration, reason)``.

    A leading integer is the duration. A leading "perma" leaves the
    duration as None. With neither, the duration falls back to
    ``connection.protocol.default_ban_time``. Whatever words remain are
    joined into the reason (None if there are none).
    """
    remaining = arg
    duration = None
    if len(remaining):
        try:
            duration = int(remaining[0])
        except (IndexError, ValueError):
            pass
        else:
            # The first word was the duration; drop it from the reason.
            remaining = remaining[1:]
    if duration is None:
        if len(remaining) > 0 and remaining[0] == "perma":
            remaining = remaining[1:]
        else:
            duration = connection.protocol.default_ban_time
    return duration, join_arguments(remaining)
@admin
def ban(connection, value, *arg):
    """
    Ban the player identified by value, with optional duration and reason
    """
    ban_duration, ban_reason = get_ban_arguments(connection, arg)
    target = get_player(connection.protocol, value)
    target.ban(ban_reason, ban_duration)
@admin
def banip(connection, ip, *arg):
    """
    Ban an IP address or network, with optional duration and reason.

    Returns a confirmation text, or an error text when add_ban rejects
    the address with ValueError.
    """
    duration, reason = get_ban_arguments(connection, arg)
    try:
        connection.protocol.add_ban(ip, reason, duration)
    except ValueError:
        return 'Invalid IP address/network'
    suffix = '' if reason is None else ': ' + reason
    # A falsy duration (None or 0) is reported as a permanent ban.
    if not duration:
        return 'IP/network %s permabanned%s' % (ip, suffix)
    timespan = prettify_timespan(duration * 60)
    return 'IP/network %s banned for %s%s' % (ip, timespan, suffix)
@admin
def unban(connection, ip):
    """
    Remove the ban on ip and report whether it was in the ban list
    """
    try:
        connection.protocol.remove_ban(ip)
    except KeyError:
        return 'IP not found in ban list'
    return 'IP unbanned'
@name('undoban')
@admin
def undo_ban(connection, *arg):
    """
    Undo the most recent ban, if there is one
    """
    protocol = connection.protocol
    if len(protocol.bans) <= 0:
        return 'No bans to undo!'
    undone = protocol.undo_last_ban()
    return ('Ban for %s undone' % undone[0])
@admin
def say(connection, *arg):
    """
    Send the words as a server message to the game chat and to IRC
    """
    text = ' '.join(arg)
    protocol = connection.protocol
    protocol.send_chat(text)
    protocol.irc_say(text)
# Killing *another* player is an admin right; killing yourself is open.
add_rights('kill', 'admin')
def kill(connection, value = None):
    """
    Kill yourself, or the player identified by value.

    Targeting someone else requires the 'kill' right; spectators are not
    valid targets. A kill of another player is announced in chat.
    """
    if value is not None:
        if not connection.rights.kill:
            return "You can't use this command"
        player = get_player(connection.protocol, value, False)
    else:
        player = connection
    player.kill()
    if connection is player:
        return
    announcement = '%s killed %s' % (connection.name, player.name)
    connection.protocol.send_chat(announcement, irc = True)
@admin
def heal(connection, player = None):
    """
    Refill yourself or the named player and announce it in chat.

    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    if player is None:
        if connection not in connection.protocol.players:
            raise ValueError()
        target = connection
        message = '%s was healed' % (connection.name)
    else:
        # Spectators cannot be healed.
        target = get_player(connection.protocol, player, False)
        message = '%s was healed by %s' % (target.name, connection.name)
    target.refill()
    connection.protocol.send_chat(message, irc = True)
def rules(connection):
    """
    Send the server rules to the caller, if any are configured.

    Raises KeyError when the caller is not one of the protocol's players.
    """
    protocol = connection.protocol
    if connection not in protocol.players:
        raise KeyError()
    rule_lines = protocol.rules
    if rule_lines is not None:
        connection.send_lines(rule_lines)
def help(connection):
    """
    This help
    """
    protocol = connection.protocol
    if protocol.help is None or connection.admin:
        # No custom help text, or the caller is an admin: list the
        # commands this connection has rights to.
        allowed = [command.func_name for command in command_list
            if command.func_name in connection.rights]
        return 'Available commands: %s' % (', '.join(allowed))
    connection.send_lines(protocol.help)
def login(connection, password):
    """
    Login as a user type
    """
    protocol = connection.protocol
    if connection not in protocol.players:
        raise KeyError()
    for user_type, passwords in protocol.passwords.iteritems():
        if password not in passwords:
            continue
        if user_type in connection.user_types:
            return "You're already logged in as %s" % user_type
        return connection.on_user_login(user_type, True)
    # Wrong password: use up one attempt, starting the counter from the
    # protocol-wide limit on the first failure.
    if connection.login_retries is None:
        connection.login_retries = protocol.login_retries - 1
    else:
        connection.login_retries -= 1
    if connection.login_retries:
        return 'Invalid password - you have %s tries left' % (
            connection.login_retries)
    connection.kick('Ran out of login attempts')
def pm(connection, value, *arg):
    """
    Send a private chat message to the player identified by value
    """
    recipient = get_player(connection.protocol, value)
    text = join_arguments(arg)
    recipient.send_chat('PM from %s: %s' % (connection.name, text))
    return 'PM sent to %s' % recipient.name
@name('admin')
def to_admin(connection, *arg):
    """
    Relay a message to IRC (when a relay is configured) and to every
    admin player other than the sender
    """
    message = join_arguments(arg)
    if not message:
        return "Enter a message you want to send, like /admin I'm stuck"
    protocol = connection.protocol
    relay = protocol.irc_relay
    if relay:
        tag = '(TO ADMINS)'
        bot = relay.factory.bot
        if bot and bot.colors:
            # Wrap the tag in IRC colour codes when the bot has colours on.
            tag = '\x0304' + tag + '\x0f'
        relay.send(tag + ' <%s> %s' % (connection.name, message))
    for player in protocol.players.values():
        if not player.admin or player is connection:
            continue
        player.send_chat('To ADMINS from %s: %s' %
            (connection.name, message))
    return 'Message sent to admins'
def streak(connection):
    """
    Report the caller's current and best kill streak.

    Raises KeyError when the caller is not one of the protocol's players.
    """
    if connection not in connection.protocol.players:
        raise KeyError()
    current, best = connection.streak, connection.best_streak
    return 'Your current kill streak is %s. Best is %s kills' % (current, best)
@admin
def lock(connection, value):
    """
    Lock the named team and announce it in chat and on IRC
    """
    protocol = connection.protocol
    team = get_team(connection, value)
    team.locked = True
    protocol.send_chat('%s team is now locked' % team.name)
    irc_text = '* %s locked %s team' % (connection.name, team.name)
    protocol.irc_say(irc_text)
@admin
def unlock(connection, value):
    """
    Unlock the named team and announce it in chat and on IRC
    """
    protocol = connection.protocol
    team = get_team(connection, value)
    team.locked = False
    protocol.send_chat('%s team is now unlocked' % team.name)
    irc_text = '* %s unlocked %s team' % (connection.name, team.name)
    protocol.irc_say(irc_text)
@admin
def switch(connection, player = None, team = None):
    """
    Move a player (default: the caller) to another team.

    Without a team name the player goes to ``player.team.other``.
    Invisible players are switched in place and only IRC is told; visible
    players go through set_team and the change is announced in chat.
    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    protocol = connection.protocol
    if player is not None:
        player = get_player(protocol, player)
    elif connection in protocol.players:
        player = connection
    else:
        raise ValueError()
    if player.team.spectator:
        player.send_chat("The switch command can't be used on a spectating player.")
        return
    new_team = player.team.other if team is None else get_team(connection, team)
    if not player.invisible:
        player.respawn_time = protocol.respawn_time
        player.set_team(new_team)
        protocol.send_chat('%s switched teams' % player.name, irc = True)
        return
    # Silent switch: swap the team and respawn at the current position.
    previous_team = player.team
    player.team = new_team
    player.on_team_changed(previous_team)
    player.spawn(player.world_object.position.get())
    player.send_chat('Switched to %s team' % player.team.name)
    if connection is not player and connection in protocol.players:
        connection.send_chat('Switched %s to %s team' % (player.name,
            player.team.name))
    protocol.irc_say('* %s silently switched teams' % player.name)
@name('setbalance')
@admin
def set_balance(connection, value):
    """
    Set the protocol's balanced_teams value from an integer argument
    """
    try:
        amount = int(value)
    except ValueError:
        return 'Invalid value %r. Use 0 for off, 1 and up for on' % value
    protocol = connection.protocol
    protocol.balanced_teams = amount
    protocol.send_chat('Balanced teams set to %s' % amount)
    protocol.irc_say('* %s set balanced teams to %s' % (
        connection.name, amount))
@name('togglebuild')
@admin
def toggle_build(connection, player = None):
    """
    Toggle building for one player, or for the whole server when no
    player is named
    """
    protocol = connection.protocol
    if player is None:
        enabled = not protocol.building
        protocol.building = enabled
        on_off = 'ON' if enabled else 'OFF'
        protocol.send_chat('Building has been toggled %s!' % on_off)
        protocol.irc_say('* %s toggled building %s' % (connection.name,
            on_off))
        return
    player = get_player(protocol, player)
    enabled = not player.building
    player.building = enabled
    if enabled:
        template = '%s can build again'
    else:
        template = '%s is disabled from building'
    protocol.send_chat(template % player.name)
    protocol.irc_say('* %s %s building for %s' % (connection.name,
        'enabled' if enabled else 'disabled', player.name))
@name('togglekill')
@admin
def toggle_kill(connection, player = None):
    """
    Toggle killing for one player, or for the whole server when no
    player is named
    """
    protocol = connection.protocol
    if player is None:
        enabled = not protocol.killing
        protocol.killing = enabled
        on_off = 'ON' if enabled else 'OFF'
        protocol.send_chat('Killing has been toggled %s!' % on_off)
        protocol.irc_say('* %s toggled killing %s' % (connection.name,
            on_off))
        return
    player = get_player(protocol, player)
    enabled = not player.killing
    player.killing = enabled
    if enabled:
        template = '%s can kill again'
    else:
        template = '%s is disabled from killing'
    protocol.send_chat(template % player.name)
    protocol.irc_say('* %s %s killing for %s' % (connection.name,
        'enabled' if enabled else 'disabled', player.name))
@name('toggleteamkill')
@admin
def toggle_teamkill(connection):
    """
    Toggle friendly fire for the whole server
    """
    protocol = connection.protocol
    enabled = not protocol.friendly_fire
    protocol.friendly_fire = enabled
    on_off = 'ON' if enabled else 'OFF'
    protocol.send_chat('Friendly fire has been toggled %s!' % on_off)
    protocol.irc_say('* %s toggled friendly fire %s' % (
        connection.name, on_off))
@admin
def mute(connection, value):
    """
    Mute the player identified by value, unless already muted
    """
    target = get_player(connection.protocol, value)
    if target.mute:
        return '%s is already muted' % target.name
    target.mute = True
    connection.protocol.send_chat(
        '%s has been muted by %s' % (target.name, connection.name),
        irc = True)
@admin
def unmute(connection, value):
    """
    Unmute the player identified by value, unless not muted
    """
    target = get_player(connection.protocol, value)
    if not target.mute:
        return '%s is not muted' % target.name
    target.mute = False
    connection.protocol.send_chat(
        '%s has been unmuted by %s' % (target.name, connection.name),
        irc = True)
def deaf(connection, value = None):
    """
    Toggle the deaf flag on yourself, or on another player.

    Targeting another player requires being admin or holding the 'deaf'
    right. The change is reported on IRC and to the affected player.
    """
    if value is not None:
        if not (connection.admin or connection.rights.deaf):
            return 'No administrator rights!'
        connection = get_player(connection.protocol, value)
    state = '%s deaf' % ('no longer' if connection.deaf else 'now')
    connection.protocol.irc_say('%s is %s' % (connection.name, state))
    notice = "You're " + state
    # The notice is sent while the deaf flag is off in both directions:
    # after clearing it, or before setting it.
    if connection.deaf:
        connection.deaf = False
        connection.send_chat(notice)
    else:
        connection.send_chat(notice)
        connection.deaf = True
@name('globalchat')
@admin
def global_chat(connection):
    """
    Toggle global chat for the whole server and announce the new state
    """
    protocol = connection.protocol
    protocol.global_chat = not protocol.global_chat
    state = 'enabled' if protocol.global_chat else 'disabled'
    protocol.send_chat('Global chat %s' % state, irc = True)
@alias('tp')
@admin
def teleport(connection, player1, player2 = None, silent = False):
    """
    Teleport the caller to player1, or player1 to player2.

    The two-player form needs admin or the 'teleport_other' right. A
    teleport is silent (reported on IRC only) when requested or when the
    moved player is invisible. Raises ValueError when the caller would be
    moved but is not one of the protocol's players.
    """
    protocol = connection.protocol
    player1 = get_player(protocol, player1)
    if player2 is None:
        if connection not in protocol.players:
            raise ValueError()
        player, target = connection, player1
        silent = silent or player.invisible
        adverb = 'silently ' if silent else ''
        message = '%s %steleported to %s' % (player.name, adverb,
            target.name)
    else:
        if not (connection.admin or connection.rights.teleport_other):
            return 'No administrator rights!'
        player, target = player1, get_player(protocol, player2)
        silent = silent or player.invisible
        adverb = 'silently ' if silent else ''
        message = '%s %steleported %s to %s' % (connection.name, adverb,
            player.name, target.name)
    player.set_location(target.get_location())
    if silent:
        protocol.irc_say('* ' + message)
    else:
        protocol.send_chat(message, irc = True)
@admin
def unstick(connection, player = None):
    """
    Re-place a player (default: the caller) at their current location
    using set_location_safe, and announce it
    """
    target = connection if player is None else get_player(
        connection.protocol, player)
    announcement = "%s unstuck %s" % (connection.name, target.name)
    connection.protocol.send_chat(announcement, irc = True)
    target.set_location_safe(target.get_location())
@alias('tps')
@admin
def tpsilent(connection, player1, player2 = None):
    """
    Silent variant of teleport.

    Returns whatever teleport returns, so its 'No administrator rights!'
    reply reaches the caller instead of being dropped.
    """
    return teleport(connection, player1, player2, silent = True)
from pyspades.common import coordinates, to_coordinates | |
@name('goto')
@admin
def go_to(connection, value):
    """
    Move the caller to the map sector named by value.

    Raises KeyError when the caller is not one of the protocol's players.
    """
    if connection not in connection.protocol.players:
        raise KeyError()
    # Invisible callers move silently.
    quiet = connection.invisible
    move(connection, connection.name, value, silent = quiet)
@admin
def move(connection, player, value, silent = False):
    """
    Teleport a player to the map sector named by value.

    The destination is the sector's coordinates offset by 32 in x and y,
    two units above the map height at that point. Silent moves are
    reported on IRC only.
    """
    protocol = connection.protocol
    player = get_player(protocol, player)
    x, y = coordinates(value)
    x += 32
    y += 32
    player.set_location((x, y, protocol.map.get_height(x, y) - 2))
    adverb = 'silently ' if silent else ''
    sector = value.upper()
    if connection is player:
        message = '%s %steleported to location %s' % (player.name, adverb,
            sector)
    else:
        message = '%s %steleported %s to location %s' % (connection.name,
            adverb, player.name, sector)
    if silent:
        protocol.irc_say('* ' + message)
    else:
        protocol.send_chat(message, irc = True)
@admin
def where(connection, value = None):
    """
    Report the sector and integer coordinates of a player (default: the
    caller).

    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    if value is not None:
        target = get_player(connection.protocol, value)
    elif connection in connection.protocol.players:
        target = connection
    else:
        raise ValueError()
    x, y, z = target.get_location()
    return '%s is in %s (%s, %s, %s)' % (target.name,
        to_coordinates(x, y), int(x), int(y), int(z))
@admin
def god(connection, value = None):
    """
    Toggle god mode for a player (default: the caller) and announce it.

    god_build follows the new god state when the protocol's set_god_build
    is true, and is switched off otherwise. Raises ValueError when no
    player is named and the caller is not one of the protocol's players.
    """
    if value is not None:
        target = get_player(connection.protocol, value)
    elif connection in connection.protocol.players:
        target = connection
    else:
        raise ValueError()
    target.god = not target.god
    protocol = target.protocol
    target.god_build = target.god if protocol.set_god_build else False
    if target.god:
        announcement = '%s entered GOD MODE!' % target.name
    else:
        announcement = '%s returned to being a mere human' % target.name
    protocol.send_chat(announcement, irc = True)
@name('godbuild')
@admin
def god_build(connection, player = None):
    """
    Toggle placing of god blocks for a player (default: the caller).

    Only allowed while the player is in god mode. Raises ValueError when
    no player is named and the caller is not one of the protocol's
    players.
    """
    protocol = connection.protocol
    if player is not None:
        target = get_player(protocol, player)
    elif connection in protocol.players:
        target = connection
    else:
        raise ValueError()
    if not target.god:
        return 'Placing god blocks is only allowed in god mode'
    target.god_build = not target.god_build
    if target.god_build:
        state = 'now placing god blocks'
    else:
        state = 'no longer placing god blocks'
    target.send_chat("You're %s" % state)
    # Also tell the issuing player when they acted on someone else.
    if connection is not target and connection in protocol.players:
        connection.send_chat('%s is %s' % (target.name, state))
    protocol.irc_say('* %s is %s' % (target.name, state))
@admin
def fly(connection, player = None):
    """
    Toggle the fly flag for a player (default: the caller).

    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    protocol = connection.protocol
    if player is not None:
        target = get_player(protocol, player)
    elif connection in protocol.players:
        target = connection
    else:
        raise ValueError()
    target.fly = not target.fly
    state = 'now flying' if target.fly else 'no longer flying'
    target.send_chat("You're %s" % state)
    # Also tell the issuing player when they acted on someone else.
    if connection is not target and connection in protocol.players:
        connection.send_chat('%s is %s' % (target.name, state))
    protocol.irc_say('* %s is %s' % (target.name, state))
from pyspades.contained import KillAction | |
from pyspades.server import create_player, set_tool, set_color, input_data, weapon_input | |
from pyspades.common import make_color | |
@alias('invis')
@alias('inv')
@admin
def invisible(connection, player = None):
    """
    Toggle invisibility for a player (default: the caller).

    Going invisible also turns on god mode and visibility filtering,
    turns off god_build and killing, and schedules a kill packet for the
    player. Becoming visible again reverses those flags and re-sends the
    player's creation, tool, colour and input packets.

    Returns a confirmation text when the caller is a player acting on
    someone else; otherwise returns None. Raises ValueError when no
    player is named and the caller is not one of the protocol's players.
    """
    protocol = connection.protocol
    if player is not None:
        player = get_player(protocol, player)
    elif connection in protocol.players:
        player = connection
    else:
        raise ValueError()
    player.invisible = not player.invisible
    # The flags below are tied to the new invisibility state.
    player.filter_visibility_data = player.invisible
    player.god = player.invisible
    player.god_build = False
    player.killing = not player.invisible
    if player.invisible:
        player.send_chat("You're now invisible")
        protocol.irc_say('* %s became invisible' % player.name)
        # Build a kill packet in which the player is both victim and
        # killer, with a randomly chosen kill type.
        kill_action = KillAction()
        kill_action.kill_type = choice([GRENADE_KILL, FALL_KILL])
        kill_action.player_id = kill_action.killer_id = player.player_id
        # Sent one network frame later rather than immediately.
        reactor.callLater(1.0 / NETWORK_FPS, protocol.send_contained,
            kill_action, sender = player)
    else:
        player.send_chat("You return to visibility")
        protocol.irc_say('* %s became visible' % player.name)
        x, y, z = player.world_object.position.get()
        # create_player, input_data, set_tool, set_color and weapon_input
        # are module-level packet objects imported from pyspades.server;
        # they are filled in here and then sent.
        create_player.player_id = player.player_id
        create_player.name = player.name
        create_player.x = x
        create_player.y = y
        create_player.z = z
        create_player.weapon = player.weapon
        create_player.team = player.team.id
        world_object = player.world_object
        input_data.player_id = player.player_id
        input_data.up = world_object.up
        input_data.down = world_object.down
        input_data.left = world_object.left
        input_data.right = world_object.right
        input_data.jump = world_object.jump
        input_data.crouch = world_object.crouch
        input_data.sneak = world_object.sneak
        input_data.sprint = world_object.sprint
        set_tool.player_id = player.player_id
        set_tool.value = player.tool
        set_color.player_id = player.player_id
        set_color.value = make_color(*player.color)
        # NOTE(review): weapon_input.player_id is not assigned here,
        # unlike the other packets - confirm whether that is intended.
        weapon_input.primary = world_object.primary_fire
        weapon_input.secondary = world_object.secondary_fire
        protocol.send_contained(create_player, sender = player, save = True)
        protocol.send_contained(set_tool, sender = player)
        protocol.send_contained(set_color, sender = player, save = True)
        protocol.send_contained(input_data, sender = player)
        protocol.send_contained(weapon_input, sender = player)
    # Only confirm to the issuer when they are a player acting on
    # someone else.
    if connection is not player and connection in protocol.players:
        if player.invisible:
            return '%s is now invisible' % player.name
        else:
            return '%s is now visible' % player.name
@admin
def ip(connection, value = None):
    """
    Report the IP address of a player (default: the caller).

    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    if value is not None:
        target = get_player(connection.protocol, value)
    elif connection in connection.protocol.players:
        target = connection
    else:
        raise ValueError()
    return 'The IP of %s is %s' % (target.name, target.address[0])
@name('whowas')
@admin
def who_was(connection, value):
    """
    Look up a name in the protocol's player_memory.

    The latest exact (case-insensitive) name match wins; failing that,
    the latest entry whose name contains the search text. Raises
    InvalidPlayer when nothing matches.
    """
    needle = value.lower()
    found = None
    have_exact = False
    for remembered_name, address in connection.protocol.player_memory:
        lowered = remembered_name.lower()
        if lowered == needle:
            found = (remembered_name, address)
            have_exact = True
        elif not have_exact and needle in lowered:
            # Partial matches only count until an exact one has been seen.
            found = (remembered_name, address)
    if found is None:
        raise InvalidPlayer()
    return "%s's most recent IP was %s" % found
@name('resetgame')
@admin
def reset_game(connection):
    """
    Reset the running game and announce who did it.

    protocol.reset_game is given a player: the caller when the caller is
    one, otherwise a stand-in picked from the connected players. Nothing
    happens when the caller is not a player and nobody is connected.
    """
    protocol = connection.protocol
    actor = connection
    # irc compatibility
    if actor not in protocol.players:
        # Stop at the first admin found; otherwise the last player
        # iterated is kept.
        for candidate in protocol.players.values():
            actor = candidate
            if candidate.admin:
                break
        if actor is connection:
            return
    protocol.reset_game(actor)
    protocol.on_game_end()
    protocol.send_chat('Game has been reset by %s' % connection.name,
        irc = True)
from map import Map | |
import itertools | |
@name('map')
@admin
def change_planned_map(connection, *pre_maps):
    """
    Set the map to be played next; only the first parsed map is used
    """
    protocol = connection.protocol
    # parse seed numbering
    parsed = parse_maps(pre_maps)[0]
    if not parsed:
        return 'Invalid map name'
    planned = parsed[0]
    protocol.planned_map = check_rotation([planned])[0]
    protocol.send_chat('%s changed next map to %s' % (connection.name,
        planned), irc = True)
@name('rotation')
@admin
def change_rotation(connection, *pre_maps):
    """
    Replace the map rotation with the given maps and announce it
    """
    maps, map_list = parse_maps(pre_maps)
    if not len(maps):
        return 'Usage: /rotation <map1> <map2> <map3>...'
    protocol = connection.protocol
    result = protocol.set_map_rotation(maps, False)
    if not result:
        # A falsy result is expected to carry the offending map name.
        return 'Invalid map in map rotation (%s)' % result.map
    protocol.send_chat("%s changed map rotation to %s." %
        (connection.name, map_list), irc=True)
@name('rotationadd')
@admin
def rotation_add(connection, *pre_maps):
    """
    Append the given maps to the current rotation and announce it.

    Returns an error text when set_map_rotation rejects the new rotation.
    """
    protocol = connection.protocol
    new_maps = parse_maps(pre_maps)[0]
    # Build a fresh list rather than extending the one returned by
    # get_map_rotation(), so a rejected rotation cannot leave extra
    # entries in a list the protocol may still hold.
    maps = list(protocol.get_map_rotation()) + new_maps
    ret = protocol.set_map_rotation(maps, False)
    if not ret:
        return 'Invalid map in map rotation (%s)' % ret.map
    protocol.send_chat("%s added %s to map rotation." %
        (connection.name, " ".join(pre_maps)), irc=True)
@name('showrotation')
def show_rotation(connection):
    """
    Return the current map rotation as a comma-separated list
    """
    rotation = connection.protocol.get_map_rotation()
    return ", ".join(rotation)
@name('revertrotation')
@admin
def revert_rotation(connection):
    """
    Restore the map rotation from the 'maps' entry of the server config
    and report it on IRC
    """
    protocol = connection.protocol
    maps = protocol.config['maps']
    protocol.set_map_rotation(maps, False)
    # Use the caller's name: a bare ``name`` here would resolve to the
    # module-level ``name`` decorator and print a function repr.
    protocol.irc_say("* %s reverted map rotation to %s" % (connection.name,
        maps))
def mapname(connection):
    """
    Return the name of the map currently being played
    """
    current = connection.protocol.map_info.name
    return 'Current map: ' + current
@admin
def advance(connection):
    """
    Force the rotation to advance to the next map
    """
    protocol = connection.protocol
    protocol.advance_rotation('Map advance forced.')
@name('timelimit')
@admin
def set_time_limit(connection, value):
    """
    Set the time limit from a numeric argument and announce it.

    A non-numeric value makes float() raise ValueError.
    """
    limit = float(value)
    protocol = connection.protocol
    protocol.set_time_limit(limit)
    protocol.send_chat('Time limit set to %s' % limit, irc = True)
@name('time')
def get_time_limit(connection):
    """
    Report the minutes left before the scheduled map advance, rounded up
    """
    pending = connection.protocol.advance_call
    if pending is None:
        return 'No time limit set'
    seconds_left = pending.getTime() - reactor.seconds()
    minutes_left = int(math.ceil(seconds_left / 60.0))
    return 'There are %s minutes left' % minutes_left
@name('servername')
@admin
def server_name(connection, *arg):
    """
    Change the server name stored in the protocol config.

    The change is printed to the console and reported on IRC; the message
    is also returned when the caller is one of the protocol's players.
    """
    name = join_arguments(arg)
    protocol = connection.protocol
    protocol.config['name'] = name
    protocol.update_format()
    # The original text had a duplicated word ("to to").
    message = "%s changed servername to '%s'" % (connection.name, name)
    print(message)
    protocol.irc_say("* " + message)
    if connection in protocol.players:
        return message
@name('master')
@admin
def toggle_master(connection):
    """
    Toggle the master server broadcast and report the new state
    """
    protocol = connection.protocol
    protocol.set_master_state(not protocol.master)
    state = ['OFF', 'ON'][int(protocol.master)]
    message = "toggled master broadcast %s" % state
    protocol.irc_say("* %s " % connection.name + message)
    if connection in protocol.players:
        return ("You " + message)
def ping(connection, value = None):
    """
    Report the latency of the caller, or of the player identified by
    value.

    Raises ValueError when no player is named and the caller is not one
    of the protocol's players.
    """
    if value is not None:
        target = get_player(connection.protocol, value)
        return "%s's ping is %s ms" % (target.name, target.latency)
    if connection not in connection.protocol.players:
        raise ValueError()
    return ('Your ping is %s ms. Lower ping is better!' % connection.latency)
def intel(connection):
    """
    Tell the caller who, if anyone, carries the other team's flag.

    Raises KeyError when the caller is not one of the protocol's players.
    """
    if connection not in connection.protocol.players:
        raise KeyError()
    carrier = connection.team.other.flag.player
    if carrier is None:
        return "Nobody in your team has the enemy intel"
    if carrier is connection:
        return "You have the enemy intel, return to base!"
    return "%s has the enemy intel!" % carrier.name
def version(connection):
    """
    Return the server version string
    """
    server_version = connection.protocol.server_version
    return 'Server version is "%s"' % server_version
@name('server')
def server_info(connection):
    """
    Return the server name, plus its identifier when one is set
    """
    protocol = connection.protocol
    parts = ['You are playing on "%s"' % protocol.name]
    if protocol.identifier is not None:
        parts.append(' at %s' % protocol.identifier)
    return ''.join(parts)
def scripts(connection):
    """
    List the scripts named in the 'scripts' entry of the server config
    """
    enabled = connection.protocol.config.get('scripts', [])
    listing = ', '.join(enabled)
    return 'Scripts enabled: %s' % listing
@admin
def fog(connection, r, g, b):
    """
    Set the fog colour from three integer components.

    A non-integer component makes int() raise ValueError.
    """
    colour = (int(r), int(g), int(b))
    connection.protocol.set_fog_color(colour)
def weapon(connection, value):
    """
    Report which weapon the player identified by value carries
    """
    player = get_player(connection.protocol, value)
    held = player.weapon_object
    weapon_name = '(unknown)' if held is None else held.name
    return '%s has a %s' % (player.name, weapon_name)
# Built-in commands. Each entry is registered through add() right after
# add() is defined, and help() builds its command listing from this list.
command_list = [
    help,
    pm,
    to_admin,
    login,
    kick,
    intel,
    ip,
    who_was,
    fog,
    ban,
    banip,
    unban,
    undo_ban,
    mute,
    unmute,
    deaf,
    global_chat,
    say,
    kill,
    heal,
    lock,
    unlock,
    switch,
    set_balance,
    rules,
    toggle_build,
    toggle_kill,
    toggle_teamkill,
    teleport,
    tpsilent,
    go_to,
    move,
    unstick,
    where,
    god,
    god_build,
    fly,
    invisible,
    streak,
    reset_game,
    toggle_master,
    change_planned_map,
    change_rotation,
    revert_rotation,
    show_rotation,
    rotation_add,
    advance,
    set_time_limit,
    get_time_limit,
    server_name,
    ping,
    version,
    server_info,
    scripts,
    weapon,
    mapname
]
def add(func, name = None):
    """
    Function to add a command from scripts

    Registers func under a lower-cased name (its func_name by default),
    stores its argspec on the function, records the rights from its
    user_types, and registers any aliases it carries.
    """
    command_name = (func.func_name if name is None else name).lower()
    func.argspec = inspect.getargspec(func)
    restricted_to = getattr(func, 'user_types', ())
    add_rights(command_name, *restricted_to)
    commands[command_name] = func
    try:
        for alternative in func.aliases:
            aliases[alternative.lower()] = command_name
    except AttributeError:
        # The function carries no aliases.
        pass
# Register every built-in command in the lookup tables.
for command_func in command_list:
    add(command_func)
# optional commands | |
try:
    import pygeoip
    database = pygeoip.GeoIP('./data/GeoLiteCity.dat')

    @name('from')
    def where_from(connection, value = None):
        """
        Report where a player (default: the caller) connects from,
        according to the GeoIP database.

        Raises ValueError when no player is named and the caller is not
        one of the protocol's players.
        """
        if value is None:
            if connection not in connection.protocol.players:
                raise ValueError()
            player = connection
        else:
            player = get_player(connection.protocol, value)
        record = database.record_by_addr(player.address[0])
        if record is None:
            return 'Player location could not be determined.'
        items = []
        for entry in ('country_name', 'city', 'region_name'):
            # sometimes, the record entries are numbers or nonexistent
            try:
                field = record[entry]
            except KeyError:
                continue
            if not field:
                # None or an empty string carries no information; int(None)
                # would also raise a TypeError that nothing here catches.
                continue
            try:
                int(field)
            except ValueError:
                # Not a number, so it is a usable place name.
                items.append(field)
            except TypeError:
                # Not a string or a number; it cannot be joined below.
                continue
        if not items:
            return 'Player location could not be determined.'
        return '%s is from %s' % (player.name, ', '.join(items))
    add(where_from)
except ImportError:
    print("('from' command disabled - missing pygeoip)")
except (IOError, OSError):
    print("('from' command disabled - missing data/GeoLiteCity.dat)")
def handle_command(connection, command, parameters):
    """
    Run a chat command for a connection and return the reply text.

    The command name is lower-cased and resolved through ``aliases``.
    Unknown commands return None. When the command carries an argspec,
    the number of parameters is checked against it first. Rights are then
    checked with has_rights, and the exceptions commands raise for bad
    input are turned into reply texts.
    """
    command = command.lower()
    command = aliases.get(command, command)
    try:
        command_func = commands[command]
    except KeyError:
        return # 'Invalid command'
    # Commands put into ``commands`` without add() carry no argspec.
    argspec = getattr(command_func, 'argspec', None)
    if argspec is not None:
        # argspec.args includes the leading ``connection`` argument, and
        # arguments with defaults are optional.
        accepted = len(argspec.args) - 1
        required = accepted - len(argspec.defaults or ())
        given = len(parameters)
        too_few = given < required
        too_many = argspec.varargs is None and given > accepted
        if too_few or too_many:
            return 'Invalid number of arguments for %s' % command
    try:
        if not has_rights(command_func, connection):
            return "You can't use this command"
        return command_func(connection, *parameters)
    except KeyError:
        return # 'Invalid command'
    except TypeError as t:
        print('Command %s failed with args: %s' % (command, parameters))
        print(t)
        return 'Command failed'
    except InvalidPlayer:
        return 'No such player'
    except InvalidTeam:
        return 'Invalid team specifier'
    except ValueError:
        return 'Invalid parameters'
def debug_handle_command(connection, command, parameters):
    """
    Variant of handle_command that lets command exceptions propagate
    """
    # use this when regular handle_command eats errors
    if connection in connection.protocol.players:
        connection.send_chat("Commands are in DEBUG mode")
    command = command.lower()
    if command in aliases:
        command = aliases[command]
    if command not in commands:
        return # 'Invalid command'
    command_func = commands[command]
    if has_rights(command_func, connection):
        return command_func(connection, *parameters)
    return "You can't use this command"
# handle_command = debug_handle_command | |
def handle_input(connection, input):
    """
    Parse a raw input line and run it as a command
    """
    # for IRC and console
    parsed = parse_command(input)
    return handle_command(connection, *parsed)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment