import { getDb } from '../config/database.js'; import { log, fmtDuration } from '../config/logger.js'; import { isFederationEnabled, getFederationDomain, signPayload, discoverInstance, parseAddress, } from '../config/federation.js'; const SYNC_INTERVAL_MS = 60_000; // 1 minute let syncTimer = null; /** * Periodic federation sync job. * Groups federated rooms by origin domain, then batch-queries each origin * for current room settings. Updates local records if settings changed or * if the room was deleted on the origin. */ async function runSync() { if (!isFederationEnabled()) return; const syncStart = Date.now(); let totalUpdated = 0; let totalDeleted = 0; let totalRooms = 0; try { const db = getDb(); // Fetch all non-deleted federated rooms const rooms = await db.all( 'SELECT id, meet_id, from_user, room_name, max_participants, allow_recording FROM federated_rooms WHERE deleted = 0' ); if (rooms.length === 0) return; totalRooms = rooms.length; // Group by origin domain const byDomain = new Map(); for (const room of rooms) { if (!room.meet_id) continue; // no room UID, can't sync const { domain } = parseAddress(room.from_user); if (!domain) continue; if (!byDomain.has(domain)) byDomain.set(domain, []); byDomain.get(domain).push(room); } // Query each origin domain for (const [domain, domainRooms] of byDomain) { try { const roomUids = [...new Set(domainRooms.map(r => r.meet_id))]; const payload = { room_uids: roomUids, timestamp: new Date().toISOString(), }; const signature = signPayload(payload); const { baseUrl: remoteApi } = await discoverInstance(domain); const response = await fetch(`${remoteApi}/room-sync`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Federation-Signature': signature, 'X-Federation-Origin': getFederationDomain(), }, body: JSON.stringify(payload), signal: AbortSignal.timeout(15_000), }); if (!response.ok) { log.fedSync.warn(`${domain} responded with status ${response.status}`); continue; } const data = await response.json(); const remoteRooms = data.rooms || {}; // Update local records for (const localRoom of domainRooms) { const remote = remoteRooms[localRoom.meet_id]; if (!remote) continue; // UID not in response, skip if (remote.deleted) { // Room was deleted on origin await db.run( 'UPDATE federated_rooms SET deleted = 1, updated_at = CURRENT_TIMESTAMP WHERE id = ?', [localRoom.id] ); totalDeleted++; log.fedSync.info(`Room ${localRoom.meet_id} deleted on origin ${domain}`); } else { // Check if settings changed const changed = localRoom.room_name !== remote.room_name || (localRoom.max_participants ?? 0) !== (remote.max_participants ?? 0) || (localRoom.allow_recording ?? 1) !== (remote.allow_recording ?? 1); if (changed) { await db.run( `UPDATE federated_rooms SET room_name = ?, max_participants = ?, allow_recording = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`, [remote.room_name, remote.max_participants ?? 0, remote.allow_recording ?? 1, localRoom.id] ); totalUpdated++; log.fedSync.info(`Room ${localRoom.meet_id} settings updated from ${domain}`); } } } } catch (err) { log.fedSync.warn(`Sync with ${domain} failed: ${err.message}`); } } // Summary log (only if something happened) if (totalUpdated > 0 || totalDeleted > 0) { log.fedSync.info( `Sync complete: ${totalRooms} rooms, ${totalUpdated} updated, ${totalDeleted} deleted (${fmtDuration(Date.now() - syncStart)})` ); } } catch (err) { log.fedSync.error(`Sync job failed: ${err.message}`); } } /** * Start the periodic federation sync job. */ export function startFederationSync() { if (!isFederationEnabled()) { log.fedSync.info('Disabled (federation not configured)'); return; } // Run first sync after a short delay to let the server fully start setTimeout(() => { runSync(); syncTimer = setInterval(runSync, SYNC_INTERVAL_MS); log.fedSync.info('Started (interval: 60s)'); }, 5_000); } /** * Stop the periodic federation sync job. */ export function stopFederationSync() { if (syncTimer) { clearInterval(syncTimer); syncTimer = null; } }