Skip to content

Fix conflicts for PR 78 #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: gv/69
Choose a base branch
from
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

151 changes: 117 additions & 34 deletions src/registry/garbage-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// Unreferenced will delete all blobs that are not referenced by any manifest.
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.

import { ServerError } from "../errors";
import { ManifestSchema } from "../manifest";
import { isReference } from "./r2";
import { hexToDigest } from "../user";
import {symlinkHeader, isReference} from "./r2";

export type GarbageCollectionMode = "unreferenced" | "untagged";
export type GCOptions = {
Expand Down Expand Up @@ -148,7 +148,7 @@ export class GarbageCollector {
}

private async list(prefix: string, callback: (object: R2Object) => Promise<boolean>): Promise<boolean> {
const listed = await this.registry.list({ prefix });
const listed = await this.registry.list({ prefix: prefix, include: ["customMetadata"] });
for (const object of listed.objects) {
if ((await callback(object)) === false) {
return false;
Expand Down Expand Up @@ -183,67 +183,150 @@ export class GarbageCollector {

private async collectInner(options: GCOptions): Promise<boolean> {
// We can run out of memory, this should be a bloom filter
let referencedBlobs = new Set<string>();
const manifestList: { [key: string]: Set<string> } = {};
const mark = await this.getInsertionMark(options.name);

// List manifest from repo to be scanned
await this.list(`${options.name}/manifests/`, async (manifestObject) => {
const tag = manifestObject.key.split("/").pop();
if (!tag || (options.mode === "untagged" && tag.startsWith("sha256:"))) {
return true;
const currentHashFile = hexToDigest(manifestObject.checksums.sha256!);
if (manifestList[currentHashFile] === undefined) {
manifestList[currentHashFile] = new Set<string>();
}
const manifest = await this.registry.get(manifestObject.key);
if (!manifest) {
return true;
manifestList[currentHashFile].add(manifestObject.key);
return true;
});

// In untagged mode, search for manifest to delete
if (options.mode === "untagged") {
const manifestToRemove = new Set<string>();
const referencedManifests = new Set<string>();
// List tagged manifest to find manifest-list
for (const [_, manifests] of Object.entries(manifestList)) {
const taggedManifest = [...manifests].filter((item) => !item.split("/").pop()?.startsWith("sha256:"));
for (const manifestPath of taggedManifest) {
// Tagged manifest some, load manifest content
const manifest = await this.registry.get(manifestPath);
if (!manifest) {
continue;
}

const manifestData = (await manifest.json()) as ManifestSchema;
// Search for manifest list
if (manifestData.schemaVersion == 2 && "manifests" in manifestData) {
// Extract referenced manifests from manifest list
manifestData.manifests.forEach((manifest) => {
referencedManifests.add(manifest.digest);
});
}
}
}

const manifestData = (await manifest.json()) as ManifestSchema;
// TODO: garbage collect manifests.
if ("manifests" in manifestData) {
return true;
for (const [key, manifests] of Object.entries(manifestList)) {
if (referencedManifests.has(key)) {
continue;
}
if (![...manifests].some((item) => !item.split("/").pop()?.startsWith("sha256:"))) {
// Add untagged manifest that should be removed
manifests.forEach((manifest) => {
manifestToRemove.add(manifest);
});
// Manifest to be removed shouldn't be parsed to search for referenced layers
delete manifestList[key];
}
}

// Deleting untagged manifest
if (manifestToRemove.size > 0) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

// GC will deleted untagged manifest
await this.registry.delete(manifestToRemove.values().toArray());
}
}

const referencedBlobs = new Set<string>();
// From manifest, extract referenced layers
for (const [_, manifests] of Object.entries(manifestList)) {
// Select only one manifest per unique manifest
const manifestPath = manifests.values().next().value;
if (manifestPath === undefined) {
continue;
}
const manifest = await this.registry.get(manifestPath);
// Skip if manifest not found
if (!manifest) continue;

const manifestData = (await manifest.json()) as ManifestSchema;

if (manifestData.schemaVersion === 1) {
manifestData.fsLayers.forEach((layer) => {
referencedBlobs.add(layer.blobSum);
});
} else {
// Skip manifest-list, they don't contain any layers references
if ("manifests" in manifestData) continue;
// Add referenced layers from current manifest
manifestData.layers.forEach((layer) => {
referencedBlobs.add(layer.digest);
});
// Add referenced config blob from current manifest
referencedBlobs.add(manifestData.config.digest);
}
}

return true;
});

let unreferencedKeys: string[] = [];
const deleteThreshold = 15;
const blobsReferences: {[id: string] : string} = {}
const unreferencedBlobs = new Set<string>();
// List blobs to be removed
await this.list(`${options.name}/blobs/`, async (object) => {
const hash = object.key.split("/").pop();
if (hash && !referencedBlobs.has(hash)) {
const blobHash = object.key.split("/").pop();
if (blobHash && !referencedBlobs.has(blobHash)) {
const key = isReference(object);
// also push the underlying reference object
if (key) {
unreferencedKeys.push(key);
}

unreferencedKeys.push(object.key);
if (unreferencedKeys.length > deleteThreshold) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new ServerError("there is a manifest insertion going, the garbage collection shall stop");
}

await this.registry.delete(unreferencedKeys);
unreferencedKeys = [];
blobsReferences[object.key] = key;
}
unreferencedBlobs.add(object.key);
}
return true;
});
if (unreferencedKeys.length > 0) {

// Check for symlink before removal
if (unreferencedBlobs.size >= 0) {
await this.list("", async (object) => {
const objectPath = object.key;
// Skip non-blobs object and from any other repository (symlink only target cross repository blobs)
if (objectPath.startsWith(`${options.name}/`) || !objectPath.includes("/blobs/sha256:")) {
return true;
}
if (object.customMetadata && object.customMetadata[symlinkHeader] !== undefined) {
// Check if the symlink target the current GC repository
if (object.customMetadata[symlinkHeader] !== options.name) return true;
// Get symlink blob to retrieve its target
const symlinkBlob = await this.registry.get(object.key);
// Skip if symlinkBlob not found
if (!symlinkBlob) return true;
// Get the path of the target blob from the symlink blob
const targetBlobPath = await symlinkBlob.text();
if (unreferencedBlobs.has(targetBlobPath)) {
// This symlink target a layer that should be removed
unreferencedBlobs.delete(targetBlobPath);
delete blobsReferences[targetBlobPath];
}
}
return unreferencedBlobs.size > 0;
});
}

if (unreferencedBlobs.size > 0) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

await this.registry.delete(unreferencedKeys);
// GC will delete unreferenced blobs
await this.registry.delete(unreferencedBlobs.values().toArray());
await this.registry.delete(Object.values(blobsReferences));
}

return true;
Expand Down
8 changes: 8 additions & 0 deletions src/registry/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,14 @@ export class RegistryHTTPClient implements Registry {
}
}

mountExistingLayer(
_sourceName: string,
_digest: string,
_destinationName: string,
): Promise<RegistryError | FinishedUploadObject> {
throw new Error("unimplemented");
}

putManifest(
_namespace: string,
_reference: string,
Expand Down
74 changes: 67 additions & 7 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") };
}

export const symlinkHeader = "X-Serverless-Registry-Symlink";
export const referenceHeader = "X-Serverless-Registry-Reference";
export const digestHeaderInReference = "X-Serverless-Registry-Digest";
export const registryUploadKey = "X-Serverless-Registry-Upload";
Expand All @@ -161,7 +162,7 @@ export async function getUploadState(
throw new InternalError();
}

if (!verifyHash && stateStrHash !== verifyHash) {
if (verifyHash !== undefined && stateStrHash !== verifyHash) {
return new RangeError(stateStrHash, stateObject);
}

Expand Down Expand Up @@ -200,14 +201,12 @@ export class R2Registry implements Registry {
return { response: new ServerError("invalid checksum from R2 backend") };
}

const checkManifestResponse = {
return {
exists: true,
digest: hexToDigest(res.checksums.sha256!),
contentType: res.httpMetadata!.contentType!,
size: res.size,
};

return checkManifestResponse;
}

async listRepositories(limit?: number, last?: string): Promise<RegistryError | ListRepositoriesResponse> {
Expand Down Expand Up @@ -432,6 +431,52 @@ export class R2Registry implements Registry {
};
}

async mountExistingLayer(
sourceName: string,
digest: string,
destinationName: string,
): Promise<RegistryError | FinishedUploadObject> {
const sourceLayerPath = `${sourceName}/blobs/${digest}`;
const [res, err] = await wrap(this.env.REGISTRY.head(sourceLayerPath));
if (err) {
return wrapError("mountExistingLayer", err);
}
if (!res) {
return wrapError("mountExistingLayer", "Layer not found");
} else {
const destinationLayerPath = `${destinationName}/blobs/${digest}`;
if (sourceLayerPath === destinationLayerPath) {
// Bad request
throw new InternalError();
}
// Prevent recursive symlink
if (res.customMetadata && symlinkHeader in res.customMetadata) {
return await this.mountExistingLayer(res.customMetadata[symlinkHeader], digest, destinationName);
}
// Trying to mount a layer from sourceLayerPath to destinationLayerPath

// Create linked file with custom metadata
const [newFile, error] = await wrap(
this.env.REGISTRY.put(destinationLayerPath, sourceLayerPath, {
sha256: await getSHA256(sourceLayerPath, ""),
httpMetadata: res.httpMetadata,
customMetadata: { [symlinkHeader]: sourceName }, // Storing target repository name in metadata (to easily resolve recursive layer mounting)
}),
);
if (error) {
return wrapError("mountExistingLayer", error);
}
if (newFile && "response" in newFile) {
return wrapError("mountExistingLayer", newFile.response);
}

return {
digest: hexToDigest(res.checksums.sha256!),
location: `/v2/${destinationLayerPath}`,
};
}
}

async layerExists(name: string, tag: string): Promise<RegistryError | CheckLayerResponse> {
const [res, err] = await wrap(this.env.REGISTRY.head(`${name}/blobs/${tag}`));
if (err) {
Expand Down Expand Up @@ -502,6 +547,19 @@ export class R2Registry implements Registry {
}
}

// Handle R2 symlink
if (res.customMetadata && symlinkHeader in res.customMetadata) {
const layerPath = await res.text();
// Symlink detected! Will download layer from "layerPath"
const [linkName, linkDigest] = layerPath.split("/blobs/");
if (linkName == name && linkDigest == digest) {
return {
response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }),
};
}
return await this.env.REGISTRY_CLIENT.getLayer(linkName, linkDigest);
}

return {
stream: res.body!,
digest,
Expand Down Expand Up @@ -896,12 +954,15 @@ export class R2Registry implements Registry {
return { response: new InternalError() };
}
if (hashedState === null || !hashedState.state) {
return { response: new InternalError() };
return {
response: new Response(null, { status: 404 }),
}
}
const state = hashedState.state;

const upload = this.env.REGISTRY.resumeMultipartUpload(state.registryUploadId, state.uploadId);
await upload.abort();
await this.env.REGISTRY.delete(getRegistryUploadsPath(state));
return true;
}

Expand Down Expand Up @@ -931,7 +992,6 @@ export class R2Registry implements Registry {
}

async garbageCollection(namespace: string, mode: GarbageCollectionMode): Promise<boolean> {
const result = await this.gc.collect({ name: namespace, mode: mode });
return result;
return await this.gc.collect({ name: namespace, mode: mode });
}
}
Loading