Skip to content

Commit

Permalink
Merge pull request #146 from chonkie-ai/add-output-text
Browse files Browse the repository at this point in the history
[FEAT] Support `return_type` as `texts` for direct text handling
  • Loading branch information
bhavnicksm authored Jan 8, 2025
2 parents 4966d98 + 6ad760b commit e062e10
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 97 deletions.
51 changes: 35 additions & 16 deletions src/chonkie/chunker/recursive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from bisect import bisect_left
from functools import lru_cache
from itertools import accumulate
from typing import Any, List, Optional, Union
from typing import Any, List, Optional, Union, Literal

from chonkie.chunker.base import BaseChunker
from chonkie.types import Chunk, RecursiveChunk, RecursiveLevel, RecursiveRules
Expand All @@ -21,7 +21,8 @@ def __init__(self,
tokenizer: Union[str, Any] = "gpt2",
chunk_size: int = 512,
rules: RecursiveRules = RecursiveRules(),
min_characters_per_chunk: int = 12
min_characters_per_chunk: int = 12,
return_type: Literal["chunks", "texts"] = "chunks"
) -> None:
"""Initialize the recursive chunker.
Expand All @@ -30,9 +31,22 @@ def __init__(self,
chunk_size: The size of the chunks to return.
rules: The rules to use for chunking.
min_characters_per_chunk: The minimum number of characters per chunk.
return_type: Whether to return chunks or texts.
Raises:
ValueError: If parameters are invalid.
"""
super().__init__(tokenizer)

if chunk_size <= 0:
raise ValueError("chunk_size must be positive")
if min_characters_per_chunk < 1:
raise ValueError("min_characters_per_chunk must be at least 1")
if return_type not in ["chunks", "texts"]:
raise ValueError("Invalid return_type. Must be either 'chunks' or 'texts'.")

self.return_type = return_type
self.rules = rules
self.chunk_size = chunk_size
self.min_characters_per_chunk = min_characters_per_chunk
Expand Down Expand Up @@ -194,7 +208,10 @@ def _recursive_chunk(self,

# If level is out of bounds, return the text as a chunk
if level >= len(self.rules):
return [self._create_chunk(text, self._get_token_count(text), level, full_text)]
if self.return_type == "chunks":
return [self._create_chunk(text, self._get_token_count(text), level, full_text)]
elif self.return_type == "texts":
return [text]

# If full_text is not provided, use the text
if full_text is None:
Expand Down Expand Up @@ -227,30 +244,32 @@ def _recursive_chunk(self,
if token_count > self.chunk_size:
chunks.extend(self._recursive_chunk(split, level + 1, full_text))
else:
if rule.delimiters is None and not rule.whitespace:
# NOTE: This is a hack to get the decoded text, since merged = splits = token_splits
# And we don't want to encode/decode the text again, that would be inefficient
decoded_text = "".join(merged)
chunks.append(self._create_chunk(split, token_count, level, decoded_text))
else:
chunks.append(self._create_chunk(split, token_count, level, full_text))

if self.return_type == "chunks":
if rule.delimiters is None and not rule.whitespace:
# NOTE: This is a hack to get the decoded text, since merged = splits = token_splits
# And we don't want to encode/decode the text again, that would be inefficient
decoded_text = "".join(merged)
chunks.append(self._create_chunk(split, token_count, level, decoded_text))
else:
chunks.append(self._create_chunk(split, token_count, level, full_text))
elif self.return_type == "texts":
chunks.append(split)
return chunks


def chunk(self, text: str) -> List[Chunk]:
"""Chunk the text."""
return self._recursive_chunk(text, level=0, full_text=text)


def __repr__(self) -> str:
"""Get a string representation of the recursive chunker."""
return (f"RecursiveChunker(rules={self.rules}, "
f"chunk_size={self.chunk_size}, "
f"min_characters_per_chunk={self.min_characters_per_chunk})")
f"min_characters_per_chunk={self.min_characters_per_chunk}, "
f"return_type={self.return_type})")

def __str__(self) -> str:
"""Get a string representation of the recursive chunker."""
return (f"RecursiveChunker(rules={self.rules}, "
f"chunk_size={self.chunk_size}, "
f"min_characters_per_chunk={self.min_characters_per_chunk})")
f"min_characters_per_chunk={self.min_characters_per_chunk}, "
f"return_type={self.return_type})")
21 changes: 13 additions & 8 deletions src/chonkie/chunker/sdpm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Semantic Double Pass Merge chunking using sentence embeddings."""

from typing import Any, List, Union
from typing import Any, List, Union, Literal

from chonkie.types import SemanticChunk, Sentence

Expand All @@ -17,15 +17,17 @@ class SDPMChunker(SemanticChunker):
Args:
embedding_model: Sentence embedding model to use
similarity_threshold: Minimum similarity score to consider sentences similar
similarity_percentile: Minimum similarity percentile to consider sentences similar
mode: Mode for grouping sentences, either "cumulative" or "window"
threshold: Threshold for semantic similarity (0-1) or percentile (1-100), defaults to "auto"
chunk_size: Maximum token count for a chunk
initial_sentences: Number of sentences to consider for initial grouping
skip_window: Number of chunks to skip when looking for similarities
similarity_window: Number of sentences to consider for similarity threshold calculation
min_sentences: Minimum number of sentences per chunk
min_chunk_size: Minimum number of tokens per sentence
Methods:
chunk: Split text into chunks using the SDPM approach.
min_characters_per_sentence: Minimum number of characters per sentence
threshold_step: Step size for similarity threshold calculation
delim: Delimiters to split sentences on
skip_window: Number of chunks to skip when looking for similarities
return_type: Whether to return chunks or texts
"""

