any 路由器与 Claude Code 之间存在一些参数适配问题,因此我们可以在本地架设一个简单的网关:在本地拦截请求、修改参数后再转发给 any 大善人,从而绕过这些参数适配的小问题。
适用于 Claude Code 最新版本,不需要回退版本。
这个本地网关做了什么
- 在本地监听 127.0.0.1:1998。
- 把 Claude Code 的请求转发到上游any的端口。
- 自动补认证头(Authorization / x-api-key)。
- 对 haiku 请求额外修正:补 context-1m-2025-08-07,并加 thinking.budget_tokens=1024。
- 把请求和响应写到 gateway_requests.jsonl 方便排错。
极简启动步骤
- 先开网关
export ANTHROPIC_BASE_URL="any大善人地址"
export ANTHROPIC_AUTH_TOKEN="你的真实token"
python3 /Users/Apple/Desktop/code/claude_gateway.py
- 新开一个终端再开 Claude Code
ANTHROPIC_BASE_URL="http://127.0.0.1:1998" claude --enable-auto-mode
截图为证:
网关代码如下(vibe写的,很多冗余,大佬可以自行修改):
#!/usr/bin/env python3
import base64
import json
import os
import threading
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.error import HTTPError, URLError
from urllib.parse import urlsplit, urlunsplit
from urllib.request import Request, urlopen
# Gateway configuration. Every value can be overridden through the
# environment variable named in the corresponding lookup.

# Address and port the local gateway listens on.
LISTEN_HOST = os.environ.get("CLAUDE_GATEWAY_HOST", "127.0.0.1")
LISTEN_PORT = int(os.environ.get("CLAUDE_GATEWAY_PORT", "1998"))

# Base URL that incoming requests are forwarded to.
UPSTREAM_BASE_URL = os.environ.get(
    "ANTHROPIC_BASE_URL",
    "https://a-ocnfniawgw.cn-shanghai.fcapp.run",
)

# Token used to fill in missing auth headers; empty string means "not set".
UPSTREAM_AUTH_TOKEN = os.environ.get("ANTHROPIC_AUTH_TOKEN", "")

# Timeout, in seconds, passed to urlopen for each upstream request.
UPSTREAM_TIMEOUT = float(os.environ.get("CLAUDE_GATEWAY_TIMEOUT", "120"))

# JSON-lines log file; defaults to a file next to this script.
LOG_PATH = os.environ.get(
    "CLAUDE_GATEWAY_LOG",
    os.path.join(os.path.dirname(__file__), "gateway_requests.jsonl"),
)

