Files
hyperguild/ingestion/internal/graphstore/pg.go
Mathias d5f112b600
All checks were successful
CI / Lint / Test / Vet (push) Successful in 13s
CI / Mirror to GitHub (push) Successful in 4s
feat(graph,graphstore): M2 parse tier+topic from frontmatter, persist via Upsert (infra#72)
extract.go now reads `tier:` and `topic:` from YAML frontmatter, with
a path-based fallback when frontmatter is absent (the pre-M3 state on
every existing entry):

  knowledge/* → tier=knowledge
  notes/*     → tier=note
  wiki/**     → tier=note   (sources + concepts + entities are I-level)
  inbox/**, raw/**, sessions/**, clips/** → tier=inbox

Frontmatter wins when present — covers the M3-migrated case where an
entry's path may not match the tier the author chose for it.

UpsertEntity persists both columns. M1's schema already has them.

Backfill on next pod start populates tier for the whole corpus
without any file moves; M3 will follow up with the actual layout
migration and explicit frontmatter writes.
2026-05-25 12:35:38 +02:00

366 lines
11 KiB
Go

// Package graphstore stores the brain knowledge graph (entities +
// directed edges) in PostgreSQL on the shared postgres18 instance,
// alongside the pgvector embeddings in [vectorstore].
//
// Schema (created idempotently by Init):
//
// brain_entities(slug PK, type, wing, hall, doc_path, title, tier, topic,
// updated_at)
// brain_edges(id PK, src_slug, dst_slug, edge_type, src_doc, src_line,
// weight, updated_at)
//
// Edges fan-out from a source document; calling [PGStore.ReplaceEdgesForDoc]
// replaces every edge previously emitted from that document so re-ingest is
// idempotent without bookkeeping.
//
// All slug strings are stored verbatim — callers are expected to canonicalise
// before persisting. Dst slugs may reference entities that don't yet exist
// (dangling edges); resolution is deferred to query time so ingestion order
// doesn't matter.
package graphstore
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
)
// PGStore persists the brain knowledge graph in PostgreSQL. Obtain one
// from New, call Init once so the tables and indexes exist, and call
// Close when finished to release the connection pool.
type PGStore struct {
	// pool is the pgx connection pool every statement in this package
	// runs against.
	pool *pgxpool.Pool
}
// New creates a pgx connection pool for dsn and pings the server once
// to confirm it is reachable. On success the caller owns the returned
// PGStore and is responsible for calling Close. If the ping fails the
// freshly created pool is closed before the error is returned.
func New(ctx context.Context, dsn string) (*PGStore, error) {
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		return nil, fmt.Errorf("pgxpool: %w", err)
	}

	pingErr := pool.Ping(ctx)
	if pingErr != nil {
		// Release the pool so a failed connectivity check leaks nothing.
		pool.Close()
		return nil, fmt.Errorf("ping: %w", pingErr)
	}

	store := &PGStore{pool: pool}
	return store, nil
}
// Close shuts down the connection pool held by the store. It does
// nothing when the store has no pool.
func (s *PGStore) Close() {
	if s.pool == nil {
		return
	}
	s.pool.Close()
}
// Init creates the brain_entities and brain_edges tables together with
// their indexes when they do not exist yet. Every statement uses
// IF NOT EXISTS, so it is safe to call on every startup. It also adds
// the tier and topic columns to brain_entities tables that were created
// before those columns existed.
//
// The returned error wraps the underlying driver error, so callers can
// still inspect it with errors.Is / errors.As.
func (s *PGStore) Init(ctx context.Context) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS brain_entities (
slug TEXT PRIMARY KEY,
type TEXT NOT NULL DEFAULT 'knowledge',
wing TEXT NOT NULL DEFAULT '',
hall TEXT NOT NULL DEFAULT '',
doc_path TEXT NOT NULL,
title TEXT NOT NULL DEFAULT '',
tier TEXT NOT NULL DEFAULT '',
topic TEXT NOT NULL DEFAULT '',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Idempotent migration for clusters created before the DIKW tier
-- redesign (infra#72). ADD COLUMN IF NOT EXISTS is safe across
-- repeated startups.
ALTER TABLE brain_entities
ADD COLUMN IF NOT EXISTS tier TEXT NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS topic TEXT NOT NULL DEFAULT '';
CREATE INDEX IF NOT EXISTS brain_entities_wing_idx
ON brain_entities (wing) WHERE wing <> '';
CREATE INDEX IF NOT EXISTS brain_entities_type_idx
ON brain_entities (type);
CREATE INDEX IF NOT EXISTS brain_entities_tier_idx
ON brain_entities (tier) WHERE tier <> '';
CREATE INDEX IF NOT EXISTS brain_entities_topic_idx
ON brain_entities (topic) WHERE topic <> '';
CREATE TABLE IF NOT EXISTS brain_edges (
id BIGSERIAL PRIMARY KEY,
src_slug TEXT NOT NULL,
dst_slug TEXT NOT NULL,
edge_type TEXT NOT NULL DEFAULT 'wikilink',
src_doc TEXT NOT NULL,
src_line INTEGER NOT NULL DEFAULT 0,
weight REAL NOT NULL DEFAULT 1.0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS brain_edges_src_idx
ON brain_edges (src_slug, edge_type);
CREATE INDEX IF NOT EXISTS brain_edges_dst_idx
ON brain_edges (dst_slug, edge_type);
CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx
ON brain_edges (src_doc);
`
	// Wrap the failure like the other write paths in this package do, so
	// a startup log line says which step went wrong.
	if _, err := s.pool.Exec(ctx, ddl); err != nil {
		return fmt.Errorf("init schema: %w", err)
	}
	return nil
}
// UpsertEntity writes e into brain_entities keyed by its slug: a new row
// is inserted, or every other column of the existing row is overwritten
// and updated_at is refreshed. An empty slug is rejected; an empty Type
// is stored as "knowledge".
func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error {
	if e.Slug == "" {
		return errors.New("entity slug is required")
	}

	// e is a by-value copy, so defaulting the type here never touches the
	// caller's struct.
	if e.Type == "" {
		e.Type = "knowledge"
	}

	const upsertSQL = `
INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, tier, topic, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
ON CONFLICT (slug) DO UPDATE
SET type = EXCLUDED.type,
wing = EXCLUDED.wing,
hall = EXCLUDED.hall,
doc_path = EXCLUDED.doc_path,
title = EXCLUDED.title,
tier = EXCLUDED.tier,
topic = EXCLUDED.topic,
updated_at = now()
`
	if _, err := s.pool.Exec(ctx, upsertSQL,
		e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title, e.Tier, e.Topic,
	); err != nil {
		return fmt.Errorf("upsert entity %q: %w", e.Slug, err)
	}
	return nil
}
// ReplaceEdgesForDoc deletes every edge previously emitted from docPath
// and inserts edges in its place, all inside one transaction. Pass the
// complete edge set for the document — partial updates are not
// supported.
//
// Edges with an empty SrcSlug or DstSlug are skipped. An empty EdgeType
// is stored as "wikilink", matching the column default. Every inserted
// row is stored with src_doc = docPath regardless of the edge's own
// SrcDoc, so the rows written here are exactly the rows the next call
// for the same docPath deletes; that is what keeps re-ingest idempotent.
//
// An empty docPath is rejected. Any failure aborts the transaction and
// leaves the previously stored edges untouched.
func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error {
	if docPath == "" {
		return errors.New("doc path is required")
	}
	tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	// Undo everything on any early return. After a successful Commit
	// this rollback has nothing left to do, so its error is discarded.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
		return fmt.Errorf("delete prior edges for %q: %w", docPath, err)
	}

	for _, e := range edges {
		if e.SrcSlug == "" || e.DstSlug == "" {
			continue
		}
		// Passing '' explicitly would bypass the column default, so apply
		// the default here.
		edgeType := e.EdgeType
		if edgeType == "" {
			edgeType = "wikilink"
		}
		// src_doc is docPath (not e.SrcDoc) so it always matches the
		// DELETE above on the next replace.
		if _, err := tx.Exec(ctx, `
INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight)
VALUES ($1, $2, $3, $4, $5, 1.0)
`, e.SrcSlug, e.DstSlug, edgeType, docPath, e.SrcLine); err != nil {
			return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err)
		}
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}
// DeleteByDoc removes the entity whose doc_path is docPath and every
// edge whose src_doc is docPath, in one transaction. Use it when a wiki
// page is deleted on disk. Edges that point at the removed entity from
// other documents are not touched.
//
// An empty docPath is rejected. Every returned error wraps the
// underlying driver error.
func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error {
	if docPath == "" {
		return errors.New("doc path is required")
	}
	tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	// Undo everything on any early return. After a successful Commit
	// this rollback has nothing left to do, so its error is discarded.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
		return fmt.Errorf("delete edges: %w", err)
	}
	if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil {
		return fmt.Errorf("delete entity: %w", err)
	}
	// Wrap the commit failure the same way ReplaceEdgesForDoc does.
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}
// Neighbor describes one node returned by Neighbors or Subgraph: the
// slug an edge points at, the metadata of the matching brain_entities
// row, and how the node was reached.
type Neighbor struct {
	// Slug is the destination slug of the edge.
	Slug string

	// Type, Wing, Hall, DocPath and Title are copied from the
	// brain_entities row for Slug. They are empty strings when no such
	// row exists (a dangling edge).
	Type    string
	Wing    string
	Hall    string
	DocPath string
	Title   string

	// EdgeType is the kind of the edge that led to Slug.
	EdgeType string

	// Distance is the hop count from the origin; 1 for direct neighbors.
	Distance int
}
// Neighbors returns the direct (1-hop) outgoing neighbours of slug.
// edgeType filters by relationship kind; "" returns all kinds.
// limit defaults to 25 when <= 0.
//
// Rows are ordered newest edge first. Edges with the same updated_at
// are returned in insertion (id) order, so the subset selected by limit
// is deterministic. Entity columns are empty strings for destinations
// that have no brain_entities row. Distance is always 1.
func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) {
	if slug == "" {
		return nil, errors.New("slug is required")
	}
	if limit <= 0 {
		limit = 25
	}
	// updated_at alone is not a usable sort key under LIMIT: it defaults
	// to now(), which is constant within a transaction, so all edges
	// written by one ReplaceEdgesForDoc call tie. e.id breaks the tie.
	const q = `
		SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
		       COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1
		FROM brain_edges e
		LEFT JOIN brain_entities t ON t.slug = e.dst_slug
		WHERE e.src_slug = $1
		  AND ($2 = '' OR e.edge_type = $2)
		ORDER BY e.updated_at DESC, e.id
		LIMIT $3
	`
	rows, err := s.pool.Query(ctx, q, slug, edgeType, limit)
	if err != nil {
		return nil, fmt.Errorf("query neighbors: %w", err)
	}
	defer rows.Close()
	return scanNeighbors(rows)
}
// Subgraph returns every distinct slug reachable from origin within
// depth outgoing hops, annotated with the shortest hop distance. The
// origin itself is omitted. depth defaults to 2 when <= 0; values
// above 6 are clamped to 6 to bound traversal cost.
//
// Each slug appears exactly once. EdgeType is the kind of an edge that
// reaches the slug at its shortest distance; when several kinds do, the
// one that sorts first is reported. Rows are ordered by distance, then
// slug. Entity columns are empty strings for slugs that have no
// brain_entities row.
func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) {
	if origin == "" {
		return nil, errors.New("origin slug is required")
	}
	if depth <= 0 {
		depth = 2
	}
	if depth > 6 {
		depth = 6
	}
	// walk enumerates (slug, edge_type, distance) triples; UNION drops
	// exact duplicates and the distance bound guarantees termination on
	// cyclic graphs. nearest then keeps one row per slug — the closest —
	// so a slug reached through several edge types or at several depths
	// is not reported more than once.
	const q = `
		WITH RECURSIVE walk(slug, edge_type, distance) AS (
		    SELECT e.dst_slug, e.edge_type, 1
		    FROM brain_edges e
		    WHERE e.src_slug = $1
		  UNION
		    SELECT e.dst_slug, e.edge_type, w.distance + 1
		    FROM walk w
		    JOIN brain_edges e ON e.src_slug = w.slug
		    WHERE w.distance < $2
		),
		nearest AS (
		    SELECT DISTINCT ON (w.slug) w.slug, w.edge_type, w.distance
		    FROM walk w
		    WHERE w.slug <> $1
		    ORDER BY w.slug, w.distance, w.edge_type
		)
		SELECT n.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
		       COALESCE(t.doc_path,''), COALESCE(t.title,''), n.edge_type, n.distance
		FROM nearest n
		LEFT JOIN brain_entities t ON t.slug = n.slug
		ORDER BY n.distance, n.slug
	`
	rows, err := s.pool.Query(ctx, q, origin, depth)
	if err != nil {
		return nil, fmt.Errorf("query subgraph: %w", err)
	}
	defer rows.Close()
	return scanNeighbors(rows)
}
// PathStep is a single directed hop within the result of Path.
type PathStep struct {
	// FromSlug is the slug the hop starts at.
	FromSlug string
	// ToSlug is the slug the hop arrives at.
	ToSlug string
	// EdgeType is the kind of the edge connecting FromSlug to ToSlug.
	EdgeType string
}
// Path returns the shortest directed path from src to dst within
// maxDepth hops, as an ordered list of edges. A nil slice (with a nil
// error) means no path exists. maxDepth defaults to 4 when <= 0; values
// above 8 are clamped to 8.
//
// When several paths share the shortest length, the one whose slug
// sequence (then edge-type sequence) sorts first is returned, so
// repeated calls on unchanged data give the same answer.
func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) {
	if src == "" || dst == "" {
		return nil, errors.New("src and dst are required")
	}
	if maxDepth <= 0 {
		maxDepth = 4
	}
	if maxDepth > 8 {
		maxDepth = 8
	}
	// walk carries the visited slugs of each partial path so a path never
	// revisits a node. A partial path that has already arrived at dst is
	// not extended: with revisits forbidden, none of its extensions could
	// end at dst again, so expanding it is wasted work.
	const q = `
		WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS (
		    SELECT e.dst_slug,
		           ARRAY[e.src_slug, e.dst_slug]::TEXT[],
		           ARRAY[e.edge_type]::TEXT[],
		           1
		    FROM brain_edges e
		    WHERE e.src_slug = $1
		  UNION ALL
		    SELECT e.dst_slug,
		           w.path_slugs || e.dst_slug,
		           w.path_edges || e.edge_type,
		           w.distance + 1
		    FROM walk w
		    JOIN brain_edges e ON e.src_slug = w.cur
		    WHERE w.distance < $3
		      AND w.cur <> $2
		      AND NOT (e.dst_slug = ANY(w.path_slugs))
		)
		SELECT path_slugs, path_edges
		FROM walk
		WHERE cur = $2
		ORDER BY distance ASC, path_slugs ASC, path_edges ASC
		LIMIT 1
	`
	var (
		slugs []string
		kinds []string
	)
	err := s.pool.QueryRow(ctx, q, src, dst, maxDepth).Scan(&slugs, &kinds)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// No row means dst is unreachable within maxDepth hops.
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("scan path: %w", err)
	}
	if len(slugs) < 2 || len(kinds) == 0 {
		return nil, nil
	}

	// slugs holds one more element than kinds: hop i runs from slugs[i]
	// to slugs[i+1] over an edge of kind kinds[i]. Both bounds are
	// checked so a malformed row cannot index out of range.
	steps := make([]PathStep, 0, len(kinds))
	for i := 0; i < len(kinds) && i+1 < len(slugs); i++ {
		steps = append(steps, PathStep{
			FromSlug: slugs[i],
			ToSlug:   slugs[i+1],
			EdgeType: kinds[i],
		})
	}
	return steps, nil
}
// CountEdges reports how many rows brain_edges currently holds. It is a
// debug helper, used by tests and by the volume-gate diagnostic. The
// scan error, if any, is returned as-is.
func (s *PGStore) CountEdges(ctx context.Context) (int64, error) {
	var total int64
	row := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`)
	err := row.Scan(&total)
	return total, err
}
// scanNeighbors drains rows into a slice of Neighbor. Each row must
// provide eight columns in this order: slug, type, wing, hall, doc_path,
// title, edge_type, distance. A scan failure aborts with a nil slice;
// otherwise the collected rows are returned together with rows.Err().
// The caller remains responsible for closing rows.
func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) {
	var neighbors []Neighbor
	for rows.Next() {
		var cur Neighbor
		err := rows.Scan(
			&cur.Slug,
			&cur.Type,
			&cur.Wing,
			&cur.Hall,
			&cur.DocPath,
			&cur.Title,
			&cur.EdgeType,
			&cur.Distance,
		)
		if err != nil {
			return nil, fmt.Errorf("scan: %w", err)
		}
		neighbors = append(neighbors, cur)
	}
	return neighbors, rows.Err()
}