(Init): Added shit

This commit is contained in:
2026-05-28 23:46:40 +00:00
parent a5250706cb
commit 8410600c63
46 changed files with 3898 additions and 228 deletions

5
crates/core/deno.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "@elly/core",
"version": "0.1.0",
"exports": "./src/main.ts"
}

View File

@@ -0,0 +1,148 @@
/**
* DI composition root for `@elly/core`.
*
* Owns the lifecycle of every infrastructure dependency in the right order:
*
* build(): open DB → run migrations → open Kv → build bus → build HTTP server
* shutdown(): http server → kv → db
*
* Every dependency is created exactly once and passed by reference to the
* consumers that need it. Nothing in this crate may reach for a module-level
* singleton; if you need a dependency, ask for it through the container.
*/
import type { Config, CoreEnv, Logger } from "@elly/shared";
import { openDatabase, type DbConnection } from "./infrastructure/db/connection.ts";
import { Migrator } from "./infrastructure/db/migrator.ts";
import { openKv, type KvHandle } from "./infrastructure/kv/store.ts";
import { CooldownStore } from "./infrastructure/kv/cooldown.ts";
import { InteractionStateStore } from "./infrastructure/kv/interactionState.ts";
import { CacheStore } from "./infrastructure/kv/cache.ts";
import { DomainEventBus } from "./infrastructure/pubsub/bus.ts";
import { createHttpServer, type HttpServer } from "./infrastructure/http/server.ts";
import { buildSystemRoutes } from "./infrastructure/http/routes/system.ts";
export interface CoreContainer {
readonly config: Config;
readonly env: CoreEnv;
readonly logger: Logger;
readonly version: string;
readonly startedAt: number;
readonly db: DbConnection;
readonly kv: KvHandle;
readonly cooldowns: CooldownStore;
readonly interactionState: InteractionStateStore;
readonly cache: CacheStore;
readonly bus: DomainEventBus;
readonly http: HttpServer;
/** Tear down every dependency in reverse-creation order. */
shutdown(): Promise<void>;
}
export interface BuildContainerOptions {
readonly config: Config;
readonly env: CoreEnv;
readonly logger: Logger;
readonly version: string;
}
export async function buildContainer(options: BuildContainerOptions): Promise<CoreContainer> {
const log = options.logger.child({ component: "container" });
const startedAt = Date.now();
log.debug("building core container");
// ---- Database -----------------------------------------------------
const db = await openDatabase({
path: options.config.database.path,
logger: options.logger,
});
const migrator = new Migrator({ db: db.db, logger: options.logger });
await migrator.migrateToLatest();
// ---- Kv + helpers -------------------------------------------------
const kv = await openKv({
path: options.config.kv.path,
logger: options.logger,
});
const cooldowns = new CooldownStore(kv);
const interactionState = new InteractionStateStore(kv);
const cache = new CacheStore(kv);
// ---- Event bus ----------------------------------------------------
const bus = new DomainEventBus(options.logger);
// ---- HTTP server --------------------------------------------------
const routes = buildSystemRoutes({
version: options.version,
startedAt,
db,
kv,
bus,
});
const http = createHttpServer({
host: options.config.ipc.host,
port: options.config.ipc.port,
ipcToken: options.env.IPC_TOKEN,
logger: options.logger,
bus,
routes,
});
log.info("core container built", {
db: options.config.database.path,
kv: options.config.kv.path,
ipc: `${options.config.ipc.host}:${options.config.ipc.port}`,
});
return {
config: options.config,
env: options.env,
logger: options.logger,
version: options.version,
startedAt,
db,
kv,
cooldowns,
interactionState,
cache,
bus,
http,
async shutdown(): Promise<void> {
const errors: unknown[] = [];
const tryClose = async (label: string, fn: () => Promise<void>) => {
try {
await fn();
} catch (err) {
log.error("shutdown step failed", {
step: label,
err: err instanceof Error ? err : new Error(String(err)),
});
errors.push(err);
}
};
// Reverse build order: stop accepting traffic first, then close DBs.
await tryClose("http", () => http.shutdown());
await tryClose("kv", () => kv.close());
await tryClose("db", () => db.close());
if (errors.length > 0) {
log.warn("core container shutdown completed with errors", {
errorCount: errors.length,
});
} else {
log.info("core container shutdown clean");
}
},
};
}

View File

@@ -0,0 +1,128 @@
/**
* SQLite connection bootstrap.
*
* Opens (or creates) the on-disk SQLite database, applies our production
* PRAGMAs, and wraps the connection in a fully-typed `Kysely<Database>`.
*
* Pragmas:
* - `journal_mode=WAL` — readers don't block writers; required for the
* IPC server + cron jobs running concurrently.
* - `foreign_keys=ON` — SQLite defaults to OFF; we want referential
* integrity for child tables added in Phase 4.
* - `busy_timeout=5000` — block writers up to 5s if WAL is being checkpointed.
* - `synchronous=NORMAL` — safe with WAL, much faster than FULL.
*/
import { Database as SqliteDatabase } from "@db/sqlite";
import { ensureDir } from "@std/fs";
import { dirname } from "@std/path";
import { Kysely, sql } from "kysely";
import type { Logger } from "@elly/shared";
import { DenoSqliteDialect } from "./kysely-dialect.ts";
import type { Database } from "./schema.ts";
export interface DbConnection {
/** Type-safe Kysely query builder bound to the schema. */
readonly db: Kysely<Database>;
/** Underlying FFI handle — exposed for raw maintenance ops (vacuum, backup). */
readonly raw: SqliteDatabase;
/** Absolute path the DB was opened from. */
readonly path: string;
/** Close the underlying connection and tear down Kysely. */
close(): Promise<void>;
/** Run `VACUUM` to compact the on-disk file. */
vacuum(): Promise<void>;
/** Copy the live database file to `targetPath` (safe under WAL). */
backup(targetPath: string): Promise<void>;
/** Return useful runtime stats (path, byte size, table count). */
getStats(): Promise<DbStats>;
}
export interface DbStats {
readonly path: string;
readonly sizeBytes: number;
readonly tables: ReadonlyArray<string>;
}
export interface OpenDbOptions {
readonly path: string;
readonly logger: Logger;
}
/**
* Open the SQLite file, apply pragmas, return the connection wrapper.
*
* Idempotent across crash/restart: WAL files left behind are recovered
* automatically by SQLite on the next open.
*/
export async function openDatabase(options: OpenDbOptions): Promise<DbConnection> {
const log = options.logger.child({ component: "db" });
await ensureDir(dirname(options.path));
log.debug("opening sqlite database", { path: options.path });
const raw = new SqliteDatabase(options.path);
// Apply pragmas. `db.exec()` is sync; failures throw synchronously.
raw.exec("PRAGMA journal_mode = WAL");
raw.exec("PRAGMA foreign_keys = ON");
raw.exec("PRAGMA synchronous = NORMAL");
raw.exec("PRAGMA busy_timeout = 5000");
const db = new Kysely<Database>({ dialect: new DenoSqliteDialect(raw) });
log.info("sqlite database opened", { path: options.path });
return {
db,
raw,
path: options.path,
async close(): Promise<void> {
log.debug("closing sqlite database");
await db.destroy();
// Kysely's destroy() calls driver.destroy(), which closes `raw`.
},
async vacuum(): Promise<void> {
log.debug("vacuuming sqlite database");
await sql`VACUUM`.execute(db);
},
async backup(targetPath: string): Promise<void> {
log.info("backing up sqlite database", { target: targetPath });
await ensureDir(dirname(targetPath));
// `VACUUM INTO` is the SQLite-recommended online-safe backup method.
await sql`VACUUM INTO ${sql.lit(targetPath)}`.execute(db);
},
getStats(): Promise<DbStats> {
return collectStats(db, options.path);
},
};
}
async function collectStats(
db: Kysely<Database>,
path: string,
): Promise<DbStats> {
let sizeBytes = 0;
try {
const stat = await Deno.stat(path);
sizeBytes = stat.size;
} catch {
sizeBytes = 0;
}
const result = await sql<{ name: string }>`
SELECT name FROM sqlite_master
WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
`.execute(db);
return {
path,
sizeBytes,
tables: result.rows.map((r) => r.name),
};
}

