Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ data/

# Agent/planning docs
docs/
.pi/
288 changes: 288 additions & 0 deletions scripts/query_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""Query Pydantic Logfire logs via SQL API.

Usage:
python scripts/query_logs.py errors --minutes 30
python scripts/query_logs.py warnings --limit 20
python scripts/query_logs.py slow --threshold 5000
python scripts/query_logs.py user --user-id 12345
python scripts/query_logs.py group --group-id -1001234567
python scripts/query_logs.py sql "SELECT * FROM records LIMIT 10"
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import UTC, datetime, timedelta

import requests
from dotenv import load_dotenv

DEFAULT_API_URL = "https://logfire-us.pydantic.dev/v2/query"
EU_API_URL = "https://logfire-eu.pydantic.dev/v2/query"
DEFAULT_LIMIT = 50
DEFAULT_MINUTES = 30

QUERY_TEMPLATES: dict[str, str] = {
"errors": """\
SELECT start_timestamp, duration, message, trace_id, is_exception,
exception_message, attributes
FROM records
WHERE is_exception = true
ORDER BY start_timestamp DESC
LIMIT {limit}""",
"warnings": """\
SELECT start_timestamp, duration, message, trace_id, level, attributes
FROM records
WHERE level = 'warn'
ORDER BY start_timestamp DESC
LIMIT {limit}""",
"slow": """\
SELECT start_timestamp, duration, message, trace_id, attributes
FROM records
WHERE duration > {threshold}
ORDER BY duration DESC
LIMIT {limit}""",
"user": """\
SELECT start_timestamp, duration, message, trace_id, level, attributes
FROM records
WHERE attributes->>'user_id' = '{user_id}'
ORDER BY start_timestamp DESC
LIMIT {limit}""",
"group": """\
SELECT start_timestamp, duration, message, trace_id, level, attributes
FROM records
WHERE attributes->>'group_id' = '{group_id}'
ORDER BY start_timestamp DESC
LIMIT {limit}""",
}

def get_config() -> tuple[str, str]:
"""Read config from environment variables.

Loads .env file if present. Reads LOGFIRE_READ_TOKEN and LOGFIRE_API_URL.

Returns:
Tuple of (api_url, read_token).

Raises:
SystemExit: If LOGFIRE_READ_TOKEN is not set.
"""
token = os.environ.get("LOGFIRE_READ_TOKEN") or os.environ.get("LOGFIRE_TOKEN")
if not token:
print(
"Error: LOGFIRE_READ_TOKEN not set.\n"
"Create a read token at https://logfire.pydantic.dev → Project Settings → Read Tokens\n"
"Then add LOGFIRE_READ_TOKEN=your_token_here to .env",
file=sys.stderr,
)
sys.exit(1)

api_url = os.environ.get("LOGFIRE_API_URL")
if not api_url:
api_url = EU_API_URL if token.startswith("pylf_v1_eu") else DEFAULT_API_URL
return api_url, token

def query_logfire(api_url: str, token: str, sql: str, min_timestamp: str | None = None) -> list[dict]:
"""Execute a SQL query against Logfire API.

Args:
api_url: Logfire query endpoint URL.
token: Read token for authentication.
sql: SQL query to execute.
min_timestamp: ISO format timestamp for time range filter.

Returns:
List of record dicts.

Raises:
SystemExit: On API error.
"""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
payload: dict[str, str | dict] = {"sql": sql}
payload["min_timestamp"] = min_timestamp or "2020-01-01T00:00:00+00:00"

try:
response = requests.post(api_url, headers=headers, json=payload, timeout=30)
except requests.RequestException as e:
print(f"Network error: {e}", file=sys.stderr)
sys.exit(1)

if response.status_code != 200:
print(f"API error {response.status_code}: {response.text}", file=sys.stderr)
sys.exit(1)

try:
data = response.json()
except Exception:
print(f"API returned non-JSON response: {response.text[:500]}", file=sys.stderr)
sys.exit(1)
if isinstance(data, dict) and "data" in data:
return data["data"]
if isinstance(data, list):
return data
return []

def format_text(records: list[dict]) -> str:
"""Format records as human-readable text.

Args:
records: List of record dicts from API.

Returns:
Formatted text string.
"""
if not records:
return "No records found."

