[SCRIPT] Proxy a single Android app's TCP traffic to Burp Suite via a transparent proxy (iptables NAT REDIRECT)
Fully AI-generated, but works.
```
#!/usr/bin/env python3
"""
tproxy.py — Per-app transparent TCP proxy for Android → Burp Suite
Captures an Android app's TCP traffic via iptables NAT REDIRECT (per UID),
recovers the original destination with SO_ORIGINAL_DST, then forwards each
connection to Burp Suite's HTTP proxy using the CONNECT method.
Usage:
python tproxy.py <app_uid> <proxy_host:proxy_port> [-p <tproxy_port>]
python tproxy.py --clean <app_uid> # remove stale rules
Examples:
python tproxy.py 10183 192.168.1.100:8080
python tproxy.py 10183 192.168.1.100:8080 -p 5280 -v
python tproxy.py --clean 10183
Requires: root (su / sudo), iptables, Python 3.7+
"""
import argparse
import logging
import os
import select
import signal
import socket
import struct
import subprocess
import sys
import threading
# ── Constants ──────────────────────────────────────────────────────────────
# Protocol level passed to getsockopt() when querying SO_ORIGINAL_DST.
SOL_IP = 0 # socket.SOL_IP on Linux
# Socket option number used by get_original_dst() to read the destination a
# connection had before the NAT REDIRECT rule rewrote it.
SO_ORIGINAL_DST = 80
# Local listen port used when -p/--port is not given on the command line.
DEFAULT_PORT = 5280
# Maximum number of bytes read per recv() call while relaying.
BUF_SIZE = 65536
# Timeout (seconds) handed to socket.create_connection() when dialling the proxy.
CONNECT_TIMEOUT = 10
# Timeout (seconds) handed to select() in relay(); a tunnel with no activity
# for this long is torn down.
RELAY_IDLE = 300
# Module-wide logger; main() configures its level and format via basicConfig.
log = logging.getLogger("tproxy")
# ── Helpers ────────────────────────────────────────────────────────────────
def get_original_dst(sock):
    """Return the ``(ip, port)`` a redirected IPv4 connection was aimed at.

    The socket is queried with ``SO_ORIGINAL_DST`` for 16 bytes, which are
    read as a ``struct sockaddr_in``:
    family (2 bytes), port (2 bytes, network order), address (4 bytes),
    padding (8 bytes).

    Raises:
        OSError: if the socket option cannot be read or the address bytes
            are not a valid packed IPv4 address.
        struct.error: if the returned buffer is too short to hold the port.
    """
    raw = sock.getsockopt(SOL_IP, SO_ORIGINAL_DST, 16)
    # Port lives at byte offset 2, big-endian unsigned short.
    (dst_port,) = struct.unpack_from("!H", raw, 2)
    # Address occupies the four bytes right after the port.
    dst_ip = socket.inet_ntoa(raw[4:8])
    return dst_ip, dst_port
def relay(a, b, timeout=RELAY_IDLE):
    """Shovel bytes between two sockets until both directions end or go idle.

    Args:
        a: First connected socket.
        b: Second connected socket.
        timeout: Seconds ``select`` waits for activity on either socket
            before the relay gives up.

    When one socket reports EOF, the EOF is propagated to the other side as
    a write-side shutdown and the opposite direction keeps flowing. Tearing
    everything down on the first EOF would drop data the other peer is
    still sending to a client that merely half-closed its connection.

    Socket errors end the relay silently (best effort). Both sockets are
    always closed before the function returns.
    """
    # Sockets that have not yet signalled EOF; only these are polled.
    still_reading = [a, b]
    try:
        while still_reading:
            readable, _, _ = select.select(still_reading, [], [], timeout)
            if not readable:
                # Idle for the whole timeout window: give up on the tunnel.
                break
            for src in readable:
                dst = b if src is a else a
                data = src.recv(BUF_SIZE)
                if data:
                    dst.sendall(data)
                    continue
                # EOF from src: stop polling it and tell dst no more data
                # is coming, while still allowing dst → src traffic.
                still_reading.remove(src)
                try:
                    dst.shutdown(socket.SHUT_WR)
                except OSError:
                    # dst is already disconnected; the next recv on it
                    # will report that and end the loop.
                    pass
    except OSError:
        # Covers ConnectionError and socket timeouts; nothing to recover.
        pass
    finally:
        for s in (a, b):
            try:
                s.close()
            except OSError:
                pass
def handle_client(cli, addr, proxy_dst):
    """Handle one redirected connection: recover dst → CONNECT → relay.

    Args:
        cli: Accepted client socket (the redirected app connection).
        addr: Peer address of ``cli``; indexed as ``addr[0]`` (host) and
            ``addr[1]`` (port) for logging.
        proxy_dst: ``(host, port)`` of the upstream HTTP proxy.

    The function never raises for socket-level failures: each failure is
    logged and both sockets are closed. On success, ownership of both
    sockets passes to ``relay``, which closes them when the tunnel ends.
    """
    # Upper bound on the proxy's CONNECT response headers, so a proxy that
    # never terminates its headers cannot grow the buffer without limit.
    max_response = 65536
    burp = None
    tunnel_up = False
    try:
        try:
            orig_ip, orig_port = get_original_dst(cli)
        except (OSError, struct.error) as exc:
            log.warning("No original dst from %s:%s — %s", addr[0], addr[1], exc)
            return
        log.info("→ %s:%d (from %s:%d)", orig_ip, orig_port, addr[0], addr[1])

        # Connect to Burp proxy
        try:
            burp = socket.create_connection(proxy_dst, timeout=CONNECT_TIMEOUT)
        except OSError as exc:
            log.error("Cannot reach proxy %s:%d — %s", proxy_dst[0], proxy_dst[1], exc)
            return

        # CONNECT handshake with Burp
        try:
            connect_req = (
                f"CONNECT {orig_ip}:{orig_port} HTTP/1.1\r\n"
                f"Host: {orig_ip}:{orig_port}\r\n"
                f"\r\n"
            )
            burp.sendall(connect_req.encode())
            resp = b""
            while b"\r\n\r\n" not in resp:
                if len(resp) > max_response:
                    raise ConnectionError("Proxy CONNECT response too large")
                chunk = burp.recv(4096)
                if not chunk:
                    raise ConnectionError("Proxy closed during CONNECT response")
                resp += chunk
            head, _, extra = resp.partition(b"\r\n\r\n")
            # latin-1 maps every byte, so a malformed status line can be
            # logged instead of raising UnicodeDecodeError in this thread.
            status_line = head.split(b"\r\n", 1)[0].decode("latin-1")
            fields = status_line.split(None, 2)
            code = fields[1] if len(fields) > 1 else ""
            # Inspect the status-code field itself (any 2xx means the
            # tunnel is established) rather than searching the whole line,
            # where "200" could also appear in the reason phrase.
            accepted = (
                fields[0].startswith("HTTP/")
                and len(code) == 3
                and code.isascii()
                and code.isdigit()
                and code[0] == "2"
            ) if fields else False
            if not accepted:
                log.error("Proxy rejected CONNECT: %s", status_line)
                return
            # Any trailing data after headers belongs to the tunnel
            if extra:
                cli.sendall(extra)
        except OSError as exc:
            # ConnectionError and socket timeouts are OSError subclasses.
            log.error("CONNECT handshake failed: %s", exc)
            return
        tunnel_up = True
    finally:
        # Single cleanup point for every failure path, including
        # unexpected exceptions that would otherwise leak both sockets.
        if not tunnel_up:
            for s in (burp, cli):
                if s is None:
                    continue
                try:
                    s.close()
                except OSError:
                    pass
    log.info(" Tunnel up: %s:%d ↔ proxy", orig_ip, orig_port)
    relay(cli, burp)
# ── iptables Manager ──────────────────────────────────────────────────────
class Iptables:
    """Manages per-UID NAT REDIRECT rules (apply / remove / clean).

    Args:
        uid: UID whose outgoing TCP connections are redirected.
        port: Local port the connections are redirected to.
        proxy_ip: Address of the upstream proxy; traffic to it is exempted
            from redirection. ``None`` disables the exemption.
        proxy_port: Proxy port; when set, the exemption is limited to it.
        ipv6: Drive ``ip6tables`` instead of ``iptables``.
    """

    def __init__(self, uid, port, proxy_ip=None, proxy_port=None, ipv6=False):
        self.uid = uid
        self.port = port
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port
        self.ipv6 = ipv6
        self.cmd = "ip6tables" if ipv6 else "iptables"
        self._rules = []  # insert specs that succeeded, kept for deletion

    # ── Apply ──────────────────────────────────────────────────────
    def apply(self):
        """Insert NAT REDIRECT rules for the target UID.

        Rules are inserted at the top of OUTPUT chain via repeated
        ``-I OUTPUT 1``, so the last rule inserted ends up first:
          1. RETURN loopback       (skip local traffic)
          2. RETURN proxy IP:port  (prevent redirect loops)
          3. REDIRECT → tproxy port (catch everything else)

        Only rules whose insertion succeeded are remembered, so a later
        ``remove()`` does not try to delete rules that were never added.

        Raises:
            FileNotFoundError: the iptables binary is missing.
            subprocess.TimeoutExpired: an iptables call timed out.
        """
        specs = self._build_specs()
        for spec in specs:
            if self._exec(spec):
                self._rules.append(spec)
        if len(self._rules) == len(specs):
            log.info("iptables applied (uid=%d → :%d)", self.uid, self.port)
        else:
            log.warning("iptables only partially applied (%d of %d rules, uid=%d → :%d)",
                        len(self._rules), len(specs), self.uid, self.port)

    # ── Remove ─────────────────────────────────────────────────────
    def remove(self):
        """Remove previously applied rules (reverse order), best effort.

        A deletion that raises does not stop the remaining deletions;
        ``_exec`` has already logged the failure.
        """
        for spec in reversed(self._rules):
            try:
                self._exec(self._spec_to_delete(spec))
            except (OSError, subprocess.SubprocessError):
                continue
        self._rules.clear()
        log.info("iptables rules removed")

    # ── Clean (stale rule remover) ─────────────────────────────────
    def clean(self):
        """Force-remove every NAT OUTPUT rule that matches this UID.

        Uses ``iptables -S`` to list rules in parseable format, then
        deletes matching entries by converting ``-A`` → ``-D``. The UID is
        compared as a whole token, so cleaning uid 1018 leaves rules for
        uid 10183 alone.
        """
        removed = 0
        for rule in self._dump_output():
            tokens = rule.split()
            # Only appended rules ("-A ...") can be turned into deletions;
            # this skips the chain policy line ("-P OUTPUT ACCEPT").
            if tokens[:1] != ["-A"]:
                continue
            if not self._rule_targets_uid(rule, self.uid):
                continue
            # Convert "-A OUTPUT ..." → "iptables -t nat -D OUTPUT ..."
            delete_spec = [self.cmd, "-t", "nat", "-D"] + tokens[1:]
            if self._exec(delete_spec):
                removed += 1
        if removed:
            log.info("Cleaned %d stale rule(s) for uid=%d", removed, self.uid)
        else:
            log.info("No stale rules found for uid=%d", self.uid)

    # ── Internal ───────────────────────────────────────────────────
    def _build_specs(self):
        """Return the insert commands in execution order (no side effects).

        REDIRECT is listed first (it slides to the bottom as later rules
        are inserted above it), then the RETURN rules, so the evaluation
        order in the chain is loopback → proxy → REDIRECT.
        """
        loopback = "::1/128" if self.ipv6 else "127.0.0.0/8"
        base = [self.cmd, "-t", "nat", "-I", "OUTPUT", "1",
                "-p", "tcp", "-m", "owner", "--uid-owner", str(self.uid)]

        # 1) REDIRECT everything (inserted first → ends up at bottom)
        specs = [base + ["-j", "REDIRECT", "--to-port", str(self.port)]]

        # 2) RETURN for proxy IP:port (prevents loops)
        if self._proxy_exclusion_applies():
            proxy_rule = base + ["-d", self.proxy_ip]
            if self.proxy_port:
                proxy_rule += ["--dport", str(self.proxy_port)]
            specs.append(proxy_rule + ["-j", "RETURN"])

        # 3) RETURN for loopback (inserted last → ends up at top)
        specs.append(base + ["-d", loopback, "-j", "RETURN"])
        return specs

    def _proxy_exclusion_applies(self):
        """Tell whether a proxy RETURN rule makes sense for this manager.

        The rule is skipped when no proxy address is known, when it is a
        wildcard address, or when its family (IPv4 vs IPv6, judged by the
        presence of ``:``) does not match the command being driven —
        ``ip6tables`` cannot take an IPv4 ``-d`` argument and vice versa.
        """
        ip = self.proxy_ip
        if not ip or ip in ("0.0.0.0", "::"):
            return False
        return (":" in ip) == bool(self.ipv6)

    def _dump_output(self):
        """Return the NAT OUTPUT chain rules via ``iptables -S`` (parseable).

        Returns an empty list, after logging a warning, when the listing
        cannot be obtained.
        """
        try:
            r = subprocess.run(
                [self.cmd, "-t", "nat", "-S", "OUTPUT"],
                capture_output=True, text=True, timeout=5,
            )
        except (OSError, subprocess.SubprocessError, UnicodeDecodeError) as exc:
            log.warning("Cannot list %s nat OUTPUT rules: %s", self.cmd, exc)
            return []
        if r.returncode != 0:
            log.warning("Cannot list %s nat OUTPUT rules: %s", self.cmd, r.stderr.strip())
            return []
        return r.stdout.strip().splitlines()

    @staticmethod
    def _rule_targets_uid(rule, uid):
        """Return True if *rule* has ``--uid-owner`` followed by exactly *uid*."""
        tokens = rule.split()
        wanted = str(uid)
        return any(
            flag == "--uid-owner" and value == wanted
            for flag, value in zip(tokens, tokens[1:])
        )

    @staticmethod
    def _spec_to_delete(spec):
        """Convert an -I spec to a -D spec (drop the position number).

        ``-I OUTPUT 1 -p tcp ...`` → ``-D OUTPUT -p tcp ...``
        """
        converted = []
        i = 0
        while i < len(spec):
            if spec[i] == "-I":
                # Keep the chain name, drop the position that follows it.
                converted += ["-D", spec[i + 1]]
                i += 3
            else:
                converted.append(spec[i])
                i += 1
        return converted

    @staticmethod
    def _exec(args):
        """Run one iptables command.

        Returns:
            True if the command exited with status 0, False otherwise
            (the failure is logged as a warning).

        Raises:
            FileNotFoundError: the binary in ``args[0]`` does not exist.
            subprocess.TimeoutExpired: the command ran longer than 5 s.
        """
        try:
            r = subprocess.run(args, capture_output=True, text=True, timeout=5)
        except FileNotFoundError:
            log.error("%s not found — is iptables installed?", args[0])
            raise
        except subprocess.TimeoutExpired:
            log.error("iptables timed out: %s", " ".join(args))
            raise
        if r.returncode != 0:
            log.warning("iptables: %s  [%s]", r.stderr.strip(), " ".join(args))
            return False
        return True
# ── TCP Server ────────────────────────────────────────────────────────────
def run_server(listen_port, proxy_host, proxy_port, stop_event):
    """Accept redirected connections and hand them to relay threads.

    Args:
        listen_port: Local TCP port to listen on (all IPv4 interfaces).
        proxy_host: Upstream proxy host, passed through to each handler.
        proxy_port: Upstream proxy port, passed through to each handler.
        stop_event: ``threading.Event``; the accept loop exits within about
            one second of it being set (the accept timeout).

    Raises:
        OSError: if the listening socket cannot be bound or put into
            listening mode. The socket is closed in every case.
    """
    # The context manager closes the listener even when bind()/listen() fail.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", listen_port))
        srv.listen(128)
        # Short accept timeout so stop_event is polled regularly.
        srv.settimeout(1.0)
        log.info("Listening on :%d → proxy %s:%d", listen_port, proxy_host, proxy_port)
        counter = 0
        while not stop_event.is_set():
            try:
                cli, addr = srv.accept()
            except socket.timeout:
                continue
            except ConnectionAbortedError:
                # The peer gave up before accept() returned; the listener
                # itself is still healthy, so keep serving.
                continue
            except OSError as exc:
                log.error("accept() failed, stopping server: %s", exc)
                break
            counter += 1
            worker = threading.Thread(
                target=handle_client,
                args=(cli, addr, (proxy_host, proxy_port)),
                name=f"relay-{counter}",
                daemon=True,
            )
            try:
                worker.start()
            except RuntimeError as exc:
                # Thread could not be started: drop this connection rather
                # than leaking its socket or taking the server down.
                log.error("Cannot start relay thread for %s:%s — %s", addr[0], addr[1], exc)
                cli.close()
# ── CLI ───────────────────────────────────────────────────────────────────
def parse_proxy(proxy_str):
    """Parse a proxy address and return ``(host, port)``.

    Accepted forms:
      * ``host:port``      → ``("host", port)``
      * ``host``           → ``("host", 8080)``
      * ``[v6addr]:port``  → ``("v6addr", port)``
      * ``[v6addr]``       → ``("v6addr", 8080)``
      * bare IPv6 literal (more than one ``:``, no brackets) → treated as
        a host without a port, since that notation cannot carry one.

    Raises:
        ValueError: if the host is empty, the brackets are malformed, or
            the port is not an integer in the range 1–65535.
    """
    default_port = 8080
    text = proxy_str.strip()

    if text.startswith("["):
        host, closing, rest = text[1:].partition("]")
        if not closing or (rest and not rest.startswith(":")):
            raise ValueError(f"malformed bracketed address: {proxy_str!r}")
        port_text = rest[1:] if rest else None
    elif text.count(":") == 1:
        host, _, port_text = text.partition(":")
    else:
        # No colon at all, or an unbracketed IPv6 literal.
        host, port_text = text, None

    if not host:
        raise ValueError(f"missing proxy host in {proxy_str!r}")
    if port_text is None:
        return host, default_port

    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"invalid proxy port: {port_text!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"proxy port out of range: {port}")
    return host, port
def main():
    """Command-line entry point.

    Parses arguments, then either removes stale rules (``--clean``) or
    installs the redirect rules, runs the accept loop until SIGINT/SIGTERM,
    and removes the rules again.

    Exits via ``SystemExit`` on usage errors, when not running as root, or
    when the iptables rules cannot be applied. Rules inserted before a
    failure are removed before exiting.
    """
    ap = argparse.ArgumentParser(
        description="Per-app transparent TCP proxy → Burp Suite (Android)",
    )
    ap.add_argument("uid", type=int, help="Target app UID (e.g. 10183)")
    ap.add_argument("proxy", nargs="?", help="Burp proxy address (host:port)")
    ap.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
                    help=f"Local tproxy listen port (default: {DEFAULT_PORT})")
    ap.add_argument("-6", "--ipv6", action="store_true",
                    help="Also add ip6tables rules")
    ap.add_argument("--clean", action="store_true",
                    help="Remove stale iptables rules for UID and exit")
    ap.add_argument("-v", "--verbose", action="store_true",
                    help="Debug-level logging")
    args = ap.parse_args()

    if not args.clean and not args.proxy:
        ap.error("proxy argument is required (unless using --clean)")

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )

    # Checked before any iptables work, --clean included: without root the
    # rule listing fails and --clean would wrongly report "no stale rules".
    if os.geteuid() != 0:
        sys.exit("Error: root required. Run with su or sudo.")

    # --clean mode: just nuke stale rules and exit
    if args.clean:
        Iptables(args.uid, 0).clean()
        if args.ipv6:
            Iptables(args.uid, 0, ipv6=True).clean()
        return

    try:
        proxy_host, proxy_port = parse_proxy(args.proxy)
    except ValueError as exc:
        # Report a bad address as a usage error instead of a traceback.
        ap.error(f"invalid proxy address {args.proxy!r}: {exc}")

    # Resolve proxy hostname → IP for iptables exclusion rule
    proxy_ip = None
    try:
        proxy_ip = socket.gethostbyname(proxy_host)
    except socket.gaierror:
        log.warning("Cannot resolve proxy host %s — skipping proxy exclusion", proxy_host)

    # Build iptables managers
    ip4 = Iptables(args.uid, args.port, proxy_ip, proxy_port)
    # gethostbyname() only yields IPv4 addresses, which ip6tables cannot
    # take as a destination, so the IPv6 manager gets no proxy exclusion.
    ip6 = Iptables(args.uid, args.port, ipv6=True) if args.ipv6 else None

    stop = threading.Event()

    def cleanup():
        log.info("Cleaning up iptables rules…")
        for manager in (ip4, ip6):
            if manager is None:
                continue
            try:
                manager.remove()
            except Exception as exc:
                # Keep going so one failing family does not leave the
                # other family's rules behind.
                log.error("Failed to remove %s rules: %s", manager.cmd, exc)

    def on_signal(sig, frame):
        log.info("Received signal %d", sig)
        stop.set()

    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)

    try:
        # Apply rules (inside the try so that a failure part-way through
        # still removes whatever was already inserted).
        try:
            ip4.apply()
            if ip6:
                ip6.apply()
        except Exception as exc:
            sys.exit(f"Failed to apply iptables rules: {exc}")

        try:
            run_server(args.port, proxy_host, proxy_port, stop)
        except Exception as exc:
            log.error("Server error: %s", exc)
    finally:
        cleanup()
    log.info("Bye.")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
```
Created: 2026-05-24 05:35:54, Updated: 2026-05-24 05:36:09, ID: 22b00a3a-f894-479f-9167-2f8cbe0b982a