View File

@@ -0,0 +1,144 @@
/**
* Custom Kysely dialect that drives `jsr:@db/sqlite` (native FFI SQLite).
*
* Kysely's bundled `SqliteDialect` depends on `better-sqlite3`, which is
* Node-only. This thin adapter wraps Deno's FFI binding so we keep query
* builder, types, and migrations entirely in the Kysely ecosystem without
* pulling in a Node compatibility shim.
*
* Notes:
* - All operations are sync at the FFI layer; we wrap in `Promise.resolve`
* to satisfy Kysely's async `DatabaseConnection` contract.
* - Read-vs-write dispatch uses the compiled query's AST kind, falling
* back to a SQL prefix check for raw queries.
* - Streaming is not supported (FFI driver returns full result sets).
*/
import {
CompiledQuery,
type DatabaseConnection,
type DatabaseIntrospector,
type Dialect,
type DialectAdapter,
type Driver,
type Kysely,
type QueryCompiler,
type QueryResult,
SqliteAdapter,
SqliteIntrospector,
SqliteQueryCompiler,
} from "kysely";
import type { BindValue, Database as SqliteDatabase } from "@db/sqlite";
export class DenoSqliteDialect implements Dialect {
constructor(private readonly db: SqliteDatabase) {}
createAdapter(): DialectAdapter {
return new SqliteAdapter();
}
createDriver(): Driver {
return new DenoSqliteDriver(this.db);
}
createIntrospector(db: Kysely<unknown>): DatabaseIntrospector {
return new SqliteIntrospector(db);
}
createQueryCompiler(): QueryCompiler {
return new SqliteQueryCompiler();
}
}
class DenoSqliteDriver implements Driver {
constructor(private readonly db: SqliteDatabase) {}
init(): Promise<void> {
return Promise.resolve();
}
acquireConnection(): Promise<DatabaseConnection> {
return Promise.resolve(new DenoSqliteConnection(this.db));
}
async beginTransaction(conn: DatabaseConnection): Promise<void> {
await conn.executeQuery(CompiledQuery.raw("BEGIN"));
}
async commitTransaction(conn: DatabaseConnection): Promise<void> {
await conn.executeQuery(CompiledQuery.raw("COMMIT"));
}
async rollbackTransaction(conn: DatabaseConnection): Promise<void> {
await conn.executeQuery(CompiledQuery.raw("ROLLBACK"));
}
releaseConnection(): Promise<void> {
// FFI driver shares one connection — nothing to release per-statement.
return Promise.resolve();
}
destroy(): Promise<void> {
try {
this.db.close();
} catch {
// Closing a never-opened or already-closed DB is a no-op for us.
}
return Promise.resolve();
}
}
class DenoSqliteConnection implements DatabaseConnection {
constructor(private readonly db: SqliteDatabase) {}
executeQuery<R>(query: CompiledQuery): Promise<QueryResult<R>> {
// Kysely produces parameter values that are already SQLite-bindable
// (numbers, strings, BigInts, booleans, null, Uint8Array). The cast
// crosses the `unknown` -> `BindValue` boundary without inspecting each
// element; misuse would surface as a runtime SQLite error.
const params = query.parameters as readonly BindValue[];
const stmt = this.db.prepare(query.sql);
try {
if (isReadQuery(query)) {
const rows = stmt.all(...params) as R[];
return Promise.resolve({ rows });
}
const changes = stmt.run(...params);
// `@db/sqlite` exposes the most recent insert rowid on the Database
// instance after a successful `.run()`. Only surface it when SQLite
// actually assigned a row (rowid > 0).
const lastId = this.db.lastInsertRowId;
return Promise.resolve({
rows: [],
numAffectedRows: BigInt(changes),
insertId: lastId !== undefined && Number(lastId) > 0 ? BigInt(lastId) : undefined,
});
} finally {
stmt.finalize();
}
}
// deno-lint-ignore require-yield
async *streamQuery<R>(_query: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
throw new Error("Streaming is not supported by DenoSqliteDialect");
}
}
function isReadQuery(query: CompiledQuery): boolean {
const kind = (query.query as { kind?: string }).kind;
switch (kind) {
case "SelectQueryNode":
case "WithNode":
return true;
}
// RETURNING clauses on INSERT/UPDATE/DELETE produce rows. Kysely's
// `RootOperationNode` is a tagged union with no string index signature,
// so we route through `unknown` before the structural property probe.
const node = query.query as unknown as Record<string, unknown>;
if ("returning" in node && node.returning != null) return true;
// Raw/unknown — fall back to a prefix check.
return /^\s*(SELECT|WITH|PRAGMA|EXPLAIN)\b/i.test(query.sql);
}

View File

@@ -0,0 +1,26 @@
/**
* Migration 0001 — initial schema bookkeeping.
*
* Creates the `schema_migrations` table used by the migrator to track which
* versions have been applied. Domain tables are introduced in subsequent
* migrations once Phase 4 starts adding feature schemas.
*/
import type { Kysely } from "kysely";
export const version = 1;
export const name = "0001_initial";
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("schema_migrations")
.ifNotExists()
.addColumn("version", "integer", (col) => col.primaryKey())
.addColumn("name", "text", (col) => col.notNull())
.addColumn("applied_at", "text", (col) => col.notNull())
.execute();
}
export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable("schema_migrations").ifExists().execute();
}

View File

@@ -0,0 +1,27 @@
/**
* Ordered registry of all migrations.
*
* Each migration module exports `version`, `name`, `up(db)`, and `down(db)`.
* The `Migrator` applies any version > the highest one recorded in the
* `schema_migrations` table, in ascending order.
*
* New migrations are added by:
* 1. Creating `NNNN_description.ts` in this directory.
* 2. Importing it here and appending to the `MIGRATIONS` array.
* 3. Updating `Database` in `../schema.ts` to reflect the new columns.
*/
import type { Kysely } from "kysely";
export interface Migration {
readonly version: number;
readonly name: string;
up(db: Kysely<unknown>): Promise<void>;
down(db: Kysely<unknown>): Promise<void>;
}
import * as m0001 from "./0001_initial.ts";
export const MIGRATIONS: ReadonlyArray<Migration> = [
m0001,
] as const;

View File

