Skip to content

memoir.taxonomy package

memoir.taxonomy

Semantic taxonomy components.

TaxonomyLoader

High-level loader for consuming taxonomy data in services/apps.

Provides convenient methods for:

- Loading taxonomy from markdown files (builtin or external)
- Saving taxonomy data to the memoir store
- Reading taxonomy from store (for classifier/search)
- Formatting data for LLM prompts

Source code in src/memoir/taxonomy/loader.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class TaxonomyLoader:
    """
    High-level loader for consuming taxonomy data in services/apps.

    Provides convenient methods for:
    - Loading taxonomy from markdown files (builtin or external)
    - Saving taxonomy data to the memoir store
    - Reading taxonomy from store (for classifier/search)
    - Formatting data for LLM prompts
    """

    def __init__(self, store: Any = None):
        """Initialize the taxonomy loader.

        Args:
            store: ProllyTreeStore instance for persistence.
                   If None, store operations will raise errors.
        """
        self.store = store
        self.registry = TaxonomyRegistry()
        self.namespace = TAXONOMY_NAMESPACE
        self._parser = MarkdownTaxonomySource()

    # -------------------------------------------------------------------------
    # Loading from files to registry
    # -------------------------------------------------------------------------

    def load_builtin(self) -> list[str]:
        """Load all built-in taxonomy files into the registry.

        Returns:
            List of loaded taxonomy IDs.
        """
        return self.registry.load_builtin()

    def load_external(self, path: Path | str) -> str:
        """Load an external taxonomy file into the registry.

        Args:
            path: Path to the markdown file.

        Returns:
            ID of the loaded taxonomy.
        """
        return self.registry.load_external(path)

    # -------------------------------------------------------------------------
    # Saving to store
    # -------------------------------------------------------------------------

    def _ensure_store(self) -> None:
        """Ensure store is available.

        Raises:
            RuntimeError: If the loader was constructed without a store.
        """
        if self.store is None:
            raise RuntimeError("Store not initialized. Pass store to TaxonomyLoader.")

    def save_to_store(self, taxonomy_id: str) -> bool:
        """Save a single taxonomy entry to the store.

        Args:
            taxonomy_id: ID of the taxonomy to save.

        Returns:
            True if saved successfully, False if not found.
        """
        self._ensure_store()

        data = self.registry.get(taxonomy_id)
        if not data:
            logger.warning("Taxonomy not found in registry: %s", taxonomy_id)
            return False

        # Save metadata. Optional fields are written only when present, so
        # stored records stay compact and absence stays distinguishable.
        meta_key = f"meta:{taxonomy_id}"
        meta_value = {
            "type": data.metadata.type,
            "id": data.metadata.id,
            "name": data.metadata.name,
            "domain": data.metadata.domain,
            "version": data.metadata.version,
            "author": data.metadata.author,
            "description": data.metadata.description,
        }
        if data.metadata.created:
            meta_value["created"] = data.metadata.created
        if data.metadata.updated:
            meta_value["updated"] = data.metadata.updated
        if data.metadata.taxonomy_version:
            meta_value["taxonomy_version"] = data.metadata.taxonomy_version

        self.store.put(self.namespace, meta_key, {"value": meta_value})

        # Save type-specific payload under a type-prefixed key
        # ("examples:<id>", "descriptions:<id>", or "preset:<id>").
        if data.metadata.type == "examples" and data.examples:
            examples_key = f"examples:{taxonomy_id}"
            examples_value = [
                {"input": inp, "path": path, "reasoning": reason}
                for inp, path, reason in data.examples
            ]
            self.store.put(self.namespace, examples_key, {"value": examples_value})

        elif data.metadata.type == "descriptions" and data.descriptions:
            desc_key = f"descriptions:{taxonomy_id}"
            self.store.put(self.namespace, desc_key, {"value": data.descriptions})

        elif data.metadata.type == "preset" and data.paths:
            preset_key = f"preset:{taxonomy_id}"
            self.store.put(self.namespace, preset_key, {"value": data.paths})

        # Update indexes
        self._update_indexes(data)

        logger.debug("Saved taxonomy to store: %s", taxonomy_id)
        return True

    def save_all_to_store(self) -> int:
        """Save all taxonomies in the registry to the store.

        Returns:
            Number of taxonomies saved.
        """
        self._ensure_store()

        saved_count = 0
        for taxonomy_id in self.registry.list_ids():
            if self.save_to_store(taxonomy_id):
                saved_count += 1

        return saved_count

    def _update_indexes(self, data: TaxonomyData) -> None:
        """Update the type and domain indexes in the store.

        Args:
            data: The taxonomy data to index.
        """
        taxonomy_id = data.metadata.id
        self._add_to_index("index:by-type", data.metadata.type, taxonomy_id)
        self._add_to_index("index:by-domain", data.metadata.domain, taxonomy_id)

    def _add_to_index(self, index_key: str, bucket: str, taxonomy_id: str) -> None:
        """Idempotently append taxonomy_id to index_key[bucket] in the store.

        Args:
            index_key: Store key of the index mapping (e.g. "index:by-type").
            bucket: Index bucket (taxonomy type or domain).
            taxonomy_id: ID to record in the bucket.
        """
        index = self._get_from_store(index_key, {})
        ids = index.setdefault(bucket, [])
        if taxonomy_id not in ids:
            ids.append(taxonomy_id)
        self.store.put(self.namespace, index_key, {"value": index})

    def _get_from_store(self, key: str, default: Any = None) -> Any:
        """Get a value from the store with default.

        Args:
            key: Store key.
            default: Default value if not found.

        Returns:
            Value from store or default.
        """
        result = self.store.get(self.namespace, key)
        if result is None:
            return default
        # Handle the Item wrapper if present; values are stored as
        # {"value": ...} envelopes (see save_to_store / _add_to_index).
        if hasattr(result, "value"):
            return result.value.get("value", default)
        if isinstance(result, dict):
            return result.get("value", default)
        return default

    # -------------------------------------------------------------------------
    # Loading from store (for classifier/search)
    # -------------------------------------------------------------------------

    def get_examples_from_store(
        self, limit: int | None = None, domain: str | None = None
    ) -> list[tuple[str, str, str]]:
        """Get classification examples from the store.

        Args:
            limit: Maximum number of examples to return. None means
                unlimited; 0 returns an empty list.
            domain: Domain to filter by (default: general).

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        self._ensure_store()

        # Get type index
        type_index = self._get_from_store("index:by-type", {})
        example_ids = type_index.get("examples", [])
        logger.debug(
            "[TaxonomyLoader] Loading examples from store, found IDs: %s", example_ids
        )

        # Filter by domain if specified
        if domain:
            domain_index = self._get_from_store("index:by-domain", {})
            domain_ids = set(domain_index.get(domain, []))
            example_ids = [eid for eid in example_ids if eid in domain_ids]

        # Collect examples up to the limit.
        examples: list[tuple[str, str, str]] = []
        for taxonomy_id in example_ids:
            key = f"examples:{taxonomy_id}"
            example_data = self._get_from_store(key, [])
            for item in example_data:
                # Check before appending: the previous `if limit` truthiness
                # test silently treated limit=0 as unlimited.
                if limit is not None and len(examples) >= limit:
                    logger.debug(
                        "[TaxonomyLoader] Loaded %d examples from store (limit reached)",
                        len(examples),
                    )
                    return examples
                examples.append((item["input"], item["path"], item["reasoning"]))

        logger.debug("[TaxonomyLoader] Loaded %d examples from store", len(examples))
        return examples

    def get_descriptions_from_store(self, domain: str | None = None) -> dict[str, str]:
        """Get category descriptions from the store.

        Args:
            domain: Domain to filter by (default: general).

        Returns:
            Dict mapping category to description.
        """
        self._ensure_store()

        # Get type index
        type_index = self._get_from_store("index:by-type", {})
        desc_ids = type_index.get("descriptions", [])

        # Filter by domain if specified
        if domain:
            domain_index = self._get_from_store("index:by-domain", {})
            domain_ids = set(domain_index.get(domain, []))
            # Include both general and domain-specific
            general_ids = set(domain_index.get("general", []))
            desc_ids = [
                did for did in desc_ids if did in domain_ids or did in general_ids
            ]

        # Merge descriptions (later entries override earlier)
        descriptions: dict[str, str] = {}
        for taxonomy_id in desc_ids:
            key = f"descriptions:{taxonomy_id}"
            desc_data = self._get_from_store(key, {})
            descriptions.update(desc_data)

        logger.debug(
            "[TaxonomyLoader] Loaded %d category descriptions from store",
            len(descriptions),
        )
        return descriptions

    def get_preset_paths_from_store(
        self, preset_id: str | None = None
    ) -> dict[str, list[str]]:
        """Get preset taxonomy paths from the store.

        Args:
            preset_id: Specific preset ID to load, or None for all.

        Returns:
            Dict mapping category to list of paths.
        """
        self._ensure_store()

        if preset_id:
            preset_paths = self._get_from_store(f"preset:{preset_id}", {})
            logger.debug(
                "[TaxonomyLoader] Loaded preset '%s' from store: %d categories",
                preset_id,
                len(preset_paths),
            )
            return preset_paths

        # Get all presets and merge their categories.
        type_index = self._get_from_store("index:by-type", {})
        preset_ids = type_index.get("preset", [])

        paths: dict[str, list[str]] = {}
        for pid in preset_ids:
            preset_data = self._get_from_store(f"preset:{pid}", {})
            for category, category_paths in preset_data.items():
                paths.setdefault(category, []).extend(category_paths)

        return paths

    # -------------------------------------------------------------------------
    # Convenience: Initialize store from files
    # -------------------------------------------------------------------------

    def init_store(
        self,
        include_builtin: bool = True,
        external_paths: list[Path | str] | None = None,
        merge_strategy: str = "extend",
    ) -> dict[str, Any]:
        """Initialize the store with taxonomy data from files.

        Args:
            include_builtin: Whether to load builtin taxonomy files.
            external_paths: List of external markdown file paths.
            merge_strategy: How to handle existing data:
                - "extend": Add new entries, keep existing (default)
                - "override": External entries replace same-id entries
                - "replace": Clear store, load only specified sources

        Returns:
            Dict with counts of loaded taxonomies by type.
        """
        self._ensure_store()

        # Clear if replace strategy
        if merge_strategy == "replace":
            self._clear_taxonomy_from_store()
            self.registry.clear()

        # NOTE(review): save_to_store always overwrites same-id keys, so
        # "extend" and "override" currently behave identically — confirm
        # whether "extend" should instead skip IDs already in the store.
        loaded: dict[str, int] = {"examples": 0, "descriptions": 0, "preset": 0}

        def _count(tid: str) -> None:
            # Tally a freshly loaded taxonomy under its declared type.
            data = self.registry.get(tid)
            if data:
                loaded[data.metadata.type] = loaded.get(data.metadata.type, 0) + 1

        # Load builtin
        if include_builtin:
            for tid in self.load_builtin():
                _count(tid)

        # Load external; one bad file must not abort the rest.
        if external_paths:
            for path in external_paths:
                try:
                    _count(self.load_external(path))
                except Exception as e:
                    logger.error("Failed to load external taxonomy %s: %s", path, e)

        # Save to store
        saved_count = self.save_all_to_store()
        logger.info("Initialized store with %d taxonomy entries", saved_count)

        return {
            "loaded": loaded,
            "saved": saved_count,
        }

    def _clear_taxonomy_from_store(self) -> None:
        """Clear all taxonomy data from the store."""
        # Walk the type index to find every stored key and remove it.
        type_index = self._get_from_store("index:by-type", {})

        for taxonomy_type, ids in type_index.items():
            for tid in ids:
                # Payload keys are prefixed with the taxonomy type.
                if taxonomy_type in ("examples", "descriptions", "preset"):
                    self.store.delete(self.namespace, f"{taxonomy_type}:{tid}")
                self.store.delete(self.namespace, f"meta:{tid}")

        # Clear indexes
        self.store.delete(self.namespace, "index:by-type")
        self.store.delete(self.namespace, "index:by-domain")

    # -------------------------------------------------------------------------
    # Prompt formatting (reads from store)
    # -------------------------------------------------------------------------

    def format_for_prompt(
        self,
        include_examples: bool = True,
        include_descriptions: bool = True,
        example_limit: int = 8,
        domain: str | None = None,
    ) -> str:
        """Format taxonomy data for LLM prompt insertion.

        Reads from the store (not registry) to ensure consistency
        with what's persisted.

        Args:
            include_examples: Whether to include classification examples.
            include_descriptions: Whether to include category descriptions.
            example_limit: Maximum number of examples to include.
            domain: Domain to filter by.

        Returns:
            Formatted string ready for prompt inclusion.
        """
        parts: list[str] = []

        if include_descriptions:
            descriptions = self.get_descriptions_from_store(domain)
            if descriptions:
                parts.append("TAXONOMY CATEGORIES:")
                # Sorted for deterministic prompt output.
                for cat, desc in sorted(descriptions.items()):
                    parts.append(f"  {cat}: {desc}")
                parts.append("")

        if include_examples:
            examples = self.get_examples_from_store(limit=example_limit, domain=domain)
            if examples:
                parts.append(
                    "CLASSIFICATION EXAMPLES (3-level paths: category.subcategory.type):"
                )
                for input_text, path, _reasoning in examples:
                    parts.append(f'  "{input_text}" -> {path}')
                parts.append("")

        return "\n".join(parts)

    # -------------------------------------------------------------------------
    # Utility methods
    # -------------------------------------------------------------------------

    def list_stored_taxonomies(self) -> dict[str, list[str]]:
        """List all taxonomies stored in the store, grouped by type.

        Returns:
            Dict mapping type to list of taxonomy IDs.
        """
        self._ensure_store()
        return self._get_from_store("index:by-type", {})

    def get_taxonomy_metadata(self, taxonomy_id: str) -> dict[str, Any] | None:
        """Get metadata for a specific taxonomy from the store.

        Args:
            taxonomy_id: The taxonomy ID.

        Returns:
            Metadata dict or None if not found.
        """
        self._ensure_store()
        return self._get_from_store(f"meta:{taxonomy_id}")

    def has_taxonomy_in_store(self) -> bool:
        """Check if any taxonomy data exists in the store.

        Returns:
            True if taxonomy data exists.
        """
        self._ensure_store()
        type_index = self._get_from_store("index:by-type", {})
        return bool(type_index)

__init__

__init__(store: Any = None)

Initialize the taxonomy loader.

Parameters:

Name Type Description Default
store Any

ProllyTreeStore instance for persistence. If None, store operations will raise errors.

None
Source code in src/memoir/taxonomy/loader.py
def __init__(self, store: Any = None):
    """Initialize the taxonomy loader.

    Args:
        store: ProllyTreeStore instance for persistence.
               If None, store operations will raise errors.
    """
    self.store = store
    self.registry = TaxonomyRegistry()
    self.namespace = TAXONOMY_NAMESPACE
    self._parser = MarkdownTaxonomySource()

load_builtin

load_builtin() -> list[str]

Load all built-in taxonomy files into the registry.

Returns:

Type Description
list[str]

List of loaded taxonomy IDs.

Source code in src/memoir/taxonomy/loader.py
def load_builtin(self) -> list[str]:
    """Load all built-in taxonomy files into the registry.

    Returns:
        List of loaded taxonomy IDs.
    """
    return self.registry.load_builtin()

load_external

load_external(path: Path | str) -> str

Load an external taxonomy file into the registry.

Parameters:

Name Type Description Default
path Path | str

Path to the markdown file.

required

Returns:

Type Description
str

ID of the loaded taxonomy.

Source code in src/memoir/taxonomy/loader.py
def load_external(self, path: Path | str) -> str:
    """Load an external taxonomy file into the registry.

    Args:
        path: Path to the markdown file.

    Returns:
        ID of the loaded taxonomy.
    """
    return self.registry.load_external(path)

save_to_store

save_to_store(taxonomy_id: str) -> bool

Save a single taxonomy entry to the store.

Parameters:

Name Type Description Default
taxonomy_id str

ID of the taxonomy to save.

required

Returns:

Type Description
bool

True if saved successfully, False if not found.

Source code in src/memoir/taxonomy/loader.py
def save_to_store(self, taxonomy_id: str) -> bool:
    """Save a single taxonomy entry to the store.

    Args:
        taxonomy_id: ID of the taxonomy to save.

    Returns:
        True if saved successfully, False if not found.
    """
    self._ensure_store()

    data = self.registry.get(taxonomy_id)
    if not data:
        logger.warning(f"Taxonomy not found in registry: {taxonomy_id}")
        return False

    # Save metadata
    meta_key = f"meta:{taxonomy_id}"
    meta_value = {
        "type": data.metadata.type,
        "id": data.metadata.id,
        "name": data.metadata.name,
        "domain": data.metadata.domain,
        "version": data.metadata.version,
        "author": data.metadata.author,
        "description": data.metadata.description,
    }
    if data.metadata.created:
        meta_value["created"] = data.metadata.created
    if data.metadata.updated:
        meta_value["updated"] = data.metadata.updated
    if data.metadata.taxonomy_version:
        meta_value["taxonomy_version"] = data.metadata.taxonomy_version

    self.store.put(self.namespace, meta_key, {"value": meta_value})

    # Save type-specific data
    if data.metadata.type == "examples" and data.examples:
        examples_key = f"examples:{taxonomy_id}"
        examples_value = [
            {"input": inp, "path": path, "reasoning": reason}
            for inp, path, reason in data.examples
        ]
        self.store.put(self.namespace, examples_key, {"value": examples_value})

    elif data.metadata.type == "descriptions" and data.descriptions:
        desc_key = f"descriptions:{taxonomy_id}"
        self.store.put(self.namespace, desc_key, {"value": data.descriptions})

    elif data.metadata.type == "preset" and data.paths:
        preset_key = f"preset:{taxonomy_id}"
        self.store.put(self.namespace, preset_key, {"value": data.paths})

    # Update indexes
    self._update_indexes(data)

    logger.debug(f"Saved taxonomy to store: {taxonomy_id}")
    return True

save_all_to_store

save_all_to_store() -> int

Save all taxonomies in the registry to the store.

Returns:

Type Description
int

Number of taxonomies saved.

Source code in src/memoir/taxonomy/loader.py
def save_all_to_store(self) -> int:
    """Save all taxonomies in the registry to the store.

    Returns:
        Number of taxonomies saved.
    """
    self._ensure_store()

    saved_count = 0
    for taxonomy_id in self.registry.list_ids():
        if self.save_to_store(taxonomy_id):
            saved_count += 1

    return saved_count

get_examples_from_store

get_examples_from_store(limit: int | None = None, domain: str | None = None) -> list[tuple[str, str, str]]

Get classification examples from the store.

Parameters:

Name Type Description Default
limit int | None

Maximum number of examples to return.

None
domain str | None

Domain to filter by (default: general).

None

Returns:

Type Description
list[tuple[str, str, str]]

List of (input_text, path, reasoning) tuples.

Source code in src/memoir/taxonomy/loader.py
def get_examples_from_store(
    self, limit: int | None = None, domain: str | None = None
) -> list[tuple[str, str, str]]:
    """Get classification examples from the store.

    Args:
        limit: Maximum number of examples to return.
        domain: Domain to filter by (default: general).

    Returns:
        List of (input_text, path, reasoning) tuples.
    """
    self._ensure_store()

    # Get type index
    type_index = self._get_from_store("index:by-type", {})
    example_ids = type_index.get("examples", [])
    logger.debug(
        f"[TaxonomyLoader] Loading examples from store, found IDs: {example_ids}"
    )

    # Filter by domain if specified
    if domain:
        domain_index = self._get_from_store("index:by-domain", {})
        domain_ids = set(domain_index.get(domain, []))
        example_ids = [eid for eid in example_ids if eid in domain_ids]

    # Collect all examples
    examples: list[tuple[str, str, str]] = []
    for taxonomy_id in example_ids:
        key = f"examples:{taxonomy_id}"
        example_data = self._get_from_store(key, [])
        for item in example_data:
            examples.append((item["input"], item["path"], item["reasoning"]))
            if limit and len(examples) >= limit:
                logger.debug(
                    f"[TaxonomyLoader] Loaded {len(examples)} examples from store (limit reached)"
                )
                return examples

    logger.debug(f"[TaxonomyLoader] Loaded {len(examples)} examples from store")
    return examples[:limit] if limit else examples

get_descriptions_from_store

get_descriptions_from_store(domain: str | None = None) -> dict[str, str]

Get category descriptions from the store.

Parameters:

Name Type Description Default
domain str | None

Domain to filter by (default: general).

None

Returns:

Type Description
dict[str, str]

Dict mapping category to description.

Source code in src/memoir/taxonomy/loader.py
def get_descriptions_from_store(self, domain: str | None = None) -> dict[str, str]:
    """Get category descriptions from the store.

    Args:
        domain: Domain to filter by (default: general).

    Returns:
        Dict mapping category to description.
    """
    self._ensure_store()

    # Get type index
    type_index = self._get_from_store("index:by-type", {})
    desc_ids = type_index.get("descriptions", [])

    # Filter by domain if specified
    if domain:
        domain_index = self._get_from_store("index:by-domain", {})
        domain_ids = set(domain_index.get(domain, []))
        # Include both general and domain-specific
        general_ids = set(domain_index.get("general", []))
        desc_ids = [
            did for did in desc_ids if did in domain_ids or did in general_ids
        ]

    # Merge descriptions (later entries override earlier)
    descriptions: dict[str, str] = {}
    for taxonomy_id in desc_ids:
        key = f"descriptions:{taxonomy_id}"
        desc_data = self._get_from_store(key, {})
        descriptions.update(desc_data)

    logger.debug(
        f"[TaxonomyLoader] Loaded {len(descriptions)} category descriptions from store"
    )
    return descriptions

get_preset_paths_from_store

get_preset_paths_from_store(preset_id: str | None = None) -> dict[str, list[str]]

Get preset taxonomy paths from the store.

Parameters:

Name Type Description Default
preset_id str | None

Specific preset ID to load, or None for all.

None

Returns:

Type Description
dict[str, list[str]]

Dict mapping category to list of paths.

Source code in src/memoir/taxonomy/loader.py
def get_preset_paths_from_store(
    self, preset_id: str | None = None
) -> dict[str, list[str]]:
    """Get preset taxonomy paths from the store.

    Args:
        preset_id: Specific preset ID to load, or None for all.

    Returns:
        Dict mapping category to list of paths.
    """
    self._ensure_store()

    if preset_id:
        key = f"preset:{preset_id}"
        paths = self._get_from_store(key, {})
        logger.debug(
            f"[TaxonomyLoader] Loaded preset '{preset_id}' from store: {len(paths)} categories"
        )
        return paths

    # Get all presets
    type_index = self._get_from_store("index:by-type", {})
    preset_ids = type_index.get("preset", [])

    paths: dict[str, list[str]] = {}
    for pid in preset_ids:
        key = f"preset:{pid}"
        preset_data = self._get_from_store(key, {})
        for category, category_paths in preset_data.items():
            if category not in paths:
                paths[category] = []
            paths[category].extend(category_paths)

    return paths

init_store

init_store(include_builtin: bool = True, external_paths: list[Path | str] | None = None, merge_strategy: str = 'extend') -> dict[str, Any]

Initialize the store with taxonomy data from files.

Parameters:

Name Type Description Default
include_builtin bool

Whether to load builtin taxonomy files.

True
external_paths list[Path | str] | None

List of external markdown file paths.

None
merge_strategy str

How to handle existing data:

- "extend": Add new entries, keep existing (default)
- "override": External entries replace same-id entries
- "replace": Clear store, load only specified sources

'extend'

Returns:

Type Description
dict[str, Any]

Dict with counts of loaded taxonomies by type.

Source code in src/memoir/taxonomy/loader.py
def init_store(
    self,
    include_builtin: bool = True,
    external_paths: list[Path | str] | None = None,
    merge_strategy: str = "extend",
) -> dict[str, Any]:
    """Initialize the store with taxonomy data from files.

    Args:
        include_builtin: Whether to load builtin taxonomy files.
        external_paths: List of external markdown file paths.
        merge_strategy: How to handle existing data:
            - "extend": Add new entries, keep existing (default)
            - "override": External entries replace same-id entries
            - "replace": Clear store, load only specified sources

    Returns:
        Dict with counts of loaded taxonomies by type.
    """
    self._ensure_store()

    # Clear if replace strategy
    if merge_strategy == "replace":
        self._clear_taxonomy_from_store()
        self.registry.clear()

    loaded: dict[str, int] = {"examples": 0, "descriptions": 0, "preset": 0}

    # Load builtin
    if include_builtin:
        builtin_ids = self.load_builtin()
        for tid in builtin_ids:
            data = self.registry.get(tid)
            if data:
                loaded[data.metadata.type] = loaded.get(data.metadata.type, 0) + 1

    # Load external
    if external_paths:
        for path in external_paths:
            try:
                tid = self.load_external(path)
                data = self.registry.get(tid)
                if data:
                    loaded[data.metadata.type] = (
                        loaded.get(data.metadata.type, 0) + 1
                    )
            except Exception as e:
                logger.error(f"Failed to load external taxonomy {path}: {e}")

    # Save to store
    saved_count = self.save_all_to_store()
    logger.info(f"Initialized store with {saved_count} taxonomy entries")

    return {
        "loaded": loaded,
        "saved": saved_count,
    }

format_for_prompt

format_for_prompt(include_examples: bool = True, include_descriptions: bool = True, example_limit: int = 8, domain: str | None = None) -> str

Format taxonomy data for LLM prompt insertion.

Reads from the store (not registry) to ensure consistency with what's persisted.

Parameters:

Name Type Description Default
include_examples bool

Whether to include classification examples.

True
include_descriptions bool

Whether to include category descriptions.

True
example_limit int

Maximum number of examples to include.

8
domain str | None

Domain to filter by.

None

Returns:

Type Description
str

Formatted string ready for prompt inclusion.

Source code in src/memoir/taxonomy/loader.py
def format_for_prompt(
    self,
    include_examples: bool = True,
    include_descriptions: bool = True,
    example_limit: int = 8,
    domain: str | None = None,
) -> str:
    """Format taxonomy data for LLM prompt insertion.

    Reads from the store (not registry) to ensure consistency
    with what's persisted.

    Args:
        include_examples: Whether to include classification examples.
        include_descriptions: Whether to include category descriptions.
        example_limit: Maximum number of examples to include.
        domain: Domain to filter by.

    Returns:
        Formatted string ready for prompt inclusion.
    """
    parts = []

    if include_descriptions:
        descriptions = self.get_descriptions_from_store(domain)
        if descriptions:
            parts.append("TAXONOMY CATEGORIES:")
            for cat, desc in sorted(descriptions.items()):
                parts.append(f"  {cat}: {desc}")
            parts.append("")

    if include_examples:
        examples = self.get_examples_from_store(limit=example_limit, domain=domain)
        if examples:
            parts.append(
                "CLASSIFICATION EXAMPLES (3-level paths: category.subcategory.type):"
            )
            for input_text, path, _reasoning in examples:
                parts.append(f'  "{input_text}" -> {path}')
            parts.append("")

    return "\n".join(parts)

list_stored_taxonomies

list_stored_taxonomies() -> dict[str, list[str]]

List all taxonomies stored in the store, grouped by type.

Returns:

Type Description
dict[str, list[str]]

Dict mapping type to list of taxonomy IDs.

Source code in src/memoir/taxonomy/loader.py
def list_stored_taxonomies(self) -> dict[str, list[str]]:
    """Report every taxonomy persisted in the store, keyed by type.

    Returns:
        Dict mapping taxonomy type to the list of taxonomy IDs of
        that type (empty dict when nothing has been stored).
    """
    self._ensure_store()
    # The store maintains a single by-type index entry; default to empty.
    type_index: dict[str, list[str]] = self._get_from_store("index:by-type", {})
    return type_index

get_taxonomy_metadata

get_taxonomy_metadata(taxonomy_id: str) -> dict[str, Any] | None

Get metadata for a specific taxonomy from the store.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID.

required

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found.

Source code in src/memoir/taxonomy/loader.py
def get_taxonomy_metadata(self, taxonomy_id: str) -> dict[str, Any] | None:
    """Get metadata for a specific taxonomy from the store.

    Args:
        taxonomy_id: The taxonomy ID.

    Returns:
        Metadata dict or None if not found.
    """
    self._ensure_store()
    return self._get_from_store(f"meta:{taxonomy_id}")

has_taxonomy_in_store

has_taxonomy_in_store() -> bool

Check if any taxonomy data exists in the store.

Returns:

Type Description
bool

True if taxonomy data exists.

Source code in src/memoir/taxonomy/loader.py
def has_taxonomy_in_store(self) -> bool:
    """Tell whether the store holds any taxonomy data at all.

    Returns:
        True when at least one taxonomy is indexed, False otherwise.
    """
    self._ensure_store()
    # A non-empty by-type index means something has been saved.
    return bool(self._get_from_store("index:by-type", {}))

MarkdownTaxonomySource

Markdown file-based taxonomy data source.

Parses YAML frontmatter and structured markdown content into taxonomy data structures.

Supported types: - examples: Classification examples in markdown tables - descriptions: Category descriptions in a markdown table - preset: Taxonomy paths in bullet lists under headers

Source code in src/memoir/taxonomy/markdown_source.py
class MarkdownTaxonomySource:
    """
    Markdown file-based taxonomy data source.

    Parses YAML frontmatter and structured markdown content
    into taxonomy data structures.

    Supported types:
    - examples: Classification examples in markdown tables
    - descriptions: Category descriptions in a markdown table
    - preset: Taxonomy paths in bullet lists under headers
    """

    def __init__(self, encoding: str = "utf-8"):
        """Initialize the markdown source parser.

        Args:
            encoding: File encoding to use when reading files.
        """
        self.encoding = encoding

    def load(self, path: Path) -> TaxonomyData:
        """Load and parse a markdown taxonomy file.

        Args:
            path: Path to the markdown file.

        Returns:
            Parsed TaxonomyData.

        Raises:
            TaxonomyParseError: If the file cannot be parsed.
            FileNotFoundError: If the file doesn't exist.
        """
        if not path.exists():
            raise FileNotFoundError(f"Taxonomy file not found: {path}")

        content = path.read_text(encoding=self.encoding)
        return self.parse(content)

    def parse(self, content: str) -> TaxonomyData:
        """Parse markdown content into TaxonomyData.

        Args:
            content: Raw markdown content.

        Returns:
            Parsed TaxonomyData.

        Raises:
            TaxonomyParseError: If the content cannot be parsed.
        """
        metadata, body = self._split_frontmatter(content)

        # Dispatch on the declared taxonomy type; each branch fills a
        # different payload field of TaxonomyData.
        if metadata.type == "examples":
            examples = self._parse_examples_tables(body)
            return TaxonomyData(metadata=metadata, examples=examples, raw_content=body)
        elif metadata.type == "descriptions":
            descriptions = self._parse_descriptions_table(body)
            return TaxonomyData(
                metadata=metadata, descriptions=descriptions, raw_content=body
            )
        elif metadata.type == "preset":
            paths = self._parse_preset_lists(body)
            return TaxonomyData(metadata=metadata, paths=paths, raw_content=body)
        else:
            raise TaxonomyParseError(f"Unknown taxonomy type: {metadata.type}")

    def _split_frontmatter(self, content: str) -> tuple[TaxonomyMetadata, str]:
        """Split YAML frontmatter from markdown body.

        Args:
            content: Raw markdown content.

        Returns:
            Tuple of (metadata, body).

        Raises:
            TaxonomyParseError: If frontmatter is missing, not a YAML
                mapping, or contains unknown/invalid fields.
        """
        pattern = r"^---\s*\n(.*?)\n---\s*\n(.*)$"
        match = re.match(pattern, content, re.DOTALL)
        if not match:
            raise TaxonomyParseError("Invalid markdown: missing YAML frontmatter")

        yaml_content = match.group(1)
        body = match.group(2)

        try:
            meta_dict = yaml.safe_load(yaml_content)
        except yaml.YAMLError as e:
            raise TaxonomyParseError(f"Invalid YAML frontmatter: {e}") from e

        # safe_load() returns None for empty input, and scalars/lists for
        # non-mapping YAML. Reject those here so callers get the documented
        # TaxonomyParseError instead of a TypeError from the checks below.
        if not isinstance(meta_dict, dict):
            raise TaxonomyParseError(
                "Invalid YAML frontmatter: expected a mapping of metadata fields"
            )

        # Validate required fields
        required_fields = ["type", "id", "name"]
        for field_name in required_fields:
            if field_name not in meta_dict:
                raise TaxonomyParseError(
                    f"Missing required field in frontmatter: {field_name}"
                )

        # Handle optional list fields that might be None
        if meta_dict.get("tags") is None:
            meta_dict["tags"] = []
        if meta_dict.get("dependencies") is None:
            meta_dict["dependencies"] = []

        # Unknown frontmatter keys raise TypeError from the dataclass
        # constructor; surface them as a parse error for consistency.
        try:
            metadata = TaxonomyMetadata(**meta_dict)
        except TypeError as e:
            raise TaxonomyParseError(f"Invalid frontmatter fields: {e}") from e
        return metadata, body

    def _parse_examples_tables(self, body: str) -> list[tuple[str, str, str]]:
        """Parse markdown tables under ## headers into examples.

        Expected format:
        ## category_name
        | Input | Path | Reasoning |
        |-------|------|-----------|
        | My name is Sarah | profile.personal.identity | identity info |

        Args:
            body: Markdown body content.

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        examples = []

        # Split by ## headers
        sections = re.split(r"^## (\w+)\s*$", body, flags=re.MULTILINE)

        # sections[0] is content before first ##, then alternating category/content
        for i in range(1, len(sections), 2):
            if i + 1 >= len(sections):
                break

            # category = sections[i]  # Not needed, path includes category
            content = sections[i + 1]

            # Parse table rows
            table_examples = self._parse_table_rows(content)
            examples.extend(table_examples)

        return examples

    def _parse_table_rows(self, content: str) -> list[tuple[str, str, str]]:
        """Parse markdown table rows into example tuples.

        Args:
            content: Content containing a markdown table.

        Returns:
            List of (input, path, reasoning) tuples.
        """
        examples = []
        lines = content.strip().split("\n")

        in_table = False
        for line in lines:
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Skip header row and separator
            if (
                line.startswith("| Input")
                or line.startswith("|--")
                or line.startswith("| ---")
            ):
                in_table = True
                continue

            # Parse data rows
            if in_table and line.startswith("|") and line.endswith("|"):
                cells = [cell.strip() for cell in line.split("|")[1:-1]]
                if len(cells) >= 3:
                    input_text = cells[0]
                    path = cells[1]
                    reasoning = cells[2]
                    if input_text and path:  # Skip empty rows
                        examples.append((input_text, path, reasoning))

        return examples

    def _parse_descriptions_table(self, body: str) -> dict[str, str]:
        """Parse markdown table into category descriptions dict.

        Expected format:
        | Category | Description |
        |----------|-------------|
        | profile | Personal facts... |

        Args:
            body: Markdown body content.

        Returns:
            Dict mapping category to description.
        """
        descriptions = {}
        lines = body.strip().split("\n")

        in_table = False
        for line in lines:
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Skip header row and separator
            if (
                line.startswith("| Category")
                or line.startswith("|--")
                or line.startswith("| ---")
            ):
                in_table = True
                continue

            # Parse data rows
            if in_table and line.startswith("|") and line.endswith("|"):
                cells = [cell.strip() for cell in line.split("|")[1:-1]]
                if len(cells) >= 2:
                    category = cells[0]
                    description = cells[1]
                    if category and description:
                        descriptions[category] = description

        return descriptions

    def _parse_preset_lists(self, body: str) -> dict[str, list[str]]:
        """Parse markdown lists under ## headers into preset paths.

        Expected format:
        ## profile
        - personal.identity
        - personal.demographics

        Args:
            body: Markdown body content.

        Returns:
            Dict mapping category to list of subcategory.type paths.
        """
        paths: dict[str, list[str]] = {}

        # Split by ## headers
        sections = re.split(r"^## (\w+)\s*$", body, flags=re.MULTILINE)

        # sections[0] is content before first ##, then alternating category/content
        for i in range(1, len(sections), 2):
            if i + 1 >= len(sections):
                break

            category = sections[i].strip()
            content = sections[i + 1]

            # Parse bullet list items
            category_paths = []
            for line in content.split("\n"):
                line = line.strip()
                if line.startswith("- "):
                    path = line[2:].strip()
                    if path:
                        category_paths.append(path)

            if category_paths:
                paths[category] = category_paths

        return paths

    def to_dict(self, data: TaxonomyData) -> dict[str, Any]:
        """Convert TaxonomyData to a dictionary for storage.

        Args:
            data: The taxonomy data to convert.

        Returns:
            Dictionary representation suitable for JSON serialization.
        """
        result: dict[str, Any] = {
            "metadata": {
                "type": data.metadata.type,
                "id": data.metadata.id,
                "name": data.metadata.name,
                "domain": data.metadata.domain,
                "version": data.metadata.version,
                "author": data.metadata.author,
                "description": data.metadata.description,
                "tags": data.metadata.tags,
                "dependencies": data.metadata.dependencies,
            }
        }

        # Optional metadata fields are emitted only when set, keeping the
        # stored document compact.
        if data.metadata.created:
            result["metadata"]["created"] = data.metadata.created
        if data.metadata.updated:
            result["metadata"]["updated"] = data.metadata.updated
        if data.metadata.taxonomy_version:
            result["metadata"]["taxonomy_version"] = data.metadata.taxonomy_version

        # Example tuples become keyed dicts so from_dict() can rebuild them.
        if data.examples is not None:
            result["examples"] = [
                {"input": inp, "path": path, "reasoning": reason}
                for inp, path, reason in data.examples
            ]

        if data.descriptions is not None:
            result["descriptions"] = data.descriptions

        if data.paths is not None:
            result["paths"] = data.paths

        return result

    def from_dict(self, data: dict[str, Any]) -> TaxonomyData:
        """Convert a dictionary back to TaxonomyData.

        Args:
            data: Dictionary from storage.

        Returns:
            TaxonomyData instance.
        """
        meta_dict = data["metadata"]
        metadata = TaxonomyMetadata(
            type=meta_dict["type"],
            id=meta_dict["id"],
            name=meta_dict["name"],
            domain=meta_dict.get("domain", "general"),
            version=meta_dict.get("version", "1.0.0"),
            created=meta_dict.get("created"),
            updated=meta_dict.get("updated"),
            author=meta_dict.get("author", "system"),
            description=meta_dict.get("description", ""),
            tags=meta_dict.get("tags", []),
            dependencies=meta_dict.get("dependencies", []),
            taxonomy_version=meta_dict.get("taxonomy_version"),
        )

        examples = None
        if "examples" in data:
            examples = [
                (e["input"], e["path"], e["reasoning"]) for e in data["examples"]
            ]

        descriptions = data.get("descriptions")
        paths = data.get("paths")

        # raw_content is not persisted; the rebuilt instance keeps the
        # dataclass default ("").
        return TaxonomyData(
            metadata=metadata,
            examples=examples,
            descriptions=descriptions,
            paths=paths,
        )

