Skip to content

Support FinishEventProcessor and additional Span setter methods #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions cozeloop/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import os
import threading
from datetime import datetime
from typing import Dict, Any, List, Optional
from typing import Dict, Any, List, Optional, Callable

import httpx

from cozeloop.client import Client
from cozeloop._noop import NOOP_SPAN, _NoopClient
Expand All @@ -17,6 +19,8 @@
from cozeloop.internal.httpclient import Auth
from cozeloop.internal.prompt import PromptProvider
from cozeloop.internal.trace import TraceProvider
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf
from cozeloop.internal.trace.trace import default_finish_event_processor
from cozeloop.span import SpanContext, Span

logger = logging.getLogger(__name__)
Expand All @@ -35,6 +39,15 @@
_default_client = None
_client_lock = threading.Lock()

class APIBasePath:
    """Optional overrides for the trace upload endpoint paths.

    Both attributes default to ``None``. The client forwards them unchanged to
    ``TraceProvider`` as ``span_upload_path`` and ``file_upload_path``.
    NOTE(review): presumably ``None`` means "use the built-in path" -- confirm
    against ``TraceProvider``.

    Args:
        trace_span_upload_path: Path used for span uploads, or ``None``.
        trace_file_upload_path: Path used for file uploads, or ``None``.
    """

    def __init__(
        self,
        trace_span_upload_path: Optional[str] = None,
        trace_file_upload_path: Optional[str] = None,
    ):
        self.trace_span_upload_path = trace_span_upload_path
        self.trace_file_upload_path = trace_file_upload_path

    def __repr__(self) -> str:
        # new_client() derives its cache key from str(arg). The default
        # object repr embeds the memory address, so two equal configurations
        # would never share a cached client (and a recycled address could
        # collide). A value-based repr keeps the key stable.
        return (
            f"{type(self).__name__}("
            f"trace_span_upload_path={self.trace_span_upload_path!r}, "
            f"trace_file_upload_path={self.trace_file_upload_path!r})"
        )