@@ -0,0 +1,141 @@
/**
* Forward-only migration runner.
*
* Reads `MIGRATIONS` (ordered by version), determines the current schema
* version from `schema_migrations`, and applies any newer migrations in a
* single transaction each. Failure inside an `up()` rolls that one back —
* earlier migrations remain applied.
*
* Phase 2 ships only one migration (bookkeeping). The migrator is built now
* so Phase 4 can add domain-table migrations without changing infrastructure.
*/
import type { Kysely } from "kysely";
import { sql } from "kysely";
import type { Logger } from "@elly/shared";
import type { Database } from "./schema.ts";
import { type Migration, MIGRATIONS } from "./migrations/index.ts";
export interface MigratorOptions {
readonly db: Kysely<Database>;
readonly logger: Logger;
/** Override the migration list — useful for ad-hoc tooling. */
readonly migrations?: ReadonlyArray<Migration>;
}
export interface MigratorRunSummary {
readonly applied: ReadonlyArray<{ version: number; name: string }>;
readonly skipped: ReadonlyArray<{ version: number; name: string }>;
readonly finalVersion: number;
}
export class Migrator {
private readonly db: Kysely<Database>;
private readonly log: Logger;
private readonly migrations: ReadonlyArray<Migration>;
constructor(options: MigratorOptions) {
this.db = options.db;
this.log = options.logger.child({ component: "migrator" });
this.migrations = (options.migrations ?? MIGRATIONS)
.slice()
.sort((a, b) => a.version - b.version);
this.assertVersionsUnique();
}
/** Apply every migration newer than the recorded version. */
async migrateToLatest(): Promise<MigratorRunSummary> {
await this.ensureBootstrapTable();
const currentVersion = await this.currentVersion();
this.log.debug("migration baseline", { currentVersion });
const applied: Array<{ version: number; name: string }> = [];
const skipped: Array<{ version: number; name: string }> = [];
for (const migration of this.migrations) {
if (migration.version <= currentVersion) {
skipped.push({ version: migration.version, name: migration.name });
continue;
}
await this.apply(migration);
applied.push({ version: migration.version, name: migration.name });
}
const finalVersion = await this.currentVersion();
if (applied.length > 0) {
this.log.info("migrations applied", {
count: applied.length,
finalVersion,
names: applied.map((a) => a.name),
});
} else {
this.log.debug("schema up to date", { finalVersion });
}
return { applied, skipped, finalVersion };
}
/** Return the highest migration version recorded, or 0 if none. */
async currentVersion(): Promise<number> {
const result = await sql<{ v: number | null }>`
SELECT MAX(version) AS v FROM schema_migrations
`.execute(this.db);
const row = result.rows[0];
return row?.v ?? 0;
}
private async ensureBootstrapTable(): Promise<void> {
// The very first migration creates `schema_migrations`, but we need
// the table to exist BEFORE we read from it. Create it idempotently here.
await this.db.schema
.createTable("schema_migrations")
.ifNotExists()
.addColumn("version", "integer", (col) => col.primaryKey())
.addColumn("name", "text", (col) => col.notNull())
.addColumn("applied_at", "text", (col) => col.notNull())
.execute();
}
private async apply(migration: Migration): Promise<void> {
const startedAt = performance.now();
this.log.info("applying migration", {
version: migration.version,
name: migration.name,
});
await this.db.transaction().execute(async (trx) => {
// Migrations operate on the bare DDL surface; `Kysely<unknown>` is
// the standard variance escape hatch since at apply-time the
// typed `Database` may not yet reflect the schema being created.
await migration.up(trx as unknown as Kysely<unknown>);
await trx
.insertInto("schema_migrations")
.values({
version: migration.version,
name: migration.name,
applied_at: new Date().toISOString(),
})
.execute();
});
const durationMs = Number((performance.now() - startedAt).toFixed(2));
this.log.info("migration applied", {
version: migration.version,
name: migration.name,
durationMs,
});
}
private assertVersionsUnique(): void {
const seen = new Set<number>();
for (const m of this.migrations) {
if (seen.has(m.version)) {
throw new Error(
`Duplicate migration version ${m.version}; check crates/core/src/infrastructure/db/migrations/index.ts`,
);
}
seen.add(m.version);
}
}
}

View File

@@ -0,0 +1,45 @@
/**
* Kysely `Database` schema — the canonical TypeScript description of every
* table the core crate persists into SQLite.
*
* Phase 2 only declares the bookkeeping table required by the migrator
* (`schema_migrations`). Each subsequent migration that introduces a new
* domain table also extends this interface here so the type-checker prevents
* us from referencing columns that don't exist.
*
* Convention:
* - Surrogate IDs use ULIDs (string) generated by `@std/ulid`.
* - Timestamps are stored as ISO 8601 strings (TEXT) — easy to read in
* `sqlite3` CLI and trivially sortable.
* - Booleans are stored as 0/1 INTEGERs (SQLite has no native bool).
*/
import type { Generated } from "kysely";
// =====================================================================
// schema_migrations — managed by `Migrator`
// =====================================================================
export interface SchemaMigrationsTable {
/** Monotonically increasing migration version, e.g. 1, 2, 3. */
version: number;
/** Human-readable name, e.g. `"0001_initial"`. */
name: string;
/** ISO timestamp of when the migration was applied. */
applied_at: string;
}
// =====================================================================
// Composite Database type — passed to `Kysely<Database>`
// =====================================================================
export interface Database {
schema_migrations: SchemaMigrationsTable;
}
// =====================================================================
// Helper aliases for migrations
// =====================================================================
export type GeneratedId = Generated<string>;
export type Timestamp = string;

View File

@@ -0,0 +1,76 @@
/**
* Bearer-token authentication middleware.
*
* Compares the `Authorization: Bearer <token>` header against the configured
* `IPC_TOKEN`. The comparison is constant-time to avoid leaking the token
* via timing side-channels on the loopback interface.
*
* Routes flagged `anonymous: true` (e.g. `/health`) are exempted. The match
* decision is provided to this middleware via `ctx.locals.routeAnonymous`,
* which the server sets after route resolution.
*/
import { IPC_AUTH_HEADER, IpcErrorCode } from "@elly/shared";
import type { HttpMiddleware } from "../types.ts";
import { jsonError } from "./error.ts";
const BEARER_PREFIX = "Bearer ";
export interface AuthMiddlewareOptions {
readonly token: string;
}
export function authMiddleware(options: AuthMiddlewareOptions): HttpMiddleware {
const expected = options.token;
if (expected.length === 0) {
throw new Error("authMiddleware requires a non-empty IPC token");
}
return (ctx, next): Promise<Response> => {
if (ctx.locals.routeAnonymous === true) {
return next();
}
const header = ctx.request.headers.get(IPC_AUTH_HEADER);
if (!header || !header.startsWith(BEARER_PREFIX)) {
ctx.logger.warn("ipc auth missing", { reason: "no_bearer_header" });
return Promise.resolve(
jsonError({
status: 401,
code: IpcErrorCode.UNAUTHORIZED,
message: "Missing or malformed Authorization header",
requestId: ctx.requestId,
}),
);
}
const presented = header.slice(BEARER_PREFIX.length);
if (!timingSafeEqual(presented, expected)) {
ctx.logger.warn("ipc auth failed", { reason: "token_mismatch" });
return Promise.resolve(
jsonError({
status: 401,
code: IpcErrorCode.UNAUTHORIZED,
message: "Invalid IPC token",
requestId: ctx.requestId,
}),
);
}
return next();
};
}
function timingSafeEqual(a: string, b: string): boolean {
// Constant-time equality. We compare to the longer of the two so timing
// doesn't leak length. Diff bits accumulate into `mismatch`.
const ab = new TextEncoder().encode(a);
const bb = new TextEncoder().encode(b);
const len = Math.max(ab.length, bb.length);
let mismatch = ab.length ^ bb.length;
for (let i = 0; i < len; i++) {
mismatch |= (ab[i] ?? 0) ^ (bb[i] ?? 0);
}
return mismatch === 0;
}

