Skip to content

Commit

Permalink
Merge pull request #10 from parea-ai/PAI-596-enable-filtering-on-nested-outputs
Browse files Browse the repository at this point in the history

feat: add root trace ids
  • Loading branch information
joschkabraun authored Jan 31, 2024
2 parents 6f0e926 + dfe2810 commit d9ae82e
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface RequestConfig {

export class HTTPClient {
private static instance: HTTPClient;
private baseURL: string = 'https://parea-ai-backend-e2adf7624bcb3980.onporter.run/api/parea/v1';
private baseURL: string = 'https://parea-ai-backend-us-9ac16cdbc7a7b006.onporter.run/api/parea/v1';
private apiKey: string | null = null;
private client: AxiosInstance;

Expand Down
1 change: 1 addition & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export class Parea {
const inference_id = genTraceId();
data.inference_id = inference_id;
data.parent_trace_id = parentTraceId || inference_id;
data.root_trace_id = parentStore ? Array.from(parentStore.values())[0].rootTraceId : data.parent_trace_id;

if (process.env.PAREA_OS_ENV_EXPERIMENT_UUID) {
experiment_uuid = process.env.PAREA_OS_ENV_EXPERIMENT_UUID;
Expand Down
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export type LLMInputs = {
export type Completion = {
inference_id?: string;
parent_trace_id?: string;
root_trace_id?: string;
trace_name?: string;
llm_inputs?: { [key: string]: any };
llm_configuration?: LLMInputs;
Expand Down Expand Up @@ -121,6 +122,7 @@ export type Log = {
export type TraceLog = Log & {
trace_id: string;
parent_trace_id?: string;
root_trace_id?: string;
start_timestamp: string;
organization_id?: string;
error?: string;
Expand Down
72 changes: 39 additions & 33 deletions src/utils/trace_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AsyncLocalStorage } from 'node:async_hooks';
// Per-trace context stored in AsyncLocalStorage: one entry is kept per active trace id.
export type ContextObject = {
  // The trace record being accumulated for this scope (children, timestamps, output, …).
  traceLog: TraceLog;
  // NOTE(review): presumably trace ids whose evaluation functions are currently running,
  // used by handleRunningEvals — confirm against its implementation (not visible here).
  threadIdsRunningEvals: string[];
  // Id of the top-most trace in this call tree; for a root trace this equals its own
  // trace id (see the fallback `: traceId` where the store is created).
  rootTraceId: string;
};

export const asyncLocalStorage = new AsyncLocalStorage<Map<string, ContextObject>>();
Expand Down Expand Up @@ -62,11 +63,13 @@ export const trace = (funcName: string, func: (...args: any[]) => any, options?:

const parentStore = asyncLocalStorage.getStore();
const parentTraceId = parentStore ? Array.from(parentStore.keys())[0] : undefined;
const rootTraceId = parentStore ? Array.from(parentStore.values())[0].rootTraceId : traceId;

const traceLog: TraceLog = {
trace_name: funcName,
trace_id: traceId,
parent_trace_id: parentTraceId || traceId,
root_trace_id: rootTraceId,
start_timestamp: toDateTimeString(startTimestamp),
inputs: extractFunctionParams(func, args),
metadata: options?.metadata,
Expand All @@ -78,42 +81,45 @@ export const trace = (funcName: string, func: (...args: any[]) => any, options?:
experiment_uuid: process.env.PAREA_OS_ENV_EXPERIMENT_UUID || null,
};

return asyncLocalStorage.run(new Map([[traceId, { traceLog, threadIdsRunningEvals: [] }]]), async () => {
if (parentStore && parentTraceId) {
const parentTraceLog = parentStore.get(parentTraceId);
if (parentTraceLog) {
parentTraceLog.traceLog.children.push(traceId);
parentStore.set(parentTraceId, parentTraceLog);
return asyncLocalStorage.run(
new Map([[traceId, { traceLog, threadIdsRunningEvals: [], rootTraceId }]]),
async () => {
if (parentStore && parentTraceId) {
const parentTraceLog = parentStore.get(parentTraceId);
if (parentTraceLog) {
parentTraceLog.traceLog.children.push(traceId);
parentStore.set(parentTraceId, parentTraceLog);
}
}
}

try {
const result = await func(...args);
const output = typeof result === 'string' ? result : JSON.stringify(result);
let outputForEvalMetrics = output;
if (options?.accessOutputOfFunc) {
outputForEvalMetrics = options?.accessOutputOfFunc(result);
try {
const result = await func(...args);
const output = typeof result === 'string' ? result : JSON.stringify(result);
let outputForEvalMetrics = output;
if (options?.accessOutputOfFunc) {
outputForEvalMetrics = options?.accessOutputOfFunc(result);
}
traceInsert(traceId, {
output,
evaluation_metric_names: options?.evalFuncNames,
output_for_eval_metrics: outputForEvalMetrics,
});
return result;
} catch (error: any) {
console.error(`Error occurred in function ${func.name}, ${error}`);
traceInsert(traceId, { error: error.toString(), status: 'error' });
throw error;
} finally {
const endTimestamp = new Date();
traceInsert(traceId, {
end_timestamp: toDateTimeString(endTimestamp),
latency: (endTimestamp.getTime() - startTimestamp.getTime()) / 1000,
});
await pareaLogger.recordLog(traceLog);
await handleRunningEvals(traceLog, traceId, options);
}
traceInsert(traceId, {
output,
evaluation_metric_names: options?.evalFuncNames,
output_for_eval_metrics: outputForEvalMetrics,
});
return result;
} catch (error: any) {
console.error(`Error occurred in function ${func.name}, ${error}`);
traceInsert(traceId, { error: error.toString(), status: 'error' });
throw error;
} finally {
const endTimestamp = new Date();
traceInsert(traceId, {
end_timestamp: toDateTimeString(endTimestamp),
latency: (endTimestamp.getTime() - startTimestamp.getTime()) / 1000,
});
await pareaLogger.recordLog(traceLog);
await handleRunningEvals(traceLog, traceId, options);
}
});
},
);
};
};

Expand Down
85 changes: 45 additions & 40 deletions src/utils/wrap_openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ function wrapMethod(method: Function) {

const parentStore = asyncLocalStorage.getStore();
const parentTraceId = parentStore ? Array.from(parentStore.keys())[0] : undefined;
const rootTraceId = parentStore ? Array.from(parentStore.values())[0].rootTraceId : traceId;

const traceLog: TraceLog = {
trace_id: traceId,
parent_trace_id: parentTraceId || traceId,
root_trace_id: rootTraceId,
trace_name: 'llm-openai',
start_timestamp: toDateTimeString(startTimestamp),
configuration: {
Expand All @@ -34,48 +36,51 @@ function wrapMethod(method: Function) {
experiment_uuid: process.env.PAREA_OS_ENV_EXPERIMENT_UUID || null,
};

return asyncLocalStorage.run(new Map([[traceId, { traceLog, threadIdsRunningEvals: [] }]]), async () => {
if (parentStore && parentTraceId) {
const parentTraceLog = parentStore.get(parentTraceId);
if (parentTraceLog) {
parentTraceLog.traceLog.children.push(traceId);
parentStore.set(parentTraceId, parentTraceLog);
return asyncLocalStorage.run(
new Map([[traceId, { traceLog, threadIdsRunningEvals: [], rootTraceId }]]),
async () => {
if (parentStore && parentTraceId) {
const parentTraceLog = parentStore.get(parentTraceId);
if (parentTraceLog) {
parentTraceLog.traceLog.children.push(traceId);
parentStore.set(parentTraceId, parentTraceLog);
}
}
}

try {
response = await method.apply(this, args);
traceInsert(traceId, {
output: getOutput(response),
input_tokens: response.usage.prompt_tokens,
output_tokens: response.usage.completion_tokens,
total_tokens: response.usage.total_tokens,
cost: getTotalCost(args[0].model, response.usage.prompt_tokens, response.usage.completion_tokens),
});
} catch (err: unknown) {
if (err instanceof Error) {
error = err.message;
} else {
error = 'An unknown error occurred';

try {
response = await method.apply(this, args);
traceInsert(traceId, {
output: getOutput(response),
input_tokens: response.usage.prompt_tokens,
output_tokens: response.usage.completion_tokens,
total_tokens: response.usage.total_tokens,
cost: getTotalCost(args[0].model, response.usage.prompt_tokens, response.usage.completion_tokens),
});
} catch (err: unknown) {
if (err instanceof Error) {
error = err.message;
} else {
error = 'An unknown error occurred';
}
status = 'error';
traceInsert(traceId, { error, status });
} finally {
endTimestamp = new Date();
traceInsert(traceId, {
end_timestamp: toDateTimeString(endTimestamp),
latency: (endTimestamp.getTime() - startTimestamp.getTime()) / 1000,
status: status,
});
await pareaLogger.recordLog(traceLog);
}

if (error) {
throw new Error(error);
}
status = 'error';
traceInsert(traceId, { error, status });
} finally {
endTimestamp = new Date();
traceInsert(traceId, {
end_timestamp: toDateTimeString(endTimestamp),
latency: (endTimestamp.getTime() - startTimestamp.getTime()) / 1000,
status: status,
});
await pareaLogger.recordLog(traceLog);
}

if (error) {
throw new Error(error);
}

return response;
});

return response;
},
);
};
}

Expand Down

0 comments on commit d9ae82e

Please sign in to comment.