Skip to content

Commit

Permalink
Merge pull request ai16z#378 from bmgalego/cache-manager
Browse files Browse the repository at this point in the history
feat: Cache Manager
  • Loading branch information
ponderingdemocritus authored Nov 21, 2024
2 parents 3ab32a9 + 0f6877f commit a1519b2
Show file tree
Hide file tree
Showing 32 changed files with 806 additions and 353 deletions.
3 changes: 2 additions & 1 deletion agent/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
!character.ts
.env
*.env
.env*
.env*
/data
75 changes: 58 additions & 17 deletions agent/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import { PostgresDatabaseAdapter } from "@ai16z/adapter-postgres";
import { SqliteDatabaseAdapter } from "@ai16z/adapter-sqlite";
import { DirectClientInterface } from "@ai16z/client-direct";
import { DirectClient, DirectClientInterface } from "@ai16z/client-direct";
import { DiscordClientInterface } from "@ai16z/client-discord";
import { AutoClientInterface } from "@ai16z/client-auto";
import { TelegramClientInterface } from "@ai16z/client-telegram";
import { TwitterClientInterface } from "@ai16z/client-twitter";
import {
DbCacheAdapter,
defaultCharacter,
FsCacheAdapter,
ICacheManager,
IDatabaseCacheAdapter,
stringToUuid,
AgentRuntime,
settings,
CacheManager,
Character,
IAgentRuntime,
ModelProviderName,
elizaLogger,
settings,
IDatabaseAdapter,
} from "@ai16z/eliza";
import { bootstrapPlugin } from "@ai16z/plugin-bootstrap";
import { solanaPlugin } from "@ai16z/plugin-solana";
Expand All @@ -21,7 +28,12 @@ import Database from "better-sqlite3";
import fs from "fs";
import readline from "readline";
import yargs from "yargs";
import { character } from "./character.ts";
import path from "path";
import { fileURLToPath } from "url";
import { character } from "./character";

const __filename = fileURLToPath(import.meta.url); // absolute filesystem path of this module (ESM provides no __filename)
const __dirname = path.dirname(__filename); // directory containing this module (ESM provides no __dirname)

export const wait = (minTime: number = 1000, maxTime: number = 3000) => {
const waitTime =
Expand Down Expand Up @@ -158,13 +170,19 @@ export function getTokenForProvider(
}
}

