2026-03-19 23:18:12 +00:00

288 lines
10 KiB
Python

# Copyright (c) 2023 ETH Zurich.
# All rights reserved.
#
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import json
import logging
import os
import random
import time
from typing import Any, Dict, List, Optional, Union
import yaml
from openai import APIStatusError, OpenAI
from openai.types.chat.chat_completion import ChatCompletion
from .abstract_language_model import AbstractLanguageModel
class OpenRouterError(Exception):
"""Base error for OpenRouter integration."""
class OpenRouterBadRequestError(OpenRouterError):
"""Raised when OpenRouter returns HTTP 400 after retries."""
class OpenRouterRateLimitError(OpenRouterError):
"""Raised when OpenRouter returns HTTP 429 after retries."""
def load_openrouter_config(path: str) -> Dict[str, Any]:
"""Load a YAML or JSON OpenRouter configuration file."""
return _load_config_file(path)
def _load_config_file(path: str) -> Dict[str, Any]:
ext = os.path.splitext(path)[1].lower()
with open(path, "r", encoding="utf-8") as f:
if ext in (".yaml", ".yml"):
data = yaml.safe_load(f)
else:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError(f"Config at {path} must be a JSON/YAML object")
return data
class OpenRouter(AbstractLanguageModel):
"""
OpenRouter-backed language model with per-request rotation of API keys and models.
Configuration is loaded from YAML or JSON (see ``config.openrouter.example.yaml``).
"""
def __init__(
self,
config_path: str = "",
model_name: str = "openrouter",
cache: bool = False,
) -> None:
self._rotation_model_name = model_name
self._request_overrides: Dict[str, Any] = {}
super().__init__(config_path, model_name, cache)
self._apply_openrouter_config()
def load_config(self, path: str) -> None:
if path == "":
path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"config.openrouter.yaml",
)
self.config_path = path
self.config = _load_config_file(path)
self.logger.debug("Loaded OpenRouter config from %s", path)
def _apply_openrouter_config(self) -> None:
cfg = self.config
self.base_url: str = cfg.get("base_url", "https://openrouter.ai/api/v1")
keys = cfg.get("api_keys") or []
if isinstance(keys, str):
keys = [keys]
self.api_keys: List[str] = [k for k in keys if k]
if not self.api_keys:
raise ValueError("OpenRouter config must define non-empty 'api_keys'")
models = cfg.get("models") or []
if isinstance(models, str):
models = [models]
self.models: List[str] = [m for m in models if m]
if not self.models:
raise ValueError("OpenRouter config must define non-empty 'models'")
self.temperature: float = float(cfg.get("temperature", 1.0))
self.max_tokens: int = int(cfg.get("max_tokens", 4096))
self.stop: Union[str, List[str], None] = cfg.get("stop")
self.prompt_token_cost: float = float(cfg.get("prompt_token_cost", 0.0))
self.response_token_cost: float = float(cfg.get("response_token_cost", 0.0))
self.max_retries_429: int = int(cfg.get("max_retries_429", 8))
self.max_retries_400: int = int(cfg.get("max_retries_400", 3))
self.base_backoff_seconds: float = float(cfg.get("base_backoff_seconds", 1.0))
self.http_referer: str = cfg.get("http_referer", "") or os.getenv(
"OPENROUTER_HTTP_REFERER", ""
)
self.x_title: str = cfg.get("x_title", "") or os.getenv("OPENROUTER_X_TITLE", "")
self.model_name = self._rotation_model_name
self.last_model_id: Optional[str] = None
self.generation_model_id: Optional[str] = None
def set_request_overrides(self, **kwargs: Any) -> None:
"""Optional per-request parameters (used by the HTTP API). Cleared with :meth:`clear_request_overrides`."""
self._request_overrides = {k: v for k, v in kwargs.items() if v is not None}
def clear_request_overrides(self) -> None:
self._request_overrides = {}
def _pick_key(self) -> str:
return random.choice(self.api_keys)
def _pick_model(self, override: Optional[str]) -> str:
if override:
return override
o = self._request_overrides.get("model")
if o:
return str(o)
return random.choice(self.models)
def _effective_temperature(self) -> float:
t = self._request_overrides.get("temperature")
return float(t) if t is not None else self.temperature
def _effective_max_tokens(self) -> int:
m = self._request_overrides.get("max_tokens")
return int(m) if m is not None else self.max_tokens
def _client_for_key(self, api_key: str) -> OpenAI:
headers: Dict[str, str] = {}
if self.http_referer:
headers["HTTP-Referer"] = self.http_referer
if self.x_title:
headers["X-Title"] = self.x_title
return OpenAI(
base_url=self.base_url,
api_key=api_key,
default_headers=headers or None,
)
def _sleep_backoff(self, attempt: int) -> None:
cap = 60.0
delay = min(
self.base_backoff_seconds * (2**attempt) + random.random(),
cap,
)
self.logger.warning("Backing off %.2fs (attempt %d)", delay, attempt + 1)
time.sleep(delay)
def chat(
self,
messages: List[Dict[str, str]],
num_responses: int = 1,
model_override: Optional[str] = None,
) -> ChatCompletion:
"""
Call OpenRouter chat completions with rotation and retries for 429/400.
"""
attempts_429 = 0
attempts_400 = 0
attempt = 0
last_exc: Optional[Exception] = None
while True:
api_key = self._pick_key()
model_id = self._pick_model(model_override)
client = self._client_for_key(api_key)
try:
response = client.chat.completions.create(
model=model_id,
messages=messages,
temperature=self._effective_temperature(),
max_tokens=self._effective_max_tokens(),
n=num_responses,
stop=self.stop,
)
if response.usage is not None:
self.prompt_tokens += response.usage.prompt_tokens or 0
self.completion_tokens += response.usage.completion_tokens or 0
pt_k = float(self.prompt_tokens) / 1000.0
ct_k = float(self.completion_tokens) / 1000.0
self.cost = (
self.prompt_token_cost * pt_k
+ self.response_token_cost * ct_k
)
self.last_model_id = model_id
if self.generation_model_id is None:
self.generation_model_id = model_id
self.logger.info(
"OpenRouter response model=%s id=%s", model_id, response.id
)
return response
except APIStatusError as e:
last_exc = e
code = e.status_code
if code == 429:
if attempts_429 >= self.max_retries_429:
raise OpenRouterRateLimitError(
f"OpenRouter rate limited after {attempts_429} retries: {e.message}"
) from e
attempts_429 += 1
self._sleep_backoff(attempt)
attempt += 1
continue
if code == 400:
self.logger.warning(
"OpenRouter HTTP 400 (will retry with rotated key/model if allowed): %s body=%s",
e.message,
e.body,
)
if attempts_400 >= self.max_retries_400:
raise OpenRouterBadRequestError(
f"OpenRouter bad request after {attempts_400} retries: {e.message}"
) from e
attempts_400 += 1
attempt += 1
time.sleep(random.uniform(0.2, 0.8))
continue
raise
except Exception:
self.logger.exception("Unexpected error calling OpenRouter")
raise
def query(
self, query: str, num_responses: int = 1
) -> Union[List[ChatCompletion], ChatCompletion]:
if self.cache and query in self.response_cache:
return self.response_cache[query]
messages = [{"role": "user", "content": query}]
model_ov = self._request_overrides.get("model")
model_override = str(model_ov) if model_ov else None
if num_responses == 1:
response = self.chat(messages, 1, model_override=model_override)
else:
response = []
next_try = num_responses
total_num_attempts = num_responses
remaining = num_responses
while remaining > 0 and total_num_attempts > 0:
try:
assert next_try > 0
res = self.chat(
messages, next_try, model_override=model_override
)
response.append(res)
remaining -= next_try
next_try = min(remaining, next_try)
except Exception as e:
next_try = max(1, (next_try + 1) // 2)
self.logger.warning(
"Error in OpenRouter query: %s, retrying with n=%s",
e,
next_try,
)
time.sleep(random.uniform(0.5, 2.0))
total_num_attempts -= 1
if self.cache:
self.response_cache[query] = response
return response
def get_response_texts(
self, query_response: Union[List[ChatCompletion], ChatCompletion]
) -> List[str]:
if not isinstance(query_response, list):
query_response = [query_response]
texts: List[str] = []
for response in query_response:
for choice in response.choices:
c = choice.message.content
texts.append(c if c is not None else "")
return texts