#! /usr/bin/env python

#############################################################################
##                                                                         ##
## pytstop.py --- Python debugging engine                                  ##
##               see http://www.secdev.org/projects/pytstop/               ##
##               for more informations                                     ##
##                                                                         ##
## Copyright (C) 2005 Philippe Biondi                                      ##
##                                                                         ##
## This program is free software; you can redistribute it and/or modify it ##
## under the terms of the GNU General Public License version 2 as          ##
## published by the Free Software Foundation; version 2.                   ##
##                                                                         ##
## This program is distributed in the hope that it will be useful, but     ##
## WITHOUT ANY WARRANTY; without even the implied warranty of              ##
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU        ##
## General Public License for more details.                                ##
##                                                                         ##
#############################################################################

# $Id: pytstop.py,v 1.6 2006/02/22 15:03:02 pbi Exp pbi $

import os
import ptrace
import struct
import logging


### Logging
#
# One handler is attached to the root "pytstop" logger; every other logger
# below is a child of it by name.

mainlog = logging.getLogger("pytstop")
console_handler = logging.StreamHandler()
# Alternative format that also shows the logger name:
#console_handler.setFormatter(logging.Formatter("[%(name)s] %(levelname)s: %(message)s"))
console_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
mainlog.addHandler(console_handler)

processlog = logging.getLogger("pytstop.process")
bplog = logging.getLogger("pytstop.process.bp")
signallog = logging.getLogger("pytstop.process.signal")
eventlog = logging.getLogger("pytstop.process.event")
userlog = logging.getLogger("pytstop.user")
debuglog = logging.getLogger("pytstop.debug")

# Level 1 is lower than logging.DEBUG (10), so no record is filtered out here.
mainlog.setLevel(1)


### Exceptions

class PytstopException(Exception):
    """Root of the pytstop exception hierarchy."""


class ProcessError(PytstopException):
    """Base class for errors that concern the debugged process."""


class UserError(PytstopException):
    """Base class for user-side errors (not raised in the code visible here)."""


class ProcessTerminated(ProcessError):
    """Common parent of ProcessCrashed and ProcessExited."""


class ProcessCrashed(ProcessTerminated):
    """Raised when a ptrace request on the process fails with OSError."""


class ProcessExited(ProcessTerminated):
    """Raised by wait() when the process has exited; carries its exit status."""


### Constants

# Signal numbers the engine compares against.
SIGTRAP = 5
SIGSEGV = 11


class BP:
    """Tags identifying the kind of a break/watch point record."""
    HWBREAKPOINT = 1
    BREAKPOINT = 2
    WATCHPOINT = 3


### Hexdump

def sane(x):
    """Return the character x itself when its code is in 32..126, else "."."""
    printable = 32 <= ord(x) < 127
    return x if printable else "."
def hexdump(p, start=0):
    """Print a hex/ASCII dump of the byte string p, 16 bytes per row.

    p     -- byte string to dump
    start -- address printed for the first byte of p
    """
    for offset in range(0, len(p), 16):
        # bytearray iterates as integers on both Python 2 and Python 3.
        row = bytearray(p[offset:offset + 16])
        hexa = " ".join("%02x" % b for b in row)
        text = "".join(sane(chr(b)) for b in row)
        print("%08x: %-47s %s" % (start + offset, hexa, text))


### Do stuff

class Processor:
    """Base CPU description; concrete processors derive from it."""

    def __init__(self):
        self.max_isn_size = 16
        self.pc_name = "eip"

    def get_isn(self, opcodes):
        """Placeholder decoder: returns an empty string for any opcodes."""
        return ""

    def emul(self, isn, vbox):
        """Placeholder emulator: does nothing."""
        pass


class i386(Processor):
    """i386 processor: knows the register names exposed by a VBox."""

    regs = [
        "eax", "ebx", "ecx", "edx", "esi", "edi", "ebp",
        "ds", "es", "fs", "gs",
        "orig_eax", "eip", "cs", "eflags", "esp", "ss",
        "sig",
        "dr0", "dr1", "dr2", "dr3", "dr4", "dr5", "dr6", "dr7",
    ]

    def __init__(self):
        Processor.__init__(self)

    def get_pc_name(self):
        """Return the name of the program counter register."""
        return "eip"

    def get_all_regs(self):
        """Return a fresh copy of the register name list."""
        return self.regs[:]


