SharingManager
Файл:
backend/src/sharing/manager.rs
Оркестрация P2P sharing и синхронизации между устройствами. Управляет фоновым libp2p-сервисом: обнаружение пиров, публикация и приватное расшаривание пространств, файловая синхронизация по чанкам, state sync между устройствами одной identity. Подробнее о протоколе: P2P Protocol: Sharing.
Структура
| Поле | Тип | Описание |
|---|---|---|
identity_manager | Arc<Mutex<IdentityManager>> | Менеджер идентичности |
device_manager | Arc<Mutex<DeviceManager>> | Менеджер устройств |
space_manager | Arc<Mutex<SpaceManager>> | Менеджер пространств |
vault_manager | Arc<Mutex<VaultManager>> | Менеджер хранилища |
cloud_manager | Arc<Mutex<CloudManager>> | Менеджер облаков |
command_tx | Option<Sender<SharingCommand>> | Канал команд к background service |
service_handle | Option<JoinHandle<()>> | Handle фоновой задачи |
known_peers | Arc<Mutex<HashMap<String, KnownPeerInfo>>> | Обнаруженные пиры |
Методы
| Метод | Сигнатура | Описание |
|---|---|---|
| Lifecycle | ||
new | fn(im, dm, sm, vm, cm) -> Self | Создать менеджер |
start_sharing | fn(&mut self, event_bus) -> Result<()> | Запустить P2P сеть |
stop_sharing | fn(&mut self) -> Result<()> | Остановить P2P сеть |
is_running | fn(&self) -> bool | Работает ли сервис |
| Peers | ||
get_known_peers | fn(&self) -> Vec<KnownPeerInfo> | Список известных пиров |
| Sharing | ||
publish_space | fn(&self, space_id) -> Result<()> | Broadcast пространства |
share_space | fn(&self, space_id, target_id) -> Result<()> | Приватное расшаривание |
| Sync | ||
request_file_sync | fn(&self, space_id, file_hash) -> Result<()> | Запросить файл |
request_space_sync | fn(&self, space_id) -> Result<()> | Запросить все файлы |
broadcast_state_update | fn(&self, update) -> Result<()> | Синхронизировать состояние |
| Remote | ||
request_directory_list | fn(&self, device_id, path) -> Result<Receiver> | Листинг удалённой директории |
Зависимости
IdentityManager— identity данные для подписи и шифрованияDeviceManager— данные устройства, keypair для libp2pSpaceManager— CRUD пространств, файлов, tombstonesVaultManager— signing key для шифрованного расшариванияCloudManager— credentials для state sync- libp2p — Noise + Yamux transport, mDNS discovery, gossipsub
start_sharing
fn start_sharing(&mut self, event_bus: EventBus) -> Result<()>
Запускает фоновый сервис P2P sharing. Создаёт libp2p Swarm с mDNS, Noise, Yamux и gossipsub.
Логика
- Получает device keypair из DeviceManager
- Создаёт SharingService (libp2p Swarm)
- Спавнит background tokio task (
run_sharing_service) - Сохраняет command channel и handle
Используется в:
start_sharing_svc
publish_space
fn publish_space(&self, space_id: &str) -> Result<()>
Публикует пространство всем подключённым пирам (plaintext broadcast). На уровне сервиса (publish_space_svc) проверяется владение пространством через IdentityProof.
Логика
- Сервис: проверяет
IdentityProof+space.identity_id == proof.identity_id() - Отправляет команду
SharingCommand::Publish { space_id }через channel - Background service: загружает space + files из SpaceManager
- Формирует SpacePayload
- Отправляет SharingMessage::Publish всем известным пирам
Используется в:
publish_space_svc
share_space
fn share_space(&self, space_id: &str, target_identity_id: &str) -> Result<()>
Приватно расшаривает пространство конкретной identity. Данные шифруются SealedEnvelope (hybrid encryption поверх primitives StrongholdManager::seal_for_recipients / unseal_envelope). На уровне сервиса (share_space_svc) проверяется владение через IdentityProof.
Источник pubkey'ев — таблица
peers(хранитdevice_box_pubkeyкаждого пира изSharingMessage::Announce).Sharefan-out'ится на все известные device-box-pubkey'и target identity. Пиры без объявленногоdevice_box_pubkey(легаси) silently пропускаются с warning'ом в логе.
Логика
- Сервис: проверяет
IdentityProof+space.identity_id == proof.identity_id() - Отправляет команду через channel
- Background service:
- Загружает space + files
- Собирает все
peer.device_box_pubkeyпировtarget_identity_id - Если коллекция пустая — log warning «No device_box_pubkeys for target identity» и не шлёт
- Генерирует random 32-байтный
content_key - Fan-out wrap content_key для каждого recipient device-box-pubkey через
seal_to_recipients(WrappedSpaceKeyper recipient) - AES-256-GCM(content_key, payload, AAD) — content layer
- AAD:
"kontinuum sharing; target={identity_id}"(HKDF info:b"kontinuum sharing key-wrap") - Шлёт один и тот же SealedEnvelope каждому из target_peers (each device receives its own wrapped key inside)
Используется в:
share_space_svcReceiver-side:SharingServiceEvent::SpaceShared→stronghold.unseal_envelope(sealed, aad, hkdf_info).
request_file_sync
fn request_file_sync(&self, space_id: &str, file_hash: &str) -> Result<()>
Запрашивает конкретный файл у пира. Файл передаётся чанками по 32 KB с верификацией blake3 hash. Поддерживает resume через .part файлы.
Логика
- Проверяет наличие
.partфайла — если есть, отправляетresume_offsetв FileRequest - Отправляет FileRequest всем пирам, у которых есть этот файл
- Remote peer читает файл, пропускает
resume_offsetбайт, разбивает остаток на чанки (32 KB) - Отправляет каждый чанк как FileChunk
- Локально: записывает чанки в staging
.partфайл сfsyncпосле каждой записи - При получении последнего чанка: верифицирует blake3 hash
- Эмитит события
file_sync_progressиfile_sync_complete
Используется в:
request_file_sync_svc
broadcast_state_update
fn broadcast_state_update(&self, update: SyncUpdate) -> Result<()>
Рассылает обновление состояния всем пирам с той же identity. Используется для синхронизации устройств, пространств, облаков.
Типы обновлений
DeviceAdded/DeviceUpdated/DeviceRemovedSpaceAdded/SpaceUpdated/SpaceRemovedCloudAdded/CloudUpdated/CloudRemoved
Используется в: внутренне при изменениях состояния
request_directory_list
fn request_directory_list(&self, device_id: &str, path: &str) -> Result<Receiver<Result<Vec<RemoteFileEntry>>>>
Запрашивает листинг директории на удалённом устройстве. Возвращает oneshot Receiver для асинхронного ожидания ответа.
Безопасность
Remote peer выполняет проверки:
- Путь должен быть абсолютным
- Canonicalize для предотвращения traversal
- Ограничение: только home dir, DB_PATH, Android storage
Используется в:
request_remote_directory_svc
Background Service: State Sync
Фоновый сервис автоматически синхронизирует состояние между устройствами одной identity.
Механизм
- При обнаружении пира с той же identity — отправляет
SyncDigest(hash текущего состояния) - Если digest отличается — запрашивает/отправляет
SyncSnapshot(полное состояние) - Snapshot включает: devices, spaces, clouds, tombstones
- Merge: INSERT OR REPLACE для новых/обновлённых записей, tombstones для удалений
Background Service: Vault Integration
Background Operation Mode
При start_sharing() режим фоновых операций определяет, как сервис получает signing key:
| Режим | Получение ключа | Поведение при lock |
|---|---|---|
Continue | VaultManager.issue_scoped_key() → ScopedKey | Ключ остаётся рабочим, операции продолжаются |
Pause | IdentityManager.get_cached_signing_key() при каждой операции | При lock кэш очищается, операции ждут unlock |
При расшифровке полученного пространства (SpaceShared):
- Режим Continue → используется
ScopedKey.secret() - Режим Pause → используется
IdentityManager.get_cached_signing_key() - Если ни одного ключа нет → fallback на прямой
VaultManager.load_secret()
.part File Cleanup
При запуске run_sharing_service():
- Удаляет протухшие
.partфайлы (старше 7 дней) - Обнаруживает незавершённые трансферы для потенциального resume
PeerTracker — State Machine управления пирами
Файл:
backend/src/sharing/peer_tracker.rs
PeerTracker — автомат состояний для отслеживания жизненного цикла каждого обнаруженного пира. Заменяет ad-hoc логику с timestamp проверками.
Состояния
| Состояние | Описание |
|---|---|
Online | Пир подключён, понги приходят |
Reconnecting | TCP-соединение закрыто, попытка переподключения |
Offline | Переподключение не удалось, пир недоступен |
Переходы
Ключевые методы
| Метод | Триггер | Действия |
|---|---|---|
on_discovered | Announce получен | New → EmitOnline; Offline → EmitOnline; Online → noop |
on_pong | Pong получен | Reconnecting → EmitOnline; Online → UpdateStorage |
on_disconnected | ConnectionClosed (n_est == 0) | Online → Reconnecting + Reconnect |
on_reconnect_failed | OutgoingConnectionError | Reconnecting → Offline + EmitOffline |
on_expired | mDNS expired | Remove peer + EmitOffline |
check_timeouts | Каждые 5 секунд | Reconnecting > 5s → Offline; Online > 30s → Offline |
Таймауты
| Константа | Значение | Назначение |
|---|---|---|
RECONNECT_TIMEOUT_SECS | 5 сек | Время ожидания reconnect после ConnectionClosed |
STALE_TIMEOUT_SECS | 30 сек | Таймаут без pong для Online пиров (half-open TCP fallback) |
PeerAction
Методы PeerTracker возвращают Vec<PeerAction> — команды для вызывающего кода:
EmitOnline— эмитировать SSE-событиеpeer_onlineEmitOffline— эмитировать SSE-событиеpeer_offlineReconnect— вызватьservice.reconnect_peer(peer_id)UpdateStorage— обновить хранилище устройства в БД
Background Service: Heartbeat
Каждые 30 секунд:
- Отправляет Ping всем известным пирам (с количеством устройств)
- При получении Pong — PeerTracker обновляет last_seen, возвращает действия
check_timeouts()вызывается каждые 5 секунд — выявляет stale/reconnecting пиров- При несовпадении device count — запускает DeviceSync