reflex/reflex/utils/telemetry.py

215 lines
5.7 KiB
Python

"""Anonymous telemetry for Reflex."""
from __future__ import annotations
import asyncio
import dataclasses
import multiprocessing
import platform
import warnings
from contextlib import suppress
from reflex.config import environment
try:
from datetime import UTC, datetime
except ImportError:
from datetime import datetime
UTC = None
import httpx
import psutil
from reflex import constants
from reflex.utils import console
from reflex.utils.prerequisites import ensure_reflex_installation_id, get_project_hash
# PostHog capture endpoint that prepared telemetry events are POSTed to.
POSTHOG_API_URL: str = "https://app.posthog.com/capture/"
def get_os() -> str:
    """Report the operating system Reflex is running on.

    Returns:
        The system name as given by ``platform.system()``.
    """
    os_name = platform.system()
    return os_name
def get_detailed_platform_str() -> str:
    """Describe the underlying os/platform in detail.

    Returns:
        The platform string as given by ``platform.platform()``.
    """
    platform_description = platform.platform()
    return platform_description
def get_python_version() -> str:
    """Report the version of the running Python interpreter.

    Returns:
        The Python version, without any trailing "+" characters.
    """
    raw_version = platform.python_version()
    # A pre-release interpreter may report a trailing "+"; strip it off.
    return raw_version.rstrip("+")
def get_reflex_version() -> str:
    """Report the installed Reflex version.

    Returns:
        The value of ``constants.Reflex.VERSION``.
    """
    reflex_version = constants.Reflex.VERSION
    return reflex_version
def get_cpu_count() -> int:
    """Report how many CPUs the machine has.

    Returns:
        The CPU count as given by ``multiprocessing.cpu_count()``.
    """
    cpu_total = multiprocessing.cpu_count()
    return cpu_total
def get_memory() -> int:
    """Report the total system memory in MB.

    Returns:
        The total memory in MB, or 0 when psutil raises ``ValueError``
        while reading it.
    """
    try:
        total_bytes = psutil.virtual_memory().total
    except ValueError:  # needed to pass ubuntu test
        return 0
    # Shifting right by 20 bits divides by 2**20, i.e. bytes -> MB.
    return total_bytes >> 20
def _raise_on_missing_project_hash() -> bool:
    """Decide whether a missing project hash should be treated as an error.

    When running reflex with --backend-only, or doing database migration
    operations, there is no requirement for a .web directory, so the
    reflex.json file may not exist, and this should not be considered an
    error.

    Returns:
        False when compilation should be skipped (i.e. no .web directory is
        required). Otherwise return True.
    """
    skip_compile = environment.REFLEX_SKIP_COMPILE.get()
    return False if skip_compile else True
def _prepare_event(event: str, **kwargs) -> dict:
    """Build the payload that is sent to the PostHog server for an event.

    Args:
        event: The event name.
        kwargs: Additional data to send with the event. Only the keys
            "template", "context", "detail" and "user_uuid" are forwarded,
            and only when their value is not None.

    Returns:
        The event data, or an empty dict when the installation id or the
        project hash could not be determined.
    """
    from reflex.utils.prerequisites import get_cpu_info

    installation_id = ensure_reflex_installation_id()
    project_hash = get_project_hash(raise_on_fail=_raise_on_missing_project_hash())

    if installation_id is None or project_hash is None:
        console.debug(
            f"Could not get installation_id or project_hash: {installation_id}, {project_hash}"
        )
        return {}

    # UTC is None when ``datetime`` does not export it (python 3.10);
    # otherwise (python 3.11 & 3.12) a timezone-aware timestamp is used.
    now = datetime.utcnow() if UTC is None else datetime.now(UTC)
    stamp = now.isoformat()

    cpuinfo = get_cpu_info()

    properties = {
        "distinct_id": installation_id,
        "distinct_app_id": project_hash,
        "user_os": get_os(),
        "user_os_detail": get_detailed_platform_str(),
        "reflex_version": get_reflex_version(),
        "python_version": get_python_version(),
        "cpu_count": get_cpu_count(),
        "memory": get_memory(),
        "cpu_info": dataclasses.asdict(cpuinfo) if cpuinfo else {},
    }

    # Forward the optional caller-supplied fields that were actually provided.
    for key in ("template", "context", "detail", "user_uuid"):
        value = kwargs.get(key)
        if value is not None:
            properties[key] = value

    return {
        "api_key": "phc_JoMo0fOyi0GQAooY3UyO9k0hebGkMyFJrrCw1Gt5SGb",
        "event": event,
        "properties": properties,
        "timestamp": stamp,
    }
def _send_event(event_data: dict) -> bool:
try:
httpx.post(POSTHOG_API_URL, json=event_data)
except Exception:
return False
else:
return True
def _send(event: str, telemetry_enabled: bool | None, **kwargs):
from reflex.config import get_config
# Get the telemetry_enabled from the config if it is not specified.
if telemetry_enabled is None:
telemetry_enabled = get_config().telemetry_enabled
# Return if telemetry is disabled.
if not telemetry_enabled:
return False
with suppress(Exception):
event_data = _prepare_event(event, **kwargs)
if not event_data:
return False
return _send_event(event_data)
# Strong references to in-flight telemetry tasks. The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task could be garbage
# collected before it finishes.
_background_tasks: set = set()


def send(event: str, telemetry_enabled: bool | None = None, **kwargs):
    """Send anonymous telemetry for Reflex.

    Inside a running event loop the send is scheduled as a background task;
    otherwise it is performed synchronously before returning.

    Args:
        event: The event name.
        telemetry_enabled: Whether to send the telemetry (If None, get from config).
        kwargs: Additional data to send with the event.
    """

    async def async_send(event: str, telemetry_enabled: bool | None, **kwargs):
        return _send(event, telemetry_enabled, **kwargs)

    # Probe for a running loop *before* creating the coroutine. This avoids
    # leaving behind a never-awaited coroutine when there is no loop, so no
    # process-wide RuntimeWarning filter is needed to hide the warning.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # If there is no event loop, send the event synchronously.
        _send(event, telemetry_enabled, **kwargs)
    else:
        # Within an event loop context, send the event asynchronously.
        task = asyncio.create_task(async_send(event, telemetry_enabled, **kwargs))
        # Keep the task alive until it completes, then drop the reference.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
def send_error(error: Exception, context: str) -> None:
    """Send an error event.

    Only the exception's class name is reported as the event detail. No
    delivery result is returned, because ``send`` does not report one.

    Args:
        error: The error to send.
        context: The context of the error (e.g. "frontend" or "backend")
    """
    send("error", detail=type(error).__name__, context=context)