lines: list[str] = []
for r in records:
ts = r.get("start_timestamp", "unknown")
if isinstance(ts, str) and len(ts) > 19:
ts = ts[:19]
duration = r.get("duration", "")
if duration and isinstance(duration, (int, float)):
duration = f"{duration:.0f}ms"
level = r.get("level", "")
trace_id = r.get("trace_id", "")
message = r.get("message", "")
is_exception = r.get("is_exception", False)
exception_message = r.get("exception_message", "")

prefix = "ERROR" if is_exception else (level.upper() if level else "INFO")
duration_str = f" ({duration})" if duration else ""
trace_str = f" trace:{trace_id[:8]}" if trace_id else ""

lines.append(f"[{ts}] {prefix}{duration_str}{trace_str}")
if message:
lines.append(f" {message}")
if exception_message:
lines.append(f" Exception: {exception_message}")
lines.append("")

return "\n".join(lines).rstrip()

def build_query(command: str, args: argparse.Namespace) -> tuple[str, str | None]:
"""Build SQL query and min_timestamp from command and arguments.

Args:
command: Query command name (errors, warnings, slow, user, group, sql).
args: Parsed CLI arguments.

Returns:
Tuple of (sql_query, min_timestamp_iso).
"""
min_ts = (datetime.now(tz=UTC) - timedelta(minutes=args.minutes)).isoformat()

if command == "sql":
sql = args.query
if "limit" not in sql.lower():
sql = f"{sql.rstrip(';')} LIMIT {args.limit}"
return sql, None

template = QUERY_TEMPLATES[command]
params: dict[str, int | str] = {
"limit": args.limit,
}

if command == "slow":
params["threshold"] = args.threshold
elif command == "user":
params["user_id"] = args.user_id
elif command == "group":
params["group_id"] = args.group_id

return template.format(**params), min_ts

def _add_common_args(parser: argparse.ArgumentParser) -> None:
"""Add common flags (--json, --limit, --minutes) to a parser."""
parser.add_argument(
"--json",
action="store_true",
dest="json_output",
help="Output as JSON instead of text",
)
parser.add_argument(
"--limit",
type=int,
default=DEFAULT_LIMIT,
help=f"Max results (default: {DEFAULT_LIMIT})",
)
parser.add_argument(
"--minutes",
type=int,
default=DEFAULT_MINUTES,
help=f"Time window in minutes (default: {DEFAULT_MINUTES})",
)


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"""Parse CLI arguments.

Args:
argv: Argument list (defaults to sys.argv[1:]).

Returns:
Parsed namespace.
"""
parser = argparse.ArgumentParser(
description="Query Pydantic Logfire logs via SQL API",
)

subparsers = parser.add_subparsers(dest="command", required=True)

errors_p = subparsers.add_parser("errors", help="Error/exception logs")
_add_common_args(errors_p)

warnings_p = subparsers.add_parser("warnings", help="Warning-level logs")
_add_common_args(warnings_p)

slow_parser = subparsers.add_parser("slow", help="Slow spans")
_add_common_args(slow_parser)
slow_parser.add_argument(
"--threshold",
type=int,
default=5000,
help="Duration threshold in ms (default: 5000)",
)

user_parser = subparsers.add_parser("user", help="Activity by user ID")
_add_common_args(user_parser)
user_parser.add_argument("--user-id", required=True, type=int, help="Telegram user ID")

group_parser = subparsers.add_parser("group", help="Activity by group ID")
_add_common_args(group_parser)
group_parser.add_argument("--group-id", required=True, type=int, help="Telegram group ID")

sql_parser = subparsers.add_parser("sql", help="Free-form SQL query")
_add_common_args(sql_parser)
sql_parser.add_argument("query", help="SQL query to execute")

return parser.parse_args(argv)

def main(argv: list[str] | None = None) -> None:
"""Main entry point.

Args:
argv: Argument list (defaults to sys.argv[1:]).
"""
load_dotenv()
args = parse_args(argv)
api_url, token = get_config()
sql, min_ts = build_query(args.command, args)
records = query_logfire(api_url, token, sql, min_ts)

if args.json_output:
print(json.dumps(records, indent=2, default=str))
else:
print(format_text(records))

if __name__ == "__main__":
main()
Loading
Loading