class i486(i386):
    pass


class i586(i486):
    pass


class i686(i586):
    pass


# Maps the machine string (os.uname()[4]) to a processor class.
name2proc = {
    "i386": i386,
    "i486": i486,
    "i586": i586,
    "i686": i686,
}


class VBox:
    """Abstract "virtual box": registers, memory and breakpoints of a target.

    Subclasses provide get_mem/set_mem, get_reg/set_reg and the _set_*/
    _restore_* breakpoint primitives.  On top of those this class offers:

      box.eax, box.eip = ...   register access through attributes
      box[addr]                one memory word
      box[a:b], box[a::n]      memory from a to b / n bytes starting at a
      box(0|1|2)               cont / syscall / singlestep
      for pc in box: ...       single-step until the process exits
    """

    def __init__(self, proc=None):
        """proc -- a Processor instance describing the target CPU."""
        self.breakpoints = []
        self.proc = proc
        # From this assignment on, __getattr__/__setattr__ route register
        # names to get_reg/set_reg instead of the instance dictionary.
        self.regs = self.proc.get_all_regs()
        self.signal = 0
        self.status = 0
        self.breaked = 0
        self.bplist = {}       # soft breakpoint number -> address
        self.hbplist = {}      # hardware breakpoint number -> address
        self.br_addrlist = {}  # address -> [(BP kind, number, saved state)]

    ## Memory and register primitives (overridden by subclasses)

    def get_mem(self, addr):
        """Return the memory word at addr (stub: 0)."""
        return 0

    def set_mem(self, addr, val):
        """Write the memory word val at addr (stub: no-op)."""
        pass

    def get_memblock(self, addr, size):
        """Return size bytes starting at addr, read one 4-byte word at a time."""
        chunks = []
        for off in range(0, size, 4):
            chunks.append(struct.pack("I", self.get_mem(addr + off)))
        return b"".join(chunks)[:size]

    def set_memblock(self, addr, val):
        """Write the byte string val at addr, one 4-byte word at a time."""
        for i in range(len(val) // 4):
            self.set_mem(addr, struct.unpack("I", val[i * 4:i * 4 + 4])[0])
            addr += 4
        rem = len(val) % 4
        if rem:
            # Trailing partial word: keep the bytes of the existing word that
            # val does not cover.
            current = struct.pack("I", self.get_mem(addr))
            merged = val[-rem:] + current[rem:]
            self.set_mem(addr, struct.unpack("I", merged)[0])

    def get_reg(self, name):
        """Return the value of register name (stub: 0)."""
        return 0

    def get_pc(self):
        """Return the program counter, i.e. the register named proc.pc_name."""
        return self.get_reg(self.proc.pc_name)

    def set_reg(self, name, val):
        """Set register name to val (stub: no-op)."""
        pass

    def get_all_regs(self):
        """Return a {register name: value} mapping (stub: empty)."""
        return {}

    def opcode(self, eip=None):
        raise NotImplementedError("Not ready yet...")

    def mnemonic(self, eip=None):
        raise NotImplementedError("Not ready yet...")

    def get_isn(self, addr=None):
        raise NotImplementedError("Not ready yet...")

    def emul_isn(self, isn=None):
        raise NotImplementedError("Not ready yet...")

    def dump(self, filename):
        """Stub: does nothing."""
        return

    def hexdump(self, addr=None, length=64):
        """Hexdump length bytes at addr (default: 16 bytes before the pc)."""
        if addr is None:
            addr = self.get_pc() - 16
        hexdump(self[addr::length], addr)

    ## Breakpoint management

    def _find_bp_idx(self, addr, typ, num):
        """Return the index in br_addrlist[addr] of the (typ, num) record.

        Raises KeyError if nothing is registered at addr and Exception if
        addr has records but none matches.
        """
        for idx, record in enumerate(self.br_addrlist[addr]):
            if record[0] == typ and record[1] == num:
                return idx
        raise Exception("BP not found")

    # Soft breakpoints

    def _set_bp(self, addr, num):
        """Install soft breakpoint num at addr; return state to restore."""
        pass

    def _restore_bp(self, addr, num, saved):
        """Undo _set_bp using the state it returned."""
        pass

    def set_bp(self, addr, num=-1):
        """Set a soft breakpoint at addr and return its number.

        A negative num (the default) picks one past the highest in use.
        """
        if num < 0:
            num = max(list(self.bplist) + [-1]) + 1
        saved = self._set_bp(addr, num)
        self.bplist[num] = addr
        self.br_addrlist.setdefault(addr, []).append(
            (BP.BREAKPOINT, num, saved))
        return num

    def get_bp(self, num):
        """Return the address of soft breakpoint num."""
        return self.bplist[num]

    def del_bp(self, num):
        """Remove soft breakpoint num and restore what it replaced."""
        addr = self.bplist[num]
        vlst = self.br_addrlist[addr]
        i = self._find_bp_idx(addr, BP.BREAKPOINT, num)
        saved = vlst[i][2]
        self._restore_bp(addr, num, saved)
        del vlst[i]
        # Same clean-up as del_hbp: an address with no record left must not
        # stay in br_addrlist, or cont() keeps treating it as a breakpoint.
        if not vlst:
            del self.br_addrlist[addr]
        del self.bplist[num]

    # Hardware breakpoints

    def _set_hbp(self, addr, num):
        """Install hardware breakpoint num at addr; return state to restore."""
        pass

    def _restore_hbp(self, addr, num, saved):
        """Undo _set_hbp using the state it returned."""
        pass

    def set_hbp(self, addr, num=-1):
        """Set a hardware breakpoint at addr and return its number.

        A negative num (the default) picks the lowest unused number.
        """
        if num < 0:
            num = 0
            while num in self.hbplist:
                num += 1
        saved = self._set_hbp(addr, num)
        self.hbplist[num] = addr
        self.br_addrlist.setdefault(addr, []).append(
            (BP.HWBREAKPOINT, num, saved))
        return num

    def get_hbp(self, num):
        """Return the address of hardware breakpoint num."""
        return self.hbplist[num]

    def del_hbp(self, num):
        """Remove hardware breakpoint num and restore the saved state."""
        addr = self.hbplist[num]
        vlst = self.br_addrlist[addr]
        i = self._find_bp_idx(addr, BP.HWBREAKPOINT, num)
        saved = vlst[i][2]
        self._restore_hbp(addr, num, saved)
        del vlst[i]
        if not vlst:
            del self.br_addrlist[addr]
        del self.hbplist[num]

    # Program control

    def cont(self, sig=None, wait=1):
        """Continue execution (stub: returns 0)."""
        return 0

    def syscall(self, sig=None, wait=1):
        """Run to the next system call stop (stub: returns 0)."""
        return 0

    def singlestep(self, sig=None, wait=1):
        """Execute one instruction (stub: returns 0)."""
        return 0

    def bt(self):
        """Print backtrace"""

    def __repr__(self):
        # NOTE(review): the original format string was empty, which raises
        # TypeError; its text appears to have been lost.  This wording is a
        # replacement, not the upstream one.
        return "<VBox pc=%08x sig=%i>" % (self.get_pc(), self.signal)

    def __getattr__(self, attr):
        # Only reached when normal lookup fails; register names are never
        # stored in __dict__, so they always land here.
        if "regs" in self.__dict__ and attr in self.regs:
            return self.get_reg(attr)
        raise AttributeError(attr)

    def __setattr__(self, attr, val):
        if "regs" in self.__dict__ and attr in self.regs:
            self.set_reg(attr, val)
        else:
            self.__dict__[attr] = val

    def __getitem__(self, loc):
        """box[a:b] -> bytes a..b, box[a::n] -> n bytes at a, box[a] -> word."""
        if type(loc) is slice:
            if loc.stop:
                return self.get_memblock(loc.start, loc.stop - loc.start)
            elif loc.step:
                return self.get_memblock(loc.start, loc.step)
            else:
                return self.get_mem(loc.start)
        else:
            return self.get_mem(loc)

    def __setitem__(self, loc, val):
        """Write val at loc: a byte string as a block, anything else as a word.

        When loc is a slice only its start is used.
        """
        if type(loc) is slice:
            loc = loc.start
        if isinstance(val, (bytes, str)):
            self.set_memblock(loc, val)
        else:
            self.set_mem(loc, val)

    def __call__(self, type=0, *args, **kargs):
        """type: 0=cont 1=syscall 2=singlestep"""
        if type == 0:
            return self.cont(*args, **kargs)
        elif type == 1:
            return self.syscall(*args, **kargs)
        elif type == 2:
            return self.singlestep(*args, **kargs)

    def __iter__(self):
        """Single-step forever, yielding the pc, until ProcessExited."""
        try:
            while 1:
                self.singlestep()
                yield self.get_pc()
        except ProcessExited:
            # A plain return ends the generator; raising StopIteration inside
            # one is turned into RuntimeError by recent Python versions.
            return


# From /usr/src/linux/include/asm-i386/user.h
# from struct user
USER_EAX = 6*4
USER_EBX = 0*4
USER_ECX = 1*4
USER_EDX = 2*4
USER_ESI = 3*4
USER_EDI = 4*4
USER_EBP = 5*4
USER_DS = 7*4
USER_ES = 8*4
USER_FS = 9*4
USER_GS = 10*4
USER_ORIG_EAX = 11*4
USER_EIP = 12*4
USER_CS = 13*4
USER_EFLAGS = 14*4
USER_ESP = 15*4
USER_SS = 16*4
USER_FPVALID = 17*4
USER_SIG = 46*4
USER_DR0 = 63*4
USER_DR1 = 64*4
USER_DR2 = 65*4
USER_DR3 = 66*4
USER_DR4 = 67*4
USER_DR5 = 68*4
USER_DR6 = 69*4
USER_DR7 = 70*4


class Ptrace(VBox):
    """VBox implementation driving a real process through the ptrace module.

    Ptrace(pid)             attaches to an existing process
    Ptrace(prog, arg, ...)  forks and executes prog under trace
    """

    # Register name -> offset handed to ptrace.peekuser / ptrace.pokeuser.
    registers_loc = {
        "eax": USER_EAX,
        "ebx": USER_EBX,
        "ecx": USER_ECX,
        "edx": USER_EDX,
        "esi": USER_ESI,
        "edi": USER_EDI,
        "ebp": USER_EBP,
        "ds": USER_DS,
        "es": USER_ES,
        "fs": USER_FS,
        "gs": USER_GS,
        "orig_eax": USER_ORIG_EAX,
        "eip": USER_EIP,
        "cs": USER_CS,
        "eflags": USER_EFLAGS,
        "esp": USER_ESP,
        "ss": USER_SS,
        "sig": USER_SIG,
        "dr0": USER_DR0,
        "dr1": USER_DR1,
        "dr2": USER_DR2,
        "dr3": USER_DR3,
        "dr4": USER_DR4,
        "dr5": USER_DR5,
        "dr6": USER_DR6,
        "dr7": USER_DR7,
    }

    def __init__(self, *args):
        # Raises KeyError when the machine type is not listed in name2proc.
        VBox.__init__(self, name2proc[os.uname()[4]]())
        self.args = args
        self.ptrace_it(*args)

    def ptrace_it(self, *args):
        """Attach to pid args[0] if it is an int, else fork and exec args."""
        self.signal = 0
        if isinstance(args[0], int):
            self.pid = args[0]
            ptrace.attach(self.pid)
            self.filename = "/proc/%i/exe" % self.pid
        else:
            self.filename = args[0]
            self.pid = os.fork()
            if self.pid == 0:
                # Child.  If traceme/execvp fails we must not fall back into
                # the debugger's code as a second copy of it.
                try:
                    ptrace.traceme()
                    os.execvp(args[0], args)
                finally:
                    os._exit(127)
        self.procpid = "/proc/%i/" % self.pid
        os.waitpid(self.pid, 0)
        eventlog.info("process %i attached" % self.pid)

    def __del__(self):
        try:
            ptrace.kill(self.pid)
        except (OSError, AttributeError):
            # AttributeError: __init__ failed before a pid was recorded.
            pass

    def _set_hbp(self, addr, num):
        """Program debug register dr<num> with addr and enable it in dr7.

        Returns (previous dr<num>, mask of this slot's dr7 bits, previous
        dr7), or None when num is not in 0..3 (nothing is programmed then).
        """
        saved = None
        if 0 <= num < 4:
            reg = "dr%i" % num
            # This slot's dr7 bits: a 4-bit field at 16+4*num plus the two
            # enable bits at 2*num.
            mask = (0xf << (16 + 4 * num)) | (3 << (2 * num))
            dr7 = self.dr7
            saved = getattr(self, reg), mask, dr7
            setattr(self, reg, addr)
            # Clear only this slot's bits; the other slots must survive.
            dr7 &= ~mask
            # Original comment: break on isn, 1 byte length bp.
            # NOTE(review): that comment also said "not global, local", yet
            # the bit set here is 2*num+1 -- confirm which enable bit is meant.
            dr7 |= 1 << (1 + 2 * num)
            self.dr7 = dr7
        return saved

    def _restore_hbp(self, addr, num, saved):
        """Put back the dr<num> / dr7 state returned by _set_hbp."""
        if saved is None or not 0 <= num < 4:
            return  # _set_hbp programmed nothing for this slot
        saddr, smask, sdr7 = saved
        reg = "dr%i" % num
        dr7 = self.dr7
        dr7 &= ~smask
        dr7 |= (smask & sdr7)
        self.dr7 = dr7
        setattr(self, reg, saddr)

    def _set_bp(self, addr, num):
        """Write the one-byte 0xcc opcode at addr; return the byte replaced."""
        saved = self[addr::1]
        self[addr] = b"\xcc"
        return saved

    def _restore_bp(self, addr, num, saved):
        """Write the saved byte back at addr."""
        self[addr] = saved

    def get_reg(self, name):
        """Read register name as an unsigned 32-bit value.

        Raises ProcessCrashed if the ptrace request fails.
        """
        try:
            ret = ptrace.peekuser(self.pid, self.registers_loc[name]) & 0xffffffff
        except OSError as m:
            raise ProcessCrashed(m)
        return ret

    def set_reg(self, name, val):
        """Write val into register name; raises ProcessCrashed on failure."""
        if val & 0x80000000:
            # Sign-extend so the value fits a signed 32-bit argument.
            val |= ~0xffffffff
        try:
            ptrace.pokeuser(self.pid, self.registers_loc[name], val)
        except OSError as m:
            raise ProcessCrashed(m)

    def get_all_regs(self):
        """Return {name: value} for every register in registers_loc."""
        return dict((name, self.get_reg(name)) for name in self.registers_loc)

    def get_mem(self, addr):
        """Read the word at addr as an unsigned 32-bit value."""
        if addr & 0x80000000:
            addr |= ~0xffffffff
        try:
            ret = ptrace.peekdata(self.pid, addr) & 0xffffffff
        except OSError as m:
            raise ProcessCrashed(m)
        return ret

    def set_mem(self, addr, val):
        """Write the word val at addr; raises ProcessCrashed on failure."""
        if addr & 0x80000000:
            addr |= ~0xffffffff
        if val & 0x80000000:
            val |= ~0xffffffff
        try:
            ret = ptrace.pokedata(self.pid, addr, val)
        except OSError as m:
            raise ProcessCrashed(m)
        return ret

    def get_memblock(self, addr, size):
        """Read size bytes at addr through /proc/<pid>/mem."""
        with open(self.procpid + "mem", "rb") as f:
            f.seek(addr)
            return f.read(size)

    ## Process control

    def cont(self, sig=None, wait=1):
        """Resume the process, first stepping over a breakpoint at eip.

        sig  -- signal to deliver (default: the last one recorded)
        wait -- when true, block in wait() and return its result
        """
        if sig is None:
            sig = self.signal
        self.breaked = 0
        if self.eip in self.br_addrlist:
            addr = self.eip
            records = self.br_addrlist[addr]
            # Undo in reverse installation order: with several soft
            # breakpoints on one address only the first holds the original
            # byte, so it has to be written last.
            for typ, num, sav in reversed(records):
                if typ == BP.BREAKPOINT:
                    self._restore_bp(addr, num, sav)
                elif typ == BP.HWBREAKPOINT:
                    self._restore_hbp(addr, num, sav)
            debuglog.debug("pass bp at %08x" % self.eip)
            self.singlestep(sig)
            debuglog.debug("passed pb. Now at %08x" % self.eip)
            for typ, num, sav in records:
                if typ == BP.BREAKPOINT:
                    self._set_bp(addr, num)
                elif typ == BP.HWBREAKPOINT:
                    self._set_hbp(addr, num)
        try:
            ptrace.cont(self.pid, sig)
        except OSError as m:
            raise ProcessCrashed(m)
        if wait:
            return self.wait()

    def singlestep(self, sig=None, wait=1):
        """Execute one instruction; see cont() for sig and wait."""
        if sig is None:
            sig = self.signal
        try:
            ptrace.singlestep(self.pid, sig)
        except OSError as m:
            raise ProcessCrashed(m)
        if wait:
            return self.wait()

    def syscall(self, sig=None, wait=1):
        """Run until the next system call stop; see cont() for sig and wait."""
        if sig is None:
            sig = self.signal
        try:
            ptrace.syscall(self.pid, sig)
        except OSError as m:
            raise ProcessCrashed(m)
        if wait:
            ret = self.wait()
            if self.signal == SIGTRAP:
                # The trap is our own stop, not a signal to pass on.
                self.breaked = self.signal
                self.signal = 0
            return ret

    def detach(self, sig=0):
        """Detach from the process, delivering sig."""
        try:
            ptrace.detach(self.pid, sig)
        except OSError as m:
            raise ProcessCrashed(m)

    def kill(self):
        """Kill the process."""
        try:
            ptrace.kill(self.pid)
        except OSError as m:
            raise ProcessCrashed(m)

    def restart(self):
        """Kill the process, start it again and re-install all breakpoints."""
        try:
            self.kill()
        except (ProcessError, OSError):
            pass  # best effort: the process may already be gone
        self.signal = self.breaked = self.status = 0
        self.ptrace_it(*self.args)
        # Snapshot and reset the tables first: set_hbp/set_bp register each
        # breakpoint again, which would otherwise pile duplicates into
        # br_addrlist on every restart.
        hbps = list(self.hbplist.items())
        bps = list(self.bplist.items())
        self.hbplist = {}
        self.bplist = {}
        self.br_addrlist = {}
        for num, addr in hbps:
            self.set_hbp(addr, num)
        for num, addr in bps:
            self.set_bp(addr, num)

    def wait(self):
        """Block until the process changes state and record why it stopped.

        Updates self.status, self.signal and self.breaked.  Raises
        ProcessExited (carrying the exit status) when the process has exited.
        """
        s = os.waitpid(self.pid, 0)[1]
        self.status = s & 0xff
        if os.WIFEXITED(s):
            raise ProcessExited(os.WEXITSTATUS(s))
        elif os.WIFSTOPPED(s):
            self.signal = os.WSTOPSIG(s)
            signallog.info("Received stop signal %i" % self.signal)
        elif os.WIFSIGNALED(s):
            self.signal = os.WTERMSIG(s)
            signallog.info("Received signal %i" % self.signal)
        else:
            eventlog.warning("wait: unknown status %02x, sig=%02x" % (
                self.status, (s >> 8) & 0xff))
        self.breaked = 0
        if self.signal == SIGTRAP:
            if self.dr6 & 0x4000:  # singlestep
                self.breaked = self.signal
                self.signal = 0
                bplog.debug("TRAP: singlestep")
            elif self.dr6 & 0x2000:  # access to debug registers
                self.breaked = self.signal
                self.signal = 0
                bplog.info("Process attempts to access debug registers")
            else:
                hbpmask = 0
                for i in range(4):
                    if i in self.hbplist:
                        hbpmask |= 1 << i
                # NOTE(review): the source text is missing from the middle of
                # the statement above up to the format string of __repr__.
                # Everything from here to the end of this method is a
                # reconstruction (no log messages, no return value) and must
                # be checked against the upstream file.
                if self.dr6 & hbpmask:
                    # presumably one of our hardware breakpoints fired
                    self.breaked = self.signal
                    self.signal = 0
                elif (self.eip - 1) in self.br_addrlist:
                    # presumably a soft breakpoint: cont() looks the
                    # breakpoint up at eip itself, so step eip back onto the
                    # one-byte opcode that was just executed.
                    self.eip -= 1
                    self.breaked = self.signal
                    self.signal = 0

    def __repr__(self):
        # NOTE(review): only the argument tuple survived; the wording of the
        # format string is a replacement, not the upstream one.
        return "<Ptrace %s pc=%08x sig=%i>" % (
            self.filename, self.get_pc(), self.signal)


if __name__ == "__main__":
    import code
    code.interact(banner="Welcome to pytstop debugger", local=globals())