import pytest
import threading
from unittest.mock import MagicMock, patch
from app.core.grpc.core.journal import TaskJournal
@pytest.fixture
def mock_settings():
    """Yield a mock standing in for the journal module's ``settings``.

    The object at ``app.core.grpc.core.journal.settings`` is patched and
    given small stream/thought limits. Because the fixture yields from
    inside the ``with`` block, the patch remains in place until the
    requesting test has finished.
    """
    overrides = {
        "STREAM_HEAD_CHARS": 10,
        "STREAM_TAIL_CHARS": 10,
        "THOUGHT_HEAD_COUNT": 2,
        "THOUGHT_TAIL_COUNT": 2,
    }
    with patch('app.core.grpc.core.journal.settings') as fake_settings:
        for option, value in overrides.items():
            setattr(fake_settings, option, value)
        yield fake_settings
@pytest.fixture
def journal(mock_settings):
    """Return a ``TaskJournal`` built with the mocked settings in effect.

    ``threading.Thread`` is patched only for the duration of construction;
    the original author's intent is to keep the cleanup thread from
    starting. The patch is already undone when the test body runs.
    """
    # Prevent cleanup thread from starting
    with patch('threading.Thread'):
        instance = TaskJournal()
    return instance
def test_stream_limits_from_config(journal):
    """Check the journal's stream limits against the mocked settings.

    The head and tail limits are both 10 in the ``mock_settings`` fixture,
    and the maximum is expected to be 20.
    """
    expected_limits = {
        "STREAM_HEAD_CHARS": 10,
        "STREAM_TAIL_CHARS": 10,
        "STREAM_MAX_CHARS": 20,
    }
    for attribute, expected in expected_limits.items():
        assert getattr(journal, attribute) == expected
def test_trim_stream(journal):
    """Exercise ``_trim_stream`` below and above the configured maximum.

    The limits are overridden on the instance (head 5, tail 5, max 10) so
    that short literal strings are enough to cross the threshold.
    """
    journal.STREAM_HEAD_CHARS = journal.STREAM_TAIL_CHARS = 5
    journal.STREAM_MAX_CHARS = 10

    # Combined length is 6, which is within the limit: plain concatenation.
    assert journal._trim_stream("abc", "def") == "abcdef"

    # Combined length is 12, which is over the limit of 10.
    trimmed = journal._trim_stream("123456", "789012")

    # Expected head (first 5 chars): "12345"
    # Expected tail (last 5 chars):  "89012"
    # Expected omitted count:        12 - 5 - 5 = 2
    assert trimmed.startswith("12345")
    assert trimmed.endswith("89012")
    assert "2 bytes omitted" in trimmed
def test_sharded_locking_structure(journal):
    """Check the shard count and the layout of every shard.

    The journal must report 16 shards, hold 16 shard entries, and each
    entry must contain ``"tasks"`` and ``"lock"`` keys, where the lock
    exposes an ``acquire`` attribute.
    """
    expected_shards = 16
    assert journal.NUM_SHARDS == expected_shards
    assert len(journal.shards) == expected_shards

    for bucket in journal.shards:
        for required_key in ("tasks", "lock"):
            assert required_key in bucket
        # Only the presence of ``acquire`` is checked, not the lock type.
        assert hasattr(bucket["lock"], "acquire")
def test_get_shard(journal):
    """Check that ``_get_shard`` returns one of the journal's shards, stably.

    Membership alone would still pass if the same task id were routed to a
    different shard on every call. Per-task locking depends on a stable
    mapping, so a repeated lookup must return the very same shard object.
    """
    shard1 = journal._get_shard("task1")
    assert shard1 in journal.shards

    # The same task id must always resolve to the identical shard object.
    assert journal._get_shard("task1") is shard1
def test_register_acquires_lock(journal):
    """Verify that ``register`` enters and exits the task's shard lock once.

    The lock of the shard returned for ``"task1"`` is replaced with a
    ``MagicMock`` so that its context-manager calls are recorded.
    """
    fake_lock = MagicMock()
    journal._get_shard("task1")["lock"] = fake_lock

    journal.register("task1")

    # Exactly one ``__enter__`` and one ``__exit__`` call are expected.
    for dunder in ("__enter__", "__exit__"):
        getattr(fake_lock, dunder).assert_called_once()