Expand All @@ -42,6 +44,7 @@ def __init__(
threshold_step: float = 0.01,
delim: Union[str, List[str]] = [".", "!", "?", "\n"],
skip_window: int = 1,
return_type: Literal["chunks", "texts"] = "chunks",
**kwargs
):
"""Initialize the SDPMChunker.
Expand All @@ -58,6 +61,7 @@ def __init__(
threshold_step: Step size for similarity threshold calculation
delim: Delimiters to split sentences on
skip_window: Number of chunks to skip when looking for similarities
return_type: Whether to return chunks or texts
**kwargs: Additional keyword arguments
"""
Expand All @@ -72,6 +76,7 @@ def __init__(
min_characters_per_sentence=min_characters_per_sentence,
threshold_step=threshold_step,
delim=delim,
return_type=return_type,
**kwargs
)
self.skip_window = skip_window
Expand Down
41 changes: 26 additions & 15 deletions src/chonkie/chunker/semantic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Semantic chunking using sentence embeddings."""

import warnings
from typing import List, Union
from typing import List, Union, Literal

import numpy as np

Expand All @@ -24,7 +24,10 @@ class SemanticChunker(BaseChunker):
min_chunk_size: Minimum number of tokens per sentence (defaults to 2)
threshold_step: Step size for similarity threshold calculation
delim: Delimiters to split sentences on
return_type: Whether to return chunks or texts
Raises:
ValueError: If parameters are invalid
"""