# Serialises writes to LOG_PATH across handler threads.
LOG_LOCK = threading.Lock()
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string.

    The datetime is timezone-aware, so the text ends with a ``+00:00`` offset.
    """
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()
def ensure_log_parent_exists() -> None:
    """Create the directory that contains ``LOG_PATH`` if it is missing.

    Does nothing when ``LOG_PATH`` has no directory component.
    """
    log_dir = os.path.dirname(LOG_PATH)
    if not log_dir:
        return
    os.makedirs(log_dir, exist_ok=True)
def decode_body_for_log(body: bytes) -> dict:
    """Turn a raw body into a JSON-serialisable dict for the log.

    Returns:
        ``{"encoding": "utf-8", "text": ...}`` when *body* is empty or
        decodes as UTF-8; otherwise ``{"encoding": "base64", "text": ...}``
        holding the base64 form of the raw bytes.
    """
    try:
        text = body.decode("utf-8") if body else ""
    except UnicodeDecodeError:
        # Not text: keep the bytes recoverable by storing them as base64.
        encoded = base64.b64encode(body).decode("ascii")
        return {"encoding": "base64", "text": encoded}
    return {"encoding": "utf-8", "text": text}
def append_log(record: dict) -> None:
    """Append *record* to ``LOG_PATH`` as a single JSON line.

    The parent directory is created first if needed, and the write happens
    while holding ``LOG_LOCK``.
    """
    ensure_log_parent_exists()
    serialized = json.dumps(record, ensure_ascii=False) + "\n"
    with LOG_LOCK, open(LOG_PATH, "a", encoding="utf-8") as log_file:
        log_file.write(serialized)
def build_upstream_url(base_url: str, incoming_path: str) -> str:
    """Combine the upstream base URL with an incoming request target.

    Scheme and host come from *base_url*. The resulting path is the base
    path (trailing slashes removed) followed by the incoming path, which
    defaults to ``/`` when empty. The query string is taken from
    *incoming_path*; any fragment is dropped.
    """
    upstream = urlsplit(base_url)
    request = urlsplit(incoming_path)

    prefix = upstream.path.rstrip("/")
    suffix = request.path if request.path else "/"

    parts = (upstream.scheme, upstream.netloc, prefix + suffix, request.query, "")
    return urlunsplit(parts)
def rewrite_request_headers(headers: dict, path: str) -> dict:
    """Return a copy of *headers* with missing auth headers filled in.

    When ``UPSTREAM_AUTH_TOKEN`` is set, ``x-api-key`` and ``Authorization``
    (as a Bearer token) are added if the request does not already carry
    them; header names are compared case-insensitively. The input dict is
    not modified. *path* is currently unused.
    """
    result = dict(headers)
    if not UPSTREAM_AUTH_TOKEN:
        return result

    present = {name.lower() for name in result}
    if "x-api-key" not in present:
        result["x-api-key"] = UPSTREAM_AUTH_TOKEN
    if "authorization" not in present:
        result["Authorization"] = f"Bearer {UPSTREAM_AUTH_TOKEN}"
    # Skeleton for now; more overrides will be added following any's rules.
    return result
def rewrite_request_body(body: bytes, headers: dict, path: str) -> bytes:
    """Patch JSON request bodies for haiku models before forwarding.

    The body is returned unchanged unless it is non-empty, the request's
    ``Content-Type`` contains ``application/json``, the body parses as a
    JSON object, and its ``model`` starts with ``claude-haiku``
    (case-insensitive). For matching requests:

    * ``context-1m-2025-08-07`` is appended to the ``anthropic-beta``
      header if it is not already listed. *headers* is modified in place,
      reusing the existing header key's spelling when one is present.
    * ``thinking`` is set to ``{"type": "enabled", "budget_tokens": 1024}``,
      replacing any value the client sent.

    Args:
        body: Raw request body.
        headers: Outgoing request headers; may be mutated as described.
        path: Request path (currently unused).

    Returns:
        The original bytes, or the re-serialised compact JSON payload.
    """
    if not body:
        return body

    content_type = next(
        (value for name, value in headers.items() if name.lower() == "content-type"),
        "",
    )
    if "application/json" not in content_type.lower():
        return body

    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return body

    # Valid JSON need not be an object (e.g. a list or a bare string); such
    # bodies have no "model" field, so pass them through instead of crashing
    # on payload.get().
    if not isinstance(payload, dict):
        return body

    model = str(payload.get("model", "")).lower()
    if not model.startswith("claude-haiku"):
        return body

    # Reuse the client's spelling of the header name when it exists so we do
    # not end up with two differently-cased copies of the same header.
    beta_key = next(
        (name for name in headers if name.lower() == "anthropic-beta"),
        "anthropic-beta",
    )
    raw_beta = headers.get(beta_key, "")
    beta_features = [item.strip() for item in raw_beta.split(",") if item.strip()]
    if "context-1m-2025-08-07" not in beta_features:
        beta_features.append("context-1m-2025-08-07")
    headers[beta_key] = ",".join(beta_features)

    payload["thinking"] = {"type": "enabled", "budget_tokens": 1024}
    return json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
class ClaudeGatewayHandler(BaseHTTPRequestHandler):
    """Request handler that proxies every request to the upstream server.

    All supported HTTP methods are routed through ``_handle_proxy``, which
    rewrites headers/body, forwards the request to the URL derived from
    ``UPSTREAM_BASE_URL``, appends a record of the exchange to the log, and
    relays the upstream response. The upstream response is read fully into
    memory before it is sent back to the client.
    """

    protocol_version = "HTTP/1.1"

    # Request headers that are not forwarded upstream (compared lower-case).
    _SKIPPED_REQUEST_HEADERS = frozenset(
        {"host", "content-length", "connection", "accept-encoding"}
    )
    # Response headers that are replaced by values this gateway computes.
    _SKIPPED_RESPONSE_HEADERS = frozenset(
        {"transfer-encoding", "content-length", "connection"}
    )

    def do_GET(self):
        self._handle_proxy()

    def do_POST(self):
        self._handle_proxy()

    def do_PUT(self):
        self._handle_proxy()

    def do_PATCH(self):
        self._handle_proxy()

    def do_DELETE(self):
        self._handle_proxy()

    def do_OPTIONS(self):
        self._handle_proxy()

    def do_HEAD(self):
        self._handle_proxy()

    def log_message(self, fmt, *args):
        """Discard the base class's per-request log output."""
        return

    def _read_request_body(self) -> bytes:
        """Read the request body as declared by ``Content-Length``.

        Returns ``b""`` when the header is absent, empty, non-positive, or
        not a valid integer.
        """
        raw_length = self.headers.get("Content-Length", "0") or "0"
        try:
            content_length = int(raw_length)
        except ValueError:
            # A malformed header should not crash the handler thread.
            content_length = 0
        if content_length <= 0:
            return b""
        return self.rfile.read(content_length)

    def _copy_request_headers(self) -> dict:
        """Return the incoming headers minus those in the skip list."""
        return {
            key: value
            for key, value in self.headers.items()
            if key.lower() not in self._SKIPPED_REQUEST_HEADERS
        }

    def _send_response(self, status: int, headers: dict, body: bytes) -> None:
        """Send *status*, *headers* and *body* back to the client.

        ``Transfer-Encoding``, ``Content-Length`` and ``Connection`` from
        *headers* are dropped; ``Content-Length`` is set from ``len(body)``
        and ``Connection: close`` is always sent. No body is written for
        HEAD requests.
        """
        self.send_response(status)
        for name, value in headers.items():
            if name.lower() in self._SKIPPED_RESPONSE_HEADERS:
                continue
            self.send_header(name, value)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Connection", "close")
        self.end_headers()
        if self.command != "HEAD" and body:
            self.wfile.write(body)

    def _fetch_upstream(self, upstream_url: str, req_headers: dict, req_body: bytes) -> tuple:
        """Perform the upstream request and return ``(status, headers, body)``.

        An upstream HTTP error status is returned as-is with its headers and
        body. Any other failure while contacting the upstream is converted
        into a 502 response with a JSON ``gateway_upstream_error`` payload.
        """
        try:
            upstream_req = Request(
                url=upstream_url,
                data=req_body if req_body else None,
                headers=req_headers,
                method=self.command,
            )
            with urlopen(upstream_req, timeout=UPSTREAM_TIMEOUT) as resp:
                return resp.getcode(), dict(resp.headers.items()), resp.read()
        except HTTPError as e:
            err_body = e.read() if hasattr(e, "read") else b""
            err_headers = dict(e.headers.items()) if getattr(e, "headers", None) else {}
            return e.code, err_headers, err_body
        except Exception as e:
            # Boundary catch-all: whatever went wrong talking to the upstream
            # (connection failure, timeout, bad URL, ...) is reported to the
            # client as a 502 instead of killing the handler thread.
            error_payload = {
                "error": "gateway_upstream_error",
                "message": str(e),
            }
            error_body = json.dumps(error_payload, ensure_ascii=False).encode("utf-8")
            return 502, {"Content-Type": "application/json; charset=utf-8"}, error_body

    def _handle_proxy(self):
        """Forward the current request upstream, log it, and relay the reply."""
        req_body = self._read_request_body()
        req_headers = self._copy_request_headers()
        req_headers = rewrite_request_headers(req_headers, self.path)
        req_body = rewrite_request_body(req_body, req_headers, self.path)
        upstream_url = build_upstream_url(UPSTREAM_BASE_URL, self.path)

        # NOTE: "headers" records the client's original headers verbatim,
        # including any credentials the client sent, while "body" is the
        # body after rewriting.
        request_log = {
            "timestamp": utc_now_iso(),
            "client_ip": self.client_address[0] if self.client_address else "",
            "method": self.command,
            "path": self.path,
            "upstream_url": upstream_url,
            "headers": dict(self.headers.items()),
            "body": decode_body_for_log(req_body),
            "body_length": len(req_body),
        }

        status, resp_headers, resp_body = self._fetch_upstream(
            upstream_url, req_headers, req_body
        )

        # Logging and replying happen outside the upstream try/except so a
        # failure while writing to the client is not misreported as an
        # upstream error and never triggers a second response on the socket.
        request_log["response"] = {
            "status": status,
            "headers": resp_headers,
            "body": decode_body_for_log(resp_body),
            "body_length": len(resp_body),
        }
        append_log(request_log)
        self._send_response(status, resp_headers, resp_body)
def main():
    """Start the gateway and serve requests until interrupted.

    The server is used as a context manager so its listening socket is
    closed when serving stops, and Ctrl+C ends the process without a
    traceback.
    """
    with ThreadingHTTPServer((LISTEN_HOST, LISTEN_PORT), ClaudeGatewayHandler) as server:
        print(f"[gateway] listening on http://{LISTEN_HOST}:{LISTEN_PORT}")
        print(f"[gateway] upstream: {UPSTREAM_BASE_URL}")
        print(f"[gateway] auth token configured: {bool(UPSTREAM_AUTH_TOKEN)}")
        print(f"[gateway] log file: {LOG_PATH}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop the gateway; fall through
            # so the context manager closes the socket.
            pass


if __name__ == "__main__":
    main()
3 个帖子 - 2 位参与者