Source code for stamina.instrumentation._prometheus
# SPDX-FileCopyrightText: 2022 Hynek Schlawack <hs@ox.cx>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

from typing import TYPE_CHECKING

from ._data import RetryDetails, RetryHook, RetryHookFactory, guess_name


if TYPE_CHECKING:
    from prometheus_client import Counter


# Module-wide retry counter; stays None until init_prometheus() creates it.
RETRIES_TOTAL = None


def init_prometheus() -> RetryHook:
    """
    Set up the Prometheus retry counter and build the matching retry hook.

    ``prometheus_client`` is imported inside the function, so importing this
    module does not require the package; calling this function does.

    Returns:
        A hook that takes a ``RetryDetails`` and increments the
        ``stamina_retries_total`` counter for it.
    """
    global RETRIES_TOTAL  # noqa: PLW0603

    from prometheus_client import Counter

    # Create the counter only once so init_prometheus can be called
    # repeatedly (mostly useful for testing).
    if RETRIES_TOTAL is None:
        label_names = ("callable", "retry_num", "error_type")
        RETRIES_TOTAL = Counter(
            "stamina_retries_total",
            "Total number of retries.",
            label_names,
        )

    def count_retries(details: RetryDetails) -> None:
        """
        Increment the retry counter for the retry described by *details*.

        The sample is labelled with the callable's name, the retry number,
        and the guessed name of the causing exception's class.
        """
        # RETRIES_TOTAL is read from the module at call time, not captured
        # when the hook is created.
        labelled = RETRIES_TOTAL.labels(
            callable=details.name,
            retry_num=details.retry_num,
            error_type=guess_name(details.caused_by.__class__),
        )
        labelled.inc()

    return count_retries
def get_prometheus_counter() -> Counter | None:
    """
    Give access to the Prometheus counter that tracks retries.

    ``get_on_retry_hooks()`` is called first so that the hooks are finalized
    if that has not happened yet; only then is the module-level counter read.

    Returns:
        The Prometheus `counter
        <https://github.com/prometheus/client_python>`_ for the number of
        retries if it is active, otherwise None.

    .. versionadded:: 23.2.0
    """
    # Imported here rather than at module level.
    from . import get_on_retry_hooks as finalize_hooks

    # Finalize the hooks if not done yet; the return value is not needed.
    finalize_hooks()

    return RETRIES_TOTAL