def __init__(
Expand All @@ -39,6 +42,7 @@ def __init__(
min_characters_per_sentence: int = 12,
threshold_step: float = 0.01,
delim: Union[str, List[str]] = [".", "!", "?", "\n"],
return_type: Literal["chunks", "texts"] = "chunks",
**kwargs
):
"""Initialize the SemanticChunker.
Expand All @@ -56,6 +60,7 @@ def __init__(
min_chunk_size: Minimum number of tokens per chunk (and sentence, defaults to 2)
threshold_step: Step size for similarity threshold calculation
delim: Delimiters to split sentences on
return_type: Whether to return chunks or texts
**kwargs: Additional keyword arguments
Raises:
Expand Down Expand Up @@ -85,6 +90,8 @@ def __init__(
raise ValueError("threshold (float) must be between 0 and 1")
elif type(threshold) == int and (threshold < 1 or threshold > 100):
raise ValueError("threshold (int) must be between 1 and 100")
if return_type not in ["chunks", "texts"]:
raise ValueError("Invalid return_type. Must be either 'chunks' or 'texts'.")

self.mode = mode
self.chunk_size = chunk_size
Expand All @@ -96,6 +103,7 @@ def __init__(
self.threshold_step = threshold_step
self.delim = delim
self.sep = "🦛"
self.return_type = return_type

if isinstance(threshold, float):
self.similarity_threshold = threshold
Expand Down Expand Up @@ -453,24 +461,27 @@ def _group_sentences(self, sentences: List[Sentence]) -> List[List[Sentence]]:
return self._group_sentences_window(sentences)

def _create_chunk(
self, sentences: List[Sentence], similarity_scores: List[float] = None
self, sentences: List[Sentence]
) -> SemanticChunk:
"""Create a chunk from a list of sentences."""
if not sentences:
raise ValueError("Cannot create chunk from empty sentence list")

# Compute chunk text and token count from sentences
text = "".join(sent.text for sent in sentences)
token_count = sum(sent.token_count for sent in sentences)

return SemanticChunk(
text=text,
start_index=sentences[0].start_index,
end_index=sentences[-1].end_index,
token_count=token_count,
sentences=sentences,
)

if self.return_type == "chunks":
# Compute chunk text and token count from sentences
text = "".join(sent.text for sent in sentences)
token_count = sum(sent.token_count for sent in sentences)
return SemanticChunk(
text=text,
start_index=sentences[0].start_index,
end_index=sentences[-1].end_index,
token_count=token_count,
sentences=sentences,
)
elif self.return_type == "texts":
return "".join(sent.text for sent in sentences)
else:
raise ValueError("Invalid return_type. Must be either 'chunks' or 'texts'.")

def _split_chunks(
self, sentence_groups: List[List[Sentence]]
) -> List[SemanticChunk]:
Expand Down
28 changes: 18 additions & 10 deletions src/chonkie/chunker/sentence.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Sentence chunker."""
from bisect import bisect_left
from itertools import accumulate
from typing import Any, List, Union
from typing import Any, List, Union, Literal

from chonkie.types import Chunk, Sentence, SentenceChunk

Expand Down Expand Up @@ -33,7 +33,7 @@ def __init__(
min_characters_per_sentence: int = 12,
approximate: bool = True,
delim: Union[str, List[str]] = [".", "!", "?", "\n"],
**kwargs
return_type: Literal["chunks", "texts"] = "chunks"
):
"""Initialize the SentenceChunker with configuration parameters.
Expand All @@ -48,6 +48,8 @@ def __init__(
min_characters_per_sentence: Minimum number of characters per sentence
approximate: Whether to use approximate token counting (defaults to True)
delim: Delimiters to split sentences on
return_type: Whether to return chunks or texts
Raises:
ValueError: If parameters are invalid
Expand All @@ -62,6 +64,8 @@ def __init__(
raise ValueError("min_sentences_per_chunk must be at least 1")
if min_characters_per_sentence < 1:
raise ValueError("min_characters_per_sentence must be at least 1")
if return_type not in ["chunks", "texts"]:
raise ValueError("Invalid return_type. Must be either 'chunks' or 'texts'.")

self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
Expand All @@ -70,6 +74,7 @@ def __init__(
self.approximate = approximate
self.delim = delim
self.sep = "🦛"
self.return_type = return_type

# TODO: This is a older method of sentence splitting that uses Regex
# but since Regex in python via re is super slooooow we use a different method
Expand Down Expand Up @@ -297,13 +302,16 @@ def _create_chunk(self, sentences: List[Sentence], token_count: int) -> Chunk:
"""
chunk_text = "".join([sentence.text for sentence in sentences])
return SentenceChunk(
text=chunk_text,
start_index=sentences[0].start_index,
end_index=sentences[-1].end_index,
token_count=token_count,
sentences=sentences,
)
if self.return_type == "texts":
return chunk_text
else:
return SentenceChunk(
text=chunk_text,
start_index=sentences[0].start_index,
end_index=sentences[-1].end_index,
token_count=token_count,
sentences=sentences,
)

def chunk(self, text: str) -> List[Chunk]:
"""Split text into overlapping chunks based on sentences while respecting token limits.
Expand Down Expand Up @@ -372,7 +380,7 @@ def chunk(self, text: str) -> List[Chunk]:
chunk_sentences = sentences[pos:split_idx]
chunk_text = "".join(s.text for s in chunk_sentences)
actual = len(self._encode(chunk_text))

chunks.append(self._create_chunk(chunk_sentences, actual))

# Calculate next position with overlap
Expand Down
Loading

0 comments on commit e062e10

Please sign in to comment.