lcat
My personal /var/log
  • Home
  • Contact
  • HackMe!

[SCRIPT] Proxy all Android app traffic to Burp Suite via TPROXY

Fully AI-generated, but works.

```
#!/usr/bin/env python3
"""
tproxy.py — Per-app transparent TCP proxy for Android → Burp Suite

Captures an Android app's TCP traffic via iptables NAT REDIRECT (per UID),
recovers the original destination with SO_ORIGINAL_DST, then forwards each
connection to Burp Suite's HTTP proxy using the CONNECT method.

Usage:
    python tproxy.py <app_uid> <proxy_host:proxy_port> [-p <tproxy_port>]
    python tproxy.py --clean <app_uid>          # remove stale rules

Examples:
    python tproxy.py 10183 192.168.1.100:8080
    python tproxy.py 10183 192.168.1.100:8080 -p 5280 -v
    python tproxy.py --clean 10183

Requires: root (su / sudo), iptables, Python 3.7+
"""

import argparse
import logging
import os
import select
import signal
import socket
import struct
import subprocess
import sys
import threading

# ── Constants ──────────────────────────────────────────────────────────────

# Socket option level handed to getsockopt() in get_original_dst().
SOL_IP = 0  # socket.SOL_IP on Linux
# Option name queried (together with SOL_IP) to read the destination a
# connection had before the NAT REDIRECT rule rewrote it; 80 is the value
# of SO_ORIGINAL_DST in <linux/netfilter_ipv4.h>.
SO_ORIGINAL_DST = 80
# Default local listen port for the -p/--port command-line option.
DEFAULT_PORT = 5280
# Maximum number of bytes read per recv() call while relaying.
BUF_SIZE = 65536
# Timeout (seconds) passed to socket.create_connection() for the proxy.
CONNECT_TIMEOUT = 10
# Default select() timeout (seconds) in relay(); a tunnel with no readable
# data for this long is closed.
RELAY_IDLE = 300

# Shared logger used by every function in this script.
log = logging.getLogger("tproxy")


# ── Helpers ────────────────────────────────────────────────────────────────

def get_original_dst(sock):
    """Return the ``(ip, port)`` a redirected socket was originally aimed at.

    Queries the kernel with ``getsockopt(SOL_IP, SO_ORIGINAL_DST)`` and
    decodes the 16-byte reply as an IPv4 ``struct sockaddr_in``.

    Args:
        sock: Accepted socket whose connection went through NAT REDIRECT.

    Returns:
        Tuple of dotted-quad IP string and integer port.

    Raises:
        OSError: The socket option cannot be read or the address bytes
            have the wrong length.
        struct.error: The reply is too short to hold the port field.
    """
    # sockaddr_in layout: family (2 bytes) | port (2 bytes, network order)
    # | IPv4 address (4 bytes) | zero padding (8 bytes).
    raw = sock.getsockopt(SOL_IP, SO_ORIGINAL_DST, 16)
    (dst_port,) = struct.unpack("!H", raw[2:4])
    dst_ip = socket.inet_ntoa(raw[4:8])
    return dst_ip, dst_port


def relay(a, b, timeout=RELAY_IDLE):
    """Copy data in both directions between sockets *a* and *b*.

    The loop ends when either peer signals end-of-stream, when neither
    socket becomes readable within *timeout* seconds, or when any socket
    operation fails. Both sockets are always closed before returning.

    Args:
        a: First endpoint of the tunnel.
        b: Second endpoint of the tunnel.
        timeout: Idle limit in seconds handed to ``select()``.
    """
    try:
        while True:
            readable = select.select([a, b], [], [], timeout)[0]
            if not readable:
                # Nothing arrived within the idle window.
                break
            for src in readable:
                payload = src.recv(BUF_SIZE)
                if not payload:
                    # Orderly shutdown by this peer ends the whole tunnel.
                    return
                dst = b if src is a else a
                dst.sendall(payload)
    except OSError:
        # ConnectionError is a subclass of OSError; any socket failure
        # simply terminates the tunnel.
        pass
    finally:
        for endpoint in (a, b):
            try:
                endpoint.close()
            except OSError:
                pass

def handle_client(cli, addr, proxy_dst):
    """Handle one redirected connection: recover dst → CONNECT → relay.

    Args:
        cli: Accepted client socket (the redirected app connection).
        addr: ``(ip, port)`` of the client as returned by ``accept()``.
        proxy_dst: ``(host, port)`` of the upstream HTTP proxy (Burp).

    The function never raises for network problems: every failure path
    logs the reason and closes whichever sockets are open. On success the
    sockets are handed to ``relay()``, which closes them when the tunnel
    ends.
    """
    # Upper bound on the CONNECT response headers we are willing to buffer,
    # so a misbehaving proxy cannot grow the buffer without limit.
    header_limit = 65536

    try:
        orig_ip, orig_port = get_original_dst(cli)
    except (OSError, struct.error) as exc:
        log.warning("No original dst from %s:%s — %s", addr[0], addr[1], exc)
        cli.close()
        return

    if not orig_ip:
        cli.close()
        return

    log.info("→ %s:%d  (from %s:%d)", orig_ip, orig_port, addr[0], addr[1])

    # Connect to Burp proxy
    try:
        burp = socket.create_connection(proxy_dst, timeout=CONNECT_TIMEOUT)
    except OSError as exc:
        log.error("Cannot reach proxy %s:%d — %s", proxy_dst[0], proxy_dst[1], exc)
        cli.close()
        return

    # CONNECT handshake with Burp
    try:
        connect_req = (
            f"CONNECT {orig_ip}:{orig_port} HTTP/1.1\r\n"
            f"Host: {orig_ip}:{orig_port}\r\n"
            f"\r\n"
        )
        burp.sendall(connect_req.encode())

        resp = b""
        while b"\r\n\r\n" not in resp:
            if len(resp) > header_limit:
                raise ConnectionError("Proxy CONNECT response headers too large")
            chunk = burp.recv(4096)
            if not chunk:
                raise ConnectionError("Proxy closed during CONNECT response")
            resp += chunk

        head, _, extra = resp.partition(b"\r\n\r\n")
        # latin-1 maps every byte, so a garbage status line cannot raise
        # UnicodeDecodeError and escape the handlers below.
        status_line = head.split(b"\r\n", 1)[0].decode("latin-1")

        # Parse "HTTP/x.y <code> <reason>" and look at the status code only;
        # a substring test would accept e.g. "HTTP/1.1 502 upstream 200".
        # Any 2xx answer to CONNECT means the tunnel is established.
        fields = status_line.split(None, 2)
        accepted = (
            len(fields) >= 2
            and fields[0].startswith("HTTP/")
            and len(fields[1]) == 3
            and fields[1].isdigit()
            and fields[1][0] == "2"
        )
        if not accepted:
            log.error("Proxy rejected CONNECT: %s", status_line)
            burp.close()
            cli.close()
            return

        # CONNECT_TIMEOUT is meant for connection setup only. During the
        # relay phase both sockets use the idle budget instead, so a slow
        # (e.g. intercepted) transfer is not cut after a few seconds and a
        # stalled peer cannot block a send forever.
        burp.settimeout(RELAY_IDLE)
        cli.settimeout(RELAY_IDLE)

        # Any trailing data after headers belongs to the tunnel
        if extra:
            cli.sendall(extra)

    except OSError as exc:
        # ConnectionError (raised above) is a subclass of OSError.
        log.error("CONNECT handshake failed: %s", exc)
        burp.close()
        cli.close()
        return

    log.info("  Tunnel up: %s:%d ↔ proxy", orig_ip, orig_port)
    relay(cli, burp)


# ── iptables Manager ──────────────────────────────────────────────────────

class Iptables:
    """Manages per-UID NAT REDIRECT rules (apply / remove / clean).

    One instance drives either ``iptables`` or ``ip6tables`` (chosen by
    *ipv6*) and remembers which rules it inserted successfully so that
    ``remove()`` deletes exactly those.

    Args:
        uid: Numeric UID whose outgoing TCP traffic is redirected.
        port: Local port the traffic is redirected to.
        proxy_ip: Optional IP address of the upstream proxy; traffic to it
            is excluded from redirection to prevent loops.
        proxy_port: Optional port that narrows the proxy exclusion.
        ipv6: Use ``ip6tables`` and the IPv6 loopback range.
    """

    def __init__(self, uid, port, proxy_ip=None, proxy_port=None, ipv6=False):
        self.uid = uid
        self.port = port
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port
        self.ipv6 = ipv6
        self.cmd = "ip6tables" if ipv6 else "iptables"
        self._rules = []  # specs of rules that were actually inserted

    # ── Apply ──────────────────────────────────────────────────────

    def apply(self):
        """Insert NAT REDIRECT rules for the target UID.

        Rules are inserted at the top of OUTPUT chain via repeated
        ``-I OUTPUT 1``, so the last rule inserted ends up first:

            1. RETURN  loopback          (skip local traffic)
            2. RETURN  proxy IP:port     (prevent redirect loops)
            3. REDIRECT → tproxy port    (catch everything else)

        Only rules whose insert command succeeded are remembered for
        ``remove()``. A non-zero iptables exit status is logged, not raised.

        Raises:
            FileNotFoundError: The iptables binary is missing.
            subprocess.TimeoutExpired: An iptables invocation hung.
        """
        loopback = "::1/128" if self.ipv6 else "127.0.0.0/8"
        match_uid = [self.cmd, "-t", "nat", "-I", "OUTPUT", "1",
                     "-p", "tcp", "-m", "owner", "--uid-owner", str(self.uid)]

        # We insert REDIRECT first (it slides to the bottom) then the
        # RETURN rules on top of it, so the evaluation order is correct.
        specs = [match_uid + ["-j", "REDIRECT", "--to-port", str(self.port)]]

        # RETURN for proxy IP:port (prevents loops). Skipped when there is
        # no usable address or it belongs to the other address family.
        exclusion_ip = self._proxy_exclusion_ip()
        if exclusion_ip:
            proxy_spec = match_uid + ["-d", exclusion_ip]
            if self.proxy_port:
                proxy_spec += ["--dport", str(self.proxy_port)]
            specs.append(proxy_spec + ["-j", "RETURN"])

        # RETURN for loopback (inserted last → ends up at top)
        specs.append(match_uid + ["-d", loopback, "-j", "RETURN"])

        for spec in specs:
            # Track only what really went in, so remove() never tries to
            # delete a rule that was rejected.
            if self._exec(spec):
                self._rules.append(spec)

        if len(self._rules) == len(specs):
            log.info("iptables applied (uid=%d → :%d)", self.uid, self.port)
        else:
            log.warning("%s: only %d of %d rules applied (uid=%d → :%d)",
                        self.cmd, len(self._rules), len(specs),
                        self.uid, self.port)

    # ── Remove ─────────────────────────────────────────────────────

    def remove(self):
        """Remove previously applied rules (reverse order).

        Best effort: a failing delete is logged and the remaining rules
        are still attempted, so one error cannot leave the rest installed.
        """
        for spec in reversed(self._rules):
            try:
                self._exec(self._spec_to_delete(spec))
            except (OSError, subprocess.TimeoutExpired) as exc:
                log.warning("Could not delete rule [%s]: %s",
                            " ".join(spec), exc)
        self._rules.clear()
        log.info("iptables rules removed")

    # ── Clean (stale rule remover) ─────────────────────────────────

    def clean(self):
        """Force-remove any NAT OUTPUT rules that match this UID.

        Uses ``iptables -S`` to list rules in parseable format, then
        deletes matching entries by converting ``-A`` → ``-D``. The UID is
        compared as a whole token, so uid 1018 never matches rules that
        belong to uid 10183.
        """
        removed = 0
        for rule in self._dump_output():
            if not self._rule_targets_uid(rule, self.uid):
                continue
            tokens = rule.split()
            if tokens[0] != "-A":
                continue
            # "-A OUTPUT ..." → "iptables -t nat -D OUTPUT ..."
            if self._exec([self.cmd, "-t", "nat", "-D"] + tokens[1:]):
                removed += 1
        if removed:
            log.info("Cleaned %d stale rule(s) for uid=%d", removed, self.uid)
        else:
            log.info("No stale rules found for uid=%d", self.uid)

    # ── Internal ───────────────────────────────────────────────────

    def _proxy_exclusion_ip(self):
        """Return the proxy IP usable in a ``-d`` match, or ``None``.

        ``None`` is returned when no proxy IP is set, when it is a wildcard
        address, or when its family (IPv4 vs IPv6) differs from the tool
        this instance drives: ip6tables rejects IPv4 literals and vice
        versa.
        """
        ip = self.proxy_ip
        if not ip or ip in ("0.0.0.0", "::"):
            return None
        if (":" in ip) != bool(self.ipv6):
            return None
        return ip

    @staticmethod
    def _rule_targets_uid(rule, uid):
        """Tell whether an ``iptables -S`` line matches exactly *uid*.

        Negated matches (``! --uid-owner N``) are not treated as targeting
        the UID.
        """
        tokens = rule.split()
        wanted = str(uid)
        for idx in range(len(tokens) - 1):
            if tokens[idx] != "--uid-owner" or tokens[idx + 1] != wanted:
                continue
            if idx > 0 and tokens[idx - 1] == "!":
                continue
            return True
        return False

    def _dump_output(self):
        """Return the NAT OUTPUT chain rules via ``iptables -S`` (parseable).

        Returns an empty list when the listing cannot be obtained.
        """
        try:
            r = subprocess.run(
                [self.cmd, "-t", "nat", "-S", "OUTPUT"],
                capture_output=True, text=True, timeout=5,
            )
        except (OSError, subprocess.SubprocessError) as exc:
            log.warning("%s: cannot list rules — %s", self.cmd, exc)
            return []
        if r.returncode != 0:
            log.warning("%s: cannot list rules — %s", self.cmd, r.stderr.strip())
            return []
        return r.stdout.strip().splitlines()

    @staticmethod
    def _spec_to_delete(spec):
        """Convert an -I spec to a -D spec (drop the position number).

        ``-I OUTPUT 1 -p tcp ...`` → ``-D OUTPUT -p tcp ...``
        """
        d = []
        i = 0
        while i < len(spec):
            if spec[i] == "-I":
                d.append("-D")
                d.append(spec[i + 1])   # chain name (OUTPUT)
                i += 3                   # skip -I, chain, position-number
                continue
            d.append(spec[i])
            i += 1
        return d

    @staticmethod
    def _exec(args):
        """Run one iptables command.

        Returns:
            ``True`` when the command exited with status 0, ``False`` when
            it ran but failed (the error output is logged as a warning).

        Raises:
            FileNotFoundError: The binary named by ``args[0]`` is missing.
            subprocess.TimeoutExpired: The command ran longer than 5 s.
        """
        try:
            r = subprocess.run(args, capture_output=True, text=True, timeout=5)
        except FileNotFoundError:
            log.error("%s not found — is iptables installed?", args[0])
            raise
        except subprocess.TimeoutExpired:
            log.error("iptables timed out: %s", " ".join(args))
            raise
        if r.returncode != 0:
            log.warning("iptables: %s  [%s]", r.stderr.strip(), " ".join(args))
            return False
        return True


# ── TCP Server ────────────────────────────────────────────────────────────

def run_server(listen_port, proxy_host, proxy_port, stop_event):
    """Accept redirected connections and hand them to relay threads.

    Args:
        listen_port: Local TCP port to listen on (all IPv4 interfaces).
        proxy_host: Host of the upstream HTTP proxy.
        proxy_port: Port of the upstream HTTP proxy.
        stop_event: ``threading.Event``; the loop exits within about one
            second after it is set.

    Raises:
        OSError: The listening socket cannot be bound or put into listen
            mode. The socket is closed before the error propagates.
    """
    proxy_dst = (proxy_host, proxy_port)
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", listen_port))
        srv.listen(128)
        # Short accept timeout so stop_event is polled regularly.
        srv.settimeout(1.0)
        log.info("Listening on :%d → proxy %s:%d", listen_port, proxy_host, proxy_port)

        counter = 0
        while not stop_event.is_set():
            try:
                cli, addr = srv.accept()
            except socket.timeout:
                continue
            except OSError as exc:
                # Fatal for the listener; say why instead of exiting silently.
                log.error("accept() failed, stopping server: %s", exc)
                break

            counter += 1
            try:
                threading.Thread(
                    target=handle_client,
                    args=(cli, addr, proxy_dst),
                    name=f"relay-{counter}",
                    daemon=True,
                ).start()
            except RuntimeError as exc:
                # Thread could not be started (resource exhaustion): drop
                # this one connection rather than leak it or stop serving.
                log.error("Cannot start relay thread for %s:%s — %s",
                          addr[0], addr[1], exc)
                cli.close()
    finally:
        try:
            srv.close()
        except OSError:
            pass


# ── CLI ───────────────────────────────────────────────────────────────────

def parse_proxy(proxy_str):
    """Parse a proxy address string and return ``(host, port)``.

    Accepted forms:
        ``host``             → port defaults to 8080
        ``host:port``
        ``[ipv6]``           → brackets stripped, port defaults to 8080
        ``[ipv6]:port``      → brackets stripped
        ``ipv6`` (bare)      → whole string is the host, port 8080

    Args:
        proxy_str: Address as given on the command line.

    Returns:
        Tuple of host string and integer port.

    Raises:
        ValueError: The port is not an integer in 1–65535 or the bracket
            syntax is malformed.
    """
    default_port = 8080
    text = proxy_str.strip()

    if text.startswith("["):
        # Bracketed IPv6 literal, optionally followed by ":port".
        host, closing, rest = text[1:].partition("]")
        if not closing:
            raise ValueError(f"missing ']' in proxy address: {proxy_str!r}")
        if not rest:
            return host, default_port
        if not rest.startswith(":"):
            raise ValueError(f"unexpected text after ']': {rest!r}")
        port_text = rest[1:]
    elif text.count(":") > 1:
        # Several colons without brackets can only be a bare IPv6 literal;
        # splitting at the last colon would corrupt the address.
        return text, default_port
    elif ":" in text:
        host, port_text = text.rsplit(":", 1)
    else:
        return text, default_port

    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"invalid proxy port: {port_text!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"proxy port out of range (1-65535): {port}")
    return host, port


def main():
    """Command-line entry point.

    Parses arguments, then either cleans stale rules (``--clean``) or
    installs the per-UID redirect rules, runs the relay server until
    SIGINT/SIGTERM, and removes the rules again on the way out.
    """
    ap = argparse.ArgumentParser(
        description="Per-app transparent TCP proxy → Burp Suite (Android)",
    )
    ap.add_argument("uid", type=int, help="Target app UID (e.g. 10183)")
    ap.add_argument("proxy", nargs="?", help="Burp proxy address (host:port)")
    ap.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
                    help=f"Local tproxy listen port (default: {DEFAULT_PORT})")
    ap.add_argument("-6", "--ipv6", action="store_true",
                    help="Also add ip6tables rules")
    ap.add_argument("--clean", action="store_true",
                    help="Remove stale iptables rules for UID and exit")
    ap.add_argument("-v", "--verbose", action="store_true",
                    help="Debug-level logging")
    args = ap.parse_args()

    if not args.clean and not args.proxy:
        ap.error("proxy argument is required (unless using --clean)")

    # Validate the proxy address up front so a typo yields a usage error
    # instead of a traceback.
    proxy_host = proxy_port = None
    if not args.clean:
        try:
            proxy_host, proxy_port = parse_proxy(args.proxy)
        except ValueError as exc:
            ap.error(f"invalid proxy address {args.proxy!r}: {exc}")

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )

    # Every mode touches iptables, so check privileges before doing
    # anything; otherwise --clean would just report that nothing was found.
    if os.geteuid() != 0:
        sys.exit("Error: root required. Run with su or sudo.")

    # --clean mode: just nuke stale rules and exit
    if args.clean:
        Iptables(args.uid, 0).clean()
        if args.ipv6:
            Iptables(args.uid, 0, ipv6=True).clean()
        return

    # Resolve proxy hostname → IP for iptables exclusion rule
    proxy_ip = None
    try:
        proxy_ip = socket.gethostbyname(proxy_host)
    except (OSError, UnicodeError):
        # socket.gaierror is an OSError; UnicodeError covers host names
        # that cannot be IDNA-encoded.
        log.warning("Cannot resolve proxy host %s — skipping proxy exclusion", proxy_host)

    # Build iptables managers. gethostbyname() only yields IPv4 addresses,
    # which ip6tables would reject, so the IPv6 manager gets no proxy
    # exclusion.
    ip4 = Iptables(args.uid, args.port, proxy_ip, proxy_port)
    ip6 = Iptables(args.uid, args.port, ipv6=True) if args.ipv6 else None

    stop = threading.Event()
    cleaned = False

    def cleanup():
        """Remove all installed rules exactly once."""
        nonlocal cleaned
        if cleaned:
            return
        cleaned = True
        log.info("Cleaning up iptables rules…")
        try:
            ip4.remove()
        finally:
            # Still attempt the IPv6 side if the IPv4 removal raised.
            if ip6:
                ip6.remove()

    def on_signal(sig, frame):
        """Ask the server loop to stop; cleanup happens in main()."""
        log.info("Received signal %d", sig)
        stop.set()

    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)

    # Apply rules
    try:
        ip4.apply()
        if ip6:
            ip6.apply()
    except Exception as exc:
        # Some rules may already be in place (e.g. IPv4 applied, ip6tables
        # missing); take them out again before giving up.
        try:
            cleanup()
        except Exception as cleanup_exc:
            log.error("Cleanup failed: %s", cleanup_exc)
        sys.exit(f"Failed to apply iptables rules: {exc}")

    try:
        run_server(args.port, proxy_host, proxy_port, stop)
    except Exception as exc:
        log.error("Server error: %s", exc)
    finally:
        cleanup()
        log.info("Bye.")


if __name__ == "__main__":
    main()
```
Created: 2026-05-24 05:35:54, Updated: 2026-05-24 05:36:09, ID: 22b00a3a-f894-479f-9167-2f8cbe0b982a