View File

@@ -0,0 +1,97 @@
/**
* Error-handling middleware + JSON error helper.
*
* The middleware wraps the entire request chain. Any uncaught exception is
* captured, logged with its stack, and serialized to the canonical
* `IpcErrorBody` envelope (`@elly/shared`). Known `IpcError` instances
* preserve their `code`/`status`/`requestId`; everything else is rendered
* as a generic 500 to avoid leaking internals.
*/
import { IPC_ERROR_STATUS, IpcError, IpcErrorCode } from "@elly/shared";
import type { IpcErrorBody } from "@elly/shared";
import type { HttpMiddleware } from "../types.ts";
export function errorMiddleware(): HttpMiddleware {
return async (ctx, next) => {
try {
return await next();
} catch (err) {
if (err instanceof IpcError) {
ctx.logger.warn("ipc handler threw IpcError", {
code: err.code,
status: err.status,
err: { name: err.name, message: err.message },
});
return jsonError({
status: err.status,
code: err.code,
message: err.message,
requestId: ctx.requestId,
details: err.details,
});
}
const wrapped = err instanceof Error ? err : new Error(String(err));
ctx.logger.error("ipc handler threw unexpectedly", {
err: wrapped,
});
return jsonError({
status: 500,
code: IpcErrorCode.INTERNAL,
message: "Internal core error",
requestId: ctx.requestId,
});
}
};
}
export interface JsonErrorInput {
readonly status: number;
readonly code: string;
readonly message: string;
readonly requestId?: string;
readonly details?: unknown;
}
/**
* Build a `Response` carrying the canonical `IpcErrorBody` envelope.
*
* The status falls back to `IPC_ERROR_STATUS[code]` when the caller passes
* `status: 0`, allowing handlers to specify only the semantic code.
*/
export function jsonError(input: JsonErrorInput): Response {
const status = input.status > 0
? input.status
: (IPC_ERROR_STATUS as Record<string, number>)[input.code] ?? 500;
const body: IpcErrorBody = {
error: {
code: input.code,
message: input.message,
requestId: input.requestId,
details: input.details,
},
};
return new Response(JSON.stringify(body), {
status,
headers: {
"content-type": "application/json; charset=utf-8",
...(input.requestId ? { "x-request-id": input.requestId } : {}),
},
});
}
/**
* Convenience for handlers: build a success JSON response.
*/
export function jsonOk(body: unknown, init: { status?: number; requestId?: string } = {}): Response {
return new Response(JSON.stringify(body), {
status: init.status ?? 200,
headers: {
"content-type": "application/json; charset=utf-8",
...(init.requestId ? { "x-request-id": init.requestId } : {}),
},
});
}

View File

@@ -0,0 +1,45 @@
/**
* Request/response logging middleware.
*
* Emits two structured log lines per request:
* - `info` `ipc request started` — method, path, requestId
* - `info` `ipc request completed` — same fields + status, durationMs
*
* Failures (rejected promise inside `next`) are re-thrown after logging
* so the error middleware downstream can format them into envelopes.
*
* Per the architectural mandate, every request's latency is recorded —
* this is the foundation for production SLO dashboards in later phases.
*/
import type { HttpMiddleware } from "../types.ts";
export function loggingMiddleware(): HttpMiddleware {
return async (ctx, next) => {
ctx.logger.info("ipc request started", {
method: ctx.method,
path: ctx.url.pathname,
});
try {
const response = await next();
const durationMs = Number((performance.now() - ctx.startedAtMs).toFixed(2));
ctx.logger.info("ipc request completed", {
method: ctx.method,
path: ctx.url.pathname,
status: response.status,
durationMs,
});
return response;
} catch (err) {
const durationMs = Number((performance.now() - ctx.startedAtMs).toFixed(2));
ctx.logger.error("ipc request failed", {
method: ctx.method,
path: ctx.url.pathname,
durationMs,
err: err instanceof Error ? err : new Error(String(err)),
});
throw err;
}
};
}

View File

@@ -0,0 +1,147 @@
/**
* Minimal dependency-free HTTP router.
*
* Compiles `:param` segments in path patterns into RegExp at registration
* time so dispatch is O(routes) per request. Returns:
* - The matching route plus extracted `params`, or
* - A 404 sentinel handler if no route matches the path, or
* - A 405 sentinel handler if the path matched but no method matched.
*
* Middleware compose left-to-right; the final layer is the route handler.
*/
import type {
HttpContext,
HttpHandler,
HttpMethod,
HttpHandler as Handler,
HttpMiddleware,
RouteDefinition,
} from "./types.ts";
interface CompiledRoute {
readonly method: HttpMethod;
readonly pattern: RegExp;
readonly paramNames: ReadonlyArray<string>;
readonly anonymous: boolean;
readonly handler: Handler;
readonly rawPath: string;
}
export interface RouteMatch {
readonly handler: HttpHandler;
readonly params: Readonly<Record<string, string>>;
readonly anonymous: boolean;
readonly status: 200 | 404 | 405;
}
export class HttpRouter {
private readonly routes: CompiledRoute[] = [];
private readonly middleware: HttpMiddleware[] = [];
use(mw: HttpMiddleware): void {
this.middleware.push(mw);
}
register(route: RouteDefinition): void {
this.routes.push(compile(route));
}
registerMany(routes: ReadonlyArray<RouteDefinition>): void {
for (const r of routes) this.register(r);
}
/** Find the best match for `(method, path)`. */
resolve(method: HttpMethod, path: string): RouteMatch {
let methodMismatch = false;
for (const route of this.routes) {
const m = route.pattern.exec(path);
if (!m) continue;
if (route.method !== method) {
methodMismatch = true;
continue;
}
const params: Record<string, string> = {};
for (let i = 0; i < route.paramNames.length; i++) {
params[route.paramNames[i]] = decodeURIComponent(m[i + 1] ?? "");
}
return {
handler: route.handler,
params,
anonymous: route.anonymous,
status: 200,
};
}
return {
handler: methodMismatch ? methodNotAllowedHandler : notFoundHandler,
params: {},
anonymous: true,
status: methodMismatch ? 405 : 404,
};
}
/**
* Run the middleware chain followed by the matched handler.
* `match.handler` is the final layer.
*/
dispatch(ctx: HttpContext, match: RouteMatch): Promise<Response> {
let index = -1;
const chain = this.middleware;
const next = (): Promise<Response> => {
index++;
if (index < chain.length) {
return chain[index](ctx, next);
}
return Promise.resolve(match.handler(ctx));
};
return next();
}
}
// =====================================================================
// Helpers
// =====================================================================
function compile(route: RouteDefinition): CompiledRoute {
const paramNames: string[] = [];
// Escape literal characters except `:param` segments.
const regexBody = route.path
.split("/")
.map((segment) => {
if (segment.startsWith(":")) {
paramNames.push(segment.slice(1));
return "([^/]+)";
}
return segment.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
})
.join("/");
return {
method: route.method,
pattern: new RegExp(`^${regexBody}$`),
paramNames,
anonymous: route.anonymous ?? false,
handler: route.handler,
rawPath: route.path,
};
}
function notFoundHandler(): Response {
return new Response(
JSON.stringify({
error: { code: "not_found", message: "Route not found" },
}),
{ status: 404, headers: { "content-type": "application/json" } },
);
}
function methodNotAllowedHandler(): Response {
return new Response(
JSON.stringify({
error: { code: "method_not_allowed", message: "Method not allowed for this route" },
}),
{ status: 405, headers: { "content-type": "application/json" } },
);
}

