memoir package

memoir

Memoir: High-performance semantic memory system for AI agents.

ClassificationResult

Bases: BaseModel

Result of semantic classification.

Source code in src/memoir/classifier/semantic.py
class ClassificationResult(BaseModel):
    """Result of semantic classification."""

    primary_path: str = Field(description="Primary taxonomy path for the memory")
    confidence: float = Field(description="Confidence score (0-1)")
    alternative_paths: list[str] = Field(description="Alternative relevant paths")
    reasoning: str = Field(description="Brief reasoning for classification")
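
A minimal construction sketch; the field values are illustrative, not taken from a real classification run:

result = ClassificationResult(
    primary_path="profile.personal.identity",
    confidence=0.92,
    alternative_paths=["profile.personal.other"],
    reasoning="Content states the user's name and age.",
)
print(result.primary_path, result.confidence)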

SemanticClassifier

Classifies memories into semantic taxonomy paths. Optimized for low-latency classification with caching.

Source code in src/memoir/classifier/semantic.py
class SemanticClassifier:
    """
    Classifies memories into semantic taxonomy paths.
    Optimized for low-latency classification with caching.
    """

    def __init__(
        self,
        llm: Any | None = None,
        taxonomy: TaxonomyInterface | None = None,
        cache_size: int = DEFAULT_CACHE_SIZE,
        use_examples: bool = True,
        fallback_path: str | None = None,
    ):
        """
        Initialize the semantic classifier.

        Args:
            llm: Language model for classification (optional, will use default)
            taxonomy: Taxonomy instance implementing TaxonomyInterface
                     If None, uses default SemanticTaxonomy
            cache_size: Size of the classification cache
            use_examples: Whether to include examples in prompts
            fallback_path: Custom fallback path when classification fails
        """
        self.taxonomy = taxonomy if taxonomy is not None else get_taxonomy()
        self.llm = llm
        self.use_examples = use_examples
        self.fallback_path = fallback_path or self._determine_fallback_path()
        self._cache = {}
        self._setup_classification_prompt()

    def _determine_fallback_path(self) -> str:
        """Determine appropriate fallback path based on available taxonomy."""
        try:
            all_paths = self.taxonomy.get_all_paths()

            # First, try to find the exact default fallback path for backwards compatibility
            if DEFAULT_FALLBACK_PATH in all_paths:
                return DEFAULT_FALLBACK_PATH

            # Try to find a context-related path that's reasonably specific
            context_paths = [path for path in all_paths if path.startswith("context.")]
            if context_paths:
                # Prefer paths with depth similar to the default (4-5 levels)
                preferred_paths = [
                    p for p in context_paths if 4 <= len(p.split(".")) <= 5
                ]
                if preferred_paths:
                    preferred_paths.sort(key=len)
                    return preferred_paths[0]

                # Fallback to any context path (prefer longer ones for backwards compatibility)
                context_paths.sort(key=len, reverse=True)
                return context_paths[0]

            # Try to find any 'other' category
            other_paths = [path for path in all_paths if path.endswith(".other")]
            if other_paths:
                # Prefer shorter 'other' paths
                other_paths.sort(key=len)
                return other_paths[0]

            # Use the first available path as last resort
            if all_paths:
                return all_paths[0]

        except Exception:
            pass

        # Ultimate fallback to the default path
        return DEFAULT_FALLBACK_PATH

    def _get_taxonomy_structure_info(self) -> str:
        """Generate taxonomy structure information for the prompt.

        Includes ALL paths (excluding 'other' paths) to ensure the static section
        meets the minimum token requirement for prompt caching (2048 tokens for Haiku).
        """
        try:
            # All taxonomies should implement TaxonomyInterface
            all_paths = self.taxonomy.get_all_paths()

            if not all_paths:
                return "The taxonomy structure is available but paths could not be enumerated."

            # Filter out 'other' paths for cleaner output (they're implied)
            non_other_paths = [p for p in all_paths if not p.endswith(".other")]

            # Group paths by top-level category for better organization
            categories: dict[str, list[str]] = {}
            for path in non_other_paths:
                parts = path.split(".")
                if parts:
                    category = parts[0]
                    if category not in categories:
                        categories[category] = []
                    categories[category].append(path)

            # Generate structured description with ALL paths for prompt caching
            structure_lines = [
                f"Complete taxonomy hierarchy ({len(non_other_paths)} available paths):",
                "",
            ]

            for category, paths in sorted(categories.items()):
                structure_lines.append(f"## {category.upper()}")
                for path in sorted(paths):
                    structure_lines.append(f"  - {path}")
                structure_lines.append("")

            # Add info about 'other' categories if this is an AdvancedTaxonomy
            if isinstance(self.taxonomy, AdvancedTaxonomyInterface):
                structure_lines.append(
                    "NOTE: Each category also has 'other' subcategories for unclassified content."
                )
                structure_lines.append(
                    "Use 'other' categories when content doesn't fit existing specific paths."
                )

            return "\n".join(structure_lines)

        except Exception as e:
            logger.warning(f"Could not generate taxonomy structure info: {e}")
            return "Taxonomy structure is available. Please classify using the most appropriate path."

    def _is_valid_path(self, path: str) -> bool:
        """Check if a path is valid in the current taxonomy."""
        try:
            # All taxonomies should implement TaxonomyInterface
            return self.taxonomy.is_valid_path(path)
        except Exception as e:
            logger.warning(f"Error validating path {path}: {e}")
            return False

    def _setup_classification_prompt(self):
        """Setup the classification prompt template.

        The prompt is structured with STATIC content FIRST (for prompt caching)
        and DYNAMIC content LAST. This allows LLM providers like Anthropic to
        cache the static prefix and reduce costs by up to 90%.
        """
        # Static content first, dynamic content last for optimal prompt caching
        self.classification_template = """[STATIC_SECTION_START]
You are a semantic memory classifier. Your task is to classify the given memory content into the most appropriate path(s) from the provided taxonomy.

AVAILABLE TAXONOMY STRUCTURE:
{taxonomy_structure}

{examples}

CLASSIFICATION GUIDELINES:
1. Match content to the MOST SPECIFIC appropriate path from the available taxonomy
2. Consider the semantic meaning and context of the content
3. AVOID generic paths like 'context.current' unless content is truly about the current conversation
4. Consider confidence level:
   - High confidence (0.8-1.0): Very specific and accurate path match
   - Medium confidence (0.5-0.7): Reasonable fit but could be broader
   - Low confidence (0.0-0.4): Content is unclear or doesn't fit well
5. When unsure, use the most specific relevant category available in the taxonomy
6. Use 'other' categories when content doesn't fit existing specific paths - this helps the system learn and expand

IMPORTANT:
- Only use paths that exist in the provided taxonomy
- Prefer accuracy over specificity
- Return a valid JSON response with the required fields
- 'Other' categories help the system learn and expand over time

Return your classification as pure JSON (no markdown, no code blocks, just JSON) with:
- primary_path: The best matching taxonomy path (can be an 'other' path)
- confidence: Confidence score from 0 to 1
- alternative_paths: List of other relevant paths (max 3)
- reasoning: Brief explanation of your choice (1-2 sentences)

Think step by step:
1. Can this be clearly categorized into existing paths?
2. If uncertain, what's the closest parent category?
3. Should this go to a specific path or an 'other' category?

CRITICAL: Return ONLY the JSON object, no explanations, no markdown formatting.
[STATIC_SECTION_END]

[DYNAMIC_SECTION_START]
{context_info}

{classification_hints}

MEMORY CONTENT TO CLASSIFY:
{memory_content}
[DYNAMIC_SECTION_END]"""

    def _get_classification_examples(self) -> str:
        """Get few-shot examples for classification."""
        if not self.use_examples:
            return ""

        # Generate dynamic examples based on available taxonomy paths
        examples = self._generate_dynamic_examples()

        examples_text = "EXAMPLES:\n"
        for ex in examples:
            examples_text += f"\nMemory: {ex['memory']}\n"
            examples_text += f"Classification: {ex['path']}\n"
            examples_text += f"Confidence: {ex['confidence']}\n"
            examples_text += f"Reasoning: {ex['reasoning']}\n"

        return examples_text

    def _generate_dynamic_examples(self) -> list[dict]:
        """Generate classification examples dynamically based on available taxonomy."""
        try:
            all_paths = self.taxonomy.get_all_paths()
            if not all_paths:
                return []

            # Select diverse paths for examples (avoid being too specific to any domain)
            example_templates = [
                {
                    "memory": "My name is {example_name} and I'm 28 years old",
                    "pattern": "profile.personal.identity",
                    "confidence": 0.95,
                    "reasoning": "Personal identity information - name and age",
                },
                {
                    "memory": "I work as a software engineer at Google",
                    "pattern": "profile.professional.current",
                    "confidence": 0.90,
                    "reasoning": "Current professional role and company",
                },
                {
                    "memory": "I graduated from MIT with a CS degree",
                    "pattern": "profile.professional.education.formal",
                    "confidence": 0.90,
                    "reasoning": "Formal education history",
                },
                {
                    "memory": "My favorite IDE is {example_tool}",
                    "pattern": "preferences.technology.programming.tools",
                    "confidence": 0.85,
                    "reasoning": "Tool/IDE preference",
                },
                {
                    "memory": "I have 5 years of experience in {example_skill}",
                    "pattern": "profile.professional.skills.technical",
                    "confidence": 0.85,
                    "reasoning": "Professional skill with experience duration",
                },
                {
                    "memory": "I prefer {example_preference} for my morning routine",
                    "pattern": "preferences.personal.lifestyle",
                    "confidence": 0.80,
                    "reasoning": "Personal lifestyle preference",
                },
            ]

            examples = []
            for template in example_templates:
                # Find a suitable path that matches the pattern
                matching_path = self._find_example_path(all_paths, template["pattern"])
                if matching_path:
                    examples.append(
                        {
                            "memory": template["memory"].format(
                                example_name="John Smith",
                                example_tool="VS Code",
                                example_skill="machine learning",
                                example_preference="coffee",
                            ),
                            "path": matching_path,
                            "confidence": template["confidence"],
                            "reasoning": template["reasoning"],
                        }
                    )

            return examples

        except Exception as e:
            logger.warning(f"Could not generate dynamic examples: {e}")
            # Return minimal fallback examples if dynamic generation fails
            return [
                {
                    "memory": "User's name is John Smith",
                    "path": "profile.personal.identity",
                    "confidence": 0.9,
                    "reasoning": "Personal identity information",
                }
            ]

    def _find_example_path(self, all_paths: list[str], pattern: str) -> str | None:
        """Find a suitable taxonomy path for example generation."""
        # Look for paths that contain the pattern
        candidates = [path for path in all_paths if pattern.lower() in path.lower()]

        if candidates:
            # Prefer paths that are not too deep (3-4 levels) and not 'other' categories
            good_candidates = [
                path
                for path in candidates
                if 3 <= len(path.split(".")) <= 4 and "other" not in path
            ]
            if good_candidates:
                return good_candidates[0]
            return candidates[0]

        # Fallback: find any path with appropriate top-level category
        if "identity" in pattern:
            candidates = [path for path in all_paths if path.startswith("profile.")]
        elif "preferences" in pattern:
            candidates = [path for path in all_paths if path.startswith("preferences.")]
        elif "skills" in pattern:
            candidates = [path for path in all_paths if "skill" in path.lower()]
        else:
            # For 'other' pattern, find any 'other' category
            candidates = [path for path in all_paths if path.endswith(".other")]

        return candidates[0] if candidates else None

    def _get_context_info(self, context: dict | None = None) -> str:
        """Format context information for classification."""
        if not context:
            return ""

        context_parts = []
        if "user_id" in context:
            context_parts.append(f"User: {context['user_id']}")
        if "session_id" in context:
            context_parts.append(f"Session: {context['session_id']}")
        if "timestamp" in context:
            context_parts.append(f"Time: {context['timestamp']}")
        if "conversation_topic" in context:
            context_parts.append(f"Topic: {context['conversation_topic']}")
        if "available_memory_paths" in context:
            paths = context["available_memory_paths"]
            if paths:
                context_parts.append("AVAILABLE STORED MEMORY PATHS:")
                context_parts.append(
                    "You should prioritize matching to these existing paths:"
                )
                for path in sorted(paths):
                    context_parts.append(f"  - {path}")
                context_parts.append(
                    "If the query relates to stored memories, try to match one of these paths."
                )

        if context_parts:
            return "CONTEXT:\n" + "\n".join(context_parts)
        return ""

    def _compute_cache_key(
        self, memory_content: str, context: dict | None = None
    ) -> str:
        """Compute a cache key for the classification."""
        content_hash = hashlib.sha256(memory_content.encode()).hexdigest()
        context_str = json.dumps(context, sort_keys=True) if context else ""
        context_hash = hashlib.sha256(context_str.encode()).hexdigest()
        return f"{content_hash}:{context_hash}"

    async def classify_async(
        self,
        memory_content: str,
        context: dict | None = None,
        use_cache: bool = True,
    ) -> ClassificationResult:
        """
        Classify memory content into taxonomy path asynchronously.

        Args:
            memory_content: The memory content to classify
            context: Optional context information
            use_cache: Whether to use cached results

        Returns:
            ClassificationResult with path and metadata
        """
        # Check cache
        if use_cache:
            cache_key = self._compute_cache_key(memory_content, context)
            if cache_key in self._cache:
                # logger.debug(f"Cache hit for classification: {cache_key}")
                return self._cache[cache_key]

        # Get iterative taxonomy hints to include in prompt
        classification_hints = ""
        if hasattr(self.taxonomy, "get_classification_hints"):
            hints = self.taxonomy.get_classification_hints(memory_content)
            if hints.get("suggested_paths") or hints.get("expansion_candidates"):
                classification_hints = "\nCLASSIFICATION HINTS:\n"
                if hints.get("suggested_paths"):
                    classification_hints += f"Similar content previously found in: {', '.join(hints['suggested_paths'][:3])}\n"
                if hints.get("expansion_candidates"):
                    candidates = [
                        f"{item['path']} ({item['item_count']} items)"
                        for item in hints["expansion_candidates"][:3]
                    ]
                    classification_hints += (
                        f"Paths ready for expansion: {', '.join(candidates)}\n"
                    )
                classification_hints += (
                    "Consider these hints when choosing the most appropriate path.\n"
                )

        # Prepare prompt
        prompt_vars = {
            "memory_content": memory_content,
            "context_info": self._get_context_info(context),
            "taxonomy_structure": self._get_taxonomy_structure_info(),
            "examples": self._get_classification_examples(),
            "classification_hints": classification_hints,
        }

        # Run classification
        try:
            if self.llm:
                # Use provided LLM
                prompt_text = self.classification_template.format(**prompt_vars)
                response = await self.llm.ainvoke(prompt_text)

                # Extract content from response
                if hasattr(response, "content"):
                    content = response.content
                elif isinstance(response, str):
                    content = response
                else:
                    content = str(response)

                # Clean up the response - handle markdown code blocks
                content = content.strip()
                if "```json" in content:
                    # Extract JSON from markdown code block
                    start = content.find("```json") + 7
                    end = content.find("```", start)
                    if end > start:
                        content = content[start:end].strip()
                elif "```" in content:
                    # Extract from generic code block
                    start = content.find("```") + 3
                    end = content.find("```", start)
                    if end > start:
                        content = content[start:end].strip()

                # Parse JSON
                result_dict = json.loads(content)
            else:
                # No LLM provided - must have one for production use
                raise ValueError(
                    "No LLM provided for classification. Cannot classify without language model."
                )

            result = ClassificationResult(**result_dict)

            # Get classification hints from iterative taxonomy before processing
            hints = None
            if hasattr(self.taxonomy, "get_classification_hints"):
                hints = self.taxonomy.get_classification_hints(memory_content)

                # Apply hints to improve classification
                if hints.get("suggested_paths"):
                    # If LLM suggested a path that matches a hint, boost confidence
                    if result.primary_path in hints["suggested_paths"]:
                        result.confidence = min(1.0, result.confidence + 0.1)

                    # If no good match but we have suggestions, consider the best suggestion
                    elif result.confidence < 0.6 and hints["suggested_paths"]:
                        best_suggestion = hints["suggested_paths"][0]
                        if self._is_valid_path(best_suggestion):
                            result.alternative_paths.insert(0, best_suggestion)
                            result.reasoning += (
                                f" (Hint: similar content found in {best_suggestion})"
                            )

            # Use advanced taxonomy logic if available
            if isinstance(self.taxonomy, AdvancedTaxonomyInterface):
                # Advanced taxonomy (e.g., DynamicTaxonomy) - use smart path selection
                selected_path, final_confidence = (
                    self.taxonomy.select_path_with_fallback(
                        classification_result=result,
                        memory_content=memory_content,
                        metadata=context.get("metadata") if context else None,
                    )
                )

                # Update result with advanced taxonomy's selection
                result.primary_path = selected_path
                result.confidence = final_confidence

            else:
                # Standard taxonomy - just validate paths
                if not self._is_valid_path(result.primary_path):
                    # Find closest valid path
                    result.primary_path = self._find_closest_valid_path(
                        result.primary_path
                    )

            # Track the classification in iterative taxonomy for learning
            if hasattr(self.taxonomy, "track_classification"):
                expansion_triggered = self.taxonomy.track_classification(
                    result.primary_path,
                    memory_content,
                    {
                        "confidence": result.confidence,
                        "reasoning": result.reasoning,
                        "alternatives": result.alternative_paths,
                        "hints_used": hints is not None,
                    },
                )

                if expansion_triggered:
                    # logger.info(
                    #     f"Triggered taxonomy expansion for path: {result.primary_path}"
                    # )
                    pass

            # Cache result
            if use_cache:
                self._cache[cache_key] = result

            return result

        except Exception as e:
            logger.error(f"Classification failed: {e}")
            # Return fallback classification
            return self._fallback_classification(memory_content)

    def classify(
        self,
        memory_content: str,
        context: dict | None = None,
        use_cache: bool = True,
    ) -> ClassificationResult:
        """
        Synchronous version of classify_async.
        """
        import asyncio

        return asyncio.run(self.classify_async(memory_content, context, use_cache))

    def _find_closest_valid_path(self, invalid_path: str) -> str:
        """Find the closest valid path in the taxonomy."""
        parts = invalid_path.split(".")

        # Try progressively shorter paths
        for i in range(len(parts), 0, -1):
            test_path = ".".join(parts[:i])
            if self._is_valid_path(test_path):
                return test_path

        # Fallback to configured fallback path, but validate it exists first
        if self._is_valid_path(self.fallback_path):
            return self.fallback_path

        # Ultimate fallback: find any valid path from the first category
        all_paths = self.taxonomy.get_all_paths()
        if all_paths:
            return all_paths[0]

        # Should never reach here if taxonomy is properly initialized
        raise RuntimeError("No valid paths found in taxonomy")

    def _fallback_classification(self, memory_content: str) -> ClassificationResult:
        """Provide a fallback classification when normal classification fails."""
        fallback_path = self._find_closest_valid_path(self.fallback_path)
        return ClassificationResult(
            primary_path=fallback_path,
            confidence=0.5,
            alternative_paths=[],
            reasoning="Fallback classification due to processing error",
        )

    def batch_classify(
        self, memories: list[str], context: dict | None = None
    ) -> list[ClassificationResult]:
        """
        Classify multiple memories in batch.

        Args:
            memories: List of memory contents to classify
            context: Optional shared context

        Returns:
            List of ClassificationResults
        """
        results = []
        for memory in memories:
            result = self.classify(memory, context)
            results.append(result)
        return results

    def get_statistics(self) -> dict:
        """Get classifier statistics."""
        # Get taxonomy path count using the interface
        try:
            path_count = len(self.taxonomy.get_all_paths())
        except Exception:
            path_count = 0

        return {
            "cache_size": len(self._cache),
            "taxonomy_paths": path_count,
            "taxonomy_type": type(self.taxonomy).__name__,
            "categories": len(list(TaxonomyCategory)),
        }
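
A usage sketch, assuming a LangChain-style chat model that exposes ainvoke (which is what classify_async calls); the concrete model below is an arbitrary choice, and any object with an ainvoke(prompt) method should work:

from langchain_anthropic import ChatAnthropic  # illustrative LLM choice

from memoir.classifier.semantic import SemanticClassifier

llm = ChatAnthropic(model="claude-3-5-haiku-latest")  # requires ANTHROPIC_API_KEY in the environment
classifier = SemanticClassifier(llm=llm)

result = classifier.classify("I work as a software engineer at Google")
print(result.primary_path)  # e.g. "profile.professional.current"
print(result.confidence)    # e.g. 0.9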

__init__

__init__(llm: Any | None = None, taxonomy: TaxonomyInterface | None = None, cache_size: int = DEFAULT_CACHE_SIZE, use_examples: bool = True, fallback_path: str | None = None)

Initialize the semantic classifier.

Parameters:

- llm (Any | None, default None): Language model for classification (optional, will use default)
- taxonomy (TaxonomyInterface | None, default None): Taxonomy instance implementing TaxonomyInterface. If None, uses the default SemanticTaxonomy.
- cache_size (int, default DEFAULT_CACHE_SIZE): Size of the classification cache
- use_examples (bool, default True): Whether to include examples in prompts
- fallback_path (str | None, default None): Custom fallback path when classification fails

Source code in src/memoir/classifier/semantic.py
def __init__(
    self,
    llm: Any | None = None,
    taxonomy: TaxonomyInterface | None = None,
    cache_size: int = DEFAULT_CACHE_SIZE,
    use_examples: bool = True,
    fallback_path: str | None = None,
):
    """
    Initialize the semantic classifier.

    Args:
        llm: Language model for classification (optional, will use default)
        taxonomy: Taxonomy instance implementing TaxonomyInterface
                 If None, uses default SemanticTaxonomy
        cache_size: Size of the classification cache
        use_examples: Whether to include examples in prompts
        fallback_path: Custom fallback path when classification fails
    """
    self.taxonomy = taxonomy if taxonomy is not None else get_taxonomy()
    self.llm = llm
    self.use_examples = use_examples
    self.fallback_path = fallback_path or self._determine_fallback_path()
    self._cache = {}
    self._setup_classification_prompt()
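
A non-default initialization sketch; my_taxonomy stands in for any object implementing TaxonomyInterface, and llm for any model exposing ainvoke:

classifier = SemanticClassifier(
    llm=llm,                        # any model exposing ainvoke()
    taxonomy=my_taxonomy,           # hypothetical TaxonomyInterface implementation
    use_examples=False,             # omit few-shot examples from the prompt
    fallback_path="context.other",  # used when classification fails (resolved to the closest valid path)
)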

classify_async async

classify_async(memory_content: str, context: dict | None = None, use_cache: bool = True) -> ClassificationResult

Classify memory content into taxonomy path asynchronously.

Parameters:

- memory_content (str, required): The memory content to classify
- context (dict | None, default None): Optional context information
- use_cache (bool, default True): Whether to use cached results

Returns:

- ClassificationResult: ClassificationResult with path and metadata

Source code in src/memoir/classifier/semantic.py
async def classify_async(
    self,
    memory_content: str,
    context: dict | None = None,
    use_cache: bool = True,
) -> ClassificationResult:
    """
    Classify memory content into taxonomy path asynchronously.

    Args:
        memory_content: The memory content to classify
        context: Optional context information
        use_cache: Whether to use cached results

    Returns:
        ClassificationResult with path and metadata
    """
    # Check cache
    if use_cache:
        cache_key = self._compute_cache_key(memory_content, context)
        if cache_key in self._cache:
            # logger.debug(f"Cache hit for classification: {cache_key}")
            return self._cache[cache_key]

    # Get iterative taxonomy hints to include in prompt
    classification_hints = ""
    if hasattr(self.taxonomy, "get_classification_hints"):
        hints = self.taxonomy.get_classification_hints(memory_content)
        if hints.get("suggested_paths") or hints.get("expansion_candidates"):
            classification_hints = "\nCLASSIFICATION HINTS:\n"
            if hints.get("suggested_paths"):
                classification_hints += f"Similar content previously found in: {', '.join(hints['suggested_paths'][:3])}\n"
            if hints.get("expansion_candidates"):
                candidates = [
                    f"{item['path']} ({item['item_count']} items)"
                    for item in hints["expansion_candidates"][:3]
                ]
                classification_hints += (
                    f"Paths ready for expansion: {', '.join(candidates)}\n"
                )
            classification_hints += (
                "Consider these hints when choosing the most appropriate path.\n"
            )

    # Prepare prompt
    prompt_vars = {
        "memory_content": memory_content,
        "context_info": self._get_context_info(context),
        "taxonomy_structure": self._get_taxonomy_structure_info(),
        "examples": self._get_classification_examples(),
        "classification_hints": classification_hints,
    }

    # Run classification
    try:
        if self.llm:
            # Use provided LLM
            prompt_text = self.classification_template.format(**prompt_vars)
            response = await self.llm.ainvoke(prompt_text)

            # Extract content from response
            if hasattr(response, "content"):
                content = response.content
            elif isinstance(response, str):
                content = response
            else:
                content = str(response)

            # Clean up the response - handle markdown code blocks
            content = content.strip()
            if "```json" in content:
                # Extract JSON from markdown code block
                start = content.find("```json") + 7
                end = content.find("```", start)
                if end > start:
                    content = content[start:end].strip()
            elif "```" in content:
                # Extract from generic code block
                start = content.find("```") + 3
                end = content.find("```", start)
                if end > start:
                    content = content[start:end].strip()

            # Parse JSON
            result_dict = json.loads(content)
        else:
            # No LLM provided - must have one for production use
            raise ValueError(
                "No LLM provided for classification. Cannot classify without language model."
            )

        result = ClassificationResult(**result_dict)

        # Get classification hints from iterative taxonomy before processing
        hints = None
        if hasattr(self.taxonomy, "get_classification_hints"):
            hints = self.taxonomy.get_classification_hints(memory_content)

            # Apply hints to improve classification
            if hints.get("suggested_paths"):
                # If LLM suggested a path that matches a hint, boost confidence
                if result.primary_path in hints["suggested_paths"]:
                    result.confidence = min(1.0, result.confidence + 0.1)

                # If no good match but we have suggestions, consider the best suggestion
                elif result.confidence < 0.6 and hints["suggested_paths"]:
                    best_suggestion = hints["suggested_paths"][0]
                    if self._is_valid_path(best_suggestion):
                        result.alternative_paths.insert(0, best_suggestion)
                        result.reasoning += (
                            f" (Hint: similar content found in {best_suggestion})"
                        )

        # Use advanced taxonomy logic if available
        if isinstance(self.taxonomy, AdvancedTaxonomyInterface):
            # Advanced taxonomy (e.g., DynamicTaxonomy) - use smart path selection
            selected_path, final_confidence = (
                self.taxonomy.select_path_with_fallback(
                    classification_result=result,
                    memory_content=memory_content,
                    metadata=context.get("metadata") if context else None,
                )
            )

            # Update result with advanced taxonomy's selection
            result.primary_path = selected_path
            result.confidence = final_confidence

        else:
            # Standard taxonomy - just validate paths
            if not self._is_valid_path(result.primary_path):
                # Find closest valid path
                result.primary_path = self._find_closest_valid_path(
                    result.primary_path
                )

        # Track the classification in iterative taxonomy for learning
        if hasattr(self.taxonomy, "track_classification"):
            expansion_triggered = self.taxonomy.track_classification(
                result.primary_path,
                memory_content,
                {
                    "confidence": result.confidence,
                    "reasoning": result.reasoning,
                    "alternatives": result.alternative_paths,
                    "hints_used": hints is not None,
                },
            )

            if expansion_triggered:
                # logger.info(
                #     f"Triggered taxonomy expansion for path: {result.primary_path}"
                # )
                pass

        # Cache result
        if use_cache:
            self._cache[cache_key] = result

        return result

    except Exception as e:
        logger.error(f"Classification failed: {e}")
        # Return fallback classification
        return self._fallback_classification(memory_content)
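
A call sketch, reusing the classifier from the earlier example; the context keys shown are among those _get_context_info recognizes:

import asyncio

async def main() -> None:
    result = await classifier.classify_async(
        "I graduated from MIT with a CS degree",
        context={"user_id": "u-123", "conversation_topic": "career history"},
    )
    print(result.primary_path, result.confidence)

asyncio.run(main())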

classify

classify(memory_content: str, context: dict | None = None, use_cache: bool = True) -> ClassificationResult

Synchronous version of classify_async.

Source code in src/memoir/classifier/semantic.py
def classify(
    self,
    memory_content: str,
    context: dict | None = None,
    use_cache: bool = True,
) -> ClassificationResult:
    """
    Synchronous version of classify_async.
    """
    import asyncio

    return asyncio.run(self.classify_async(memory_content, context, use_cache))
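
Because classify wraps classify_async in asyncio.run, it raises RuntimeError when called from code that is already running inside an event loop (for example, from another coroutine); use classify_async there instead. A plain synchronous call:

result = classifier.classify("My favorite IDE is VS Code")
print(result.primary_path)  # a preferences.* path, depending on the taxonomy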

batch_classify

batch_classify(memories: list[str], context: dict | None = None) -> list[ClassificationResult]

Classify multiple memories in batch.

Parameters:

- memories (list[str], required): List of memory contents to classify
- context (dict | None, default None): Optional shared context

Returns:

- list[ClassificationResult]: List of ClassificationResults

Source code in src/memoir/classifier/semantic.py
def batch_classify(
    self, memories: list[str], context: dict | None = None
) -> list[ClassificationResult]:
    """
    Classify multiple memories in batch.

    Args:
        memories: List of memory contents to classify
        context: Optional shared context

    Returns:
        List of ClassificationResults
    """
    results = []
    for memory in memories:
        result = self.classify(memory, context)
        results.append(result)
    return results
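
A batch sketch; memories are classified sequentially (one LLM call each), so only repeated content benefits from the cache:

memories = [
    "My name is John Smith",
    "I prefer coffee for my morning routine",
]
results = classifier.batch_classify(memories)
for memory, result in zip(memories, results):
    print(f"{memory!r} -> {result.primary_path} ({result.confidence:.2f})")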

get_statistics

get_statistics() -> dict

Get classifier statistics.

Source code in src/memoir/classifier/semantic.py
def get_statistics(self) -> dict:
    """Get classifier statistics."""
    # Get taxonomy path count using the interface
    try:
        path_count = len(self.taxonomy.get_all_paths())
    except Exception:
        path_count = 0

    return {
        "cache_size": len(self._cache),
        "taxonomy_paths": path_count,
        "taxonomy_type": type(self.taxonomy).__name__,
        "categories": len(list(TaxonomyCategory)),
    }
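
A quick inspection sketch; the values shown are illustrative:

stats = classifier.get_statistics()
print(stats)
# {'cache_size': 3, 'taxonomy_paths': 120, 'taxonomy_type': 'SemanticTaxonomy', 'categories': 8}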

LangGraphMemoryStore

Bases: BaseStore, BaseIntegration

LangGraph-compatible memory store implementation using Memoir.

This adapter allows LangGraph agents to use Memoir's Git-like versioned memory system as a drop-in replacement for the standard memory store.

Source code in src/memoir/integration/langgraph/memory_store.py
class LangGraphMemoryStore(BaseStore, BaseIntegration):
    """LangGraph-compatible memory store implementation using Memoir.

    This adapter allows LangGraph agents to use Memoir's Git-like versioned
    memory system as a drop-in replacement for the standard memory store.
    """

    def __init__(
        self,
        config: MemoryConfig | None = None,
        llm: Any | None = None,
    ):
        """Initialize the LangGraph memory store.

        Args:
            config: Memory configuration settings
            llm: Optional LLM instance for intelligent features
        """
        config = config or MemoryConfig()
        BaseIntegration.__init__(self, config.to_dict())

        self.memory_config = config
        self.llm = llm

        # Initialize components
        self._init_storage()
        self._init_taxonomy_loader()
        self._init_taxonomy()
        self._init_search()

        # Track namespaces and branches
        self._namespaces: dict[str, str] = {}  # namespace -> branch mapping
        self._current_namespace = config.namespace

    def _init_storage(self) -> None:
        """Initialize the storage layer."""
        # LangGraph integration auto-creates the store on first use, like
        # LangMem's BaseStore. ProllyTreeStore itself is strict, so go
        # through StoreService.create_store to bootstrap when absent.
        from memoir.services.store_service import StoreService

        StoreService(self.memory_config.storage_path).create_store(
            self.memory_config.storage_path
        )
        self.store = ProllyTreeStore(
            path=self.memory_config.storage_path,
            enable_versioning=self.memory_config.enable_versioning,
        )

        # Memory manager will be initialized after search engine
        self.memory_manager = None

    def _init_taxonomy_loader(self) -> None:
        """Initialize the taxonomy loader and ensure taxonomy is in store."""
        self.taxonomy_loader = TaxonomyLoader(self.store)

        # Initialize taxonomy if not already present
        if not self.taxonomy_loader.has_taxonomy_in_store():
            logger.info("Initializing taxonomy in store...")
            self.taxonomy_loader.init_store(include_builtin=True)

    def _init_taxonomy(self) -> None:
        """Initialize the taxonomy system based on configuration."""
        taxonomy_type = self.memory_config.taxonomy_type

        if taxonomy_type == "fixed":
            self.taxonomy = SemanticTaxonomy()
            self.classifier = None
        elif taxonomy_type == "iterative" and self.llm:
            self.taxonomy = LLMIterativeTaxonomy(llm=self.llm)
            self.classifier = None
        elif taxonomy_type == "intelligent" and self.llm:
            # IntelligentClassifier manages its own taxonomy internally
            self.classifier = IntelligentClassifier(
                llm=self.llm,
                memory_store=None,  # Will be set later if needed
                taxonomy_loader=self.taxonomy_loader,
            )
            self.taxonomy = SemanticTaxonomy()  # Fallback for search
        else:
            # Fallback to fixed taxonomy
            self.taxonomy = SemanticTaxonomy()
            self.classifier = None

    def _init_search(self) -> None:
        """Initialize the search engine and complete memory manager setup."""
        if self.llm:
            self.search_engine = IntelligentSearchEngine(
                llm=self.llm,
                store=self.store,
                taxonomy_loader=self.taxonomy_loader,
            )
        else:
            # Fallback to a simple search if no LLM
            self.search_engine = None

        # Now initialize memory manager with all dependencies.
        # Lazy import: requires the `langmem` extra.
        try:
            from memoir.core.memory import ProllyTreeMemoryStoreManager
        except ImportError as e:
            raise ImportError(
                "LangGraphMemoryStore requires the 'langmem' extra. "
                "Install with: pip install 'memoir-ai[langmem]'"
            ) from e

        self.memory_manager = ProllyTreeMemoryStoreManager(
            prolly_store=self.store,
            classifier=getattr(self, "classifier", None),
            search_engine=self.search_engine,
        )

    async def initialize(self) -> None:
        """Initialize the store for async operations."""
        if not self._initialized:
            # Initialize async components if needed
            if hasattr(self.memory_manager, "initialize"):
                await self.memory_manager.initialize()
            self._initialized = True

    async def close(self) -> None:
        """Clean up resources."""
        if self._initialized:
            if hasattr(self.memory_manager, "close"):
                await self.memory_manager.close()
            self._initialized = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        # For sync context manager, just pass through
        pass

    # LangGraph BaseStore implementation

    async def abatch(self, ops: Sequence[Op]) -> list[Result]:
        """Execute a batch of operations.

        Args:
            ops: Sequence of operations to execute

        Returns:
            List of operation results
        """
        results = []

        for op in ops:
            try:
                if op.op == "put":
                    await self._put_items(op.namespace, op.items)
                    results.append(None)  # Successful put returns None
                elif op.op == "search":
                    items = await self._search_items(
                        op.namespace,
                        query=op.query,
                        limit=op.limit,
                    )
                    results.append(items)
                elif op.op == "delete":
                    await self._delete_items(op.namespace, op.keys)
                    results.append(None)  # Successful delete returns None
                else:
                    raise ValueError(f"Unknown operation: {op.op}")
            except Exception as e:
                logger.error(f"Operation failed: {e}")
                results.append(None)

        return results

    async def _put_items(
        self,
        namespace: NamespacePath,
        items: list[Item],
    ) -> None:
        """Store items in the memory system.

        Args:
            namespace: Namespace path for the items
            items: Items to store
        """
        # Ensure namespace branch exists
        self._get_or_create_branch(namespace)

        for item in items:
            # Convert Item to MemoryEntry
            memory_entry = self._item_to_memory_entry(item, namespace)

            # Store using memory manager
            # Combine namespace into a string for the memory manager
            namespace_str = ".".join(namespace)

            # Add thread_id and user_id to metadata
            full_metadata = memory_entry.metadata.copy()
            if memory_entry.thread_id:
                full_metadata["thread_id"] = memory_entry.thread_id
            if memory_entry.user_id:
                full_metadata["user_id"] = memory_entry.user_id

            memory_id = await self.memory_manager.store_memory(
                content=memory_entry.content,
                namespace=namespace_str,
                metadata=full_metadata,
            )

            # Store mapping of item key to memory ID only if memory_id is valid
            if item.key and memory_id:
                await self._store_key_mapping(namespace, item.key, memory_id)

    async def _search_items(
        self,
        namespace: NamespacePath,
        query: str | None = None,
        limit: int = 10,
    ) -> list[Item]:
        """Search for items in the memory system.

        Args:
            namespace: Namespace to search in
            query: Search query
            limit: Maximum number of results

        Returns:
            List of matching items
        """
        # Switch to namespace branch
        self._get_or_create_branch(namespace)
        # Note: ProllyTreeStore doesn't have checkout method
        # Branch management would need to be handled differently

        if query:
            # Perform semantic search
            namespace_str = ".".join(namespace)
            results = await self.memory_manager.search_memories(
                query=query,
                namespace=namespace_str,
                limit=limit,
            )

            # Convert results to Items
            # Check format of results (might be Memory objects)
            items = []
            for result in results:
                if hasattr(result, "content"):
                    # Memory object
                    content = result.content
                    metadata = result.metadata if hasattr(result, "metadata") else {}
                elif isinstance(result, dict):
                    # Dict format
                    content = result.get("content", "")
                    metadata = result.get("metadata", {})
                else:
                    content = str(result)
                    metadata = {}

                items.append(self._memory_to_item(content, metadata, namespace))
        else:
            # Return recent items from namespace
            items = await self._get_recent_items(namespace, limit)

        return items

    async def _delete_items(
        self,
        namespace: NamespacePath,
        keys: list[str],
    ) -> None:
        """Delete items from the memory system.

        Args:
            namespace: Namespace containing the items
            keys: Keys of items to delete
        """
        self._get_or_create_branch(namespace)
        # Branch operations would be handled by the underlying store if needed

        for key in keys:
            # Get memory ID from key mapping
            memory_id = await self._get_memory_id_from_key(namespace, key)
            if memory_id:
                # Deletion is currently a no-op: full deletion would require
                # semantic path resolution, so the key mapping is left in place
                pass

        # Commit if versioning is enabled
        if self.memory_config.enable_versioning:
            self.store.commit(f"Deleted {len(keys)} items from {namespace}")

    def batch(self, ops: Sequence[Op]) -> list[Result]:
        """Synchronous batch operations (delegates to async)."""
        return asyncio.run(self.abatch(ops))

    async def aget(
        self,
        namespace: NamespacePath,
        key: str,
    ) -> Item | None:
        """Get a single item by key.

        Args:
            namespace: Namespace containing the item
            key: Item key

        Returns:
            The item if found, None otherwise
        """
        try:
            # For now, try to get from mappings
            memory_id = await self._get_memory_id_from_key(namespace, key)
            if memory_id:
                # Try to retrieve from store using namespace + key
                # This is a simplified implementation
                data = self.store.get(namespace, key)
                if data:
                    return self._memory_to_item(
                        data.get("content", ""),
                        data.get("metadata", {}),
                        namespace,
                    )
            return None

        except Exception as e:
            logger.error(f"Failed to get item: {e}")
            return None

    def get(
        self,
        namespace: NamespacePath,
        key: str,
    ) -> Item | None:
        """Synchronous get (delegates to async)."""
        return asyncio.run(self.aget(namespace, key))

    async def asearch(
        self,
        namespace: NamespacePath,
        *,
        query: str | None = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Item]:
        """Async search for items.

        Args:
            namespace: Namespace to search in
            query: Optional search query
            limit: Maximum results
            offset: Result offset

        Returns:
            List of matching items
        """
        items = await self._search_items(namespace, query, limit + offset)
        # Apply offset
        return items[offset : offset + limit]

    def search(
        self,
        namespace: NamespacePath,
        *,
        query: str | None = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Item]:
        """Synchronous search (delegates to async)."""
        return asyncio.run(
            self.asearch(namespace, query=query, limit=limit, offset=offset)
        )

    async def aput(
        self,
        namespace: NamespacePath,
        key: str,
        value: Any,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Store a single item.

        Args:
            namespace: Namespace for the item
            key: Item key
            value: Item value
            metadata: Optional metadata
        """
        # Merge metadata into value for Item
        if isinstance(value, dict):
            value_with_metadata = {**value, "metadata": metadata or {}}
        else:
            value_with_metadata = {"content": value, "metadata": metadata or {}}

        item = Item(
            key=key,
            value=value_with_metadata,
            namespace=namespace,
            created_at=datetime.now(),
            updated_at=datetime.now(),
        )

        await self._put_items(namespace, [item])

    def put(
        self,
        namespace: NamespacePath,
        key: str,
        value: Any,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Synchronous put (delegates to async)."""
        asyncio.run(self.aput(namespace, key, value, metadata))

    async def adelete(
        self,
        namespace: NamespacePath,
        key: str,
    ) -> None:
        """Delete a single item.

        Args:
            namespace: Namespace containing the item
            key: Item key
        """
        await self._delete_items(namespace, [key])

    def delete(
        self,
        namespace: NamespacePath,
        key: str,
    ) -> None:
        """Synchronous delete (delegates to async)."""
        asyncio.run(self.adelete(namespace, key))

    # Helper methods

    def _get_or_create_branch(self, namespace: NamespacePath) -> str:
        """Get or create a branch for the namespace.

        Args:
            namespace: Namespace path

        Returns:
            Branch name
        """
        namespace_str = str(namespace)

        if namespace_str not in self._namespaces:
            # Create branch name from namespace
            branch_name = namespace_str.replace("/", "_").replace(".", "_")
            self._namespaces[namespace_str] = branch_name

            # Create branch if it doesn't exist
            with contextlib.suppress(Exception):
                asyncio.run(self.store.create_branch(branch_name))

        return self._namespaces[namespace_str]

    def _item_to_memory_entry(
        self,
        item: Item,
        namespace: NamespacePath,
    ) -> MemoryEntry:
        """Convert LangGraph Item to MemoryEntry.

        Args:
            item: LangGraph item
            namespace: Namespace path

        Returns:
            MemoryEntry
        """
        # Extract content and metadata from value
        if isinstance(item.value, dict):
            content = item.value.get("content", str(item.value))
            metadata = item.value.get("metadata", {})
        else:
            content = str(item.value)
            metadata = {}

        # Add namespace and key to metadata
        metadata["namespace"] = str(namespace)
        metadata["key"] = item.key

        return MemoryEntry(
            content=content,
            metadata=metadata,
            timestamp=item.created_at or datetime.now(),
            memory_id=item.key,
        )

    def _memory_to_item(
        self,
        content: str,
        metadata: dict[str, Any],
        namespace: NamespacePath | None = None,
    ) -> Item:
        """Convert memory data to LangGraph Item.

        Args:
            content: Memory content
            metadata: Memory metadata
            namespace: Optional namespace for the item

        Returns:
            LangGraph Item
        """
        return Item(
            key=metadata.get("key", ""),
            value={"content": content, "metadata": metadata},
            namespace=namespace or (),
            created_at=metadata.get("timestamp", datetime.now()),
            updated_at=metadata.get("updated_at", datetime.now()),
        )

    async def _store_key_mapping(
        self,
        namespace: NamespacePath,
        key: str,
        memory_id: str,
    ) -> None:
        """Store mapping between item key and memory ID.

        Args:
            namespace: Namespace path
            key: Item key
            memory_id: Memory ID
        """
        # Store in a special mappings namespace
        mapping_key = f"{'.'.join(namespace)}.{key}"
        self.store.put(
            namespace=("_mappings",), key=mapping_key, value={"memory_id": memory_id}
        )

    async def _get_memory_id_from_key(
        self,
        namespace: NamespacePath,
        key: str,
    ) -> str | None:
        """Get memory ID from item key.

        Args:
            namespace: Namespace path
            key: Item key

        Returns:
            Memory ID if found
        """
        mapping_key = f"{'.'.join(namespace)}.{key}"
        data = self.store.get(namespace=("_mappings",), key=mapping_key)
        return data.get("memory_id") if data else None

    async def _get_semantic_path(self, memory_id: str) -> str | None:
        """Get semantic path for a memory ID.

        Args:
            memory_id: Memory ID

        Returns:
            Semantic path if found
        """
        # Look up in index
        data = self.store.get(namespace=("_index", "memory_ids"), key=memory_id)
        return data.get("semantic_path") if data else None

    async def _get_recent_items(
        self,
        namespace: NamespacePath,
        limit: int,
    ) -> list[Item]:
        """Get recent items from a namespace.

        Args:
            namespace: Namespace path
            limit: Maximum number of items

        Returns:
            List of recent items
        """
        # Get all items from namespace using prefix search
        items = []

        # This is a simplified implementation
        # In production, you'd want to maintain a proper index
        return items[:limit]
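
The branch mapping in _get_or_create_branch is purely textual: both "/" and "." separators collapse to underscores, so distinct namespaces can map to the same branch name. A small illustrative check (plain Python, no memoir imports needed):

namespace_str = "projects/ai.agents"
branch_name = namespace_str.replace("/", "_").replace(".", "_")
assert branch_name == "projects_ai_agents"
# Note the collision: "projects.ai/agents" yields the same branch name.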

__init__

__init__(config: MemoryConfig | None = None, llm: Any | None = None)

Initialize the LangGraph memory store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | MemoryConfig \| None | Memory configuration settings | None |
| llm | Any \| None | Optional LLM instance for intelligent features | None |
Source code in src/memoir/integration/langgraph/memory_store.py
def __init__(
    self,
    config: MemoryConfig | None = None,
    llm: Any | None = None,
):
    """Initialize the LangGraph memory store.

    Args:
        config: Memory configuration settings
        llm: Optional LLM instance for intelligent features
    """
    config = config or MemoryConfig()
    BaseIntegration.__init__(self, config.to_dict())

    self.memory_config = config
    self.llm = llm

    # Initialize components
    self._init_storage()
    self._init_taxonomy_loader()
    self._init_taxonomy()
    self._init_search()

    # Track namespaces and branches
    self._namespaces: dict[str, str] = {}  # namespace -> branch mapping
    self._current_namespace = config.namespace

initialize async

initialize() -> None

Initialize the store for async operations.

Source code in src/memoir/integration/langgraph/memory_store.py
async def initialize(self) -> None:
    """Initialize the store for async operations."""
    if not self._initialized:
        # Initialize async components if needed
        if hasattr(self.memory_manager, "initialize"):
            await self.memory_manager.initialize()
        self._initialized = True

close async

close() -> None

Clean up resources.

Source code in src/memoir/integration/langgraph/memory_store.py
async def close(self) -> None:
    """Clean up resources."""
    if self._initialized:
        if hasattr(self.memory_manager, "close"):
            await self.memory_manager.close()
        self._initialized = False

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in src/memoir/integration/langgraph/memory_store.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""
    # For sync context manager, just pass through
    pass

abatch async

abatch(ops: Sequence[Op]) -> list[Result]

Execute a batch of operations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ops | Sequence[Op] | Sequence of operations to execute | required |

Returns:

| Type | Description |
| --- | --- |
| list[Result] | List of operation results |

Source code in src/memoir/integration/langgraph/memory_store.py
async def abatch(self, ops: Sequence[Op]) -> list[Result]:
    """Execute a batch of operations.

    Args:
        ops: Sequence of operations to execute

    Returns:
        List of operation results
    """
    results = []

    for op in ops:
        try:
            if op.op == "put":
                await self._put_items(op.namespace, op.items)
                results.append(None)  # Successful put returns None
            elif op.op == "search":
                items = await self._search_items(
                    op.namespace,
                    query=op.query,
                    limit=op.limit,
                )
                results.append(items)
            elif op.op == "delete":
                await self._delete_items(op.namespace, op.keys)
                results.append(None)  # Successful delete returns None
            else:
                raise ValueError(f"Unknown operation: {op.op}")
        except Exception as e:
            logger.error(f"Operation failed: {e}")
            results.append(None)

    return results

batch

batch(ops: Sequence[Op]) -> list[Result]

Synchronous batch operations (delegates to async).

Source code in src/memoir/integration/langgraph/memory_store.py
def batch(self, ops: Sequence[Op]) -> list[Result]:
    """Synchronous batch operations (delegates to async)."""
    return asyncio.run(self.abatch(ops))
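
Because abatch dispatches on the op, namespace, items, keys, query, and limit attributes of each operation, any object exposing those attributes will work. A minimal sketch with a hypothetical duck-typed operation (DemoOp is a stand-in for illustration, not the real Op type):

from dataclasses import dataclass
from typing import Any, Sequence

@dataclass
class DemoOp:
    # Hypothetical stand-in exposing the attributes abatch() reads
    op: str
    namespace: tuple[str, ...]
    items: Sequence[Any] = ()
    keys: Sequence[str] = ()
    query: str | None = None
    limit: int = 10

ops = [
    DemoOp(op="search", namespace=("users", "alice"), query="theme"),
    DemoOp(op="delete", namespace=("users", "alice"), keys=["prefs"]),
]
# results = await store.abatch(ops)
# One result slot per op: a list[Item] for "search", None for "put" and
# "delete", and None as well when an individual operation fails (logged).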

aget async

aget(namespace: NamespacePath, key: str) -> Item | None

Get a single item by key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| namespace | NamespacePath | Namespace containing the item | required |
| key | str | Item key | required |

Returns:

| Type | Description |
| --- | --- |
| Item \| None | The item if found, None otherwise |

Source code in src/memoir/integration/langgraph/memory_store.py
async def aget(
    self,
    namespace: NamespacePath,
    key: str,
) -> Item | None:
    """Get a single item by key.

    Args:
        namespace: Namespace containing the item
        key: Item key

    Returns:
        The item if found, None otherwise
    """
    try:
        # For now, try to get from mappings
        memory_id = await self._get_memory_id_from_key(namespace, key)
        if memory_id:
            # Try to retrieve from store using namespace + key
            # This is a simplified implementation
            data = self.store.get(namespace, key)
            if data:
                return self._memory_to_item(
                    data.get("content", ""),
                    data.get("metadata", {}),
                    namespace,
                )
        return None

    except Exception as e:
        logger.error(f"Failed to get item: {e}")
        return None

get

get(namespace: NamespacePath, key: str) -> Item | None

Synchronous get (delegates to async).

Source code in src/memoir/integration/langgraph/memory_store.py
def get(
    self,
    namespace: NamespacePath,
    key: str,
) -> Item | None:
    """Synchronous get (delegates to async)."""
    return asyncio.run(self.aget(namespace, key))

asearch async

asearch(namespace: NamespacePath, *, query: str | None = None, limit: int = 10, offset: int = 0) -> list[Item]

Async search for items.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| namespace | NamespacePath | Namespace to search in | required |
| query | str \| None | Optional search query | None |
| limit | int | Maximum results | 10 |
| offset | int | Result offset | 0 |

Returns:

| Type | Description |
| --- | --- |
| list[Item] | List of matching items |

Source code in src/memoir/integration/langgraph/memory_store.py
async def asearch(
    self,
    namespace: NamespacePath,
    *,
    query: str | None = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Async search for items.

    Args:
        namespace: Namespace to search in
        query: Optional search query
        limit: Maximum results
        offset: Result offset

    Returns:
        List of matching items
    """
    items = await self._search_items(namespace, query, limit + offset)
    # Apply offset and limit to the over-fetched results
    return items[offset : offset + limit]

search

search(namespace: NamespacePath, *, query: str | None = None, limit: int = 10, offset: int = 0) -> list[Item]

Synchronous search (delegates to async).

Source code in src/memoir/integration/langgraph/memory_store.py
def search(
    self,
    namespace: NamespacePath,
    *,
    query: str | None = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Synchronous search (delegates to async)."""
    return asyncio.run(
        self.asearch(namespace, query=query, limit=limit, offset=offset)
    )

aput async

aput(namespace: NamespacePath, key: str, value: Any, metadata: dict[str, Any] | None = None) -> None

Store a single item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| namespace | NamespacePath | Namespace for the item | required |
| key | str | Item key | required |
| value | Any | Item value | required |
| metadata | dict[str, Any] \| None | Optional metadata | None |
Source code in src/memoir/integration/langgraph/memory_store.py
async def aput(
    self,
    namespace: NamespacePath,
    key: str,
    value: Any,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Store a single item.

    Args:
        namespace: Namespace for the item
        key: Item key
        value: Item value
        metadata: Optional metadata
    """
    # Merge metadata into value for Item
    if isinstance(value, dict):
        value_with_metadata = {**value, "metadata": metadata or {}}
    else:
        value_with_metadata = {"content": value, "metadata": metadata or {}}

    item = Item(
        key=key,
        value=value_with_metadata,
        namespace=namespace,
        created_at=datetime.now(),
        updated_at=datetime.now(),
    )

    await self._put_items(namespace, [item])

put

put(namespace: NamespacePath, key: str, value: Any, metadata: dict[str, Any] | None = None) -> None

Synchronous put (delegates to async).

Source code in src/memoir/integration/langgraph/memory_store.py
def put(
    self,
    namespace: NamespacePath,
    key: str,
    value: Any,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Synchronous put (delegates to async)."""
    asyncio.run(self.aput(namespace, key, value, metadata))

adelete async

adelete(namespace: NamespacePath, key: str) -> None

Delete a single item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| namespace | NamespacePath | Namespace containing the item | required |
| key | str | Item key | required |
Source code in src/memoir/integration/langgraph/memory_store.py
async def adelete(
    self,
    namespace: NamespacePath,
    key: str,
) -> None:
    """Delete a single item.

    Args:
        namespace: Namespace containing the item
        key: Item key
    """
    await self._delete_items(namespace, [key])

delete

delete(namespace: NamespacePath, key: str) -> None

Synchronous delete (delegates to async).

Source code in src/memoir/integration/langgraph/memory_store.py
def delete(
    self,
    namespace: NamespacePath,
    key: str,
) -> None:
    """Synchronous delete (delegates to async)."""
    asyncio.run(self.adelete(namespace, key))
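
Taken together, aget/asearch/aput/adelete give the store a conventional async CRUD surface, with get/search/put/delete as synchronous wrappers. A minimal usage sketch, assuming store is an already-initialized instance of this store class:

async def demo(store) -> None:
    ns = ("users", "alice")  # NamespacePath: a tuple of path segments

    # Non-dict values are wrapped as {"content": value, "metadata": {...}}
    await store.aput(ns, "prefs", {"theme": "dark"}, metadata={"source": "settings"})

    item = await store.aget(ns, "prefs")
    if item is not None:
        print(item.value)

    # Paging: limit + offset items are fetched, then the window is sliced
    page_two = await store.asearch(ns, query="theme", limit=10, offset=10)
    print(len(page_two))

    await store.adelete(ns, "prefs")

The synchronous wrappers call asyncio.run(), which raises if an event loop is already running; inside async code, call the a-prefixed methods directly.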

MemoryConfig dataclass

Configuration for LangGraph memory integration.

Source code in src/memoir/integration/langgraph/types.py
@dataclass
class MemoryConfig:
    """Configuration for LangGraph memory integration."""

    # Memoir-specific settings
    storage_path: str = "./memoir_storage"
    taxonomy_type: str = "intelligent"  # "fixed", "iterative", or "intelligent"
    enable_versioning: bool = True
    enable_search_cache: bool = True

    # LangGraph compatibility settings
    namespace: str = "default"
    max_search_results: int = 10
    similarity_threshold: float = 0.7

    # LLM settings for intelligent features
    llm_provider: str | None = None  # "openai", "anthropic", etc.
    llm_model: str | None = None
    api_key: str | None = None

    # Performance settings
    batch_size: int = 100
    async_operations: bool = True
    compression_enabled: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Convert config to dictionary."""
        return {
            "storage_path": self.storage_path,
            "taxonomy_type": self.taxonomy_type,
            "enable_versioning": self.enable_versioning,
            "enable_search_cache": self.enable_search_cache,
            "namespace": self.namespace,
            "max_search_results": self.max_search_results,
            "similarity_threshold": self.similarity_threshold,
            "llm_provider": self.llm_provider,
            "llm_model": self.llm_model,
            "batch_size": self.batch_size,
            "async_operations": self.async_operations,
            "compression_enabled": self.compression_enabled,
        }

to_dict

to_dict() -> dict[str, Any]

Convert config to dictionary.

Source code in src/memoir/integration/langgraph/types.py
def to_dict(self) -> dict[str, Any]:
    """Convert config to dictionary."""
    return {
        "storage_path": self.storage_path,
        "taxonomy_type": self.taxonomy_type,
        "enable_versioning": self.enable_versioning,
        "enable_search_cache": self.enable_search_cache,
        "namespace": self.namespace,
        "max_search_results": self.max_search_results,
        "similarity_threshold": self.similarity_threshold,
        "llm_provider": self.llm_provider,
        "llm_model": self.llm_model,
        "batch_size": self.batch_size,
        "async_operations": self.async_operations,
        "compression_enabled": self.compression_enabled,
    }
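
A short configuration sketch based on the fields above (the import path mirrors the source location shown and is an assumption about the public API):

from memoir.integration.langgraph.types import MemoryConfig

config = MemoryConfig(
    storage_path="./agent_memory",
    taxonomy_type="intelligent",  # "fixed", "iterative", or "intelligent"
    namespace="assistant",
    max_search_results=20,
)

settings = config.to_dict()
assert settings["namespace"] == "assistant"
# Note: to_dict() omits api_key, so a serialized config never carries the
# credential even when one is set on the dataclass.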

LocationMemento

Manages user location data and generates geographic event summaries.

Source code in src/memoir/memento/location.py
class LocationMemento:
    """Manages user location data and generates geographic event summaries."""

    def __init__(self, memory_store):
        """Initialize location memento with memory store."""
        self.memory_store = memory_store

    async def apply_location_events(
        self,
        location_events: list[dict[str, str]],
        metadata: dict | None = None,
        namespace: str = "default",
    ) -> None:
        """
        Apply location events to the memory store.

        For same-location events, retrieves existing content and merges with new event.

        Args:
            location_events: List of location events with location and description
            metadata: Optional metadata to include with events
            namespace: Namespace to store location events in (default: "default")
        """
        logger.debug(
            f"LocationManager.apply_location_events called with {len(location_events) if location_events else 0} events"
        )
        if not location_events:
            logger.debug("No location events provided to apply_location_events")
            return

        for event in location_events:
            location_name = event.get("location", "")
            description = event.get("description", "")

            if not location_name or not description:
                logger.warning(f"Invalid location event: {event}")
                continue

            # Normalize location name for consistent storage
            normalized_location = self._normalize_location_name(location_name)

            if not normalized_location:
                logger.debug(f"Invalid location name: {location_name}")
                continue

            # Create the location path
            location_path = f"location.{normalized_location}"

            try:
                await self._store_or_merge_location_event(
                    location_path, description, metadata, namespace
                )
                logger.debug(f"Applied location event: {location_path} - {description}")
            except Exception as e:
                logger.error(f"Failed to apply location event {location_path}: {e}")

    def _normalize_location_name(self, location_name: str) -> str:
        """
        Normalize location name for consistent storage.

        Args:
            location_name: Raw location name from LLM

        Returns:
            Normalized location name suitable for path storage
        """
        if not location_name or not isinstance(location_name, str):
            return ""

        # Clean and normalize the location name
        # Remove extra whitespace and convert to lowercase
        normalized = location_name.strip().lower()

        # Replace spaces and special characters with underscores
        normalized = re.sub(
            r"[^\w\s-]", "", normalized
        )  # Remove special chars except spaces and hyphens
        normalized = re.sub(
            r"[\s-]+", "_", normalized
        )  # Replace spaces/hyphens with underscores
        normalized = re.sub(r"_+", "_", normalized)  # Collapse multiple underscores
        normalized = normalized.strip("_")  # Remove leading/trailing underscores

        # Handle common location patterns and abbreviations
        location_mappings = {
            "new_york_city": "new_york_city",
            "nyc": "new_york_city",
            "ny": "new_york",
            "california": "california",
            "ca": "california",
            "san_francisco": "san_francisco",
            "sf": "san_francisco",
            "los_angeles": "los_angeles",
            "la": "los_angeles",
            "united_states": "united_states",
            "usa": "united_states",
            "us": "united_states",
        }

        # Apply mappings if available
        if normalized in location_mappings:
            normalized = location_mappings[normalized]

        # Ensure minimum length and validity
        if len(normalized) < 2:
            return ""

        return normalized

    async def _store_or_merge_location_event(
        self,
        location_path: str,
        description: str,
        metadata: dict | None = None,
        namespace: str = "default",
    ) -> None:
        """
        Store location event or merge with existing location events.

        Args:
            location_path: Storage path for the location (e.g., "location.san_francisco")
            description: Event description
            metadata: Optional metadata
            namespace: Namespace to store location data in (default: "default")
        """

        # Check if location already has events
        existing_items = await self.memory_store.asearch(namespace, location_path)

        if existing_items:
            # Merge with existing location events
            _, existing_data = existing_items[0]

            if isinstance(existing_data, str):
                existing_content = existing_data
            elif isinstance(existing_data, dict):
                existing_content = existing_data.get("raw_text", "")
            else:
                existing_content = str(existing_data)

            # Merge descriptions, avoiding duplicates
            merged_content = self._merge_location_descriptions(
                existing_content, description
            )

            content = {
                "raw_text": merged_content,
                "summary": f"Location events at {location_path.split('.')[1].replace('_', ' ').title()}",
                "structured_data": {
                    "location_name": location_path.split(".")[1]
                    .replace("_", " ")
                    .title(),
                    "location_content": merged_content,
                    "update_type": "location_event",
                },
                "memory_type": "location_event",
            }
        else:
            # Create new location event
            content = {
                "raw_text": description,
                "summary": f"Location event at {location_path.split('.')[1].replace('_', ' ').title()}",
                "structured_data": {
                    "location_name": location_path.split(".")[1]
                    .replace("_", " ")
                    .title(),
                    "location_content": description,
                    "update_type": "location_event",
                },
                "memory_type": "location_event",
            }

        # Include metadata if provided
        if metadata:
            content["metadata"] = metadata

        # Store the location event
        logger.debug(
            f"About to call store_memory_async with namespace='{namespace}', path='{location_path}'"
        )
        logger.debug(f"Content to store: {content}")

        result = await self.memory_store.store_memory_async(
            namespace, content, location_path
        )
        logger.debug(f"store_memory_async returned: {result}")

        # Debug: immediately test if we can find what we just stored
        try:
            test_search = await self.memory_store.asearch(namespace, location_path)
            logger.debug(
                f"Immediate search for '{location_path}' found {len(test_search)} items"
            )
            if test_search:
                logger.debug(f"Found item: {test_search[0]}")

            # Also try searching with prefix
            prefix_search = await self.memory_store.asearch(namespace, "location.")
            logger.debug(
                f"Prefix search for 'location.' found {len(prefix_search)} items"
            )

        except Exception as e:
            logger.debug(f"Immediate search test failed: {e}")

    def _merge_location_descriptions(self, existing: str, new: str) -> str:
        """
        Merge location event descriptions, avoiding duplicates.

        Args:
            existing: Existing location event descriptions
            new: New location event description

        Returns:
            Merged location descriptions
        """
        if not existing:
            return new

        if not new:
            return existing

        # Split by common delimiters
        existing_events = [
            event.strip() for event in existing.split("|") if event.strip()
        ]

        # Check if new event is already present (fuzzy matching)
        new_lower = new.lower()
        for existing_event in existing_events:
            if existing_event.lower() == new_lower:
                return existing  # Duplicate, return existing

        # Add new event
        existing_events.append(new.strip())
        return " | ".join(existing_events)

    async def get_location_summary(
        self, llm: Any | None = None, namespace: str = "default"
    ) -> str:
        """
        Generate a summary of all location events.

        Args:
            llm: Optional LLM for generating narrative summaries
            namespace: Namespace to search for location data (default: "default")

        Returns:
            String summary of location events
        """
        try:
            # Search for all location events
            logger.debug(
                f"Searching for location events with query: namespace='{namespace}', prefix='location.'"
            )
            all_items = await self.memory_store.asearch(namespace, "location.")
            logger.debug(f"Search returned {len(all_items)} items")

            # Debug: log what we found
            if all_items:
                logger.info(f"Found {len(all_items)} items with location. prefix")
                for item in all_items[:3]:  # Log first few items
                    logger.info(f"Location item: {item}")
            else:
                logger.debug("No items found with location. prefix")

                # Debug: search for ANY items with location data
                logger.debug("Searching for ANY items with location data...")
                all_items_debug = await self.memory_store.asearch(namespace, "")
                location_items_debug = []
                for path, data in all_items_debug:
                    if isinstance(data, dict) and (
                        data.get("memory_type") == "location_event"
                        or "location_name" in data.get("structured_data", {})
                    ):
                        location_items_debug.append((path, data))
                        logger.debug(f"Found location data under path: {path}")

                if location_items_debug:
                    logger.debug(
                        f"Found {len(location_items_debug)} location events but not under location.* paths!"
                    )
                    return self._generate_structured_location_summary(
                        location_items_debug
                    )
                else:
                    logger.debug("No location events found anywhere in memory store!")

            location_items = all_items  # All items should already have location. prefix

            if not location_items:
                return "No location events available."

            # If no LLM provided, generate structured summary
            if not llm:
                return self._generate_structured_location_summary(location_items)

            # Generate LLM-based narrative summary
            return await self._generate_llm_location_summary(location_items, llm)

        except Exception as e:
            logger.error(f"Failed to generate location summary: {e}")
            logger.error(f"Exception details: {type(e).__name__}: {e!s}")
            import traceback

            logger.error(f"Traceback: {traceback.format_exc()}")
            return "Error generating location summary."

    def _generate_structured_location_summary(self, location_items: list) -> str:
        """Generate a structured location summary without LLM."""
        summary_lines = ["=== USER LOCATION SUMMARY ===", ""]

        # Group and sort locations
        locations = {}
        for path, data in location_items:
            location_name = path.split(".", 1)[1].replace("_", " ").title()

            # Handle nested memory object structure from asearch results
            if isinstance(data, dict):
                # Check if this is a nested memory object with 'content' field
                if "content" in data and isinstance(data["content"], dict):
                    # Extract from nested structure: data['content']['raw_text']
                    content = data["content"].get("raw_text", str(data))
                else:
                    # Direct structure: data['raw_text']
                    content = data.get("raw_text", str(data))
            else:
                content = str(data)

            locations[location_name] = content

        # Sort locations alphabetically
        for location_name in sorted(locations.keys()):
            content = locations[location_name]
            summary_lines.append(f"{location_name}:")

            # Split multiple events and format nicely
            events = content.split(" | ")
            for event in events:
                if event.strip():
                    summary_lines.append(f"  - {event.strip()}")
            summary_lines.append("")

        return "\n".join(summary_lines)

    async def _generate_llm_location_summary(
        self, location_items: list, llm: Any
    ) -> str:
        """Generate an LLM-based narrative location summary."""
        # Prepare location data for LLM
        location_data = []
        for path, data in location_items:
            location_name = path.split(".", 1)[1].replace("_", " ").title()

            if isinstance(data, dict):
                content = data.get("raw_text", str(data))
            else:
                content = str(data)

            location_data.append(f"{location_name}: {content}")

        location_text = "\n".join(location_data)

        prompt = f"""Create a concise narrative summary of the user's location-related experiences and activities. Focus on places they've been, lived, worked, or had significant experiences.

Location Data:
{location_text}

Create a narrative summary that:
1. Groups related locations geographically when possible
2. Highlights significant places and experiences
3. Shows patterns in the user's movements or preferences
4. Keeps the summary concise but informative

Location Summary:"""

        try:
            response = await llm.ainvoke(prompt)
            return response.content.strip()
        except Exception as e:
            logger.error(f"LLM location summary failed: {e}")
            return self._generate_structured_location_summary(location_items)

    async def get_location_events_for_search(
        self, location_query: str, namespace: str = "default"
    ) -> list[dict]:
        """
        Get location events relevant to a search query.

        Args:
            location_query: Search query for locations
            namespace: Namespace to search for location data (default: "default")

        Returns:
            List of relevant location events
        """
        try:
            # Search for location events
            all_items = await self.memory_store.asearch(namespace, "location.")
            location_items = [
                (path, data) for path, data in all_items if path.startswith("location.")
            ]

            # Filter by relevance to query
            relevant_events = []
            query_lower = location_query.lower()

            for path, data in location_items:
                location_name = path.split(".", 1)[1].replace("_", " ")

                if isinstance(data, dict):
                    content = data.get("raw_text", str(data))
                else:
                    content = str(data)

                # Check if query matches location name or content
                if (
                    query_lower in location_name.lower()
                    or query_lower in content.lower()
                ):
                    relevant_events.append(
                        {
                            "location": location_name.title(),
                            "content": content,
                            "path": path,
                        }
                    )

            return relevant_events

        except Exception as e:
            logger.error(f"Failed to get location events for search: {e}")
            return []

__init__

__init__(memory_store)

Initialize location memento with memory store.

Source code in src/memoir/memento/location.py
def __init__(self, memory_store):
    """Initialize location memento with memory store."""
    self.memory_store = memory_store

apply_location_events async

apply_location_events(location_events: list[dict[str, str]], metadata: dict | None = None, namespace: str = 'default') -> None

Apply location events to the memory store.

For same-location events, retrieves existing content and merges with new event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| location_events | list[dict[str, str]] | List of location events with location and description | required |
| metadata | dict \| None | Optional metadata to include with events | None |
| namespace | str | Namespace to store location events in (default: "default") | 'default' |
Source code in src/memoir/memento/location.py
async def apply_location_events(
    self,
    location_events: list[dict[str, str]],
    metadata: dict | None = None,
    namespace: str = "default",
) -> None:
    """
    Apply location events to the memory store.

    For same-location events, retrieves existing content and merges with new event.

    Args:
        location_events: List of location events with location and description
        metadata: Optional metadata to include with events
        namespace: Namespace to store location events in (default: "default")
    """
    logger.debug(
        f"LocationManager.apply_location_events called with {len(location_events) if location_events else 0} events"
    )
    if not location_events:
        logger.debug("No location events provided to apply_location_events")
        return

    for event in location_events:
        location_name = event.get("location", "")
        description = event.get("description", "")

        if not location_name or not description:
            logger.warning(f"Invalid location event: {event}")
            continue

        # Normalize location name for consistent storage
        normalized_location = self._normalize_location_name(location_name)

        if not normalized_location:
            logger.debug(f"Invalid location name: {location_name}")
            continue

        # Create the location path
        location_path = f"location.{normalized_location}"

        try:
            await self._store_or_merge_location_event(
                location_path, description, metadata, namespace
            )
            logger.debug(f"Applied location event: {location_path} - {description}")
        except Exception as e:
            logger.error(f"Failed to apply location event {location_path}: {e}")

get_location_summary async

get_location_summary(llm: Any | None = None, namespace: str = 'default') -> str

Generate a summary of all location events.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| llm | Any \| None | Optional LLM for generating narrative summaries | None |
| namespace | str | Namespace to search for location data (default: "default") | 'default' |

Returns:

| Type | Description |
| --- | --- |
| str | String summary of location events |

Source code in src/memoir/memento/location.py
async def get_location_summary(
    self, llm: Any | None = None, namespace: str = "default"
) -> str:
    """
    Generate a summary of all location events.

    Args:
        llm: Optional LLM for generating narrative summaries
        namespace: Namespace to search for location data (default: "default")

    Returns:
        String summary of location events
    """
    try:
        # Search for all location events
        logger.debug(
            f"Searching for location events with query: namespace='{namespace}', prefix='location.'"
        )
        all_items = await self.memory_store.asearch(namespace, "location.")
        logger.debug(f"Search returned {len(all_items)} items")

        # Debug: log what we found
        if all_items:
            logger.info(f"Found {len(all_items)} items with location. prefix")
            for item in all_items[:3]:  # Log first few items
                logger.info(f"Location item: {item}")
        else:
            logger.debug("No items found with location. prefix")

            # Debug: search for ANY items with location data
            logger.debug("Searching for ANY items with location data...")
            all_items_debug = await self.memory_store.asearch(namespace, "")
            location_items_debug = []
            for path, data in all_items_debug:
                if isinstance(data, dict) and (
                    data.get("memory_type") == "location_event"
                    or "location_name" in data.get("structured_data", {})
                ):
                    location_items_debug.append((path, data))
                    logger.debug(f"Found location data under path: {path}")

            if location_items_debug:
                logger.debug(
                    f"Found {len(location_items_debug)} location events but not under location.* paths!"
                )
                return self._generate_structured_location_summary(
                    location_items_debug
                )
            else:
                logger.debug("No location events found anywhere in memory store!")

        location_items = all_items  # All items should already have location. prefix

        if not location_items:
            return "No location events available."

        # If no LLM provided, generate structured summary
        if not llm:
            return self._generate_structured_location_summary(location_items)

        # Generate LLM-based narrative summary
        return await self._generate_llm_location_summary(location_items, llm)

    except Exception as e:
        logger.error(f"Failed to generate location summary: {e}")
        logger.error(f"Exception details: {type(e).__name__}: {e!s}")
        import traceback

        logger.error(f"Traceback: {traceback.format_exc()}")
        return "Error generating location summary."

get_location_events_for_search async

get_location_events_for_search(location_query: str, namespace: str = 'default') -> list[dict]

Get location events relevant to a search query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| location_query | str | Search query for locations | required |
| namespace | str | Namespace to search for location data (default: "default") | 'default' |

Returns:

| Type | Description |
| --- | --- |
| list[dict] | List of relevant location events |

Source code in src/memoir/memento/location.py
async def get_location_events_for_search(
    self, location_query: str, namespace: str = "default"
) -> list[dict]:
    """
    Get location events relevant to a search query.

    Args:
        location_query: Search query for locations
        namespace: Namespace to search for location data (default: "default")

    Returns:
        List of relevant location events
    """
    try:
        # Search for location events
        all_items = await self.memory_store.asearch(namespace, "location.")
        location_items = [
            (path, data) for path, data in all_items if path.startswith("location.")
        ]

        # Filter by relevance to query
        relevant_events = []
        query_lower = location_query.lower()

        for path, data in location_items:
            location_name = path.split(".", 1)[1].replace("_", " ")

            if isinstance(data, dict):
                content = data.get("raw_text", str(data))
            else:
                content = str(data)

            # Check if query matches location name or content
            if (
                query_lower in location_name.lower()
                or query_lower in content.lower()
            ):
                relevant_events.append(
                    {
                        "location": location_name.title(),
                        "content": content,
                        "path": path,
                    }
                )

        return relevant_events

    except Exception as e:
        logger.error(f"Failed to get location events for search: {e}")
        return []
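
Matching is plain case-insensitive substring search over both the humanized location name and the stored content. A hypothetical query against the data from the earlier sketch:

# matches = await memento.get_location_events_for_search("mission")
# -> [{"location": "San Francisco",
#      "content": "Started a new job downtown | Moved to an apartment in the Mission",
#      "path": "location.san_francisco"}]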

ProfileMemento

Manages user profile data and generates profile summaries.

Source code in src/memoir/memento/profile.py
class ProfileMemento:
    """Manages user profile data and generates profile summaries."""

    def __init__(self, memory_store):
        """Initialize profile memento with memory store."""
        self.memory_store = memory_store

    async def apply_profile_updates(
        self,
        profile_updates: list[dict[str, str]],
        metadata: dict | None = None,
        namespace: str = "default",
    ) -> None:
        """
        Apply profile updates to the memory store.

        Args:
            profile_updates: List of profile updates with path and value
            metadata: Optional metadata to include with updates
            namespace: Namespace to store the profile updates in (default: "default")
        """
        if not profile_updates:
            return

        for update in profile_updates:
            path = update.get("path", "")
            value = update.get("value", "")

            if not path or not value:
                logger.warning(f"Invalid profile update: {update}")
                continue

            # Check if this is a profile path
            if not path.startswith("profile."):
                logger.warning(f"Non-profile path in profile update: {path}")
                continue

            # Store the profile update as a memory with special handling
            memory_data = {
                "raw_text": value,
                "summary": f"Profile update: {path.split('.')[-1]} = {value}",
                "structured_data": {
                    "profile_field": path,
                    "profile_value": value,
                    "update_type": "profile_update",
                },
                "memory_type": "profile_update",
            }

            # Store directly using the async method (consistent with timeline manager)
            await self.memory_store.store_memory_async(namespace, memory_data, path)
            logger.info(f"Applied profile update: {path} = {value}")

    async def get_profile_summary(self, llm=None, namespace: str = "default") -> str:
        """
        Generate a comprehensive profile summary from stored profile data.

        Args:
            llm: Optional LLM for generating narrative summary
            namespace: Namespace to search for profile data (default: "default")

        Returns:
            Profile summary string
        """
        try:
            # Search for all profile memories using the correct method signature
            # Use provided namespace string as expected by asearch method
            profile_memories = await self.memory_store.asearch(namespace, "profile.")

            # Debug: log what we found
            logger.debug(f"Found {len(profile_memories)} profile memories")

            # Limit results manually if needed
            if len(profile_memories) > 1000:
                profile_memories = profile_memories[:1000]

            if not profile_memories:
                return "No profile information available."

            # Organize profile data by category
            profile_data = self._organize_profile_data(profile_memories)

            # Generate summary
            if llm:
                return await self._generate_llm_summary(profile_data, llm)
            else:
                return self._generate_structured_summary(profile_data)

        except Exception as e:
            import traceback

            logger.error(f"Failed to generate profile summary: {e}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return f"Error generating profile summary: {e}"

    def _organize_profile_data(
        self, profile_memories: list[tuple[str, Any]]
    ) -> dict[str, dict[str, str]]:
        """Organize profile memories into a structured hierarchy."""
        organized = {}

        for semantic_key, data in profile_memories:
            try:
                # Ensure semantic_key is a string
                if not isinstance(semantic_key, str):
                    logger.warning(
                        f"Non-string semantic key: {type(semantic_key)}: {semantic_key}"
                    )
                    semantic_key = str(semantic_key)

                # Handle the data format - it could be a MemoryItem dict or other format
                if isinstance(data, dict):
                    # Check if this is a MemoryItem structure with content field
                    if "content" in data and isinstance(data["content"], dict):
                        # This is a MemoryItem with content - extract the actual memory data
                        memory_data = data["content"]
                        structured_data = memory_data.get("structured_data", {})
                    else:
                        # This is the memory data directly
                        memory_data = data
                        structured_data = data.get("structured_data", {})
                else:
                    # If it's not a dict, try to extract meaningful data
                    logger.warning(
                        f"Unexpected data format for {semantic_key}: {type(data)}"
                    )
                    continue

                # Get the profile path and value
                profile_field = structured_data.get("profile_field")
                profile_value = structured_data.get("profile_value")
                update_type = structured_data.get("update_type")

                # Only process memories that are actual profile updates
                if update_type != "profile_update":
                    logger.debug(f"Skipping non-profile-update memory: {semantic_key}")
                    continue

                if not profile_field or not profile_value:
                    logger.warning(
                        f"Profile update memory missing field or value: {semantic_key}"
                    )
                    continue

                # Ensure profile_field is a string
                if not isinstance(profile_field, str):
                    logger.warning(
                        f"Non-string profile_field: {type(profile_field)}: {profile_field}"
                    )
                    profile_field = (
                        str(profile_field)
                        if profile_field is not None
                        else semantic_key
                    )

                if profile_field and profile_value:
                    # Convert profile_value to string if it's not already
                    if isinstance(profile_value, dict):
                        # If it's a dict, convert to JSON string
                        import json

                        profile_value_str = json.dumps(profile_value)
                    elif isinstance(profile_value, (list, tuple)):
                        # If it's a list/tuple, join as string
                        profile_value_str = ", ".join(str(x) for x in profile_value)
                    else:
                        profile_value_str = str(profile_value)

                    # Build nested dictionary structure
                    parts = profile_field.split(".")
                    current = organized

                    # Navigate to the correct nested position
                    for part in parts[:-1]:  # All except the last part
                        # Ensure part is a string
                        part = str(part) if part is not None else "unknown"
                        if part not in current:
                            current[part] = {}
                        current = current[part]

                    # Set the final value as string
                    final_key = str(parts[-1]) if parts[-1] is not None else "unknown"
                    current[final_key] = profile_value_str

            except Exception as e:
                logger.warning(f"Failed to process profile memory {semantic_key}: {e}")
                continue

        return organized

    def _generate_structured_summary(self, profile_data: dict[str, Any]) -> str:
        """Generate a structured text summary of profile data."""
        if not profile_data:
            return "No profile information available."

        summary_parts = ["=== USER PROFILE SUMMARY ===\n"]

        # Process each main category
        category_order = [
            ("personal", "Personal Information"),
            ("professional", "Professional Profile"),
            ("health", "Health & Wellness"),
            ("finance", "Financial Profile"),
            ("living", "Living Situation"),
            ("relationships", "Relationships & Social"),
            ("goals", "Goals & Aspirations"),
        ]

        for key, title in category_order:
            if key in profile_data:
                summary_parts.append(f"\n{title}:")
                summary_parts.append(
                    self._format_category_data(profile_data[key], indent=1)
                )

        # Add any other categories not in the standard order
        processed_keys = {key for key, _ in category_order}
        for key, data in profile_data.items():
            if key not in processed_keys:
                title = key.replace("_", " ").title()
                summary_parts.append(f"\n{title}:")
                summary_parts.append(self._format_category_data(data, indent=1))

        return "\n".join(summary_parts)

    def _format_category_data(self, data: dict[str, Any], indent: int = 0) -> str:
        """Format category data with proper indentation."""
        if not data:
            return "  " * indent + "No information available"

        lines = []
        prefix = "  " * indent

        for key, value in data.items():
            if isinstance(value, dict):
                # Nested category
                category_title = key.replace("_", " ").title()
                lines.append(f"{prefix}{category_title}:")
                lines.append(self._format_category_data(value, indent + 1))
            else:
                # Leaf value
                field_name = key.replace("_", " ").title()
                lines.append(f"{prefix}- {field_name}: {value}")

        return "\n".join(lines)

    async def _generate_llm_summary(self, profile_data: dict[str, Any], llm) -> str:
        """Generate a narrative summary using LLM."""
        try:
            # Convert profile data to a readable format for LLM
            structured_summary = self._generate_structured_summary(profile_data)

            prompt = f"""Generate a comprehensive, narrative profile summary based on the following structured profile data. Create a natural, flowing description that captures the key aspects of this person's life, background, and characteristics.

Profile Data:
{structured_summary}

Instructions:
- Write in third person
- Create a cohesive narrative that flows naturally
- Focus on the most important and defining characteristics
- Group related information together logically
- Keep it comprehensive but concise (2-3 paragraphs)
- Avoid simply listing facts - weave them into a story

Generate a professional profile summary:"""

            response = await llm.ainvoke(prompt)

            if hasattr(response, "content"):
                narrative_summary = response.content
            else:
                narrative_summary = str(response)

            # Combine structured and narrative summaries
            return f"=== USER PROFILE SUMMARY ===\n\n{narrative_summary}\n\n--- Detailed Profile Data ---\n{structured_summary}"

        except Exception as e:
            logger.error(f"Failed to generate LLM summary: {e}")
            # Fallback to structured summary
            return self._generate_structured_summary(profile_data)

__init__

__init__(memory_store)

Initialize profile memento with memory store.

Source code in src/memoir/memento/profile.py
def __init__(self, memory_store):
    """Initialize profile memento with memory store."""
    self.memory_store = memory_store

apply_profile_updates async

apply_profile_updates(profile_updates: list[dict[str, str]], metadata: dict | None = None, namespace: str = 'default') -> None

Apply profile updates to the memory store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| profile_updates | list[dict[str, str]] | List of profile updates with path and value | required |
| metadata | dict \| None | Optional metadata to include with updates | None |
| namespace | str | Namespace to store the profile updates in (default: "default") | 'default' |
Source code in src/memoir/memento/profile.py
async def apply_profile_updates(
    self,
    profile_updates: list[dict[str, str]],
    metadata: dict | None = None,
    namespace: str = "default",
) -> None:
    """
    Apply profile updates to the memory store.

    Args:
        profile_updates: List of profile updates with path and value
        metadata: Optional metadata to include with updates
        namespace: Namespace to store the profile updates in (default: "default")
    """
    if not profile_updates:
        return

    for update in profile_updates:
        path = update.get("path", "")
        value = update.get("value", "")

        if not path or not value:
            logger.warning(f"Invalid profile update: {update}")
            continue

        # Check if this is a profile path
        if not path.startswith("profile."):
            logger.warning(f"Non-profile path in profile update: {path}")
            continue

        # Store the profile update as a memory with special handling
        memory_data = {
            "raw_text": value,
            "summary": f"Profile update: {path.split('.')[-1]} = {value}",
            "structured_data": {
                "profile_field": path,
                "profile_value": value,
                "update_type": "profile_update",
            },
            "memory_type": "profile_update",
        }

        # Store directly using the async method (consistent with timeline manager)
        await self.memory_store.store_memory_async(namespace, memory_data, path)
        logger.info(f"Applied profile update: {path} = {value}")

get_profile_summary async

get_profile_summary(llm=None, namespace: str = 'default') -> str

Generate a comprehensive profile summary from stored profile data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| llm |  | Optional LLM for generating narrative summary | None |
| namespace | str | Namespace to search for profile data (default: "default") | 'default' |

Returns:

| Type | Description |
| --- | --- |
| str | Profile summary string |

Source code in src/memoir/memento/profile.py
async def get_profile_summary(self, llm=None, namespace: str = "default") -> str:
    """
    Generate a comprehensive profile summary from stored profile data.

    Args:
        llm: Optional LLM for generating narrative summary
        namespace: Namespace to search for profile data (default: "default")

    Returns:
        Profile summary string
    """
    try:
        # Search for all profile memories using the correct method signature
        # Use provided namespace string as expected by asearch method
        profile_memories = await self.memory_store.asearch(namespace, "profile.")

        # Debug: log what we found
        logger.debug(f"Found {len(profile_memories)} profile memories")

        # Limit results manually if needed
        if len(profile_memories) > 1000:
            profile_memories = profile_memories[:1000]

        if not profile_memories:
            return "No profile information available."

        # Organize profile data by category
        profile_data = self._organize_profile_data(profile_memories)

        # Generate summary
        if llm:
            return await self._generate_llm_summary(profile_data, llm)
        else:
            return self._generate_structured_summary(profile_data)

    except Exception as e:
        import traceback

        logger.error(f"Failed to generate profile summary: {e}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return f"Error generating profile summary: {e}"

TimelineMemento

Manages user timeline data and generates chronological event summaries.

Source code in src/memoir/memento/timeline.py
class TimelineMemento:
    """Manages user timeline data and generates chronological event summaries."""

    def __init__(self, memory_store):
        """Initialize timeline memento with memory store."""
        self.memory_store = memory_store

    async def apply_timeline_events(
        self,
        timeline_events: list[dict[str, str]],
        metadata: dict | None = None,
        original_content: str | None = None,
        namespace: str = "default",
    ) -> None:
        """
        Apply timeline events to the memory store.

        For same-day events, retrieves existing content and merges with new event.

        Args:
            timeline_events: List of timeline events with date and description
            metadata: Optional metadata to include with events
            original_content: Optional original input text stored alongside the merged event (default: None)
            namespace: Namespace to store timeline events in (default: "default")
        """
        if not timeline_events:
            return

        for event in timeline_events:
            date_str = event.get("date", "")  # Format: YYYYMMDD
            description = event.get("description", "")

            if not date_str or not description:
                logger.warning(f"Invalid timeline event: {event}")
                continue

            # Validate date format
            if not self._validate_date_format(date_str):
                logger.warning(f"Invalid date format (expected YYYYMMDD): {date_str}")
                continue

            # Create the timeline path
            path = f"timeline.{date_str}"

            # Check if there's already an event for this date
            existing_events = await self.memory_store.asearch(namespace, path)

            if existing_events:
                # Merge with existing event(s) for the same day
                existing_content = self._extract_existing_content(existing_events)
                merged_content = self._merge_events(existing_content, description)
            else:
                merged_content = description

            # Store the timeline event as a memory
            memory_data = {
                "raw_text": merged_content,
                "original_content": original_content
                or merged_content,  # Store original input if available
                "summary": f"Timeline event on {self._format_date_display(date_str)}",
                "structured_data": {
                    "timeline_date": date_str,
                    "timeline_content": merged_content,
                    "original_content": original_content or merged_content,
                    "update_type": "timeline_event",
                },
                "memory_type": "timeline_event",
            }

            logger.info(f"DEBUG: Storing timeline memory_data: {memory_data}")

            # Store directly using the memory store with correct signature (async)
            await self.memory_store.store_memory_async(namespace, memory_data, path)
            logger.info(f"Applied timeline event: {path} = {merged_content[:100]}...")

    async def get_timeline_summary(
        self,
        start_date: str | None = None,
        end_date: str | None = None,
        llm=None,
        namespace: str = "default",
    ) -> str:
        """
        Generate a comprehensive timeline summary from stored timeline data.

        Args:
            start_date: Optional start date (YYYYMMDD format)
            end_date: Optional end date (YYYYMMDD format)
            llm: Optional LLM for generating narrative summary
            namespace: Namespace to search for timeline data (default: "default")

        Returns:
            Timeline summary string
        """
        try:
            # Search for all timeline memories
            timeline_memories = await self.memory_store.asearch(namespace, "timeline.")

            # Debug: log what we found
            logger.debug(f"Found {len(timeline_memories)} timeline memories")

            # Filter by date range if specified
            if start_date or end_date:
                timeline_memories = self._filter_by_date_range(
                    timeline_memories, start_date, end_date
                )

            # Limit results if too many
            if len(timeline_memories) > 1000:
                timeline_memories = timeline_memories[:1000]

            if not timeline_memories:
                return "No timeline events available."

            # Organize timeline data chronologically
            timeline_data = self._organize_timeline_data(timeline_memories)

            # Generate summary
            if llm:
                return await self._generate_llm_summary(timeline_data, llm)
            else:
                return self._generate_structured_summary(timeline_data)

        except Exception as e:
            import traceback

            logger.error(f"Failed to generate timeline summary: {e}")
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return f"Error generating timeline summary: {e}"

    def _validate_date_format(self, date_str: str) -> bool:
        """Validate that date string is in YYYYMMDD format."""
        if len(date_str) != 8:
            return False
        try:
            datetime.strptime(date_str, "%Y%m%d")
            return True
        except ValueError:
            return False

    def _format_date_display(self, date_str: str) -> str:
        """Format YYYYMMDD to human-readable date."""
        try:
            dt = datetime.strptime(date_str, "%Y%m%d")
            return dt.strftime("%B %d, %Y")
        except ValueError:
            return date_str

    def _extract_existing_content(self, existing_events: list[tuple[str, Any]]) -> str:
        """Extract content from existing timeline events."""
        contents = []
        for _, data in existing_events:
            if isinstance(data, dict):
                # Check if this is a MemoryItem structure with content field
                if "content" in data and isinstance(data["content"], dict):
                    memory_data = data["content"]
                    structured_data = memory_data.get("structured_data", {})
                    timeline_content = structured_data.get("timeline_content", "")
                    if timeline_content:
                        contents.append(timeline_content)
                else:
                    # Try direct access
                    structured_data = data.get("structured_data", {})
                    timeline_content = structured_data.get("timeline_content", "")
                    if timeline_content:
                        contents.append(timeline_content)

        return " | ".join(contents) if contents else ""

    def _merge_events(self, existing_content: str, new_content: str) -> str:
        """Merge existing and new events for the same day."""
        if not existing_content:
            return new_content

        # Simple merge strategy - combine with separator
        # In production, you might want to use an LLM to create a better summary
        return f"{existing_content} | {new_content}"

    def _filter_by_date_range(
        self,
        memories: list[tuple[str, Any]],
        start_date: str | None,
        end_date: str | None,
    ) -> list[tuple[str, Any]]:
        """Filter timeline memories by date range."""
        filtered = []

        for semantic_key, data in memories:
            # Extract date from key (timeline.YYYYMMDD)
            if "." in semantic_key:
                date_str = semantic_key.split(".")[-1]
                if self._validate_date_format(date_str):
                    # Check if within range
                    if start_date and date_str < start_date:
                        continue
                    if end_date and date_str > end_date:
                        continue
                    filtered.append((semantic_key, data))

        return filtered

    def _organize_timeline_data(
        self, timeline_memories: list[tuple[str, Any]]
    ) -> dict[str, str]:
        """Organize timeline memories into a chronological structure."""
        organized = {}

        for semantic_key, data in timeline_memories:
            try:
                # Extract date from key
                if "." not in semantic_key:
                    continue

                date_str = semantic_key.split(".")[-1]
                if not self._validate_date_format(date_str):
                    continue

                # Handle the data format
                if isinstance(data, dict):
                    # Check if this is a MemoryItem structure with content field
                    if "content" in data and isinstance(data["content"], dict):
                        memory_data = data["content"]
                        structured_data = memory_data.get("structured_data", {})
                    else:
                        memory_data = data
                        structured_data = data.get("structured_data", {})

                    # Get the timeline content
                    timeline_content = structured_data.get("timeline_content")
                    update_type = structured_data.get("update_type")

                    # Only process memories that are actual timeline events
                    if update_type != "timeline_event":
                        logger.debug(
                            f"Skipping non-timeline-event memory: {semantic_key}"
                        )
                        continue

                    if timeline_content:
                        organized[date_str] = timeline_content

            except Exception as e:
                logger.warning(f"Failed to process timeline memory {semantic_key}: {e}")
                continue

        # Sort by date
        sorted_dates = sorted(organized.keys())
        return {date: organized[date] for date in sorted_dates}

    def _generate_structured_summary(self, timeline_data: dict[str, str]) -> str:
        """Generate a structured text summary of timeline data."""
        if not timeline_data:
            return "No timeline events available."

        summary_parts = ["=== USER TIMELINE ===\n"]

        # Group by year and month for better organization
        events_by_year = {}
        for date_str, content in timeline_data.items():
            year = date_str[:4]
            month = date_str[4:6]

            if year not in events_by_year:
                events_by_year[year] = {}
            if month not in events_by_year[year]:
                events_by_year[year][month] = []

            events_by_year[year][month].append((date_str, content))

        # Generate summary by year and month
        for year in sorted(events_by_year.keys(), reverse=True):
            summary_parts.append(f"\n{year}:")

            for month in sorted(events_by_year[year].keys(), reverse=True):
                month_name = datetime.strptime(f"{year}{month}01", "%Y%m%d").strftime(
                    "%B"
                )
                summary_parts.append(f"\n  {month_name}:")

                for date_str, content in sorted(
                    events_by_year[year][month], reverse=True
                ):
                    day = int(date_str[6:8])
                    summary_parts.append(f"    {day:2d}: {content}")

        return "\n".join(summary_parts)

    async def _generate_llm_summary(self, timeline_data: dict[str, str], llm) -> str:
        """Generate a narrative summary using LLM."""
        try:
            # Convert timeline data to a readable format for LLM
            structured_summary = self._generate_structured_summary(timeline_data)

            prompt = f"""Generate a comprehensive, narrative timeline summary based on the following chronological events. Create a natural, flowing description that captures the key events and their significance in the person's life.

Timeline Data:
{structured_summary}

Instructions:
- Write in third person
- Create a cohesive narrative that flows naturally through time
- Highlight significant events and patterns
- Group related events logically
- Keep it comprehensive but concise
- Focus on the progression and development over time

Generate a timeline narrative:"""

            response = await llm.ainvoke(prompt)

            if hasattr(response, "content"):
                narrative_summary = response.content
            else:
                narrative_summary = str(response)

            # Combine structured and narrative summaries
            return f"=== USER TIMELINE ===\n\n{narrative_summary}\n\n--- Detailed Timeline ---\n{structured_summary}"

        except Exception as e:
            logger.error(f"Failed to generate LLM summary: {e}")
            # Fallback to structured summary
            return self._generate_structured_summary(timeline_data)

__init__

__init__(memory_store)

Initialize timeline memento with memory store.

Source code in src/memoir/memento/timeline.py
def __init__(self, memory_store):
    """Initialize timeline memento with memory store."""
    self.memory_store = memory_store

apply_timeline_events async

apply_timeline_events(timeline_events: list[dict[str, str]], metadata: dict | None = None, original_content: str | None = None, namespace: str = 'default') -> None

Apply timeline events to the memory store.

For same-day events, retrieves existing content and merges with new event.

Parameters:

    timeline_events (list[dict[str, str]], required):
        List of timeline events with date and description
    metadata (dict | None, default None):
        Optional metadata to include with events
    original_content (str | None, default None):
        Optional original input text stored alongside the merged event
    namespace (str, default 'default'):
        Namespace to store timeline events in
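Usage (illustrative sketch): two events logged for the same day end up merged under one timeline key, joined with " | " by _merge_events.

# Hypothetical sketch; `timeline` is a TimelineMemento instance.
await timeline.apply_timeline_events(
    [{"date": "20240305", "description": "Started new job"}]
)
await timeline.apply_timeline_events(
    [{"date": "20240305", "description": "Signed apartment lease"}]
)
# The content stored at timeline.20240305 would then be roughly:
# "Started new job | Signed apartment lease"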
Source code in src/memoir/memento/timeline.py
async def apply_timeline_events(
    self,
    timeline_events: list[dict[str, str]],
    metadata: dict | None = None,
    original_content: str | None = None,
    namespace: str = "default",
) -> None:
    """
    Apply timeline events to the memory store.

    For same-day events, retrieves existing content and merges with new event.

    Args:
        timeline_events: List of timeline events with date and description
        metadata: Optional metadata to include with events
        original_content: Optional original input text stored alongside the merged event (default: None)
        namespace: Namespace to store timeline events in (default: "default")
    """
    if not timeline_events:
        return

    for event in timeline_events:
        date_str = event.get("date", "")  # Format: YYYYMMDD
        description = event.get("description", "")

        if not date_str or not description:
            logger.warning(f"Invalid timeline event: {event}")
            continue

        # Validate date format
        if not self._validate_date_format(date_str):
            logger.warning(f"Invalid date format (expected YYYYMMDD): {date_str}")
            continue

        # Create the timeline path
        path = f"timeline.{date_str}"

        # Check if there's already an event for this date
        existing_events = await self.memory_store.asearch(namespace, path)

        if existing_events:
            # Merge with existing event(s) for the same day
            existing_content = self._extract_existing_content(existing_events)
            merged_content = self._merge_events(existing_content, description)
        else:
            merged_content = description

        # Store the timeline event as a memory
        memory_data = {
            "raw_text": merged_content,
            "original_content": original_content
            or merged_content,  # Store original input if available
            "summary": f"Timeline event on {self._format_date_display(date_str)}",
            "structured_data": {
                "timeline_date": date_str,
                "timeline_content": merged_content,
                "original_content": original_content or merged_content,
                "update_type": "timeline_event",
            },
            "memory_type": "timeline_event",
        }

        logger.info(f"DEBUG: Storing timeline memory_data: {memory_data}")

        # Store directly using the memory store with correct signature (async)
        await self.memory_store.store_memory_async(namespace, memory_data, path)
        logger.info(f"Applied timeline event: {path} = {merged_content[:100]}...")

get_timeline_summary async

get_timeline_summary(start_date: str | None = None, end_date: str | None = None, llm=None, namespace: str = 'default') -> str

Generate a comprehensive timeline summary from stored timeline data.

Parameters:

    start_date (str | None, default None):
        Optional start date (YYYYMMDD format)
    end_date (str | None, default None):
        Optional end date (YYYYMMDD format)
    llm (default None):
        Optional LLM for generating narrative summary
    namespace (str, default 'default'):
        Namespace to search for timeline data

Returns:

    str:
        Timeline summary string
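Usage (illustrative sketch): restricting the summary to Q1 2024. Dates are plain YYYYMMDD strings, so the range filter reduces to lexicographic comparison on the key suffix.

# Hypothetical sketch; `timeline` is a TimelineMemento instance.
q1_summary = await timeline.get_timeline_summary(
    start_date="20240101",
    end_date="20240331",
    namespace="user-123",
)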

Source code in src/memoir/memento/timeline.py
async def get_timeline_summary(
    self,
    start_date: str | None = None,
    end_date: str | None = None,
    llm=None,
    namespace: str = "default",
) -> str:
    """
    Generate a comprehensive timeline summary from stored timeline data.

    Args:
        start_date: Optional start date (YYYYMMDD format)
        end_date: Optional end date (YYYYMMDD format)
        llm: Optional LLM for generating narrative summary
        namespace: Namespace to search for timeline data (default: "default")

    Returns:
        Timeline summary string
    """
    try:
        # Search for all timeline memories
        timeline_memories = await self.memory_store.asearch(namespace, "timeline.")

        # Debug: log what we found
        logger.debug(f"Found {len(timeline_memories)} timeline memories")

        # Filter by date range if specified
        if start_date or end_date:
            timeline_memories = self._filter_by_date_range(
                timeline_memories, start_date, end_date
            )

        # Limit results if too many
        if len(timeline_memories) > 1000:
            timeline_memories = timeline_memories[:1000]

        if not timeline_memories:
            return "No timeline events available."

        # Organize timeline data chronologically
        timeline_data = self._organize_timeline_data(timeline_memories)

        # Generate summary
        if llm:
            return await self._generate_llm_summary(timeline_data, llm)
        else:
            return self._generate_structured_summary(timeline_data)

    except Exception as e:
        import traceback

        logger.error(f"Failed to generate timeline summary: {e}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return f"Error generating timeline summary: {e}"

IntelligentSearchEngine

LLM-powered search engine that intelligently selects relevant memory paths.

Two selection pipelines are available via the mode argument on search():

  • mode="single" (default) - one LLM call picks 1-3 paths from the full path inventory (with content samples). Lowest latency; signal-to-noise degrades as the store grows.
  • mode="tiered" - staged drill-down that mirrors the caller-driven [mode=drill] flow used by the memory-recall skill:

    1. Pure-compute L1 histogram over stored paths.
    2. LLM #1 picks 2-4 L1 prefixes likely to hold the answer.
    3. Optional LLM #1.5 picks L2 prefixes when any picked L1 exceeds L2_ESCALATION_THRESHOLD keys.
    4. LLM #2 picks 3-7 exact keys from the descended subset.
    5. Batched memory fetch via _extract_memories_from_data.

Both pipelines share path-discovery pre-work and emit comparable step_timings / llm_prompts metadata. Prompt caching markers in the single-stage prompt ([STATIC_SECTION_START] / [STATIC_SECTION_END]) are also applied to the tiered key-pick stage, which reuses _select_relevant_paths.
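
The tiered pipeline calls two module-level helpers, _group_by_depth and _filter_keys, that the listing below uses but does not define. A minimal sketch of their plausible behavior, reconstructed from the call sites (the bodies are assumptions, not the package's code):

# Sketch only: inferred from how the tiered pipeline uses these helpers.
def _group_by_depth(paths: list[str], depth: int) -> dict[str, int]:
    """Histogram of paths keyed by their first `depth` dot-separated segments."""
    counts: dict[str, int] = {}
    for path in paths:
        prefix = ".".join(path.split(".")[:depth])
        counts[prefix] = counts.get(prefix, 0) + 1
    return counts

def _filter_keys(paths: list[str], pattern: str) -> list[str]:
    """Return paths matching a 'prefix.*' pattern, i.e. keys under that prefix."""
    prefix = pattern[:-1] if pattern.endswith("*") else pattern  # "a.*" -> "a."
    return [p for p in paths if p.startswith(prefix)]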

Source code in src/memoir/search/intelligent.py
class IntelligentSearchEngine:
    """
    LLM-powered search engine that intelligently selects relevant memory paths.

    Two selection pipelines are available via the ``mode`` argument on
    :meth:`search`:

    - ``mode="single"`` (default) - one LLM call picks 1-3 paths from the full
      path inventory (with content samples). Lowest latency; signal-to-noise
      degrades as the store grows.
    - ``mode="tiered"`` - staged drill-down that mirrors the caller-driven
      ``[mode=drill]`` flow used by the ``memory-recall`` skill:

      1. Pure-compute L1 histogram over stored paths.
      2. LLM #1 picks 2-4 L1 prefixes likely to hold the answer.
      3. Optional LLM #1.5 picks L2 prefixes when any picked L1 exceeds
         :data:`L2_ESCALATION_THRESHOLD` keys.
      4. LLM #2 picks 3-7 exact keys from the descended subset.
      5. Batched memory fetch via :meth:`_extract_memories_from_data`.

    Both pipelines share path-discovery pre-work and emit comparable
    ``step_timings`` / ``llm_prompts`` metadata. Prompt caching markers in the
    single-stage prompt (``[STATIC_SECTION_START]`` / ``[STATIC_SECTION_END]``)
    are also applied to the tiered key-pick stage, which reuses
    :meth:`_select_relevant_paths`.
    """

    def __init__(
        self,
        llm: Any,
        store: Any,
        taxonomy_loader: TaxonomyLoader | None = None,
    ):
        """
        Initialize the intelligent search engine.

        Args:
            llm: Language model for path selection
            store: Memory store (ProllyTreeStore)
            taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                             When provided, taxonomy data is loaded from the store's taxonomy namespace.
                             When None, falls back to hardcoded TaxonomyPresets.
        """
        self.llm = llm
        self.store = store
        self._taxonomy_loader = taxonomy_loader
        self._static_prompt_cache: str | None = None

    def _get_classification_examples(
        self, limit: int = 100
    ) -> list[tuple[str, str, str]]:
        """Get classification examples from store or fallback to hardcoded.

        Args:
            limit: Maximum number of examples to return.

        Returns:
            List of (input_text, path, reasoning) tuples.
        """
        if self._taxonomy_loader:
            try:
                examples = self._taxonomy_loader.get_examples_from_store(limit=limit)
                if examples:
                    logger.debug(
                        f"[SearchEngine] Loaded {len(examples)} examples FROM STORE"
                    )
                    return examples
            except Exception as e:
                logger.warning(
                    f"[SearchEngine] Failed to load examples from store: {e}"
                )

        # Fallback to hardcoded examples
        logger.debug(f"[SearchEngine] Using FALLBACK examples (limit={limit})")
        return TaxonomyPresets.CLASSIFICATION_EXAMPLES[:limit]

    def _get_category_descriptions(self) -> dict[str, str]:
        """Get category descriptions from store or fallback to hardcoded.

        Returns:
            Dict mapping category to description.
        """
        if self._taxonomy_loader:
            try:
                descriptions = self._taxonomy_loader.get_descriptions_from_store()
                if descriptions:
                    logger.debug(
                        f"[SearchEngine] Loaded {len(descriptions)} descriptions FROM STORE"
                    )
                    return descriptions
            except Exception as e:
                logger.warning(
                    f"[SearchEngine] Failed to load descriptions from store: {e}"
                )

        # Fallback to hardcoded descriptions
        logger.debug("[SearchEngine] Using FALLBACK category descriptions")
        return TaxonomyPresets.CATEGORY_DESCRIPTIONS

    def _build_static_prompt(self) -> str:
        """
        Build the static prompt from store or TaxonomyPresets.

        Uses CLASSIFICATION_EXAMPLES and CATEGORY_DESCRIPTIONS for consistency
        with the IntelligentClassifier.
        """
        if self._static_prompt_cache is not None:
            return self._static_prompt_cache

        # Build category descriptions section (from store or fallback)
        category_lines = []
        for cat, desc in self._get_category_descriptions().items():
            category_lines.append(f"- {cat}: {desc}")
        categories_text = "\n".join(category_lines)

        # Build classification examples section (sample ~100 for prompt size)
        # Group by category for better organization
        examples_by_category: dict[str, list[str]] = {}
        for input_text, path, _reason in self._get_classification_examples(100):
            category = path.split(".")[0]
            if category not in examples_by_category:
                examples_by_category[category] = []
            if len(examples_by_category[category]) < 6:  # Max 6 per category
                examples_by_category[category].append(f'  - "{input_text}" → {path}')

        example_lines = []
        for category in sorted(examples_by_category.keys()):
            example_lines.append(f"{category.upper()}:")
            example_lines.extend(examples_by_category[category])
        examples_text = "\n".join(example_lines)

        prompt = f"""[STATIC_SECTION_START]
You are a memory search assistant. Your task is to select the most relevant memory paths that would answer the user's query.

TAXONOMY CATEGORIES (3-level paths: category.subcategory.type):
{categories_text}

CLASSIFICATION EXAMPLES (how memories are organized):
{examples_text}

SEARCH INSTRUCTIONS:
- Consider BOTH the semantic path meaning AND the content samples provided
- Match query keywords to the taxonomy categories above
- Return ONLY the exact path names from the available paths, one per line
- If no paths are relevant to the query, return "NONE"
[STATIC_SECTION_END]

[DYNAMIC_SECTION_START]"""

        self._static_prompt_cache = prompt
        return prompt

    async def search(
        self,
        query: str,
        namespace: str,
        limit: int = 10,
        return_prompts: bool = False,
        person_filter: str | None = None,
        mode: str = "single",
    ) -> list[IntelligentSearchResult]:
        """
        Search for relevant memories using LLM path selection.

        Args:
            query: Natural language search query
            namespace: User namespace to search in
            limit: Maximum number of results
            return_prompts: Whether to capture and return LLM prompts
            person_filter: Optional person name to filter paths (e.g., "john")
            mode: "single" (default, one LLM call) or "tiered" (multi-stage
                drill-down: L1 pick → optional L2 pick → key pick). Unknown
                values raise ValueError.

        Returns:
            List of IntelligentSearchResult objects
        """
        if mode not in VALID_MODES:
            raise ValueError(
                f"Unknown search mode {mode!r}; expected one of {VALID_MODES}"
            )

        try:
            import time

            step_timings = {}
            llm_prompts = {} if return_prompts else None
            search_start = time.time()
            # Step 1: Path Discovery - Get all available paths from the store
            step1_start = time.time()
            if isinstance(namespace, str):
                namespace_tuple = tuple(namespace.split(":"))
            else:
                namespace_tuple = namespace

            # Step 1a: Get all memories from the store
            all_memories = []
            try:
                all_memories = self.store.search(namespace_tuple, limit=10000)
                logger.info(
                    f"Found {len(all_memories)} memories in namespace {namespace_tuple}"
                )

                # Apply person filtering if specified
                if person_filter:
                    person_prefix = f"{person_filter.lower()}."
                    filtered_memories = []
                    for memory_item in all_memories:
                        _, path, data = memory_item
                        if path.lower().startswith(person_prefix):
                            filtered_memories.append(memory_item)

                    logger.info(
                        f"Person filtering '{person_filter}': {len(all_memories)} -> {len(filtered_memories)} memories"
                    )
                    all_memories = filtered_memories

            except Exception as e:
                logger.error(f"Failed to search memories: {e}")
                return []

            if not all_memories:
                if person_filter:
                    logger.info(
                        f"No memories found for person '{person_filter}' in namespace {namespace}"
                    )
                else:
                    logger.info(f"No memories found in namespace {namespace}")
                # Return timing-only result for early exit
                step_timings["step1_path_discovery"] = round(
                    time.time() - step1_start, 3
                )
                step_timings["step2_path_selection"] = 0.0
                step_timings["step3_memory_retrieval"] = 0.0
                step_timings["total_search"] = round(time.time() - search_start, 3)

                # Record the person filter so callers can see why nothing matched
                metadata = {"step_timings": step_timings, "is_timing_only": True}
                if person_filter:
                    metadata["person_filter"] = person_filter
                dummy_result = IntelligentSearchResult(
                    path="",
                    content="",
                    metadata=metadata,
                    relevance_score=0.0,
                    namespace="",
                )
                return [dummy_result]

            # Step 1b: Create path info from loaded memories (like the original logic)
            paths_info = {}
            for _, path, data in all_memories:
                if path not in paths_info and data is not None:
                    # Get a preview of what's stored at this path
                    if isinstance(data, dict) and "memories" in data:
                        # Aggregated memory
                        memory_count = data.get("count", len(data.get("memories", [])))
                        sample_content = ""
                        memories = data.get("memories", [])
                        if memories:
                            content = memories[0].get("content", "")
                            sample_content = str(content)[:100] if content else ""
                        paths_info[path] = {
                            "type": "aggregated",
                            "count": memory_count,
                            "sample": sample_content,
                        }
                    elif isinstance(data, dict):
                        # Single memory
                        content = data.get("content", str(data))
                        paths_info[path] = {
                            "type": "single",
                            "count": 1,
                            "sample": str(content)[:100],
                        }
                    else:
                        # Non-dict data
                        paths_info[path] = {
                            "type": "single",
                            "count": 1,
                            "sample": str(data)[:100] if data else "",
                        }

            if not paths_info:
                logger.info("No valid paths found")
                # Return timing-only result for early exit
                step_timings["step1_path_discovery"] = round(
                    time.time() - step1_start, 3
                )
                step_timings["step2_path_selection"] = 0.0
                step_timings["step3_memory_retrieval"] = 0.0
                step_timings["total_search"] = round(time.time() - search_start, 3)

                dummy_result = IntelligentSearchResult(
                    path="",
                    content="",
                    metadata={"step_timings": step_timings, "is_timing_only": True},
                    relevance_score=0.0,
                    namespace="",
                )
                return [dummy_result]

            step_timings["step1_path_discovery"] = round(time.time() - step1_start, 3)

            # Fork to tiered pipeline once common pre-work (namespace parsing,
            # store read, paths_info build) is done. The tiered path runs its
            # own multi-stage selection and memory retrieval, then returns.
            if mode == "tiered":
                return await self._search_tiered(
                    query=query,
                    namespace_tuple=namespace_tuple,
                    limit=limit,
                    all_memories=all_memories,
                    paths_info=paths_info,
                    step_timings=step_timings,
                    llm_prompts=llm_prompts,
                    search_start=search_start,
                )

            # Step 2: Semantic Path Selection - Ask LLM to select relevant paths
            step2_start = time.time()
            selected_paths = await self._select_relevant_paths(
                query, paths_info, limit=limit, llm_prompts=llm_prompts
            )

            if not selected_paths:
                logger.info(f"LLM didn't select any relevant paths for query: {query}")
                # Return timing-only result for early exit
                step_timings["step2_path_selection"] = round(
                    time.time() - step2_start, 3
                )
                step_timings["step3_memory_retrieval"] = 0.0
                step_timings["total_search"] = round(time.time() - search_start, 3)

                metadata = {"step_timings": step_timings, "is_timing_only": True}
                if llm_prompts:
                    metadata["llm_prompts"] = llm_prompts
                dummy_result = IntelligentSearchResult(
                    path="",
                    content="",
                    metadata=metadata,
                    relevance_score=0.0,
                    namespace="",
                )
                return [dummy_result]

            step_timings["step2_path_selection"] = round(time.time() - step2_start, 3)

            # Step 3: Memory Retrieval - Extract results from already-loaded memories
            step3_start = time.time()
            results = []

            # Create a lookup dict for faster access (O(1) instead of O(n))
            memory_dict = {path: data for _, path, data in all_memories}

            for path in selected_paths[:limit]:  # Limit paths processed
                if path in memory_dict:
                    data = memory_dict[path]
                    path_memories = self._extract_memories_from_data(
                        namespace_tuple, path, data
                    )
                    results.extend(path_memories)

                if len(results) >= limit:
                    break

            step_timings["step3_memory_retrieval"] = round(time.time() - step3_start, 3)
            step_timings["total_search"] = round(time.time() - search_start, 3)

            # Store timing info and prompts in the results for access by the API
            for result in results:
                if hasattr(result, "metadata"):
                    if not result.metadata:
                        result.metadata = {}
                    result.metadata["step_timings"] = step_timings
                    result.metadata["mode"] = "single"
                    if llm_prompts:
                        result.metadata["llm_prompts"] = llm_prompts

            # If no results but we have timing data, create a dummy result to carry timing info
            if not results and step_timings:
                metadata = {"step_timings": step_timings, "is_timing_only": True}
                if llm_prompts:
                    metadata["llm_prompts"] = llm_prompts
                dummy_result = IntelligentSearchResult(
                    path="",
                    content="",
                    metadata=metadata,
                    relevance_score=0.0,
                    namespace="",
                )
                return [dummy_result]

            return results[:limit]

        except Exception as e:
            logger.error(f"Error in intelligent search: {e}")
            # Return timing-only result even for exceptions
            if "step_timings" in locals():
                step_timings["total_search"] = round(time.time() - search_start, 3)
                dummy_result = IntelligentSearchResult(
                    path="",
                    content="",
                    metadata={"step_timings": step_timings, "is_timing_only": True},
                    relevance_score=0.0,
                    namespace="",
                )
                return [dummy_result]
            return []

    async def _search_tiered(
        self,
        query: str,
        namespace_tuple: tuple,
        limit: int,
        all_memories: list,
        paths_info: dict,
        step_timings: dict,
        llm_prompts: dict | None,
        search_start: float,
    ) -> list[IntelligentSearchResult]:
        """Multi-stage drill-down selection, mirroring the skill's ``[mode=drill]``.

        Pipeline: L1 histogram → LLM picks L1 prefixes → (optional LLM L2 pick
        when an L1 is too wide) → LLM picks exact keys → batched memory fetch.
        """
        import time

        all_paths = list(paths_info.keys())

        # Step 2a: L1 survey (pure compute — no LLM).
        step_l1 = time.time()
        l1_counts = _group_by_depth(all_paths, 1)
        step_timings["l1_survey"] = round(time.time() - step_l1, 3)

        # Step 2b: L1 pick (LLM call #1).
        step_l1_llm = time.time()
        picked_l1 = await self._pick_l1_prefixes(
            query, l1_counts, limit=4, llm_prompts=llm_prompts
        )
        step_timings["l1_pick_llm"] = round(time.time() - step_l1_llm, 3)
        if not picked_l1:
            # Defensive fallback: take top-N by count so the search still
            # produces something rather than dying silently.
            picked_l1 = [
                p for p, _ in sorted(l1_counts.items(), key=lambda x: -x[1])[:3]
            ]
            logger.info(
                f"Tiered: L1 pick empty/failed, falling back to top-N by count: {picked_l1}"
            )

        # Step 2c: Descend from L1 into concrete keys.
        step_descend = time.time()
        descended_paths: list[str] = []
        oversized_l1: dict[str, list[str]] = {}
        for l1 in picked_l1:
            scoped = _filter_keys(all_paths, f"{l1}.*")
            if len(scoped) > L2_ESCALATION_THRESHOLD:
                oversized_l1[l1] = scoped
            else:
                descended_paths.extend(scoped)
        step_timings["descend"] = round(time.time() - step_descend, 3)

        # Step 2d: Optional L2 pick (LLM call #1.5) for any wide L1.
        if oversized_l1:
            step_l2_llm = time.time()
            for l1, scoped in oversized_l1.items():
                l2_counts = _group_by_depth(scoped, 2)
                picked_l2 = await self._pick_l2_prefixes(
                    query,
                    l1,
                    l2_counts,
                    limit=3,
                    llm_prompts=llm_prompts,
                )
                if not picked_l2:
                    picked_l2 = [
                        p for p, _ in sorted(l2_counts.items(), key=lambda x: -x[1])[:2]
                    ]
                    logger.info(
                        f"Tiered: L2 pick empty/failed for '{l1}', "
                        f"falling back to top-N by count: {picked_l2}"
                    )
                for l2_prefix in picked_l2:
                    descended_paths.extend(_filter_keys(scoped, f"{l2_prefix}.*"))
            step_timings["l2_pick_llm"] = round(time.time() - step_l2_llm, 3)

        # Step 2e: Key pick (LLM call #2) — choose exact keys from descended set.
        step_key_llm = time.time()
        # Dedupe while preserving order; filter to known paths_info entries.
        seen: set[str] = set()
        descended_info: dict[str, dict] = {}
        for p in descended_paths:
            if p in seen or p not in paths_info:
                continue
            seen.add(p)
            descended_info[p] = paths_info[p]

        if not descended_info:
            step_timings["key_pick_llm"] = 0.0
            step_timings["memory_retrieval"] = 0.0
            step_timings["total_search"] = round(time.time() - search_start, 3)
            metadata = {
                "step_timings": step_timings,
                "is_timing_only": True,
                "mode": "tiered",
            }
            if llm_prompts:
                metadata["llm_prompts"] = llm_prompts
            return [
                IntelligentSearchResult(
                    path="",
                    content="",
                    metadata=metadata,
                    relevance_score=0.0,
                    namespace="",
                )
            ]

        selected_paths = await self._select_relevant_paths(
            query, descended_info, limit=limit, llm_prompts=llm_prompts
        )
        # _select_relevant_paths writes under "path_selection"; rename for the
        # tiered-mode key naming the plan specifies (l1_pick / l2_pick / key_pick).
        if llm_prompts is not None and "path_selection" in llm_prompts:
            llm_prompts["key_pick"] = llm_prompts.pop("path_selection")
        step_timings["key_pick_llm"] = round(time.time() - step_key_llm, 3)

        # Step 3: Memory retrieval (same shape as single-stage).
        step_retrieval = time.time()
        memory_dict = {path: data for _, path, data in all_memories}
        results: list[IntelligentSearchResult] = []
        for path in selected_paths[:limit]:
            if path in memory_dict:
                path_memories = self._extract_memories_from_data(
                    namespace_tuple, path, memory_dict[path]
                )
                results.extend(path_memories)
            if len(results) >= limit:
                break
        step_timings["memory_retrieval"] = round(time.time() - step_retrieval, 3)
        step_timings["total_search"] = round(time.time() - search_start, 3)

        for result in results:
            if hasattr(result, "metadata"):
                if not result.metadata:
                    result.metadata = {}
                result.metadata["step_timings"] = step_timings
                result.metadata["mode"] = "tiered"
                if llm_prompts:
                    result.metadata["llm_prompts"] = llm_prompts

        if not results:
            metadata = {
                "step_timings": step_timings,
                "is_timing_only": True,
                "mode": "tiered",
            }
            if llm_prompts:
                metadata["llm_prompts"] = llm_prompts
            return [
                IntelligentSearchResult(
                    path="",
                    content="",
                    metadata=metadata,
                    relevance_score=0.0,
                    namespace="",
                )
            ]

        return results[:limit]

    async def _pick_l1_prefixes(
        self,
        query: str,
        l1_counts: dict[str, int],
        limit: int = 4,
        llm_prompts: dict | None = None,
    ) -> list[str]:
        """LLM picks 2-4 top-level prefixes likely to hold the answer."""
        if not l1_counts:
            return []

        histogram_lines = [
            f"- {prefix} ({count})" for prefix, count in l1_counts.items()
        ]
        histogram_text = "\n".join(histogram_lines)

        prompt = f"""You are a memory search assistant. You will receive a user query and a histogram of top-level taxonomy prefixes (with memory counts). Pick the prefixes most likely to contain memories that answer the query.

Query: "{query}"

Top-level prefixes in the store:
{histogram_text}

Instructions:
- Select up to {limit} prefixes whose names plausibly cover the query.
- Return ONLY prefix names, one per line. No explanation, no prose.
- If none are relevant, return "NONE".

Selected prefixes (up to {limit}):"""

        if llm_prompts is not None:
            llm_prompts["l1_pick"] = prompt

        try:
            messages = [{"role": "user", "content": prompt}]
            if hasattr(self.llm, "ainvoke"):
                response = await self.llm.ainvoke(messages)
            else:
                response = self.llm.invoke(messages)
            response_text = response.content.strip()
            if response_text.upper() == "NONE":
                return []
            valid = set(l1_counts.keys())
            picked: list[str] = []
            for line in response_text.split("\n"):
                line = line.strip().lstrip("- ").strip()
                if line and line in valid and line not in picked:
                    picked.append(line)
            logger.info(f"Tiered: L1 picked {picked} for query '{query}'")
            return picked
        except Exception as e:
            logger.error(f"Tiered: L1 pick LLM failed: {e}")
            return []

    async def _pick_l2_prefixes(
        self,
        query: str,
        l1: str,
        l2_counts: dict[str, int],
        limit: int = 3,
        llm_prompts: dict | None = None,
    ) -> list[str]:
        """LLM narrows a wide L1 prefix down to 2-3 L2 prefixes."""
        if not l2_counts:
            return []

        histogram_lines = [
            f"- {prefix} ({count})" for prefix, count in l2_counts.items()
        ]
        histogram_text = "\n".join(histogram_lines)

        prompt = f"""You are a memory search assistant drilling into a large taxonomy branch.

Query: "{query}"

The branch '{l1}' has many keys. Here is its L2 histogram:
{histogram_text}

Instructions:
- Select up to {limit} L2 prefixes under '{l1}' most likely to contain memories that answer the query.
- Return ONLY L2 prefix names (as shown above, including the '{l1}.' part), one per line.
- If none are relevant, return "NONE".

Selected prefixes (up to {limit}):"""

        # Accumulate L2 prompts per-l1 so a single query with multiple wide L1s
        # still exposes each sub-prompt to callers.
        if llm_prompts is not None:
            existing = llm_prompts.get("l2_pick")
            combined_entry = f"[l1={l1}]\n{prompt}"
            if existing:
                llm_prompts["l2_pick"] = f"{existing}\n\n{combined_entry}"
            else:
                llm_prompts["l2_pick"] = combined_entry

        try:
            messages = [{"role": "user", "content": prompt}]
            if hasattr(self.llm, "ainvoke"):
                response = await self.llm.ainvoke(messages)
            else:
                response = self.llm.invoke(messages)
            response_text = response.content.strip()
            if response_text.upper() == "NONE":
                return []
            valid = set(l2_counts.keys())
            picked: list[str] = []
            for line in response_text.split("\n"):
                line = line.strip().lstrip("- ").strip()
                if line and line in valid and line not in picked:
                    picked.append(line)
            logger.info(f"Tiered: L2 picked {picked} under '{l1}'")
            return picked
        except Exception as e:
            logger.error(f"Tiered: L2 pick LLM failed for '{l1}': {e}")
            return []

    async def _select_relevant_paths(
        self,
        query: str,
        paths_info: dict,
        limit: int = 5,
        llm_prompts: dict | None = None,
    ) -> list[str]:
        """
        Use LLM to select the most relevant paths for the query.

        Uses a single LLM call with both path names and content samples.
        Supports prompt caching via static/dynamic section markers.

        Args:
            query: User's search query
            paths_info: Dictionary of path -> info (with content samples)
            limit: Maximum number of paths to select

        Returns:
            List of selected path strings
        """
        # Build paths list with content samples for better selection
        paths_list = []
        for path, info in paths_info.items():
            sample = info.get("sample", "")[:100]  # Limit sample length
            count = info.get("count", 1)
            if sample:
                paths_list.append(f"- {path} ({count} memories): {sample}...")
            else:
                paths_list.append(f"- {path} ({count} memories)")

        paths_text = "\n".join(paths_list)

        # Build prompt with static/dynamic sections for caching
        static_prompt = self._build_static_prompt()
        prompt = f"""{static_prompt}
Select up to {limit} paths that most directly answer the query.

Query: "{query}"

Available memory paths with content samples:
{paths_text}

Selected paths (up to {limit}):"""

        try:
            # Store the prompt if requested
            if llm_prompts is not None:
                llm_prompts["path_selection"] = prompt

            # Call the LLM (use ainvoke since we're in async context)
            messages = [{"role": "user", "content": prompt}]
            if hasattr(self.llm, "ainvoke"):
                response = await self.llm.ainvoke(messages)
            else:
                response = self.llm.invoke(messages)

            # Parse the response
            response_text = response.content.strip()

            if response_text.upper() == "NONE":
                return []

            # Extract path names from response
            selected_paths = []
            for line in response_text.split("\n"):
                line = line.strip()
                # Handle potential formatting like "- path.name" or "path.name"
                if line.startswith("- "):
                    line = line[2:]
                if line and line in paths_info:
                    selected_paths.append(line)

            logger.info(
                f"LLM selected {len(selected_paths)} paths for query '{query}': {selected_paths}"
            )
            return selected_paths

        except Exception as e:
            logger.error(f"Error in LLM path selection: {e}")
            # Fallback: return first few paths
            return list(paths_info.keys())[:3]

    def _extract_memories_from_data(
        self, namespace_tuple: tuple, path: str, data: Any
    ) -> list[IntelligentSearchResult]:
        """
        Extract memories from data for a specific path (optimized version).

        Args:
            namespace_tuple: Namespace as tuple
            path: Memory path
            data: Memory data

        Returns:
            List of search results from this data
        """
        results = []
        namespace_str = ":".join(namespace_tuple)

        if isinstance(data, dict) and "memories" in data:
            # Aggregated memory - expand all individual memories
            memories = data.get("memories", [])
            for memory_entry in memories:
                content = memory_entry.get("content", "")
                confidence = memory_entry.get("confidence", 1.0)
                metadata = memory_entry.get("metadata", {})
                metadata.update({"path": path, "source": "aggregated"})

                result = IntelligentSearchResult(
                    path=path,
                    content=str(content),
                    metadata=metadata,
                    relevance_score=confidence,
                    namespace=namespace_str,
                )
                results.append(result)
        else:
            # Single memory
            content = (
                data.get("content", str(data)) if isinstance(data, dict) else str(data)
            )
            confidence = data.get("confidence", 1.0) if isinstance(data, dict) else 1.0
            metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
            metadata.update({"path": path, "source": "single"})

            result = IntelligentSearchResult(
                path=path,
                content=str(content),
                metadata=metadata,
                relevance_score=confidence,
                namespace=namespace_str,
            )
            results.append(result)

        return results

    def _get_memories_from_path(
        self, namespace_tuple: tuple, path: str, all_memories: list
    ) -> list[IntelligentSearchResult]:
        """
        Extract memories from a specific path.

        Args:
            namespace_tuple: Namespace as tuple
            path: Memory path to retrieve from
            all_memories: All memory data from store

        Returns:
            List of search results from this path
        """
        results = []

        for _, stored_path, data in all_memories:
            if stored_path != path:
                continue

            if isinstance(data, dict) and "memories" in data:
                # Aggregated memory - expand all individual memories
                memories = data.get("memories", [])
                for memory_entry in memories:
                    content = memory_entry.get("content", "")
                    confidence = memory_entry.get("confidence", 1.0)
                    metadata = memory_entry.get("metadata", {})
                    metadata.update({"path": path, "source": "aggregated"})

                    # Convert namespace tuple to string
                    namespace_str = ":".join(namespace_tuple)

                    result = IntelligentSearchResult(
                        path=path,
                        content=str(content),
                        metadata=metadata,
                        relevance_score=confidence,
                        namespace=namespace_str,
                    )
                    results.append(result)
            else:
                # Single memory
                content = (
                    data.get("content", str(data))
                    if isinstance(data, dict)
                    else str(data)
                )
                confidence = (
                    data.get("confidence", 1.0) if isinstance(data, dict) else 1.0
                )
                metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
                metadata.update({"path": path, "source": "single"})

                # Convert namespace tuple to string
                namespace_str = ":".join(namespace_tuple)

                result = IntelligentSearchResult(
                    path=path,
                    content=str(content),
                    metadata=metadata,
                    relevance_score=confidence,
                    namespace=namespace_str,
                )
                results.append(result)

        return results
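
For orientation, here is a sketch of the two value shapes these extraction helpers accept. Field names follow the code above; the concrete values are invented.

# Illustrative value shapes (field names from the helpers above; the
# concrete values are made up).
aggregated = {
    "memories": [
        {"content": "likes ramen", "confidence": 0.9, "metadata": {}},
        {"content": "dislikes cilantro", "confidence": 0.8, "metadata": {}},
    ],
    "count": 2,
}
single = {"content": "works remotely", "confidence": 1.0, "metadata": {}}

# _extract_memories_from_data expands the aggregated form into one
# IntelligentSearchResult per entry (source="aggregated") and wraps any
# other value, dict or not, in exactly one result (source="single").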

__init__

__init__(llm: Any, store: Any, taxonomy_loader: TaxonomyLoader | None = None)

Initialize the intelligent search engine.

Parameters:

    llm (Any): Language model for path selection. Required.
    store (Any): Memory store (ProllyTreeStore). Required.
    taxonomy_loader (TaxonomyLoader | None): Optional TaxonomyLoader for loading
        taxonomy from store. When provided, taxonomy data is loaded from the
        store's taxonomy namespace. When None, falls back to hardcoded
        TaxonomyPresets. Default: None.
Source code in src/memoir/search/intelligent.py
def __init__(
    self,
    llm: Any,
    store: Any,
    taxonomy_loader: TaxonomyLoader | None = None,
):
    """
    Initialize the intelligent search engine.

    Args:
        llm: Language model for path selection
        store: Memory store (ProllyTreeStore)
        taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                         When provided, taxonomy data is loaded from the store's taxonomy namespace.
                         When None, falls back to hardcoded TaxonomyPresets.
    """
    self.llm = llm
    self.store = store
    self._taxonomy_loader = taxonomy_loader
    self._static_prompt_cache: str | None = None

search async

search(query: str, namespace: str, limit: int = 10, return_prompts: bool = False, person_filter: str | None = None, mode: str = 'single') -> list[IntelligentSearchResult]

Search for relevant memories using LLM path selection.

Parameters:

    query (str): Natural language search query. Required.
    namespace (str): User namespace to search in. Required.
    limit (int): Maximum number of results. Default: 10.
    return_prompts (bool): Whether to capture and return LLM prompts. Default: False.
    person_filter (str | None): Optional person name to filter paths (e.g., "john").
        Default: None.
    mode (str): "single" (default, one LLM call) or "tiered" (multi-stage
        drill-down: L1 pick → optional L2 pick → key pick). Unknown values
        raise ValueError.

Returns:

    list[IntelligentSearchResult]: List of IntelligentSearchResult objects.

Source code in src/memoir/search/intelligent.py
async def search(
    self,
    query: str,
    namespace: str,
    limit: int = 10,
    return_prompts: bool = False,
    person_filter: str | None = None,
    mode: str = "single",
) -> list[IntelligentSearchResult]:
    """
    Search for relevant memories using LLM path selection.

    Args:
        query: Natural language search query
        namespace: User namespace to search in
        limit: Maximum number of results
        return_prompts: Whether to capture and return LLM prompts
        person_filter: Optional person name to filter paths (e.g., "john")
        mode: "single" (default, one LLM call) or "tiered" (multi-stage
            drill-down: L1 pick → optional L2 pick → key pick). Unknown
            values raise ValueError.

    Returns:
        List of IntelligentSearchResult objects
    """
    if mode not in VALID_MODES:
        raise ValueError(
            f"Unknown search mode {mode!r}; expected one of {VALID_MODES}"
        )

    try:
        import time

        step_timings = {}
        llm_prompts = {} if return_prompts else None
        search_start = time.time()
        # Step 1: Path Discovery - Get all available paths from the store
        step1_start = time.time()
        if isinstance(namespace, str):
            namespace_tuple = tuple(namespace.split(":"))
        else:
            namespace_tuple = namespace

        # Step 1a: Get all memories from the store
        all_memories = []
        try:
            all_memories = self.store.search(namespace_tuple, limit=10000)
            logger.info(
                f"Found {len(all_memories)} memories in namespace {namespace_tuple}"
            )

            # Apply person filtering if specified
            if person_filter:
                person_prefix = f"{person_filter.lower()}."
                filtered_memories = []
                for memory_item in all_memories:
                    _, path, data = memory_item
                    if path.lower().startswith(person_prefix):
                        filtered_memories.append(memory_item)

                logger.info(
                    f"Person filtering '{person_filter}': {len(all_memories)} -> {len(filtered_memories)} memories"
                )
                all_memories = filtered_memories

        except Exception as e:
            logger.error(f"Failed to search memories: {e}")
            return []

        # Early exit when nothing was found (covers both the empty-namespace
        # case and the case where person filtering removed everything).
        if not all_memories:
            if person_filter:
                logger.info(
                    f"No memories found for person '{person_filter}' in namespace {namespace}"
                )
            else:
                logger.info(f"No memories found in namespace {namespace}")
            # Return timing-only result for early exit
            step_timings["step1_path_discovery"] = round(
                time.time() - step1_start, 3
            )
            step_timings["step2_path_selection"] = 0.0
            step_timings["step3_memory_retrieval"] = 0.0
            step_timings["total_search"] = round(time.time() - search_start, 3)

            metadata = {"step_timings": step_timings, "is_timing_only": True}
            if person_filter:
                metadata["person_filter"] = person_filter
            dummy_result = IntelligentSearchResult(
                path="",
                content="",
                metadata=metadata,
                relevance_score=0.0,
                namespace="",
            )
            return [dummy_result]

        # Step 1b: Create path info from loaded memories (like the original logic)
        paths_info = {}
        for _, path, data in all_memories:
            if path not in paths_info and data is not None:
                # Get a preview of what's stored at this path
                if isinstance(data, dict) and "memories" in data:
                    # Aggregated memory
                    memory_count = data.get("count", len(data.get("memories", [])))
                    sample_content = ""
                    memories = data.get("memories", [])
                    if memories:
                        content = memories[0].get("content", "")
                        sample_content = str(content)[:100] if content else ""
                    paths_info[path] = {
                        "type": "aggregated",
                        "count": memory_count,
                        "sample": sample_content,
                    }
                elif isinstance(data, dict):
                    # Single memory
                    content = data.get("content", str(data))
                    paths_info[path] = {
                        "type": "single",
                        "count": 1,
                        "sample": str(content)[:100],
                    }
                else:
                    # Non-dict data
                    paths_info[path] = {
                        "type": "single",
                        "count": 1,
                        "sample": str(data)[:100] if data else "",
                    }

        if not paths_info:
            logger.info("No valid paths found")
            # Return timing-only result for early exit
            step_timings["step1_path_discovery"] = round(
                time.time() - step1_start, 3
            )
            step_timings["step2_path_selection"] = 0.0
            step_timings["step3_memory_retrieval"] = 0.0
            step_timings["total_search"] = round(time.time() - search_start, 3)

            dummy_result = IntelligentSearchResult(
                path="",
                content="",
                metadata={"step_timings": step_timings, "is_timing_only": True},
                relevance_score=0.0,
                namespace="",
            )
            return [dummy_result]

        step_timings["step1_path_discovery"] = round(time.time() - step1_start, 3)

        # Fork to tiered pipeline once common pre-work (namespace parsing,
        # store read, paths_info build) is done. The tiered path runs its
        # own multi-stage selection and memory retrieval, then returns.
        if mode == "tiered":
            return await self._search_tiered(
                query=query,
                namespace_tuple=namespace_tuple,
                limit=limit,
                all_memories=all_memories,
                paths_info=paths_info,
                step_timings=step_timings,
                llm_prompts=llm_prompts,
                search_start=search_start,
            )

        # Step 2: Semantic Path Selection - Ask LLM to select relevant paths
        step2_start = time.time()
        selected_paths = await self._select_relevant_paths(
            query, paths_info, limit=limit, llm_prompts=llm_prompts
        )

        if not selected_paths:
            logger.info(f"LLM didn't select any relevant paths for query: {query}")
            # Return timing-only result for early exit
            step_timings["step2_path_selection"] = round(
                time.time() - step2_start, 3
            )
            step_timings["step3_memory_retrieval"] = 0.0
            step_timings["total_search"] = round(time.time() - search_start, 3)

            metadata = {"step_timings": step_timings, "is_timing_only": True}
            if llm_prompts:
                metadata["llm_prompts"] = llm_prompts
            dummy_result = IntelligentSearchResult(
                path="",
                content="",
                metadata=metadata,
                relevance_score=0.0,
                namespace="",
            )
            return [dummy_result]

        step_timings["step2_path_selection"] = round(time.time() - step2_start, 3)

        # Step 3: Memory Retrieval - Extract results from already-loaded memories
        step3_start = time.time()
        results = []

        # Create a lookup dict for faster access (O(1) instead of O(n))
        memory_dict = {path: data for _, path, data in all_memories}

        for path in selected_paths[:limit]:  # Limit paths processed
            if path in memory_dict:
                data = memory_dict[path]
                path_memories = self._extract_memories_from_data(
                    namespace_tuple, path, data
                )
                results.extend(path_memories)

            if len(results) >= limit:
                break

        step_timings["step3_memory_retrieval"] = round(time.time() - step3_start, 3)
        step_timings["total_search"] = round(time.time() - search_start, 3)

        # Store timing info and prompts in the results for access by the API
        for result in results:
            if hasattr(result, "metadata"):
                if not result.metadata:
                    result.metadata = {}
                result.metadata["step_timings"] = step_timings
                result.metadata["mode"] = "single"
                if llm_prompts:
                    result.metadata["llm_prompts"] = llm_prompts

        # If no results but we have timing data, create a dummy result to carry timing info
        if not results and step_timings:
            metadata = {"step_timings": step_timings, "is_timing_only": True}
            if llm_prompts:
                metadata["llm_prompts"] = llm_prompts
            dummy_result = IntelligentSearchResult(
                path="",
                content="",
                metadata=metadata,
                relevance_score=0.0,
                namespace="",
            )
            return [dummy_result]

        return results[:limit]

    except Exception as e:
        logger.error(f"Error in intelligent search: {e}")
        # Return timing-only result even for exceptions
        if "step_timings" in locals():
            step_timings["total_search"] = round(time.time() - search_start, 3)
            dummy_result = IntelligentSearchResult(
                path="",
                content="",
                metadata={"step_timings": step_timings, "is_timing_only": True},
                relevance_score=0.0,
                namespace="",
            )
            return [dummy_result]
        return []
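
A usage sketch for search(). It assumes `engine` is an already-constructed instance of the search engine with a chat model and store attached; the query and namespace are illustrative.

async def find_food_preferences(engine):
    results = await engine.search(
        query="what does john like to eat?",
        namespace="user:alice",
        limit=5,
        person_filter="john",  # keep only paths starting with "john."
        mode="single",         # one LLM call; "tiered" does staged drill-down
    )
    for r in results:
        if r.metadata.get("is_timing_only"):
            continue  # sentinel carrying only step timings, no content
        print(r.path, r.relevance_score, r.content)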

IntelligentSearchResult dataclass

Simple search result containing memory content and metadata.

Source code in src/memoir/search/intelligent.py
@dataclass
class IntelligentSearchResult:
    """Simple search result containing memory content and metadata."""

    path: str
    content: str
    metadata: dict
    relevance_score: float = 1.0
    namespace: str = ""

MemoryItem

Bases: BaseModel

Represents a memory item in the store.

Source code in src/memoir/store/prolly_adapter.py
class MemoryItem(BaseModel):
    """Represents a memory item in the store."""

    key: str = Field(description="Semantic taxonomy key")
    namespace: str = Field(description="User/agent namespace")
    content: Any = Field(description="Memory content")
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
    timestamp: float = Field(
        default_factory=time.time, description="Creation timestamp"
    )
    version: str | None = Field(default=None, description="Version/commit ID")
    confidence: float = Field(default=1.0, description="Classification confidence")

ProllyTreeStore

Bases: BaseStore

High-performance semantic memory store using ProllyTree. Implements LangGraph's BaseStore interface following the reference pattern.

Source code in src/memoir/store/prolly_adapter.py
class ProllyTreeStore(BaseStore):
    """
    High-performance semantic memory store using ProllyTree.
    Implements LangGraph's BaseStore interface following the reference pattern.
    """

    def __init__(
        self,
        path: str,
        enable_versioning: bool = True,
        auto_commit: bool = True,
        cache_size: int = 10000,
    ):
        """
        Initialize ProllyTree store.

        Storage layer is responsible only for storing and retrieving data.
        Classification is handled by higher layers (memory manager).

        ProllyTreeStore is strict: it opens an existing memoir store and
        refuses paths that aren't one yet. Use ``StoreService.create_store``
        (or ``memoir new``) to bootstrap a fresh store. Single init path =
        no surprise side-effects from running `memoir remember` in a random
        cwd.

        Args:
            path: Path to an existing memoir store (must contain a ``.git``
                directory when ``enable_versioning`` is True).
            enable_versioning: Whether to enable git-like versioning
            auto_commit: Whether to automatically commit on each put/delete operation
            cache_size: Size of internal caches
        """
        super().__init__()

        self.path = Path(path)
        self.path.mkdir(parents=True, exist_ok=True)

        if enable_versioning and not (self.path / ".git").exists():
            raise FileNotFoundError(
                f"Not a memoir store: {self.path} (no .git directory). "
                f"Create one with `memoir new <path>` first, or pass "
                f"-s/--store / set MEMOIR_STORE / cd into an existing store."
            )

        # Initialize ProllyTree
        if enable_versioning:
            # Create data subdirectory for VersionedKvStore
            data_dir = self.path / "data"
            data_dir.mkdir(exist_ok=True)
            # VersionedKvStore (prollytree Rust binding) uses cwd to locate the
            # enclosing git repository even when handed an absolute path —
            # which means callers in non-git cwds (e.g. /tmp, ~/.memoir) get
            # "Not in a git repository" errors. Construction needs a chdir;
            # so do per-operation calls (`.insert`/`.update`/`.commit`/`.get`).
            # We chdir here for the constructor, then wrap the tree in
            # _CwdLockedTree so every later method call also chdir's first.
            import os as _os

            _saved_cwd = _os.getcwd()
            try:
                _os.chdir(str(self.path))
                _raw_tree = VersionedKvStore(str(data_dir))
            finally:
                _os.chdir(_saved_cwd)
            self.tree = _CwdLockedTree(_raw_tree, self.path)
        else:
            # Memory mode doesn't touch git, so no cwd wrapper needed.
            self.tree = ProllyTree("memory")

        self.enable_versioning = enable_versioning
        self.auto_commit = auto_commit
        # Storage layer doesn't need taxonomy, classifier, or search engine
        # These are handled by higher layers

        # Performance tracking
        self._stats = {"reads": 0, "writes": 0, "searches": 0, "classifications": 0}

        # Key registry for memory mode (since ProllyTree doesn't have list_keys in memory mode)
        self._keys = set()

        # Populate key registry from existing data
        self._populate_key_registry()

        # Track aggregated memories to avoid redundant updates
        self._aggregation_cache = {}

    def _populate_key_registry(self):
        """Populate the key registry from existing data in the store."""
        try:
            if hasattr(self.tree, "scan"):
                # Use scan if available to iterate through all keys
                for key_bytes, _ in self.tree.scan():
                    key_str = key_bytes.decode("utf-8")
                    self._keys.add(key_str)
            elif hasattr(self.tree, "list_keys"):
                # Use list_keys if available
                for key_bytes in self.tree.list_keys():
                    key_str = key_bytes.decode("utf-8")
                    self._keys.add(key_str)
            else:
                # No way to enumerate keys, registry will be empty initially
                # Keys will be added as they are accessed via put()
                pass

            logger.info(f"Populated key registry with {len(self._keys)} existing keys")
        except Exception as e:
            logger.warning(f"Could not populate key registry: {e}")
            # Continue without existing keys - they'll be added as accessed

    def _encode_value(self, value: Any) -> bytes:
        """Encode any value to bytes for storage."""
        if isinstance(value, bytes):
            return value
        elif isinstance(value, str):
            return value.encode("utf-8")
        else:
            # Use JSON for complex objects
            json_str = json.dumps(value, default=str)
            return json_str.encode("utf-8")

    def _decode_value(self, data: bytes) -> Any:
        """Decode bytes from storage back to original type."""
        if not data:
            return None
        try:
            # Try to decode as JSON first
            json_str = data.decode("utf-8")
            return json.loads(json_str)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Return as string if not JSON
            try:
                return data.decode("utf-8")
            except UnicodeDecodeError:
                return data

    # BaseStore interface methods
    def batch(self, ops: list[tuple]) -> list[Any]:
        """Batch operations - required by BaseStore."""
        results = []
        for op in ops:
            if len(op) == 2:
                method, args = op
                result = getattr(self, method)(*args)
                results.append(result)
        return results

    def abatch(self, ops: list[tuple]) -> list[Any]:
        """Async batch operations - synchronous implementation."""
        return self.batch(ops)

    def search(
        self, namespace: tuple, *, filter: dict | None = None, limit: int = 10
    ) -> list[tuple]:
        """Search for items in a namespace."""
        self._stats["searches"] += 1
        prefix = ":".join(namespace) + ":"
        results = []

        try:
            # Use our key registry to find matching keys
            count = 0
            for full_key in self._keys:
                if count >= limit:
                    break

                if full_key.startswith(prefix):
                    key_bytes = full_key.encode("utf-8")
                    if self.enable_versioning:
                        value = self.tree.get(key_bytes)
                    else:
                        value = self.tree.find(key_bytes)
                    decoded_value = self._decode_value(value)

                    # Apply filter if provided
                    if filter and not all(
                        decoded_value.get(k) == v
                        for k, v in filter.items()
                        if isinstance(decoded_value, dict)
                    ):
                        continue

                    # Extract item key from full key
                    item_key = full_key[len(prefix) :]
                    results.append((namespace, item_key, decoded_value))
                    count += 1
        except Exception as e:
            logger.error(f"Error searching namespace {namespace}: {e}")

        return results

    def put(self, namespace: tuple, key: str, value: dict) -> None:
        """Store a value in a namespace."""
        self._stats["writes"] += 1
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode("utf-8")
        value_bytes = self._encode_value(value)

        try:
            if self.enable_versioning:
                # VersionedKvStore API - check if key exists using get
                existing = self.tree.get(key_bytes)
                if existing:
                    self.tree.update(key_bytes, value_bytes)
                else:
                    self.tree.insert(key_bytes, value_bytes)
                # Commit the change if auto_commit is enabled
                if self.auto_commit:
                    self.tree.commit(f"Store {key} in {':'.join(namespace)}")
            else:
                # ProllyTree API - check if key exists using find
                existing = self.tree.find(key_bytes)
                if existing:
                    self.tree.update(key_bytes, value_bytes)
                else:
                    self.tree.insert(key_bytes, value_bytes)

            # Track the key in our registry
            self._keys.add(full_key)

        except Exception as e:
            logger.error(f"Error storing {full_key}: {e}")
            raise

    def get(self, namespace: tuple, key: str) -> dict | None:
        """Retrieve a value from a namespace."""
        self._stats["reads"] += 1
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode("utf-8")

        try:
            if self.enable_versioning:
                # VersionedKvStore API
                data = self.tree.get(key_bytes)
            else:
                # ProllyTree API
                data = self.tree.find(key_bytes)
            return self._decode_value(data) if data else None
        except Exception as e:
            logger.error(f"Error getting key {full_key}: {e}")
            return None

    def delete(self, namespace: tuple, key: str) -> None:
        """Delete a key from a namespace."""
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode("utf-8")

        try:
            self.tree.delete(key_bytes)
            # Remove from key registry
            self._keys.discard(full_key)
            if self.enable_versioning and self.auto_commit:
                self.tree.commit(f"Delete {key} from {':'.join(namespace)}")
        except Exception as e:
            logger.error(f"Error deleting {full_key}: {e}")

    def commit(self, message: str = "Manual commit") -> str | None:
        """
        Manually commit pending changes to the versioned store.

        This is useful when auto_commit is disabled and you want to batch
        multiple operations before committing.

        Args:
            message: Commit message

        Returns:
            Commit hash if versioning is enabled, None otherwise
        """
        if not self.enable_versioning:
            logger.warning("Commit requested but versioning is not enabled")
            return None

        try:
            commit_hash = self.tree.commit(message)
            logger.debug(f"Manual commit successful: {message}")
            return commit_hash
        except Exception as e:
            logger.error(f"Error committing changes: {e}")
            raise

    def get_key_history(
        self, namespace: tuple, key: str, limit: int = 10
    ) -> list[dict]:
        """
        Get commit history for a specific key.

        Args:
            namespace: Namespace tuple
            key: Key to get history for
            limit: Maximum number of commits to return

        Returns:
            List of commit dictionaries with id, timestamp, message, author, committer
        """
        if not self.enable_versioning:
            return []

        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode("utf-8")

        try:
            commits = self.tree.get_commits_for_key(key_bytes)
            # Limit results and return most recent first
            return commits[:limit]
        except Exception as e:
            logger.error(f"Error getting history for {full_key}: {e}")
            return []

    def get_key_at_commit(
        self, namespace: tuple, key: str, commit_id: str
    ) -> dict | None:
        """
        Get the value of a key at a specific commit.

        Note: Current implementation returns None since VersionedKvStore doesn't support
        direct commit checkout. This is a placeholder for future enhancement.

        Args:
            namespace: Namespace tuple
            key: Key to retrieve
            commit_id: Commit ID to retrieve from

        Returns:
            None (historical content retrieval not yet implemented)
        """
        if not self.enable_versioning:
            return None

        # TODO: Implement historical content retrieval when VersionedKvStore supports it
        # Current limitation: VersionedKvStore only supports branch checkout, not commit checkout
        logger.debug(
            f"Historical content retrieval not yet implemented for commit {commit_id[:8]}"
        )
        return None

    def create_time_snapshot(self, snapshot_name: str) -> bool:
        """
        Create a branch snapshot at the current point in time.

        When auto_commit=False, this will first commit any pending changes
        before creating the snapshot to ensure all recent changes are included.

        Args:
            snapshot_name: Name for the snapshot branch

        Returns:
            True if snapshot created successfully
        """
        if not self.enable_versioning:
            return False

        try:
            # If auto_commit is disabled, commit pending changes before snapshot
            if not self.auto_commit:
                commit_hash = self.commit(
                    f"Auto-commit before snapshot: {snapshot_name}"
                )
                if commit_hash:
                    logger.debug(
                        f"Auto-committed pending changes before snapshot: {commit_hash[:8]}"
                    )

            self.tree.create_branch(snapshot_name)
            logger.debug(f"Created time snapshot: {snapshot_name}")
            return True
        except Exception as e:
            logger.error(f"Failed to create snapshot {snapshot_name}: {e}")
            return False

    def get_state_at_snapshot(
        self, namespace: tuple, snapshot_name: str
    ) -> dict[str, Any]:
        """
        Get all keys in a namespace at a specific snapshot.

        Args:
            namespace: Namespace tuple
            snapshot_name: Name of the snapshot branch

        Returns:
            Dictionary of key -> value at that snapshot
        """
        if not self.enable_versioning:
            return {}

        try:
            # Save current branch
            current_branch = self.tree.current_branch()

            # Switch to snapshot
            self.tree.checkout(snapshot_name)

            # Get all keys in namespace
            state = {}
            namespace_prefix = ":".join(namespace) + ":"

            keys = self.tree.list_keys()
            for key in keys:
                key_str = key.decode("utf-8") if isinstance(key, bytes) else key
                if key_str.startswith(namespace_prefix):
                    # Get value
                    value = self.tree.get(
                        key if isinstance(key, bytes) else key.encode("utf-8")
                    )
                    if value:
                        # Extract the key without namespace prefix
                        short_key = key_str[len(namespace_prefix) :]
                        state[short_key] = self._decode_value(value)

            # Return to original branch
            self.tree.checkout(current_branch)

            return state

        except Exception as e:
            logger.error(f"Failed to get state at snapshot {snapshot_name}: {e}")
            # Try to return to original branch
            with contextlib.suppress(Exception):
                self.tree.checkout(current_branch)
            return {}

    # Enhanced methods for semantic memory functionality
    async def store_memory_async(
        self, namespace: str, content: Any, key: str
    ) -> MemoryItem:
        """
        Store a memory at the given semantic key.

        Note: Classification must be done by the caller (memory manager).
        Storage layer is responsible only for storing, not classifying.

        Args:
            namespace: User/agent namespace
            content: Memory content to store
            key: Semantic key where to store (REQUIRED - no classification here)

        Returns:
            MemoryItem with storage results
        """
        # Storage layer: just use the provided semantic key (no classification)
        semantic_key = key
        confidence = 1.0  # Confidence is determined by the caller (memory manager)

        # Use semantic key for aggregation
        storage_key = semantic_key

        # Create memory entry (not the full item)
        memory_entry = {
            "content": content,
            "confidence": confidence,
            "timestamp": time.time(),
            "metadata": {},
        }

        # Convert namespace to tuple format
        if ":" in namespace:
            namespace_parts = namespace.split(":")
            namespace_tuple = tuple(namespace_parts)
        else:
            namespace_tuple = (namespace,)

        # Get existing aggregated memory or create new one
        existing = self.get(namespace_tuple, storage_key)

        if existing and isinstance(existing, dict) and "memories" in existing:
            # Append to existing aggregated memory
            aggregated = AggregatedMemory(**existing)
            aggregated.memories.append(memory_entry)
            aggregated.count += 1
            aggregated.last_timestamp = memory_entry["timestamp"]
            aggregated.last_updated = time.time()
        else:
            # Create new aggregated memory
            aggregated = AggregatedMemory(
                path=semantic_key,
                memories=[memory_entry],
                count=1,
                first_timestamp=memory_entry["timestamp"],
                last_timestamp=memory_entry["timestamp"],
            )

        # Store the aggregated memory
        self.put(namespace_tuple, storage_key, aggregated.model_dump())

        # Create MemoryItem for return value (for compatibility)
        item = MemoryItem(
            key=semantic_key,
            namespace=namespace,
            content=content,
            confidence=confidence,
            timestamp=memory_entry["timestamp"],
        )

        if self.enable_versioning and hasattr(self.tree, "get_head"):
            item.version = self.tree.get_head()

        return item

    # Sync store_memory method removed - use store_memory_async for all operations
    # This eliminates the async/sync mismatch and fallback issues

    async def asearch(self, namespace: str, path_prefix: str) -> list[tuple[str, Any]]:
        """
        Async search for items with a given path prefix.
        Used by HierarchicalSearchEngine.

        Args:
            namespace: User namespace
            path_prefix: Path prefix to search for

        Returns:
            List of (semantic_key, data) tuples
        """
        # Use synchronous search with prefix
        results = []
        # Convert string namespace to tuple format
        # "memory:general" -> ("memory", "general")
        namespace_parts = namespace.split(":")
        namespace_tuple = tuple(namespace_parts)

        search_results = self.search(namespace_tuple, limit=100)

        for _, storage_key, data in search_results:
            semantic_key = storage_key

            # Check if semantic path matches prefix
            if semantic_key.startswith(path_prefix):
                # For aggregated memories, we return them as-is
                # The search engine will handle expanding them
                if isinstance(data, dict) and "memories" in data:
                    # This is an aggregated memory - return it
                    results.append((semantic_key, data))
                else:
                    # Legacy single memory format
                    results.append((semantic_key, data))

        return results

    async def retrieve_memories_async(
        self, namespace: str, query: str, limit: int = 10
    ) -> list[MemoryItem]:
        """
        Retrieve memories using semantic search (async version).

        Args:
            namespace: User/agent namespace
            query: Search query
            limit: Maximum number of results

        Returns:
            List of matching memory items
        """
        # Use the hierarchical search engine to find relevant memories
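        # NOTE: __init__ does not set self.search_engine; callers are expected
        # to attach a search engine (e.g. the HierarchicalSearchEngine this
        # method's docstring mentions) before calling this.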
        search_results = await self.search_engine.search(query, namespace)

        # Convert search results to memory items with deduplication
        memories = []
        seen_content = set()

        for result in search_results:
            # The search result contains combined content from multiple items
            if result.combined_content:
                try:
                    # Split combined content back into individual memories
                    individual_contents = result.combined_content.split(" | ")
                    for content_text in individual_contents:
                        if content_text.strip():
                            # Create a memory item from the content
                            memory = MemoryItem(
                                key=result.path,
                                namespace=result.namespace,
                                content=content_text.strip(),
                                confidence=1.0,  # Default confidence
                                timestamp=time.time(),
                            )
                            # Deduplicate by content
                            content_hash = hash(memory.content)
                            if content_hash not in seen_content:
                                seen_content.add(content_hash)
                                memories.append(memory)
                                # Stop when we have enough unique results
                                if len(memories) >= limit:
                                    break
                    if len(memories) >= limit:
                        break
                except Exception as e:
                    logger.warning(f"Failed to parse memory item: {e}")

        return memories

    def retrieve_memories(
        self, namespace: str, query: str, limit: int = 10
    ) -> list[MemoryItem]:
        """
        Retrieve memories using semantic search (sync fallback).

        Note: This is a simple fallback. For proper semantic search,
        use retrieve_memories_async() which leverages the HierarchicalSearchEngine.

        Args:
            namespace: User/agent namespace
            query: Search query
            limit: Maximum number of results

        Returns:
            List of matching memory items
        """
        logger.warning(
            "Using fallback sync search. For better results, use retrieve_memories_async()"
        )

        # Simple fallback - just return all memories
        all_memories = []
        search_results = self.search((namespace,), limit=limit)

        for _, _key, data in search_results:
            if isinstance(data, dict):
                try:
                    memory = MemoryItem(**data)
                    all_memories.append(memory)
                except Exception as e:
                    logger.warning(f"Failed to parse memory item: {e}")

        return all_memories

    def get_statistics(self) -> dict[str, Any]:
        """Get store statistics."""
        stats = {
            "performance": self._stats.copy(),
            "total_keys": len(self._keys),
            "total_namespaces": len({key.split(":")[0] for key in self._keys}),
        }

        if self.enable_versioning and hasattr(self.tree, "get_head"):
            try:
                stats["versioning"] = {
                    "current_commit": self.tree.get_head(),
                }
                if hasattr(self.tree, "log"):
                    commits = self.tree.log()
                    stats["versioning"]["total_commits"] = len(commits)
            except Exception:
                pass

        return stats

    def export_namespace(self, namespace: str, output_path: str) -> None:
        """
        Export all memories from a namespace to JSON.

        Args:
            namespace: Namespace to export
            output_path: Path to save JSON file
        """
        memories = {}
        search_results = self.search((namespace,), limit=1000)

        for _, key, data in search_results:
            memories[key] = data

        with open(output_path, "w") as f:
            json.dump(
                {
                    "namespace": namespace,
                    "timestamp": time.time(),
                    "memories": memories,
                },
                f,
                indent=2,
            )

        logger.info(f"Exported {len(memories)} memories to {output_path}")

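A minimal end-to-end sketch of the BaseStore surface. It assumes ./mystore was already created (e.g. with memoir new ./mystore), since the constructor refuses non-store paths when versioning is enabled; keys and values are illustrative.

store = ProllyTreeStore("./mystore")

ns = ("user", "alice")
store.put(ns, "john.preferences.food", {"content": "loves ramen"})
# Internally the key is flattened to "user:alice:john.preferences.food".

value = store.get(ns, "john.preferences.food")   # {'content': 'loves ramen'}
items = store.search(ns, limit=10)               # [(ns, key, value), ...]
store.delete(ns, "john.preferences.food")
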
__init__

__init__(path: str, enable_versioning: bool = True, auto_commit: bool = True, cache_size: int = 10000)

Initialize ProllyTree store.

Storage layer is responsible only for storing and retrieving data. Classification is handled by higher layers (memory manager).

ProllyTreeStore is strict: it opens an existing memoir store and refuses paths that aren't one yet. Use StoreService.create_store (or memoir new) to bootstrap a fresh store. Single init path = no surprise side-effects from running memoir remember in a random cwd.

Parameters:

    path (str): Path to an existing memoir store (must contain a .git directory
        when enable_versioning is True). Required.
    enable_versioning (bool): Whether to enable git-like versioning. Default: True.
    auto_commit (bool): Whether to automatically commit on each put/delete
        operation. Default: True.
    cache_size (int): Size of internal caches. Default: 10000.
Source code in src/memoir/store/prolly_adapter.py
def __init__(
    self,
    path: str,
    enable_versioning: bool = True,
    auto_commit: bool = True,
    cache_size: int = 10000,
):
    """
    Initialize ProllyTree store.

    Storage layer is responsible only for storing and retrieving data.
    Classification is handled by higher layers (memory manager).

    ProllyTreeStore is strict: it opens an existing memoir store and
    refuses paths that aren't one yet. Use ``StoreService.create_store``
    (or ``memoir new``) to bootstrap a fresh store. Single init path =
    no surprise side-effects from running `memoir remember` in a random
    cwd.

    Args:
        path: Path to an existing memoir store (must contain a ``.git``
            directory when ``enable_versioning`` is True).
        enable_versioning: Whether to enable git-like versioning
        auto_commit: Whether to automatically commit on each put/delete operation
        cache_size: Size of internal caches
    """
    super().__init__()

    self.path = Path(path)
    self.path.mkdir(parents=True, exist_ok=True)

    if enable_versioning and not (self.path / ".git").exists():
        raise FileNotFoundError(
            f"Not a memoir store: {self.path} (no .git directory). "
            f"Create one with `memoir new <path>` first, or pass "
            f"-s/--store / set MEMOIR_STORE / cd into an existing store."
        )

    # Initialize ProllyTree
    if enable_versioning:
        # Create data subdirectory for VersionedKvStore
        data_dir = self.path / "data"
        data_dir.mkdir(exist_ok=True)
        # VersionedKvStore (prollytree Rust binding) uses cwd to locate the
        # enclosing git repository even when handed an absolute path —
        # which means callers in non-git cwds (e.g. /tmp, ~/.memoir) get
        # "Not in a git repository" errors. Construction needs a chdir;
        # so do per-operation calls (`.insert`/`.update`/`.commit`/`.get`).
        # We chdir here for the constructor, then wrap the tree in
        # _CwdLockedTree so every later method call also chdir's first.
        import os as _os

        _saved_cwd = _os.getcwd()
        try:
            _os.chdir(str(self.path))
            _raw_tree = VersionedKvStore(str(data_dir))
        finally:
            _os.chdir(_saved_cwd)
        self.tree = _CwdLockedTree(_raw_tree, self.path)
    else:
        # Memory mode doesn't touch git, so no cwd wrapper needed.
        self.tree = ProllyTree("memory")

    self.enable_versioning = enable_versioning
    self.auto_commit = auto_commit
    # Storage layer doesn't need taxonomy, classifier, or search engine
    # These are handled by higher layers

    # Performance tracking
    self._stats = {"reads": 0, "writes": 0, "searches": 0, "classifications": 0}

    # Key registry for memory mode (since ProllyTree doesn't have list_keys in memory mode)
    self._keys = set()

    # Populate key registry from existing data
    self._populate_key_registry()

    # Track aggregated memories to avoid redundant updates
    self._aggregation_cache = {}

batch

batch(ops: list[tuple]) -> list[Any]

Batch operations - required by BaseStore.

Source code in src/memoir/store/prolly_adapter.py
def batch(self, ops: list[tuple]) -> list[Any]:
    """Batch operations - required by BaseStore."""
    results = []
    for op in ops:
        if len(op) == 2:
            method, args = op
            result = getattr(self, method)(*args)
            results.append(result)
    return results

abatch

abatch(ops: list[tuple]) -> list[Any]

Async batch operations - synchronous implementation.

Source code in src/memoir/store/prolly_adapter.py
def abatch(self, ops: list[tuple]) -> list[Any]:
    """Async batch operations - synchronous implementation."""
    return self.batch(ops)
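
batch() dispatches each (method_name, args) tuple by attribute lookup, so any public store method can be queued. A sketch (keys and values illustrative):

values = store.batch([
    ("put", (("user", "alice"), "k1", {"content": "x"})),
    ("get", (("user", "alice"), "k1")),
])
# values == [None, {'content': 'x'}]  # put returns None, get returns the value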

search

search(namespace: tuple, *, filter: dict | None = None, limit: int = 10) -> list[tuple]

Search for items in a namespace.

Source code in src/memoir/store/prolly_adapter.py
def search(
    self, namespace: tuple, *, filter: dict | None = None, limit: int = 10
) -> list[tuple]:
    """Search for items in a namespace."""
    self._stats["searches"] += 1
    prefix = ":".join(namespace) + ":"
    results = []

    try:
        # Use our key registry to find matching keys
        count = 0
        for full_key in self._keys:
            if count >= limit:
                break

            if full_key.startswith(prefix):
                key_bytes = full_key.encode("utf-8")
                if self.enable_versioning:
                    value = self.tree.get(key_bytes)
                else:
                    value = self.tree.find(key_bytes)
                decoded_value = self._decode_value(value)

                # Apply filter if provided
                if filter and not all(
                    decoded_value.get(k) == v
                    for k, v in filter.items()
                    if isinstance(decoded_value, dict)
                ):
                    continue

                # Extract item key from full key
                item_key = full_key[len(prefix) :]
                results.append((namespace, item_key, decoded_value))
                count += 1
    except Exception as e:
        logger.error(f"Error searching namespace {namespace}: {e}")

    return results
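
The filter argument is matched against decoded dict values by plain equality; non-dict values pass through unfiltered. A sketch (keys and values illustrative):

store.put(("user", "alice"), "notes.today", {"content": "standup at 10", "kind": "note"})
matches = store.search(("user", "alice"), filter={"kind": "note"}, limit=10)
# Dict values must have kind == "note" to survive; non-dict values are not filtered.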

put

put(namespace: tuple, key: str, value: dict) -> None

Store a value in a namespace.

Source code in src/memoir/store/prolly_adapter.py
def put(self, namespace: tuple, key: str, value: dict) -> None:
    """Store a value in a namespace."""
    self._stats["writes"] += 1
    full_key = ":".join(namespace) + ":" + key
    key_bytes = full_key.encode("utf-8")
    value_bytes = self._encode_value(value)

    try:
        if self.enable_versioning:
            # VersionedKvStore API - check if key exists using get
            existing = self.tree.get(key_bytes)
            if existing:
                self.tree.update(key_bytes, value_bytes)
            else:
                self.tree.insert(key_bytes, value_bytes)
            # Commit the change if auto_commit is enabled
            if self.auto_commit:
                self.tree.commit(f"Store {key} in {':'.join(namespace)}")
        else:
            # ProllyTree API - check if key exists using find
            existing = self.tree.find(key_bytes)
            if existing:
                self.tree.update(key_bytes, value_bytes)
            else:
                self.tree.insert(key_bytes, value_bytes)

        # Track the key in our registry
        self._keys.add(full_key)

    except Exception as e:
        logger.error(f"Error storing {full_key}: {e}")
        raise

get

get(namespace: tuple, key: str) -> dict | None

Retrieve a value from a namespace.

Source code in src/memoir/store/prolly_adapter.py
def get(self, namespace: tuple, key: str) -> dict | None:
    """Retrieve a value from a namespace."""
    self._stats["reads"] += 1
    full_key = ":".join(namespace) + ":" + key
    key_bytes = full_key.encode("utf-8")

    try:
        if self.enable_versioning:
            # VersionedKvStore API
            data = self.tree.get(key_bytes)
        else:
            # ProllyTree API
            data = self.tree.find(key_bytes)
        return self._decode_value(data) if data else None
    except Exception as e:
        logger.error(f"Error getting key {full_key}: {e}")
        return None

delete

delete(namespace: tuple, key: str) -> None

Delete a key from a namespace.

Source code in src/memoir/store/prolly_adapter.py
def delete(self, namespace: tuple, key: str) -> None:
    """Delete a key from a namespace."""
    full_key = ":".join(namespace) + ":" + key
    key_bytes = full_key.encode("utf-8")

    try:
        self.tree.delete(key_bytes)
        # Remove from key registry
        self._keys.discard(full_key)
        if self.enable_versioning and self.auto_commit:
            self.tree.commit(f"Delete {key} from {':'.join(namespace)}")
    except Exception as e:
        logger.error(f"Error deleting {full_key}: {e}")

commit

commit(message: str = 'Manual commit') -> str | None

Manually commit pending changes to the versioned store.

This is useful when auto_commit is disabled and you want to batch multiple operations before committing.

Parameters:

    message (str): Commit message. Default: 'Manual commit'.

Returns:

    str | None: Commit hash if versioning is enabled, None otherwise.

Source code in src/memoir/store/prolly_adapter.py
def commit(self, message: str = "Manual commit") -> str | None:
    """
    Manually commit pending changes to the versioned store.

    This is useful when auto_commit is disabled and you want to batch
    multiple operations before committing.

    Args:
        message: Commit message

    Returns:
        Commit hash if versioning is enabled, None otherwise
    """
    if not self.enable_versioning:
        logger.warning("Commit requested but versioning is not enabled")
        return None

    try:
        commit_hash = self.tree.commit(message)
        logger.debug(f"Manual commit successful: {message}")
        return commit_hash
    except Exception as e:
        logger.error(f"Error committing changes: {e}")
        raise
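
With auto_commit=False, several writes can land in a single commit via this method. A sketch (paths and keys illustrative; the store must already exist):

store = ProllyTreeStore("./mystore", auto_commit=False)
store.put(("user", "alice"), "a", {"content": "first"})
store.put(("user", "alice"), "b", {"content": "second"})
commit_hash = store.commit("Batch import of two memories")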

get_key_history

get_key_history(namespace: tuple, key: str, limit: int = 10) -> list[dict]

Get commit history for a specific key.

Parameters:

    namespace (tuple): Namespace tuple. Required.
    key (str): Key to get history for. Required.
    limit (int): Maximum number of commits to return. Default: 10.

Returns:

    list[dict]: List of commit dictionaries with id, timestamp, message, author,
    committer.

Source code in src/memoir/store/prolly_adapter.py
def get_key_history(
    self, namespace: tuple, key: str, limit: int = 10
) -> list[dict]:
    """
    Get commit history for a specific key.

    Args:
        namespace: Namespace tuple
        key: Key to get history for
        limit: Maximum number of commits to return

    Returns:
        List of commit dictionaries with id, timestamp, message, author, committer
    """
    if not self.enable_versioning:
        return []

    full_key = ":".join(namespace) + ":" + key
    key_bytes = full_key.encode("utf-8")

    try:
        commits = self.tree.get_commits_for_key(key_bytes)
        # Limit results and return most recent first
        return commits[:limit]
    except Exception as e:
        logger.error(f"Error getting history for {full_key}: {e}")
        return []

get_key_at_commit

get_key_at_commit(namespace: tuple, key: str, commit_id: str) -> dict | None

Get the value of a key at a specific commit.

Note: Current implementation returns None since VersionedKvStore doesn't support direct commit checkout. This is a placeholder for future enhancement.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | tuple | Namespace tuple | required |
| key | str | Key to retrieve | required |
| commit_id | str | Commit ID to retrieve from | required |

Returns:

| Type | Description |
|------|-------------|
| dict \| None | None (historical content retrieval not yet implemented) |

Source code in src/memoir/store/prolly_adapter.py
def get_key_at_commit(
    self, namespace: tuple, key: str, commit_id: str
) -> dict | None:
    """
    Get the value of a key at a specific commit.

    Note: Current implementation returns None since VersionedKvStore doesn't support
    direct commit checkout. This is a placeholder for future enhancement.

    Args:
        namespace: Namespace tuple
        key: Key to retrieve
        commit_id: Commit ID to retrieve from

    Returns:
        None (historical content retrieval not yet implemented)
    """
    if not self.enable_versioning:
        return None

    # TODO: Implement historical content retrieval when VersionedKvStore supports it
    # Current limitation: VersionedKvStore only supports branch checkout, not commit checkout
    logger.debug(
        f"Historical content retrieval not yet implemented for commit {commit_id[:8]}"
    )
    return None

create_time_snapshot

create_time_snapshot(snapshot_name: str) -> bool

Create a branch snapshot at the current point in time.

When auto_commit=False, this will first commit any pending changes before creating the snapshot to ensure all recent changes are included.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| snapshot_name | str | Name for the snapshot branch | required |

Returns:

| Type | Description |
|------|-------------|
| bool | True if snapshot created successfully |

Source code in src/memoir/store/prolly_adapter.py
def create_time_snapshot(self, snapshot_name: str) -> bool:
    """
    Create a branch snapshot at the current point in time.

    When auto_commit=False, this will first commit any pending changes
    before creating the snapshot to ensure all recent changes are included.

    Args:
        snapshot_name: Name for the snapshot branch

    Returns:
        True if snapshot created successfully
    """
    if not self.enable_versioning:
        return False

    try:
        # If auto_commit is disabled, commit pending changes before snapshot
        if not self.auto_commit:
            commit_hash = self.commit(
                f"Auto-commit before snapshot: {snapshot_name}"
            )
            if commit_hash:
                logger.debug(
                    f"Auto-committed pending changes before snapshot: {commit_hash[:8]}"
                )

        self.tree.create_branch(snapshot_name)
        logger.debug(f"Created time snapshot: {snapshot_name}")
        return True
    except Exception as e:
        logger.error(f"Failed to create snapshot {snapshot_name}: {e}")
        return False

get_state_at_snapshot

get_state_at_snapshot(namespace: tuple, snapshot_name: str) -> dict[str, Any]

Get all keys in a namespace at a specific snapshot.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | tuple | Namespace tuple | required |
| snapshot_name | str | Name of the snapshot branch | required |

Returns:

| Type | Description |
|------|-------------|
| dict[str, Any] | Dictionary of key -> value at that snapshot |

Source code in src/memoir/store/prolly_adapter.py
def get_state_at_snapshot(
    self, namespace: tuple, snapshot_name: str
) -> dict[str, Any]:
    """
    Get all keys in a namespace at a specific snapshot.

    Args:
        namespace: Namespace tuple
        snapshot_name: Name of the snapshot branch

    Returns:
        Dictionary of key -> value at that snapshot
    """
    if not self.enable_versioning:
        return {}

    try:
        # Save current branch
        current_branch = self.tree.current_branch()

        # Switch to snapshot
        self.tree.checkout(snapshot_name)

        # Get all keys in namespace
        state = {}
        namespace_prefix = ":".join(namespace) + ":"

        keys = self.tree.list_keys()
        for key in keys:
            key_str = key.decode("utf-8") if isinstance(key, bytes) else key
            if key_str.startswith(namespace_prefix):
                # Get value
                value = self.tree.get(
                    key if isinstance(key, bytes) else key.encode("utf-8")
                )
                if value:
                    # Extract the key without namespace prefix
                    short_key = key_str[len(namespace_prefix) :]
                    state[short_key] = self._decode_value(value)

        # Return to original branch
        self.tree.checkout(current_branch)

        return state

    except Exception as e:
        logger.error(f"Failed to get state at snapshot {snapshot_name}: {e}")
        # Try to return to original branch
        with contextlib.suppress(Exception):
            self.tree.checkout(current_branch)
        return {}
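
A sketch combining create_time_snapshot (above) and get_state_at_snapshot (branch name and keys are illustrative; store is the instance from the earlier sketch):

if store.create_time_snapshot("before_migration"):
    # Continue writing after the snapshot
    store.put(("user", "alice"), "profile.name", {"value": "Alice B."})
    store.commit("Rename profile entry")

    # Read the namespace back as it was when the snapshot was taken
    old_state = store.get_state_at_snapshot(("user", "alice"), "before_migration")
    print(old_state.get("profile.name"))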

store_memory_async async

store_memory_async(namespace: str, content: Any, key: str) -> MemoryItem

Store a memory at the given semantic key.

Note: Classification must be done by the caller (the memory manager); the storage layer is responsible only for storing, not classifying.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | str | User/agent namespace | required |
| content | Any | Memory content to store | required |
| key | str | Semantic key at which to store the memory (required; no classification is performed here) | required |

Returns:

| Type | Description |
|------|-------------|
| MemoryItem | MemoryItem with storage results |

Source code in src/memoir/store/prolly_adapter.py
async def store_memory_async(
    self, namespace: str, content: Any, key: str
) -> MemoryItem:
    """
    Store a memory at the given semantic key.

    Note: Classification must be done by the caller (memory manager).
    Storage layer is responsible only for storing, not classifying.

    Args:
        namespace: User/agent namespace
        content: Memory content to store
        key: Semantic key where to store (REQUIRED - no classification here)

    Returns:
        MemoryItem with storage results
    """
    # Storage layer: just use the provided semantic key (no classification)
    semantic_key = key
    confidence = 1.0  # Default value; actual confidence is determined by the caller (memory manager)

    # Use semantic key for aggregation
    storage_key = semantic_key

    # Create memory entry (not the full item)
    memory_entry = {
        "content": content,
        "confidence": confidence,
        "timestamp": time.time(),
        "metadata": {},
    }

    # Convert namespace to tuple format
    if ":" in namespace:
        namespace_parts = namespace.split(":")
        namespace_tuple = tuple(namespace_parts)
    else:
        namespace_tuple = (namespace,)

    # Get existing aggregated memory or create new one
    existing = self.get(namespace_tuple, storage_key)

    if existing and isinstance(existing, dict) and "memories" in existing:
        # Append to existing aggregated memory
        aggregated = AggregatedMemory(**existing)
        aggregated.memories.append(memory_entry)
        aggregated.count += 1
        aggregated.last_timestamp = memory_entry["timestamp"]
        aggregated.last_updated = time.time()
    else:
        # Create new aggregated memory
        aggregated = AggregatedMemory(
            path=semantic_key,
            memories=[memory_entry],
            count=1,
            first_timestamp=memory_entry["timestamp"],
            last_timestamp=memory_entry["timestamp"],
        )

    # Store the aggregated memory
    self.put(namespace_tuple, storage_key, aggregated.model_dump())

    # Create MemoryItem for return value (for compatibility)
    item = MemoryItem(
        key=semantic_key,
        namespace=namespace,
        content=content,
        confidence=confidence,
        timestamp=memory_entry["timestamp"],
    )

    if self.enable_versioning and hasattr(self.tree, "get_head"):
        item.version = self.tree.get_head()

    return item
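
A hedged async sketch (the semantic key must be chosen upstream, e.g. by a classifier; the values are illustrative and store is the instance from the earlier sketch):

import asyncio

async def main():
    item = await store.store_memory_async(
        namespace="user:alice",
        content="Prefers vegetarian restaurants",
        key="preferences.food.dietary",  # chosen by the caller, not the store
    )
    print(item.key, item.timestamp)

asyncio.run(main())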

asearch async

asearch(namespace: str, path_prefix: str) -> list[tuple[str, Any]]

Async search for items with a given path prefix. Used by HierarchicalSearchEngine.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | str | User namespace | required |
| path_prefix | str | Path prefix to search for | required |

Returns:

| Type | Description |
|------|-------------|
| list[tuple[str, Any]] | List of (semantic_key, data) tuples |

Source code in src/memoir/store/prolly_adapter.py
async def asearch(self, namespace: str, path_prefix: str) -> list[tuple[str, Any]]:
    """
    Async search for items with a given path prefix.
    Used by HierarchicalSearchEngine.

    Args:
        namespace: User namespace
        path_prefix: Path prefix to search for

    Returns:
        List of (semantic_key, data) tuples
    """
    # Use synchronous search with prefix
    results = []
    # Convert string namespace to tuple format
    # "memory:general" -> ("memory", "general")
    namespace_parts = namespace.split(":")
    namespace_tuple = tuple(namespace_parts)

    search_results = self.search(namespace_tuple, limit=100)

    for _, storage_key, data in search_results:
        semantic_key = storage_key

        # Check if the semantic path matches the requested prefix
        if semantic_key.startswith(path_prefix):
            # Aggregated memories (dicts with a "memories" list) and legacy
            # single-memory entries are both returned as-is; the search
            # engine handles expanding aggregated items.
            results.append((semantic_key, data))

    return results
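
For instance, inside an async function (prefix and namespace are illustrative):

results = await store.asearch("user:alice", path_prefix="preferences")
for semantic_key, data in results:
    # Aggregated entries carry a "count" of stored memories
    count = data.get("count") if isinstance(data, dict) else None
    print(semantic_key, count)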

retrieve_memories_async async

retrieve_memories_async(namespace: str, query: str, limit: int = 10) -> list[MemoryItem]

Retrieve memories using semantic search (async version).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | str | User/agent namespace | required |
| query | str | Search query | required |
| limit | int | Maximum number of results | 10 |

Returns:

| Type | Description |
|------|-------------|
| list[MemoryItem] | List of matching memory items |

Source code in src/memoir/store/prolly_adapter.py
async def retrieve_memories_async(
    self, namespace: str, query: str, limit: int = 10
) -> list[MemoryItem]:
    """
    Retrieve memories using semantic search (async version).

    Args:
        namespace: User/agent namespace
        query: Search query
        limit: Maximum number of results

    Returns:
        List of matching memory items
    """
    # Use the hierarchical search engine to find relevant memories
    search_results = await self.search_engine.search(query, namespace)

    # Convert search results to memory items with deduplication
    memories = []
    seen_content = set()

    for result in search_results:
        # The search result contains combined content from multiple items
        if result.combined_content:
            try:
                # Split combined content back into individual memories
                individual_contents = result.combined_content.split(" | ")
                for content_text in individual_contents:
                    if content_text.strip():
                        # Create a memory item from the content
                        memory = MemoryItem(
                            key=result.path,
                            namespace=result.namespace,
                            content=content_text.strip(),
                            confidence=1.0,  # Default confidence
                            timestamp=time.time(),
                        )
                        # Deduplicate by content
                        content_hash = hash(memory.content)
                        if content_hash not in seen_content:
                            seen_content.add(content_hash)
                            memories.append(memory)
                            # Stop when we have enough unique results
                            if len(memories) >= limit:
                                break
                if len(memories) >= limit:
                    break
            except Exception as e:
                logger.warning(f"Failed to parse memory item: {e}")

    return memories
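
A minimal retrieval sketch (inside an async function; the query is illustrative):

memories = await store.retrieve_memories_async(
    "user:alice", query="food preferences", limit=5
)
for memory in memories:
    print(memory.key, memory.content)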

retrieve_memories

retrieve_memories(namespace: str, query: str, limit: int = 10) -> list[MemoryItem]

Retrieve memories using semantic search (sync fallback).

Note: This is a simple fallback. For proper semantic search, use retrieve_memories_async(), which leverages the HierarchicalSearchEngine.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | str | User/agent namespace | required |
| query | str | Search query | required |
| limit | int | Maximum number of results | 10 |

Returns:

| Type | Description |
|------|-------------|
| list[MemoryItem] | List of matching memory items |

Source code in src/memoir/store/prolly_adapter.py
def retrieve_memories(
    self, namespace: str, query: str, limit: int = 10
) -> list[MemoryItem]:
    """
    Retrieve memories using semantic search (sync fallback).

    Note: This is a simple fallback. For proper semantic search,
    use retrieve_memories_async() which leverages the HierarchicalSearchEngine.

    Args:
        namespace: User/agent namespace
        query: Search query
        limit: Maximum number of results

    Returns:
        List of matching memory items
    """
    logger.warning(
        "Using fallback sync search. For better results, use retrieve_memories_async()"
    )

    # Simple fallback - just return all memories
    all_memories = []
    search_results = self.search((namespace,), limit=limit)

    for _, _key, data in search_results:
        if isinstance(data, dict):
            try:
                memory = MemoryItem(**data)
                all_memories.append(memory)
            except Exception as e:
                logger.warning(f"Failed to parse memory item: {e}")

    return all_memories

get_statistics

get_statistics() -> dict[str, Any]

Get store statistics.

Source code in src/memoir/store/prolly_adapter.py
def get_statistics(self) -> dict[str, Any]:
    """Get store statistics."""
    stats = {
        "performance": self._stats.copy(),
        "total_keys": len(self._keys),
        "total_namespaces": len({key.split(":")[0] for key in self._keys}),
    }

    if self.enable_versioning and hasattr(self.tree, "get_head"):
        try:
            stats["versioning"] = {
                "current_commit": self.tree.get_head(),
            }
            if hasattr(self.tree, "log"):
                commits = self.tree.log()
                stats["versioning"]["total_commits"] = len(commits)
        except Exception:
            pass

    return stats
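
For example (the "versioning" block is present only when versioning is enabled):

stats = store.get_statistics()
print(stats["total_keys"], stats["total_namespaces"])
print(stats.get("versioning", {}).get("current_commit"))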

export_namespace

export_namespace(namespace: str, output_path: str) -> None

Export all memories from a namespace to JSON.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| namespace | str | Namespace to export | required |
| output_path | str | Path to save JSON file | required |
Source code in src/memoir/store/prolly_adapter.py
def export_namespace(self, namespace: str, output_path: str) -> None:
    """
    Export all memories from a namespace to JSON.

    Args:
        namespace: Namespace to export
        output_path: Path to save JSON file
    """
    memories = {}
    search_results = self.search((namespace,), limit=1000)

    for _, key, data in search_results:
        memories[key] = data

    with open(output_path, "w") as f:
        json.dump(
            {
                "namespace": namespace,
                "timestamp": time.time(),
                "memories": memories,
            },
            f,
            indent=2,
        )

    logger.info(f"Exported {len(memories)} memories to {output_path}")

SemanticTaxonomy

Bases: BaseTaxonomy

Fixed semantic taxonomy with predefined paths. Provides hierarchical organization for AI memory classification. Implements TaxonomyInterface for standardized access.

Source code in src/memoir/taxonomy/semantic.py
class SemanticTaxonomy(BaseTaxonomy):
    """
    Fixed semantic taxonomy with predefined paths.
    Provides hierarchical organization for AI memory classification.
    Implements TaxonomyInterface for standardized access.
    """

    def __init__(self, taxonomy_loader: Any | None = None):
        """
        Initialize semantic taxonomy with flexible data loading.

        Args:
            taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                            If None, uses TaxonomyPresets as fallback.
        """
        self._taxonomy_loader = taxonomy_loader
        self._all_paths = self._load_all_paths()
        self._path_index = self._build_path_index()

    def _load_all_paths(self) -> set[str]:
        """
        Load all paths from TaxonomyLoader or fallback to TaxonomyPresets.

        Returns:
            Set of all valid taxonomy paths.
        """
        paths = set()

        # Try to load from TaxonomyLoader (store-based)
        if self._taxonomy_loader:
            try:
                preset_paths = self._taxonomy_loader.get_preset_paths_from_store()
                if preset_paths:
                    for category, category_paths in preset_paths.items():
                        # Add the category itself
                        paths.add(category)
                        for path in category_paths:
                            full_path = f"{category}.{path}"
                            paths.add(full_path)
                            # Also add intermediate paths
                            parts = full_path.split(".")
                            for i in range(1, len(parts)):
                                paths.add(".".join(parts[:i]))
                    logger.debug(
                        f"[SemanticTaxonomy] Loaded {len(paths)} paths from store"
                    )
                    return paths
            except Exception as e:
                logger.warning(
                    f"[SemanticTaxonomy] Failed to load from store, using fallback: {e}"
                )

        # Fallback to TaxonomyPresets
        from .taxonomy import TaxonomyPresets, TaxonomyVersion

        preset_paths = TaxonomyPresets.PRESETS[TaxonomyVersion.SIMPLIFIED]
        for category, category_paths in preset_paths.items():
            # Add the category itself
            paths.add(category)
            for path in category_paths:
                full_path = f"{category}.{path}"
                paths.add(full_path)
                # Also add intermediate paths
                parts = full_path.split(".")
                for i in range(1, len(parts)):
                    paths.add(".".join(parts[:i]))

        logger.debug(
            f"[SemanticTaxonomy] Loaded {len(paths)} paths from TaxonomyPresets"
        )
        return paths

    def _build_path_index(self) -> dict[str, list[str]]:
        """Build an index for efficient path lookups."""
        index = {}
        for path in self._all_paths:
            parts = path.split(".")
            for i in range(len(parts)):
                prefix = ".".join(parts[: i + 1])
                if prefix not in index:
                    index[prefix] = []
                if path != prefix:
                    index[prefix].append(path)
        return index

    def get_all_paths(self) -> list[str]:
        """Return all valid taxonomy paths."""
        return sorted(self._all_paths)

    def get_children(self, path: str) -> list[str]:
        """Get immediate children of a path."""
        if path not in self._path_index:
            return []

        children = []
        path_depth = len(path.split("."))
        for child in self._path_index[path]:
            if len(child.split(".")) == path_depth + 1:
                children.append(child)
        return sorted(children)

    def get_descendants(self, path: str) -> list[str]:
        """Get all descendants of a path."""
        if path not in self._path_index:
            return []
        return sorted(self._path_index[path])

    def is_valid_path(self, path: str) -> bool:
        """Check if a path exists in the taxonomy."""
        return path in self._all_paths

    def get_path_depth(self, path: str) -> int:
        """Get the depth of a path in the hierarchy."""
        return len(path.split("."))

    def get_category(self, path: str) -> TaxonomyCategory | None:
        """Get the top-level category for a path."""
        if not path:
            return None
        root = path.split(".")[0]
        try:
            return TaxonomyCategory(root)
        except ValueError:
            return None

    def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
        """Get paths related to the given path within a certain distance."""
        if not self.is_valid_path(path):
            return []

        related = set()
        parts = path.split(".")

        # Get siblings
        if len(parts) > 1:
            parent = ".".join(parts[:-1])
            related.update(self.get_children(parent))

        # Get ancestors up to max_distance
        for i in range(1, min(max_distance + 1, len(parts))):
            ancestor = ".".join(parts[:-i])
            related.add(ancestor)

        # Get descendants up to max_distance
        if max_distance > 0:
            descendants = self.get_descendants(path)
            for desc in descendants:
                if (
                    self.get_path_depth(desc) - self.get_path_depth(path)
                    <= max_distance
                ):
                    related.add(desc)

        related.discard(path)  # Remove the path itself
        return sorted(related)

    def get_statistics(self) -> dict:
        """Get statistics about the taxonomy."""
        category_counts = {}
        depth_counts = {}

        for path in self._all_paths:
            category = self.get_category(path)
            if category:
                cat_name = category.value
                category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

            depth = self.get_path_depth(path)
            depth_counts[depth] = depth_counts.get(depth, 0) + 1

        return {
            "total_paths": len(self._all_paths),
            "categories": len(list(TaxonomyCategory)),
            "max_depth": max(depth_counts.keys()),
            "paths_by_category": category_counts,
            "paths_by_depth": depth_counts,
        }

__init__

__init__(taxonomy_loader: Any | None = None)

Initialize semantic taxonomy with flexible data loading.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| taxonomy_loader | Any \| None | Optional TaxonomyLoader for loading taxonomy from store. If None, uses TaxonomyPresets as fallback. | None |
Source code in src/memoir/taxonomy/semantic.py
def __init__(self, taxonomy_loader: Any | None = None):
    """
    Initialize semantic taxonomy with flexible data loading.

    Args:
        taxonomy_loader: Optional TaxonomyLoader for loading taxonomy from store.
                        If None, uses TaxonomyPresets as fallback.
    """
    self._taxonomy_loader = taxonomy_loader
    self._all_paths = self._load_all_paths()
    self._path_index = self._build_path_index()

get_all_paths

get_all_paths() -> list[str]

Return all valid taxonomy paths.

Source code in src/memoir/taxonomy/semantic.py
def get_all_paths(self) -> list[str]:
    """Return all valid taxonomy paths."""
    return sorted(self._all_paths)

get_children

get_children(path: str) -> list[str]

Get immediate children of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_children(self, path: str) -> list[str]:
    """Get immediate children of a path."""
    if path not in self._path_index:
        return []

    children = []
    path_depth = len(path.split("."))
    for child in self._path_index[path]:
        if len(child.split(".")) == path_depth + 1:
            children.append(child)
    return sorted(children)

get_descendants

get_descendants(path: str) -> list[str]

Get all descendants of a path.

Source code in src/memoir/taxonomy/semantic.py
def get_descendants(self, path: str) -> list[str]:
    """Get all descendants of a path."""
    if path not in self._path_index:
        return []
    return sorted(self._path_index[path])

is_valid_path

is_valid_path(path: str) -> bool

Check if a path exists in the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def is_valid_path(self, path: str) -> bool:
    """Check if a path exists in the taxonomy."""
    return path in self._all_paths

get_path_depth

get_path_depth(path: str) -> int

Get the depth of a path in the hierarchy.

Source code in src/memoir/taxonomy/semantic.py
def get_path_depth(self, path: str) -> int:
    """Get the depth of a path in the hierarchy."""
    return len(path.split("."))

get_category

get_category(path: str) -> TaxonomyCategory | None

Get the top-level category for a path.

Source code in src/memoir/taxonomy/semantic.py
def get_category(self, path: str) -> TaxonomyCategory | None:
    """Get the top-level category for a path."""
    if not path:
        return None
    root = path.split(".")[0]
    try:
        return TaxonomyCategory(root)
    except ValueError:
        return None

get_related_paths

get_related_paths(path: str, max_distance: int = 2) -> list[str]

Get paths related to the given path within a certain distance.

Source code in src/memoir/taxonomy/semantic.py
def get_related_paths(self, path: str, max_distance: int = 2) -> list[str]:
    """Get paths related to the given path within a certain distance."""
    if not self.is_valid_path(path):
        return []

    related = set()
    parts = path.split(".")

    # Get siblings
    if len(parts) > 1:
        parent = ".".join(parts[:-1])
        related.update(self.get_children(parent))

    # Get ancestors up to max_distance
    for i in range(1, min(max_distance + 1, len(parts))):
        ancestor = ".".join(parts[:-i])
        related.add(ancestor)

    # Get descendants up to max_distance
    if max_distance > 0:
        descendants = self.get_descendants(path)
        for desc in descendants:
            if (
                self.get_path_depth(desc) - self.get_path_depth(path)
                <= max_distance
            ):
                related.add(desc)

    related.discard(path)  # Remove the path itself
    return sorted(related)

get_statistics

get_statistics() -> dict

Get statistics about the taxonomy.

Source code in src/memoir/taxonomy/semantic.py
def get_statistics(self) -> dict:
    """Get statistics about the taxonomy."""
    category_counts = {}
    depth_counts = {}

    for path in self._all_paths:
        category = self.get_category(path)
        if category:
            cat_name = category.value
            category_counts[cat_name] = category_counts.get(cat_name, 0) + 1

        depth = self.get_path_depth(path)
        depth_counts[depth] = depth_counts.get(depth, 0) + 1

    return {
        "total_paths": len(self._all_paths),
        "categories": len(list(TaxonomyCategory)),
        "max_depth": max(depth_counts.keys()),
        "paths_by_category": category_counts,
        "paths_by_depth": depth_counts,
    }

TaxonomyCategory

Bases: Enum

Top-level taxonomy categories.

Source code in src/memoir/taxonomy/semantic.py
class TaxonomyCategory(Enum):
    """Top-level taxonomy categories."""

    PROFILE = "profile"
    PREFERENCES = "preferences"
    EXPERIENCE = "experience"
    CONTEXT = "context"
    KNOWLEDGE = "knowledge"
    RELATIONSHIPS = "relationships"
    GOALS = "goals"
    BEHAVIOR = "behavior"

get_taxonomy

get_taxonomy() -> SemanticTaxonomy

Get the thread-safe singleton taxonomy instance.

Source code in src/memoir/taxonomy/semantic.py
def get_taxonomy() -> SemanticTaxonomy:
    """Get the thread-safe singleton taxonomy instance."""
    global _taxonomy_instance
    if _taxonomy_instance is None:
        with _taxonomy_lock:
            # Double-check locking pattern
            if _taxonomy_instance is None:
                _taxonomy_instance = SemanticTaxonomy()
    return _taxonomy_instance
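
A navigation sketch (concrete paths depend on the loaded preset, so the ones below are illustrative):

taxonomy = get_taxonomy()
assert taxonomy is get_taxonomy()  # same singleton on repeated calls

print(taxonomy.is_valid_path("preferences"))
print(taxonomy.get_children("preferences"))
print(taxonomy.get_category("preferences.food"))  # TaxonomyCategory.PREFERENCES
print(taxonomy.get_related_paths("preferences.food", max_distance=1))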

Subpackages

Core Module

memoir.core.memory

Provides high-performance semantic memory with versioning capabilities.

Memory

Bases: BaseModel

Represents a memory object compatible with LangMem.

Source code in src/memoir/core/memory.py
class Memory(BaseModel):
    """Represents a memory object compatible with LangMem."""

    id: str = Field(description="Memory identifier")
    content: Any = Field(description="Memory content")
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Memory metadata"
    )

MemoryVersion

Bases: BaseModel

Represents a version of a memory.

Source code in src/memoir/core/memory.py
class MemoryVersion(BaseModel):
    """Represents a version of a memory."""

    commit_id: str
    timestamp: float
    content: Any
    metadata: dict[str, Any]
    message: str
    author: str | None = None

ProllyTreeMemoryStoreManager

Bases: MemoryStoreManager

Enhanced MemoryStoreManager with ProllyTree backend. Provides semantic classification, hierarchical search, and versioning.

Source code in src/memoir/core/memory.py
class ProllyTreeMemoryStoreManager(MemoryStoreManager):
    """
    Enhanced MemoryStoreManager with ProllyTree backend.
    Provides semantic classification, hierarchical search, and versioning.
    """

    def __init__(
        self,
        prolly_store: Any | None = None,  # ProllyTreeStore instance (preferred)
        prolly_path: str | None = None,  # Path to create store (fallback)
        model: str | Any = "gpt-3.5-turbo",  # Default model
        classifier: (
            Any | None
        ) = None,  # SemanticClassifier or IntelligentClassifier instance
        search_engine: Any | None = None,  # Search engine instance
        enable_versioning: bool = True,
        auto_commit: bool = True,
        enable_fast_classification: bool = True,
        cache_size: int = 10000,
        **kwargs,
    ):
        """
        Initialize enhanced memory manager.

        Args:
            prolly_store: ProllyTreeStore instance (preferred - allows proper dependency injection)
            prolly_path: Path to create ProllyTree database (fallback if store not provided)
            classifier: SemanticClassifier or IntelligentClassifier instance
            search_engine: Search engine instance (IntelligentSearchEngine, etc.)
            enable_versioning: Enable git-like versioning
            auto_commit: Whether to automatically commit on each memory operation
            enable_fast_classification: Use optimized classifier
            cache_size: Size of internal caches
            **kwargs: Additional arguments for MemoryStoreManager
        """
        # Initialize classifier - must be provided for production use
        self.classifier = classifier

        # Initialize or use provided ProllyTree store
        if prolly_store is not None:
            # Use provided store (preferred for dependency injection)
            self.prolly_store = prolly_store
        elif prolly_path is not None:
            # Create store from path (fallback)
            # Path-based construction is the SDK fallback / auto-create
            # entry point. ProllyTreeStore itself is strict, so bootstrap
            # the store via StoreService first if it doesn't exist yet.
            from memoir.services.store_service import StoreService

            StoreService(prolly_path).create_store(prolly_path)
            self.prolly_store = ProllyTreeStore(
                path=prolly_path,
                enable_versioning=enable_versioning,
                auto_commit=auto_commit,
                cache_size=cache_size,
            )
        else:
            raise ValueError("Either prolly_store or prolly_path must be provided")

        # Initialize profile memento
        self.profile_manager = ProfileMemento(self.prolly_store)

        # Initialize timeline memento
        self.timeline_manager = TimelineMemento(self.prolly_store)

        # Initialize location memento
        self.location_manager = LocationMemento(self.prolly_store)

        # Use provided search engine
        self.search_engine = search_engine

        self.enable_versioning = enable_versioning
        self.enable_fast_classification = enable_fast_classification

        # Performance metrics
        self._metrics = {
            "searches": 0,
            "search_time_ms": [],
            "writes": 0,
            "write_time_ms": [],
            "classifications": 0,
            "classification_time_ms": [],
        }

        # Initialize parent class with ProllyTree store
        super().__init__(model, store=self.prolly_store, **kwargs)

    async def search_memories(
        self,
        query: str,
        namespace: str,
        limit: int = 10,
    ) -> list[Memory]:
        """
        Search memories using the provided search engine.

        Args:
            query: Natural language search query
            namespace: User namespace
            limit: Maximum results to return

        Returns:
            List of Memory objects
        """
        if not self.search_engine:
            logger.warning("No search engine provided - returning empty results")
            return []

        start_time = time.time()
        self._metrics["searches"] += 1

        # Use the provided search engine
        search_results = await self.search_engine.search(
            query=query, namespace=namespace, limit=limit
        )

        # Convert IntelligentSearchResult objects to Memory objects
        memories = []
        for result in search_results[:limit]:
            memory = Memory(
                id=result.path,
                content=result.content,
                metadata=result.metadata,
            )
            memories.append(memory)

        search_time = (time.time() - start_time) * 1000
        self._metrics["search_time_ms"].append(search_time)

        logger.info(
            f"Search completed in {search_time:.2f}ms, found {len(memories)} memories"
        )

        return memories

    async def store_memory(
        self,
        content: Any,
        namespace: str,
        metadata: dict | None = None,
        auto_classify: bool = True,
    ) -> str:
        """
        Store a memory with automatic semantic classification.

        Args:
            content: Memory content to store
            namespace: User namespace
            metadata: Optional metadata
            auto_classify: Whether to auto-classify the content

        Returns:
            Semantic key where memory was stored
        """
        start_time = time.time()
        self._metrics["writes"] += 1

        if auto_classify and self.classifier:
            # Use LLM classification
            classification_start = time.time()
            self._metrics["classifications"] += 1

            # Use async classification with metadata
            classification = await self.classifier.classify_async(
                str(content), metadata=metadata
            )
            # Handle different classifier result formats
            if hasattr(classification, "primary_path"):
                semantic_key = classification.primary_path  # SemanticClassifier
            else:
                semantic_key = classification.path  # IntelligentClassifier

            # Handle case where classification fails and returns None path
            if semantic_key is None:
                logger.warning("Classification returned None path, using fallback")
                semantic_key = "context.current.session.topic.main"

            classification_time = (time.time() - classification_start) * 1000
            self._metrics["classification_time_ms"].append(classification_time)

            # Apply profile updates if detected
            if (
                hasattr(classification, "profile_updates")
                and classification.profile_updates
            ):
                try:
                    await self.profile_manager.apply_profile_updates(
                        classification.profile_updates, metadata, namespace
                    )
                    # logger.info(
                    #     f"Applied {len(classification.profile_updates)} profile updates"
                    # )
                except Exception as e:
                    logger.error(f"Failed to apply profile updates: {e}")

            # Apply timeline events if detected
            if (
                hasattr(classification, "timeline_events")
                and classification.timeline_events
            ):
                try:
                    await self.timeline_manager.apply_timeline_events(
                        classification.timeline_events, metadata, namespace=namespace
                    )
                    # logger.info(
                    #     f"Applied {len(classification.timeline_events)} timeline events"
                    # )
                except Exception as e:
                    logger.error(f"Failed to apply timeline events: {e}")

            # Add classification metadata
            if metadata is None:
                metadata = {}
            metadata["classification_confidence"] = classification.confidence
            metadata["classification_reasoning"] = classification.reasoning

        else:
            # Use provided key or generate one
            semantic_key = metadata.get("key") if metadata else None
            if not semantic_key:
                semantic_key = "context.current.session.topic.main"

        # Store using the asynchronous method (proper async context)
        await self.prolly_store.store_memory_async(namespace, content, semantic_key)

        write_time = (time.time() - start_time) * 1000
        self._metrics["write_time_ms"].append(write_time)

        # logger.debug(f"Stored memory at {semantic_key} in {write_time:.2f}ms")

        return semantic_key

    def store_commit(self, message: str = "Batch memory operations") -> str | None:
        """
        Commit all pending memory operations to the versioned store.

        This is used when auto_commit=False is set on the ProllyTreeStore to batch
        multiple memory operations into a single commit.

        Args:
            message: Commit message describing the batch of operations

        Returns:
            Commit hash if versioning is enabled, None otherwise
        """
        if not self.enable_versioning:
            logger.warning("Commit requested but versioning is not enabled")
            return None

        try:
            commit_hash = self.prolly_store.commit(message)
            logger.info(f"Committed batch operations: {message}")
            return commit_hash
        except Exception as e:
            logger.error(f"Error committing batch operations: {e}")
            raise

    async def get_memory_versions(
        self, semantic_key: str, namespace: str, limit: int = 10
    ) -> list[MemoryVersion]:
        """
        Get version history for a memory.

        Args:
            semantic_key: Semantic taxonomy key
            namespace: User namespace
            limit: Maximum versions to return

        Returns:
            List of memory versions
        """
        if not self.enable_versioning:
            logger.warning("Versioning is not enabled")
            return []

        # Convert namespace to tuple format
        namespace_tuple = (
            tuple(namespace.split(":")) if ":" in namespace else (namespace,)
        )

        # Get commit history for this key using the new method
        commit_history = self.prolly_store.get_key_history(
            namespace_tuple, semantic_key, limit
        )

        # Get current content as fallback since historical content retrieval is not yet implemented
        current_content = self.prolly_store.get(namespace_tuple, semantic_key)

        versions = []
        for i, commit in enumerate(commit_history):
            # Try to get content at this commit (currently returns None)
            content_at_commit = self.prolly_store.get_key_at_commit(
                namespace_tuple, semantic_key, commit["id"]
            )

            # If historical content is not available, use current content for demonstration
            if content_at_commit is None and current_content:
                # For the most recent commit, use current content
                if i == 0:  # Most recent commit
                    if (
                        isinstance(current_content, dict)
                        and "memories" in current_content
                    ):
                        # Extract from aggregated memory
                        memories = current_content.get("memories", [])
                        if memories:
                            latest_memory = memories[-1]
                            actual_content = latest_memory.get("content", "")
                        else:
                            actual_content = ""
                    else:
                        actual_content = (
                            current_content.get("content", "")
                            if isinstance(current_content, dict)
                            else current_content
                        )
                else:
                    # For older commits, indicate historical content is not available
                    actual_content = f"[Historical content for commit {commit['id'][:8]} not available]"
            else:
                actual_content = content_at_commit or ""

            # Convert commit info to MemoryVersion
            version = MemoryVersion(
                commit_id=commit["id"],
                timestamp=commit["timestamp"],
                content=actual_content,
                metadata={
                    "author": commit.get("author", ""),
                    "committer": commit.get("committer", ""),
                },
                message=commit["message"],
                author=commit.get("author", ""),
            )
            versions.append(version)

        logger.info(f"Retrieved {len(versions)} version(s) for {semantic_key}")
        return versions

    async def time_travel(
        self, namespace: str, target_time: datetime | float
    ) -> dict[str, Any]:
        """
        Get all memories as they were at a specific time.

        Args:
            namespace: User namespace
            target_time: Target datetime or unix timestamp

        Returns:
            Dictionary of memories at that time
        """
        if isinstance(target_time, datetime):
            timestamp = target_time.timestamp()
        else:
            timestamp = target_time

        # Convert namespace to tuple format
        namespace_tuple = (
            tuple(namespace.split(":")) if ":" in namespace else (namespace,)
        )

        # For branch-based time travel, we need to use snapshots
        # Create snapshot name based on timestamp
        snapshot_name = f"snapshot_{int(timestamp)}"

        # Check if we have this snapshot
        if self.enable_versioning and hasattr(self.prolly_store.tree, "list_branches"):
            try:
                branches = self.prolly_store.tree.list_branches()
                if snapshot_name in branches:
                    # Use the snapshot to get historical state
                    state = self.prolly_store.get_state_at_snapshot(
                        namespace_tuple, snapshot_name
                    )
                    logger.info(f"Retrieved state from snapshot {snapshot_name}")
                    return state
                else:
                    logger.warning(
                        f"No snapshot found for timestamp {timestamp}, returning current state"
                    )
            except Exception as e:
                logger.error(f"Error accessing time travel snapshot: {e}")

        # Fallback: return current state
        search_results = self.prolly_store.search(namespace_tuple, limit=1000)
        current_state = {}
        for _, key, data in search_results:
            current_state[key] = data

        return current_state

    async def create_memory_snapshot(
        self, namespace: str, snapshot_name: str | None = None
    ) -> str:
        """
        Create a snapshot of the current memory state.

        Args:
            namespace: User namespace
            snapshot_name: Optional name for snapshot (auto-generated if not provided)

        Returns:
            Name of the created snapshot
        """
        if not self.enable_versioning:
            raise ValueError("Snapshots require versioning to be enabled")

        if snapshot_name is None:
            # Auto-generate snapshot name with timestamp
            snapshot_name = f"snapshot_{int(time.time())}"

        # Create the snapshot
        success = self.prolly_store.create_time_snapshot(snapshot_name)

        if success:
            logger.info(f"Created memory snapshot: {snapshot_name}")
            return snapshot_name
        else:
            raise RuntimeError(f"Failed to create snapshot: {snapshot_name}")

    async def compare_memory_states(
        self,
        namespace: str,
        time1: datetime | float,
        time2: datetime | float,
    ) -> dict[str, Any]:
        """
        Compare memory states between two points in time.

        Args:
            namespace: User namespace
            time1: First timestamp
            time2: Second timestamp

        Returns:
            Comparison results with added/removed/changed memories
        """
        if isinstance(time1, datetime):
            time1 = time1.timestamp()
        if isinstance(time2, datetime):
            time2 = time2.timestamp()

        state1 = await self.time_travel(namespace, time1)
        state2 = await self.time_travel(namespace, time2)

        keys1 = set(state1.keys())
        keys2 = set(state2.keys())

        comparison = {
            "added": {k: state2[k] for k in keys2 - keys1},
            "removed": {k: state1[k] for k in keys1 - keys2},
            "changed": {},
            "unchanged": [],
        }

        for key in keys1 & keys2:
            if state1[key] != state2[key]:
                comparison["changed"][key] = {
                    "before": state1[key],
                    "after": state2[key],
                }
            else:
                comparison["unchanged"].append(key)

        return comparison

    async def branch_memories(self, namespace: str, branch_name: str) -> str:
        """
        Create a new branch of memories for experimentation.

        Args:
            namespace: User namespace
            branch_name: Name for the new branch

        Returns:
            Branch identifier
        """
        if not self.enable_versioning:
            raise ValueError("Branching requires versioning to be enabled")

        # Implementation would create a new branch in ProllyTree
        branch_id = f"{namespace}:{branch_name}:{time.time()}"
        # logger.info(f"Created memory branch: {branch_id}")

        return branch_id

    async def merge_memories(
        self,
        namespace: str,
        source_branch: str,
        target_branch: str = "main",
        strategy: str = "ours",
    ) -> dict[str, Any]:
        """
        Merge memories from one branch to another.

        Args:
            namespace: User namespace
            source_branch: Source branch to merge from
            target_branch: Target branch to merge into
            strategy: Merge strategy ("ours", "theirs", "union")

        Returns:
            Merge results with conflicts if any
        """
        if not self.enable_versioning:
            raise ValueError("Merging requires versioning to be enabled")

        # Implementation would handle branch merging
        merge_result = {"merged": 0, "conflicts": [], "strategy": strategy}

        # logger.info(f"Merged {source_branch} into {target_branch}")

        return merge_result

    def get_performance_metrics(self) -> dict[str, Any]:
        """Get performance metrics for the memory system."""
        metrics = self._metrics.copy()

        # Calculate averages
        if metrics["search_time_ms"]:
            metrics["avg_search_time_ms"] = sum(metrics["search_time_ms"]) / len(
                metrics["search_time_ms"]
            )
            metrics["p95_search_time_ms"] = (
                sorted(metrics["search_time_ms"])[
                    int(len(metrics["search_time_ms"]) * 0.95)
                ]
                if len(metrics["search_time_ms"]) > 1
                else metrics["search_time_ms"][0]
            )

        if metrics["write_time_ms"]:
            metrics["avg_write_time_ms"] = sum(metrics["write_time_ms"]) / len(
                metrics["write_time_ms"]
            )

        if metrics["classification_time_ms"]:
            metrics["avg_classification_time_ms"] = sum(
                metrics["classification_time_ms"]
            ) / len(metrics["classification_time_ms"])

        # Add component statistics
        try:
            metrics["store"] = self.prolly_store.get_statistics()
        except Exception as e:
            logger.warning(f"Failed to get store statistics: {e}")
            metrics["store"] = {}

        # Add classifier statistics if available
        if hasattr(self.classifier, "get_statistics"):
            try:
                metrics["classifier"] = self.classifier.get_statistics()
            except Exception as e:
                logger.warning(f"Failed to get classifier statistics: {e}")
                metrics["classifier"] = {}

        # Add search engine statistics if available
        if hasattr(self.search_engine, "get_statistics"):
            try:
                metrics["search_engine"] = self.search_engine.get_statistics()
            except Exception as e:
                logger.warning(f"Failed to get search engine statistics: {e}")
                metrics["search_engine"] = {}

        return metrics

    async def optimize_memory_layout(self, namespace: str) -> dict[str, Any]:
        """
        Optimize memory layout for better performance.
        Reorganizes memories based on access patterns.

        Args:
            namespace: User namespace to optimize

        Returns:
            Optimization results
        """
        start_time = time.time()

        # Get all memories
        namespace_tuple = (
            tuple(namespace.split(":")) if ":" in namespace else (namespace,)
        )
        search_results = self.prolly_store.search(namespace_tuple, limit=1000)
        all_keys = [key for _, key, _ in search_results]

        # Analyze access patterns (would need access logs in production)
        # For now, we'll just report current organization

        category_counts = {}
        depth_counts = {}

        for key in all_keys:
            parts = key.split(".")
            if parts:
                category = parts[0]
                category_counts[category] = category_counts.get(category, 0) + 1

                depth = len(parts)
                depth_counts[depth] = depth_counts.get(depth, 0) + 1

        optimization_time = time.time() - start_time

        return {
            "total_memories": len(all_keys),
            "categories": category_counts,
            "depth_distribution": depth_counts,
            "optimization_time_seconds": optimization_time,
            "recommendations": [
                "Consider moving frequently accessed memories to shallower paths",
                "Group related memories under common prefixes for faster retrieval",
                "Archive old memories to separate namespace for better performance",
            ],
        }

    async def export_memories(
        self, namespace: str, output_path: str, format: str = "json"
    ) -> None:
        """
        Export memories to file.

        Args:
            namespace: Namespace to export
            output_path: Output file path
            format: Export format (json, csv, markdown)
        """
        self.prolly_store.export_namespace(namespace, output_path)
        # logger.info(f"Exported memories to {output_path}")

    async def import_memories(
        self, input_path: str, namespace: str | None = None
    ) -> int:
        """
        Import memories from file.

        Args:
            input_path: Input file path
            namespace: Override namespace (uses file namespace if None)

        Returns:
        Number of memories found in the file (actual import not yet implemented)
        """
        logger.warning(
            "Import functionality not yet implemented in ProllyTreeStore adapter"
        )

        # Parse file to get count and simulate import
        with open(input_path) as f:
            data = json.load(f)
            memories = data.get("memories", {})

            # For demonstration, we could import the memories one by one
            # but for now just return the count
            count = len(memories)

        # logger.info(f"Would import {count} memories from {input_path}")
        return count
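
An end-to-end sketch (hedged; my_classifier and my_search_engine stand in for caller-provided components and are hypothetical names, the path is illustrative, and the awaits assume an async context):

manager = ProllyTreeMemoryStoreManager(
    prolly_path="./memories",        # illustrative path
    classifier=my_classifier,        # e.g. a SemanticClassifier instance (hypothetical)
    search_engine=my_search_engine,  # e.g. an IntelligentSearchEngine instance (hypothetical)
    enable_versioning=True,
)

key = await manager.store_memory(
    "Moved to Berlin last month", namespace="user:alice"
)
memories = await manager.search_memories(
    "where does the user live?", namespace="user:alice", limit=3
)
snapshot_name = await manager.create_memory_snapshot("user:alice")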

__init__

__init__(prolly_store: Any | None = None, prolly_path: str | None = None, model: str | Any = 'gpt-3.5-turbo', classifier: Any | None = None, search_engine: Any | None = None, enable_versioning: bool = True, auto_commit: bool = True, enable_fast_classification: bool = True, cache_size: int = 10000, **kwargs)

Initialize enhanced memory manager.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| prolly_store | Any \| None | ProllyTreeStore instance (preferred; allows proper dependency injection) | None |
| prolly_path | str \| None | Path to create ProllyTree database (fallback if store not provided) | None |
| classifier | Any \| None | SemanticClassifier or IntelligentClassifier instance | None |
| search_engine | Any \| None | Search engine instance (IntelligentSearchEngine, etc.) | None |
| enable_versioning | bool | Enable git-like versioning | True |
| auto_commit | bool | Whether to automatically commit on each memory operation | True |
| enable_fast_classification | bool | Use optimized classifier | True |
| cache_size | int | Size of internal caches | 10000 |
| **kwargs | | Additional arguments for MemoryStoreManager | {} |
Source code in src/memoir/core/memory.py
def __init__(
    self,
    prolly_store: Any | None = None,  # ProllyTreeStore instance (preferred)
    prolly_path: str | None = None,  # Path to create store (fallback)
    model: str | Any = "gpt-3.5-turbo",  # Default model
    classifier: (
        Any | None
    ) = None,  # SemanticClassifier or IntelligentClassifier instance
    search_engine: Any | None = None,  # Search engine instance
    enable_versioning: bool = True,
    auto_commit: bool = True,
    enable_fast_classification: bool = True,
    cache_size: int = 10000,
    **kwargs,
):
    """
    Initialize enhanced memory manager.

    Args:
        prolly_store: ProllyTreeStore instance (preferred - allows proper dependency injection)
        prolly_path: Path to create ProllyTree database (fallback if store not provided)
        classifier: SemanticClassifier or IntelligentClassifier instance
        search_engine: Search engine instance (IntelligentSearchEngine, etc.)
        enable_versioning: Enable git-like versioning
        auto_commit: Whether to automatically commit on each memory operation
        enable_fast_classification: Use optimized classifier
        cache_size: Size of internal caches
        **kwargs: Additional arguments for MemoryStoreManager
    """
    # Initialize classifier - must be provided for production use
    self.classifier = classifier

    # Initialize or use provided ProllyTree store
    if prolly_store is not None:
        # Use provided store (preferred for dependency injection)
        self.prolly_store = prolly_store
    elif prolly_path is not None:
        # Create store from path (fallback)
        # Path-based construction is the SDK fallback / auto-create
        # entry point. ProllyTreeStore itself is strict, so bootstrap
        # the store via StoreService first if it doesn't exist yet.
        from memoir.services.store_service import StoreService

        StoreService(prolly_path).create_store(prolly_path)
        self.prolly_store = ProllyTreeStore(
            path=prolly_path,
            enable_versioning=enable_versioning,
            auto_commit=auto_commit,
            cache_size=cache_size,
        )
    else:
        raise ValueError("Either prolly_store or prolly_path must be provided")

    # Initialize profile memento
    self.profile_manager = ProfileMemento(self.prolly_store)

    # Initialize timeline memento
    self.timeline_manager = TimelineMemento(self.prolly_store)

    # Initialize location memento
    self.location_manager = LocationMemento(self.prolly_store)

    # Use provided search engine
    self.search_engine = search_engine

    self.enable_versioning = enable_versioning
    self.enable_fast_classification = enable_fast_classification

    # Performance metrics
    self._metrics = {
        "searches": 0,
        "search_time_ms": [],
        "writes": 0,
        "write_time_ms": [],
        "classifications": 0,
        "classification_time_ms": [],
    }

    # Initialize parent class with ProllyTree store
    super().__init__(model, store=self.prolly_store, **kwargs)
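
Example — a minimal construction sketch. The import path and class name `MemoryManager` are assumptions (this page only shows the source file, src/memoir/core/memory.py); substitute the class actually exported by the package. `my_store`, `my_classifier`, and `my_search_engine` stand for components built elsewhere.

# Hypothetical import; replace with the real export from memoir.core.memory.
from memoir.core.memory import MemoryManager

# Preferred: inject a pre-built store and components (dependency injection).
manager = MemoryManager(
    prolly_store=my_store,           # existing ProllyTreeStore instance
    classifier=my_classifier,        # SemanticClassifier or IntelligentClassifier
    search_engine=my_search_engine,  # e.g. IntelligentSearchEngine
)

# Fallback: let the manager bootstrap a store from a path.
manager = MemoryManager(
    prolly_path="./memories.db",
    enable_versioning=True,
    auto_commit=False,  # batch writes, then call store_commit()
)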

search_memories async

search_memories(query: str, namespace: str, limit: int = 10) -> list[Memory]

Search memories using the provided search engine.

Parameters:

query (str): Natural language search query. Required.
namespace (str): User namespace. Required.
limit (int): Maximum results to return. Default: 10

Returns:

list[Memory]: List of Memory objects

Source code in src/memoir/core/memory.py
async def search_memories(
    self,
    query: str,
    namespace: str,
    limit: int = 10,
) -> list[Memory]:
    """
    Search memories using the provided search engine.

    Args:
        query: Natural language search query
        namespace: User namespace
        limit: Maximum results to return

    Returns:
        List of Memory objects
    """
    if not self.search_engine:
        logger.warning("No search engine provided - returning empty results")
        return []

    start_time = time.time()
    self._metrics["searches"] += 1

    # Use the provided search engine
    search_results = await self.search_engine.search(
        query=query, namespace=namespace, limit=limit
    )

    # Convert IntelligentSearchResult objects to Memory objects
    memories = []
    for result in search_results[:limit]:
        memory = Memory(
            id=result.path,
            content=result.content,
            metadata=result.metadata,
        )
        memories.append(memory)

    search_time = (time.time() - start_time) * 1000
    self._metrics["search_time_ms"].append(search_time)

    logger.info(
        f"Search completed in {search_time:.2f}ms, found {len(memories)} memories"
    )

    return memories
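
Example — a usage sketch reusing the hypothetical `manager` from the `__init__` example, inside an async context; the namespace value is illustrative.

# Requires a search engine; without one this returns [] with a warning.
memories = await manager.search_memories(
    query="what food does the user prefer?",
    namespace="user:alice",
    limit=5,
)
for memory in memories:
    print(memory.id, memory.content)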

store_memory async

store_memory(content: Any, namespace: str, metadata: dict | None = None, auto_classify: bool = True) -> str

Store a memory with automatic semantic classification.

Parameters:

content (Any): Memory content to store. Required.
namespace (str): User namespace. Required.
metadata (dict | None): Optional metadata. Default: None
auto_classify (bool): Whether to auto-classify the content. Default: True

Returns:

str: Semantic key where memory was stored

Source code in src/memoir/core/memory.py
async def store_memory(
    self,
    content: Any,
    namespace: str,
    metadata: dict | None = None,
    auto_classify: bool = True,
) -> str:
    """
    Store a memory with automatic semantic classification.

    Args:
        content: Memory content to store
        namespace: User namespace
        metadata: Optional metadata
        auto_classify: Whether to auto-classify the content

    Returns:
        Semantic key where memory was stored
    """
    start_time = time.time()
    self._metrics["writes"] += 1

    if auto_classify and self.classifier:
        # Use LLM classification
        classification_start = time.time()
        self._metrics["classifications"] += 1

        # Use async classification with metadata
        classification = await self.classifier.classify_async(
            str(content), metadata=metadata
        )
        # Handle different classifier result formats
        if hasattr(classification, "primary_path"):
            semantic_key = classification.primary_path  # SemanticClassifier
        else:
            semantic_key = classification.path  # IntelligentClassifier

        # Handle case where classification fails and returns None path
        if semantic_key is None:
            logger.warning("Classification returned None path, using fallback")
            semantic_key = "context.current.session.topic.main"

        classification_time = (time.time() - classification_start) * 1000
        self._metrics["classification_time_ms"].append(classification_time)

        # Apply profile updates if detected
        if (
            hasattr(classification, "profile_updates")
            and classification.profile_updates
        ):
            try:
                await self.profile_manager.apply_profile_updates(
                    classification.profile_updates, metadata, namespace
                )
                # logger.info(
                #     f"Applied {len(classification.profile_updates)} profile updates"
                # )
            except Exception as e:
                logger.error(f"Failed to apply profile updates: {e}")

        # Apply timeline events if detected
        if (
            hasattr(classification, "timeline_events")
            and classification.timeline_events
        ):
            try:
                await self.timeline_manager.apply_timeline_events(
                    classification.timeline_events, metadata, namespace=namespace
                )
                # logger.info(
                #     f"Applied {len(classification.timeline_events)} timeline events"
                # )
            except Exception as e:
                logger.error(f"Failed to apply timeline events: {e}")

        # Add classification metadata
        if metadata is None:
            metadata = {}
        metadata["classification_confidence"] = classification.confidence
        metadata["classification_reasoning"] = classification.reasoning

    else:
        # Use provided key or generate one
        semantic_key = metadata.get("key") if metadata else None
        if not semantic_key:
            semantic_key = "context.current.session.topic.main"

    # Store using the asynchronous method (proper async context)
    await self.prolly_store.store_memory_async(namespace, content, semantic_key)

    write_time = (time.time() - start_time) * 1000
    self._metrics["write_time_ms"].append(write_time)

    # logger.debug(f"Stored memory at {semantic_key} in {write_time:.2f}ms")

    return semantic_key
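
Example — a sketch of both storage paths (hypothetical `manager`; namespace and key values are illustrative):

# Auto-classification: the configured classifier picks the semantic key.
key = await manager.store_memory(
    content="User prefers vegetarian restaurants",
    namespace="user:alice",
    metadata={"source": "chat"},
)

# Manual: skip classification and supply the key via metadata.
key = await manager.store_memory(
    content="Session started",
    namespace="user:alice",
    metadata={"key": "context.current.session.topic.main"},
    auto_classify=False,
)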

store_commit

store_commit(message: str = 'Batch memory operations') -> str | None

Commit all pending memory operations to the versioned store.

This is used when auto_commit=False is set on the ProllyTreeStore to batch multiple memory operations into a single commit.

Parameters:

message (str): Commit message describing the batch of operations. Default: 'Batch memory operations'

Returns:

str | None: Commit hash if versioning is enabled, None otherwise

Source code in src/memoir/core/memory.py
def store_commit(self, message: str = "Batch memory operations") -> str | None:
    """
    Commit all pending memory operations to the versioned store.

    This is used when auto_commit=False is set on the ProllyTreeStore to batch
    multiple memory operations into a single commit.

    Args:
        message: Commit message describing the batch of operations

    Returns:
        Commit hash if versioning is enabled, None otherwise
    """
    if not self.enable_versioning:
        logger.warning("Commit requested but versioning is not enabled")
        return None

    try:
        commit_hash = self.prolly_store.commit(message)
        logger.info(f"Committed batch operations: {message}")
        return commit_hash
    except Exception as e:
        logger.error(f"Error committing batch operations: {e}")
        raise
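
Example — a batching sketch, assuming the manager's store was built with auto_commit=False (hypothetical `manager`):

await manager.store_memory("fact one", namespace="user:alice")
await manager.store_memory("fact two", namespace="user:alice")
commit_hash = manager.store_commit("Import two facts")  # note: synchronous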

get_memory_versions async

get_memory_versions(semantic_key: str, namespace: str, limit: int = 10) -> list[MemoryVersion]

Get version history for a memory.

Parameters:

semantic_key (str): Semantic taxonomy key. Required.
namespace (str): User namespace. Required.
limit (int): Maximum versions to return. Default: 10

Returns:

list[MemoryVersion]: List of memory versions

Source code in src/memoir/core/memory.py
async def get_memory_versions(
    self, semantic_key: str, namespace: str, limit: int = 10
) -> list[MemoryVersion]:
    """
    Get version history for a memory.

    Args:
        semantic_key: Semantic taxonomy key
        namespace: User namespace
        limit: Maximum versions to return

    Returns:
        List of memory versions
    """
    if not self.enable_versioning:
        logger.warning("Versioning is not enabled")
        return []

    # Convert namespace to tuple format
    namespace_tuple = (
        tuple(namespace.split(":")) if ":" in namespace else (namespace,)
    )

    # Get commit history for this key using the new method
    commit_history = self.prolly_store.get_key_history(
        namespace_tuple, semantic_key, limit
    )

    # Get current content as fallback since historical content retrieval is not yet implemented
    current_content = self.prolly_store.get(namespace_tuple, semantic_key)

    versions = []
    for i, commit in enumerate(commit_history):
        # Try to get content at this commit (currently returns None)
        content_at_commit = self.prolly_store.get_key_at_commit(
            namespace_tuple, semantic_key, commit["id"]
        )

        # If historical content is not available, use current content for demonstration
        if content_at_commit is None and current_content:
            # For the most recent commit, use current content
            if i == 0:  # Most recent commit
                if (
                    isinstance(current_content, dict)
                    and "memories" in current_content
                ):
                    # Extract from aggregated memory
                    memories = current_content.get("memories", [])
                    if memories:
                        latest_memory = memories[-1]
                        actual_content = latest_memory.get("content", "")
                    else:
                        actual_content = ""
                else:
                    actual_content = (
                        current_content.get("content", "")
                        if isinstance(current_content, dict)
                        else current_content
                    )
            else:
                # For older commits, indicate historical content is not available
                actual_content = f"[Historical content for commit {commit['id'][:8]} not available]"
        else:
            actual_content = content_at_commit or ""

        # Convert commit info to MemoryVersion
        version = MemoryVersion(
            commit_id=commit["id"],
            timestamp=commit["timestamp"],
            content=actual_content,
            metadata={
                "author": commit.get("author", ""),
                "committer": commit.get("committer", ""),
            },
            message=commit["message"],
            author=commit.get("author", ""),
        )
        versions.append(version)

    logger.info(f"Retrieved {len(versions)} version(s) for {semantic_key}")
    return versions
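
Example — a sketch with an illustrative semantic key (hypothetical `manager`):

versions = await manager.get_memory_versions(
    semantic_key="preferences.food.dietary",  # illustrative key
    namespace="user:alice",
    limit=5,
)
for v in versions:
    print(v.commit_id[:8], v.timestamp, v.message)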

time_travel async

time_travel(namespace: str, target_time: datetime | float) -> dict[str, Any]

Get all memories as they were at a specific time.

Parameters:

namespace (str): User namespace. Required.
target_time (datetime | float): Target datetime or unix timestamp. Required.

Returns:

dict[str, Any]: Dictionary of memories at that time

Source code in src/memoir/core/memory.py
async def time_travel(
    self, namespace: str, target_time: datetime | float
) -> dict[str, Any]:
    """
    Get all memories as they were at a specific time.

    Args:
        namespace: User namespace
        target_time: Target datetime or unix timestamp

    Returns:
        Dictionary of memories at that time
    """
    if isinstance(target_time, datetime):
        timestamp = target_time.timestamp()
    else:
        timestamp = target_time

    # Convert namespace to tuple format
    namespace_tuple = (
        tuple(namespace.split(":")) if ":" in namespace else (namespace,)
    )

    # For branch-based time travel, we need to use snapshots
    # Create snapshot name based on timestamp
    snapshot_name = f"snapshot_{int(timestamp)}"

    # Check if we have this snapshot
    if self.enable_versioning and hasattr(self.prolly_store.tree, "list_branches"):
        try:
            branches = self.prolly_store.tree.list_branches()
            if snapshot_name in branches:
                # Use the snapshot to get historical state
                state = self.prolly_store.get_state_at_snapshot(
                    namespace_tuple, snapshot_name
                )
                logger.info(f"Retrieved state from snapshot {snapshot_name}")
                return state
            else:
                logger.warning(
                    f"No snapshot found for timestamp {timestamp}, returning current state"
                )
        except Exception as e:
            logger.error(f"Error accessing time travel snapshot: {e}")

    # Fallback: return current state
    search_results = self.prolly_store.search(namespace_tuple, limit=1000)
    current_state = {}
    for _, key, data in search_results:
        current_state[key] = data

    return current_state
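
Example — both accepted time forms (hypothetical `manager`). Note from the source above that historical state is resolved through a branch named snapshot_<unix-seconds>; if no matching snapshot exists, the current state is returned.

from datetime import datetime, timedelta

yesterday = datetime.now() - timedelta(days=1)
state = await manager.time_travel("user:alice", yesterday)

# Equivalent call with a raw unix timestamp:
state = await manager.time_travel("user:alice", yesterday.timestamp())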

create_memory_snapshot async

create_memory_snapshot(namespace: str, snapshot_name: str | None = None) -> str

Create a snapshot of the current memory state.

Parameters:

namespace (str): User namespace. Required.
snapshot_name (str | None): Optional name for snapshot (auto-generated if not provided). Default: None

Returns:

str: Name of the created snapshot

Source code in src/memoir/core/memory.py
async def create_memory_snapshot(
    self, namespace: str, snapshot_name: str | None = None
) -> str:
    """
    Create a snapshot of the current memory state.

    Args:
        namespace: User namespace
        snapshot_name: Optional name for snapshot (auto-generated if not provided)

    Returns:
        Name of the created snapshot
    """
    if not self.enable_versioning:
        raise ValueError("Snapshots require versioning to be enabled")

    if snapshot_name is None:
        # Auto-generate snapshot name with timestamp
        snapshot_name = f"snapshot_{int(time.time())}"

    # Create the snapshot
    success = self.prolly_store.create_time_snapshot(snapshot_name)

    if success:
        logger.info(f"Created memory snapshot: {snapshot_name}")
        return snapshot_name
    else:
        raise RuntimeError(f"Failed to create snapshot: {snapshot_name}")
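
Example — pairing snapshots with time_travel (hypothetical `manager`). Auto-generated names have the form snapshot_<unix-seconds>, which is exactly the name time_travel looks up.

name = await manager.create_memory_snapshot("user:alice")

# Later: recover the state captured by that snapshot.
then = float(name.split("_")[1])
state = await manager.time_travel("user:alice", then)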

compare_memory_states async

compare_memory_states(namespace: str, time1: datetime | float, time2: datetime | float) -> dict[str, Any]

Compare memory states between two points in time.

Parameters:

namespace (str): User namespace. Required.
time1 (datetime | float): First timestamp. Required.
time2 (datetime | float): Second timestamp. Required.

Returns:

dict[str, Any]: Comparison results with added/removed/changed memories

Source code in src/memoir/core/memory.py
async def compare_memory_states(
    self,
    namespace: str,
    time1: datetime | float,
    time2: datetime | float,
) -> dict[str, Any]:
    """
    Compare memory states between two points in time.

    Args:
        namespace: User namespace
        time1: First timestamp
        time2: Second timestamp

    Returns:
        Comparison results with added/removed/changed memories
    """
    if isinstance(time1, datetime):
        time1 = time1.timestamp()
    if isinstance(time2, datetime):
        time2 = time2.timestamp()

    state1 = await self.time_travel(namespace, time1)
    state2 = await self.time_travel(namespace, time2)

    keys1 = set(state1.keys())
    keys2 = set(state2.keys())

    comparison = {
        "added": {k: state2[k] for k in keys2 - keys1},
        "removed": {k: state1[k] for k in keys1 - keys2},
        "changed": {},
        "unchanged": [],
    }

    for key in keys1 & keys2:
        if state1[key] != state2[key]:
            comparison["changed"][key] = {
                "before": state1[key],
                "after": state2[key],
            }
        else:
            comparison["unchanged"].append(key)

    return comparison
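
Example — diffing two points in time (hypothetical `manager`; t_before and t_after are datetimes or unix timestamps for which snapshots exist):

diff = await manager.compare_memory_states(
    namespace="user:alice",
    time1=t_before,
    time2=t_after,
)
print(len(diff["added"]), len(diff["removed"]), len(diff["changed"]))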

branch_memories async

branch_memories(namespace: str, branch_name: str) -> str

Create a new branch of memories for experimentation.

Parameters:

namespace (str): User namespace. Required.
branch_name (str): Name for the new branch. Required.

Returns:

str: Branch identifier

Source code in src/memoir/core/memory.py
async def branch_memories(self, namespace: str, branch_name: str) -> str:
    """
    Create a new branch of memories for experimentation.

    Args:
        namespace: User namespace
        branch_name: Name for the new branch

    Returns:
        Branch identifier
    """
    if not self.enable_versioning:
        raise ValueError("Branching requires versioning to be enabled")

    # Implementation would create a new branch in ProllyTree
    branch_id = f"{namespace}:{branch_name}:{time.time()}"
    # logger.info(f"Created memory branch: {branch_id}")

    return branch_id

merge_memories async

merge_memories(namespace: str, source_branch: str, target_branch: str = 'main', strategy: str = 'ours') -> dict[str, Any]

Merge memories from one branch to another.

Parameters:

namespace (str): User namespace. Required.
source_branch (str): Source branch to merge from. Required.
target_branch (str): Target branch to merge into. Default: 'main'
strategy (str): Merge strategy ("ours", "theirs", "union"). Default: 'ours'

Returns:

dict[str, Any]: Merge results with conflicts if any

Source code in src/memoir/core/memory.py
async def merge_memories(
    self,
    namespace: str,
    source_branch: str,
    target_branch: str = "main",
    strategy: str = "ours",
) -> dict[str, Any]:
    """
    Merge memories from one branch to another.

    Args:
        namespace: User namespace
        source_branch: Source branch to merge from
        target_branch: Target branch to merge into
        strategy: Merge strategy ("ours", "theirs", "union")

    Returns:
        Merge results with conflicts if any
    """
    if not self.enable_versioning:
        raise ValueError("Merging requires versioning to be enabled")

    # Implementation would handle branch merging
    merge_result = {"merged": 0, "conflicts": [], "strategy": strategy}

    # logger.info(f"Merged {source_branch} into {target_branch}")

    return merge_result
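
Example — a call sketch reusing the hypothetical `manager`. Note that, per the source above, both methods are currently placeholders: branch_memories only builds an identifier string and merge_memories returns an empty result.

branch_id = await manager.branch_memories("user:alice", "experiment-1")
result = await manager.merge_memories(
    namespace="user:alice",
    source_branch="experiment-1",
    strategy="theirs",
)
print(result)  # {"merged": 0, "conflicts": [], "strategy": "theirs"}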

get_performance_metrics

get_performance_metrics() -> dict[str, Any]

Get performance metrics for the memory system.

Source code in src/memoir/core/memory.py
def get_performance_metrics(self) -> dict[str, Any]:
    """Get performance metrics for the memory system."""
    metrics = self._metrics.copy()

    # Calculate averages
    if metrics["search_time_ms"]:
        metrics["avg_search_time_ms"] = sum(metrics["search_time_ms"]) / len(
            metrics["search_time_ms"]
        )
        metrics["p95_search_time_ms"] = (
            sorted(metrics["search_time_ms"])[
                int(len(metrics["search_time_ms"]) * 0.95)
            ]
            if len(metrics["search_time_ms"]) > 1
            else metrics["search_time_ms"][0]
        )

    if metrics["write_time_ms"]:
        metrics["avg_write_time_ms"] = sum(metrics["write_time_ms"]) / len(
            metrics["write_time_ms"]
        )

    if metrics["classification_time_ms"]:
        metrics["avg_classification_time_ms"] = sum(
            metrics["classification_time_ms"]
        ) / len(metrics["classification_time_ms"])

    # Add component statistics
    try:
        metrics["store"] = self.prolly_store.get_statistics()
    except Exception as e:
        logger.warning(f"Failed to get store statistics: {e}")
        metrics["store"] = {}

    # Add classifier statistics if available
    if hasattr(self.classifier, "get_statistics"):
        try:
            metrics["classifier"] = self.classifier.get_statistics()
        except Exception as e:
            logger.warning(f"Failed to get classifier statistics: {e}")
            metrics["classifier"] = {}

    # Add search engine statistics if available
    if hasattr(self.search_engine, "get_statistics"):
        try:
            metrics["search_engine"] = self.search_engine.get_statistics()
        except Exception as e:
            logger.warning(f"Failed to get search engine statistics: {e}")
            metrics["search_engine"] = {}

    return metrics
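
Example — reading the metrics (hypothetical `manager`; the average and p95 keys are only present once at least one search or write has been recorded):

metrics = manager.get_performance_metrics()  # synchronous
print(metrics["searches"], metrics["writes"])
print(metrics.get("avg_search_time_ms"), metrics.get("p95_search_time_ms"))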

optimize_memory_layout async

optimize_memory_layout(namespace: str) -> dict[str, Any]

Optimize memory layout for better performance. Reorganizes memories based on access patterns.

Parameters:

namespace (str): User namespace to optimize. Required.

Returns:

dict[str, Any]: Optimization results

Source code in src/memoir/core/memory.py
async def optimize_memory_layout(self, namespace: str) -> dict[str, Any]:
    """
    Optimize memory layout for better performance.
    Reorganizes memories based on access patterns.

    Args:
        namespace: User namespace to optimize

    Returns:
        Optimization results
    """
    start_time = time.time()

    # Get all memories
    namespace_tuple = (
        tuple(namespace.split(":")) if ":" in namespace else (namespace,)
    )
    search_results = self.prolly_store.search(namespace_tuple, limit=1000)
    all_keys = [key for _, key, _ in search_results]

    # Analyze access patterns (would need access logs in production)
    # For now, we'll just report current organization

    category_counts = {}
    depth_counts = {}

    for key in all_keys:
        parts = key.split(".")
        if parts:
            category = parts[0]
            category_counts[category] = category_counts.get(category, 0) + 1

            depth = len(parts)
            depth_counts[depth] = depth_counts.get(depth, 0) + 1

    optimization_time = time.time() - start_time

    return {
        "total_memories": len(all_keys),
        "categories": category_counts,
        "depth_distribution": depth_counts,
        "optimization_time_seconds": optimization_time,
        "recommendations": [
            "Consider moving frequently accessed memories to shallower paths",
            "Group related memories under common prefixes for faster retrieval",
            "Archive old memories to separate namespace for better performance",
        ],
    }
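
Example — reading the layout report (hypothetical `manager`):

report = await manager.optimize_memory_layout("user:alice")
print(report["total_memories"], report["depth_distribution"])
for tip in report["recommendations"]:
    print("-", tip)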

export_memories async

export_memories(namespace: str, output_path: str, format: str = 'json') -> None

Export memories to file.

Parameters:

namespace (str): Namespace to export. Required.
output_path (str): Output file path. Required.
format (str): Export format (json, csv, markdown). Default: 'json'

Source code in src/memoir/core/memory.py
async def export_memories(
    self, namespace: str, output_path: str, format: str = "json"
) -> None:
    """
    Export memories to file.

    Args:
        namespace: Namespace to export
        output_path: Output file path
        format: Export format (json, csv, markdown)
    """
    self.prolly_store.export_namespace(namespace, output_path)

import_memories async

import_memories(input_path: str, namespace: str | None = None) -> int

Import memories from file.

Parameters:

input_path (str): Input file path. Required.
namespace (str | None): Override namespace (uses file namespace if None). Default: None

Returns:

int: Number of memories imported

Source code in src/memoir/core/memory.py
async def import_memories(
    self, input_path: str, namespace: str | None = None
) -> int:
    """
    Import memories from file.

    Args:
        input_path: Input file path
        namespace: Override namespace (uses file namespace if None)

    Returns:
        Number of memories imported
    """
    logger.warning(
        "Import functionality not yet implemented in ProllyTreeStore adapter"
    )

    # Parse file to get count and simulate import
    with open(input_path) as f:
        data = json.load(f)
        memories = data.get("memories", {})

        # For demonstration, we could import the memories one by one
        # but for now just return the count
        count = len(memories)

    # logger.info(f"Would import {count} memories from {input_path}")
    return count
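
Example — a round-trip sketch (hypothetical `manager`). Per the warning in the source, import currently only parses the file and returns the memory count.

await manager.export_memories("user:alice", "memories.json")  # format defaults to "json"
count = await manager.import_memories("memories.json")
print(f"{count} memories found in export")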