OS命令注入漏洞是一个十分严重,但是又十分容易被人忽视的一个问题。
在后台开发过程中,经常会遇到需要执行一些shell命令,但是命令的参数需要通过Restful API传入,举个简单的例子:
后台实现了一个ping的功能,用户可以指定ping的目标,假设后台代码:
def ping(target):
return subprocess.getstatusoutput('ping -c 1 %s' % target)
如果用户正确传target值,那么没啥问题,比如用户想ping下百度,最终调用是:
ping("www.baidu.com")
执行的shell命令是:
ping -c 1 www.baidu.com
但是如果有一个用户,恶意的传了这么一个target: baidu.com; rm -rf /
,后台直接填充到ping命令中去,那么后台执行的命令就是:
ping -c 1 baidu.com; rm -rf /
如果你是以root用户执行的话,那就只能赶紧跑路了。
防御的办法有这么几个思路:
|
,&&
,$()
,;
等使用带有安全机制的函数库,在上面那个例子中,subprocess.getstatusoutput
底层是调用的subprocess.check_output
并且参数中设置了shell=True
:
def getstatusoutput(cmd):
try:
data = check_output(cmd, shell=True, universal_newlines=True, stderr=STDOUT)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
exitcode = ex.returncode
if data[-1:] == '\n':
data = data[:-1]
return exitcode, data
问题就出在shell=True
上,设置为True就是直接把命令丢给shell处理的,就会出现上述问题。如果设置成False
,再把subprocess接收的Command参数改成一个List,例如:["ls", "-l"]
,在运行时,subprocess会把Command参数List的第一个元素作为执行命令,后面的项都强制作为参数去处理,也就没有注入的风险了。
通过阅读源码,发现getstatusoutput
下面是调用的subprocess.check_output
,我们可以仿照其实现改写一下getstatusoutput
函数,最终要的是设置shell=False
:
def getstatusoutput_safe(cmd):
try:
data = check_output(cmd, shell=False, universal_newlines=True, stderr=STDOUT)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
exitcode = ex.returncode
if data[-1:] == '\n':
data = data[:-1]
return exitcode, data
def ping(target):
return getstatusoutput_safe(["ping", "-c", "1", target])
但上面的用法有1个弊端,就是无法使用管道,而管道又是shell命令中十分重要的东西,使用频率很高,这里简单提供一个函数,实现shell命令的管道功能。
def exit_processes(process_list):
for process in process_list:
try:
LOG.debug("exit process: %s" % process)
if process.stdout:
process.stdout.close()
if process.stderr:
process.stderr.close()
try: # Flushing a BufferedWriter may raise an error
if process.stdin:
process.stdin.close()
finally:
# Wait for the process to terminate, to avoid zombies.
process.wait()
except:
pass
def shell(cmds, check_result=False, success_code=0, shell=False, timeout=30):
LOG.debug("cmd: %s" % cmds)
args_list = []
process_list = []
args = []
for arg in cmds:
if arg == "|":
args_list.append(args)
args = []
continue
args.append(arg)
if args:
args_list.append(args)
try:
pre_process = None
for args in args_list:
if pre_process:
process = subprocess.Popen(args,
stdin=pre_process.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell)
else:
process = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell)
process_list.append(process)
pre_process = process
try:
output, errorput = pre_process.communicate(timeout=timeout)
except Exception as e:
pre_process.kill()
pre_process.wait()
raise e
if output:
output = output.decode().rstrip()
returncode = pre_process.returncode
if check_result and returncode != success_code:
if errorput:
errorput = errorput.decode().rstrip()
raise Exception("shell command run failed.")
return returncode, output
finally:
exit_processes(process_list)
使用方法:
shell(["ls", "-l", "/", "|", "grep", "usr"])