View File

@@ -0,0 +1,148 @@
/**
* Phase 2 system routes:
* - `GET /health` Anonymous liveness probe (200 OK).
* - `GET /v1/version` Authenticated diagnostic payload.
* - `GET /v1/events` Authenticated SSE stream of domain events.
*
* These are the only routes Phase 2 ships. Phase 4 will add the feature
* routes (`/v1/applications/...`, `/v1/suggestions/...`, etc.).
*/
import { IpcRoutes } from "@elly/shared";
import type { AnyDomainEvent } from "@elly/shared";
import type { DbConnection } from "../../db/connection.ts";
import type { KvHandle } from "../../kv/store.ts";
import type { DomainEventBus } from "../../pubsub/bus.ts";
import { jsonOk } from "../middleware/error.ts";
import type { HttpContext, RouteDefinition } from "../types.ts";
export interface SystemRouteDeps {
readonly version: string;
readonly startedAt: number;
readonly db: DbConnection;
readonly kv: KvHandle;
readonly bus: DomainEventBus;
}
const SSE_HEADERS: HeadersInit = {
"content-type": "text/event-stream; charset=utf-8",
"cache-control": "no-cache, no-transform",
"connection": "keep-alive",
// The bot's SSE consumer (Phase 3) doesn't use `x-accel-buffering`, but
// operators sitting behind nginx will thank us.
"x-accel-buffering": "no",
};
export function buildSystemRoutes(deps: SystemRouteDeps): ReadonlyArray<RouteDefinition> {
return [
{
method: "GET",
path: IpcRoutes.HEALTH,
anonymous: true,
handler: (ctx) =>
jsonOk(
{ status: "ok", uptimeMs: Date.now() - deps.startedAt },
{ requestId: ctx.requestId },
),
},
{
method: "GET",
path: IpcRoutes.VERSION,
handler: async (ctx) => {
const stats = await deps.db.getStats();
return jsonOk(
{
version: deps.version,
pid: Deno.pid,
uptimeMs: Date.now() - deps.startedAt,
deno: Deno.version,
db: {
path: stats.path,
sizeBytes: stats.sizeBytes,
tables: stats.tables,
},
kv: {
path: deps.kv.path,
},
bus: {
subscribers: deps.bus.subscribers,
},
},
{ requestId: ctx.requestId },
);
},
},
{
method: "GET",
path: IpcRoutes.EVENTS,
handler: (ctx) => buildSseResponse(ctx, deps.bus),
},
];
}
function buildSseResponse(ctx: HttpContext, bus: DomainEventBus): Response {
const encoder = new TextEncoder();
const stream = new ReadableStream<Uint8Array>({
start(controller) {
let closed = false;
const closeIfOpen = () => {
if (closed) return;
closed = true;
try {
controller.close();
} catch {
// already closed
}
};
const tryEnqueue = (chunk: Uint8Array) => {
if (closed) return;
try {
controller.enqueue(chunk);
} catch {
closed = true;
}
};
// Initial comment so proxies open the connection right away.
tryEnqueue(encoder.encode(`: connected requestId=${ctx.requestId}\n\n`));
// Replay nothing — Phase 2 has no backlog. Subscribe live.
const unsubscribe = bus.subscribe((event: AnyDomainEvent) => {
const payload = `event: ${event.type}\nid: ${event.id}\ndata: ${
JSON.stringify(event)
}\n\n`;
tryEnqueue(encoder.encode(payload));
});
// Periodic comment-line keep-alive so idle connections aren't reaped
// by intermediaries that close on inactivity.
const heartbeat = setInterval(() => {
tryEnqueue(encoder.encode(`: keep-alive ${Date.now()}\n\n`));
}, 15_000);
const onAbort = () => {
ctx.logger.debug("sse client disconnected", { requestId: ctx.requestId });
clearInterval(heartbeat);
unsubscribe();
closeIfOpen();
};
if (ctx.signal.aborted) {
onAbort();
} else {
ctx.signal.addEventListener("abort", onAbort, { once: true });
}
ctx.logger.info("sse client connected", { requestId: ctx.requestId });
},
});
return new Response(stream, {
status: 200,
headers: { ...SSE_HEADERS, "x-request-id": ctx.requestId },
});
}

View File