def _generate_cache_key(*args) -> str:
key_str = "\t".join(str(arg) for arg in args)
Expand All @@ -54,8 +67,13 @@ def new_client(
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
prompt_trace: bool = False,
http_client: Optional[httpx.Client] = None,
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
tag_truncate_conf: Optional[TagTruncateConf] = None,
api_base_path: Optional[APIBasePath] = None,
trace_queue_conf: Optional[QueueConf] = None,
) -> Client:
cache_key = _generate_cache_key(
cache_key = _generate_cache_key( # all args are used to generate cache key
api_base_url,
workspace_id,
api_token,
Expand All @@ -67,7 +85,12 @@ def new_client(
ultra_large_report,
prompt_cache_max_count,
prompt_cache_refresh_interval,
prompt_trace
prompt_trace,
http_client,
trace_finish_event_processor,
tag_truncate_conf,
api_base_path,
trace_queue_conf,
)

with _cache_lock:
Expand All @@ -88,6 +111,11 @@ def new_client(
prompt_cache_max_count=prompt_cache_max_count,
prompt_cache_refresh_interval=prompt_cache_refresh_interval,
prompt_trace=prompt_trace,
arg_http_client=http_client,
trace_finish_event_processor=trace_finish_event_processor,
tag_truncate_conf=tag_truncate_conf,
api_base_path=api_base_path,
trace_queue_conf=trace_queue_conf,
)
_client_cache[cache_key] = client
return client
Expand All @@ -113,7 +141,12 @@ def __init__(
ultra_large_report: bool = False,
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
prompt_trace: bool = False
prompt_trace: bool = False,
arg_http_client: Optional[httpx.Client] = None,
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
tag_truncate_conf: Optional[TagTruncateConf] = None,
api_base_path: Optional[APIBasePath] = None,
trace_queue_conf: Optional[QueueConf] = None,
):
workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID)
api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL)
Expand All @@ -136,6 +169,8 @@ def __init__(

self._workspace_id = workspace_id
inner_client = httpclient.HTTPClient()
if arg_http_client:
inner_client = arg_http_client
auth = self._build_auth(
api_base_url=api_base_url,
http_client=inner_client,
Expand All @@ -151,10 +186,26 @@ def __init__(
timeout=timeout,
upload_timeout=upload_timeout
)
finish_pro = default_finish_event_processor
if trace_finish_event_processor:
def combined_processor(event_info: FinishEventInfo):
default_finish_event_processor(event_info)
trace_finish_event_processor(event_info)
finish_pro = combined_processor
span_upload_path = None
file_upload_path = None
if api_base_path:
span_upload_path = api_base_path.trace_span_upload_path
file_upload_path = api_base_path.trace_file_upload_path
self._trace_provider = TraceProvider(
http_client=http_client,
workspace_id=workspace_id,
ultra_large_report=ultra_large_report
ultra_large_report=ultra_large_report,
finish_event_processor=finish_pro,
tag_truncate_conf=tag_truncate_conf,
span_upload_path=span_upload_path,
file_upload_path=file_upload_path,
queue_conf=trace_queue_conf,
)
self._prompt_provider = PromptProvider(
workspace_id=workspace_id,
Expand Down Expand Up @@ -234,7 +285,7 @@ def start_span(
else:
return self._trace_provider.start_span(name=name, span_type=span_type, start_time=start_time,
parent_span_id=child_of.span_id, trace_id=child_of.trace_id,
baggage=child_of.baggage, start_new_trace=start_new_trace)
baggage=child_of.baggage(), start_new_trace=start_new_trace)
except Exception as e:
logger.warning(f"Start span failed, returning noop span. Error: {e}")
return NOOP_SPAN
Expand Down
1 change: 1 addition & 0 deletions cozeloop/internal/consts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
THREAD_ID = "thread_id"
START_TIME_FIRST_RESP = "start_time_first_resp"
LATENCY_FIRST_RESP = "latency_first_resp"
DEPLOYMENT_ENV = "deployment_env"
CUT_OFF = "cut_off"

# ReserveFieldTypes Define the allowed types for each reserved field.
Expand Down
4 changes: 2 additions & 2 deletions cozeloop/internal/httpclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st
res.update(headers)
res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}"

tt_env = os.getenv("x-tt-env")
tt_env = os.getenv("x_tt_env")
if tt_env:
res["x-tt-env"] = tt_env
ppe_env = os.getenv("x-use-ppe")
ppe_env = os.getenv("x_use_ppe")
if ppe_env:
res["x-use-ppe"] = "1"

Expand Down
1 change: 0 additions & 1 deletion cozeloop/internal/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@
from .trace import TraceProvider

from .trace import Span

52 changes: 31 additions & 21 deletions cozeloop/internal/trace/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
logger = logging.getLogger(__name__)

class Exporter:
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
raise NotImplementedError

def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
def export_files(self, ctx: dict, files: List['UploadFile']):
raise NotImplementedError


Expand All @@ -35,55 +35,61 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
PATH_INGEST_TRACE = "/v1/loop/traces/ingest"
PATH_UPLOAD_FILE = "/v1/loop/files/upload"

class UploadPath:
    """Pair of endpoint paths that ``SpanExporter`` uploads to."""

    def __init__(self, span_upload_path: str, file_upload_path: str):
        # Passed to client.upload_file() when exporting files.
        self.file_upload_path = file_upload_path
        # Passed to client.post() when exporting spans.
        self.span_upload_path = span_upload_path


class SpanExporter(Exporter):
def __init__(self, client: Client):
def __init__(
self,
client: Client,
upload_path: UploadPath,
):
self.client = client
self.upload_path = upload_path

def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
def export_files(self, ctx: dict, files: List['UploadFile']):
for file in files:
if not file:
continue

logger.debug(f"uploadFile start, file name: {file.name}")
try:
resp = self.client.upload_file(
PATH_UPLOAD_FILE,
self.upload_path.file_upload_path,
BaseResponse,
file.data,
file.tos_key,
{"workspace_id": file.space_id},
)
if resp.code != 0: # todo: some err code do not need retry
logger.error(f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]")
return False
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
except Exception as e:
logger.error(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")
return False
raise Exception(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")

logger.debug(f"uploadFile end, file name: {file.name}")
return True

def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
logger.debug(f"export spans, spans count: {len(spans)}")

if not spans:
return True
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
if not spans or len(spans) == 0:
return

try:
resp = self.client.post(
PATH_INGEST_TRACE,
self.upload_path.span_upload_path,
BaseResponse,
UploadSpanData(spans=spans),
)
if resp.code != 0: # todo: some err code do not need retry
logger.error(f"export spans fail, code:[{resp.code}], msg:[{resp.msg}]")
return False
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
except Exception as e:
logger.error(f"export spans fail, err:[{e}]")
return False

return True
raise Exception(f"export spans fail, err:[{e}]")


class UploadSpanData(BaseModel):
Expand All @@ -92,10 +98,12 @@ class UploadSpanData(BaseModel):

class UploadSpan(BaseModel):
started_at_micros: int
log_id: str
span_id: str
parent_id: str
trace_id: str
duration_micros: int
service_name: str
workspace_id: str
span_name: str
span_type: str
Expand Down Expand Up @@ -137,10 +145,12 @@ def transfer_to_upload_span_and_file(spans: List['Span']) -> (List[UploadSpan],

res_span.append(UploadSpan(
started_at_micros=int(span.start_time.timestamp() * 1_000_000),
log_id=span.log_id,
span_id=span.span_id,
parent_id=span.parent_span_id,
trace_id=span.trace_id,
duration_micros=span.get_duration(),
service_name=span.service_name,
workspace_id=span.get_space_id(),
span_name=span.get_span_name(),
span_type=span.get_span_type(),
Expand Down
46 changes: 45 additions & 1 deletion cozeloop/internal/trace/model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from enum import Enum

from pydantic import BaseModel
from typing import List, Optional
from typing import List, Optional, Literal
from pydantic.dataclasses import dataclass


class ObjectStorage(BaseModel):
Expand All @@ -23,3 +24,46 @@ class Attachment(BaseModel):
class UploadType(str, Enum):
LONG = 1
MULTI_MODALITY = 2


SpanFinishEvent = Literal[
"queue_manager.span_entry.rate",
"queue_manager.file_entry.rate",
"exporter.span_flush.rate",
"exporter.file_flush.rate"
]


@dataclass
class FinishEventInfoExtra:
    """Optional extra details carried by ``FinishEventInfo.extra_params``."""
    is_root_span: bool = False  # defaults to False when not supplied
    latency_ms: int = 0  # presumably a latency in milliseconds (per the name); defaults to 0


@dataclass
class FinishEventInfo:
    """Payload handed to a finish-event processor callback (``Callable[[FinishEventInfo], None]``)."""
    event_type: SpanFinishEvent  # one of the SpanFinishEvent literal names
    is_event_fail: bool  # presumably True when the reported event failed -- confirm with the emitter
    item_num: int  # may be greater than 1: several spans can be processed in one event
    detail_msg: str
    extra_params: Optional[FinishEventInfoExtra] = None  # extra details; None when absent


class TagTruncateConf:
    """Tag truncation limits handed to ``TraceProvider`` as ``tag_truncate_conf``.

    NOTE(review): going by the field names these are maximum sizes in bytes;
    confirm the exact truncation semantics where the values are consumed.

    Args:
        normal_field_max_byte: Limit applied to normal fields.
        input_output_field_max_byte: Limit applied to input/output fields.
    """

    def __init__(
        self,
        normal_field_max_byte: int,
        input_output_field_max_byte: int,
    ):
        self.normal_field_max_byte = normal_field_max_byte
        self.input_output_field_max_byte = input_output_field_max_byte

    def __repr__(self) -> str:
        # new_client() stringifies every argument to build its cache key; a
        # value-based repr keeps that key identical for equal configurations
        # instead of depending on the object's memory address.
        return (
            f"{type(self).__name__}("
            f"normal_field_max_byte={self.normal_field_max_byte!r}, "
            f"input_output_field_max_byte={self.input_output_field_max_byte!r})"
        )


class QueueConf:
    """Span queue settings handed to ``TraceProvider`` as ``queue_conf``.

    NOTE(review): going by the field names these are the queue capacity and
    the largest export batch; confirm where the values are consumed.

    Args:
        span_queue_length: Length configured for the span queue.
        span_max_export_batch_length: Maximum length of one export batch.
    """

    def __init__(
        self,
        span_queue_length: int,
        span_max_export_batch_length: int,
    ):
        self.span_queue_length = span_queue_length
        self.span_max_export_batch_length = span_max_export_batch_length

    def __repr__(self) -> str:
        # new_client() stringifies every argument to build its cache key; a
        # value-based repr keeps that key identical for equal configurations
        # instead of depending on the object's memory address.
        return (
            f"{type(self).__name__}("
            f"span_queue_length={self.span_queue_length!r}, "
            f"span_max_export_batch_length={self.span_max_export_batch_length!r})"
        )
12 changes: 12 additions & 0 deletions cozeloop/internal/trace/noop_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ def set_start_time_first_resp(self, start_time_first_resp: int) -> None:
def set_runtime(self, runtime: Runtime) -> None:
    """No-op: the given runtime is ignored."""

def set_service_name(self, service_name: str) -> None:
    """No-op: the given service name is ignored."""

def set_log_id(self, log_id: str) -> None:
    """No-op: the given log id is ignored."""

def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
    """No-op: the given system tags are ignored and left unmodified."""

def set_deployment_env(self, deployment_env: str) -> None:
    """No-op: the given deployment environment is ignored."""

def __enter__(self):
    """Return this object itself as the target of a ``with`` statement."""
    return self

Expand Down
Loading