extract.go now reads `tier:` and `topic:` from YAML frontmatter, with a path-based fallback when frontmatter is absent (the pre-M3 state on every existing entry):

  - knowledge/* → tier=knowledge
  - notes/* → tier=note
  - wiki/** → tier=note (sources + concepts + entities are I-level)
  - inbox/**, raw/**, sessions/**, clips/** → tier=inbox

Frontmatter wins when present — this covers the M3-migrated case where an entry's path may not match the tier the author chose for it.

UpsertEntity persists both columns; M1's schema already has them. Backfill on the next pod start populates tier for the whole corpus without any file moves; M3 will follow up with the actual layout migration and explicit frontmatter writes.
366 lines
11 KiB
Go
366 lines
11 KiB
Go
// Package graphstore stores the brain knowledge graph (entities +
|
|
// directed edges) in PostgreSQL on the shared postgres18 instance,
|
|
// alongside the pgvector embeddings in [vectorstore].
|
|
//
|
|
// Schema (created idempotently by Init):
|
|
//
|
|
// brain_entities(slug PK, type, wing, hall, doc_path, title, tier, topic, updated_at)
|
|
// brain_edges(id PK, src_slug, dst_slug, edge_type, src_doc, src_line,
|
|
// weight, updated_at)
|
|
//
|
|
// Edges fan-out from a source document; calling [PGStore.ReplaceEdgesForDoc]
|
|
// replaces every edge previously emitted from that document so re-ingest is
|
|
// idempotent without bookkeeping.
|
|
//
|
|
// All slug strings are stored verbatim — callers are expected to canonicalise
|
|
// before persisting. Dst slugs may reference entities that don't yet exist
|
|
// (dangling edges); resolution is deferred to query time so ingestion order
|
|
// doesn't matter.
|
|
package graphstore
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
|
|
)
|
|
|
|
// PGStore is the postgres-backed brain knowledge-graph store. Construct
|
|
// with New + call Init once to create tables and indexes. Use Close to
|
|
// release the pool.
|
|
type PGStore struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// New opens a pgxpool against dsn and pings to verify connectivity. The
|
|
// caller owns the resulting PGStore and must invoke Close.
|
|
func New(ctx context.Context, dsn string) (*PGStore, error) {
|
|
pool, err := pgxpool.New(ctx, dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pgxpool: %w", err)
|
|
}
|
|
if err := pool.Ping(ctx); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("ping: %w", err)
|
|
}
|
|
return &PGStore{pool: pool}, nil
|
|
}
|
|
|
|
// Close releases the underlying connection pool.
|
|
func (s *PGStore) Close() {
|
|
if s.pool != nil {
|
|
s.pool.Close()
|
|
}
|
|
}
|
|
|
|
// Init creates brain_entities + brain_edges tables and their indexes if
|
|
// they don't yet exist. Safe to call on every startup. No-op when the
|
|
// schema already matches.
|
|
func (s *PGStore) Init(ctx context.Context) error {
|
|
const ddl = `
|
|
CREATE TABLE IF NOT EXISTS brain_entities (
|
|
slug TEXT PRIMARY KEY,
|
|
type TEXT NOT NULL DEFAULT 'knowledge',
|
|
wing TEXT NOT NULL DEFAULT '',
|
|
hall TEXT NOT NULL DEFAULT '',
|
|
doc_path TEXT NOT NULL,
|
|
title TEXT NOT NULL DEFAULT '',
|
|
tier TEXT NOT NULL DEFAULT '',
|
|
topic TEXT NOT NULL DEFAULT '',
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
);
|
|
-- Idempotent migration for clusters created before the DIKW tier
|
|
-- redesign (infra#72). ADD COLUMN IF NOT EXISTS is safe across
|
|
-- repeated startups.
|
|
ALTER TABLE brain_entities
|
|
ADD COLUMN IF NOT EXISTS tier TEXT NOT NULL DEFAULT '',
|
|
ADD COLUMN IF NOT EXISTS topic TEXT NOT NULL DEFAULT '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_wing_idx
|
|
ON brain_entities (wing) WHERE wing <> '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_type_idx
|
|
ON brain_entities (type);
|
|
CREATE INDEX IF NOT EXISTS brain_entities_tier_idx
|
|
ON brain_entities (tier) WHERE tier <> '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_topic_idx
|
|
ON brain_entities (topic) WHERE topic <> '';
|
|
|
|
CREATE TABLE IF NOT EXISTS brain_edges (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
src_slug TEXT NOT NULL,
|
|
dst_slug TEXT NOT NULL,
|
|
edge_type TEXT NOT NULL DEFAULT 'wikilink',
|
|
src_doc TEXT NOT NULL,
|
|
src_line INTEGER NOT NULL DEFAULT 0,
|
|
weight REAL NOT NULL DEFAULT 1.0,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_src_idx
|
|
ON brain_edges (src_slug, edge_type);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_dst_idx
|
|
ON brain_edges (dst_slug, edge_type);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx
|
|
ON brain_edges (src_doc);
|
|
`
|
|
_, err := s.pool.Exec(ctx, ddl)
|
|
return err
|
|
}
|
|
|
|
// UpsertEntity inserts or updates one entity by slug.
|
|
func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error {
|
|
if e.Slug == "" {
|
|
return errors.New("entity slug is required")
|
|
}
|
|
if e.Type == "" {
|
|
e.Type = "knowledge"
|
|
}
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, tier, topic, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
|
|
ON CONFLICT (slug) DO UPDATE
|
|
SET type = EXCLUDED.type,
|
|
wing = EXCLUDED.wing,
|
|
hall = EXCLUDED.hall,
|
|
doc_path = EXCLUDED.doc_path,
|
|
title = EXCLUDED.title,
|
|
tier = EXCLUDED.tier,
|
|
topic = EXCLUDED.topic,
|
|
updated_at = now()
|
|
`, e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title, e.Tier, e.Topic)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert entity %q: %w", e.Slug, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReplaceEdgesForDoc deletes every edge previously emitted from docPath
|
|
// and inserts the new set in one transaction. Caller should pass the
|
|
// complete edge set for the doc — partial updates are not supported.
|
|
func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error {
|
|
if docPath == "" {
|
|
return errors.New("doc path is required")
|
|
}
|
|
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("begin: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete prior edges for %q: %w", docPath, err)
|
|
}
|
|
for _, e := range edges {
|
|
if e.SrcSlug == "" || e.DstSlug == "" {
|
|
continue
|
|
}
|
|
if _, err := tx.Exec(ctx, `
|
|
INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight)
|
|
VALUES ($1, $2, $3, $4, $5, 1.0)
|
|
`, e.SrcSlug, e.DstSlug, e.EdgeType, e.SrcDoc, e.SrcLine); err != nil {
|
|
return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err)
|
|
}
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("commit: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteByDoc removes the entity at docPath and every edge it sourced.
|
|
// Use when a wiki page is deleted on disk.
|
|
func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error {
|
|
if docPath == "" {
|
|
return errors.New("doc path is required")
|
|
}
|
|
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("begin: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete edges: %w", err)
|
|
}
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete entity: %w", err)
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
// Neighbor describes one reachable node as returned by Neighbors and
// Subgraph. The entity columns are empty strings when the target slug has
// no row in brain_entities.
type Neighbor struct {
	Slug     string // slug of the reached node (the edge's dst_slug)
	Type     string // entity type, "" when the entity is unknown
	Wing     string // entity wing, "" when the entity is unknown
	Hall     string // entity hall, "" when the entity is unknown
	DocPath  string // entity doc_path, "" when the entity is unknown
	Title    string // entity title, "" when the entity is unknown
	EdgeType string // kind of the edge that reached this node
	Distance int    // hop count from origin; 1 for direct neighbors
}
|
|
|
|
// Neighbors returns the direct (1-hop) outgoing neighbours of slug.
|
|
// edgeType filters by relationship kind; "" returns all kinds.
|
|
// limit defaults to 25 when <= 0.
|
|
func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) {
|
|
if slug == "" {
|
|
return nil, errors.New("slug is required")
|
|
}
|
|
if limit <= 0 {
|
|
limit = 25
|
|
}
|
|
q := `
|
|
SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
|
COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1
|
|
FROM brain_edges e
|
|
LEFT JOIN brain_entities t ON t.slug = e.dst_slug
|
|
WHERE e.src_slug = $1
|
|
AND ($2 = '' OR e.edge_type = $2)
|
|
ORDER BY e.updated_at DESC
|
|
LIMIT $3
|
|
`
|
|
rows, err := s.pool.Query(ctx, q, slug, edgeType, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query neighbors: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanNeighbors(rows)
|
|
}
|
|
|
|
// Subgraph returns every distinct slug reachable from origin within
|
|
// depth outgoing hops, annotated with the shortest hop distance. The
|
|
// origin itself is omitted. depth defaults to 2 when <= 0; values
|
|
// above 6 are clamped to 6 to bound traversal cost.
|
|
func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) {
|
|
if origin == "" {
|
|
return nil, errors.New("origin slug is required")
|
|
}
|
|
if depth <= 0 {
|
|
depth = 2
|
|
}
|
|
if depth > 6 {
|
|
depth = 6
|
|
}
|
|
q := `
|
|
WITH RECURSIVE walk(slug, edge_type, distance) AS (
|
|
SELECT e.dst_slug, e.edge_type, 1
|
|
FROM brain_edges e
|
|
WHERE e.src_slug = $1
|
|
UNION
|
|
SELECT e.dst_slug, e.edge_type, w.distance + 1
|
|
FROM walk w
|
|
JOIN brain_edges e ON e.src_slug = w.slug
|
|
WHERE w.distance < $2
|
|
)
|
|
SELECT w.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
|
COALESCE(t.doc_path,''), COALESCE(t.title,''), w.edge_type, MIN(w.distance)
|
|
FROM walk w
|
|
LEFT JOIN brain_entities t ON t.slug = w.slug
|
|
WHERE w.slug <> $1
|
|
GROUP BY w.slug, t.type, t.wing, t.hall, t.doc_path, t.title, w.edge_type
|
|
ORDER BY MIN(w.distance), w.slug
|
|
`
|
|
rows, err := s.pool.Query(ctx, q, origin, depth)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query subgraph: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanNeighbors(rows)
|
|
}
|
|
|
|
// PathStep is a single directed edge within the result of Path.
type PathStep struct {
	FromSlug string // slug the hop starts at
	ToSlug   string // slug the hop arrives at
	EdgeType string // kind of the edge connecting the two
}
|
|
|
|
// Path returns the shortest directed path from src to dst within
|
|
// maxDepth hops, as an ordered list of edges. Empty slice means no
|
|
// path exists. maxDepth defaults to 4 when <= 0; values above 8 are
|
|
// clamped to 8.
|
|
func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) {
|
|
if src == "" || dst == "" {
|
|
return nil, errors.New("src and dst are required")
|
|
}
|
|
if maxDepth <= 0 {
|
|
maxDepth = 4
|
|
}
|
|
if maxDepth > 8 {
|
|
maxDepth = 8
|
|
}
|
|
q := `
|
|
WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS (
|
|
SELECT e.dst_slug,
|
|
ARRAY[e.src_slug, e.dst_slug]::TEXT[],
|
|
ARRAY[e.edge_type]::TEXT[],
|
|
1
|
|
FROM brain_edges e
|
|
WHERE e.src_slug = $1
|
|
UNION ALL
|
|
SELECT e.dst_slug,
|
|
w.path_slugs || e.dst_slug,
|
|
w.path_edges || e.edge_type,
|
|
w.distance + 1
|
|
FROM walk w
|
|
JOIN brain_edges e ON e.src_slug = w.cur
|
|
WHERE w.distance < $3
|
|
AND NOT (e.dst_slug = ANY(w.path_slugs))
|
|
)
|
|
SELECT path_slugs, path_edges
|
|
FROM walk
|
|
WHERE cur = $2
|
|
ORDER BY distance ASC
|
|
LIMIT 1
|
|
`
|
|
row := s.pool.QueryRow(ctx, q, src, dst, maxDepth)
|
|
var (
|
|
slugs []string
|
|
kinds []string
|
|
)
|
|
if err := row.Scan(&slugs, &kinds); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("scan path: %w", err)
|
|
}
|
|
if len(slugs) < 2 || len(kinds) == 0 {
|
|
return nil, nil
|
|
}
|
|
steps := make([]PathStep, 0, len(kinds))
|
|
for i := 0; i < len(kinds) && i+1 < len(slugs); i++ {
|
|
steps = append(steps, PathStep{
|
|
FromSlug: slugs[i],
|
|
ToSlug: slugs[i+1],
|
|
EdgeType: kinds[i],
|
|
})
|
|
}
|
|
return steps, nil
|
|
}
|
|
|
|
// CountEdges is a debug helper — returns the total edges currently stored.
|
|
// Used by tests and by the volume-gate diagnostic.
|
|
func (s *PGStore) CountEdges(ctx context.Context) (int64, error) {
|
|
var n int64
|
|
err := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`).Scan(&n)
|
|
return n, err
|
|
}
|
|
|
|
func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) {
|
|
var out []Neighbor
|
|
for rows.Next() {
|
|
var n Neighbor
|
|
if err := rows.Scan(
|
|
&n.Slug, &n.Type, &n.Wing, &n.Hall,
|
|
&n.DocPath, &n.Title, &n.EdgeType, &n.Distance,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan: %w", err)
|
|
}
|
|
out = append(out, n)
|
|
}
|
|
return out, rows.Err()
|
|
}
|