@@ -0,0 +1,167 @@
/**
* Core IPC HTTP server.
*
* Wires together:
* - `HttpRouter` — route resolution + middleware composition.
* - Middleware chain (in onion order, outer → inner):
* errorMiddleware → loggingMiddleware → authMiddleware → handler
* - `Deno.serve` — Deno's native server, bound to `host:port` from config.
*
* Exposes `start()` (returns when the server is accepting connections) and
* `shutdown()` (graceful drain with a hard timeout).
*/
import { ulid } from "@std/ulid";
import { IPC_REQUEST_ID_HEADER } from "@elly/shared";
import type { Logger } from "@elly/shared";
import type { DomainEventBus } from "../pubsub/bus.ts";
import { authMiddleware } from "./middleware/auth.ts";
import { errorMiddleware } from "./middleware/error.ts";
import { loggingMiddleware } from "./middleware/logging.ts";
import { HttpRouter } from "./router.ts";
import type {
HttpContext,
HttpMethod,
RouteDefinition,
} from "./types.ts";
export interface HttpServerOptions {
readonly host: string;
readonly port: number;
readonly ipcToken: string;
readonly logger: Logger;
readonly bus: DomainEventBus;
readonly routes: ReadonlyArray<RouteDefinition>;
/** Hard timeout (ms) for shutdown to wait for in-flight requests. */
readonly shutdownTimeoutMs?: number;
}
export interface HttpServer {
readonly host: string;
readonly port: number;
start(): Promise<void>;
shutdown(): Promise<void>;
}
export function createHttpServer(options: HttpServerOptions): HttpServer {
const log = options.logger.child({ component: "http" });
const router = new HttpRouter();
// Outer-most middleware first.
router.use(errorMiddleware());
router.use(loggingMiddleware());
router.use(authMiddleware({ token: options.ipcToken }));
router.registerMany(options.routes);
const controller = new AbortController();
let server: Deno.HttpServer | null = null;
let listening: Promise<void> | null = null;
return {
host: options.host,
port: options.port,
start(): Promise<void> {
if (listening) return listening;
listening = new Promise<void>((resolve, reject) => {
try {
server = Deno.serve(
{
hostname: options.host,
port: options.port,
signal: controller.signal,
onListen: ({ hostname, port }) => {
log.info("ipc http server listening", { hostname, port });
resolve();
},
onError: (err) => {
const wrapped = err instanceof Error ? err : new Error(String(err));
log.error("ipc http unhandled error", { err: wrapped });
return new Response("internal", { status: 500 });
},
},
(request) => handleRequest(request, router, options.bus, log),
);
} catch (err) {
reject(err);
}
});
return listening;
},
async shutdown(): Promise<void> {
log.info("ipc http server shutting down");
controller.abort();
if (server) {
const timeoutMs = options.shutdownTimeoutMs ?? 5_000;
await raceWithTimeout(server.finished, timeoutMs, () => {
log.warn("ipc http server shutdown timed out; forcing", { timeoutMs });
});
}
log.info("ipc http server stopped");
},
};
}
function handleRequest(
request: Request,
router: HttpRouter,
bus: DomainEventBus,
logger: Logger,
): Promise<Response> {
const url = new URL(request.url);
const method = request.method.toUpperCase() as HttpMethod;
const requestId = request.headers.get(IPC_REQUEST_ID_HEADER) ?? ulid();
const startedAtMs = performance.now();
const match = router.resolve(method, url.pathname);
const locals: Record<string, unknown> = {
routeAnonymous: match.anonymous,
routeStatus: match.status,
};
const requestLogger = logger.child({
requestId,
method,
path: url.pathname,
});
const ctx: HttpContext = {
request,
url,
method,
params: match.params,
requestId,
startedAtMs,
logger: requestLogger,
bus,
locals,
signal: request.signal,
};
return router.dispatch(ctx, match);
}
async function raceWithTimeout(
promise: Promise<void>,
timeoutMs: number,
onTimeout: () => void,
): Promise<void> {
let timer: number | undefined;
const timeout = new Promise<void>((resolve) => {
timer = setTimeout(() => {
onTimeout();
resolve();
}, timeoutMs);
});
try {
await Promise.race([promise, timeout]);
} finally {
if (timer !== undefined) clearTimeout(timer);
}
}

View File

@@ -0,0 +1,53 @@
/**
* Shared types for the IPC HTTP layer.
*
* The router is intentionally tiny and dependency-free — Phase 2 doesn't
* justify a framework. Middleware compose around handlers via a classic
* onion model: each middleware receives `(ctx, next)` and may short-circuit
* by returning a response without calling `next`.
*/
import type { Logger } from "@elly/shared";
import type { DomainEventBus } from "../pubsub/bus.ts";
export type HttpMethod = "GET" | "POST" | "PUT" | "PATCH" | "DELETE";
export interface HttpContext {
/** Inbound request. */
readonly request: Request;
/** Parsed URL — cached so middleware/handlers don't re-parse. */
readonly url: URL;
/** Method (uppercased). */
readonly method: HttpMethod;
/** Path params extracted by the router (empty for no-param routes). */
readonly params: Readonly<Record<string, string>>;
/** Correlation ID — generated or echoed from `x-request-id`. */
readonly requestId: string;
/** Wall-clock start of the request in ms (for latency logging). */
readonly startedAtMs: number;
/** Logger pre-bound with method, path, requestId. */
readonly logger: Logger;
/** Domain event bus (handlers may publish from here). */
readonly bus: DomainEventBus;
/** Mutable bag for cross-middleware state (rarely used). */
readonly locals: Record<string, unknown>;
/** Convenience accessor for the connection abort signal. */
readonly signal: AbortSignal;
}
export type HttpHandler = (ctx: HttpContext) => Promise<Response> | Response;
export type HttpMiddleware = (
ctx: HttpContext,
next: () => Promise<Response>,
) => Promise<Response>;
export interface RouteDefinition {
readonly method: HttpMethod;
/** Path pattern, e.g. `"/v1/applications/:id"`. Compiled by the router. */
readonly path: string;
/** Optional: marks a route as unauthenticated (default: requires auth). */
readonly anonymous?: boolean;
readonly handler: HttpHandler;
}

View File

@@ -0,0 +1,85 @@
/**
* Generic TTL cache backed by `Deno.Kv`.
*
* Used in Phase 4 by the PikaNetwork client to cache profile/leaderboard
* lookups across restarts. Each entry carries an explicit `cachedAt` so
* callers can detect "stale-but-acceptable" reads.
*
* Keys are namespaced by `cacheKind` (e.g. `"pika:profile"`) so unrelated
* features cannot collide. The actual KV key is
* `[ "cache", cacheKind, ...identifier ]`.
*/
import type { KvHandle } from "./store.ts";
export interface CacheEntry<T> {
readonly value: T;
readonly cachedAt: number;
readonly expiresAt: number;
}
export interface CacheGetResult<T> {
readonly hit: boolean;
readonly entry: CacheEntry<T> | null;
}
export class CacheStore {
constructor(private readonly handle: KvHandle) {}
async get<T>(cacheKind: string, identifier: Deno.KvKey): Promise<CacheGetResult<T>> {
const key = this.buildKey(cacheKind, identifier);
const entry = await this.handle.kv.get<CacheEntry<T>>(key);
if (entry.value === null) {
return { hit: false, entry: null };
}
if (entry.value.expiresAt <= Date.now()) {
// Best-effort cleanup of an expired entry that Kv hasn't reaped yet.
await this.handle.kv.delete(key);
return { hit: false, entry: null };
}
return { hit: true, entry: entry.value };
}
async set<T>(args: {
cacheKind: string;
identifier: Deno.KvKey;
value: T;
ttlMs: number;
}): Promise<CacheEntry<T>> {
if (args.ttlMs <= 0) {
throw new Error("CacheStore.set requires ttlMs > 0");
}
const now = Date.now();
const entry: CacheEntry<T> = {
value: args.value,
cachedAt: now,
expiresAt: now + args.ttlMs,
};
const key = this.buildKey(args.cacheKind, args.identifier);
await this.handle.kv.set(key, entry, { expireIn: args.ttlMs });
return entry;
}
async delete(cacheKind: string, identifier: Deno.KvKey): Promise<void> {
const key = this.buildKey(cacheKind, identifier);
await this.handle.kv.delete(key);
}
/**
* Drop every entry for a given `cacheKind`. Use sparingly — full prefix
* scans are O(n) over that kind.
*/
async deleteKind(cacheKind: string): Promise<number> {
const prefix: Deno.KvKey = ["cache", cacheKind];
let count = 0;
for await (const entry of this.handle.kv.list({ prefix })) {
await this.handle.kv.delete(entry.key);
count++;
}
return count;
}
private buildKey(cacheKind: string, identifier: Deno.KvKey): Deno.KvKey {
return this.handle.key("cache", cacheKind, ...identifier);
}
}

View File

