Skip to content

Commit

Permalink
judge: add request trace (#934)
Browse files Browse the repository at this point in the history
Co-authored-by: panda <[email protected]>
  • Loading branch information
undefined-moe and pandadtdyy authored Jan 26, 2025
1 parent 74c8b0e commit f3eb6ca
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 47 deletions.
1 change: 1 addition & 0 deletions packages/hydrojudge/src/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export default async function compile(
time: lang.compile_time_limit || 10000,
memory: lang.compile_memory_limit || 256 * 1024 * 1024,
},
`compile[${lang.key}]`,
3,
);
// TODO: distinguish user program and checker
Expand Down
2 changes: 1 addition & 1 deletion packages/hydrojudge/src/compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export async function compilerVersions(langs: Record<string, LangConfig>) {
copyIn: {},
time: 10000,
memory: 256 * 1024 * 1024,
}, 5);
}, 'compilerVersions', 5);
result[lang] = `${res.stdout}\n${res.stderr}`.trim();
}
return result;
Expand Down
1 change: 1 addition & 0 deletions packages/hydrojudge/src/judge/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function judgeCase(c: NormalizedCase) {
addressSpaceLimit: address_space_limit,
processLimit: process_limit,
},
`judgeCase[${c.id}]<${ctx.rid}>`,
);
const {
code, signalled, time, memory, fileIds,
Expand Down
2 changes: 2 additions & 0 deletions packages/hydrojudge/src/judge/generate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export const judge = async (ctx: JudgeTask) => {
time: parseTimeMS('2s'),
memory: parseMemoryMB('256m'),
},
`generate[${i}]<${ctx.rid}>`,
1,
);
const tmp = path.join(tmpdir(), `${ctx.request.rid}.${i}.in`);
Expand Down Expand Up @@ -89,6 +90,7 @@ export const judge = async (ctx: JudgeTask) => {
time: parseTimeMS('2s'),
memory: parseMemoryMB('256m'),
},
`generate.std[${i}]<${ctx.rid}>`,
1,
);
const tmp = path.join(tmpdir(), `${ctx.request.rid}.${i}.out`);
Expand Down
2 changes: 2 additions & 0 deletions packages/hydrojudge/src/judge/hack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export async function judge(ctx: Context) {
time: parseTimeMS(ctx.config.time || '1s'),
memory: parseMemoryMB(ctx.config.memory || '256m'),
},
`hack.validator[${ctx.rid}]`,
);
if (validateResult.status !== STATUS.STATUS_ACCEPTED) {
const message = `${validateResult.stdout || ''}\n${validateResult.stderr || ''}`.trim();
Expand All @@ -42,6 +43,7 @@ export async function judge(ctx: Context) {
addressSpaceLimit: address_space_limit,
processLimit: process_limit,
},
`hack[${ctx.rid}]`,
);
const {
code, signalled, time, memory,
Expand Down
1 change: 1 addition & 0 deletions packages/hydrojudge/src/judge/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const judge = async (ctx: JudgeTask) => {
addressSpaceLimit: address_space_limit,
processLimit: process_limit,
},
`judge[${ctx.rid}]`,
1,
);
const {
Expand Down
57 changes: 27 additions & 30 deletions packages/hydrojudge/src/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ function proc(params: Parameter): Cmd {
};
}