__init__

__init__(encoding: str = 'utf-8')

Initialize the markdown source parser.

Parameters:

Name Type Description Default
encoding str

File encoding to use when reading files.

'utf-8'
Source code in src/memoir/taxonomy/markdown_source.py
def __init__(self, encoding: str = "utf-8"):
    """Initialize the markdown source parser.

    Args:
        encoding: File encoding to use when reading files.
    """
    # Used by load() when decoding taxonomy markdown from disk.
    self.encoding = encoding

load

load(path: Path) -> TaxonomyData

Load and parse a markdown taxonomy file.

Parameters:

Name Type Description Default
path Path

Path to the markdown file.

required

Returns:

Type Description
TaxonomyData

Parsed TaxonomyData.

Raises:

Type Description
TaxonomyParseError

If the file cannot be parsed.

FileNotFoundError

If the file doesn't exist.

Source code in src/memoir/taxonomy/markdown_source.py
def load(self, path: Path) -> TaxonomyData:
    """Load and parse a markdown taxonomy file.

    Args:
        path: Path to the markdown file.

    Returns:
        Parsed TaxonomyData.

    Raises:
        TaxonomyParseError: If the file cannot be parsed.
        FileNotFoundError: If the file doesn't exist.
    """
    # Explicit existence check gives a clearer error message than
    # letting read_text() fail.
    if not path.exists():
        raise FileNotFoundError(f"Taxonomy file not found: {path}")

    # Decode with the encoding chosen at construction time, then hand
    # all structural parsing to parse().
    content = path.read_text(encoding=self.encoding)
    return self.parse(content)

parse

parse(content: str) -> TaxonomyData

Parse markdown content into TaxonomyData.

Parameters:

Name Type Description Default
content str

Raw markdown content.

required

Returns:

Type Description
TaxonomyData

Parsed TaxonomyData.

Raises:

Type Description
TaxonomyParseError

If the content cannot be parsed.

Source code in src/memoir/taxonomy/markdown_source.py
def parse(self, content: str) -> TaxonomyData:
    """Parse markdown content into TaxonomyData.

    Args:
        content: Raw markdown content.

    Returns:
        Parsed TaxonomyData.

    Raises:
        TaxonomyParseError: If the content cannot be parsed.
    """
    # The frontmatter's "type" field decides how the body is read.
    metadata, body = self._split_frontmatter(content)

    # Each branch populates a different payload field of TaxonomyData;
    # the raw body is kept alongside the parsed form.
    if metadata.type == "examples":
        examples = self._parse_examples_tables(body)
        return TaxonomyData(metadata=metadata, examples=examples, raw_content=body)
    elif metadata.type == "descriptions":
        descriptions = self._parse_descriptions_table(body)
        return TaxonomyData(
            metadata=metadata, descriptions=descriptions, raw_content=body
        )
    elif metadata.type == "preset":
        paths = self._parse_preset_lists(body)
        return TaxonomyData(metadata=metadata, paths=paths, raw_content=body)
    else:
        raise TaxonomyParseError(f"Unknown taxonomy type: {metadata.type}")

to_dict

to_dict(data: TaxonomyData) -> dict[str, Any]

Convert TaxonomyData to a dictionary for storage.

Parameters:

Name Type Description Default
data TaxonomyData

The taxonomy data to convert.

required

Returns:

Type Description
dict[str, Any]

Dictionary representation suitable for JSON serialization.

Source code in src/memoir/taxonomy/markdown_source.py
def to_dict(self, data: TaxonomyData) -> dict[str, Any]:
    """Convert TaxonomyData to a dictionary for storage.

    Args:
        data: The taxonomy data to convert.

    Returns:
        Dictionary representation suitable for JSON serialization.
    """
    # Always-present metadata fields; optional ones are appended below
    # only when set, keeping the stored document compact.
    result: dict[str, Any] = {
        "metadata": {
            "type": data.metadata.type,
            "id": data.metadata.id,
            "name": data.metadata.name,
            "domain": data.metadata.domain,
            "version": data.metadata.version,
            "author": data.metadata.author,
            "description": data.metadata.description,
            "tags": data.metadata.tags,
            "dependencies": data.metadata.dependencies,
        }
    }

    if data.metadata.created:
        result["metadata"]["created"] = data.metadata.created
    if data.metadata.updated:
        result["metadata"]["updated"] = data.metadata.updated
    if data.metadata.taxonomy_version:
        result["metadata"]["taxonomy_version"] = data.metadata.taxonomy_version

    # Example tuples become keyed dicts so from_dict() can rebuild them.
    if data.examples is not None:
        result["examples"] = [
            {"input": inp, "path": path, "reasoning": reason}
            for inp, path, reason in data.examples
        ]

    if data.descriptions is not None:
        result["descriptions"] = data.descriptions

    if data.paths is not None:
        result["paths"] = data.paths

    return result

from_dict

from_dict(data: dict[str, Any]) -> TaxonomyData

Convert a dictionary back to TaxonomyData.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary from storage.

required

Returns:

Type Description
TaxonomyData

TaxonomyData instance.

Source code in src/memoir/taxonomy/markdown_source.py
def from_dict(self, data: dict[str, Any]) -> TaxonomyData:
    """Convert a dictionary back to TaxonomyData.

    Args:
        data: Dictionary from storage.

    Returns:
        TaxonomyData instance.
    """
    # "type", "id" and "name" are indexed directly (KeyError if absent);
    # the remaining metadata fields fall back to their defaults.
    meta_dict = data["metadata"]
    metadata = TaxonomyMetadata(
        type=meta_dict["type"],
        id=meta_dict["id"],
        name=meta_dict["name"],
        domain=meta_dict.get("domain", "general"),
        version=meta_dict.get("version", "1.0.0"),
        created=meta_dict.get("created"),
        updated=meta_dict.get("updated"),
        author=meta_dict.get("author", "system"),
        description=meta_dict.get("description", ""),
        tags=meta_dict.get("tags", []),
        dependencies=meta_dict.get("dependencies", []),
        taxonomy_version=meta_dict.get("taxonomy_version"),
    )

    # Rebuild example tuples from their keyed-dict storage form.
    examples = None
    if "examples" in data:
        examples = [
            (e["input"], e["path"], e["reasoning"]) for e in data["examples"]
        ]

    descriptions = data.get("descriptions")
    paths = data.get("paths")

    # raw_content is not persisted; the rebuilt instance keeps the
    # dataclass default ("").
    return TaxonomyData(
        metadata=metadata,
        examples=examples,
        descriptions=descriptions,
        paths=paths,
    )

TaxonomyData dataclass

Parsed taxonomy data from markdown.

Source code in src/memoir/taxonomy/markdown_source.py
@dataclass
class TaxonomyData:
    """Parsed taxonomy data from markdown.

    Typically only the payload field matching ``metadata.type`` is
    populated (examples, descriptions, or paths); the others stay None.
    """

    metadata: TaxonomyMetadata  # frontmatter metadata for this taxonomy
    examples: list[tuple[str, str, str]] | None = None  # (input, path, reasoning)
    descriptions: dict[str, str] | None = None  # category -> description
    paths: dict[str, list[str]] | None = None  # category -> [subcategory.type, ...]
    raw_content: str = ""  # markdown body (frontmatter stripped); "" when rebuilt from storage

TaxonomyMetadata dataclass

Metadata from taxonomy markdown file frontmatter.

Source code in src/memoir/taxonomy/markdown_source.py
@dataclass
class TaxonomyMetadata:
    """Metadata from taxonomy markdown file frontmatter."""

    type: str  # examples | descriptions | preset
    id: str  # unique taxonomy identifier; used as the registry key
    name: str  # human-readable display name
    domain: str = "general"  # domain used for filtering/merging
    version: str = "1.0.0"  # version of this taxonomy file
    created: str | None = None  # creation timestamp from frontmatter, if any
    updated: str | None = None  # last-update timestamp from frontmatter, if any
    author: str = "system"  # author recorded in frontmatter
    description: str = ""  # free-text summary of this taxonomy
    tags: list[str] = field(default_factory=list)  # arbitrary labels
    dependencies: list[str] = field(default_factory=list)  # presumably IDs of required taxonomies -- confirm with consumers
    taxonomy_version: str | None = None  # For presets (e.g., "simplified")

TaxonomyParseError

Bases: Exception

Error parsing taxonomy markdown file.

Source code in src/memoir/taxonomy/markdown_source.py
class TaxonomyParseError(Exception):
    """Raised when a taxonomy markdown file cannot be parsed."""

TaxonomyEntry dataclass

Entry in the taxonomy registry.

Source code in src/memoir/taxonomy/registry.py
@dataclass
class TaxonomyEntry:
    """Entry in the taxonomy registry."""

    data: TaxonomyData  # parsed taxonomy payload
    source_path: Path | None = None  # file the data was loaded from, if any
    is_builtin: bool = True  # False for external/user-supplied files

TaxonomyRegistry

Central registry for managing taxonomy data from multiple sources.

Provides: - Loading from built-in markdown files - Loading from external/user-provided files - Domain-based filtering - Type-based lookup (examples, descriptions, presets) - Merging/combining taxonomy data