@@ -0,0 +1,93 @@
/**
* Cooldown store backed by `Deno.Kv`.
*
* Survives process restarts (unlike the legacy in-memory `Collection` map).
* Each entry is keyed by `[ "cooldown", command, userId ]` and stores the
* epoch-ms timestamp at which the cooldown expires. We use Kv's `expireIn`
* to let the runtime garbage-collect expired entries.
*
* `checkAndSet` is atomic via Kv's compare-and-swap loop — concurrent
* invocations cannot both observe "no cooldown" and both set one.
*/
import type { KvHandle } from "./store.ts";
export interface CooldownCheckResult {
/** True if the caller is currently rate-limited. */
readonly limited: boolean;
/** Seconds remaining before the cooldown expires (rounded up). 0 if not limited. */
readonly retryAfterSeconds: number;
/** Underlying epoch-ms expiry; 0 if not limited. */
readonly expiresAtMs: number;
}
export class CooldownStore {
constructor(private readonly handle: KvHandle) {}
/**
* Atomically check whether `userId` is on cooldown for `command`. If not,
* set a new cooldown of `durationMs` and return `limited: false`.
*
* Retries up to 3 times on Kv contention before giving up.
*/
async checkAndSet(args: {
command: string;
userId: string;
durationMs: number;
}): Promise<CooldownCheckResult> {
if (args.durationMs <= 0) {
return { limited: false, retryAfterSeconds: 0, expiresAtMs: 0 };
}
const key = this.handle.key("cooldown", args.command, args.userId);
for (let attempt = 0; attempt < 3; attempt++) {
const existing = await this.handle.kv.get<number>(key);
const now = Date.now();
if (existing.value !== null && existing.value > now) {
return {
limited: true,
retryAfterSeconds: Math.ceil((existing.value - now) / 1000),
expiresAtMs: existing.value,
};
}
const expiresAtMs = now + args.durationMs;
const result = await this.handle.kv.atomic()
.check(existing)
.set(key, expiresAtMs, { expireIn: args.durationMs })
.commit();
if (result.ok) {
return { limited: false, retryAfterSeconds: 0, expiresAtMs };
}
// Contention — retry.
}
// Three CAS retries failed. Treat as limited so we never bypass cooldowns
// under load, and report a one-second back-off.
return { limited: true, retryAfterSeconds: 1, expiresAtMs: Date.now() + 1000 };
}
/** Peek at the current cooldown for a user/command without modifying it. */
async peek(command: string, userId: string): Promise<CooldownCheckResult> {
const key = this.handle.key("cooldown", command, userId);
const entry = await this.handle.kv.get<number>(key);
const now = Date.now();
if (entry.value === null || entry.value <= now) {
return { limited: false, retryAfterSeconds: 0, expiresAtMs: 0 };
}
return {
limited: true,
retryAfterSeconds: Math.ceil((entry.value - now) / 1000),
expiresAtMs: entry.value,
};
}
/** Manually clear a cooldown — used by admin tooling. */
async clear(command: string, userId: string): Promise<void> {
const key = this.handle.key("cooldown", command, userId);
await this.handle.kv.delete(key);
}
}

View File

@@ -0,0 +1,76 @@
/**
* Interaction state store.
*
* Discord component customIds are capped at 100 characters, which is far too
* small to embed arbitrary context. Per the architectural mandate, we keep
* interactions stateless by stashing context in Deno.Kv keyed by a short
* token (typically a ULID embedded in the customId).
*
* Records have a default TTL of 15 minutes to keep the KV store bounded —
* Discord interactions become invalid after 15 minutes anyway.
*/
import { ulid } from "@std/ulid";
import type { KvHandle } from "./store.ts";
const DEFAULT_TTL_MS = 15 * 60 * 1000;
export interface InteractionStateRecord<T> {
readonly token: string;
readonly value: T;
readonly storedAt: number;
}
export class InteractionStateStore {
constructor(private readonly handle: KvHandle) {}
/**
* Persist a context object and return a fresh ULID token. The token is
* what callers should embed in customIds — never the value itself.
*/
async create<T>(value: T, ttlMs: number = DEFAULT_TTL_MS): Promise<string> {
const token = ulid();
const key = this.handle.key("interaction", token);
const record: InteractionStateRecord<T> = {
token,
value,
storedAt: Date.now(),
};
await this.handle.kv.set(key, record, { expireIn: ttlMs });
return token;
}
/**
* Read a stored context. Returns `null` if the token doesn't exist or
* has already expired/been consumed.
*/
async read<T>(token: string): Promise<InteractionStateRecord<T> | null> {
const key = this.handle.key("interaction", token);
const entry = await this.handle.kv.get<InteractionStateRecord<T>>(key);
return entry.value;
}
/**
* Read-and-delete (consume-once semantics). Use this for one-shot
* confirm/cancel flows so a stale token can't replay the action.
*/
async consume<T>(token: string): Promise<InteractionStateRecord<T> | null> {
const key = this.handle.key("interaction", token);
const entry = await this.handle.kv.get<InteractionStateRecord<T>>(key);
if (entry.value === null) return null;
// Atomically delete only if the version we just read still matches.
const result = await this.handle.kv.atomic()
.check(entry)
.delete(key)
.commit();
return result.ok ? entry.value : null;
}
/** Manually invalidate a token (e.g. when the parent message is deleted). */
async revoke(token: string): Promise<void> {
const key = this.handle.key("interaction", token);
await this.handle.kv.delete(key);
}
}

View File

@@ -0,0 +1,73 @@
/**
* `Deno.Kv` lifecycle wrapper.
*
* Owns the single `Deno.Kv` handle for the core process and exposes typed
* helpers used by:
* - `CooldownStore` (per-command, per-user TTL gating)
* - `InteractionStateStore` (ephemeral context for stateless components)
* - `CacheStore` (generic TTL cache, e.g. for the Pika client)
*
* Why one file?
* The three concerns share the same KV handle and the same notion of
* `KvKey` prefixes. Keeping them in one module makes the prefix table
* explicit and prevents two features from accidentally colliding.
*/
import { ensureDir } from "@std/fs";
import { dirname } from "@std/path";
import type { Logger } from "@elly/shared";
export type KvNamespace = "cooldown" | "interaction" | "cache" | "meta";
export const KV_NAMESPACES: ReadonlyArray<KvNamespace> = [
"cooldown",
"interaction",
"cache",
"meta",
];
export interface KvHandle {
/** The underlying `Deno.Kv` instance. */
readonly kv: Deno.Kv;
/** Filesystem path the KV store was opened from. */
readonly path: string;
/** Build a fully-qualified KV key by namespace + segments. */
key(namespace: KvNamespace, ...segments: Deno.KvKey): Deno.KvKey;
/** Close the underlying connection. Safe to call multiple times. */
close(): Promise<void>;
}
export interface OpenKvOptions {
readonly path: string;
readonly logger: Logger;
}
/**
* Open (or create) the on-disk Deno.Kv store at `path`.
*/
export async function openKv(options: OpenKvOptions): Promise<KvHandle> {
const log = options.logger.child({ component: "kv" });
await ensureDir(dirname(options.path));
log.debug("opening deno.kv store", { path: options.path });
const kv = await Deno.openKv(options.path);
log.info("deno.kv store opened", { path: options.path });
let closed = false;
return {
kv,
path: options.path,
key(namespace: KvNamespace, ...segments: Deno.KvKey): Deno.KvKey {
return [namespace, ...segments];
},
async close(): Promise<void> {
if (closed) return;
closed = true;
log.debug("closing deno.kv store");
kv.close();
// close() on Deno.Kv is sync; await Promise.resolve to keep the async signature.
await Promise.resolve();
},
};
}