async function adaptResult(result: SandboxResult, params: Parameter): Promise<SandboxAdaptedResult> {
function adaptResult(result: SandboxResult, params: Parameter): SandboxAdaptedResult {
const rate = getConfig('rate') as number;
// FIXME: Signalled?
const ret: SandboxAdaptedResult = {
Expand Down Expand Up @@ -146,13 +146,18 @@ async function adaptResult(result: SandboxResult, params: Parameter): Promise<Sa
}

export async function runPiped(
execute: Parameter[], pipeMapping: Pick<PipeMap, 'in' | 'out' | 'name'>[],
execute: Parameter[], pipeMapping: Pick<PipeMap, 'in' | 'out' | 'name'>[], params: Parameter = {}, trace: string = '',
): Promise<SandboxAdaptedResult[]> {
let res: SandboxResult[];
const size = parseMemoryMB(getConfig('stdio_size'));
try {
if (!supportOptional) {
const { copyOutOptional } = await client.version();
supportOptional = copyOutOptional;
if (!copyOutOptional) logger.warn('Sandbox version tooooooo low! Please upgrade to at least 1.2.0');
}
const body = {
cmd: execute.map((exe) => proc(exe)),
cmd: execute.map((exe) => proc({ ...exe, ...params })),
pipeMapping: pipeMapping.map((pipe) => ({
proxy: true,
max: 1024 * 1024 * size,
Expand All @@ -165,14 +170,14 @@ export async function runPiped(
}
const id = callId++;
if (argv.options.showSandbox) logger.debug('%d %s', id, JSON.stringify(body));
res = await client.run(body);
res = await client.run(body, trace);
if (argv.options.showSandbox) logger.debug('%d %s', id, JSON.stringify(res));
} catch (e) {
if (e instanceof FormatError || e instanceof SystemError) throw e;
console.error(e);
throw new SystemError('Sandbox Error', [e]);
}
return await Promise.all(res.map((r) => adaptResult(r, {}))) as SandboxAdaptedResult[];
return res.map((r) => adaptResult(r, params)) as SandboxAdaptedResult[];
}

export async function del(fileId: string) {
Expand All @@ -183,33 +188,25 @@ export async function get(fileId: string) {
return await client.getFile(fileId);
}

export async function run(execute: string, params?: Parameter): Promise<SandboxAdaptedResult> {
let result: SandboxResult;
try {
if (!supportOptional) {
const res = await client.version();
supportOptional = res.copyOutOptional;
if (!supportOptional) logger.warn('Sandbox version tooooooo low! Please upgrade to at least 1.2.0');
}
const body = { cmd: [proc({ execute, ...params })] };
const id = callId++;
if (argv.options.showSandbox) logger.debug('%d %s', id, JSON.stringify(body));
const res = await client.run(body);
if (argv.options.showSandbox) logger.debug('%d %s', id, JSON.stringify(res));
[result] = res;
} catch (e) {
if (e instanceof FormatError || e instanceof SystemError) throw e;
console.error(e);
// FIXME request body larger than maxBodyLength limit
throw new SystemError('Sandbox Error', e.message);
}
return await adaptResult(result, params);
}

const queue = new PQueue({ concurrency: getConfig('concurrency') || getConfig('parallelism') });

export function runQueued(execute: string, params?: Parameter, priority = 0) {
return queue.add(() => run(execute, params), { priority }) as Promise<SandboxAdaptedResult>;
export function runQueued(
execute: Parameter[], pipeMapping: Pick<PipeMap, 'in' | 'out' | 'name'>[],
params: Parameter, trace?: string, priority?: number,
): Promise<SandboxAdaptedResult[]>;
export function runQueued(execute: string, params: Parameter, trace?: string, priority?: number): Promise<SandboxAdaptedResult>;
export function runQueued(
arg0: string | Parameter[], arg1: Pick<PipeMap, 'in' | 'out' | 'name'>[] | Parameter,
arg2?: string | Parameter, arg3?: string | number, arg4?: number,
) {
const single = !Array.isArray(arg0);
const [execute, pipeMapping, params, trace, priority] = single
? [[{ execute: arg0 }], [], arg1 || {}, arg2 || '', arg3 || 0] as any
: [arg0, arg1, arg2 || {}, arg3 || '', arg4 || 0];
return queue.add(async () => {
const res = await runPiped(execute, pipeMapping, params, trace);
return single ? res[0] : res;
}, { priority });
}

export async function versionCheck(reportWarn: (str: string) => void, reportError = reportWarn) {
Expand Down
42 changes: 27 additions & 15 deletions packages/hydrojudge/src/sandbox/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ import fs from 'fs';
import superagent from 'superagent';
import WebSocket from 'ws';
import { pipeRequest } from '@hydrooj/utils';
import { version } from '../../package.json';
import { getConfig } from '../config';
import {
Input, Output, Resize, SandboxRequest, SandboxResult, SandboxVersion,
} from './interface';

let url;
let url: string;
const UA = `HydroJudge/${version} (${Math.random().toString(36).substring(2, 8)})`;

export class Stream extends EventEmitter {
private ws: WebSocket;

constructor(httpUrl: string, req: SandboxRequest) {
super();
const wsUrl = `${httpUrl.replace('http', 'ws')}/stream`;
this.ws = new WebSocket(wsUrl);
this.ws = new WebSocket(wsUrl, { headers: { 'User-Agent': UA } });
this.ws.onmessage = (e) => {
const data: Buffer = e.data as Buffer;
switch (data[0]) {
Expand Down Expand Up @@ -81,28 +83,38 @@ export class Stream extends EventEmitter {
}
}

function _call(method: 'post' | 'get' | 'delete', endpoint: string, trace?: string) {
return superagent[method](`${url}/${endpoint}`).set('User-Agent', trace ? `${UA} (${trace})` : UA);
}
const client = new Proxy({
run(req: SandboxRequest): Promise<SandboxResult[]> {
return superagent.post(`${url}/run`).send(req).then((res) => res.body);
async run(req: SandboxRequest, trace?: string): Promise<SandboxResult[]> {
const res = await _call('post', 'run', trace).send(req);
return res.body;
},
getFile(fileId: string, dest?: string): Promise<Buffer> {
async getFile(fileId: string, dest?: string): Promise<Buffer> {
const req = _call('get', `file/${fileId}`);
if (dest) {
const w = fs.createWriteStream(dest);
return pipeRequest(superagent.get(`${url}/file/${fileId}`), w, 60000, fileId) as any;
return await pipeRequest(req, w, 60000, fileId) as any;
}
return superagent.get(`${url}/file/${fileId}`).responseType('arraybuffer').then((res) => res.body);
const res = await req.responseType('arraybuffer');
return res.body;
},
deleteFile(fileId: string): Promise<void> {
return superagent.delete(`${url}/file/${fileId}`).then((res) => res.body);
async deleteFile(fileId: string): Promise<void> {
const res = await _call('delete', `file/${fileId}`);
return res.body;
},
listFiles(): Promise<Record<string, string>> {
return superagent.get(`${url}/file`).then((res) => res.body);
async listFiles(): Promise<Record<string, string>> {
const res = await _call('get', 'file');
return res.body;
},
version(): Promise<SandboxVersion> {
return superagent.get(`${url}/version`).then((res) => res.body);
async version(): Promise<SandboxVersion> {
const res = await _call('get', 'version');
return res.body;
},
config(): Promise<Record<string, any>> {
return superagent.get(`${url}/config`).then((res) => res.body);
async config(): Promise<Record<string, any>> {
const res = await _call('get', 'config');
return res.body;
},
stream(req: SandboxRequest): Stream {
return new Stream(url, req);
Expand Down
2 changes: 1 addition & 1 deletion packages/hydrojudge/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class JudgeTask {
env: this.env,
time: 5000,
memory: 256,
});
}, `analysis[${this.lang}]<${this.rid}>`, 5);
const out = r.stdout.toString();
if (out.length) this.next({ compilerText: out.substring(0, 1024) });
if (process.env.DEV) console.log(r);
Expand Down

0 comments on commit f3eb6ca

Please sign in to comment.