function initializeDatabase() {
function initializeDatabase(dataDir: string) {
if (process.env.POSTGRES_URL) {
return new PostgresDatabaseAdapter({
const db = new PostgresDatabaseAdapter({
connectionString: process.env.POSTGRES_URL,
});
return db;
} else {
return new SqliteDatabaseAdapter(new Database("./db.sqlite"));
const filePath = path.resolve(
dataDir,
process.env.SQLITE_FILE ?? "db.sqlite"
);
const db = new SqliteDatabaseAdapter(new Database(filePath));
return db;
}
}

Expand Down Expand Up @@ -208,9 +226,10 @@ export async function initializeClients(
return clients;
}

export async function createAgent(
export function createAgent(
character: Character,
db: any,
db: IDatabaseAdapter,
cache: ICacheManager,
token: string
) {
elizaLogger.success(
Expand All @@ -233,29 +252,51 @@ export async function createAgent(
actions: [],
services: [],
managers: [],
cacheManager: cache,
});
}

async function startAgent(character: Character, directClient: any) {
/**
 * Builds a filesystem-backed cache for a character, rooted at
 * `<baseDir>/<character.id>/cache` so agents stay isolated from each other.
 *
 * NOTE(review): name has a typo ("intialize") — kept as-is for caller
 * compatibility.
 */
function intializeFsCache(baseDir: string, character: Character) {
    const characterCacheDir = path.resolve(baseDir, character.id, "cache");
    return new CacheManager(new FsCacheAdapter(characterCacheDir));
}

/**
 * Builds a database-backed cache for a character, scoping every cache
 * entry by the character's id.
 *
 * NOTE(review): name has a typo ("intialize") — kept as-is for caller
 * compatibility.
 */
function intializeDbCache(character: Character, db: IDatabaseCacheAdapter) {
    return new CacheManager(new DbCacheAdapter(db, character.id));
}

async function startAgent(character: Character, directClient: DirectClient) {
try {
character.id ??= stringToUuid(character.name);

const token = getTokenForProvider(character.modelProvider, character);
const db = initializeDatabase();
const dataDir = path.join(__dirname, "../data");

const runtime = await createAgent(character, db, token);
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}

const clients = await initializeClients(
character,
runtime as IAgentRuntime
);
const db = initializeDatabase(dataDir);

await db.init();

const cache = intializeDbCache(character, db);
const runtime = createAgent(character, db, cache, token);

const clients = await initializeClients(character, runtime);

directClient.registerAgent(await runtime);
directClient.registerAgent(runtime);

return clients;
} catch (error) {
console.error(
elizaLogger.error(
`Error starting agent for character ${character.name}:`,
error
);
console.error(error);
throw error;
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/adapter-postgres/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"types": "dist/index.d.ts",
"dependencies": {
"@ai16z/eliza": "workspace:*",
"@types/pg": "^8.11.10",
"pg": "^8.13.1"
},
"devDependencies": {
Expand Down
33 changes: 21 additions & 12 deletions packages/adapter-postgres/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE EXTENSION IF NOT EXISTS fuzzystrmatch;

BEGIN;

CREATE TABLE accounts (
CREATE TABLE IF NOT EXISTS accounts (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"name" TEXT,
Expand All @@ -26,12 +26,12 @@ CREATE TABLE accounts (
"details" JSONB DEFAULT '{}'::jsonb
);

CREATE TABLE rooms (
CREATE TABLE IF NOT EXISTS rooms (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE memories (
CREATE TABLE IF NOT EXISTS memories (
"id" UUID PRIMARY KEY,
"type" TEXT NOT NULL,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
Expand All @@ -46,7 +46,7 @@ CREATE TABLE memories (
CONSTRAINT fk_agent FOREIGN KEY ("agentId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE goals (
CREATE TABLE IF NOT EXISTS goals (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID REFERENCES accounts("id"),
Expand All @@ -59,7 +59,7 @@ CREATE TABLE goals (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE logs (
CREATE TABLE IF NOT EXISTS logs (
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID NOT NULL REFERENCES accounts("id"),
Expand All @@ -70,7 +70,7 @@ CREATE TABLE logs (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE participants (
CREATE TABLE IF NOT EXISTS participants (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID REFERENCES accounts("id"),
Expand All @@ -82,7 +82,7 @@ CREATE TABLE participants (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE relationships (
CREATE TABLE IF NOT EXISTS relationships (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userA" UUID NOT NULL REFERENCES accounts("id"),
Expand All @@ -94,11 +94,20 @@ CREATE TABLE relationships (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

-- Per-agent key/value cache; values are stored as JSONB.
-- IF NOT EXISTS keeps the schema script safe to re-run on startup.
CREATE TABLE IF NOT EXISTS cache (
    "key" TEXT NOT NULL,
    "agentId" TEXT NOT NULL,
    "value" JSONB DEFAULT '{}'::jsonb,
    "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    "expiresAt" TIMESTAMP, -- NULL presumably means "never expires" — confirm with the adapter
    PRIMARY KEY ("key", "agentId") -- one value per key per agent
);

-- Indexes
-- IF NOT EXISTS keeps the schema script idempotent, since it is applied
-- on every startup against a possibly pre-existing database.
CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING hnsw ("embedding" vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_memories_type_room ON memories("type", "roomId");
CREATE INDEX IF NOT EXISTS idx_participants_user ON participants("userId");
CREATE INDEX IF NOT EXISTS idx_participants_room ON participants("roomId");
CREATE INDEX IF NOT EXISTS idx_relationships_users ON relationships("userA", "userB");

COMMIT;
96 changes: 89 additions & 7 deletions packages/adapter-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { v4 } from "uuid";
import pg from "pg";
import pg, { type Pool } from "pg";
import {
Account,
Actor,
Expand All @@ -8,18 +8,27 @@ import {
type Memory,
type Relationship,
type UUID,
type IDatabaseCacheAdapter,
Participant,
DatabaseAdapter,
} from "@ai16z/eliza";
import { DatabaseAdapter } from "@ai16z/eliza";
const { Pool } = pg;
import fs from "fs";
import { fileURLToPath } from "url";
import path from "path";

export class PostgresDatabaseAdapter extends DatabaseAdapter {
private pool: typeof Pool;
const __filename = fileURLToPath(import.meta.url); // get the resolved path to the file
const __dirname = path.dirname(__filename); // get the name of the directory

export class PostgresDatabaseAdapter
extends DatabaseAdapter<Pool>
implements IDatabaseCacheAdapter
{
private pool: Pool;

constructor(connectionConfig: any) {
super();

this.pool = new Pool({
this.pool = new pg.Pool({
...connectionConfig,
max: 20,
idleTimeoutMillis: 30000,
Expand All @@ -29,8 +38,22 @@ export class PostgresDatabaseAdapter extends DatabaseAdapter {
this.pool.on("error", (err) => {
console.error("Unexpected error on idle client", err);
});
}

/**
 * Verifies connectivity and applies the schema file shipped next to the
 * compiled adapter. The schema uses IF NOT EXISTS throughout, so
 * re-running against an existing database is safe.
 *
 * @throws if the connection test or schema application fails
 */
async init() {
    // Fail fast if the database is unreachable.
    await this.testConnection();

    const client = await this.pool.connect();
    try {
        const schema = fs.readFileSync(
            path.resolve(__dirname, "../schema.sql"),
            "utf8"
        );
        await client.query(schema);
    } catch (error) {
        console.error(error);
        throw error;
    } finally {
        // Always return the client to the pool (original leaked it on
        // the success path).
        client.release();
    }
}

async testConnection(): Promise<boolean> {
Expand Down Expand Up @@ -820,6 +843,65 @@ export class PostgresDatabaseAdapter extends DatabaseAdapter {
throw new Error("Failed to fetch actor details");
}
}

/**
 * Reads a cache value for the given key/agent pair.
 *
 * @returns the stored value as text, or undefined when the key is
 *          missing or the query fails (best-effort read)
 */
async getCache(params: {
    key: string;
    agentId: UUID;
}): Promise<string | undefined> {
    const client = await this.pool.connect();
    try {
        const sql = `SELECT "value"::TEXT FROM cache WHERE "key" = $1 AND "agentId" = $2`;
        // Query through the checked-out client; the original queried
        // this.pool, making the connect() above a wasted checkout.
        const { rows } = await client.query<{ value: string }>(sql, [
            params.key,
            params.agentId,
        ]);

        return rows[0]?.value ?? undefined;
    } catch (error) {
        // Best-effort: a cache miss is acceptable, but log to stderr.
        console.error("Error fetching cache", error);
        return undefined;
    } finally {
        client.release();
    }
}

/**
 * Upserts a cache value for the given key/agent pair, refreshing the
 * creation timestamp on conflict.
 *
 * @returns true on success, false if the write failed (the original
 *          fell through returning undefined despite Promise<boolean>)
 */
async setCache(params: {
    key: string;
    agentId: UUID;
    value: string;
}): Promise<boolean> {
    const client = await this.pool.connect();
    try {
        await client.query(
            `INSERT INTO cache ("key", "agentId", "value", "createdAt") VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
            ON CONFLICT ("key", "agentId")
            DO UPDATE SET "value" = EXCLUDED.value, "createdAt" = CURRENT_TIMESTAMP`,
            [params.key, params.agentId, params.value]
        );
        return true;
    } catch (error) {
        console.error("Error adding cache", error);
        return false;
    } finally {
        client.release();
    }
}

/**
 * Deletes the cache entry for the given key/agent pair.
 *
 * @returns true on success, false if the delete failed (the original
 *          fell through returning undefined despite Promise<boolean>)
 */
async deleteCache(params: {
    key: string;
    agentId: UUID;
}): Promise<boolean> {
    const client = await this.pool.connect();
    try {
        await client.query(
            `DELETE FROM cache WHERE "key" = $1 AND "agentId" = $2`,
            [params.key, params.agentId]
        );
        return true;
    } catch (error) {
        // Original message said "Error adding cache" — copy/paste slip
        // from setCache.
        console.error("Error deleting cache", error);
        return false;
    } finally {
        client.release();
    }
}
}

export default PostgresDatabaseAdapter;
Loading

0 comments on commit a1519b2

Please sign in to comment.