From 8ce2483de4132a655c136f96a542db7a882922cb Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Mon, 4 Aug 2025 21:16:28 +0800 Subject: [PATCH] support FinishEventProcessor and some Span Set Method --- cozeloop/_client.py | 63 +++++++++- cozeloop/internal/consts/__init__.py | 1 + cozeloop/internal/httpclient/client.py | 4 +- cozeloop/internal/trace/__init__.py | 1 - cozeloop/internal/trace/exporter.py | 52 ++++---- cozeloop/internal/trace/model/model.py | 46 ++++++- cozeloop/internal/trace/noop_span.py | 12 ++ cozeloop/internal/trace/queue_manager.py | 55 ++++++--- cozeloop/internal/trace/span.py | 70 +++++++++-- cozeloop/internal/trace/span_processor.py | 144 ++++++++++++++++++---- cozeloop/internal/trace/trace.py | 46 ++++++- cozeloop/internal/version.py | 2 +- cozeloop/span.py | 25 ++++ cozeloop/spec/tracespec/runtime.py | 6 +- pyproject.toml | 2 +- 15 files changed, 443 insertions(+), 86 deletions(-) diff --git a/cozeloop/_client.py b/cozeloop/_client.py index ec8a3fc..be03abb 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -7,7 +7,9 @@ import os import threading from datetime import datetime -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Callable + +import httpx from cozeloop.client import Client from cozeloop._noop import NOOP_SPAN, _NoopClient @@ -17,6 +19,8 @@ from cozeloop.internal.httpclient import Auth from cozeloop.internal.prompt import PromptProvider from cozeloop.internal.trace import TraceProvider +from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf +from cozeloop.internal.trace.trace import default_finish_event_processor from cozeloop.span import SpanContext, Span logger = logging.getLogger(__name__) @@ -35,6 +39,15 @@ _default_client = None _client_lock = threading.Lock() +class APIBasePath: + def __init__( + self, + trace_span_upload_path: str = None, + trace_file_upload_path: str = None, + ): + self.trace_span_upload_path = trace_span_upload_path + self.trace_file_upload_path = trace_file_upload_path + def _generate_cache_key(*args) -> str: key_str = "\t".join(str(arg) for arg in args) @@ -54,8 +67,13 @@ def new_client( prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT, prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL, prompt_trace: bool = False, + http_client: Optional[httpx.Client] = None, + trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None, + tag_truncate_conf: Optional[TagTruncateConf] = None, + api_base_path: Optional[APIBasePath] = None, + trace_queue_conf: Optional[QueueConf] = None, ) -> Client: - cache_key = _generate_cache_key( + cache_key = _generate_cache_key( # all args are used to generate cache key api_base_url, workspace_id, api_token, @@ -67,7 +85,12 @@ def new_client( ultra_large_report, prompt_cache_max_count, prompt_cache_refresh_interval, - prompt_trace + prompt_trace, + http_client, + trace_finish_event_processor, + tag_truncate_conf, + api_base_path, + trace_queue_conf, ) with _cache_lock: @@ -88,6 +111,11 @@ def new_client( prompt_cache_max_count=prompt_cache_max_count, prompt_cache_refresh_interval=prompt_cache_refresh_interval, prompt_trace=prompt_trace, + arg_http_client=http_client, + trace_finish_event_processor=trace_finish_event_processor, + tag_truncate_conf=tag_truncate_conf, + api_base_path=api_base_path, + trace_queue_conf=trace_queue_conf, ) _client_cache[cache_key] = client return client @@ -113,7 +141,12 @@ def __init__( ultra_large_report: bool = 
False, prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT, prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL, - prompt_trace: bool = False + prompt_trace: bool = False, + arg_http_client: Optional[httpx.Client] = None, + trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None, + tag_truncate_conf: Optional[TagTruncateConf] = None, + api_base_path: Optional[APIBasePath] = None, + trace_queue_conf: Optional[QueueConf] = None, ): workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID) api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL) @@ -136,6 +169,8 @@ def __init__( self._workspace_id = workspace_id inner_client = httpclient.HTTPClient() + if arg_http_client: + inner_client = arg_http_client auth = self._build_auth( api_base_url=api_base_url, http_client=inner_client, @@ -151,10 +186,26 @@ def __init__( timeout=timeout, upload_timeout=upload_timeout ) + finish_pro = default_finish_event_processor + if trace_finish_event_processor: + def combined_processor(event_info: FinishEventInfo): + default_finish_event_processor(event_info) + trace_finish_event_processor(event_info) + finish_pro = combined_processor + span_upload_path = None + file_upload_path = None + if api_base_path: + span_upload_path = api_base_path.trace_span_upload_path + file_upload_path = api_base_path.trace_file_upload_path self._trace_provider = TraceProvider( http_client=http_client, workspace_id=workspace_id, - ultra_large_report=ultra_large_report + ultra_large_report=ultra_large_report, + finish_event_processor=finish_pro, + tag_truncate_conf=tag_truncate_conf, + span_upload_path=span_upload_path, + file_upload_path=file_upload_path, + queue_conf=trace_queue_conf, ) self._prompt_provider = PromptProvider( workspace_id=workspace_id, @@ -234,7 +285,7 @@ def start_span( else: return self._trace_provider.start_span(name=name, span_type=span_type, start_time=start_time, parent_span_id=child_of.span_id, trace_id=child_of.trace_id, - baggage=child_of.baggage, start_new_trace=start_new_trace) + baggage=child_of.baggage(), start_new_trace=start_new_trace) except Exception as e: logger.warning(f"Start span failed, returning noop span. Error: {e}") return NOOP_SPAN diff --git a/cozeloop/internal/consts/__init__.py b/cozeloop/internal/consts/__init__.py index f33b111..44ea376 100644 --- a/cozeloop/internal/consts/__init__.py +++ b/cozeloop/internal/consts/__init__.py @@ -39,6 +39,7 @@ THREAD_ID = "thread_id" START_TIME_FIRST_RESP = "start_time_first_resp" LATENCY_FIRST_RESP = "latency_first_resp" +DEPLOYMENT_ENV = "deployment_env" CUT_OFF = "cut_off" # ReserveFieldTypes Define the allowed types for each reserved field. 
diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index b52035d..003693f 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -43,10 +43,10 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st res.update(headers) res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}" - tt_env = os.getenv("x-tt-env") + tt_env = os.getenv("x_tt_env") if tt_env: res["x-tt-env"] = tt_env - ppe_env = os.getenv("x-use-ppe") + ppe_env = os.getenv("x_use_ppe") if ppe_env: res["x-use-ppe"] = "1" diff --git a/cozeloop/internal/trace/__init__.py b/cozeloop/internal/trace/__init__.py index 4b07f0e..aec0090 100644 --- a/cozeloop/internal/trace/__init__.py +++ b/cozeloop/internal/trace/__init__.py @@ -6,4 +6,3 @@ from .trace import TraceProvider from .trace import Span - diff --git a/cozeloop/internal/trace/exporter.py b/cozeloop/internal/trace/exporter.py index 738aff7..e9578e0 100644 --- a/cozeloop/internal/trace/exporter.py +++ b/cozeloop/internal/trace/exporter.py @@ -18,10 +18,10 @@ logger = logging.getLogger(__name__) class Exporter: - def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool: + def export_spans(self, ctx: dict, spans: List['UploadSpan']): raise NotImplementedError - def export_files(self, ctx: dict, files: List['UploadFile']) -> bool: + def export_files(self, ctx: dict, files: List['UploadFile']): raise NotImplementedError @@ -35,12 +35,26 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> bool: PATH_INGEST_TRACE = "/v1/loop/traces/ingest" PATH_UPLOAD_FILE = "/v1/loop/files/upload" +class UploadPath: + def __init__( + self, + span_upload_path: str, + file_upload_path: str, + ): + self.span_upload_path = span_upload_path + self.file_upload_path = file_upload_path + class SpanExporter(Exporter): - def __init__(self, client: Client): + def __init__( + self, + client: Client, + upload_path: UploadPath, + ): self.client = client + self.upload_path = upload_path - def export_files(self, ctx: dict, files: List['UploadFile']) -> bool: + def export_files(self, ctx: dict, files: List['UploadFile']): for file in files: if not file: continue @@ -48,42 +62,34 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> bool: logger.debug(f"uploadFile start, file name: {file.name}") try: resp = self.client.upload_file( - PATH_UPLOAD_FILE, + self.upload_path.file_upload_path, BaseResponse, file.data, file.tos_key, {"workspace_id": file.space_id}, ) if resp.code != 0: # todo: some err code do not need retry - logger.error(f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]") - return False + raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]") except Exception as e: - logger.error(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]") - return False + raise Exception(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]") logger.debug(f"uploadFile end, file name: {file.name}") - return True - def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool: - logger.debug(f"export spans, spans count: {len(spans)}") - if not spans: - return True + def export_spans(self, ctx: dict, spans: List['UploadSpan']): + if not spans or len(spans) == 0: + return try: resp = self.client.post( - PATH_INGEST_TRACE, + self.upload_path.span_upload_path, BaseResponse, UploadSpanData(spans=spans), ) if resp.code != 0: # todo: some err code do not need retry - logger.error(f"export spans fail, 
code:[{resp.code}], msg:[{resp.msg}]") - return False + raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]") except Exception as e: - logger.error(f"export spans fail, err:[{e}]") - return False - - return True + raise Exception(f"export spans fail, err:[{e}]") class UploadSpanData(BaseModel): @@ -92,10 +98,12 @@ class UploadSpanData(BaseModel): class UploadSpan(BaseModel): started_at_micros: int + log_id: str span_id: str parent_id: str trace_id: str duration_micros: int + service_name: str workspace_id: str span_name: str span_type: str @@ -137,10 +145,12 @@ def transfer_to_upload_span_and_file(spans: List['Span']) -> (List[UploadSpan], res_span.append(UploadSpan( started_at_micros=int(span.start_time.timestamp() * 1_000_000), + log_id=span.log_id, span_id=span.span_id, parent_id=span.parent_span_id, trace_id=span.trace_id, duration_micros=span.get_duration(), + service_name=span.service_name, workspace_id=span.get_space_id(), span_name=span.get_span_name(), span_type=span.get_span_type(), diff --git a/cozeloop/internal/trace/model/model.py b/cozeloop/internal/trace/model/model.py index 712d92d..3497d96 100644 --- a/cozeloop/internal/trace/model/model.py +++ b/cozeloop/internal/trace/model/model.py @@ -4,7 +4,8 @@ from enum import Enum from pydantic import BaseModel -from typing import List, Optional +from typing import List, Optional, Literal +from pydantic.dataclasses import dataclass class ObjectStorage(BaseModel): @@ -23,3 +24,46 @@ class Attachment(BaseModel): class UploadType(str, Enum): LONG = 1 MULTI_MODALITY = 2 + + +SpanFinishEvent = Literal[ + "queue_manager.span_entry.rate", + "queue_manager.file_entry.rate", + "exporter.span_flush.rate", + "exporter.file_flush.rate" +] + + +@dataclass +class FinishEventInfoExtra: + is_root_span: bool = False + latency_ms: int = 0 + + +@dataclass +class FinishEventInfo: + event_type: SpanFinishEvent + is_event_fail: bool + item_num: int # maybe multiple span is processed in one event + detail_msg: str + extra_params: Optional[FinishEventInfoExtra] = None + + +class TagTruncateConf: + def __init__( + self, + normal_field_max_byte: int, + input_output_field_max_byte: int, + ): + self.normal_field_max_byte = normal_field_max_byte + self.input_output_field_max_byte = input_output_field_max_byte + + +class QueueConf: + def __init__( + self, + span_queue_length: int, + span_max_export_batch_length: int, + ): + self.span_queue_length = span_queue_length + self.span_max_export_batch_length = span_max_export_batch_length diff --git a/cozeloop/internal/trace/noop_span.py b/cozeloop/internal/trace/noop_span.py index e06a3d2..0ef89e5 100644 --- a/cozeloop/internal/trace/noop_span.py +++ b/cozeloop/internal/trace/noop_span.py @@ -110,6 +110,18 @@ def set_start_time_first_resp(self, start_time_first_resp: int) -> None: def set_runtime(self, runtime: Runtime) -> None: pass + def set_service_name(self, service_name: str) -> None: + pass + + def set_log_id(self, log_id: str) -> None: + pass + + def set_system_tags(self, system_tags: Dict[str, Any]) -> None: + pass + + def set_deployment_env(self, deployment_env: str) -> None: + pass + def __enter__(self): return self diff --git a/cozeloop/internal/trace/queue_manager.py b/cozeloop/internal/trace/queue_manager.py index 90d728f..cc93e8c 100644 --- a/cozeloop/internal/trace/queue_manager.py +++ b/cozeloop/internal/trace/queue_manager.py @@ -12,13 +12,20 @@ import logging import queue import threading -from typing import Any, Callable, List +from typing import Any, Callable, List, Optional from queue import 
Queue from pydantic import BaseModel +from cozeloop.internal.trace.model.model import FinishEventInfo, FinishEventInfoExtra + logger = logging.getLogger(__name__) +QUEUE_NAME_SPAN = "span" +QUEUE_NAME_SPAN_RETRY = "span_retry" +QUEUE_NAME_FILE = "file" +QUEUE_NAME_FILE_RETRY = "file_retry" + class QueueManager: def enqueue(self, s: Any, byte_size: int): raise NotImplementedError @@ -37,6 +44,7 @@ class BatchQueueManagerOptions(BaseModel): max_export_batch_length: int max_export_batch_byte_size: int export_func: Callable[[dict, List[Any]], None] + finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None class BatchQueueManager(QueueManager): @@ -47,6 +55,7 @@ def __init__(self, options: BatchQueueManagerOptions): self.batch = [] self.batch_byte_size = 0 self.batch_lock = threading.Lock() + self.size_lock = threading.Lock() self.export_func = options.export_func self.stop_event = threading.Event() @@ -80,8 +89,9 @@ def worker(self): def is_should_export(self) -> bool: if len(self.batch) >= self.options.max_export_batch_length: return True - if self.batch_byte_size >= self.options.max_export_batch_byte_size: - return True + with self.size_lock: + if self.batch_byte_size >= self.options.max_export_batch_byte_size: + return True return False def _drain_queue(self): @@ -99,46 +109,63 @@ def _drain_queue(self): self._do_export_batch() def _do_export(self): - logger.debug( - f"{self.options.queue_name} queue _do_export, len: {len(self.batch)}") with self.batch_lock: while not self.queue.empty(): item = self.queue.get() self.batch.append(item) if len(self.batch) == self.options.max_export_batch_length: break - logger.debug( - f"{self.options.queue_name} queue _do_export_end, len: {len(self.batch)}") self._do_export_batch() def _do_export_batch(self): - logger.debug( - f"{self.options.queue_name} queue _do_export_batch, len: {len(self.batch)}") with self.batch_lock: if self.batch: if self.export_func: + logger.debug( + f"{self.options.queue_name} queue _do_export, len: {len(self.batch)}") self.export_func({}, self.batch) self.batch = [] - self.batch_byte_size = 0 + with self.size_lock: + self.batch_byte_size = 0 def enqueue(self, item: Any, byte_size: int): if self.stop_event.is_set(): return + is_fail = False + detail_msg = "" try: self.queue.put_nowait(item) if self.queue.qsize() >= self.options.max_queue_length: with self.condition: self.condition.notify() - logger.debug(f"{self.options.queue_name} enqueue, queue length: {self.queue.qsize()}") + detail_msg = f"{self.options.queue_name} enqueue, queue length: {self.queue.qsize()}" except queue.Full: - logger.error( - f"{self.options.queue_name} queue is full, dropped span") + is_fail = True + detail_msg = f"{self.options.queue_name} queue is full, dropped span" self.dropped += 1 else: - with self.batch_lock: + with self.size_lock: self.batch_byte_size += byte_size + event_typ = "queue_manager.file_entry.rate" + extra_params = FinishEventInfoExtra(is_root_span=False) + if self.options.queue_name == QUEUE_NAME_SPAN or self.options.queue_name == QUEUE_NAME_SPAN_RETRY: + event_typ = "queue_manager.span_entry.rate" + if item.is_root_span(): + extra_params = FinishEventInfoExtra( + is_root_span=True, + ) + + if self.options and self.options.finish_event_processor: + self.options.finish_event_processor(FinishEventInfo( + event_type=event_typ, + is_event_fail=is_fail, + item_num=1, + detail_msg=detail_msg, + extra_params=extra_params + )) + def shutdown(self) -> bool: if self.stop_event.is_set(): return True diff --git 
a/cozeloop/internal/trace/span.py b/cozeloop/internal/trace/span.py index ab7f361..6189a5a 100644 --- a/cozeloop/internal/trace/span.py +++ b/cozeloop/internal/trace/span.py @@ -3,13 +3,14 @@ import logging from abc import ABC -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from datetime import datetime import threading import json import urllib.parse from cozeloop import span +from cozeloop.internal.trace.model.model import TagTruncateConf from cozeloop.spec.tracespec import (ModelInput, ModelOutput, ModelMessagePartType, ModelMessage, ModelMessagePart, ModelImageURL, ModelFileURL, ModelChoice, Runtime, ERROR, PROMPT_KEY, PROMPT_VERSION, MODEL_PROVIDER, MODEL_NAME, RUNTIME_, CALL_OPTIONS, @@ -33,7 +34,7 @@ def __init__(self, trace_id: str, span_id: str, baggage: Dict[str, str] = None): baggage = {} self.trace_id = trace_id self.span_id = span_id - self.baggage = baggage + self._baggage = baggage def trace_id(self) -> str: return self.trace_id @@ -41,11 +42,16 @@ def trace_id(self) -> str: def span_id(self) -> str: return self.span_id + @property def baggage(self) -> Dict[str, str]: - return self.baggage + return self._baggage + + @baggage.setter + def baggage(self, value: Dict[str, str]): + self._baggage = value def set_baggage_item(self, key: str, value: str): - self.baggage[key] = value + self._baggage[key] = value class Span(span.Span, SpanContext, ABC): @@ -54,10 +60,18 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac baggage: Dict[str, str] = None, tag_map: Dict[str, Any] = None, system_tag_map: Dict[str, Any] = None, status_code: int = 0, multi_modality_key_map: Dict[str, Any] = None, ultra_large_report: bool = False, span_processor: Any = None, flags: int = 0, - is_finished: int = 0): + is_finished: int = 0, *, service_name: str = '', log_id: str = '', + tag_truncate_conf: Optional[TagTruncateConf] = None): + # span context param super().__init__(trace_id, span_id, baggage) + + # basic param self.span_type = span_type self.name = name + + # These params can be changed, but remember locking when changed + self.service_name = service_name + self.log_id = log_id self.space_id = space_id self.parent_span_id = parent_span_id self.start_time = start_time if start_time else datetime.now() @@ -65,6 +79,8 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac self.tag_map = tag_map if tag_map else {} self.system_tag_map = system_tag_map if system_tag_map else {} self.status_code = status_code + + # These params is internal field self.multi_modality_key_map = multi_modality_key_map if multi_modality_key_map else {} self.ultra_large_report = ultra_large_report self.span_processor = span_processor @@ -72,6 +88,7 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac self.is_finished = is_finished self.lock = threading.RLock() self.bytes_size: int = 0 + self.tag_truncate_conf = tag_truncate_conf def set_tags(self, tag_kv: Dict[str, Any]): if not tag_kv: @@ -138,7 +155,7 @@ def ultra_large_report(self) -> bool: return self.ultra_large_report def baggage(self) -> Dict[str, str]: - return super().baggage() + return super().baggage def set_input(self, input_data): if self is None: @@ -344,12 +361,31 @@ def set_start_time_first_resp(self, start_time_first_resp: int): self.set_tags({START_TIME_FIRST_RESP: start_time_first_resp}) def set_runtime(self, runtime: Runtime) -> None: - r = Runtime(scene=V_SCENE_CUSTOM, library=runtime.library, 
library_version=runtime.library_version) + r = runtime + r.scene = V_SCENE_CUSTOM with self.lock: if self.system_tag_map is None: self.system_tag_map = {} self.system_tag_map[RUNTIME_] = r + def set_service_name(self, service_name: str) -> None: + self.service_name = service_name + + def set_log_id(self, log_id: str) -> None: + self.log_id = log_id + + def set_system_tags(self, system_tags: Dict[str, Any]) -> None: + if not system_tags: + return + with self.lock: + if self.system_tag_map is None: + self.system_tag_map = {} + for key, value in system_tags.items(): + self.system_tag_map[key] = value + + def set_deployment_env(self, deployment_env: str) -> None: + self.set_tags({DEPLOYMENT_ENV: deployment_env}) + def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[str], int): validate_map = {} cut_off_keys = [] @@ -368,7 +404,7 @@ def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[ value = value_str # Truncate the value if a single tag's value is too large - tag_value_length_limit = get_tag_value_size_limit(key) + tag_value_length_limit = self.get_tag_value_size_limit(key) is_ultra_large_report = False v, is_truncate = truncate_string_by_byte(value_str, tag_value_length_limit) if is_truncate: @@ -397,6 +433,16 @@ def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[ def is_can_cut_off(self, value: Any) -> bool: return value is not None and not isinstance(value, (int, float, bool)) + def get_tag_value_size_limit(self, key: str) -> int: + limit = get_tag_value_size_limit(key) + if key == INPUT or key == OUTPUT: + if self.tag_truncate_conf and self.tag_truncate_conf.input_output_field_max_byte > 0: + limit = self.tag_truncate_conf.input_output_field_max_byte + else: + if self.tag_truncate_conf and self.tag_truncate_conf.normal_field_max_byte > 0: + limit = self.tag_truncate_conf.normal_field_max_byte + return limit + def set_multi_modality_map(self, key: str): with self.lock: if not self.multi_modality_key_map: @@ -487,7 +533,8 @@ def set_stat_info(self): input_tokens = tag_map.get(INPUT_TOKENS, 0) output_tokens = tag_map.get(OUTPUT_TOKENS, 0) - self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)}) + if input_tokens > 0 or output_tokens > 0: + self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)}) duration = int((datetime.now().timestamp() - self.start_time.timestamp()) * 1000000) with self.lock: @@ -512,11 +559,14 @@ def to_header(self) -> Dict[str, str]: def to_header_baggage(self) -> str: if not self.baggage: return "" - return ",".join(f"{k}={v}" for k, v in self.baggage.items()) + return ",".join(f"{k}={v}" for k, v in self.baggage().items()) def to_header_parent(self) -> str: return f"{GLOBAL_TRACE_VERSION:02x}-{self.trace_id}-{self.span_id}-{self.flags:02x}" + def is_root_span(self) -> bool: + return self.parent_span_id is None or self.parent_span_id == '' or self.parent_span_id == '0' + def __enter__(self): return self diff --git a/cozeloop/internal/trace/span_processor.py b/cozeloop/internal/trace/span_processor.py index 1b0da0a..7ee0cbd 100644 --- a/cozeloop/internal/trace/span_processor.py +++ b/cozeloop/internal/trace/span_processor.py @@ -11,10 +11,13 @@ import threading from cozeloop.internal.trace.exporter import * -from cozeloop.internal.trace.queue_manager import BatchQueueManager, BatchQueueManagerOptions +from cozeloop.internal.trace.model.model import FinishEventInfo, QueueConf, FinishEventInfoExtra +from cozeloop.internal.trace.queue_manager import BatchQueueManager, 
BatchQueueManagerOptions, QUEUE_NAME_FILE_RETRY, \ + QUEUE_NAME_FILE, QUEUE_NAME_SPAN_RETRY, QUEUE_NAME_SPAN from cozeloop.internal.trace.span import Span -DEFAULT_MAX_QUEUE_LENGTH = 2048 +DEFAULT_MAX_QUEUE_LENGTH = 1024 +DEFAULT_MAX_RETRY_QUEUE_LENGTH = 512 DEFAULT_MAX_EXPORT_BATCH_LENGTH = 100 DEFAULT_MAX_EXPORT_BATCH_BYTE_SIZE = 4 * 1024 * 1024 # 4MB MAX_RETRY_EXPORT_BATCH_LENGTH = 50 @@ -27,6 +30,7 @@ logger = logging.getLogger(__name__) + class SpanProcessor: def on_span_end(self, s: Span): raise NotImplementedError @@ -39,50 +43,82 @@ def force_flush(self) -> bool: class BatchSpanProcessor(SpanProcessor): - def __init__(self, client): - self.exporter = SpanExporter(client) + def __init__( + self, + client, + upload_path: UploadPath = None, + finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None, + queue_conf: Optional[QueueConf] = None, + ): + span_upload_path = PATH_INGEST_TRACE + file_upload_path = PATH_UPLOAD_FILE + if upload_path: + if upload_path.span_upload_path: + span_upload_path = upload_path.span_upload_path + if upload_path.file_upload_path: + file_upload_path = upload_path.file_upload_path + self.exporter = SpanExporter( + client, + UploadPath( + span_upload_path=span_upload_path, + file_upload_path=file_upload_path, + ) + ) + + span_queue_length = DEFAULT_MAX_QUEUE_LENGTH + span_export_batch_size = DEFAULT_MAX_EXPORT_BATCH_LENGTH + if queue_conf: + if queue_conf.span_queue_length > 0: + span_queue_length = queue_conf.span_queue_length + if queue_conf.span_max_export_batch_length > 0: # todo: need max limit + span_export_batch_size = queue_conf.span_max_export_batch_length self.file_retry_qm = BatchQueueManager( BatchQueueManagerOptions( - queue_name='file_retry', + queue_name=QUEUE_NAME_FILE_RETRY, batch_timeout=FILE_SCHEDULE_DELAY, max_queue_length=MAX_FILE_QUEUE_LENGTH, max_export_batch_length=MAX_FILE_EXPORT_BATCH_LENGTH, max_export_batch_byte_size=MAX_FILE_EXPORT_BATCH_BYTE_SIZE, - export_func=self._new_export_files_func(self.exporter, None) + export_func=self._new_export_files_func(self.exporter, None, finish_event_processor), + finish_event_processor=finish_event_processor, ) ) self.file_qm = BatchQueueManager( BatchQueueManagerOptions( - queue_name='file', + queue_name=QUEUE_NAME_FILE, batch_timeout=FILE_SCHEDULE_DELAY, max_queue_length=MAX_FILE_QUEUE_LENGTH, max_export_batch_length=MAX_FILE_EXPORT_BATCH_LENGTH, max_export_batch_byte_size=MAX_FILE_EXPORT_BATCH_BYTE_SIZE, - export_func=self._new_export_files_func(self.exporter, self.file_retry_qm) + export_func=self._new_export_files_func(self.exporter, self.file_retry_qm, finish_event_processor), + finish_event_processor=finish_event_processor, ) ) self.span_retry_qm = BatchQueueManager( BatchQueueManagerOptions( - queue_name='span_retry', + queue_name=QUEUE_NAME_SPAN_RETRY, batch_timeout=DEFAULT_SCHEDULE_DELAY, - max_queue_length=DEFAULT_MAX_QUEUE_LENGTH, + max_queue_length=DEFAULT_MAX_RETRY_QUEUE_LENGTH, max_export_batch_length=MAX_RETRY_EXPORT_BATCH_LENGTH, max_export_batch_byte_size=DEFAULT_MAX_EXPORT_BATCH_BYTE_SIZE, - export_func=self._new_export_spans_func(self.exporter, None, self.file_qm) + export_func=self._new_export_spans_func(self.exporter, None, self.file_qm, finish_event_processor), + finish_event_processor=finish_event_processor, ) ) self.span_qm = BatchQueueManager( BatchQueueManagerOptions( - queue_name='span', + queue_name=QUEUE_NAME_SPAN, batch_timeout=DEFAULT_SCHEDULE_DELAY, - max_queue_length=DEFAULT_MAX_QUEUE_LENGTH, - 
max_export_batch_length=DEFAULT_MAX_EXPORT_BATCH_LENGTH, + max_queue_length=span_queue_length, + max_export_batch_length=span_export_batch_size, max_export_batch_byte_size=DEFAULT_MAX_EXPORT_BATCH_BYTE_SIZE, - export_func=self._new_export_spans_func(self.exporter, self.span_retry_qm, self.file_qm) + export_func=self._new_export_spans_func(self.exporter, self.span_retry_qm, self.file_qm, + finish_event_processor), + finish_event_processor=finish_event_processor, ) ) @@ -108,34 +144,98 @@ def force_flush(self) -> bool: success = False return success - def _new_export_spans_func(self, exporter, span_retry_queue, file_queue): + def _new_export_spans_func( + self, + exporter, + span_retry_queue, + file_queue, + finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None + ): def export_func(ctx: dict, items: List[Any]): spans = [s for s in items if isinstance(s, Span)] + if not spans or len(spans) == 0: + return try: upload_spans, upload_files = transfer_to_upload_span_and_file(spans) - logger.debug(f"upload_spans len[{len(upload_spans)}], upload_files len[{len(upload_files)}]") except Exception as e: logger.warning(f"transfer_to_upload_span_and_file fail") + return - if not exporter.export_spans(ctx, upload_spans): + event_err_msg = "" + before = time.perf_counter() + + is_export_pass = True + export_msg = "" + try: + exporter.export_spans(ctx, upload_spans) + except Exception as e: + is_export_pass = False + export_msg = f"{e}" + + elapsed_time_ms = (time.perf_counter() - before) * 1000 + if not is_export_pass: if span_retry_queue: for span in spans: span_retry_queue.enqueue(span, span.bytes_size) + event_err_msg = f'{export_msg}, retry later' + else: + event_err_msg = f'{export_msg}, retry second time failed' else: for file in upload_files: if file and file_queue: file_queue.enqueue(file, len(file.data)) + if finish_event_processor: + finish_event_processor(FinishEventInfo( + event_type="exporter.span_flush.rate", + is_event_fail=not is_export_pass, + item_num=len(spans), + detail_msg=event_err_msg, + extra_params=FinishEventInfoExtra( + latency_ms=int(elapsed_time_ms) + ) + )) return export_func - def _new_export_files_func(self, exporter, file_retry_queue): + def _new_export_files_func( + self, + exporter, + file_retry_queue, + finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None + ): def export_func(ctx: dict, items: List[Any]): files = [f for f in items if isinstance(f, UploadFile)] + if not files or len(files) == 0: + return + + event_err_msg = "" + before = time.perf_counter() + + is_export_pass = True + export_msg = "" + try: + exporter.export_files(ctx, files) + except Exception as e: + is_export_pass = False + export_msg = f"{e}" - if not exporter.export_files(ctx, files): - logger.warning(f" export_files fail") + elapsed_time_ms = (time.perf_counter() - before) * 1000 + if not is_export_pass: if file_retry_queue: for file in files: file_retry_queue.enqueue(file, len(file.data)) + event_err_msg = f'{export_msg}, retry later' + else: + event_err_msg = f'{export_msg}, retry second time failed' + if finish_event_processor: + finish_event_processor(FinishEventInfo( + event_type="exporter.file_flush.rate", + is_event_fail=not is_export_pass, + item_num=len(files), + detail_msg=event_err_msg, + extra_params=FinishEventInfoExtra( + latency_ms=int(elapsed_time_ms) + ) + )) - return export_func \ No newline at end of file + return export_func diff --git a/cozeloop/internal/trace/trace.py index 7e763a1..d536747 100644 ---
a/cozeloop/internal/trace/trace.py +++ b/cozeloop/internal/trace/trace.py @@ -1,10 +1,12 @@ # Copyright (c) 2025 Bytedance Ltd. and/or its affiliates # SPDX-License-Identifier: MIT - +import logging import time from datetime import datetime -from typing import Dict, Optional +from typing import Dict, Optional, Callable +from cozeloop.internal.trace.exporter import UploadPath +from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf from cozeloop.spec.tracespec import Runtime, RUNTIME_ from cozeloop.internal import consts from cozeloop.internal.trace.span import from_header, Span, SpanContext, \ @@ -12,6 +14,7 @@ from cozeloop.internal.trace.span_processor import BatchSpanProcessor, SpanProcessor from cozeloop.internal.utils.get import gen_16char_id, gen_32char_id +logger = logging.getLogger(__name__) class TraceOptions: def __init__(self, workspace_id: str, ultra_large_report: bool = False): @@ -33,11 +36,28 @@ def __init__( http_client, workspace_id: str, *, - ultra_large_report: bool = False + ultra_large_report: bool = False, + finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None, + tag_truncate_conf: Optional[TagTruncateConf] = None, + span_upload_path: str = None, + file_upload_path: str = None, + queue_conf: Optional[QueueConf] = None, ): self.workspace_id = workspace_id self.ultra_large_report = ultra_large_report - self.span_processor: SpanProcessor = BatchSpanProcessor(http_client) + upload_path = None + if span_upload_path or file_upload_path: + upload_path = UploadPath( + span_upload_path=span_upload_path, + file_upload_path=file_upload_path, + ) + self.span_processor: SpanProcessor = BatchSpanProcessor( + http_client, + upload_path, + finish_event_processor, + queue_conf, + ) + self.tag_truncate_conf = tag_truncate_conf def start_span( self, @@ -50,6 +70,10 @@ def start_span( start_new_trace: bool = False, scene: str = '' ) -> Span: + if name == '': + name = 'unknown' + if span_type == '': + span_type = 'unknown' if len(name) > consts.MAX_BYTES_OF_ONE_TAG_VALUE_DEFAULT: logger.warning( f"Name is too long, will be truncated to {consts.MAX_BYTES_OF_ONE_TAG_VALUE_DEFAULT} bytes, original name: {name}" @@ -69,7 +93,7 @@ def start_span( if parent_span_id is None: parent_span_id = parent_span.span_id if baggage is None: - baggage = parent_span.baggage + baggage = parent_span.baggage() loop_span = self._start_span(name, span_type, start_time, parent_span_id, trace_id, baggage, scene) set_span_to_context(loop_span) @@ -116,8 +140,9 @@ def _start_span(self, ultra_large_report=self.ultra_large_report, multi_modality_key_map={}, span_processor=self.span_processor, - flags=0, + flags=1, # default sampled is_finished=0, + tag_truncate_conf=self.tag_truncate_conf, ) span.set_baggage_escape(baggage, False) @@ -128,3 +153,12 @@ def flush(self): def close_trace(self): self.span_processor.shutdown() + + +def default_finish_event_processor(info: FinishEventInfo): + if info is None: + return + if info.is_event_fail: + logger.error(f"[fornax_sdk] finish_event[{info.event_type}] fail, msg: {info.detail_msg}") + else: + logger.debug(f"[fornax_sdk] finish_event[{info.event_type}] success, item num: {info.item_num}, msg: {info.detail_msg}") \ No newline at end of file diff --git a/cozeloop/internal/version.py b/cozeloop/internal/version.py index 379aa5a..82ed6df 100644 --- a/cozeloop/internal/version.py +++ b/cozeloop/internal/version.py @@ -1,4 +1,4 @@ # Copyright (c) 2025 Bytedance Ltd. 
and/or its affiliates # SPDX-License-Identifier: MIT -VERSION = 'v0.1.0' \ No newline at end of file +VERSION = 'v0.1.10' \ No newline at end of file diff --git a/cozeloop/span.py b/cozeloop/span.py index ec3fd22..ec1f7c4 100644 --- a/cozeloop/span.py +++ b/cozeloop/span.py @@ -135,6 +135,7 @@ def set_model_name(self, model_name: str) -> None: def set_model_call_options(self, model_call_options: Any) -> None: """ Set the model call options + Key: `call_options`. """ @abstractmethod @@ -167,6 +168,30 @@ def set_runtime(self, runtime: Runtime) -> None: Key: `runtime`. """ + @abstractmethod + def set_service_name(self, service_name: str) -> None: + """ + Set the custom service name to identify different services. + """ + + @abstractmethod + def set_log_id(self, log_id: str) -> None: + """ + Set the custom log id to identify different queries. + """ + + @abstractmethod + def set_system_tags(self, system_tags: Dict[str, Any]) -> None: + """ + Set system tags. DO NOT use this method unless you know what you are doing. + """ + + def set_deployment_env(self, deployment_env: str) -> None: + """ + Set the deployment environment of the span to identify custom environments. + """ + + class Span(CommonSpanSetter, SpanContext): """ Interface for Span operations with tracing and tagging capabilities. diff --git a/cozeloop/spec/tracespec/runtime.py b/cozeloop/spec/tracespec/runtime.py index defd80c..66f97e0 100644 --- a/cozeloop/spec/tracespec/runtime.py +++ b/cozeloop/spec/tracespec/runtime.py @@ -4,6 +4,7 @@ from typing import Optional from pydantic import BaseModel + class Runtime(BaseModel): language: Optional[str] = None # from enum VLang in span_value.py library: Optional[str] = None # integration library, from enum VLib in span_value.py @@ -11,4 +12,7 @@ class Runtime(BaseModel): # Dependency Versions. library_version: Optional[str] = None - loop_sdk_version: Optional[str] = None \ No newline at end of file + loop_sdk_version: Optional[str] = None + + # Extra info. + extra: Optional[dict] = None \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1036644..fee793b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cozeloop" -version = "0.1.8" +version = "0.1.10" description = "coze loop sdk" authors = ["JiangQi715 "] license = "MIT"
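
Usage note (a minimal sketch, not part of the patch): the snippet below shows how a caller might wire up the options this change introduces -- a custom finish-event processor (which runs after the SDK's built-in default processor), tag truncation limits, span queue sizing, and custom upload paths. The token, workspace id, and numeric limits are placeholders; the import paths are the module locations touched by this patch, so prefer any public re-exports if the package provides them.

    import cozeloop
    from cozeloop._client import APIBasePath
    from cozeloop.internal.trace.model.model import FinishEventInfo, QueueConf, TagTruncateConf


    def my_finish_event_processor(info: FinishEventInfo) -> None:
        # Called for span/file enqueue and flush events; forward failures to
        # your own metrics or alerting here.
        if info.is_event_fail:
            print(f"trace event {info.event_type} failed ({info.item_num} items): {info.detail_msg}")


    client = cozeloop.new_client(
        api_token="your-api-token",        # placeholder
        workspace_id="your-workspace-id",  # placeholder
        trace_finish_event_processor=my_finish_event_processor,
        tag_truncate_conf=TagTruncateConf(
            normal_field_max_byte=1024,               # cap normal tag values at 1 KB
            input_output_field_max_byte=1024 * 1024,  # allow larger input/output tags
        ),
        trace_queue_conf=QueueConf(
            span_queue_length=2048,
            span_max_export_batch_length=200,
        ),
        api_base_path=APIBasePath(
            trace_span_upload_path="/v1/loop/traces/ingest",
            trace_file_upload_path="/v1/loop/files/upload",
        ),
    )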
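
Similarly, a sketch of the new per-span setters added here (set_service_name, set_log_id, set_deployment_env, set_system_tags), using the span as a context manager; the span name, span type, and all tag values below are illustrative only.

    # Assumes `client` was created as in the previous sketch.
    with client.start_span("handle_request", "custom") as span:
        span.set_service_name("recommendation-api")  # custom service name for this span
        span.set_log_id("req-20250804-0001")         # custom log id to correlate a single query
        span.set_deployment_env("staging")           # stored under the deployment_env tag
        # System tags are for internal use; the interface warns against casual use.
        span.set_system_tags({"cluster": "cn-north-1"})
        span.set_tags({"user_id": "placeholder-user"})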