View File

@@ -0,0 +1,86 @@
/**
* In-process domain event bus.
*
* Core services publish typed domain events here; the HTTP `events` route
* (Server-Sent Events) subscribes and forwards each event to the bot.
*
* Implementation:
* - Backed by `EventTarget`/`CustomEvent` — Deno's stdlib event primitive,
* no external deps.
* - Listeners are synchronous from the bus's perspective. Subscribers
* wrap their handler in their own queue if they need backpressure
* (the SSE bridge does this via the response stream).
* - Every published event is automatically stamped with a ULID `id` and
* `timestamp` if the publisher didn't supply them.
*/
import { ulid } from "@std/ulid";
import type { AnyDomainEvent, Logger } from "@elly/shared";
const EVENT_NAME = "domain";
export type EventListener = (event: AnyDomainEvent) => void;
export type EventPublishInput =
& Omit<AnyDomainEvent, "id" | "timestamp">
& Partial<Pick<AnyDomainEvent, "id" | "timestamp">>;
export class DomainEventBus {
private readonly target = new EventTarget();
private readonly log: Logger;
private listenerCount = 0;
constructor(logger: Logger) {
this.log = logger.child({ component: "pubsub" });
}
/** Publish a fully-formed or partial event. Returns the final event object. */
publish(input: EventPublishInput): AnyDomainEvent {
const event = {
...input,
id: input.id ?? ulid(),
timestamp: input.timestamp ?? Date.now(),
} as AnyDomainEvent;
this.log.debug("publishing domain event", {
type: event.type,
id: event.id,
listeners: this.listenerCount,
});
this.target.dispatchEvent(new CustomEvent<AnyDomainEvent>(EVENT_NAME, { detail: event }));
return event;
}
/**
* Subscribe to every event. Returns an unsubscribe function — call it
* exactly once when the consumer is torn down (e.g. SSE client disconnect).
*/
subscribe(handler: EventListener): () => void {
const wrapped = (e: Event) => {
const ev = (e as CustomEvent<AnyDomainEvent>).detail;
try {
handler(ev);
} catch (err) {
this.log.error("subscriber threw", {
eventType: ev.type,
eventId: ev.id,
err: err instanceof Error ? err : new Error(String(err)),
});
}
};
this.target.addEventListener(EVENT_NAME, wrapped);
this.listenerCount++;
return () => {
this.target.removeEventListener(EVENT_NAME, wrapped);
this.listenerCount = Math.max(0, this.listenerCount - 1);
};
}
/** Active listener count — useful for /v1/version diagnostics. */
get subscribers(): number {
return this.listenerCount;
}
}

165
crates/core/src/main.ts Normal file
View File

@@ -0,0 +1,165 @@
/**
* @elly/core entrypoint — Phase 2 boot.
*
* Lifecycle:
* 1. Load + validate `config.toml` (Zod).
* 2. Load + validate environment (Zod).
* 3. Build the root structured logger.
* 4. Build the DI container (DB, migrations, KV, bus, HTTP server).
* 5. Start the IPC HTTP server.
* 6. Publish `server.ready` on the domain bus.
* 7. Block on SIGINT/SIGTERM, then gracefully shut down the container.
*/
import {
ConfigError,
ConfigValidationError,
CoreEnvSchema,
createLogger,
EnvValidationError,
loadConfig,
loadEnv,
type Config,
type CoreEnv,
type Logger,
} from "@elly/shared";
import { buildContainer, type CoreContainer } from "./container.ts";
const CONFIG_PATH = Deno.env.get("CONFIG_PATH") ?? "./config.toml";
const LOGGER_NAME = "@elly/core";
const VERSION = "0.1.0";
async function main(): Promise<void> {
const config = await loadConfigOrExit();
const env = loadEnvOrExit();
const logger = buildLogger(config, env);
logger.info("phase 2 boot starting", {
crate: "core",
nodeEnv: env.NODE_ENV,
version: VERSION,
});
let container: CoreContainer | null = null;
try {
container = await buildContainer({ config, env, logger, version: VERSION });
} catch (err) {
logger.fatal("container build failed", {
err: err instanceof Error ? err : new Error(String(err)),
});
await logger.flush();
Deno.exit(1);
}
installSignalHandlers(container, logger);
try {
await container.http.start();
} catch (err) {
logger.fatal("ipc server failed to start", {
err: err instanceof Error ? err : new Error(String(err)),
});
await container.shutdown();
await logger.flush();
Deno.exit(1);
}
container.bus.publish({
type: "server.ready",
payload: { version: VERSION, pid: Deno.pid },
});
logger.info("phase 2 boot complete", {
pid: Deno.pid,
ipc: `${config.ipc.host}:${config.ipc.port}`,
});
// Hold the process open until a signal handler tears the container down.
await new Promise<void>(() => {});
}
// =====================================================================
// Boot helpers
// =====================================================================
async function loadConfigOrExit(): Promise<Config> {
try {
return await loadConfig(CONFIG_PATH);
} catch (err) {
bootFail(err, "config");
}
}
function loadEnvOrExit(): CoreEnv {
try {
return loadEnv(CoreEnvSchema);
} catch (err) {
bootFail(err, "env");
}
}
function buildLogger(config: Config, env: CoreEnv): Logger {
const level = env.LOG_LEVEL ?? config.logging.level;
const isProd = env.NODE_ENV === "production";
return createLogger({
name: LOGGER_NAME,
level,
format: isProd ? "json" : config.logging.format,
file: config.logging.file
? {
path: config.logging.file,
maxBytes: config.logging.file_max_bytes,
maxBackups: config.logging.file_max_backups,
}
: undefined,
});
}
function installSignalHandlers(container: CoreContainer, logger: Logger): void {
let shuttingDown = false;
const shutdown = (signal: string) => {
if (shuttingDown) {
logger.warn("shutdown already in progress; ignoring signal", { signal });
return;
}
shuttingDown = true;
logger.info("shutdown signal received", { signal });
container
.shutdown()
.catch((err) => {
logger.error("shutdown error", {
err: err instanceof Error ? err : new Error(String(err)),
});
})
.finally(() => {
logger.flush().finally(() => Deno.exit(0));
});
};
Deno.addSignalListener("SIGINT", () => shutdown("SIGINT"));
Deno.addSignalListener("SIGTERM", () => shutdown("SIGTERM"));
}
function bootFail(err: unknown, stage: "config" | "env"): never {
if (err instanceof ConfigValidationError || err instanceof EnvValidationError) {
console.error(`[@elly/core] ${stage} validation failed:`);
for (const issue of err.issues) {
console.error(` - ${issue.path}: ${issue.message}`);
}
Deno.exit(1);
}
if (err instanceof ConfigError) {
console.error(`[@elly/core] ${stage} error: ${err.message}`);
Deno.exit(1);
}
console.error(`[@elly/core] unexpected ${stage} error:`, err);
Deno.exit(1);
}
if (import.meta.main) {
main().catch((err) => {
console.error("[@elly/core] fatal boot error:", err);
Deno.exit(1);
});
}