Source code in src/memoir/taxonomy/registry.py
class TaxonomyRegistry:
    """
    Central registry for managing taxonomy data from multiple sources.

    Provides:
    - Loading from built-in markdown files
    - Loading from external/user-provided files
    - Domain-based filtering
    - Type-based lookup (examples, descriptions, presets)
    - Merging/combining taxonomy data
    """

    def __init__(self):
        """Initialize an empty registry."""
        # Primary storage: taxonomy ID -> entry.
        self._entries: dict[str, TaxonomyEntry] = {}
        # Secondary indexes for type- and domain-based lookup.
        self._by_type: dict[str, list[str]] = {
            "examples": [],
            "descriptions": [],
            "preset": [],
        }
        self._by_domain: dict[str, list[str]] = {}
        self._parser = MarkdownTaxonomySource()
        self._builtin_path = Path(__file__).parent / "data"

    def load_builtin(self) -> list[str]:
        """Load all built-in taxonomy markdown files.

        Returns:
            List of loaded taxonomy IDs.
        """
        loaded_ids = []

        if not self._builtin_path.exists():
            logger.warning(f"Built-in taxonomy path not found: {self._builtin_path}")
            return loaded_ids

        # sorted() makes load (and therefore ID-collision override) order
        # deterministic; rglob order is filesystem-dependent otherwise.
        for md_file in sorted(self._builtin_path.rglob("*.md")):
            if md_file.name == "README.md":
                continue
            try:
                taxonomy_id = self._load_file(md_file, is_builtin=True)
                loaded_ids.append(taxonomy_id)
                logger.debug(f"Loaded builtin taxonomy: {taxonomy_id} from {md_file}")
            except (TaxonomyParseError, FileNotFoundError) as e:
                # One bad file should not abort loading the rest.
                logger.error(f"Failed to load {md_file}: {e}")

        return loaded_ids

    def load_external(self, path: Path | str) -> str:
        """Load an external taxonomy file.

        Args:
            path: Path to markdown file.

        Returns:
            ID of the loaded taxonomy.

        Raises:
            TaxonomyParseError: If the file cannot be parsed.
            FileNotFoundError: If the file doesn't exist.
        """
        path = Path(path)
        return self._load_file(path, is_builtin=False)

    def _load_file(self, path: Path, is_builtin: bool) -> str:
        """Load a single taxonomy file.

        Args:
            path: Path to the markdown file.
            is_builtin: Whether this is a builtin file.

        Returns:
            ID of the loaded taxonomy.
        """
        data = self._parser.load(path)

        entry = TaxonomyEntry(data=data, source_path=path, is_builtin=is_builtin)

        # An existing entry with the same ID is silently replaced.
        taxonomy_id = data.metadata.id
        self._entries[taxonomy_id] = entry

        # Update type index
        taxonomy_type = data.metadata.type
        if taxonomy_type not in self._by_type:
            self._by_type[taxonomy_type] = []
        if taxonomy_id not in self._by_type[taxonomy_type]:
            self._by_type[taxonomy_type].append(taxonomy_id)

        # Update domain index
        domain = data.metadata.domain
        if domain not in self._by_domain:
            self._by_domain[domain] = []
        if taxonomy_id not in self._by_domain[domain]:
            self._by_domain[domain].append(taxonomy_id)

        return taxonomy_id

    def get(self, taxonomy_id: str) -> TaxonomyData | None:
        """Get taxonomy data by ID.

        Args:
            taxonomy_id: The taxonomy ID to look up.

        Returns:
            TaxonomyData if found, None otherwise.
        """
        entry = self._entries.get(taxonomy_id)
        return entry.data if entry else None

    def get_entry(self, taxonomy_id: str) -> TaxonomyEntry | None:
        """Get full taxonomy entry by ID.

        Args:
            taxonomy_id: The taxonomy ID to look up.

        Returns:
            TaxonomyEntry if found, None otherwise.
        """
        return self._entries.get(taxonomy_id)

    def get_by_type(
        self, taxonomy_type: str, domain: str | None = None
    ) -> list[TaxonomyData]:
        """Get all taxonomy data of a specific type.

        Args:
            taxonomy_type: Type to filter by (examples, descriptions, preset).
            domain: Optional domain to filter by.

        Returns:
            List of matching TaxonomyData.
        """
        ids = self._by_type.get(taxonomy_type, [])
        if domain:
            domain_ids = set(self._by_domain.get(domain, []))
            ids = [tid for tid in ids if tid in domain_ids]
        return [self._entries[tid].data for tid in ids if tid in self._entries]

    def get_combined_examples(
        self, domain: str | None = None
    ) -> list[tuple[str, str, str]]:
        """Get all examples combined, optionally filtered by domain.

        General examples always come first; domain-specific examples are
        appended after them when a non-general domain is requested.

        Args:
            domain: Optional domain to filter by. If None, uses "general".

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        examples: list[tuple[str, str, str]] = []

        # General examples are always included.
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)

        # Domain-specific examples follow the general ones.
        if domain is not None and domain != "general":
            for data in self.get_by_type("examples", domain):
                if data.examples:
                    examples.extend(data.examples)

        return examples

    def get_combined_descriptions(self, domain: str | None = None) -> dict[str, str]:
        """Get all descriptions merged, domain-specific overriding general.

        Args:
            domain: Optional domain to filter by. If None, uses "general".

        Returns:
            Dict mapping category to description.
        """
        descriptions: dict[str, str] = {}

        # Load general first
        for data in self.get_by_type("descriptions", "general"):
            if data.descriptions:
                descriptions.update(data.descriptions)

        # Then domain-specific (if different from general)
        if domain and domain != "general":
            for data in self.get_by_type("descriptions", domain):
                if data.descriptions:
                    descriptions.update(data.descriptions)

        return descriptions

    def get_combined_paths(
        self, preset_id: str | None = None, domain: str | None = None
    ) -> dict[str, list[str]]:
        """Get preset paths, optionally filtered by preset ID or domain.

        Args:
            preset_id: Specific preset ID to load.
            domain: Domain to filter by.

        Returns:
            Dict mapping category to list of paths.
        """
        if preset_id:
            data = self.get(preset_id)
            if data and data.paths:
                return data.paths
            return {}

        # Combine all presets for domain
        paths: dict[str, list[str]] = {}
        presets = self.get_by_type("preset", domain or "general")
        for data in presets:
            if data.paths:
                for category, category_paths in data.paths.items():
                    if category not in paths:
                        paths[category] = []
                    paths[category].extend(category_paths)

        return paths

    def list_ids(self) -> list[str]:
        """List all registered taxonomy IDs.

        Returns:
            List of taxonomy IDs.
        """
        return list(self._entries.keys())

    def list_domains(self) -> list[str]:
        """List all available domains.

        Returns:
            List of domain names.
        """
        return list(self._by_domain.keys())

    def list_by_type(self, taxonomy_type: str) -> list[str]:
        """List taxonomy IDs by type.

        Args:
            taxonomy_type: The type to list (examples, descriptions, preset).

        Returns:
            List of taxonomy IDs of that type.
        """
        return list(self._by_type.get(taxonomy_type, []))

    def remove(self, taxonomy_id: str) -> bool:
        """Remove a taxonomy entry from the registry.

        Args:
            taxonomy_id: The taxonomy ID to remove.

        Returns:
            True if removed, False if not found.
        """
        if taxonomy_id not in self._entries:
            return False

        entry = self._entries[taxonomy_id]
        taxonomy_type = entry.data.metadata.type
        domain = entry.data.metadata.domain

        # Remove from type index
        if taxonomy_type in self._by_type:
            self._by_type[taxonomy_type] = [
                tid for tid in self._by_type[taxonomy_type] if tid != taxonomy_id
            ]

        # Remove from domain index
        if domain in self._by_domain:
            self._by_domain[domain] = [
                tid for tid in self._by_domain[domain] if tid != taxonomy_id
            ]

        # Remove entry
        del self._entries[taxonomy_id]
        return True

    def clear(self) -> None:
        """Clear all entries from the registry."""
        self._entries.clear()
        self._by_type = {"examples": [], "descriptions": [], "preset": []}
        self._by_domain = {}

    def __len__(self) -> int:
        """Return the number of entries in the registry."""
        return len(self._entries)

    def __contains__(self, taxonomy_id: str) -> bool:
        """Check if a taxonomy ID is in the registry."""
        return taxonomy_id in self._entries

__init__

__init__()

Initialize an empty registry.

Source code in src/memoir/taxonomy/registry.py
def __init__(self):
    """Initialize an empty registry."""
    # Primary store: taxonomy_id -> TaxonomyEntry.
    self._entries: dict[str, TaxonomyEntry] = {}
    # Secondary indexes mapping taxonomy type / domain to lists of IDs.
    self._by_type: dict[str, list[str]] = {
        key: [] for key in ("examples", "descriptions", "preset")
    }
    self._by_domain: dict[str, list[str]] = {}
    # Markdown parser and the package-relative directory of builtin data.
    self._parser = MarkdownTaxonomySource()
    self._builtin_path = Path(__file__).parent / "data"

load_builtin

load_builtin() -> list[str]

Load all built-in taxonomy markdown files.

Returns:

Type Description
list[str]

List of loaded taxonomy IDs.

Source code in src/memoir/taxonomy/registry.py
def load_builtin(self) -> list[str]:
    """Load all built-in taxonomy markdown files.

    Returns:
        List of loaded taxonomy IDs.
    """
    loaded: list[str] = []

    if not self._builtin_path.exists():
        logger.warning(f"Built-in taxonomy path not found: {self._builtin_path}")
        return loaded

    # Walk the data directory recursively; READMEs are docs, not taxonomy data.
    for md_file in self._builtin_path.rglob("*.md"):
        if md_file.name == "README.md":
            continue
        try:
            taxonomy_id = self._load_file(md_file, is_builtin=True)
        except (TaxonomyParseError, FileNotFoundError) as e:
            # A single bad file is logged and skipped, not fatal.
            logger.error(f"Failed to load {md_file}: {e}")
        else:
            loaded.append(taxonomy_id)
            logger.debug(f"Loaded builtin taxonomy: {taxonomy_id} from {md_file}")

    return loaded

load_external

load_external(path: Path | str) -> str

Load an external taxonomy file.

Parameters:

Name Type Description Default
path Path | str

Path to markdown file.

required

Returns:

Type Description
str

ID of the loaded taxonomy.

Raises:

Type Description
TaxonomyParseError

If the file cannot be parsed.

FileNotFoundError

If the file doesn't exist.

Source code in src/memoir/taxonomy/registry.py
def load_external(self, path: Path | str) -> str:
    """Load an external taxonomy file.

    Args:
        path: Path to markdown file.

    Returns:
        ID of the loaded taxonomy.

    Raises:
        TaxonomyParseError: If the file cannot be parsed.
        FileNotFoundError: If the file doesn't exist.
    """
    path = Path(path)
    return self._load_file(path, is_builtin=False)

get

get(taxonomy_id: str) -> TaxonomyData | None

Get taxonomy data by ID.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to look up.

required

Returns:

Type Description
TaxonomyData | None

TaxonomyData if found, None otherwise.

Source code in src/memoir/taxonomy/registry.py
def get(self, taxonomy_id: str) -> TaxonomyData | None:
    """Get taxonomy data by ID.

    Args:
        taxonomy_id: The taxonomy ID to look up.

    Returns:
        TaxonomyData if found, None otherwise.
    """
    # Unwrap the entry's data; an unknown ID yields None rather than raising.
    if taxonomy_id in self._entries:
        return self._entries[taxonomy_id].data
    return None

get_entry

get_entry(taxonomy_id: str) -> TaxonomyEntry | None

Get full taxonomy entry by ID.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to look up.

required

Returns:

Type Description
TaxonomyEntry | None

TaxonomyEntry if found, None otherwise.

Source code in src/memoir/taxonomy/registry.py
def get_entry(self, taxonomy_id: str) -> TaxonomyEntry | None:
    """Get full taxonomy entry by ID.

    Args:
        taxonomy_id: The taxonomy ID to look up.

    Returns:
        TaxonomyEntry if found, None otherwise.
    """
    # Plain dict lookup; missing IDs yield None rather than raising.
    return self._entries.get(taxonomy_id, None)

get_by_type

get_by_type(taxonomy_type: str, domain: str | None = None) -> list[TaxonomyData]

Get all taxonomy data of a specific type.

Parameters:

Name Type Description Default
taxonomy_type str

Type to filter by (examples, descriptions, preset).

required
domain str | None

Optional domain to filter by.

None

Returns:

Type Description
list[TaxonomyData]

List of matching TaxonomyData.

Source code in src/memoir/taxonomy/registry.py
def get_by_type(
    self, taxonomy_type: str, domain: str | None = None
) -> list[TaxonomyData]:
    """Get all taxonomy data of a specific type.

    Args:
        taxonomy_type: Type to filter by (examples, descriptions, preset).
        domain: Optional domain to filter by.

    Returns:
        List of matching TaxonomyData.
    """
    candidate_ids = self._by_type.get(taxonomy_type, [])
    if domain:
        # Intersect with the domain index while preserving type-index order.
        in_domain = set(self._by_domain.get(domain, []))
        candidate_ids = [tid for tid in candidate_ids if tid in in_domain]
    entries = self._entries
    # Guard against stale index entries whose primary record is gone.
    return [entries[tid].data for tid in candidate_ids if tid in entries]

get_combined_examples

get_combined_examples(domain: str | None = None) -> list[tuple[str, str, str]]

Get all examples combined, optionally filtered by domain.

Parameters:

Name Type Description Default
domain str | None

Optional domain to filter by. If None, uses "general".

None

Returns:

Type Description
list[tuple[str, str, str]]

List of (input_text, path, reasoning) tuples.

Source code in src/memoir/taxonomy/registry.py
def get_combined_examples(
    self, domain: str | None = None
) -> list[tuple[str, str, str]]:
    """Get all examples combined, optionally filtered by domain.

    Args:
        domain: Optional domain to filter by. If None, uses "general".

    Returns:
        List of (input_text, path, reasoning) tuples.
    """
    examples: list[tuple[str, str, str]] = []

    # Load general first if no specific domain or if domain is different
    if domain is None or domain == "general":
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)
    elif domain != "general":
        # Load general first, then domain-specific
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)
        for data in self.get_by_type("examples", domain):
            if data.examples:
                examples.extend(data.examples)

    return examples

get_combined_descriptions

get_combined_descriptions(domain: str | None = None) -> dict[str, str]

Get all descriptions merged, domain-specific overriding general.

Parameters:

Name Type Description Default
domain str | None

Optional domain to filter by. If None, uses "general".

None

Returns:

Type Description
dict[str, str]

Dict mapping category to description.

Source code in src/memoir/taxonomy/registry.py
def get_combined_descriptions(self, domain: str | None = None) -> dict[str, str]:
    """Get all descriptions merged, domain-specific overriding general.

    Args:
        domain: Optional domain to filter by. If None, uses "general".

    Returns:
        Dict mapping category to description.
    """
    descriptions: dict[str, str] = {}

    # Load general first
    for data in self.get_by_type("descriptions", "general"):
        if data.descriptions:
            descriptions.update(data.descriptions)

    # Then domain-specific (if different from general)
    if domain and domain != "general":
        for data in self.get_by_type("descriptions", domain):
            if data.descriptions:
                descriptions.update(data.descriptions)

    return descriptions

get_combined_paths

get_combined_paths(preset_id: str | None = None, domain: str | None = None) -> dict[str, list[str]]

Get preset paths, optionally filtered by preset ID or domain.

Parameters:

Name Type Description Default
preset_id str | None

Specific preset ID to load.

None
domain str | None

Domain to filter by.

None

Returns:

Type Description
dict[str, list[str]]

Dict mapping category to list of paths.

Source code in src/memoir/taxonomy/registry.py
def get_combined_paths(
    self, preset_id: str | None = None, domain: str | None = None
) -> dict[str, list[str]]:
    """Get preset paths, optionally filtered by preset ID or domain.

    Args:
        preset_id: Specific preset ID to load.
        domain: Domain to filter by.

    Returns:
        Dict mapping category to list of paths.
    """
    if preset_id:
        data = self.get(preset_id)
        if data and data.paths:
            return data.paths
        return {}

    # Combine all presets for domain
    paths: dict[str, list[str]] = {}
    presets = self.get_by_type("preset", domain or "general")
    for data in presets:
        if data.paths:
            for category, category_paths in data.paths.items():
                if category not in paths:
                    paths[category] = []
                paths[category].extend(category_paths)

    return paths

list_ids

list_ids() -> list[str]

List all registered taxonomy IDs.

Returns:

Type Description
list[str]

List of taxonomy IDs.

Source code in src/memoir/taxonomy/registry.py
def list_ids(self) -> list[str]:
    """List all registered taxonomy IDs.

    Returns:
        List of taxonomy IDs, in registration order.
    """
    return [taxonomy_id for taxonomy_id in self._entries]

list_domains

list_domains() -> list[str]

List all available domains.

Returns:

Type Description
list[str]

List of domain names.

Source code in src/memoir/taxonomy/registry.py
def list_domains(self) -> list[str]:
    """List all available domains.

    Returns:
        List of domain names.
    """
    return [domain for domain in self._by_domain]

list_by_type

list_by_type(taxonomy_type: str) -> list[str]

List taxonomy IDs by type.

Parameters:

Name Type Description Default
taxonomy_type str

The type to list (examples, descriptions, preset).

required

Returns:

Type Description
list[str]

List of taxonomy IDs of that type.

Source code in src/memoir/taxonomy/registry.py
def list_by_type(self, taxonomy_type: str) -> list[str]:
    """List taxonomy IDs by type.

    Args:
        taxonomy_type: The type to list (examples, descriptions, preset).

    Returns:
        List of taxonomy IDs of that type.
    """
    # Build a fresh list so callers cannot mutate the internal index.
    return [tid for tid in self._by_type.get(taxonomy_type, ())]

remove

remove(taxonomy_id: str) -> bool

Remove a taxonomy entry from the registry.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to remove.

required

Returns:

Type Description
bool

True if removed, False if not found.

Source code in src/memoir/taxonomy/registry.py
def remove(self, taxonomy_id: str) -> bool:
    """Remove a taxonomy entry from the registry.

    Args:
        taxonomy_id: The taxonomy ID to remove.

    Returns:
        True if removed, False if not found.
    """
    # Pop the primary record; a miss means there is nothing to clean up.
    entry = self._entries.pop(taxonomy_id, None)
    if entry is None:
        return False

    meta = entry.data.metadata
    # Purge the ID from both secondary indexes (type and domain).
    for index, key in ((self._by_type, meta.type), (self._by_domain, meta.domain)):
        if key in index:
            index[key] = [tid for tid in index[key] if tid != taxonomy_id]

    return True

clear

clear() -> None

Clear all entries from the registry.

Source code in src/memoir/taxonomy/registry.py
def clear(self) -> None:
    """Reset the registry to its empty initial state."""
    self._entries.clear()
    self._by_domain = {}
    # Rebuild the fixed type buckets instead of emptying them in place.
    self._by_type = {key: [] for key in ("examples", "descriptions", "preset")}

__len__

__len__() -> int

Return the number of entries in the registry.

Source code in src/memoir/taxonomy/registry.py
def __len__(self) -> int:
    """Return the number of entries in the registry."""
    return len(self._entries)

__contains__

__contains__(taxonomy_id: str) -> bool

Check if a taxonomy ID is in the registry.

Source code in src/memoir/taxonomy/registry.py
def __contains__(self, taxonomy_id: str) -> bool:
    """Check if a taxonomy ID is in the registry."""
    return taxonomy_id in self._entries

SemanticTaxonomy

Bases: BaseTaxonomy

Fixed semantic taxonomy with predefined paths. Provides hierarchical organization for AI memory classification. Implements TaxonomyInterface for standardized access.

Source code in src/memoir/taxonomy/semantic.py
class SemanticTaxonomy(BaseTaxonomy):
    """
    Fixed semantic taxonomy with predefined paths.
    Provides hierarchical organization for AI memory classification.
    Implements TaxonomyInterface for standardized access.
    """

    def __init__(self, taxonomy_loader: Any | None = None):
        """
        Initialize semantic taxonomy with flexible data loading.

        Args:
            taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                            If None, uses TaxonomyPresets as fallback.
        """
        self._taxonomy_loader = taxonomy_loader
        self._all_paths = self._load_all_paths()
        self._path_index = self._build_path_index()

    @staticmethod
    def _expand_preset_paths(preset_paths: dict[str, list[str]]) -> set[str]:
        """Expand {category: [subpaths]} into a set including all ancestor prefixes."""
        paths: set[str] = set()
        for category, category_paths in preset_paths.items():
            # The category itself is a valid path.
            paths.add(category)
            for path in category_paths:
                full_path = f"{category}.{path}"
                paths.add(full_path)
                # Register every intermediate prefix as a valid path too.
                parts = full_path.split(".")
                for i in range(1, len(parts)):
                    paths.add(".".join(parts[:i]))
        return paths

    def _load_all_paths(self) -> set[str]:
        """
        Load all paths from TaxonomyLoader or fallback to TaxonomyPresets.

        Returns:
            Set of all valid taxonomy paths.
        """
        # Prefer the store-backed taxonomy when a loader is available; any
        # failure (or an empty result) falls through to the builtin presets.
        if self._taxonomy_loader:
            try:
                preset_paths = self._taxonomy_loader.get_preset_paths_from_store()
                if preset_paths:
                    paths = self._expand_preset_paths(preset_paths)
                    logger.debug(
                        f"[SemanticTaxonomy] Loaded {len(paths)} paths from store"
                    )
                    return paths
            except Exception as e:
                logger.warning(
                    f"[SemanticTaxonomy] Failed to load from store, using fallback: {e}"
                )

        # Fallback to TaxonomyPresets (minimal builtin data).
        from .taxonomy import TaxonomyPresets, TaxonomyVersion

        preset_paths = TaxonomyPresets.PRESETS[TaxonomyVersion.SIMPLIFIED]
        paths = self._expand_preset_paths(preset_paths)

        logger.debug(
            f"[SemanticTaxonomy] Loaded {len(paths)} paths from TaxonomyPresets"
        )
        return paths

    def _build_path_index(self) -> dict[str, list[str]]:
        """Build an index mapping each path prefix to all paths beneath it."""
        index: dict[str, list[str]] = {}
        for path in self._all_paths:
            parts = path.split(".")
            for i in range(len(parts)):
                prefix = ".".join(parts[: i + 1])
                index.setdefault(prefix, [])
                # A path is a descendant of every proper prefix of itself.
                if path != prefix:
                    index[prefix].append(path)
        return index

    def get_all_paths(self) -> list[str]:
        """Return all valid taxonomy paths, sorted."""
        return sorted(self._all_paths)

    def get_children(self, path: str) -> list[str]:
        """Get immediate children of a path (exactly one segment deeper)."""
        if path not in self._path_index:
            return []

        children = []
        path_depth = len(path.split("."))
        for child in self._path_index[path]:
            if len(child.split(".")) == path_depth + 1:
                children.append(child)
        return sorted(children)

    def get_descendants(self, path: str) -> list[str]:
        """Get all descendants of a path, sorted."""
        if path not in self._path_index:
            return []
        return sorted(self._path_index[path])

    def is_valid_path(self, path: str) -> bool:
        """Check if a path exists in the taxonomy."""
        return path in self._all_paths

    def get_path_depth(self, path: str) -> int:
        """Get the depth of a path (number of dotted segments)."""
        return len(path.split("."))

    def get_category(self, path: str) -> TaxonomyCategory | None:
        """Get the top-level category for a path.

        Returns:
            The matching TaxonomyCategory, or None for an empty path or an
            unrecognized root segment.
        """
        if not path:
            return None
        root = path.split(".")[0]
        try:
            return TaxonomyCategory(root)
        except ValueError:
            # Root segment is not one of the known categories.
            return None

    def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
        """Get paths related to the given path within a certain distance."""
        if not self.is_valid_path(path):
            return []

        related = set()
        parts = path.split(".")

        # Siblings: all children of the immediate parent.
        if len(parts) > 1:
            parent = ".".join(parts[:-1])
            related.update(self.get_children(parent))

        # Ancestors up to max_distance steps up the hierarchy.
        for i in range(1, min(max_distance + 1, len(parts))):
            ancestor = ".".join(parts[:-i])
            related.add(ancestor)

        # Descendants up to max_distance steps down the hierarchy.
        if max_distance > 0:
            descendants = self.get_descendants(path)
            for desc in descendants:
                if (
                    self.get_path_depth(desc) - self.get_path_depth(path)
                    <= max_distance
                ):
                    related.add(desc)

        related.discard(path)  # Remove the path itself
        return sorted(related)

    def get_statistics(self) -> dict:
        """Get statistics about the taxonomy.

        Returns:
            Dict with total path count, category count, max depth, and
            per-category / per-depth path counts.
        """
        category_counts: dict[str, int] = {}
        depth_counts: dict[int, int] = {}

        for path in self._all_paths:
            category = self.get_category(path)
            if category:
                cat_name = category.value
                category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

            depth = self.get_path_depth(path)
            depth_counts[depth] = depth_counts.get(depth, 0) + 1

        return {
            "total_paths": len(self._all_paths),
            "categories": len(list(TaxonomyCategory)),
            # default=0 guards an empty taxonomy; bare max() would raise.
            "max_depth": max(depth_counts, default=0),
            "paths_by_category": category_counts,
            "paths_by_depth": depth_counts,
        }

__init__

__init__(taxonomy_loader: Any | None = None)

Initialize semantic taxonomy with flexible data loading.

Parameters:

Name Type Description Default
taxonomy_loader Any | None

Optional TaxonomyLoader for loading taxonomy from store. If None, uses TaxonomyPresets as fallback.

None
Source code in src/memoir/taxonomy/semantic.py
def __init__(self, taxonomy_loader: Any | None = None):
    """
    Initialize semantic taxonomy with flexible data loading.

    Args:
        taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                        If None, uses TaxonomyPresets as fallback.
    """
    # Loader must be stored first: _load_all_paths reads it, and
    # _build_path_index reads the _all_paths set it produces.
    self._taxonomy_loader = taxonomy_loader
    self._all_paths = self._load_all_paths()
    self._path_index = self._build_path_index()

get_all_paths

get_all_paths() -> list[str]

Return all valid taxonomy paths.

Source code in src/memoir/taxonomy/semantic.py
def get_all_paths(self) -> list[str]:
    """Return every valid taxonomy path in sorted order."""
    return sorted(path for path in self._all_paths)

get_children

get_children(path: str) -> list[str]

Get immediate children of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_children(self, path: str) -> list[str]:
    """Get immediate children of a path."""
    descendants = self._path_index.get(path)
    if descendants is None:
        return []

    # A direct child is exactly one dotted segment deeper than its parent.
    child_depth = len(path.split(".")) + 1
    return sorted(d for d in descendants if len(d.split(".")) == child_depth)

get_descendants

get_descendants(path: str) -> list[str]

Get all descendants of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_descendants(self, path: str) -> list[str]:
    """Get all descendants of a path, sorted."""
    try:
        return sorted(self._path_index[path])
    except KeyError:
        # Unknown prefix: nothing below it.
        return []

is_valid_path

is_valid_path(path: str) -> bool

Check if a path exists in the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def is_valid_path(self, path: str) -> bool:
    """Check if a path exists in the taxonomy."""
    known_paths = self._all_paths
    return path in known_paths

get_path_depth

get_path_depth(path: str) -> int

Get the depth of a path in the hierarchy.

Source code in src/memoir/taxonomy/semantic.py
def get_path_depth(self, path: str) -> int:
    """Get the depth of a path: the number of dotted segments."""
    # Equivalent to len(path.split(".")) without building a list.
    return path.count(".") + 1

get_category

get_category(path: str) -> TaxonomyCategory

Get the top-level category for a path.

Source code in src/memoir/taxonomy/semantic.py
def get_category(self, path: str) -> TaxonomyCategory | None:
    """Get the top-level category for a path.

    Returns:
        The matching TaxonomyCategory, or None for an empty path or an
        unrecognized root segment.
    """
    if not path:
        return None
    root = path.split(".")[0]
    try:
        return TaxonomyCategory(root)
    except ValueError:
        # Root segment is not one of the known category values.
        return None
get_related_paths

get_related_paths(path: str, max_distance: int = 2) -> list[str]

Get paths related to the given path within a certain distance.

Source code in src/memoir/taxonomy/semantic.py
def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
    """Get paths related to the given path within a certain distance."""
    if not self.is_valid_path(path):
        return []

    parts = path.split(".")
    related: set[str] = set()

    # Siblings: all children of the immediate parent.
    if len(parts) > 1:
        related.update(self.get_children(".".join(parts[:-1])))

    # Ancestors within max_distance steps up the hierarchy.
    for distance in range(1, min(max_distance + 1, len(parts))):
        related.add(".".join(parts[:-distance]))

    # Descendants within max_distance steps down the hierarchy.
    if max_distance > 0:
        base_depth = self.get_path_depth(path)
        for descendant in self.get_descendants(path):
            if self.get_path_depth(descendant) - base_depth <= max_distance:
                related.add(descendant)

    # The path is never considered related to itself.
    related.discard(path)
    return sorted(related)

get_statistics

get_statistics() -> dict

Get statistics about the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def get_statistics(self) -> dict:
    """Get statistics about the taxonomy.

    Returns:
        Dict with total path count, category count, max depth, and
        per-category / per-depth path counts.
    """
    category_counts: dict[str, int] = {}
    depth_counts: dict[int, int] = {}

    for path in self._all_paths:
        category = self.get_category(path)
        if category:
            cat_name = category.value
            category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

        depth = self.get_path_depth(path)
        depth_counts[depth] = depth_counts.get(depth, 0) + 1

    return {
        "total_paths": len(self._all_paths),
        "categories": len(list(TaxonomyCategory)),
        # default=0 guards an empty taxonomy; bare max() would raise ValueError.
        "max_depth": max(depth_counts, default=0),
        "paths_by_category": category_counts,
        "paths_by_depth": depth_counts,
    }

TaxonomyCategory

Bases: Enum

Top-level taxonomy categories.

Source code in src/memoir/taxonomy/semantic.py
class TaxonomyCategory(Enum):
    """Top-level taxonomy categories.

    Each value is the root segment of a dotted taxonomy path
    (e.g. "profile.personal.identity" -> PROFILE).
    """

    PROFILE = "profile"  # personal facts: identity, occupation, skills
    PREFERENCES = "preferences"  # likes/dislikes: tools, languages, work style
    EXPERIENCE = "experience"  # past events: work history, projects
    CONTEXT = "context"  # project/team info: stack, roles, meetings
    KNOWLEDGE = "knowledge"  # learned facts: technical and domain knowledge
    RELATIONSHIPS = "relationships"  # people: colleagues, manager, mentees
    GOALS = "goals"  # aspirations: career, learning
    BEHAVIOR = "behavior"  # patterns: habits, routines, practices

TaxonomyPresets

Minimal fallback taxonomy data.

WARNING: This is fallback data only. Use TaxonomyLoader for full taxonomy. See module docstring for details.

Source code in src/memoir/taxonomy/taxonomy.py
class TaxonomyPresets:
    """
    Minimal fallback taxonomy data.

    WARNING: This is fallback data only. Use TaxonomyLoader for full taxonomy.
    See module docstring for details.
    """

    # ==========================================================================
    # FALLBACK CLASSIFICATION EXAMPLES (minimal set)
    # Full examples are in: src/memoir/taxonomy/data/general/examples.md
    # ==========================================================================
    CLASSIFICATION_EXAMPLES: ClassVar[list[tuple[str, str, str]]] = [
        # Profile
        ("My name is Sarah", "profile.personal.identity", "identity"),
        ("I work as a software engineer", "profile.professional.occupation", "job"),
        # Preferences
        ("I prefer VS Code", "preferences.tools.editors", "tool preference"),
        ("I like Python", "preferences.coding.languages", "language preference"),
        # Context
        ("We use PostgreSQL", "context.project.database", "project context"),
        ("Our team does standups daily", "context.team.meetings", "team context"),
        # Experience
        ("I worked at Google for 3 years", "experience.work.jobs", "work history"),
        ("I built a REST API last month", "experience.work.projects", "project"),
        # Goals
        ("I want to learn Rust", "goals.learning.skills", "learning goal"),
        ("I aim to become a tech lead", "goals.career.advancement", "career goal"),
        # Relationships
        ("My manager is John", "relationships.professional.manager", "work relation"),
        ("I mentor two junior devs", "relationships.professional.mentees", "mentoring"),
        # Knowledge
        (
            "Python uses indentation for blocks",
            "knowledge.technical.languages",
            "tech fact",
        ),
        ("REST APIs use HTTP methods", "knowledge.technical.architecture", "tech fact"),
        # Behavior
        ("I usually code in the morning", "behavior.work.schedule", "work pattern"),
        ("I review PRs before lunch", "behavior.work.practices", "work habit"),
    ]

    # ==========================================================================
    # FALLBACK CATEGORY DESCRIPTIONS (8 main categories)
    # Full descriptions are in: src/memoir/taxonomy/data/general/descriptions.md
    # ==========================================================================
    CATEGORY_DESCRIPTIONS: ClassVar[dict[str, str]] = {
        "profile": "Personal facts: identity, demographics, job, education, skills",
        "preferences": "Likes/dislikes: tools, languages, frameworks, work style",
        "context": "Project/team info: tech stack, infrastructure, team roles",
        "experience": "Past events: work history, projects, achievements",
        "goals": "Aspirations: career, learning, projects, personal growth",
        "relationships": "People: colleagues, manager, mentors, mentees",
        "knowledge": "Facts learned: technical concepts, domain knowledge",
        "behavior": "Patterns: work habits, routines, practices",
    }

    # ==========================================================================
    # FALLBACK PRESET PATHS (minimal set for each category)
    # Full paths are in: src/memoir/taxonomy/data/general/presets.md
    # ==========================================================================
    PRESETS: ClassVar[dict[TaxonomyVersion, dict[str, list[str]]]] = {
        TaxonomyVersion.SIMPLIFIED: {
            "profile": [
                "personal.identity",
                "personal.demographics",
                "personal.location",
                "professional.occupation",
                "professional.education",
                "professional.skills",
            ],
            "preferences": [
                "tools.editors",
                "tools.testing",
                "coding.languages",
                "coding.frameworks",
                "work.environment",
                "work.schedule",
            ],
            "context": [
                "project.stack",
                "project.repository",
                "project.database",
                "team.methodology",
                "team.meetings",
                "team.roles",
            ],
            "experience": [
                "work.jobs",
                "work.projects",
                "education.schools",
                "education.courses",
            ],
            "goals": [
                "career.advancement",
                "career.skills",
                "learning.skills",
                "learning.certifications",
            ],
            "relationships": [
                "professional.manager",
                "professional.colleagues",
                "professional.mentees",
                "personal.family",
            ],
            "knowledge": [
                "technical.languages",
                "technical.architecture",
                "domain.business",
                "domain.industry",
            ],
            "behavior": [
                "work.schedule",
                "work.practices",
                "coding.habits",
                "communication.style",
            ],
        }
    }

    # All accessors are classmethods: they only read class-level data.
    # (get_paths_for_category / get_all_paths were instance methods before,
    # which made them uncallable on the class and inconsistent with the rest;
    # instance callers keep working unchanged.)

    @classmethod
    def get_paths_for_category(
        cls, version: TaxonomyVersion, category: str
    ) -> list[str]:
        """Get all full paths for a specific category.

        Args:
            version: Taxonomy version whose preset to read.
            category: Top-level category name (e.g. "profile").

        Returns:
            Dotted "category.subpath" strings; empty for an unknown category.

        Raises:
            ValueError: If the version has no preset.
        """
        if version not in cls.PRESETS:
            raise ValueError(f"Unknown taxonomy version: {version}")

        category_paths = cls.PRESETS[version].get(category, [])
        return [f"{category}.{path}" for path in category_paths]

    @classmethod
    def get_all_paths(cls, version: TaxonomyVersion) -> list[str]:
        """Get all taxonomy paths for a version, sorted.

        Raises:
            ValueError: If the version has no preset.
        """
        if version not in cls.PRESETS:
            raise ValueError(f"Unknown taxonomy version: {version}")

        all_paths = []
        for category, paths in cls.PRESETS[version].items():
            for path in paths:
                full_path = f"{category}.{path}"
                all_paths.append(full_path)

        return sorted(all_paths)

    @classmethod
    def get_preset(cls, version: TaxonomyVersion) -> dict[str, list[str]]:
        """Get a taxonomy preset for a specific version.

        Unknown versions fall back to SIMPLIFIED. Note the copy is shallow:
        the inner path lists are still shared with PRESETS.
        """
        return cls.PRESETS.get(version, cls.PRESETS[TaxonomyVersion.SIMPLIFIED]).copy()

    @classmethod
    def get_first_level_categories(cls, version: TaxonomyVersion) -> list[str]:
        """Get only the first-level categories for a taxonomy version."""
        preset = cls.get_preset(version)
        return list(preset.keys())

    @classmethod
    def list_versions(cls) -> list[TaxonomyVersion]:
        """List all taxonomy versions that have a preset defined."""
        return list(cls.PRESETS.keys())

get_paths_for_category

get_paths_for_category(version: TaxonomyVersion, category: str) -> list[str]

Get all paths for a specific category.

Source code in src/memoir/taxonomy/taxonomy.py
def get_paths_for_category(
    self, version: TaxonomyVersion, category: str
) -> list[str]:
    """Get all paths for a specific category."""
    if version not in self.PRESETS:
        raise ValueError(f"Unknown taxonomy version: {version}")

    # Prefix each relative subpath with its category to form the full path.
    return [
        f"{category}.{path}"
        for path in self.PRESETS[version].get(category, [])
    ]

get_all_paths

get_all_paths(version: TaxonomyVersion) -> list[str]

Get all taxonomy paths for a version.

Source code in src/memoir/taxonomy/taxonomy.py
def get_all_paths(self, version: TaxonomyVersion) -> list[str]:
    """Get all taxonomy paths for a version."""
    if version not in self.PRESETS:
        raise ValueError(f"Unknown taxonomy version: {version}")

    # Flatten {category: [subpath, ...]} into sorted "category.subpath" strings.
    return sorted(
        f"{category}.{path}"
        for category, paths in self.PRESETS[version].items()
        for path in paths
    )

get_preset classmethod

get_preset(version: TaxonomyVersion) -> dict[str, list[str]]

Get a taxonomy preset for a specific version.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def get_preset(cls, version: TaxonomyVersion) -> dict[str, list[str]]:
    """Get a taxonomy preset for a specific version."""
    # Unknown versions fall back to SIMPLIFIED; the copy is shallow.
    preset = cls.PRESETS.get(version, cls.PRESETS[TaxonomyVersion.SIMPLIFIED])
    return dict(preset)

get_first_level_categories classmethod

get_first_level_categories(version: TaxonomyVersion) -> list[str]

Get only the first-level categories for a taxonomy version.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def get_first_level_categories(cls, version: TaxonomyVersion) -> list[str]:
    """Get only the first-level categories for a taxonomy version."""
    preset = cls.get_preset(version)
    return list(preset.keys())

list_versions classmethod

list_versions() -> list[TaxonomyVersion]

List all available taxonomy versions.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def list_versions(cls) -> list[TaxonomyVersion]:
    """List all available taxonomy versions."""
    return list(cls.PRESETS.keys())

TaxonomyVersion

Bases: Enum

Available taxonomy versions.

Source code in src/memoir/taxonomy/taxonomy.py
class TaxonomyVersion(Enum):
    """Available taxonomy versions."""

    GENERAL = "general"
    SIMPLIFIED = "simplified"

get_taxonomy

get_taxonomy() -> SemanticTaxonomy

Get the thread-safe singleton taxonomy instance.

Source code in src/memoir/taxonomy/semantic.py
def get_taxonomy() -> SemanticTaxonomy:
    """Get the thread-safe singleton taxonomy instance."""
    global _taxonomy_instance
    if _taxonomy_instance is None:
        with _taxonomy_lock:
            # Double-check locking pattern
            if _taxonomy_instance is None:
                _taxonomy_instance = SemanticTaxonomy()
    return _taxonomy_instance

Submodules

memoir.taxonomy.semantic module

memoir.taxonomy.semantic

Comprehensive semantic taxonomy for AI memory classification. Defines hierarchical paths for deterministic memory organization.

TaxonomyCategory

Bases: Enum

Top-level taxonomy categories.

Source code in src/memoir/taxonomy/semantic.py
class TaxonomyCategory(Enum):
    """Top-level taxonomy categories."""

    PROFILE = "profile"
    PREFERENCES = "preferences"
    EXPERIENCE = "experience"
    CONTEXT = "context"
    KNOWLEDGE = "knowledge"
    RELATIONSHIPS = "relationships"
    GOALS = "goals"
    BEHAVIOR = "behavior"

TaxonomyNode dataclass

Represents a node in the taxonomy tree.

Source code in src/memoir/taxonomy/semantic.py
@dataclass
class TaxonomyNode:
    """Represents a node in the taxonomy tree."""

    path: str
    category: TaxonomyCategory
    depth: int
    is_leaf: bool
    description: str
    examples: list[str]

SemanticTaxonomy

Bases: BaseTaxonomy

Fixed semantic taxonomy with predefined paths. Provides hierarchical organization for AI memory classification. Implements TaxonomyInterface for standardized access.

Source code in src/memoir/taxonomy/semantic.py
class SemanticTaxonomy(BaseTaxonomy):
    """
    Fixed semantic taxonomy with predefined paths.
    Provides hierarchical organization for AI memory classification.
    Implements TaxonomyInterface for standardized access.
    """

    def __init__(self, taxonomy_loader: Any | None = None):
        """
        Initialize semantic taxonomy with flexible data loading.

        Args:
            taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                            If None, uses TaxonomyPresets as fallback.
        """
        self._taxonomy_loader = taxonomy_loader
        self._all_paths = self._load_all_paths()
        self._path_index = self._build_path_index()

    def _load_all_paths(self) -> set[str]:
        """
        Load all paths from TaxonomyLoader or fallback to TaxonomyPresets.

        Returns:
            Set of all valid taxonomy paths.
        """
        paths = set()

        # Try to load from TaxonomyLoader (store-based)
        if self._taxonomy_loader:
            try:
                preset_paths = self._taxonomy_loader.get_preset_paths_from_store()
                if preset_paths:
                    for category, category_paths in preset_paths.items():
                        # Add the category itself
                        paths.add(category)
                        for path in category_paths:
                            full_path = f"{category}.{path}"
                            paths.add(full_path)
                            # Also add intermediate paths
                            parts = full_path.split(".")
                            for i in range(1, len(parts)):
                                paths.add(".".join(parts[:i]))
                    logger.debug(
                        f"[SemanticTaxonomy] Loaded {len(paths)} paths from store"
                    )
                    return paths
            except Exception as e:
                logger.warning(
                    f"[SemanticTaxonomy] Failed to load from store, using fallback: {e}"
                )

        # Fallback to TaxonomyPresets
        from .taxonomy import TaxonomyPresets, TaxonomyVersion

        preset_paths = TaxonomyPresets.PRESETS[TaxonomyVersion.SIMPLIFIED]
        for category, category_paths in preset_paths.items():
            # Add the category itself
            paths.add(category)
            for path in category_paths:
                full_path = f"{category}.{path}"
                paths.add(full_path)
                # Also add intermediate paths
                parts = full_path.split(".")
                for i in range(1, len(parts)):
                    paths.add(".".join(parts[:i]))

        logger.debug(
            f"[SemanticTaxonomy] Loaded {len(paths)} paths from TaxonomyPresets"
        )
        return paths

    def _build_path_index(self) -> dict[str, list[str]]:
        """Build an index for efficient path lookups."""
        index = {}
        for path in self._all_paths:
            parts = path.split(".")
            for i in range(len(parts)):
                prefix = ".".join(parts[: i + 1])
                if prefix not in index:
                    index[prefix] = []
                if path != prefix:
                    index[prefix].append(path)
        return index

    def get_all_paths(self) -> list[str]:
        """Return all valid taxonomy paths."""
        return sorted(self._all_paths)

    def get_children(self, path: str) -> list[str]:
        """Get immediate children of a path."""
        if path not in self._path_index:
            return []

        children = []
        path_depth = len(path.split("."))
        for child in self._path_index[path]:
            if len(child.split(".")) == path_depth + 1:
                children.append(child)
        return sorted(children)

    def get_descendants(self, path: str) -> list[str]:
        """Get all descendants of a path."""
        if path not in self._path_index:
            return []
        return sorted(self._path_index[path])

    def is_valid_path(self, path: str) -> bool:
        """Check if a path exists in the taxonomy."""
        return path in self._all_paths

    def get_path_depth(self, path: str) -> int:
        """Get the depth of a path in the hierarchy."""
        return len(path.split("."))

    def get_category(self, path: str) -> TaxonomyCategory:
        """Get the top-level category for a path."""
        if not path:
            return None
        root = path.split(".")[0]
        try:
            return TaxonomyCategory(root)
        except ValueError:
            return None

    def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
        """Get paths related to the given path within a certain distance."""
        if not self.is_valid_path(path):
            return []

        related = set()
        parts = path.split(".")

        # Get siblings
        if len(parts) > 1:
            parent = ".".join(parts[:-1])
            related.update(self.get_children(parent))

        # Get ancestors up to max_distance
        for i in range(1, min(max_distance + 1, len(parts))):
            ancestor = ".".join(parts[:-i])
            related.add(ancestor)

        # Get descendants up to max_distance
        if max_distance > 0:
            descendants = self.get_descendants(path)
            for desc in descendants:
                if (
                    self.get_path_depth(desc) - self.get_path_depth(path)
                    <= max_distance
                ):
                    related.add(desc)

        related.discard(path)  # Remove the path itself
        return sorted(related)

    def get_statistics(self) -> dict:
        """Get statistics about the taxonomy."""
        category_counts = {}
        depth_counts = {}

        for path in self._all_paths:
            category = self.get_category(path)
            if category:
                cat_name = category.value
                category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

            depth = self.get_path_depth(path)
            depth_counts[depth] = depth_counts.get(depth, 0) + 1

        return {
            "total_paths": len(self._all_paths),
            "categories": len(list(TaxonomyCategory)),
            "max_depth": max(depth_counts.keys()),
            "paths_by_category": category_counts,
            "paths_by_depth": depth_counts,
        }

__init__

__init__(taxonomy_loader: Any | None = None)

Initialize semantic taxonomy with flexible data loading.

Parameters:

Name Type Description Default
taxonomy_loader Any | None

Optional TaxonomyLoader for loading taxonomy from store. If None, uses TaxonomyPresets as fallback.

None
Source code in src/memoir/taxonomy/semantic.py
def __init__(self, taxonomy_loader: Any | None = None):
    """
    Initialize semantic taxonomy with flexible data loading.

    Args:
        taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                        If None, uses TaxonomyPresets as fallback.
    """
    self._taxonomy_loader = taxonomy_loader
    self._all_paths = self._load_all_paths()
    self._path_index = self._build_path_index()

get_all_paths

get_all_paths() -> list[str]

Return all valid taxonomy paths.

Source code in src/memoir/taxonomy/semantic.py
def get_all_paths(self) -> list[str]:
    """Return all valid taxonomy paths."""
    return sorted(self._all_paths)

get_children

get_children(path: str) -> list[str]

Get immediate children of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_children(self, path: str) -> list[str]:
    """Get immediate children of a path."""
    if path not in self._path_index:
        return []

    children = []
    path_depth = len(path.split("."))
    for child in self._path_index[path]:
        if len(child.split(".")) == path_depth + 1:
            children.append(child)
    return sorted(children)

get_descendants

get_descendants(path: str) -> list[str]

Get all descendants of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_descendants(self, path: str) -> list[str]:
    """Get all descendants of a path."""
    if path not in self._path_index:
        return []
    return sorted(self._path_index[path])

is_valid_path

is_valid_path(path: str) -> bool

Check if a path exists in the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def is_valid_path(self, path: str) -> bool:
    """Check if a path exists in the taxonomy."""
    return path in self._all_paths

get_path_depth

get_path_depth(path: str) -> int

Get the depth of a path in the hierarchy.

Source code in src/memoir/taxonomy/semantic.py
def get_path_depth(self, path: str) -> int:
    """Get the depth of a path in the hierarchy."""
    return len(path.split("."))

get_category

get_category(path: str) -> TaxonomyCategory

Get the top-level category for a path.

Source code in src/memoir/taxonomy/semantic.py
def get_category(self, path: str) -> TaxonomyCategory:
    """Get the top-level category for a path."""
    if not path:
        return None
    root = path.split(".")[0]
    try:
        return TaxonomyCategory(root)
    except ValueError:
        return None

get_related_paths

get_related_paths(path: str, max_distance: int = 2) -> list[str]

Get paths related to the given path within a certain distance.

Source code in src/memoir/taxonomy/semantic.py
def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
    """Get paths related to the given path within a certain distance."""
    if not self.is_valid_path(path):
        return []

    related = set()
    parts = path.split(".")

    # Get siblings
    if len(parts) > 1:
        parent = ".".join(parts[:-1])
        related.update(self.get_children(parent))

    # Get ancestors up to max_distance
    for i in range(1, min(max_distance + 1, len(parts))):
        ancestor = ".".join(parts[:-i])
        related.add(ancestor)

    # Get descendants up to max_distance
    if max_distance > 0:
        descendants = self.get_descendants(path)
        for desc in descendants:
            if (
                self.get_path_depth(desc) - self.get_path_depth(path)
                <= max_distance
            ):
                related.add(desc)

    related.discard(path)  # Remove the path itself
    return sorted(related)

get_statistics

get_statistics() -> dict

Get statistics about the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def get_statistics(self) -> dict:
    """Get statistics about the taxonomy."""
    category_counts = {}
    depth_counts = {}

    for path in self._all_paths:
        category = self.get_category(path)
        if category:
            cat_name = category.value
            category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

        depth = self.get_path_depth(path)
        depth_counts[depth] = depth_counts.get(depth, 0) + 1

    return {
        "total_paths": len(self._all_paths),
        "categories": len(list(TaxonomyCategory)),
        "max_depth": max(depth_counts.keys()),
        "paths_by_category": category_counts,
        "paths_by_depth": depth_counts,
    }

get_taxonomy

get_taxonomy() -> SemanticTaxonomy

Get the thread-safe singleton taxonomy instance.

Source code in src/memoir/taxonomy/semantic.py
def get_taxonomy() -> SemanticTaxonomy:
    """Get the thread-safe singleton taxonomy instance."""
    global _taxonomy_instance
    if _taxonomy_instance is None:
        with _taxonomy_lock:
            # Double-check locking pattern
            if _taxonomy_instance is None:
                _taxonomy_instance = SemanticTaxonomy()
    return _taxonomy_instance

memoir.taxonomy.iterative module

memoir.taxonomy.iterative

LLM-Driven Iterative Taxonomy Expansion System. Based on "Creating a Fine Grained Entity Type Taxonomy Using LLMs" paper. Implements iterative, focused subtree expansion with GPT-4.

DynamicNode dataclass

Represents a node in the dynamic taxonomy tree.

Source code in src/memoir/taxonomy/iterative.py
@dataclass
class DynamicNode:
    """Represents a node in the dynamic taxonomy tree."""

    path: str
    category: str | None
    depth: int
    is_leaf: bool
    is_dynamic: bool
    created_at: datetime
    children: dict[str, "DynamicNode"] = field(default_factory=dict)
    other_items: list[dict[str, Any]] = field(default_factory=list)
    item_count: int = field(default=0)

TaxonomyExpansionResult

Bases: BaseModel

Result of a taxonomy expansion operation.

Source code in src/memoir/taxonomy/iterative.py
class TaxonomyExpansionResult(BaseModel):
    """Result of a taxonomy expansion operation."""

    parent_path: str = Field(description="Path of the expanded parent node")
    new_paths: list[str] = Field(description="New taxonomy paths created")
    migrated_items: int = Field(description="Number of items migrated to new paths")
    confidence: float = Field(description="Confidence in the expansion quality")
    strategy: str = Field(description="Strategy used for expansion")
    reasoning: str = Field(description="Human-readable reasoning for expansion")
    timestamp: float = Field(description="When the expansion occurred")

LLMExpansionStrategy

Bases: Enum

LLM-based expansion strategies.

Source code in src/memoir/taxonomy/iterative.py
class LLMExpansionStrategy(Enum):
    """LLM-based expansion strategies."""

    FOCUSED_SUBTREE = "focused_subtree"  # Expand one subtree at a time
    BREADTH_FIRST = "breadth_first"  # Expand all nodes at same level
    DEPTH_FIRST = "depth_first"  # Expand deepest nodes first
    PATTERN_BASED = "pattern_based"  # Use pattern combinations

ExpansionContext dataclass

Context for LLM-driven expansion.

Source code in src/memoir/taxonomy/iterative.py
@dataclass
class ExpansionContext:
    """Context for LLM-driven expansion."""

    node_path: str
    parent_hierarchy: list[str]  # Full path from root
    sibling_categories: list[str]  # Existing siblings
    unclassified_items: list[dict[str, Any]]
    current_depth: int
    taxonomy_snapshot: dict[str, Any]  # Relevant taxonomy portion

TaxonomyCombination

Bases: BaseModel

Pattern-based taxonomy combination.

Source code in src/memoir/taxonomy/iterative.py
class TaxonomyCombination(BaseModel):
    """Pattern-based taxonomy combination."""

    pattern: str = Field(description="Combination pattern e.g. 'Location + Domain'")
    template: str = Field(description="Result template e.g. '{domain} in {location}'")
    examples: list[str] = Field(description="Example results")

LLMIterativeTaxonomy

Bases: BaseTaxonomy

LLM-driven iterative taxonomy that expands intelligently using GPT-4. Implements the methodology from the paper with focused subtree expansion.

Source code in src/memoir/taxonomy/iterative.py
(rendered line-number gutter for source lines 91–1730 omitted)
class LLMIterativeTaxonomy(BaseTaxonomy):
    """
    LLM-driven iterative taxonomy that expands intelligently using GPT-4.
    Implements the methodology from the paper with focused subtree expansion.
    """

    def __init__(
        self,
        taxonomy_version: TaxonomyVersion = TaxonomyVersion.GENERAL,
        base_taxonomy: SemanticTaxonomy | None = None,
        llm: Any | None = None,
        expansion_strategy: LLMExpansionStrategy = LLMExpansionStrategy.FOCUSED_SUBTREE,
        min_items_threshold: int = MIN_ITEMS_FOR_EXPANSION,
        enable_combinations: bool = True,
        max_categories_per_expansion: int = MAX_CATEGORIES_PER_EXPANSION,
        use_full_base_taxonomy: bool = False,
    ):
        """
        Initialize LLM-driven iterative taxonomy.

        Args:
            taxonomy_version: The taxonomy preset version to use (e.g., GENERAL, AGENT_CONVERSATION)
            base_taxonomy: Optional custom taxonomy structure (overrides taxonomy_version if provided)
            llm: Language model for expansion (GPT-4 recommended)
            expansion_strategy: Strategy for taxonomy expansion
            min_items_threshold: Minimum items before triggering expansion
            enable_combinations: Enable pattern-based combinations
            max_categories_per_expansion: Maximum categories to suggest per LLM expansion (default: 10)
            use_full_base_taxonomy: If True, imports full taxonomy hierarchy; if False, only first level
        """
        self.taxonomy_version = taxonomy_version
        self.base_taxonomy = base_taxonomy
        self.use_full_base_taxonomy = use_full_base_taxonomy
        self.llm = llm
        self.expansion_strategy = expansion_strategy
        self.min_items_threshold = min_items_threshold
        self.enable_combinations = enable_combinations
        self.max_categories_per_expansion = max_categories_per_expansion

        # Build initial structure
        self.root = self._build_initial_tree()
        self.path_index: dict[str, DynamicNode] = {}
        self._rebuild_index()

        # Track expansions and combinations
        self.expansion_history: list[TaxonomyExpansionResult] = []
        self.active_expansions: set[str] = set()  # Paths being expanded
        self.combinations: list[TaxonomyCombination] = []

        # Expansion queue for parallel processing.
        # Annotation fixed: the queue starts out unset (None) and is only
        # created lazily, so the type is Optional, not a bare asyncio.Queue.
        self.expansion_queue: asyncio.Queue | None = None
        self.expansion_workers: list[asyncio.Task] = []

    def _build_initial_tree(self) -> DynamicNode:
        """Construct the starting tree from a custom taxonomy or the preset."""
        root = DynamicNode(
            path="",
            category=None,
            depth=0,
            is_leaf=False,
            is_dynamic=False,
            created_at=datetime.now(),
        )

        if self.base_taxonomy and self.use_full_base_taxonomy:
            # Legacy mode: mirror every path of the custom taxonomy verbatim.
            for full_path in self.base_taxonomy.get_all_paths():
                self._add_path_to_tree(root, full_path, is_dynamic=False)
        else:
            # Seed only the preset's top level; deeper structure grows later
            # through LLM-driven expansion.
            top_level = TaxonomyPresets.get_first_level_categories(
                self.taxonomy_version
            )
            for top_category in top_level:
                created = self._add_path_to_tree(root, top_category, is_dynamic=False)
                # Keep it expandable even though it has no children yet.
                created.is_leaf = False

        # Attach 'other' buckets where expansion may later be triggered.
        self._add_strategic_other_categories(root)

        return root

    def _add_strategic_other_categories(self, node: DynamicNode, max_depth: int = 3):
        """Attach a dynamic 'other' bucket at expandable levels of the tree."""
        if node.depth >= max_depth:
            return

        # A node is expandable if it already has children or was explicitly
        # marked non-leaf (e.g. first-level preset categories).
        expandable = bool(node.children) or not node.is_leaf
        if expandable and "other" not in node.children:
            bucket_path = "other" if not node.path else f"{node.path}.other"
            node.children["other"] = DynamicNode(
                path=bucket_path,
                category=node.category,
                depth=node.depth + 1,
                is_leaf=False,
                is_dynamic=True,
                created_at=datetime.now(),
            )

        # Recurse into real children only; 'other' buckets stay flat.
        for child_name, child_node in node.children.items():
            if child_name == "other":
                continue
            self._add_strategic_other_categories(child_node, max_depth)

    async def expand_subtree_with_llm(
        self, node_path: str, focus_depth: int | None = None
    ) -> TaxonomyExpansionResult:
        """
        Expand a subtree using LLM-driven analysis.
        Implements the paper's focused subtree expansion approach.

        Args:
            node_path: Path to the node to expand
            focus_depth: Optional depth limit for expansion
                (not referenced anywhere in this body — reserved for future use)

        Returns:
            TaxonomyExpansionResult with expansion details
        """
        # Unknown path: return a zero-confidence no-op result instead of raising.
        if node_path not in self.path_index:
            return TaxonomyExpansionResult(
                parent_path=node_path,
                new_paths=[],
                migrated_items=0,
                confidence=0.0,
                strategy=self.expansion_strategy.value,
                reasoning="Node not found",
                timestamp=time.time(),
            )

        node = self.path_index[node_path]

        # Check if enough items for expansion
        if len(node.other_items) < self.min_items_threshold:
            return TaxonomyExpansionResult(
                parent_path=node_path,
                new_paths=[],
                migrated_items=0,
                confidence=0.0,
                strategy=self.expansion_strategy.value,
                reasoning=f"Insufficient items ({len(node.other_items)} < {self.min_items_threshold})",
                timestamp=time.time(),
            )

        # Mark as active expansion
        self.active_expansions.add(node_path)

        try:
            # Build expansion context
            context = self._build_expansion_context(node)

            # Generate categories using LLM
            new_categories = await self._generate_categories_with_llm(context)

            # Create new nodes; skip any that already exist in the index.
            new_paths = []
            for category in new_categories:
                new_path = f"{node_path}.{category}".lstrip(".")
                if new_path not in self.path_index:
                    self._add_path_to_tree(self.root, new_path, is_dynamic=True)
                    new_paths.append(new_path)

                    # Add 'other' subcategory if at appropriate depth
                    # so the fresh category itself remains expandable.
                    if node.depth < MAX_DEPTH - 2:
                        other_subpath = f"{new_path}.other"
                        self._add_path_to_tree(
                            self.root, other_subpath, is_dynamic=True
                        )

            # Rebuild index so the new paths are resolvable before migration.
            self._rebuild_index()

            # Reclassify and migrate items out of the 'other' bucket.
            migrated_count = await self._reclassify_items(node, new_paths)

            result = TaxonomyExpansionResult(
                parent_path=node_path,
                new_paths=new_paths,
                migrated_items=migrated_count,
                confidence=0.8,  # Default confidence for LLM expansion
                strategy=self.expansion_strategy.value,
                reasoning=f"LLM-driven expansion created {len(new_paths)} categories from {len(node.other_items)} items",
                timestamp=time.time(),
            )

            self.expansion_history.append(result)
            return result

        finally:
            # Always clear the in-progress marker, even if expansion failed.
            self.active_expansions.discard(node_path)

    def _build_expansion_context(self, node: DynamicNode) -> ExpansionContext:
        """Assemble the information the LLM needs to expand *node*."""
        parts = node.path.split(".") if node.path else []

        # Resolve the parent node; fall back to root for top-level nodes.
        parent_key = ".".join(parts[:-1]) if len(parts) > 1 else ""
        parent = self.path_index.get(parent_key, self.root)

        # Siblings exclude the dynamic 'other' bucket.
        sibling_names = [child for child in parent.children if child != "other"]

        # Snapshot the structure around the parent for prompt context.
        snapshot = self._get_taxonomy_snapshot(parent, depth=2)

        return ExpansionContext(
            node_path=node.path,
            parent_hierarchy=parts,
            sibling_categories=sibling_names,
            unclassified_items=node.other_items[:20],  # Sample for LLM
            current_depth=node.depth,
            taxonomy_snapshot=snapshot,
        )

    def _get_taxonomy_snapshot(
        self, node: DynamicNode, depth: int = 2
    ) -> dict[str, Any]:
        """Get a snapshot of taxonomy structure around a node."""
        # Stop recursing at the depth limit or at leaves.
        if depth <= 0 or not node.children:
            return {"path": node.path, "is_leaf": node.is_leaf}

        # Recurse into every child except the dynamic 'other' bucket.
        child_snapshots = {
            child_name: self._get_taxonomy_snapshot(child_node, depth - 1)
            for child_name, child_node in node.children.items()
            if child_name != "other"
        }
        return {"path": node.path, "children": child_snapshots}

    async def _generate_categories_with_llm(
        self, context: ExpansionContext
    ) -> list[str]:
        """
        Generate new categories using LLM based on context.
        Implements the paper's prompting strategy.

        Args:
            context: Expansion context (path, siblings, sampled items).

        Returns:
            Validated category names, capped at this instance's configured
            ``max_categories_per_expansion``.
        """
        if not self.llm:
            # Fallback to pattern analysis if no LLM
            return self._fallback_category_generation(context)

        # Build prompt following paper's approach
        prompt = self._build_expansion_prompt(context)

        try:
            # Call LLM (implementation depends on LLM interface)
            response = await self._call_llm(prompt)
            categories = self._parse_llm_response(response)

            # Cap using the per-instance setting. Previously this sliced with
            # the module constant MAX_CATEGORIES_PER_EXPANSION, silently
            # ignoring the max_categories_per_expansion constructor argument.
            return [
                category
                for category in categories[: self.max_categories_per_expansion]
                if self._validate_category(category, context)
            ]

        except Exception as e:
            logger.error(f"LLM expansion failed: {e}")
            return self._fallback_category_generation(context)

    def _build_expansion_prompt(self, context: ExpansionContext) -> str:
        """Compose the taxonomy-expansion prompt sent to the LLM."""
        lines: list[str] = [
            "You are expanding a hierarchical taxonomy. Based on the unclassified items below, "
            "suggest new categories that would logically fit into the existing structure.",
            "",
            f"Current path: {context.node_path or 'root'}",
            f"Depth level: {context.current_depth}",
        ]

        # Optional domain-specific guidance mined from the taxonomy itself.
        guidance = self._get_domain_guidance(context.node_path)
        if guidance:
            lines += ["", "Domain-specific guidance:", guidance]

        lines += ["", "Existing sibling categories:"]
        lines += [f"  - {name}" for name in context.sibling_categories[:10]]

        lines += ["", "Sample unclassified items:"]
        for entry in context.unclassified_items[:10]:
            raw = entry.get("content", "")
            # Truncate to 100 chars; coerce non-string content via str().
            preview = raw[:100] if isinstance(raw, str) else str(raw)[:100]
            lines.append(f"  - {preview}")

        lines += [
            "",
            f"Suggest up to {self.max_categories_per_expansion} new category names that would logically group these items.",
            "Categories should:",
            "1. Be semantically coherent with existing siblings AND maintain domain consistency",
            "2. Be at the appropriate level of specificity for this depth",
            "3. Not duplicate existing categories",
            "4. Follow the naming convention of siblings",
            "5. Ensure categories belong semantically to the domain/area context",
            "",
            "IMPORTANT: Only suggest categories that truly belong in this domain/area.",
            "If items seem to belong elsewhere, suggest 'needs_reclassification' instead.",
            "",
            "Return only the category names, one per line.",
        ]

        return "\n".join(lines)

    def _get_domain_guidance(self, node_path: str) -> str:
        """Get dynamic domain-specific guidance for expansion based on existing taxonomy structure.

        Args:
            node_path: Dot-separated path being expanded ("" yields no guidance).

        Returns:
            A guidance sentence block, or "" for the root path.
        """
        if not node_path:
            return ""

        # str.split(".") always returns at least one element, so the old
        # `len(path_parts) < 1` guard was unreachable and has been removed.
        path_parts = node_path.split(".")
        domain = path_parts[0]
        area = path_parts[1] if len(path_parts) > 1 else None

        # Dynamically analyze existing taxonomy structure for this domain
        domain_analysis = self._analyze_domain_patterns(domain, area)

        guidance_parts = [f"This expansion is in the {domain}"]
        if area:
            guidance_parts[0] += f".{area}"
        guidance_parts[0] += " domain."

        # Add context from existing sibling areas
        if domain_analysis["sibling_areas"]:
            guidance_parts.append(
                f"Related areas in {domain}: {', '.join(domain_analysis['sibling_areas'][:5])}."
            )

        # Add semantic consistency guidance based on actual taxonomy diversity
        if domain_analysis["concept_diversity"] > 0.5:
            guidance_parts.append(
                "Maintain semantic consistency - avoid mixing unrelated concepts from other domains."
            )

        # Add depth-appropriate guidance
        current_depth = len(path_parts)
        if current_depth <= 2:
            guidance_parts.append(
                "Focus on creating intermediate categories that logically group related concepts."
            )
        else:
            guidance_parts.append(
                "Create specific categories that maintain the hierarchical progression."
            )

        return " ".join(guidance_parts)

    def suggest_intermediate_levels(self, path: str, content: str) -> dict:
        """
        Dynamically suggest intermediate levels based on existing taxonomy structure and content analysis.

        Args:
            path: Current taxonomy path
            content: Content being classified

        Returns:
            Dict with "suggestions" (list of paths) and "reasoning" (str)
        """
        result = {"suggestions": [], "reasoning": ""}

        # Intermediates only make sense for shallow paths (depth 1-2);
        # deeper paths already carry enough specificity.
        if len(path.split(".")) <= 2:
            analysis = self._analyze_intermediate_patterns(path, content)
            result["suggestions"] = analysis["suggestions"]
            result["reasoning"] = analysis["reasoning"]

        return result

    def _analyze_domain_patterns(self, domain: str, area: str | None = None) -> dict:
        """
        Dynamically analyze existing taxonomy patterns for a domain/area.

        Args:
            domain: The domain to analyze
            area: Optional specific area within the domain (excluded from siblings)

        Returns:
            Dict with sibling areas, concept diversity, path count, and depth distribution.
        """
        all_paths = self.get_all_paths()

        # Find all paths in this domain
        domain_paths = [p for p in all_paths if p.startswith(f"{domain}.")]

        # Extract areas (second level categories)
        areas = set()
        for path in domain_paths:
            parts = path.split(".")
            if len(parts) >= 2:
                areas.add(parts[1])

        # Concept diversity: ratio of distinct areas to total paths.
        # (The old `max(len(domain_paths), 1)` guard was redundant under the
        # `if domain_paths` condition and has been simplified away.)
        concept_diversity = len(areas) / len(domain_paths) if domain_paths else 0

        # Siblings exclude the area under analysis. Sorted for deterministic
        # output — previously the order came from set iteration, which made
        # prompt guidance and downstream suggestions nondeterministic.
        sibling_areas = sorted(areas - {area}) if area else sorted(areas)

        return {
            "sibling_areas": sibling_areas,
            "concept_diversity": concept_diversity,
            "total_paths": len(domain_paths),
            "depth_distribution": self._get_depth_distribution(domain_paths),
        }

    def _analyze_intermediate_patterns(self, path: str, content: str) -> dict:
        """
        Analyze existing taxonomy structure to suggest intermediate levels dynamically.

        Args:
            path: Current path (domain or domain.area)
            content: Content being classified

        Returns:
            Dict with "suggestions" (candidate paths) and "reasoning" (str);
            both empty when the path is deeper than two levels.
        """
        path_parts = path.split(".")
        suggestions = []
        reasoning = ""

        if len(path_parts) == 1:
            # Domain level - suggest areas based on existing taxonomy
            domain = path_parts[0]
            domain_analysis = self._analyze_domain_patterns(domain)

            # Suggest existing areas that might match content
            # (keyword overlap between content words and area-name words)
            content_words = set(content.lower().split())
            for area in domain_analysis["sibling_areas"]:
                area_words = set(area.replace("_", " ").split())
                if content_words.intersection(area_words):
                    suggestions.append(f"{path}.{area}")

            if suggestions:
                reasoning = f"Suggested existing areas in {domain} domain that match content keywords"

        elif len(path_parts) == 2:
            # Area level - suggest intermediate categories based on similar paths
            domain, area = path_parts

            # Find existing paths that go deeper than current path
            # (exactly one level deeper, i.e. three segments total)
            all_paths = self.get_all_paths()
            deeper_paths = [
                p
                for p in all_paths
                if p.startswith(f"{path}.") and len(p.split(".")) == 3
            ]

            if deeper_paths:
                # Extract third-level categories
                third_levels = [p.split(".")[2] for p in deeper_paths]

                # Check which ones might match the content.
                # NOTE(review): iterating a set here means suggestion order is
                # nondeterministic across runs.
                content_lower = content.lower()
                for third_level in set(third_levels):
                    if third_level.lower() in content_lower or any(
                        word in content_lower
                        for word in third_level.replace("_", " ").split()
                    ):
                        suggestions.append(f"{path}.{third_level}")

                if suggestions:
                    reasoning = (
                        f"Suggested existing subcategories in {area} that match content"
                    )
            else:
                # No existing deeper paths, suggest based on common patterns
                suggestions = self._suggest_common_intermediates(path, content)
                if suggestions:
                    reasoning = (
                        "Suggested common intermediate patterns for better specificity"
                    )

        return {
            "suggestions": suggestions,
            "reasoning": reasoning,
        }

    def _suggest_common_intermediates(self, path: str, content: str) -> list[str]:
        """
        Suggest intermediate patterns based purely on learned patterns from existing taxonomy.
        No hard-coded assumptions - only learns from actual taxonomy structure.

        Args:
            path: Current "domain.area" path with no deeper children yet.
            content: Content being classified; used for keyword matching.

        Returns:
            Up to three "path.pattern" candidates, best keyword overlap first.
        """
        suggestions = []
        content_lower = content.lower()
        content_words = set(content_lower.split())

        # Find similar paths in the taxonomy to learn patterns from
        all_paths = self.get_all_paths()
        path_parts = path.split(".")

        if len(path_parts) >= 2:
            _, area = path_parts[0], path_parts[1]

            # Look for similar area patterns in any domain
            similar_patterns = []
            for existing_path in all_paths:
                existing_parts = existing_path.split(".")
                if len(existing_parts) >= 3:
                    _existing_domain, existing_area, existing_sub = existing_parts[:3]

                    # Find areas with similar naming patterns or content overlap
                    area_words = set(area.replace("_", " ").split())
                    existing_area_words = set(existing_area.replace("_", " ").split())

                    # Check for word overlap or semantic similarity
                    word_overlap = area_words.intersection(existing_area_words)
                    content_overlap = content_words.intersection(
                        set(existing_sub.replace("_", " ").split())
                    )

                    # A pattern qualifies if its area name OR its subcategory
                    # name shares at least one word with ours/the content.
                    if word_overlap or content_overlap:
                        similar_patterns.append(existing_sub)

            # Extract the most relevant patterns based on content matching
            # (score = number of shared words with the content)
            pattern_scores = {}
            for pattern in set(similar_patterns):
                pattern_words = set(pattern.replace("_", " ").split())
                overlap = content_words.intersection(pattern_words)
                if overlap:
                    pattern_scores[pattern] = len(overlap)

            # Suggest top scoring patterns.
            # NOTE(review): ties are broken by dict/set iteration order, so
            # the chosen top-3 can vary across runs for equal scores.
            if pattern_scores:
                sorted_patterns = sorted(
                    pattern_scores.items(), key=lambda x: x[1], reverse=True
                )
                for pattern, _score in sorted_patterns[:3]:  # Top 3 suggestions
                    suggestions.append(f"{path}.{pattern}")

        return suggestions

    def _get_depth_distribution(self, paths: list[str]) -> dict:
        """Count how many paths exist at each depth (dot-separated segment count)."""
        distribution: dict[int, int] = {}
        for taxonomy_path in paths:
            # Number of segments == number of dots + 1.
            level = taxonomy_path.count(".") + 1
            distribution[level] = distribution.get(level, 0) + 1
        return distribution

    def _validate_with_learned_patterns(
        self, domain: str, area: str, content_lower: str
    ) -> dict:
        """
        Validate domain/area using keywords learned from the existing taxonomy.

        No hard-coded rules - the keyword list is mined entirely from the
        taxonomy paths already stored under ``domain.area``.

        Args:
            domain: Domain to validate
            area: Area within domain to validate
            content_lower: Lowercase content to check (not consulted in this
                body; accepted for interface compatibility)

        Returns:
            Dict with ``has_rules`` flag and the ``keywords`` to use
        """
        keywords = self._extract_keywords_from_taxonomy(domain, area)
        return {
            "has_rules": bool(keywords),
            "keywords": keywords,
        }

    def _extract_keywords_from_taxonomy(self, domain: str, area: str) -> list[str]:
        """
        Extract keywords from existing taxonomy paths for this domain.area.

        Lets the system learn validation vocabulary from actual usage: every
        path segment below ``domain.area`` is split into lowercase words.

        Returns:
            De-duplicated keyword list (order not guaranteed).
        """
        prefix = f"{domain}.{area}."
        words: set[str] = set()

        for taxonomy_path in self.get_all_paths():
            if not taxonomy_path.startswith(prefix):
                continue
            # Skip the domain.area prefix; mine the remaining segments.
            for segment in taxonomy_path.split(".")[2:]:
                words.update(segment.replace("_", " ").lower().split())

        # Could also extract from stored content in 'other' nodes (future enhancement)

        return list(words)

    def _validate_with_structure_analysis(self, path: str, content: str) -> dict:
        """
        Validate using structural analysis when no specific rules exist.

        This is the fallback for completely new domains/areas: the content's
        words are compared against every known path in the same domain.
        """
        domain = path.split(".")[0]

        # Every known path under this domain.
        domain_paths = [p for p in self.get_all_paths() if p.startswith(f"{domain}.")]

        if not domain_paths:
            # Brand new domain - nothing to compare against, assume valid.
            return {"valid": True, "confidence": 0.5, "issues": [], "suggestions": []}

        content_words = set(content.lower().split())

        # Jaccard-style similarity between content words and each path's words.
        scored: list[tuple[str, float]] = []
        for candidate in domain_paths:
            candidate_words: set[str] = set()
            for segment in candidate.split("."):
                candidate_words.update(segment.replace("_", " ").split())

            shared = content_words & candidate_words
            if shared:
                score = len(shared) / len(content_words | candidate_words)
                scored.append((candidate, score))

        if scored:
            best_path, best_score = max(scored, key=lambda pair: pair[1])

            if best_score > 0.3:  # Good similarity
                return {
                    "valid": True,
                    "confidence": 0.8,
                    "issues": [],
                    "suggestions": [],
                }
            # Weak match: flag it and point at the closest existing path.
            return {
                "valid": False,
                "confidence": 0.4,
                "issues": [
                    f"Content doesn't seem to match {path} based on structural analysis"
                ],
                "suggestions": [best_path],
            }

        # No similar paths found - might be misclassified
        return {
            "valid": False,
            "confidence": 0.2,
            "issues": [f"Content doesn't seem to match existing patterns in {domain}"],
            "suggestions": [],
        }

    def analyze_path_quality(self, path: str, content: str) -> dict:
        """
        Comprehensive analysis of classification path quality.

        Combines domain-consistency validation, intermediate-level suggestions,
        and depth heuristics into a weighted 0..1 score plus recommendations.

        Args:
            path: Taxonomy path to analyze
            content: Content being classified

        Returns:
            Dict with overall_score, domain_consistency, intermediate_suggestions,
            depth_analysis, and recommendations
        """
        analysis = {
            "overall_score": 0.0,
            "domain_consistency": {},
            "intermediate_suggestions": {},
            "depth_analysis": {},
            "recommendations": [],
        }

        # 1. Domain consistency analysis
        domain_validation = self.validate_domain_consistency(path, content)
        analysis["domain_consistency"] = domain_validation

        # 2. Intermediate level suggestions
        intermediate_analysis = self.suggest_intermediate_levels(path, content)
        analysis["intermediate_suggestions"] = intermediate_analysis

        # 3. Depth analysis (2-4 levels is considered the sweet spot)
        path_parts = path.split(".")
        depth = len(path_parts)

        analysis["depth_analysis"] = {
            "current_depth": depth,
            "optimal_range": "2-4 levels",
            "is_optimal": 2 <= depth <= 4,
            "issues": [],
        }

        if depth == 1:
            analysis["depth_analysis"]["issues"].append(
                "Too broad - needs more specificity"
            )
        elif depth > 4:
            analysis["depth_analysis"]["issues"].append(
                "Too deep - may be overly specific"
            )

        # 4. Calculate overall score
        score = 0.0

        # Domain consistency (40% weight)
        if domain_validation["valid"]:
            score += 0.4 * domain_validation["confidence"]

        # Depth appropriateness (30% weight)
        if 2 <= depth <= 4:
            score += 0.3
        elif depth == 1:
            score += 0.1  # Very broad
        elif depth > 4:
            score += 0.2  # Too specific

        # Path completeness (30% weight) - suggestions imply missing levels
        if intermediate_analysis["suggestions"]:
            score += 0.1  # Some issues but fixable
        else:
            score += 0.3  # No obvious missing levels

        # Clamp in case of float accumulation drift.
        analysis["overall_score"] = min(1.0, score)

        # 5. Generate recommendations
        recommendations = []

        if not domain_validation["valid"]:
            recommendations.append(
                f"Domain mismatch detected. Consider: {', '.join(domain_validation['suggestions'][:2])}"
            )

        if intermediate_analysis["suggestions"]:
            recommendations.append(
                f"Add intermediate level: {intermediate_analysis['suggestions'][0]}"
            )

        if depth == 1:
            recommendations.append(
                "Classification too broad - add more specific categories"
            )
        elif depth > 4:
            recommendations.append(
                "Classification too specific - consider using parent category"
            )

        analysis["recommendations"] = recommendations

        return analysis

    async def _call_llm(self, prompt: str) -> str:
        """Send *prompt* to the configured LLM and return its text response.

        Falls back to a fixed placeholder category list when no LLM is
        configured or the call fails, so expansion can proceed in degraded mode.

        Args:
            prompt: Fully rendered prompt text.

        Returns:
            The LLM's response as a plain string.
        """
        if self.llm is None:
            # Fallback when no LLM is provided
            return "category1\ncategory2\ncategory3"

        try:
            # Use the provided LLM (works with LangChain LLMs)
            response = await self.llm.ainvoke(prompt)

            # Handle different response types; log via the module logger
            # instead of print() for consistency with the rest of the class.
            if hasattr(response, "content"):
                content = response.content
                logger.debug("LLM response: %s", content)
                return content
            if isinstance(response, str):
                logger.debug("LLM string response: %s", response)
                return response
            str_response = str(response)
            logger.debug("LLM response coerced to str: %s", str_response)
            return str_response
        except Exception as e:
            # Log the error and fall back to default categories
            logger.error(f"LLM call failed: {e}")
            return "category1\ncategory2\ncategory3"

    def _parse_llm_response(self, response: str) -> list[str]:
        """Parse an LLM response into clean category names.

        Strips numbered-list prefixes ("1. "), bullet markers ("- ", "* "),
        blank lines, and '#' comment lines.

        Args:
            response: Raw LLM response text, one candidate per line.

        Returns:
            List of cleaned, non-empty category names in response order.
        """
        # Hoisted out of the loop body (it previously ran `import re` on
        # every line); kept function-scoped to match the original style.
        import re

        categories = []
        for line in response.strip().split("\n"):
            line = line.strip()
            if not line or line.startswith("#"):  # Skip blanks and comments
                continue

            # Remove numbered list prefixes (1., 2., etc.)
            category = re.sub(r"^\d+\.\s*", "", line)

            # Remove bullet prefixes (-, *, etc.)
            category = category.strip("- ").strip("* ").strip()

            if category:
                categories.append(category)

        # Module logger instead of print(), consistent with the rest of the class.
        logger.debug("Parsed categories: %s", categories)
        return categories

    def _validate_category(self, category: str, context: ExpansionContext) -> bool:
        """Accept a proposed category name only if it is new, clean, and short."""
        # Reject duplicates of existing siblings.
        if category in context.sibling_categories:
            return False

        # Reject empty names and names containing path-breaking characters.
        if not category:
            return False
        if "/" in category or "." in category:
            return False

        # Reject overly long names.
        return len(category) <= 50

    def _fallback_category_generation(self, context: ExpansionContext) -> list[str]:
        """Fallback category generation without LLM.

        Mines each item's original classification path for the segment one
        level below the current node and reuses those names as candidates.

        Args:
            context: Expansion context with sampled unclassified items.

        Returns:
            De-duplicated candidate names, capped at the instance's configured
            ``max_categories_per_expansion``.
        """
        # Analyze patterns in unclassified items
        categories = set()

        for item in context.unclassified_items:
            if "original_classification" in item:
                orig_path = item["original_classification"]
                parts = orig_path.split(".")

                # Extract the next level that was attempted
                if len(parts) > context.current_depth:
                    next_level = parts[context.current_depth]
                    categories.add(next_level)

        # Cap with the per-instance setting. Previously this used the module
        # constant MAX_CATEGORIES_PER_EXPANSION, ignoring the constructor
        # argument (same bug as _generate_categories_with_llm).
        return list(categories)[: self.max_categories_per_expansion]

    async def _reclassify_items(self, node: DynamicNode, new_paths: list[str]) -> int:
        """Reclassify items from 'other' to new categories using LLM if available."""
        if not node.other_items or not new_paths:
            return 0

        migrated_count = 0
        remaining_items = []

        for item in node.other_items:
            best_path = await self._find_best_category(item, new_paths)

            if best_path:
                # Migrate to new category
                target_node = self.path_index[best_path]
                target_node.item_count += 1
                migrated_count += 1
            else:
                remaining_items.append(item)

        node.other_items = remaining_items
        return migrated_count

    async def _find_best_category(
        self, item: dict[str, Any], candidate_paths: list[str]
    ) -> str | None:
        """Find the best category for an item among candidates using LLM-based classification."""
        if not candidate_paths:
            return None

        content = item.get("content", "")
        if not content:
            return None

        # Use LLM for intelligent classification
        if self.llm:
            try:
                prompt = self._build_classification_prompt(content, candidate_paths)
                response = await self._call_llm(prompt)
                return self._parse_best_category_response(response, candidate_paths)
            except Exception as e:
                print(f"LLM classification failed for item: {e}")
                # Fall back to simple heuristic

        # Simple fallback: find best category using basic string matching
        return self._find_category_by_text_similarity(content, candidate_paths)

    def _build_classification_prompt(
        self, content: str, candidate_paths: list[str]
    ) -> str:
        """Build a prompt for LLM to classify content into best category."""
        # Extract just the category names for cleaner prompt
        categories = [path.split(".")[-1] for path in candidate_paths]

        prompt_parts = [
            "You are classifying content into the most appropriate category.",
            "",
            f"Content to classify: {content}",
            "",
            "Available categories:",
        ]

        for i, category in enumerate(categories, 1):
            prompt_parts.append(f"{i}. {category}")

        prompt_parts.extend(
            [
                "",
                "Return ONLY the number (1, 2, 3, etc.) of the best matching category.",
                "If no category is a good match, return 0.",
                "Consider semantic meaning, not just exact keyword matches.",
            ]
        )

        return "\n".join(prompt_parts)

    def _parse_best_category_response(
        self, response: str, candidate_paths: list[str]
    ) -> str | None:
        """Parse LLM response to get the best category path."""
        try:
            # Extract number from response
            import re

            numbers = re.findall(r"\d+", response.strip())
            if not numbers:
                return None

            choice = int(numbers[0])

            # Return None if LLM said no good match (0)
            if choice == 0:
                return None

            # Return the corresponding path (1-indexed)
            if 1 <= choice <= len(candidate_paths):
                chosen_path = candidate_paths[choice - 1]
                print(
                    f"🎯 LLM chose category: {chosen_path.split('.')[-1]} for content: {response.strip()}"
                )
                return chosen_path

        except Exception as e:
            print(f"Failed to parse LLM category response '{response}': {e}")

        return None

    def _find_category_by_text_similarity(
        self, content: str, candidate_paths: list[str]
    ) -> str | None:
        """Fallback method using simple text similarity when LLM is unavailable."""
        content_lower = content.lower()

        # Try exact category name matches first
        for path in candidate_paths:
            category = path.split(".")[-1].lower()
            if category in content_lower:
                return path

        # Try partial matches with category name parts
        for path in candidate_paths:
            category = path.split(".")[-1].lower()
            category_parts = category.replace("-", "_").split("_")

            # Look for category parts in content (minimum 4 chars to avoid false matches)
            if any(part in content_lower for part in category_parts if len(part) >= 4):
                return path

        # No good match found
        return None

    async def parallel_expand(
        self, target_paths: list[str] | None = None
    ) -> list[TaxonomyExpansionResult]:
        """
        Expand several subtrees concurrently.

        Mirrors the paper's approach of concurrent work on independent
        branches; paths with an expansion already in flight are skipped.

        Args:
            target_paths: Specific paths to expand, or None for automatic selection

        Returns:
            List of expansion results
        """
        paths = target_paths or self._select_expansion_targets()

        # Cap concurrency so we never flood the LLM with expansion requests.
        pending = [
            asyncio.create_task(self.expand_subtree_with_llm(p))
            for p in paths[:PARALLEL_EXPANSION_LIMIT]
            if p not in self.active_expansions
        ]

        return await asyncio.gather(*pending)

    def _select_expansion_targets(self) -> list[str]:
        """Choose which '.other' buckets to expand, per the configured strategy.

        FOCUSED_SUBTREE: busiest buckets first (most pending items).
        BREADTH_FIRST:   all eligible buckets at the shallowest depth.
        DEPTH_FIRST:     deepest eligible buckets first.
        """

        def eligible():
            # A target is an 'other' bucket holding enough pending items.
            return [
                (path, node)
                for path, node in self.path_index.items()
                if path.endswith(".other")
                and len(node.other_items) >= self.min_items_threshold
            ]

        strategy = self.expansion_strategy

        if strategy == LLMExpansionStrategy.FOCUSED_SUBTREE:
            ranked = sorted(
                eligible(), key=lambda e: len(e[1].other_items), reverse=True
            )
            return [path for path, _ in ranked]

        if strategy == LLMExpansionStrategy.BREADTH_FIRST:
            candidates = eligible()
            # Shallowest eligible depth wins; inf when nothing qualifies.
            shallowest = min(
                (node.depth for _, node in candidates), default=float("inf")
            )
            return [path for path, node in candidates if node.depth == shallowest]

        if strategy == LLMExpansionStrategy.DEPTH_FIRST:
            ranked = sorted(eligible(), key=lambda e: e[1].depth, reverse=True)
            return [path for path, _ in ranked]

        return []

    def apply_combinations(self, combination: TaxonomyCombination) -> list[str]:
        """
        Apply pattern-based combinations to reduce redundancy.
        Implements the paper's combination approach.

        Args:
            combination: Pattern combination to apply

        Returns:
            List of newly created combination paths
        """
        if not self.enable_combinations:
            return []

        new_paths = []

        # Parse combination pattern (e.g., "Location + Domain")
        parts = combination.pattern.split(" + ")
        if len(parts) != 2:
            return []

        category1, category2 = parts[0].strip(), parts[1].strip()

        # Find matching paths
        paths1 = [p for p in self.path_index if category1.lower() in p.lower()]
        paths2 = [p for p in self.path_index if category2.lower() in p.lower()]

        # Create combinations
        for path1 in paths1[:10]:  # Limit combinations
            for path2 in paths2[:10]:
                # Extract relevant parts
                loc_part = path1.split(".")[-1]
                dom_part = path2.split(".")[-1]

                # Apply template
                combined = combination.template.format(
                    location=loc_part, domain=dom_part
                )

                # Create new path
                new_path = f"combined.{combined.replace(' ', '_').lower()}"
                if new_path not in self.path_index:
                    self._add_path_to_tree(self.root, new_path, is_dynamic=True)
                    new_paths.append(new_path)

        # Rebuild index
        self._rebuild_index()

        # Track combination
        self.combinations.append(combination)

        return new_paths

    def _add_path_to_tree(
        self, root: DynamicNode, path: str, is_dynamic: bool = False
    ) -> DynamicNode:
        """Materialize every segment of *path* under *root*.

        Creates any missing intermediate nodes and returns the final (leaf)
        node. Existing nodes are reused untouched.
        """
        segments = path.split(".")
        node = root

        for depth, segment in enumerate(segments, start=1):
            if segment not in node.children:
                # New node; 'is_leaf' marks the final segment of the path.
                node.children[segment] = DynamicNode(
                    path=".".join(segments[:depth]),
                    category=None,
                    depth=depth,
                    is_leaf=(depth == len(segments)),
                    is_dynamic=is_dynamic,
                    created_at=datetime.now(),
                )
            node = node.children[segment]

        return node

    def _rebuild_index(self):
        """Rebuild the path index."""
        self.path_index = {}

        def traverse(node: DynamicNode):
            if node.path:
                self.path_index[node.path] = node
            for child in node.children.values():
                traverse(child)

        traverse(self.root)

    def is_valid_path(self, path: str) -> bool:
        """Return True when *path* is a known taxonomy path."""
        known = self.path_index
        return path in known

    def get_all_paths(self) -> list[str]:
        """Return every registered taxonomy path, in index order."""
        return [*self.path_index]

    def export_for_llm(self) -> str:
        """
        Export taxonomy in a format suitable for LLM context.
        Follows the paper's approach for maintaining taxonomy in GPT-4 context.
        """

        def node_to_dict(node: DynamicNode, max_depth: int = 5) -> dict[str, Any]:
            if node.depth >= max_depth or not node.children:
                return {"path": node.path, "item_count": node.item_count}

            return {
                "path": node.path,
                "children": {
                    name: node_to_dict(child, max_depth)
                    for name, child in node.children.items()
                    if not name.endswith("other")  # Exclude 'other' for clarity
                },
            }

        taxonomy_dict = node_to_dict(self.root)
        return json.dumps(taxonomy_dict, indent=2)

    def validate_domain_consistency(self, path: str, content: str) -> dict:
        """
        Validate if content semantically belongs in the domain/area of the given path.

        Args:
            path: The taxonomy path to validate
            content: The content being classified

        Returns:
            Dict with validation results and suggestions:
                valid (bool): whether the content fits the path's domain/area
                confidence (float): 1.0 trivial paths, 0.9 keyword match, 0.3 mismatch
                issues (list[str]): human-readable problems found (empty when valid)
                suggestions (list[str]): up to 3 alternative dotted paths
        """
        path_parts = path.split(".")
        # Paths with fewer than two segments carry no domain/area to check,
        # so they are trivially accepted.
        if len(path_parts) < 2:
            return {"valid": True, "confidence": 1.0, "issues": [], "suggestions": []}

        domain = path_parts[0]
        area = path_parts[1]
        content_lower = content.lower()

        # Use dynamic validation combining core rules with learned patterns
        validation_result = self._validate_with_learned_patterns(
            domain, area, content_lower
        )

        if validation_result["has_rules"]:
            area_keywords = validation_result["keywords"]
        else:
            # No specific rules - use taxonomy structure analysis
            return self._validate_with_structure_analysis(path, content)

        # Check if content contains keywords relevant to this area
        # NOTE(review): matching is substring containment, so short keywords
        # can match inside unrelated words — confirm acceptable.
        content_matches_area = any(
            keyword in content_lower for keyword in area_keywords
        )

        if content_matches_area:
            return {"valid": True, "confidence": 0.9, "issues": [], "suggestions": []}

        # Content doesn't match - find better alternatives using dynamic analysis
        suggestions = []

        # Check other areas in same domain
        domain_analysis = self._analyze_domain_patterns(domain)
        for other_area in domain_analysis["sibling_areas"]:
            if other_area != area:
                other_validation = self._validate_with_learned_patterns(
                    domain, other_area, content_lower
                )
                if other_validation["has_rules"]:
                    other_keywords = other_validation["keywords"]
                    if any(keyword in content_lower for keyword in other_keywords):
                        suggestions.append(f"{domain}.{other_area}")

        # Check if content might belong to different domain entirely
        all_paths = self.get_all_paths()
        domains = list({p.split(".")[0] for p in all_paths if "." in p})

        for other_domain in domains:
            if other_domain != domain:
                other_domain_analysis = self._analyze_domain_patterns(other_domain)
                for other_area in other_domain_analysis["sibling_areas"]:
                    other_validation = self._validate_with_learned_patterns(
                        other_domain, other_area, content_lower
                    )
                    if other_validation["has_rules"]:
                        other_keywords = other_validation["keywords"]
                        if any(keyword in content_lower for keyword in other_keywords):
                            suggestions.append(f"{other_domain}.{other_area}")
                            break  # Only suggest one from each domain

        issues = [
            f"Content doesn't seem to match {domain}.{area} based on semantic analysis"
        ]

        return {
            "valid": False,
            "confidence": 0.3,
            "issues": issues,
            "suggestions": suggestions[:3],  # Limit to top 3 suggestions
        }

    def track_classification(
        self, path: str, content: str, metadata: dict | None = None
    ) -> bool:
        """
        Track a classification result and trigger expansion if needed.

        This method should be called by the semantic_classifier whenever
        content is classified to help the iterative taxonomy learn and expand.

        Args:
            path: The classified path
            content: The content that was classified
            metadata: Optional metadata about the classification

        Returns:
            True if expansion was triggered, False otherwise
        """
        import time

        # Validate domain consistency first
        validation = self.validate_domain_consistency(path, content)
        if not validation["valid"] and validation["suggestions"]:
            logger.warning(
                f"Domain consistency issue for path '{path}': {validation['issues'][0]}. "
                f"Suggested alternatives: {', '.join(validation['suggestions'])}"
            )
            # Add to metadata for tracking
            if metadata is None:
                metadata = {}
            metadata["domain_validation"] = validation

        # Find the node for this path
        node = self.path_index.get(path)
        if not node:
            return False

        # If this is an 'other' path, track the item for future expansion
        if path.endswith(".other"):
            if not hasattr(node, "other_items"):
                node.other_items = []

            # Add item with metadata
            item_data = {
                "content": content,
                "timestamp": time.time(),
                "metadata": metadata or {},
            }
            node.other_items.append(item_data)

            # Check if we should trigger expansion
            if len(node.other_items) >= self.min_items_threshold:
                # Mark for expansion
                if path not in self.active_expansions:
                    logger.info(
                        f"Path {path} ready for expansion with {len(node.other_items)} items"
                    )
                return True

        return False

    def get_classification_hints(self, content: str) -> dict[str, Any]:
        """
        Get hints for better classification based on similar content in 'other' paths.

        This helps the semantic_classifier make better decisions by learning
        from previously unclassified content.

        Args:
            content: Content to get hints for

        Returns:
            Dictionary with classification hints:
                suggested_paths: parent paths of 'other' buckets holding similar items
                avoid_paths: reserved, currently always empty
                similar_content: matched items with a shared-word-count score
                expansion_candidates: 'other' buckets near the expansion threshold
        """
        hints: dict[str, Any] = {
            "suggested_paths": [],
            "avoid_paths": [],
            "similar_content": [],
            "expansion_candidates": [],
        }

        content_lower = content.lower()
        # Hoisted out of the per-item loop (was rebuilt on every comparison):
        # the query's word set never changes.
        content_words = set(content_lower.split())

        # Look through 'other' paths for similar content
        for path, node in self.path_index.items():
            if not (path.endswith(".other") and hasattr(node, "other_items")):
                continue

            for item in node.other_items:
                item_content = item.get("content", "").lower()

                # Simple similarity check: count of shared words
                common_words = content_words & set(item_content.split())
                if len(common_words) >= 2:  # At least 2 common words
                    hints["similar_content"].append(
                        {
                            "path": path,
                            "content": item.get("content"),
                            "similarity": len(common_words),
                        }
                    )

                    # Suggest the parent path instead of 'other'
                    parent_path = ".".join(path.split(".")[:-1])
                    if parent_path and parent_path not in hints["suggested_paths"]:
                        hints["suggested_paths"].append(parent_path)

            # Mark paths with many items as expansion candidates
            if len(node.other_items) >= self.min_items_threshold - 1:
                hints["expansion_candidates"].append(
                    {"path": path, "item_count": len(node.other_items)}
                )

        return hints

    def get_taxonomy_info(self) -> dict[str, Any]:
        """Summarize the active taxonomy configuration for diagnostics."""
        first_level = TaxonomyPresets.get_first_level_categories(
            self.taxonomy_version
        )
        return {
            "version": self.taxonomy_version.value,
            "first_level_categories": first_level,
            "use_full_base": self.use_full_base_taxonomy,
            "expansion_strategy": self.expansion_strategy.value,
            "min_items_threshold": self.min_items_threshold,
            "max_categories_per_expansion": self.max_categories_per_expansion,
        }

    def get_expansion_statistics(self) -> dict[str, Any]:
        """Collect aggregate metrics about taxonomy growth and expansions."""
        depth_counts = defaultdict(int)
        items_in_other = 0
        dynamic_total = 0

        # Single pass over the index gathers all per-node metrics.
        for node in self.path_index.values():
            depth_counts[node.depth] += 1
            if node.is_dynamic:
                dynamic_total += 1
            if node.path.endswith(".other") and hasattr(node, "other_items"):
                items_in_other += len(node.other_items)

        return {
            "taxonomy_version": self.taxonomy_version.value,
            "total_paths": len(self.path_index),
            "dynamic_paths": dynamic_total,
            "expansion_history": len(self.expansion_history),
            "active_expansions": len(self.active_expansions),
            "total_migrated": sum(r.migrated_items for r in self.expansion_history),
            "combinations_applied": len(self.combinations),
            "depth_distribution": depth_counts,
            "items_in_other": items_in_other,
        }

    async def classify_with_confidence(
        self,
        content: str,
        metadata: dict | None = None,
        confidence_threshold: float = 0.6,
    ) -> dict[str, Any]:
        """
        Classify content and return classification with confidence and expansion recommendations.

        Args:
            content: Content to classify
            metadata: Optional metadata
            confidence_threshold: Minimum confidence for accepting classification

        Returns:
            Dictionary with classification results and recommendations.
            Always contains: is_memory, path, confidence, reasoning,
            needs_expansion, suggested_action ("classify" | "expand" | "skip").
            When confidence falls below the threshold for a memory, the
            expansion-suggestion fields returned by
            _suggest_expansion_for_low_confidence are merged in as well.
        """
        if not self.llm:
            # Fallback to basic pattern matching
            # No model configured: accept into a generic bucket with middling
            # confidence rather than failing outright.
            return {
                "is_memory": True,
                "path": "context.general",
                "confidence": 0.5,
                "reasoning": "Basic fallback classification",
                "needs_expansion": False,
                "suggested_action": "classify",
            }

        # Get current taxonomy structure for LLM context
        structure = self._get_taxonomy_structure_for_llm()

        # Build classification prompt
        prompt = self._build_classification_prompt_with_structure(
            content, structure, metadata
        )

        try:
            response = await self.llm.ainvoke(prompt)
            result = self._parse_classification_with_confidence(response)

            # Check if expansion is needed
            # Low-confidence memories trigger an expansion recommendation;
            # non-memories are skipped regardless of confidence.
            if result["confidence"] < confidence_threshold and result["is_memory"]:
                result["needs_expansion"] = True
                result["suggested_action"] = "expand"

                # Get expansion suggestions
                expansion_suggestion = await self._suggest_expansion_for_low_confidence(
                    content, result["path"], metadata
                )
                result.update(expansion_suggestion)
            else:
                result["needs_expansion"] = False
                result["suggested_action"] = (
                    "classify" if result["is_memory"] else "skip"
                )

            return result

        except Exception as e:
            # Fail closed: report "not a memory" so callers skip storage.
            logger.error(f"Classification with confidence failed: {e}")
            return {
                "is_memory": False,
                "path": None,
                "confidence": 0.0,
                "reasoning": f"Classification failed: {e!s}",
                "needs_expansion": False,
                "suggested_action": "skip",
            }

    def _get_taxonomy_structure_for_llm(self) -> dict:
        """Get taxonomy structure optimized for LLM context."""
        # Get hierarchical structure
        structure = {}
        for path in self.get_all_paths():
            if path.endswith(".other"):
                continue  # Skip 'other' paths in structure

            parts = path.split(".")
            current = structure

            for i, part in enumerate(parts):
                if part not in current:
                    current[part] = {} if i < len(parts) - 1 else None
                current = current[part] if current[part] is not None else {}

        return {
            "version": self.taxonomy_version.value,
            "structure": structure,
            "sample_paths": [
                p for p in self.get_all_paths() if not p.endswith(".other")
            ][:20],
            "total_categories": len(
                [p for p in self.get_all_paths() if not p.endswith(".other")]
            ),
        }

    def _build_classification_prompt_with_structure(
        self, content: str, structure: dict, metadata: dict | None
    ) -> str:
        """Build classification prompt with full taxonomy structure."""
        prompt_parts = [
            "You are an intelligent memory classifier. Analyze the following content and determine:",
            "1. Is this information worth storing as a memory? (true/false)",
            "2. If yes, which taxonomy path best fits this content?",
            "3. What is your confidence in this classification (0.0 to 1.0)?",
            "",
            f"Content to analyze: {content}",
        ]

        if metadata:
            prompt_parts.append(f"Metadata: {json.dumps(metadata)}")

        prompt_parts.extend(
            [
                "",
                f"Current taxonomy version: {structure['version']}",
                f"Total available categories: {structure['total_categories']}",
                "",
                "Sample available paths:",
            ]
        )

        for path in structure["sample_paths"][:15]:
            prompt_parts.append(f"  - {path}")

        if len(structure["sample_paths"]) > 15:
            prompt_parts.append(f"  ... and {len(structure['sample_paths']) - 15} more")

        prompt_parts.extend(
            [
                "",
                "Guidelines:",
                "- Only classify as memory if the content has lasting value",
                "- Choose the most specific appropriate path",
                "- If unsure between paths, prefer higher-level categories",
                "- Confidence should reflect how well the content fits the chosen path",
                "",
                "Respond in JSON format:",
                "{",
                '  "is_memory": true/false,',
                '  "path": "best.matching.path" or null,',
                '  "confidence": 0.0-1.0,',
                '  "reasoning": "explanation of decision"',
                "}",
            ]
        )

        return "\n".join(prompt_parts)

    def _parse_classification_with_confidence(self, response: Any) -> dict:
        """Parse LLM classification response with confidence."""
        try:
            if hasattr(response, "content"):
                content = response.content
            else:
                content = str(response)

            # Extract JSON from response
            import re

            json_match = re.search(r"\{[^{}]*\}", content, re.DOTALL)
            if json_match:
                data = json.loads(json_match.group())
                return {
                    "is_memory": data.get("is_memory", False),
                    "path": data.get("path"),
                    "confidence": float(data.get("confidence", 0.0)),
                    "reasoning": data.get("reasoning", ""),
                }

        except Exception as e:
            logger.error(f"Failed to parse classification response: {e}")

        return {
            "is_memory": False,
            "path": None,
            "confidence": 0.0,
            "reasoning": "Failed to parse classification response",
        }

    async def _suggest_expansion_for_low_confidence(
        self, content: str, path: str, metadata: dict | None
    ) -> dict:
        """Suggest expansion options for low confidence classification."""
        if not path:
            return {"expansion_suggestions": [], "use_parent": False}

        prompt_parts = [
            f"Content '{content}' was classified to '{path}' with low confidence.",
            "",
            "Should we:",
            "1. Expand the taxonomy with more specific subcategories",
            "2. Use a more general parent category",
            "3. Create new categories at the same level",
            "",
            "Consider the content specificity and taxonomy depth.",
            "",
            "Respond in JSON:",
            "{",
            '  "action": "expand" | "use_parent" | "same_level",',
            '  "reasoning": "explanation",',
            '  "suggested_categories": ["category1", "category2"] (if expanding),',
            '  "parent_path": "parent.path" (if using parent)',
            "}",
        ]

        try:
            response = await self.llm.ainvoke("\n".join(prompt_parts))

            if hasattr(response, "content"):
                content = response.content
            else:
                content = str(response)

            import re

            json_match = re.search(r"\{[^{}]*\}", content, re.DOTALL)
            if json_match:
                data = json.loads(json_match.group())
                return {
                    "expansion_action": data.get("action", "expand"),
                    "expansion_reasoning": data.get("reasoning", ""),
                    "suggested_categories": data.get("suggested_categories", []),
                    "parent_path": data.get("parent_path"),
                }

        except Exception as e:
            logger.error(f"Expansion suggestion failed: {e}")

        return {
            "expansion_action": "expand",
            "expansion_reasoning": "Default expansion due to low confidence",
            "suggested_categories": [],
            "parent_path": None,
        }

__init__

__init__(taxonomy_version: TaxonomyVersion = TaxonomyVersion.GENERAL, base_taxonomy: SemanticTaxonomy | None = None, llm: Any | None = None, expansion_strategy: LLMExpansionStrategy = LLMExpansionStrategy.FOCUSED_SUBTREE, min_items_threshold: int = MIN_ITEMS_FOR_EXPANSION, enable_combinations: bool = True, max_categories_per_expansion: int = MAX_CATEGORIES_PER_EXPANSION, use_full_base_taxonomy: bool = False)

Initialize LLM-driven iterative taxonomy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `taxonomy_version` | `TaxonomyVersion` | The taxonomy preset version to use (e.g., GENERAL, AGENT_CONVERSATION) | `GENERAL` |
| `base_taxonomy` | `SemanticTaxonomy \| None` | Optional custom taxonomy structure (overrides `taxonomy_version` if provided) | `None` |
| `llm` | `Any \| None` | Language model for expansion (GPT-4 recommended) | `None` |
| `expansion_strategy` | `LLMExpansionStrategy` | Strategy for taxonomy expansion | `FOCUSED_SUBTREE` |
| `min_items_threshold` | `int` | Minimum items before triggering expansion | `MIN_ITEMS_FOR_EXPANSION` |
| `enable_combinations` | `bool` | Enable pattern-based combinations | `True` |
| `max_categories_per_expansion` | `int` | Maximum categories to suggest per LLM expansion (default: 10) | `MAX_CATEGORIES_PER_EXPANSION` |
| `use_full_base_taxonomy` | `bool` | If True, imports full taxonomy hierarchy; if False, only first level | `False` |
Source code in src/memoir/taxonomy/iterative.py
def __init__(
    self,
    taxonomy_version: TaxonomyVersion = TaxonomyVersion.GENERAL,
    base_taxonomy: SemanticTaxonomy | None = None,
    llm: Any | None = None,
    expansion_strategy: LLMExpansionStrategy = LLMExpansionStrategy.FOCUSED_SUBTREE,
    min_items_threshold: int = MIN_ITEMS_FOR_EXPANSION,
    enable_combinations: bool = True,
    max_categories_per_expansion: int = MAX_CATEGORIES_PER_EXPANSION,
    use_full_base_taxonomy: bool = False,
):
    """
    Initialize LLM-driven iterative taxonomy.

    Args:
        taxonomy_version: The taxonomy preset version to use (e.g., GENERAL, SIMPLIFIED)
        base_taxonomy: Optional custom taxonomy structure (overrides taxonomy_version if provided)
        llm: Language model for expansion (GPT-4 recommended)
        expansion_strategy: Strategy for taxonomy expansion
        min_items_threshold: Minimum items before triggering expansion
        enable_combinations: Enable pattern-based combinations
        max_categories_per_expansion: Maximum categories to suggest per LLM expansion (default: 10)
        use_full_base_taxonomy: If True, imports full taxonomy hierarchy; if False, only first level
    """
    # Configuration supplied by the caller.
    self.taxonomy_version = taxonomy_version
    self.base_taxonomy = base_taxonomy
    self.use_full_base_taxonomy = use_full_base_taxonomy
    self.llm = llm
    self.expansion_strategy = expansion_strategy
    self.min_items_threshold = min_items_threshold
    self.enable_combinations = enable_combinations
    self.max_categories_per_expansion = max_categories_per_expansion

    # Build initial structure and index every node by its dotted path.
    self.root = self._build_initial_tree()
    self.path_index: dict[str, DynamicNode] = {}
    self._rebuild_index()

    # Track expansions and combinations
    self.expansion_history: list[TaxonomyExpansionResult] = []
    self.active_expansions: set[str] = set()  # Paths being expanded
    self.combinations: list[TaxonomyCombination] = []

    # Expansion queue for parallel processing.
    # Fix: annotated `asyncio.Queue | None` — it starts as None and is
    # presumably created when expansion workers are started (TODO confirm).
    self.expansion_queue: asyncio.Queue | None = None
    self.expansion_workers: list[asyncio.Task] = []

expand_subtree_with_llm async

expand_subtree_with_llm(node_path: str, focus_depth: int | None = None) -> TaxonomyExpansionResult

Expand a subtree using LLM-driven analysis. Implements the paper's focused subtree expansion approach.

Parameters:

Name Type Description Default
node_path str

Path to the node to expand

required
focus_depth int | None

Optional depth limit for expansion

None

Returns:

Type Description
TaxonomyExpansionResult

TaxonomyExpansionResult with expansion details

Source code in src/memoir/taxonomy/iterative.py
async def expand_subtree_with_llm(
    self, node_path: str, focus_depth: int | None = None
) -> TaxonomyExpansionResult:
    """
    Expand one subtree using LLM-suggested categories (the paper's
    focused subtree expansion approach).

    Args:
        node_path: Path of the node whose accumulated items should be split out.
        focus_depth: Accepted for API compatibility; not referenced by this
            implementation.

    Returns:
        TaxonomyExpansionResult describing what was created and migrated.
    """

    def _early_exit(reason: str) -> TaxonomyExpansionResult:
        # Shared zero-confidence shape for the two bail-out cases.
        return TaxonomyExpansionResult(
            parent_path=node_path,
            new_paths=[],
            migrated_items=0,
            confidence=0.0,
            strategy=self.expansion_strategy.value,
            reasoning=reason,
            timestamp=time.time(),
        )

    node = self.path_index.get(node_path)
    if node is None:
        return _early_exit("Node not found")

    # Not enough accumulated items to justify an expansion yet.
    if len(node.other_items) < self.min_items_threshold:
        return _early_exit(
            f"Insufficient items ({len(node.other_items)} < {self.min_items_threshold})"
        )

    # Mark as active so concurrent callers skip this subtree.
    self.active_expansions.add(node_path)

    try:
        expansion_context = self._build_expansion_context(node)
        suggested = await self._generate_categories_with_llm(expansion_context)

        created: list[str] = []
        for category in suggested:
            candidate = f"{node_path}.{category}".lstrip(".")
            if candidate in self.path_index:
                continue
            self._add_path_to_tree(self.root, candidate, is_dynamic=True)
            created.append(candidate)

            # Give the new category its own catch-all while depth allows.
            if node.depth < MAX_DEPTH - 2:
                self._add_path_to_tree(
                    self.root, f"{candidate}.other", is_dynamic=True
                )

        self._rebuild_index()

        # Move existing items into the freshly created categories.
        migrated = await self._reclassify_items(node, created)

        outcome = TaxonomyExpansionResult(
            parent_path=node_path,
            new_paths=created,
            migrated_items=migrated,
            confidence=0.8,  # Default confidence for LLM expansion
            strategy=self.expansion_strategy.value,
            reasoning=f"LLM-driven expansion created {len(created)} categories from {len(node.other_items)} items",
            timestamp=time.time(),
        )

        self.expansion_history.append(outcome)
        return outcome

    finally:
        self.active_expansions.discard(node_path)

suggest_intermediate_levels

suggest_intermediate_levels(path: str, content: str) -> dict

Dynamically suggest intermediate levels based on existing taxonomy structure and content analysis.

Parameters:

Name Type Description Default
path str

Current taxonomy path

required
content str

Content being classified

required

Returns:

Type Description
dict

Dict with intermediate level suggestions

Source code in src/memoir/taxonomy/iterative.py
def suggest_intermediate_levels(self, path: str, content: str) -> dict:
    """
    Suggest intermediate taxonomy levels for shallow classification paths.

    Args:
        path: Current taxonomy path
        content: Content being classified

    Returns:
        Dict with "suggestions" (list) and "reasoning" (str)
    """
    # Deep paths (3+ segments) already carry enough structure.
    if len(path.split(".")) > 2:
        return {"suggestions": [], "reasoning": ""}

    # Shallow path: mine the existing taxonomy for common intermediate patterns.
    analysis = self._analyze_intermediate_patterns(path, content)
    return {
        "suggestions": analysis["suggestions"],
        "reasoning": analysis["reasoning"],
    }

analyze_path_quality

analyze_path_quality(path: str, content: str) -> dict

Comprehensive analysis of classification path quality.

Parameters:

Name Type Description Default
path str

Taxonomy path to analyze

required
content str

Content being classified

required

Returns:

Type Description
dict

Dict with comprehensive quality analysis

Source code in src/memoir/taxonomy/iterative.py
def analyze_path_quality(self, path: str, content: str) -> dict:
    """
    Score a classification path on domain fit, missing levels, and depth.

    Args:
        path: Taxonomy path to analyze
        content: Content being classified

    Returns:
        Dict with overall_score, domain_consistency, intermediate_suggestions,
        depth_analysis, and recommendations.
    """
    domain_check = self.validate_domain_consistency(path, content)
    level_check = self.suggest_intermediate_levels(path, content)
    depth = len(path.split("."))

    depth_issues = []
    if depth == 1:
        depth_issues.append("Too broad - needs more specificity")
    elif depth > 4:
        depth_issues.append("Too deep - may be overly specific")

    # Weighted score: domain fit 40%, depth 30%, completeness 30%.
    score = 0.0
    if domain_check["valid"]:
        score += 0.4 * domain_check["confidence"]

    if 2 <= depth <= 4:
        score += 0.3
    elif depth == 1:
        score += 0.1  # Very broad
    elif depth > 4:
        score += 0.2  # Too specific

    # Missing intermediate levels cap the completeness contribution.
    score += 0.1 if level_check["suggestions"] else 0.3

    recommendations = []
    if not domain_check["valid"]:
        recommendations.append(
            f"Domain mismatch detected. Consider: {', '.join(domain_check['suggestions'][:2])}"
        )
    if level_check["suggestions"]:
        recommendations.append(
            f"Add intermediate level: {level_check['suggestions'][0]}"
        )
    if depth == 1:
        recommendations.append(
            "Classification too broad - add more specific categories"
        )
    elif depth > 4:
        recommendations.append(
            "Classification too specific - consider using parent category"
        )

    return {
        "overall_score": min(1.0, score),
        "domain_consistency": domain_check,
        "intermediate_suggestions": level_check,
        "depth_analysis": {
            "current_depth": depth,
            "optimal_range": "2-4 levels",
            "is_optimal": 2 <= depth <= 4,
            "issues": depth_issues,
        },
        "recommendations": recommendations,
    }

parallel_expand async

parallel_expand(target_paths: list[str] | None = None) -> list[TaxonomyExpansionResult]

Perform parallel expansion of multiple subtrees. Implements the paper's approach of concurrent work on different branches.

Parameters:

Name Type Description Default
target_paths list[str] | None

Specific paths to expand, or None for automatic selection

None

Returns:

Type Description
list[TaxonomyExpansionResult]

List of expansion results

Source code in src/memoir/taxonomy/iterative.py
async def parallel_expand(
    self, target_paths: list[str] | None = None
) -> list[TaxonomyExpansionResult]:
    """
    Expand several subtrees concurrently (the paper's approach of working
    on different branches at the same time).

    Args:
        target_paths: Specific paths to expand, or None for automatic selection

    Returns:
        List of expansion results
    """
    chosen = target_paths if target_paths else self._select_expansion_targets()

    # Cap concurrency at the configured limit.
    chosen = chosen[:PARALLEL_EXPANSION_LIMIT]

    # Skip subtrees already mid-expansion to avoid duplicate work.
    tasks = [
        asyncio.create_task(self.expand_subtree_with_llm(path))
        for path in chosen
        if path not in self.active_expansions
    ]

    return await asyncio.gather(*tasks)

apply_combinations

apply_combinations(combination: TaxonomyCombination) -> list[str]

Apply pattern-based combinations to reduce redundancy. Implements the paper's combination approach.

Parameters:

Name Type Description Default
combination TaxonomyCombination

Pattern combination to apply

required

Returns:

Type Description
list[str]

List of newly created combination paths

Source code in src/memoir/taxonomy/iterative.py
def apply_combinations(self, combination: TaxonomyCombination) -> list[str]:
    """
    Apply a pattern-based combination to reduce redundancy (the paper's
    combination approach).

    Args:
        combination: Pattern combination to apply

    Returns:
        List of newly created combination paths
    """
    if not self.enable_combinations:
        return []

    # The pattern must name exactly two categories, e.g. "Location + Domain".
    pieces = combination.pattern.split(" + ")
    if len(pieces) != 2:
        return []
    first, second = pieces[0].strip(), pieces[1].strip()

    # Substring match against all known paths, case-insensitively.
    lhs_paths = [p for p in self.path_index if first.lower() in p.lower()]
    rhs_paths = [p for p in self.path_index if second.lower() in p.lower()]

    created: list[str] = []
    for lhs in lhs_paths[:10]:  # Limit combinations
        for rhs in rhs_paths[:10]:
            # Combine the leaf segments of both paths via the template.
            rendered = combination.template.format(
                location=lhs.split(".")[-1], domain=rhs.split(".")[-1]
            )
            candidate = f"combined.{rendered.replace(' ', '_').lower()}"
            if candidate not in self.path_index:
                self._add_path_to_tree(self.root, candidate, is_dynamic=True)
                created.append(candidate)

    self._rebuild_index()
    self.combinations.append(combination)
    return created

is_valid_path

is_valid_path(path: str) -> bool

Check if a path exists in the taxonomy.

Source code in src/memoir/taxonomy/iterative.py
def is_valid_path(self, path: str) -> bool:
    """Return True when *path* is registered in the taxonomy index."""
    # Membership against the index's key view.
    return path in self.path_index.keys()

get_all_paths

get_all_paths() -> list[str]

Get all available paths in the taxonomy.

Source code in src/memoir/taxonomy/iterative.py
def get_all_paths(self) -> list[str]:
    """Return every registered taxonomy path, in index (insertion) order."""
    return [*self.path_index]

export_for_llm

export_for_llm() -> str

Export taxonomy in a format suitable for LLM context. Follows the paper's approach for maintaining taxonomy in GPT-4 context.

Source code in src/memoir/taxonomy/iterative.py
def export_for_llm(self) -> str:
    """
    Export the taxonomy as pretty-printed JSON suitable for LLM context
    (follows the paper's approach for keeping the taxonomy in GPT-4 context).
    """

    def serialize(node: DynamicNode, max_depth: int = 5) -> dict[str, Any]:
        # Leaves — and nodes at the depth cap — collapse to path + item count.
        if node.depth >= max_depth or not node.children:
            return {"path": node.path, "item_count": node.item_count}

        # 'other' buckets are omitted for clarity.
        kept = {
            label: serialize(child, max_depth)
            for label, child in node.children.items()
            if not label.endswith("other")
        }
        return {"path": node.path, "children": kept}

    return json.dumps(serialize(self.root), indent=2)

validate_domain_consistency

validate_domain_consistency(path: str, content: str) -> dict

Validate if content semantically belongs in the domain/area of the given path.

Parameters:

Name Type Description Default
path str

The taxonomy path to validate

required
content str

The content being classified

required

Returns:

Type Description
dict

Dict with validation results and suggestions

Source code in src/memoir/taxonomy/iterative.py
def validate_domain_consistency(self, path: str, content: str) -> dict:
    """
    Check whether content semantically belongs under the domain/area of a path.

    Args:
        path: The taxonomy path to validate
        content: The content being classified

    Returns:
        Dict with keys valid, confidence, issues, suggestions
    """
    segments = path.split(".")
    # Paths shallower than domain.area carry no constraint to validate.
    if len(segments) < 2:
        return {"valid": True, "confidence": 1.0, "issues": [], "suggestions": []}

    domain, area = segments[0], segments[1]
    text = content.lower()

    # Combine core rules with patterns learned from prior classifications.
    learned = self._validate_with_learned_patterns(domain, area, text)
    if not learned["has_rules"]:
        # No keyword rules for this area — fall back to structure analysis.
        return self._validate_with_structure_analysis(path, content)

    if any(keyword in text for keyword in learned["keywords"]):
        return {"valid": True, "confidence": 0.9, "issues": [], "suggestions": []}

    # Mismatch: hunt for better-fitting areas, first within the same domain.
    suggestions = []
    for sibling in self._analyze_domain_patterns(domain)["sibling_areas"]:
        if sibling == area:
            continue
        check = self._validate_with_learned_patterns(domain, sibling, text)
        if check["has_rules"] and any(kw in text for kw in check["keywords"]):
            suggestions.append(f"{domain}.{sibling}")

    # Then scan other domains, keeping at most one suggestion per domain.
    known_domains = list({p.split(".")[0] for p in self.get_all_paths() if "." in p})
    for cand_domain in known_domains:
        if cand_domain == domain:
            continue
        for cand_area in self._analyze_domain_patterns(cand_domain)["sibling_areas"]:
            check = self._validate_with_learned_patterns(cand_domain, cand_area, text)
            if check["has_rules"] and any(kw in text for kw in check["keywords"]):
                suggestions.append(f"{cand_domain}.{cand_area}")
                break  # Only suggest one from each domain

    return {
        "valid": False,
        "confidence": 0.3,
        "issues": [
            f"Content doesn't seem to match {domain}.{area} based on semantic analysis"
        ],
        "suggestions": suggestions[:3],  # Limit to top 3 suggestions
    }

track_classification

track_classification(path: str, content: str, metadata: dict | None = None) -> bool

Track a classification result and trigger expansion if needed.

This method should be called by the semantic_classifier whenever content is classified to help the iterative taxonomy learn and expand.

Parameters:

Name Type Description Default
path str

The classified path

required
content str

The content that was classified

required
metadata dict | None

Optional metadata about the classification

None

Returns:

Type Description
bool

True if expansion was triggered, False otherwise

Source code in src/memoir/taxonomy/iterative.py
def track_classification(
    self, path: str, content: str, metadata: dict | None = None
) -> bool:
    """
    Track a classification result and trigger expansion if needed.

    This method should be called by the semantic_classifier whenever
    content is classified to help the iterative taxonomy learn and expand.

    Args:
        path: The classified path
        content: The content that was classified
        metadata: Optional metadata about the classification

    Returns:
        True if expansion was triggered, False otherwise
    """
    import time

    # Validate domain consistency first
    validation = self.validate_domain_consistency(path, content)
    if not validation["valid"] and validation["suggestions"]:
        logger.warning(
            f"Domain consistency issue for path '{path}': {validation['issues'][0]}. "
            f"Suggested alternatives: {', '.join(validation['suggestions'])}"
        )
        # Add to metadata for tracking
        if metadata is None:
            metadata = {}
        metadata["domain_validation"] = validation

    # Find the node for this path
    node = self.path_index.get(path)
    if not node:
        return False

    # If this is an 'other' path, track the item for future expansion
    if path.endswith(".other"):
        if not hasattr(node, "other_items"):
            node.other_items = []

        # Add item with metadata
        item_data = {
            "content": content,
            "timestamp": time.time(),
            "metadata": metadata or {},
        }
        node.other_items.append(item_data)

        # Check if we should trigger expansion
        if len(node.other_items) >= self.min_items_threshold:
            # Mark for expansion
            if path not in self.active_expansions:
                logger.info(
                    f"Path {path} ready for expansion with {len(node.other_items)} items"
                )
            return True

    return False

get_classification_hints

get_classification_hints(content: str) -> dict[str, Any]

Get hints for better classification based on similar content in 'other' paths.

This helps the semantic_classifier make better decisions by learning from previously unclassified content.

Parameters:

Name Type Description Default
content str

Content to get hints for

required

Returns:

Type Description
dict[str, Any]

Dictionary with classification hints

Source code in src/memoir/taxonomy/iterative.py
def get_classification_hints(self, content: str) -> dict[str, Any]:
    """
    Derive classification hints from similar content sitting in 'other' paths.

    Helps the semantic classifier learn from previously unclassified content.

    Args:
        content: Content to get hints for

    Returns:
        Dict with suggested_paths, avoid_paths, similar_content, and
        expansion_candidates.
    """
    hints: dict[str, Any] = {
        "suggested_paths": [],
        "avoid_paths": [],
        "similar_content": [],
        "expansion_candidates": [],
    }

    query_words = set(content.lower().split())

    for path, node in self.path_index.items():
        # Only 'other' buckets that have accumulated items are informative.
        if not path.endswith(".other") or not hasattr(node, "other_items"):
            continue

        for item in node.other_items:
            stored = item.get("content", "")
            # Crude similarity: count of shared lowercase words.
            overlap = query_words & set(stored.lower().split())
            if len(overlap) < 2:  # Require at least 2 common words
                continue

            hints["similar_content"].append(
                {
                    "path": path,
                    "content": item.get("content"),
                    "similarity": len(overlap),
                }
            )

            # Point at the parent category rather than the 'other' bucket.
            parent_path = ".".join(path.split(".")[:-1])
            if parent_path and parent_path not in hints["suggested_paths"]:
                hints["suggested_paths"].append(parent_path)

        # Buckets close to the threshold are candidates for expansion.
        if len(node.other_items) >= self.min_items_threshold - 1:
            hints["expansion_candidates"].append(
                {"path": path, "item_count": len(node.other_items)}
            )

    return hints

get_taxonomy_info

get_taxonomy_info() -> dict[str, Any]

Get information about the current taxonomy configuration.

Source code in src/memoir/taxonomy/iterative.py
def get_taxonomy_info(self) -> dict[str, Any]:
    """Describe the active taxonomy configuration as a plain dict."""
    first_level = TaxonomyPresets.get_first_level_categories(self.taxonomy_version)
    return {
        "version": self.taxonomy_version.value,
        "first_level_categories": first_level,
        "use_full_base": self.use_full_base_taxonomy,
        "expansion_strategy": self.expansion_strategy.value,
        "min_items_threshold": self.min_items_threshold,
        "max_categories_per_expansion": self.max_categories_per_expansion,
    }

get_expansion_statistics

get_expansion_statistics() -> dict[str, Any]

Get detailed statistics about expansions.

Source code in src/memoir/taxonomy/iterative.py
def get_expansion_statistics(self) -> dict[str, Any]:
    """Summarize expansion activity and the current shape of the tree."""
    per_depth: defaultdict = defaultdict(int)
    other_total = 0

    for node in self.path_index.values():
        per_depth[node.depth] += 1
        # Count items parked in 'other' buckets awaiting expansion.
        if node.path.endswith(".other") and hasattr(node, "other_items"):
            other_total += len(node.other_items)

    return {
        "taxonomy_version": self.taxonomy_version.value,
        "total_paths": len(self.path_index),
        "dynamic_paths": sum(1 for n in self.path_index.values() if n.is_dynamic),
        "expansion_history": len(self.expansion_history),
        "active_expansions": len(self.active_expansions),
        "total_migrated": sum(r.migrated_items for r in self.expansion_history),
        "combinations_applied": len(self.combinations),
        "depth_distribution": per_depth,
        "items_in_other": other_total,
    }

classify_with_confidence async

classify_with_confidence(content: str, metadata: dict | None = None, confidence_threshold: float = 0.6) -> dict[str, Any]

Classify content and return classification with confidence and expansion recommendations.

Parameters:

Name Type Description Default
content str

Content to classify

required
metadata dict | None

Optional metadata

None
confidence_threshold float

Minimum confidence for accepting classification

0.6

Returns:

Type Description
dict[str, Any]

Dictionary with classification results and recommendations

Source code in src/memoir/taxonomy/iterative.py
async def classify_with_confidence(
    self,
    content: str,
    metadata: dict | None = None,
    confidence_threshold: float = 0.6,
) -> dict[str, Any]:
    """
    Classify content and return classification with confidence and expansion recommendations.

    Args:
        content: Content to classify
        metadata: Optional metadata
        confidence_threshold: Minimum confidence for accepting classification

    Returns:
        Dictionary with classification results and recommendations
    """
    if not self.llm:
        # Fallback to basic pattern matching
        return {
            "is_memory": True,
            "path": "context.general",
            "confidence": 0.5,
            "reasoning": "Basic fallback classification",
            "needs_expansion": False,
            "suggested_action": "classify",
        }

    # Get current taxonomy structure for LLM context
    structure = self._get_taxonomy_structure_for_llm()

    # Build classification prompt
    prompt = self._build_classification_prompt_with_structure(
        content, structure, metadata
    )

    try:
        response = await self.llm.ainvoke(prompt)
        result = self._parse_classification_with_confidence(response)

        # Check if expansion is needed
        if result["confidence"] < confidence_threshold and result["is_memory"]:
            result["needs_expansion"] = True
            result["suggested_action"] = "expand"

            # Get expansion suggestions
            expansion_suggestion = await self._suggest_expansion_for_low_confidence(
                content, result["path"], metadata
            )
            result.update(expansion_suggestion)
        else:
            result["needs_expansion"] = False
            result["suggested_action"] = (
                "classify" if result["is_memory"] else "skip"
            )

        return result

    except Exception as e:
        logger.error(f"Classification with confidence failed: {e}")
        return {
            "is_memory": False,
            "path": None,
            "confidence": 0.0,
            "reasoning": f"Classification failed: {e!s}",
            "needs_expansion": False,
            "suggested_action": "skip",
        }

memoir.taxonomy.taxonomy module

memoir.taxonomy.taxonomy

Taxonomy presets - FALLBACK DATA ONLY.

IMPORTANT: This hardcoded data exists solely as a fallback when: 1. No TaxonomyLoader is provided to the classifier/search engine 2. The store has not been initialized with taxonomy data

The canonical source of taxonomy data is the markdown files in

src/memoir/taxonomy/data/general/*.md

These markdown files are loaded via TaxonomyLoader into the store. The hardcoded data below should be kept minimal and may be removed in a future version once store-based taxonomy loading is mandatory.

To use store-based taxonomy (recommended): taxonomy_loader = TaxonomyLoader(store) taxonomy_loader.init_store(include_builtin=True) classifier = IntelligentClassifier(llm=llm, taxonomy_loader=taxonomy_loader)

TaxonomyVersion

Bases: Enum

Available taxonomy versions.

Source code in src/memoir/taxonomy/taxonomy.py
class TaxonomyVersion(Enum):
    """Available taxonomy versions (preset identifiers for TaxonomyPresets)."""

    # General-purpose preset; used as the default taxonomy_version elsewhere
    # in this package.
    GENERAL = "general"
    # Reduced preset — exact contents are defined by the preset data;
    # verify against TaxonomyPresets before relying on its scope.
    SIMPLIFIED = "simplified"

TaxonomyPresets

Minimal fallback taxonomy data.

WARNING: This is fallback data only. Use TaxonomyLoader for full taxonomy. See module docstring for details.

Source code in src/memoir/taxonomy/taxonomy.py
class TaxonomyPresets:
    """
    Minimal fallback taxonomy data.

    WARNING: This is fallback data only. Use TaxonomyLoader for full taxonomy.
    See module docstring for details.

    All data is stored in class-level constants, so every accessor is a
    classmethod; no instance is required (instance calls keep working).
    """

    # ==========================================================================
    # FALLBACK CLASSIFICATION EXAMPLES (minimal set)
    # Full examples are in: src/memoir/taxonomy/data/general/examples.md
    # Each tuple is (input_text, taxonomy_path, short_reasoning).
    # ==========================================================================
    CLASSIFICATION_EXAMPLES: ClassVar[list[tuple[str, str, str]]] = [
        # Profile
        ("My name is Sarah", "profile.personal.identity", "identity"),
        ("I work as a software engineer", "profile.professional.occupation", "job"),
        # Preferences
        ("I prefer VS Code", "preferences.tools.editors", "tool preference"),
        ("I like Python", "preferences.coding.languages", "language preference"),
        # Context
        ("We use PostgreSQL", "context.project.database", "project context"),
        ("Our team does standups daily", "context.team.meetings", "team context"),
        # Experience
        ("I worked at Google for 3 years", "experience.work.jobs", "work history"),
        ("I built a REST API last month", "experience.work.projects", "project"),
        # Goals
        ("I want to learn Rust", "goals.learning.skills", "learning goal"),
        ("I aim to become a tech lead", "goals.career.advancement", "career goal"),
        # Relationships
        ("My manager is John", "relationships.professional.manager", "work relation"),
        ("I mentor two junior devs", "relationships.professional.mentees", "mentoring"),
        # Knowledge
        (
            "Python uses indentation for blocks",
            "knowledge.technical.languages",
            "tech fact",
        ),
        ("REST APIs use HTTP methods", "knowledge.technical.architecture", "tech fact"),
        # Behavior
        ("I usually code in the morning", "behavior.work.schedule", "work pattern"),
        ("I review PRs before lunch", "behavior.work.practices", "work habit"),
    ]

    # ==========================================================================
    # FALLBACK CATEGORY DESCRIPTIONS (8 main categories)
    # Full descriptions are in: src/memoir/taxonomy/data/general/descriptions.md
    # ==========================================================================
    CATEGORY_DESCRIPTIONS: ClassVar[dict[str, str]] = {
        "profile": "Personal facts: identity, demographics, job, education, skills",
        "preferences": "Likes/dislikes: tools, languages, frameworks, work style",
        "context": "Project/team info: tech stack, infrastructure, team roles",
        "experience": "Past events: work history, projects, achievements",
        "goals": "Aspirations: career, learning, projects, personal growth",
        "relationships": "People: colleagues, manager, mentors, mentees",
        "knowledge": "Facts learned: technical concepts, domain knowledge",
        "behavior": "Patterns: work habits, routines, practices",
    }

    # ==========================================================================
    # FALLBACK PRESET PATHS (minimal set for each category)
    # Full paths are in: src/memoir/taxonomy/data/general/presets.md
    # Maps version -> {category -> [sub.path, ...]} (paths exclude the category).
    # ==========================================================================
    PRESETS: ClassVar[dict[TaxonomyVersion, dict[str, list[str]]]] = {
        TaxonomyVersion.SIMPLIFIED: {
            "profile": [
                "personal.identity",
                "personal.demographics",
                "personal.location",
                "professional.occupation",
                "professional.education",
                "professional.skills",
            ],
            "preferences": [
                "tools.editors",
                "tools.testing",
                "coding.languages",
                "coding.frameworks",
                "work.environment",
                "work.schedule",
            ],
            "context": [
                "project.stack",
                "project.repository",
                "project.database",
                "team.methodology",
                "team.meetings",
                "team.roles",
            ],
            "experience": [
                "work.jobs",
                "work.projects",
                "education.schools",
                "education.courses",
            ],
            "goals": [
                "career.advancement",
                "career.skills",
                "learning.skills",
                "learning.certifications",
            ],
            "relationships": [
                "professional.manager",
                "professional.colleagues",
                "professional.mentees",
                "personal.family",
            ],
            "knowledge": [
                "technical.languages",
                "technical.architecture",
                "domain.business",
                "domain.industry",
            ],
            "behavior": [
                "work.schedule",
                "work.practices",
                "coding.habits",
                "communication.style",
            ],
        }
    }

    @classmethod
    def get_paths_for_category(
        cls, version: TaxonomyVersion, category: str
    ) -> list[str]:
        """Get all fully-qualified paths for a specific category.

        Args:
            version: Taxonomy version whose preset is consulted.
            category: Top-level category name (e.g. "profile").

        Returns:
            "category.sub.path" strings; empty list for an unknown category.

        Raises:
            ValueError: If no preset exists for ``version``.
        """
        # Converted to classmethod for consistency with the other accessors:
        # only class-level data is read, so no instance state is needed.
        if version not in cls.PRESETS:
            raise ValueError(f"Unknown taxonomy version: {version}")

        category_paths = cls.PRESETS[version].get(category, [])
        return [f"{category}.{path}" for path in category_paths]

    @classmethod
    def get_all_paths(cls, version: TaxonomyVersion) -> list[str]:
        """Get all taxonomy paths for a version, sorted alphabetically.

        Raises:
            ValueError: If no preset exists for ``version``.
        """
        if version not in cls.PRESETS:
            raise ValueError(f"Unknown taxonomy version: {version}")

        # Flatten {category: [sub, ...]} into sorted "category.sub" strings.
        return sorted(
            f"{category}.{path}"
            for category, paths in cls.PRESETS[version].items()
            for path in paths
        )

    @classmethod
    def get_preset(cls, version: TaxonomyVersion) -> dict[str, list[str]]:
        """Get a taxonomy preset for a specific version.

        Unknown versions fall back to the SIMPLIFIED preset. A shallow copy
        is returned (nested path lists are still shared with the class data).
        """
        return cls.PRESETS.get(version, cls.PRESETS[TaxonomyVersion.SIMPLIFIED]).copy()

    @classmethod
    def get_first_level_categories(cls, version: TaxonomyVersion) -> list[str]:
        """Get only the first-level categories for a taxonomy version."""
        preset = cls.get_preset(version)
        return list(preset.keys())

    @classmethod
    def list_versions(cls) -> list[TaxonomyVersion]:
        """List all available taxonomy versions."""
        return list(cls.PRESETS.keys())

get_paths_for_category

get_paths_for_category(version: TaxonomyVersion, category: str) -> list[str]

Get all paths for a specific category.

Source code in src/memoir/taxonomy/taxonomy.py
def get_paths_for_category(
    self, version: TaxonomyVersion, category: str
) -> list[str]:
    """Return fully-qualified "category.sub" paths for one category."""
    if version not in self.PRESETS:
        raise ValueError(f"Unknown taxonomy version: {version}")

    # Unknown categories simply yield an empty list.
    prefix = f"{category}."
    return [prefix + sub for sub in self.PRESETS[version].get(category, [])]

get_all_paths

get_all_paths(version: TaxonomyVersion) -> list[str]

Get all taxonomy paths for a version.

Source code in src/memoir/taxonomy/taxonomy.py
def get_all_paths(self, version: TaxonomyVersion) -> list[str]:
    """Return every "category.sub" path for the given version, sorted."""
    if version not in self.PRESETS:
        raise ValueError(f"Unknown taxonomy version: {version}")

    # Flatten the {category: [sub, ...]} mapping into qualified strings.
    return sorted(
        f"{category}.{sub}"
        for category, subs in self.PRESETS[version].items()
        for sub in subs
    )

get_preset classmethod

get_preset(version: TaxonomyVersion) -> dict[str, list[str]]

Get a taxonomy preset for a specific version.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def get_preset(cls, version: TaxonomyVersion) -> dict[str, list[str]]:
    """Return a shallow copy of the preset, defaulting to SIMPLIFIED."""
    fallback = cls.PRESETS[TaxonomyVersion.SIMPLIFIED]
    chosen = cls.PRESETS.get(version, fallback)
    return chosen.copy()

get_first_level_categories classmethod

get_first_level_categories(version: TaxonomyVersion) -> list[str]

Get only the first-level categories for a taxonomy version.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def get_first_level_categories(cls, version: TaxonomyVersion) -> list[str]:
    """Return the top-level category names of the preset for *version*."""
    # Iterating a dict yields its keys; get_preset handles version fallback.
    return [category for category in cls.get_preset(version)]

list_versions classmethod

list_versions() -> list[TaxonomyVersion]

List all available taxonomy versions.

Source code in src/memoir/taxonomy/taxonomy.py
@classmethod
def list_versions(cls) -> list[TaxonomyVersion]:
    """Return every version that has preset data defined."""
    return [version for version in cls.PRESETS]

memoir.taxonomy.loader module

memoir.taxonomy.loader

Unified taxonomy loader for services and applications.

Provides high-level API for loading taxonomy data from markdown files into the store, and reading from store for classifier/search operations.

TaxonomyLoader

High-level loader for consuming taxonomy data in services/apps.

Provides convenient methods for:

- Loading taxonomy from markdown files (builtin or external)
- Saving taxonomy data to the memoir store
- Reading taxonomy from store (for classifier/search)
- Formatting data for LLM prompts

Source code in src/memoir/taxonomy/loader.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class TaxonomyLoader:
    """
    High-level loader for consuming taxonomy data in services/apps.

    Provides convenient methods for:
    - Loading taxonomy from markdown files (builtin or external)
    - Saving taxonomy data to the memoir store
    - Reading taxonomy from store (for classifier/search)
    - Formatting data for LLM prompts
    """

    def __init__(self, store: Any = None):
        """Initialize the taxonomy loader.

        Args:
            store: ProllyTreeStore instance for persistence.
                   If None, store operations will raise errors.
        """
        self.store = store
        # In-memory registry of parsed taxonomy files, independent of the store.
        self.registry = TaxonomyRegistry()
        # Namespace under which all taxonomy keys ("meta:*", "examples:*",
        # "index:*", ...) are stored.
        self.namespace = TAXONOMY_NAMESPACE
        self._parser = MarkdownTaxonomySource()

    # -------------------------------------------------------------------------
    # Loading from files to registry
    # -------------------------------------------------------------------------

    def load_builtin(self) -> list[str]:
        """Load all built-in taxonomy files into the registry.

        Returns:
            List of loaded taxonomy IDs.
        """
        return self.registry.load_builtin()

    def load_external(self, path: Path | str) -> str:
        """Load an external taxonomy file into the registry.

        Args:
            path: Path to the markdown file.

        Returns:
            ID of the loaded taxonomy.
        """
        return self.registry.load_external(path)

    # -------------------------------------------------------------------------
    # Saving to store
    # -------------------------------------------------------------------------

    def _ensure_store(self) -> None:
        """Ensure store is available."""
        if self.store is None:
            raise RuntimeError("Store not initialized. Pass store to TaxonomyLoader.")

    def save_to_store(self, taxonomy_id: str) -> bool:
        """Save a single taxonomy entry to the store.

        Args:
            taxonomy_id: ID of the taxonomy to save.

        Returns:
            True if saved successfully, False if not found.
        """
        self._ensure_store()

        data = self.registry.get(taxonomy_id)
        if not data:
            logger.warning(f"Taxonomy not found in registry: {taxonomy_id}")
            return False

        # Save metadata
        meta_key = f"meta:{taxonomy_id}"
        meta_value = {
            "type": data.metadata.type,
            "id": data.metadata.id,
            "name": data.metadata.name,
            "domain": data.metadata.domain,
            "version": data.metadata.version,
            "author": data.metadata.author,
            "description": data.metadata.description,
        }
        # Optional fields are only persisted when present (truthy).
        if data.metadata.created:
            meta_value["created"] = data.metadata.created
        if data.metadata.updated:
            meta_value["updated"] = data.metadata.updated
        if data.metadata.taxonomy_version:
            meta_value["taxonomy_version"] = data.metadata.taxonomy_version

        self.store.put(self.namespace, meta_key, {"value": meta_value})

        # Save type-specific data
        if data.metadata.type == "examples" and data.examples:
            examples_key = f"examples:{taxonomy_id}"
            examples_value = [
                {"input": inp, "path": path, "reasoning": reason}
                for inp, path, reason in data.examples
            ]
            self.store.put(self.namespace, examples_key, {"value": examples_value})

        elif data.metadata.type == "descriptions" and data.descriptions:
            desc_key = f"descriptions:{taxonomy_id}"
            self.store.put(self.namespace, desc_key, {"value": data.descriptions})

        elif data.metadata.type == "preset" and data.paths:
            preset_key = f"preset:{taxonomy_id}"
            self.store.put(self.namespace, preset_key, {"value": data.paths})

        # Update indexes
        self._update_indexes(data)

        logger.debug(f"Saved taxonomy to store: {taxonomy_id}")
        return True

    def save_all_to_store(self) -> int:
        """Save all taxonomies in the registry to the store.

        Returns:
            Number of taxonomies saved.
        """
        self._ensure_store()

        saved_count = 0
        for taxonomy_id in self.registry.list_ids():
            if self.save_to_store(taxonomy_id):
                saved_count += 1

        return saved_count

    def _update_indexes(self, data: TaxonomyData) -> None:
        """Update the type and domain indexes in the store.

        Args:
            data: The taxonomy data to index.
        """
        # NOTE(review): the index updates below are read-modify-write and not
        # atomic; concurrent writers could drop entries. Presumably the store
        # is used single-writer — confirm.
        taxonomy_id = data.metadata.id
        taxonomy_type = data.metadata.type
        domain = data.metadata.domain

        # Update type index
        type_index_key = "index:by-type"
        type_index = self._get_from_store(type_index_key, {})
        if taxonomy_type not in type_index:
            type_index[taxonomy_type] = []
        if taxonomy_id not in type_index[taxonomy_type]:
            type_index[taxonomy_type].append(taxonomy_id)
        self.store.put(self.namespace, type_index_key, {"value": type_index})

        # Update domain index
        domain_index_key = "index:by-domain"
        domain_index = self._get_from_store(domain_index_key, {})
        if domain not in domain_index:
            domain_index[domain] = []
        if taxonomy_id not in domain_index[domain]:
            domain_index[domain].append(taxonomy_id)
        self.store.put(self.namespace, domain_index_key, {"value": domain_index})

    def _get_from_store(self, key: str, default: Any = None) -> Any:
        """Get a value from the store with default.

        Args:
            key: Store key.
            default: Default value if not found.

        Returns:
            Value from store or default.
        """
        result = self.store.get(self.namespace, key)
        if result is None:
            return default
        # Handle the Item wrapper if present
        if hasattr(result, "value"):
            return result.value.get("value", default)
        if isinstance(result, dict):
            return result.get("value", default)
        # Any other result shape falls back to the default rather than raising.
        return default

    # -------------------------------------------------------------------------
    # Loading from store (for classifier/search)
    # -------------------------------------------------------------------------

    def get_examples_from_store(
        self, limit: int | None = None, domain: str | None = None
    ) -> list[tuple[str, str, str]]:
        """Get classification examples from the store.

        Args:
            limit: Maximum number of examples to return.
            domain: Domain to filter by (default: general).

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        self._ensure_store()

        # Get type index
        type_index = self._get_from_store("index:by-type", {})
        example_ids = type_index.get("examples", [])
        logger.debug(
            f"[TaxonomyLoader] Loading examples from store, found IDs: {example_ids}"
        )

        # Filter by domain if specified
        if domain:
            domain_index = self._get_from_store("index:by-domain", {})
            domain_ids = set(domain_index.get(domain, []))
            example_ids = [eid for eid in example_ids if eid in domain_ids]

        # Collect all examples
        examples: list[tuple[str, str, str]] = []
        for taxonomy_id in example_ids:
            key = f"examples:{taxonomy_id}"
            example_data = self._get_from_store(key, [])
            for item in example_data:
                examples.append((item["input"], item["path"], item["reasoning"]))
                # Stop as soon as the limit is reached (early return).
                if limit and len(examples) >= limit:
                    logger.debug(
                        f"[TaxonomyLoader] Loaded {len(examples)} examples from store (limit reached)"
                    )
                    return examples

        logger.debug(f"[TaxonomyLoader] Loaded {len(examples)} examples from store")
        # Defensive slice; the early return above normally enforces the limit.
        return examples[:limit] if limit else examples

    def get_descriptions_from_store(self, domain: str | None = None) -> dict[str, str]:
        """Get category descriptions from the store.

        Args:
            domain: Domain to filter by (default: general).

        Returns:
            Dict mapping category to description.
        """
        self._ensure_store()

        # Get type index
        type_index = self._get_from_store("index:by-type", {})
        desc_ids = type_index.get("descriptions", [])

        # Filter by domain if specified
        if domain:
            domain_index = self._get_from_store("index:by-domain", {})
            domain_ids = set(domain_index.get(domain, []))
            # Include both general and domain-specific
            # NOTE(review): unlike get_examples_from_store, the "general"
            # domain is always included here — confirm asymmetry is intended.
            general_ids = set(domain_index.get("general", []))
            desc_ids = [
                did for did in desc_ids if did in domain_ids or did in general_ids
            ]

        # Merge descriptions (later entries override earlier)
        descriptions: dict[str, str] = {}
        for taxonomy_id in desc_ids:
            key = f"descriptions:{taxonomy_id}"
            desc_data = self._get_from_store(key, {})
            descriptions.update(desc_data)

        logger.debug(
            f"[TaxonomyLoader] Loaded {len(descriptions)} category descriptions from store"
        )
        return descriptions

    def get_preset_paths_from_store(
        self, preset_id: str | None = None
    ) -> dict[str, list[str]]:
        """Get preset taxonomy paths from the store.

        Args:
            preset_id: Specific preset ID to load, or None for all.

        Returns:
            Dict mapping category to list of paths.
        """
        self._ensure_store()

        if preset_id:
            key = f"preset:{preset_id}"
            paths = self._get_from_store(key, {})
            logger.debug(
                f"[TaxonomyLoader] Loaded preset '{preset_id}' from store: {len(paths)} categories"
            )
            return paths

        # Get all presets
        type_index = self._get_from_store("index:by-type", {})
        preset_ids = type_index.get("preset", [])

        # Merge all presets; categories shared across presets are concatenated.
        paths: dict[str, list[str]] = {}
        for pid in preset_ids:
            key = f"preset:{pid}"
            preset_data = self._get_from_store(key, {})
            for category, category_paths in preset_data.items():
                if category not in paths:
                    paths[category] = []
                paths[category].extend(category_paths)

        return paths

    # -------------------------------------------------------------------------
    # Convenience: Initialize store from files
    # -------------------------------------------------------------------------

    def init_store(
        self,
        include_builtin: bool = True,
        external_paths: list[Path | str] | None = None,
        merge_strategy: str = "extend",
    ) -> dict[str, Any]:
        """Initialize the store with taxonomy data from files.

        Args:
            include_builtin: Whether to load builtin taxonomy files.
            external_paths: List of external markdown file paths.
            merge_strategy: How to handle existing data:
                - "extend": Add new entries, keep existing (default)
                - "override": External entries replace same-id entries
                - "replace": Clear store, load only specified sources

        Returns:
            Dict with counts of loaded taxonomies by type.
        """
        self._ensure_store()

        # Clear if replace strategy
        if merge_strategy == "replace":
            self._clear_taxonomy_from_store()
            self.registry.clear()

        loaded: dict[str, int] = {"examples": 0, "descriptions": 0, "preset": 0}

        # Load builtin
        if include_builtin:
            builtin_ids = self.load_builtin()
            for tid in builtin_ids:
                data = self.registry.get(tid)
                if data:
                    loaded[data.metadata.type] = loaded.get(data.metadata.type, 0) + 1

        # Load external; a bad file is logged and skipped, not fatal.
        if external_paths:
            for path in external_paths:
                try:
                    tid = self.load_external(path)
                    data = self.registry.get(tid)
                    if data:
                        loaded[data.metadata.type] = (
                            loaded.get(data.metadata.type, 0) + 1
                        )
                except Exception as e:
                    logger.error(f"Failed to load external taxonomy {path}: {e}")

        # Save to store
        saved_count = self.save_all_to_store()
        logger.info(f"Initialized store with {saved_count} taxonomy entries")

        return {
            "loaded": loaded,
            "saved": saved_count,
        }

    def _clear_taxonomy_from_store(self) -> None:
        """Clear all taxonomy data from the store."""
        # Get all keys and remove them
        # NOTE(review): deletion iterates the type index, so entries missing
        # from the index are never deleted — confirm this is acceptable.
        type_index = self._get_from_store("index:by-type", {})

        for taxonomy_type, ids in type_index.items():
            for tid in ids:
                if taxonomy_type == "examples":
                    self.store.delete(self.namespace, f"examples:{tid}")
                elif taxonomy_type == "descriptions":
                    self.store.delete(self.namespace, f"descriptions:{tid}")
                elif taxonomy_type == "preset":
                    self.store.delete(self.namespace, f"preset:{tid}")
                self.store.delete(self.namespace, f"meta:{tid}")

        # Clear indexes
        self.store.delete(self.namespace, "index:by-type")
        self.store.delete(self.namespace, "index:by-domain")

    # -------------------------------------------------------------------------
    # Prompt formatting (reads from store)
    # -------------------------------------------------------------------------

    def format_for_prompt(
        self,
        include_examples: bool = True,
        include_descriptions: bool = True,
        example_limit: int = 8,
        domain: str | None = None,
    ) -> str:
        """Format taxonomy data for LLM prompt insertion.

        Reads from the store (not registry) to ensure consistency
        with what's persisted.

        Args:
            include_examples: Whether to include classification examples.
            include_descriptions: Whether to include category descriptions.
            example_limit: Maximum number of examples to include.
            domain: Domain to filter by.

        Returns:
            Formatted string ready for prompt inclusion.

        Raises:
            RuntimeError: Via the store getters when no store is configured.
        """
        parts = []

        if include_descriptions:
            descriptions = self.get_descriptions_from_store(domain)
            if descriptions:
                parts.append("TAXONOMY CATEGORIES:")
                for cat, desc in sorted(descriptions.items()):
                    parts.append(f"  {cat}: {desc}")
                parts.append("")

        if include_examples:
            examples = self.get_examples_from_store(limit=example_limit, domain=domain)
            if examples:
                parts.append(
                    "CLASSIFICATION EXAMPLES (3-level paths: category.subcategory.type):"
                )
                for input_text, path, _reasoning in examples:
                    parts.append(f'  "{input_text}" -> {path}')
                parts.append("")

        return "\n".join(parts)

    # -------------------------------------------------------------------------
    # Utility methods
    # -------------------------------------------------------------------------

    def list_stored_taxonomies(self) -> dict[str, list[str]]:
        """List all taxonomies stored in the store, grouped by type.

        Returns:
            Dict mapping type to list of taxonomy IDs.
        """
        self._ensure_store()
        return self._get_from_store("index:by-type", {})

    def get_taxonomy_metadata(self, taxonomy_id: str) -> dict[str, Any] | None:
        """Get metadata for a specific taxonomy from the store.

        Args:
            taxonomy_id: The taxonomy ID.

        Returns:
            Metadata dict or None if not found.
        """
        self._ensure_store()
        return self._get_from_store(f"meta:{taxonomy_id}")

    def has_taxonomy_in_store(self) -> bool:
        """Check if any taxonomy data exists in the store.

        Returns:
            True if taxonomy data exists.
        """
        self._ensure_store()
        type_index = self._get_from_store("index:by-type", {})
        return bool(type_index)

__init__

__init__(store: Any = None)

Initialize the taxonomy loader.

Parameters:

Name Type Description Default
store Any

ProllyTreeStore instance for persistence. If None, store operations will raise errors.

None
Source code in src/memoir/taxonomy/loader.py
def __init__(self, store: Any = None):
    """Create a loader bound to an optional persistence backend.

    Args:
        store: ProllyTreeStore instance for persistence.
               If None, store operations will raise errors.
    """
    # Namespace under which all taxonomy keys are stored.
    self.namespace = TAXONOMY_NAMESPACE
    # In-memory registry of parsed taxonomy files.
    self.registry = TaxonomyRegistry()
    # Markdown parser for taxonomy source files.
    self._parser = MarkdownTaxonomySource()
    self.store = store

load_builtin

load_builtin() -> list[str]

Load all built-in taxonomy files into the registry.

Returns:

Type Description
list[str]

List of loaded taxonomy IDs.

Source code in src/memoir/taxonomy/loader.py
def load_builtin(self) -> list[str]:
    """Load every bundled taxonomy file into the in-memory registry.

    Returns:
        IDs of the taxonomies that were loaded.
    """
    # The registry knows where the builtin data lives; just delegate.
    return self.registry.load_builtin()

load_external

load_external(path: Path | str) -> str

Load an external taxonomy file into the registry.

Parameters:

Name Type Description Default
path Path | str

Path to the markdown file.

required

Returns:

Type Description
str

ID of the loaded taxonomy.

Source code in src/memoir/taxonomy/loader.py
def load_external(self, path: Path | str) -> str:
    """Load an external taxonomy file into the registry.

    Args:
        path: Path to the markdown file.

    Returns:
        ID of the loaded taxonomy.
    """
    return self.registry.load_external(path)

save_to_store

save_to_store(taxonomy_id: str) -> bool

Save a single taxonomy entry to the store.

Parameters:

Name Type Description Default
taxonomy_id str

ID of the taxonomy to save.

required

Returns:

Type Description
bool

True if saved successfully, False if not found.

Source code in src/memoir/taxonomy/loader.py
def save_to_store(self, taxonomy_id: str) -> bool:
    """Persist one registry entry (metadata plus payload) to the store.

    Args:
        taxonomy_id: ID of the taxonomy to save.

    Returns:
        True on success, False when the ID is not in the registry.
    """
    self._ensure_store()

    data = self.registry.get(taxonomy_id)
    if not data:
        logger.warning(f"Taxonomy not found in registry: {taxonomy_id}")
        return False

    # Persist metadata first.
    meta = data.metadata
    meta_value = {
        "type": meta.type,
        "id": meta.id,
        "name": meta.name,
        "domain": meta.domain,
        "version": meta.version,
        "author": meta.author,
        "description": meta.description,
    }
    # Optional attributes are written only when present (truthy).
    for optional_field in ("created", "updated", "taxonomy_version"):
        optional_value = getattr(meta, optional_field)
        if optional_value:
            meta_value[optional_field] = optional_value

    self.store.put(self.namespace, f"meta:{taxonomy_id}", {"value": meta_value})

    # Then persist the payload under a type-specific key.
    if meta.type == "examples" and data.examples:
        payload = [
            {"input": text, "path": target, "reasoning": why}
            for text, target, why in data.examples
        ]
        self.store.put(self.namespace, f"examples:{taxonomy_id}", {"value": payload})

    elif meta.type == "descriptions" and data.descriptions:
        self.store.put(
            self.namespace, f"descriptions:{taxonomy_id}", {"value": data.descriptions}
        )

    elif meta.type == "preset" and data.paths:
        self.store.put(self.namespace, f"preset:{taxonomy_id}", {"value": data.paths})

    # Keep the by-type / by-domain indexes in sync with the new entry.
    self._update_indexes(data)

    logger.debug(f"Saved taxonomy to store: {taxonomy_id}")
    return True

save_all_to_store

save_all_to_store() -> int

Save all taxonomies in the registry to the store.

Returns:

Type Description
int

Number of taxonomies saved.

Source code in src/memoir/taxonomy/loader.py
def save_all_to_store(self) -> int:
    """Persist every taxonomy currently held by the registry.

    Returns:
        Number of taxonomies that were saved successfully.
    """
    self._ensure_store()

    # save_to_store returns False for IDs that are missing from the
    # registry, so count only the successful writes.
    return sum(
        1 for tid in self.registry.list_ids() if self.save_to_store(tid)
    )

get_examples_from_store

get_examples_from_store(limit: int | None = None, domain: str | None = None) -> list[tuple[str, str, str]]

Get classification examples from the store.

Parameters:

Name Type Description Default
limit int | None

Maximum number of examples to return.

None
domain str | None

Domain to filter by (default: general).

None

Returns:

Type Description
list[tuple[str, str, str]]

List of (input_text, path, reasoning) tuples.

Source code in src/memoir/taxonomy/loader.py
def get_examples_from_store(
    self, limit: int | None = None, domain: str | None = None
) -> list[tuple[str, str, str]]:
    """Get classification examples from the store.

    Args:
        limit: Maximum number of examples to return.
        domain: Domain to filter by (default: general).

    Returns:
        List of (input_text, path, reasoning) tuples.
    """
    self._ensure_store()

    # Get type index
    type_index = self._get_from_store("index:by-type", {})
    example_ids = type_index.get("examples", [])
    logger.debug(
        f"[TaxonomyLoader] Loading examples from store, found IDs: {example_ids}"
    )

    # Filter by domain if specified
    if domain:
        domain_index = self._get_from_store("index:by-domain", {})
        domain_ids = set(domain_index.get(domain, []))
        example_ids = [eid for eid in example_ids if eid in domain_ids]

    # Collect all examples
    examples: list[tuple[str, str, str]] = []
    for taxonomy_id in example_ids:
        key = f"examples:{taxonomy_id}"
        example_data = self._get_from_store(key, [])
        for item in example_data:
            examples.append((item["input"], item["path"], item["reasoning"]))
            if limit and len(examples) >= limit:
                logger.debug(
                    f"[TaxonomyLoader] Loaded {len(examples)} examples from store (limit reached)"
                )
                return examples

    logger.debug(f"[TaxonomyLoader] Loaded {len(examples)} examples from store")
    return examples[:limit] if limit else examples

get_descriptions_from_store

get_descriptions_from_store(domain: str | None = None) -> dict[str, str]

Get category descriptions from the store.

Parameters:

Name Type Description Default
domain str | None

Domain to filter by (default: general).

None

Returns:

Type Description
dict[str, str]

Dict mapping category to description.

Source code in src/memoir/taxonomy/loader.py
def get_descriptions_from_store(self, domain: str | None = None) -> dict[str, str]:
    """Get category descriptions from the store.

    Args:
        domain: Domain to filter by (default: general).

    Returns:
        Dict mapping category to description.
    """
    self._ensure_store()

    # Get type index
    type_index = self._get_from_store("index:by-type", {})
    desc_ids = type_index.get("descriptions", [])

    # Filter by domain if specified
    if domain:
        domain_index = self._get_from_store("index:by-domain", {})
        domain_ids = set(domain_index.get(domain, []))
        # Include both general and domain-specific
        general_ids = set(domain_index.get("general", []))
        desc_ids = [
            did for did in desc_ids if did in domain_ids or did in general_ids
        ]

    # Merge descriptions (later entries override earlier)
    descriptions: dict[str, str] = {}
    for taxonomy_id in desc_ids:
        key = f"descriptions:{taxonomy_id}"
        desc_data = self._get_from_store(key, {})
        descriptions.update(desc_data)

    logger.debug(
        f"[TaxonomyLoader] Loaded {len(descriptions)} category descriptions from store"
    )
    return descriptions

get_preset_paths_from_store

get_preset_paths_from_store(preset_id: str | None = None) -> dict[str, list[str]]

Get preset taxonomy paths from the store.

Parameters:

Name Type Description Default
preset_id str | None

Specific preset ID to load, or None for all.

None

Returns:

Type Description
dict[str, list[str]]

Dict mapping category to list of paths.

Source code in src/memoir/taxonomy/loader.py
def get_preset_paths_from_store(
    self, preset_id: str | None = None
) -> dict[str, list[str]]:
    """Get preset taxonomy paths from the store.

    Args:
        preset_id: Specific preset ID to load, or None for all.

    Returns:
        Dict mapping category to list of paths.
    """
    self._ensure_store()

    if preset_id:
        key = f"preset:{preset_id}"
        paths = self._get_from_store(key, {})
        logger.debug(
            f"[TaxonomyLoader] Loaded preset '{preset_id}' from store: {len(paths)} categories"
        )
        return paths

    # Get all presets
    type_index = self._get_from_store("index:by-type", {})
    preset_ids = type_index.get("preset", [])

    paths: dict[str, list[str]] = {}
    for pid in preset_ids:
        key = f"preset:{pid}"
        preset_data = self._get_from_store(key, {})
        for category, category_paths in preset_data.items():
            if category not in paths:
                paths[category] = []
            paths[category].extend(category_paths)

    return paths

init_store

init_store(include_builtin: bool = True, external_paths: list[Path | str] | None = None, merge_strategy: str = 'extend') -> dict[str, Any]

Initialize the store with taxonomy data from files.

Parameters:

Name Type Description Default
include_builtin bool

Whether to load builtin taxonomy files.

True
external_paths list[Path | str] | None

List of external markdown file paths.

None
merge_strategy str

How to handle existing data: - "extend": Add new entries, keep existing (default) - "override": External entries replace same-id entries - "replace": Clear store, load only specified sources

'extend'

Returns:

Type Description
dict[str, Any]

Dict with counts of loaded taxonomies by type.

Source code in src/memoir/taxonomy/loader.py
def init_store(
    self,
    include_builtin: bool = True,
    external_paths: list[Path | str] | None = None,
    merge_strategy: str = "extend",
) -> dict[str, Any]:
    """Initialize the store with taxonomy data from files.

    Args:
        include_builtin: Whether to load builtin taxonomy files.
        external_paths: List of external markdown file paths.
        merge_strategy: How to handle existing data:
            - "extend": Add new entries, keep existing (default)
            - "override": External entries replace same-id entries
            - "replace": Clear store, load only specified sources

    Returns:
        Dict with counts of loaded taxonomies by type.
    """
    self._ensure_store()

    # Clear if replace strategy
    if merge_strategy == "replace":
        self._clear_taxonomy_from_store()
        self.registry.clear()

    loaded: dict[str, int] = {"examples": 0, "descriptions": 0, "preset": 0}

    # Load builtin
    if include_builtin:
        builtin_ids = self.load_builtin()
        for tid in builtin_ids:
            data = self.registry.get(tid)
            if data:
                loaded[data.metadata.type] = loaded.get(data.metadata.type, 0) + 1

    # Load external
    if external_paths:
        for path in external_paths:
            try:
                tid = self.load_external(path)
                data = self.registry.get(tid)
                if data:
                    loaded[data.metadata.type] = (
                        loaded.get(data.metadata.type, 0) + 1
                    )
            except Exception as e:
                logger.error(f"Failed to load external taxonomy {path}: {e}")

    # Save to store
    saved_count = self.save_all_to_store()
    logger.info(f"Initialized store with {saved_count} taxonomy entries")

    return {
        "loaded": loaded,
        "saved": saved_count,
    }

format_for_prompt

format_for_prompt(include_examples: bool = True, include_descriptions: bool = True, example_limit: int = 8, domain: str | None = None) -> str

Format taxonomy data for LLM prompt insertion.

Reads from the store (not registry) to ensure consistency with what's persisted.

Parameters:

Name Type Description Default
include_examples bool

Whether to include classification examples.

True
include_descriptions bool

Whether to include category descriptions.

True
example_limit int

Maximum number of examples to include.

8
domain str | None

Domain to filter by.

None

Returns:

Type Description
str

Formatted string ready for prompt inclusion.

Source code in src/memoir/taxonomy/loader.py
def format_for_prompt(
    self,
    include_examples: bool = True,
    include_descriptions: bool = True,
    example_limit: int = 8,
    domain: str | None = None,
) -> str:
    """Format taxonomy data for LLM prompt insertion.

    Reads from the store (not registry) to ensure consistency
    with what's persisted.

    Args:
        include_examples: Whether to include classification examples.
        include_descriptions: Whether to include category descriptions.
        example_limit: Maximum number of examples to include.
        domain: Domain to filter by.

    Returns:
        Formatted string ready for prompt inclusion.
    """
    parts = []

    if include_descriptions:
        descriptions = self.get_descriptions_from_store(domain)
        if descriptions:
            parts.append("TAXONOMY CATEGORIES:")
            for cat, desc in sorted(descriptions.items()):
                parts.append(f"  {cat}: {desc}")
            parts.append("")

    if include_examples:
        examples = self.get_examples_from_store(limit=example_limit, domain=domain)
        if examples:
            parts.append(
                "CLASSIFICATION EXAMPLES (3-level paths: category.subcategory.type):"
            )
            for input_text, path, _reasoning in examples:
                parts.append(f'  "{input_text}" -> {path}')
            parts.append("")

    return "\n".join(parts)

list_stored_taxonomies

list_stored_taxonomies() -> dict[str, list[str]]

List all taxonomies stored in the store, grouped by type.

Returns:

Type Description
dict[str, list[str]]

Dict mapping type to list of taxonomy IDs.

Source code in src/memoir/taxonomy/loader.py
def list_stored_taxonomies(self) -> dict[str, list[str]]:
    """List every taxonomy ID persisted in the store, grouped by type.

    Returns:
        Mapping of taxonomy type to list of taxonomy IDs.
    """
    self._ensure_store()
    # The by-type index already has exactly the grouping callers want.
    index = self._get_from_store("index:by-type", {})
    return index

get_taxonomy_metadata

get_taxonomy_metadata(taxonomy_id: str) -> dict[str, Any] | None

Get metadata for a specific taxonomy from the store.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID.

required

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found.

Source code in src/memoir/taxonomy/loader.py
def get_taxonomy_metadata(self, taxonomy_id: str) -> dict[str, Any] | None:
    """Get metadata for a specific taxonomy from the store.

    Args:
        taxonomy_id: The taxonomy ID.

    Returns:
        Metadata dict or None if not found.
    """
    self._ensure_store()
    return self._get_from_store(f"meta:{taxonomy_id}")

has_taxonomy_in_store

has_taxonomy_in_store() -> bool

Check if any taxonomy data exists in the store.

Returns:

Type Description
bool

True if taxonomy data exists.

Source code in src/memoir/taxonomy/loader.py
def has_taxonomy_in_store(self) -> bool:
    """Report whether any taxonomy data has been persisted.

    Returns:
        True if taxonomy data exists.
    """
    self._ensure_store()
    # An empty (or missing) by-type index means nothing was saved yet.
    return bool(self._get_from_store("index:by-type", {}))

memoir.taxonomy.registry module

memoir.taxonomy.registry

Central registry for taxonomy data management.

Handles loading from builtin and external markdown files, and provides access to combined taxonomy data.

TaxonomyEntry dataclass

Entry in the taxonomy registry.

Source code in src/memoir/taxonomy/registry.py
@dataclass
class TaxonomyEntry:
    """Entry in the taxonomy registry."""

    # Parsed taxonomy payload (metadata plus examples/descriptions/paths).
    data: TaxonomyData
    # File the taxonomy was parsed from; None when not loaded from disk.
    source_path: Path | None = None
    # True when the file came from the package's bundled data directory.
    is_builtin: bool = True

TaxonomyRegistry

Central registry for managing taxonomy data from multiple sources.

Provides: - Loading from built-in markdown files - Loading from external/user-provided files - Domain-based filtering - Type-based lookup (examples, descriptions, presets) - Merging/combining taxonomy data

Source code in src/memoir/taxonomy/registry.py
class TaxonomyRegistry:
    """
    Central registry for managing taxonomy data from multiple sources.

    Provides:
    - Loading from built-in markdown files
    - Loading from external/user-provided files
    - Domain-based filtering
    - Type-based lookup (examples, descriptions, presets)
    - Merging/combining taxonomy data
    """

    def __init__(self):
        """Initialize an empty registry."""
        # Primary storage: taxonomy ID -> TaxonomyEntry.
        self._entries: dict[str, TaxonomyEntry] = {}
        # Secondary indexes for type- and domain-based lookups.
        self._by_type: dict[str, list[str]] = {
            "examples": [],
            "descriptions": [],
            "preset": [],
        }
        self._by_domain: dict[str, list[str]] = {}
        self._parser = MarkdownTaxonomySource()
        # Bundled taxonomy markdown lives next to this module.
        self._builtin_path = Path(__file__).parent / "data"

    def load_builtin(self) -> list[str]:
        """Load all built-in taxonomy markdown files.

        Returns:
            List of loaded taxonomy IDs.
        """
        loaded_ids = []

        if not self._builtin_path.exists():
            logger.warning(f"Built-in taxonomy path not found: {self._builtin_path}")
            return loaded_ids

        for md_file in self._builtin_path.rglob("*.md"):
            # The data directory's README is documentation, not taxonomy data.
            if md_file.name == "README.md":
                continue
            try:
                taxonomy_id = self._load_file(md_file, is_builtin=True)
                loaded_ids.append(taxonomy_id)
                logger.debug(f"Loaded builtin taxonomy: {taxonomy_id} from {md_file}")
            except (TaxonomyParseError, FileNotFoundError) as e:
                # One bad file must not prevent the rest from loading.
                logger.error(f"Failed to load {md_file}: {e}")

        return loaded_ids

    def load_external(self, path: Path | str) -> str:
        """Load an external taxonomy file.

        Args:
            path: Path to markdown file.

        Returns:
            ID of the loaded taxonomy.

        Raises:
            TaxonomyParseError: If the file cannot be parsed.
            FileNotFoundError: If the file doesn't exist.
        """
        path = Path(path)
        return self._load_file(path, is_builtin=False)

    def _load_file(self, path: Path, is_builtin: bool) -> str:
        """Load a single taxonomy file and index it.

        Args:
            path: Path to the markdown file.
            is_builtin: Whether this is a builtin file.

        Returns:
            ID of the loaded taxonomy.
        """
        data = self._parser.load(path)

        entry = TaxonomyEntry(data=data, source_path=path, is_builtin=is_builtin)

        # Same-ID loads replace the existing entry.
        taxonomy_id = data.metadata.id
        self._entries[taxonomy_id] = entry

        # Update type index (membership check keeps it duplicate-free).
        taxonomy_type = data.metadata.type
        if taxonomy_type not in self._by_type:
            self._by_type[taxonomy_type] = []
        if taxonomy_id not in self._by_type[taxonomy_type]:
            self._by_type[taxonomy_type].append(taxonomy_id)

        # Update domain index.
        domain = data.metadata.domain
        if domain not in self._by_domain:
            self._by_domain[domain] = []
        if taxonomy_id not in self._by_domain[domain]:
            self._by_domain[domain].append(taxonomy_id)

        return taxonomy_id

    def get(self, taxonomy_id: str) -> TaxonomyData | None:
        """Get taxonomy data by ID.

        Args:
            taxonomy_id: The taxonomy ID to look up.

        Returns:
            TaxonomyData if found, None otherwise.
        """
        entry = self._entries.get(taxonomy_id)
        return entry.data if entry else None

    def get_entry(self, taxonomy_id: str) -> TaxonomyEntry | None:
        """Get full taxonomy entry by ID.

        Args:
            taxonomy_id: The taxonomy ID to look up.

        Returns:
            TaxonomyEntry if found, None otherwise.
        """
        return self._entries.get(taxonomy_id)

    def get_by_type(
        self, taxonomy_type: str, domain: str | None = None
    ) -> list[TaxonomyData]:
        """Get all taxonomy data of a specific type.

        Args:
            taxonomy_type: Type to filter by (examples, descriptions, preset).
            domain: Optional domain to filter by.

        Returns:
            List of matching TaxonomyData.
        """
        ids = self._by_type.get(taxonomy_type, [])
        if domain:
            domain_ids = set(self._by_domain.get(domain, []))
            ids = [tid for tid in ids if tid in domain_ids]
        # Guard against IDs in the index that no longer have entries.
        return [self._entries[tid].data for tid in ids if tid in self._entries]

    def get_combined_examples(
        self, domain: str | None = None
    ) -> list[tuple[str, str, str]]:
        """Get all examples combined, optionally filtered by domain.

        General-domain examples are always included first; a non-general
        domain appends its examples after them (mirroring
        get_combined_descriptions).

        Args:
            domain: Optional domain to filter by. If None, uses "general".

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        examples: list[tuple[str, str, str]] = []

        # General examples always come first.
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)

        # Then domain-specific examples, when a non-general domain is given.
        if domain and domain != "general":
            for data in self.get_by_type("examples", domain):
                if data.examples:
                    examples.extend(data.examples)

        return examples

    def get_combined_descriptions(self, domain: str | None = None) -> dict[str, str]:
        """Get all descriptions merged, domain-specific overriding general.

        Args:
            domain: Optional domain to filter by. If None, uses "general".

        Returns:
            Dict mapping category to description.
        """
        descriptions: dict[str, str] = {}

        # Load general first
        for data in self.get_by_type("descriptions", "general"):
            if data.descriptions:
                descriptions.update(data.descriptions)

        # Then domain-specific (if different from general)
        if domain and domain != "general":
            for data in self.get_by_type("descriptions", domain):
                if data.descriptions:
                    descriptions.update(data.descriptions)

        return descriptions

    def get_combined_paths(
        self, preset_id: str | None = None, domain: str | None = None
    ) -> dict[str, list[str]]:
        """Get preset paths, optionally filtered by preset ID or domain.

        Args:
            preset_id: Specific preset ID to load.
            domain: Domain to filter by.

        Returns:
            Dict mapping category to list of paths.
        """
        # A specific preset wins over domain-based combination.
        if preset_id:
            data = self.get(preset_id)
            if data and data.paths:
                return data.paths
            return {}

        # Combine all presets for domain
        paths: dict[str, list[str]] = {}
        presets = self.get_by_type("preset", domain or "general")
        for data in presets:
            if data.paths:
                for category, category_paths in data.paths.items():
                    if category not in paths:
                        paths[category] = []
                    paths[category].extend(category_paths)

        return paths

    def list_ids(self) -> list[str]:
        """List all registered taxonomy IDs.

        Returns:
            List of taxonomy IDs.
        """
        return list(self._entries.keys())

    def list_domains(self) -> list[str]:
        """List all available domains.

        Returns:
            List of domain names.
        """
        return list(self._by_domain.keys())

    def list_by_type(self, taxonomy_type: str) -> list[str]:
        """List taxonomy IDs by type.

        Args:
            taxonomy_type: The type to list (examples, descriptions, preset).

        Returns:
            List of taxonomy IDs of that type.
        """
        return list(self._by_type.get(taxonomy_type, []))

    def remove(self, taxonomy_id: str) -> bool:
        """Remove a taxonomy entry from the registry.

        Args:
            taxonomy_id: The taxonomy ID to remove.

        Returns:
            True if removed, False if not found.
        """
        if taxonomy_id not in self._entries:
            return False

        entry = self._entries[taxonomy_id]
        taxonomy_type = entry.data.metadata.type
        domain = entry.data.metadata.domain

        # Remove from type index
        if taxonomy_type in self._by_type:
            self._by_type[taxonomy_type] = [
                tid for tid in self._by_type[taxonomy_type] if tid != taxonomy_id
            ]

        # Remove from domain index
        if domain in self._by_domain:
            self._by_domain[domain] = [
                tid for tid in self._by_domain[domain] if tid != taxonomy_id
            ]

        # Remove entry
        del self._entries[taxonomy_id]
        return True

    def clear(self) -> None:
        """Clear all entries from the registry."""
        self._entries.clear()
        self._by_type = {"examples": [], "descriptions": [], "preset": []}
        self._by_domain = {}

    def __len__(self) -> int:
        """Return the number of entries in the registry."""
        return len(self._entries)

    def __contains__(self, taxonomy_id: str) -> bool:
        """Check if a taxonomy ID is in the registry."""
        return taxonomy_id in self._entries

__init__

__init__()

Initialize an empty registry.

Source code in src/memoir/taxonomy/registry.py
def __init__(self):
    """Initialize an empty registry."""
    # Bundled taxonomy markdown lives next to this module.
    self._builtin_path = Path(__file__).parent / "data"
    self._parser = MarkdownTaxonomySource()
    # Primary storage plus two lookup indexes.
    self._entries: dict[str, TaxonomyEntry] = {}
    self._by_domain: dict[str, list[str]] = {}
    self._by_type: dict[str, list[str]] = {
        "examples": [],
        "descriptions": [],
        "preset": [],
    }

load_builtin

load_builtin() -> list[str]

Load all built-in taxonomy markdown files.

Returns:

Type Description
list[str]

List of loaded taxonomy IDs.

Source code in src/memoir/taxonomy/registry.py
def load_builtin(self) -> list[str]:
    """Load every built-in taxonomy markdown file.

    Returns:
        List of loaded taxonomy IDs.
    """
    loaded: list[str] = []

    root = self._builtin_path
    if not root.exists():
        logger.warning(f"Built-in taxonomy path not found: {root}")
        return loaded

    for md_file in root.rglob("*.md"):
        # The data directory's README is documentation, not taxonomy data.
        if md_file.name == "README.md":
            continue
        try:
            tid = self._load_file(md_file, is_builtin=True)
            loaded.append(tid)
            logger.debug(f"Loaded builtin taxonomy: {tid} from {md_file}")
        except (TaxonomyParseError, FileNotFoundError) as e:
            # A bad file is logged and skipped, never fatal.
            logger.error(f"Failed to load {md_file}: {e}")

    return loaded

load_external

load_external(path: Path | str) -> str

Load an external taxonomy file.

Parameters:

Name Type Description Default
path Path | str

Path to markdown file.

required

Returns:

Type Description
str

ID of the loaded taxonomy.

Raises:

Type Description
TaxonomyParseError

If the file cannot be parsed.

FileNotFoundError

If the file doesn't exist.

Source code in src/memoir/taxonomy/registry.py
def load_external(self, path: Path | str) -> str:
    """Load an external taxonomy file.

    Args:
        path: Path to markdown file.

    Returns:
        ID of the loaded taxonomy.

    Raises:
        TaxonomyParseError: If the file cannot be parsed.
        FileNotFoundError: If the file doesn't exist.
    """
    path = Path(path)
    return self._load_file(path, is_builtin=False)

get

get(taxonomy_id: str) -> TaxonomyData | None

Get taxonomy data by ID.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to look up.

required

Returns:

Type Description
TaxonomyData | None

TaxonomyData if found, None otherwise.

Source code in src/memoir/taxonomy/registry.py
def get(self, taxonomy_id: str) -> TaxonomyData | None:
    """Get taxonomy data by ID.

    Args:
        taxonomy_id: The taxonomy ID to look up.

    Returns:
        TaxonomyData if found, None otherwise.
    """
    # Unwrap the registry entry; unknown IDs yield None.
    entry = self._entries.get(taxonomy_id)
    if entry:
        return entry.data
    return None

get_entry

get_entry(taxonomy_id: str) -> TaxonomyEntry | None

Get full taxonomy entry by ID.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to look up.

required

Returns:

Type Description
TaxonomyEntry | None

TaxonomyEntry if found, None otherwise.

Source code in src/memoir/taxonomy/registry.py
def get_entry(self, taxonomy_id: str) -> TaxonomyEntry | None:
    """Get full taxonomy entry by ID.

    Args:
        taxonomy_id: The taxonomy ID to look up.

    Returns:
        TaxonomyEntry if found, None otherwise.
    """
    # dict.get already returns None for unknown IDs.
    entry = self._entries.get(taxonomy_id)
    return entry

get_by_type

get_by_type(taxonomy_type: str, domain: str | None = None) -> list[TaxonomyData]

Get all taxonomy data of a specific type.

Parameters:

Name Type Description Default
taxonomy_type str

Type to filter by (examples, descriptions, preset).

required
domain str | None

Optional domain to filter by.

None

Returns:

Type Description
list[TaxonomyData]

List of matching TaxonomyData.

Source code in src/memoir/taxonomy/registry.py
def get_by_type(
    self, taxonomy_type: str, domain: str | None = None
) -> list[TaxonomyData]:
    """Get all taxonomy data of a specific type.

    Args:
        taxonomy_type: Type to filter by (examples, descriptions, preset).
        domain: Optional domain to filter by.

    Returns:
        List of matching TaxonomyData.
    """
    candidates = self._by_type.get(taxonomy_type, [])
    if domain:
        # Intersect with the domain index while preserving type-index order.
        in_domain = set(self._by_domain.get(domain, []))
        candidates = [tid for tid in candidates if tid in in_domain]
    # Guard against IDs in the index that no longer have entries.
    return [
        self._entries[tid].data for tid in candidates if tid in self._entries
    ]

get_combined_examples

get_combined_examples(domain: str | None = None) -> list[tuple[str, str, str]]

Get all examples combined, optionally filtered by domain.

Parameters:

Name Type Description Default
domain str | None

Optional domain to filter by. If None, uses "general".

None

Returns:

Type Description
list[tuple[str, str, str]]

List of (input_text, path, reasoning) tuples.

Source code in src/memoir/taxonomy/registry.py
def get_combined_examples(
    self, domain: str | None = None
) -> list[tuple[str, str, str]]:
    """Get all examples combined, optionally filtered by domain.

    Args:
        domain: Optional domain to filter by. If None, uses "general".

    Returns:
        List of (input_text, path, reasoning) tuples.
    """
    examples: list[tuple[str, str, str]] = []

    # Load general first if no specific domain or if domain is different
    if domain is None or domain == "general":
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)
    elif domain != "general":
        # Load general first, then domain-specific
        for data in self.get_by_type("examples", "general"):
            if data.examples:
                examples.extend(data.examples)
        for data in self.get_by_type("examples", domain):
            if data.examples:
                examples.extend(data.examples)

    return examples

get_combined_descriptions

get_combined_descriptions(domain: str | None = None) -> dict[str, str]

Get all descriptions merged, domain-specific overriding general.

Parameters:

Name Type Description Default
domain str | None

Optional domain to filter by. If None, uses "general".

None

Returns:

Type Description
dict[str, str]

Dict mapping category to description.

Source code in src/memoir/taxonomy/registry.py
def get_combined_descriptions(self, domain: str | None = None) -> dict[str, str]:
    """Get all descriptions merged, domain-specific overriding general.

    Args:
        domain: Optional domain to filter by. If None, uses "general".

    Returns:
        Dict mapping category to description.
    """
    descriptions: dict[str, str] = {}

    # Load general first
    for data in self.get_by_type("descriptions", "general"):
        if data.descriptions:
            descriptions.update(data.descriptions)

    # Then domain-specific (if different from general)
    if domain and domain != "general":
        for data in self.get_by_type("descriptions", domain):
            if data.descriptions:
                descriptions.update(data.descriptions)

    return descriptions

get_combined_paths

get_combined_paths(preset_id: str | None = None, domain: str | None = None) -> dict[str, list[str]]

Get preset paths, optionally filtered by preset ID or domain.

Parameters:

Name Type Description Default
preset_id str | None

Specific preset ID to load.

None
domain str | None

Domain to filter by.

None

Returns:

Type Description
dict[str, list[str]]

Dict mapping category to list of paths.

Source code in src/memoir/taxonomy/registry.py
def get_combined_paths(
    self, preset_id: str | None = None, domain: str | None = None
) -> dict[str, list[str]]:
    """Get preset paths, optionally filtered by preset ID or domain.

    Args:
        preset_id: Specific preset ID to load.
        domain: Domain to filter by.

    Returns:
        Dict mapping category to list of paths.
    """
    if preset_id:
        data = self.get(preset_id)
        if data and data.paths:
            return data.paths
        return {}

    # Combine all presets for domain
    paths: dict[str, list[str]] = {}
    presets = self.get_by_type("preset", domain or "general")
    for data in presets:
        if data.paths:
            for category, category_paths in data.paths.items():
                if category not in paths:
                    paths[category] = []
                paths[category].extend(category_paths)

    return paths

list_ids

list_ids() -> list[str]

List all registered taxonomy IDs.

Returns:

Type Description
list[str]

List of taxonomy IDs.

Source code in src/memoir/taxonomy/registry.py
def list_ids(self) -> list[str]:
    """List all registered taxonomy IDs.

    Returns:
        List of taxonomy IDs.
    """
    # The entry map's keys are the IDs; unpack into a fresh list.
    return [*self._entries]

list_domains

list_domains() -> list[str]

List all available domains.

Returns:

Type Description
list[str]

List of domain names.

Source code in src/memoir/taxonomy/registry.py
def list_domains(self) -> list[str]:
    """List all available domains.

    Returns:
        List of domain names.
    """
    # The domain index's keys are the domain names.
    return [*self._by_domain]

list_by_type

list_by_type(taxonomy_type: str) -> list[str]

List taxonomy IDs by type.

Parameters:

Name Type Description Default
taxonomy_type str

The type to list (examples, descriptions, preset).

required

Returns:

Type Description
list[str]

List of taxonomy IDs of that type.

Source code in src/memoir/taxonomy/registry.py
def list_by_type(self, taxonomy_type: str) -> list[str]:
    """Return taxonomy IDs registered under the given type.

    Args:
        taxonomy_type: The type to list (examples, descriptions, preset).

    Returns:
        Copy of the ID list for that type; empty if the type is unknown.
    """
    try:
        return list(self._by_type[taxonomy_type])
    except KeyError:
        return []

remove

remove(taxonomy_id: str) -> bool

Remove a taxonomy entry from the registry.

Parameters:

Name Type Description Default
taxonomy_id str

The taxonomy ID to remove.

required

Returns:

Type Description
bool

True if removed, False if not found.

Source code in src/memoir/taxonomy/registry.py
def remove(self, taxonomy_id: str) -> bool:
    """Remove a taxonomy entry from the registry.

    Args:
        taxonomy_id: The taxonomy ID to remove.

    Returns:
        True if removed, False if not found.
    """
    entry = self._entries.pop(taxonomy_id, None)
    if entry is None:
        return False

    meta = entry.data.metadata

    # Drop the ID from both secondary indexes (all occurrences).
    for index, key in ((self._by_type, meta.type), (self._by_domain, meta.domain)):
        if key in index:
            index[key] = [tid for tid in index[key] if tid != taxonomy_id]

    return True

clear

clear() -> None

Clear all entries from the registry.

Source code in src/memoir/taxonomy/registry.py
def clear(self) -> None:
    """Clear all entries from the registry and reset both indexes."""
    self._entries.clear()
    # Rebuild the indexes fresh, matching the known taxonomy types.
    self._by_type = {key: [] for key in ("examples", "descriptions", "preset")}
    self._by_domain = {}

__len__

__len__() -> int

Return the number of entries in the registry.

Source code in src/memoir/taxonomy/registry.py
def __len__(self) -> int:
    """Return how many taxonomy entries are currently registered."""
    return len(self._entries)

__contains__

__contains__(taxonomy_id: str) -> bool

Check if a taxonomy ID is in the registry.

Source code in src/memoir/taxonomy/registry.py
def __contains__(self, taxonomy_id: str) -> bool:
    """Return True when ``taxonomy_id`` is a registered entry."""
    return taxonomy_id in self._entries

memoir.taxonomy.markdown_source module

memoir.taxonomy.markdown_source

Markdown-based taxonomy data source.

Parses YAML frontmatter and structured markdown content into taxonomy data.

TaxonomyParseError

Bases: Exception

Error parsing taxonomy markdown file.

Source code in src/memoir/taxonomy/markdown_source.py
class TaxonomyParseError(Exception):
    """Error parsing taxonomy markdown file."""
    # The redundant `pass` was removed: a docstring alone is a valid body.

TaxonomyMetadata dataclass

Metadata from taxonomy markdown file frontmatter.

Source code in src/memoir/taxonomy/markdown_source.py
@dataclass
class TaxonomyMetadata:
    """Metadata from taxonomy markdown file frontmatter.

    ``type``, ``id`` and ``name`` are required frontmatter fields; every
    other attribute falls back to the default shown below.
    """

    type: str  # examples | descriptions | preset
    id: str  # unique identifier for this taxonomy file
    name: str  # human-readable display name
    domain: str = "general"  # domain grouping used for filtering
    version: str = "1.0.0"  # version string of the taxonomy file itself
    created: str | None = None  # frontmatter "created" value, if present
    updated: str | None = None  # frontmatter "updated" value, if present
    author: str = "system"  # author recorded in frontmatter
    description: str = ""  # free-form description
    tags: list[str] = field(default_factory=list)  # frontmatter tag list
    dependencies: list[str] = field(default_factory=list)  # frontmatter dependency list
    taxonomy_version: str | None = None  # For presets (e.g., "simplified")

TaxonomyData dataclass

Parsed taxonomy data from markdown.

Source code in src/memoir/taxonomy/markdown_source.py
@dataclass
class TaxonomyData:
    """Parsed taxonomy data from markdown.

    The parser populates exactly one of ``examples``, ``descriptions`` or
    ``paths``, depending on ``metadata.type``; the others remain ``None``.
    """

    metadata: TaxonomyMetadata  # frontmatter metadata for this file
    examples: list[tuple[str, str, str]] | None = None  # (input, path, reasoning)
    descriptions: dict[str, str] | None = None  # category -> description
    paths: dict[str, list[str]] | None = None  # category -> [subcategory.type, ...]
    raw_content: str = ""  # markdown body without frontmatter (empty when rebuilt from a dict)

MarkdownTaxonomySource

Markdown file-based taxonomy data source.

Parses YAML frontmatter and structured markdown content into taxonomy data structures.

Supported types:

- examples: Classification examples in markdown tables
- descriptions: Category descriptions in a markdown table
- preset: Taxonomy paths in bullet lists under headers

Source code in src/memoir/taxonomy/markdown_source.py
class MarkdownTaxonomySource:
    """
    Markdown file-based taxonomy data source.

    Parses YAML frontmatter and structured markdown content
    into taxonomy data structures.

    Supported types:
    - examples: Classification examples in markdown tables
    - descriptions: Category descriptions in a markdown table
    - preset: Taxonomy paths in bullet lists under headers
    """

    def __init__(self, encoding: str = "utf-8"):
        """Initialize the markdown source parser.

        Args:
            encoding: File encoding to use when reading files.
        """
        self.encoding = encoding

    def load(self, path: Path) -> TaxonomyData:
        """Load and parse a markdown taxonomy file.

        Args:
            path: Path to the markdown file.

        Returns:
            Parsed TaxonomyData.

        Raises:
            TaxonomyParseError: If the file cannot be parsed.
            FileNotFoundError: If the file doesn't exist.
        """
        if not path.exists():
            raise FileNotFoundError(f"Taxonomy file not found: {path}")

        content = path.read_text(encoding=self.encoding)
        return self.parse(content)

    def parse(self, content: str) -> TaxonomyData:
        """Parse markdown content into TaxonomyData.

        Args:
            content: Raw markdown content.

        Returns:
            Parsed TaxonomyData.

        Raises:
            TaxonomyParseError: If the content cannot be parsed.
        """
        metadata, body = self._split_frontmatter(content)

        if metadata.type == "examples":
            examples = self._parse_examples_tables(body)
            return TaxonomyData(metadata=metadata, examples=examples, raw_content=body)
        elif metadata.type == "descriptions":
            descriptions = self._parse_descriptions_table(body)
            return TaxonomyData(
                metadata=metadata, descriptions=descriptions, raw_content=body
            )
        elif metadata.type == "preset":
            paths = self._parse_preset_lists(body)
            return TaxonomyData(metadata=metadata, paths=paths, raw_content=body)
        else:
            raise TaxonomyParseError(f"Unknown taxonomy type: {metadata.type}")

    def _split_frontmatter(self, content: str) -> tuple[TaxonomyMetadata, str]:
        """Split YAML frontmatter from markdown body.

        Args:
            content: Raw markdown content.

        Returns:
            Tuple of (metadata, body).

        Raises:
            TaxonomyParseError: If frontmatter is missing or invalid.
        """
        pattern = r"^---\s*\n(.*?)\n---\s*\n(.*)$"
        match = re.match(pattern, content, re.DOTALL)
        if not match:
            raise TaxonomyParseError("Invalid markdown: missing YAML frontmatter")

        yaml_content = match.group(1)
        body = match.group(2)

        try:
            meta_dict = yaml.safe_load(yaml_content)
        except yaml.YAMLError as e:
            raise TaxonomyParseError(f"Invalid YAML frontmatter: {e}") from e

        # safe_load returns scalars or lists for degenerate documents; the
        # frontmatter must be a mapping before field lookups are possible.
        if not isinstance(meta_dict, dict):
            raise TaxonomyParseError("Invalid YAML frontmatter: expected a mapping")

        # Validate required fields
        required_fields = ["type", "id", "name"]
        for field_name in required_fields:
            if field_name not in meta_dict:
                raise TaxonomyParseError(
                    f"Missing required field in frontmatter: {field_name}"
                )

        # Handle optional list fields that might be None
        if meta_dict.get("tags") is None:
            meta_dict["tags"] = []
        if meta_dict.get("dependencies") is None:
            meta_dict["dependencies"] = []

        try:
            metadata = TaxonomyMetadata(**meta_dict)
        except TypeError as e:
            # Unknown frontmatter keys would otherwise escape as a bare
            # TypeError; surface them as the documented parse error instead.
            raise TaxonomyParseError(f"Invalid frontmatter fields: {e}") from e
        return metadata, body

    @staticmethod
    def _split_sections(body: str) -> list[tuple[str, str]]:
        """Split a markdown body into (header, content) pairs at ``## name`` headers.

        Shared by the examples and preset parsers, which both organize
        content under ``##`` category headers.

        Args:
            body: Markdown body content.

        Returns:
            List of (header, content) tuples; content before the first
            header is discarded.
        """
        parts = re.split(r"^## (\w+)\s*$", body, flags=re.MULTILINE)
        # parts[0] is content before the first ##, then alternating
        # header/content pairs.
        return [(parts[i], parts[i + 1]) for i in range(1, len(parts) - 1, 2)]

    def _parse_examples_tables(self, body: str) -> list[tuple[str, str, str]]:
        """Parse markdown tables under ## headers into examples.

        Expected format:
        ## category_name
        | Input | Path | Reasoning |
        |-------|------|-----------|
        | My name is Sarah | profile.personal.identity | identity info |

        Args:
            body: Markdown body content.

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        examples = []
        # The header itself is not needed: each path already includes the
        # category as its first segment.
        for _category, content in self._split_sections(body):
            examples.extend(self._parse_table_rows(content))
        return examples

    def _parse_table_rows(self, content: str) -> list[tuple[str, str, str]]:
        """Parse markdown table rows into example tuples.

        Args:
            content: Content containing a markdown table.

        Returns:
            List of (input, path, reasoning) tuples.
        """
        examples = []
        lines = content.strip().split("\n")

        in_table = False
        for line in lines:
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Skip header row and separator; seeing either marks the table start.
            if (
                line.startswith("| Input")
                or line.startswith("|--")
                or line.startswith("| ---")
            ):
                in_table = True
                continue

            # Parse data rows
            if in_table and line.startswith("|") and line.endswith("|"):
                cells = [cell.strip() for cell in line.split("|")[1:-1]]
                if len(cells) >= 3:
                    input_text = cells[0]
                    path = cells[1]
                    reasoning = cells[2]
                    if input_text and path:  # Skip empty rows
                        examples.append((input_text, path, reasoning))

        return examples

    def _parse_descriptions_table(self, body: str) -> dict[str, str]:
        """Parse markdown table into category descriptions dict.

        Expected format:
        | Category | Description |
        |----------|-------------|
        | profile | Personal facts... |

        Args:
            body: Markdown body content.

        Returns:
            Dict mapping category to description.
        """
        descriptions = {}
        lines = body.strip().split("\n")

        in_table = False
        for line in lines:
            line = line.strip()

            # Skip empty lines
            if not line:
                continue

            # Skip header row and separator; seeing either marks the table start.
            if (
                line.startswith("| Category")
                or line.startswith("|--")
                or line.startswith("| ---")
            ):
                in_table = True
                continue

            # Parse data rows
            if in_table and line.startswith("|") and line.endswith("|"):
                cells = [cell.strip() for cell in line.split("|")[1:-1]]
                if len(cells) >= 2:
                    category = cells[0]
                    description = cells[1]
                    if category and description:
                        descriptions[category] = description

        return descriptions

    def _parse_preset_lists(self, body: str) -> dict[str, list[str]]:
        """Parse markdown lists under ## headers into preset paths.

        Expected format:
        ## profile
        - personal.identity
        - personal.demographics

        Args:
            body: Markdown body content.

        Returns:
            Dict mapping category to list of subcategory.type paths.
        """
        paths: dict[str, list[str]] = {}

        for category, content in self._split_sections(body):
            category = category.strip()

            # Parse bullet list items
            category_paths = []
            for line in content.split("\n"):
                line = line.strip()
                if line.startswith("- "):
                    path = line[2:].strip()
                    if path:
                        category_paths.append(path)

            if category_paths:
                paths[category] = category_paths

        return paths

    def to_dict(self, data: TaxonomyData) -> dict[str, Any]:
        """Convert TaxonomyData to a dictionary for storage.

        Args:
            data: The taxonomy data to convert.

        Returns:
            Dictionary representation suitable for JSON serialization.
        """
        result: dict[str, Any] = {
            "metadata": {
                "type": data.metadata.type,
                "id": data.metadata.id,
                "name": data.metadata.name,
                "domain": data.metadata.domain,
                "version": data.metadata.version,
                "author": data.metadata.author,
                "description": data.metadata.description,
                "tags": data.metadata.tags,
                "dependencies": data.metadata.dependencies,
            }
        }

        # Optional metadata fields are emitted only when set.
        if data.metadata.created:
            result["metadata"]["created"] = data.metadata.created
        if data.metadata.updated:
            result["metadata"]["updated"] = data.metadata.updated
        if data.metadata.taxonomy_version:
            result["metadata"]["taxonomy_version"] = data.metadata.taxonomy_version

        if data.examples is not None:
            result["examples"] = [
                {"input": inp, "path": path, "reasoning": reason}
                for inp, path, reason in data.examples
            ]

        if data.descriptions is not None:
            result["descriptions"] = data.descriptions

        if data.paths is not None:
            result["paths"] = data.paths

        return result

    def from_dict(self, data: dict[str, Any]) -> TaxonomyData:
        """Convert a dictionary back to TaxonomyData.

        Args:
            data: Dictionary from storage.

        Returns:
            TaxonomyData instance (``raw_content`` is not restored; it is
            not stored by :meth:`to_dict`).
        """
        meta_dict = data["metadata"]
        metadata = TaxonomyMetadata(
            type=meta_dict["type"],
            id=meta_dict["id"],
            name=meta_dict["name"],
            domain=meta_dict.get("domain", "general"),
            version=meta_dict.get("version", "1.0.0"),
            created=meta_dict.get("created"),
            updated=meta_dict.get("updated"),
            author=meta_dict.get("author", "system"),
            description=meta_dict.get("description", ""),
            tags=meta_dict.get("tags", []),
            dependencies=meta_dict.get("dependencies", []),
            taxonomy_version=meta_dict.get("taxonomy_version"),
        )

        examples = None
        if "examples" in data:
            examples = [
                (e["input"], e["path"], e["reasoning"]) for e in data["examples"]
            ]

        descriptions = data.get("descriptions")
        paths = data.get("paths")

        return TaxonomyData(
            metadata=metadata,
            examples=examples,
            descriptions=descriptions,
            paths=paths,
        )

__init__

__init__(encoding: str = 'utf-8')

Initialize the markdown source parser.

Parameters:

Name Type Description Default
encoding str

File encoding to use when reading files.

'utf-8'
Source code in src/memoir/taxonomy/markdown_source.py
def __init__(self, encoding: str = "utf-8"):
    """Initialize the markdown source parser.

    Args:
        encoding: File encoding to use when reading files.
    """
    # Remembered for every subsequent read in load().
    self.encoding = encoding

load

load(path: Path) -> TaxonomyData

Load and parse a markdown taxonomy file.

Parameters:

Name Type Description Default
path Path

Path to the markdown file.

required

Returns:

Type Description
TaxonomyData

Parsed TaxonomyData.

Raises:

Type Description
TaxonomyParseError

If the file cannot be parsed.

FileNotFoundError

If the file doesn't exist.

Source code in src/memoir/taxonomy/markdown_source.py
def load(self, path: Path) -> TaxonomyData:
    """Load and parse a markdown taxonomy file.

    Args:
        path: Path to the markdown file.

    Returns:
        Parsed TaxonomyData.

    Raises:
        TaxonomyParseError: If the file cannot be parsed.
        FileNotFoundError: If the file doesn't exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Taxonomy file not found: {path}")

    return self.parse(path.read_text(encoding=self.encoding))

parse

parse(content: str) -> TaxonomyData

Parse markdown content into TaxonomyData.

Parameters:

Name Type Description Default
content str

Raw markdown content.

required

Returns:

Type Description
TaxonomyData

Parsed TaxonomyData.

Raises:

Type Description
TaxonomyParseError

If the content cannot be parsed.

Source code in src/memoir/taxonomy/markdown_source.py
def parse(self, content: str) -> TaxonomyData:
    """Parse markdown content into TaxonomyData.

    Args:
        content: Raw markdown content.

    Returns:
        Parsed TaxonomyData.

    Raises:
        TaxonomyParseError: If the content cannot be parsed.
    """
    metadata, body = self._split_frontmatter(content)
    kind = metadata.type

    # Dispatch to the matching section parser; each taxonomy type fills
    # exactly one payload field of TaxonomyData.
    if kind == "examples":
        return TaxonomyData(
            metadata=metadata,
            examples=self._parse_examples_tables(body),
            raw_content=body,
        )
    if kind == "descriptions":
        return TaxonomyData(
            metadata=metadata,
            descriptions=self._parse_descriptions_table(body),
            raw_content=body,
        )
    if kind == "preset":
        return TaxonomyData(
            metadata=metadata,
            paths=self._parse_preset_lists(body),
            raw_content=body,
        )
    raise TaxonomyParseError(f"Unknown taxonomy type: {metadata.type}")

to_dict

to_dict(data: TaxonomyData) -> dict[str, Any]

Convert TaxonomyData to a dictionary for storage.

Parameters:

Name Type Description Default
data TaxonomyData

The taxonomy data to convert.

required

Returns:

Type Description
dict[str, Any]

Dictionary representation suitable for JSON serialization.

Source code in src/memoir/taxonomy/markdown_source.py
def to_dict(self, data: TaxonomyData) -> dict[str, Any]:
    """Convert TaxonomyData to a dictionary for storage.

    Args:
        data: The taxonomy data to convert.

    Returns:
        Dictionary representation suitable for JSON serialization.
    """
    meta = data.metadata
    metadata: dict[str, Any] = {
        "type": meta.type,
        "id": meta.id,
        "name": meta.name,
        "domain": meta.domain,
        "version": meta.version,
        "author": meta.author,
        "description": meta.description,
        "tags": meta.tags,
        "dependencies": meta.dependencies,
    }

    # Optional metadata fields are emitted only when set.
    for key in ("created", "updated", "taxonomy_version"):
        value = getattr(meta, key)
        if value:
            metadata[key] = value

    result: dict[str, Any] = {"metadata": metadata}

    if data.examples is not None:
        result["examples"] = [
            {"input": inp, "path": path, "reasoning": reason}
            for inp, path, reason in data.examples
        ]

    if data.descriptions is not None:
        result["descriptions"] = data.descriptions

    if data.paths is not None:
        result["paths"] = data.paths

    return result

from_dict

from_dict(data: dict[str, Any]) -> TaxonomyData

Convert a dictionary back to TaxonomyData.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary from storage.

required

Returns:

Type Description
TaxonomyData

TaxonomyData instance.

Source code in src/memoir/taxonomy/markdown_source.py
def from_dict(self, data: dict[str, Any]) -> TaxonomyData:
    """Convert a dictionary back to TaxonomyData.

    Args:
        data: Dictionary from storage.

    Returns:
        TaxonomyData instance.
    """
    meta = data["metadata"]
    # Required fields are indexed directly; everything else falls back to
    # the same defaults TaxonomyMetadata declares.
    metadata = TaxonomyMetadata(
        type=meta["type"],
        id=meta["id"],
        name=meta["name"],
        domain=meta.get("domain", "general"),
        version=meta.get("version", "1.0.0"),
        created=meta.get("created"),
        updated=meta.get("updated"),
        author=meta.get("author", "system"),
        description=meta.get("description", ""),
        tags=meta.get("tags", []),
        dependencies=meta.get("dependencies", []),
        taxonomy_version=meta.get("taxonomy_version"),
    )

    examples = None
    if "examples" in data:
        examples = [
            (item["input"], item["path"], item["reasoning"])
            for item in data["examples"]
        ]

    return TaxonomyData(
        metadata=metadata,
        examples=examples,
        descriptions=data.get("descriptions"),
        paths=data.get("paths"),
    )