From 954b78e80d51ea0d00466cb3ebf4a5dba5c1ee3a Mon Sep 17 00:00:00 2001 From: Konstantin Fastov Date: Thu, 4 Jun 2026 17:35:56 +0300 Subject: [PATCH] refactor: simplify the operations/command layer (#49, #43) Internal cleanup of the op/command layer the warm-backend work produced; no change to user-visible output, errors, or exit codes. - runOperation invokes the op directly and only starts a server (then retries once) when the call fails because the server is unreachable. invoke now raises ServerUnavailableError for a missing control.json or a refused loopback connection, distinct from an op that ran and failed (a 4xx/5xx/send error) or a request timeout, neither of which spawns a server. The warm path drops from two loopback round-trips to one. - Extract one source-resolution helper for the single-result message ops (messagesGet/messagesContext): live / both / archive-then-live-fallback, whole result from a single source. Extract one result-selection helper for the list/search ops. Behavior identical. - Forward the ~16 one-line passthrough ops through a small declaration table; only composite ops (message ops, sends, anything reshaping a result or calling more than one method) stay hand-written. - Build the send args shared by send text/photo/file once instead of repeating the per-handler flag map, keeping the same validation order. - Tests: invoke-first then start-on-connection-failure (and that an op error does not spawn a server); the shared source resolution across all four source modes (no source mixing); a generic passthrough forwarding. --- cli.js | 92 +++++----- core/command-context.js | 28 +++- core/control-client.js | 52 +++++- core/operations.js | 304 ++++++++++++++-------------------- tests/backfill-rename.test.js | 11 +- tests/control-client.test.js | 31 +++- tests/operations-seam.test.js | 130 ++++++++++++++- tests/send-timeout.test.js | 8 + 8 files changed, 405 insertions(+), 251 deletions(-) diff --git a/cli.js b/cli.js index 6d5c210..dc15b9c 100755 --- a/cli.js +++ b/cli.js @@ -2601,6 +2601,34 @@ function resolveSendAliases(options) { if (!options.message && options.text) options.message = options.text; } +// Parse and build the send args shared by every send command (delivery target, +// threading, parse mode, flood/timeout knobs), in the same validation order +// across commands. The schedule is parsed last by the caller so a media command +// can run its --parse-mode/--caption check first. Per-command payload fields +// (text, photo + caption, file + filename, ...) are added by the caller. Returns +// the parsed retries alongside the args so the handler can classify a later error. +function buildCommonSendArgs(options, timeoutMs) { + // Parse in the same order the per-command handlers always have, so a malformed + // flag reports the same first error regardless of command. + const parseMode = parseSendParseMode(options.parseMode); + const retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES; + const retryBackoff = parseRetryBackoff(options.retryBackoff); + const topicId = parsePositiveInt(options.topic, '--topic'); + const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to'); + const args = { + chat: options.to, + topicId, + replyToMessageId, + parseMode, + silent: options.silent || false, + noforwards: options.forwards === false, + retries, + retryBackoff, + timeoutMs, + }; + return { args, retries }; +} + function normalizeSendCommandError(error, { method, retries, attempt = 1 } = {}) { if (error instanceof SendCommandError) return error; if (error instanceof TypeError || error instanceof ReferenceError || error instanceof SyntaxError || error instanceof RangeError) return error; @@ -2655,30 +2683,18 @@ async function runSendText(globalFlags, options = {}) { try { if (!options.to) throw new Error('--to is required'); if (!options.message) throw new Error('--message is required'); - const parseMode = parseSendParseMode(options.parseMode); - retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES; - const retryBackoff = parseRetryBackoff(options.retryBackoff); - const topicId = parsePositiveInt(options.topic, '--topic'); - const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to'); - const scheduleDate = parseScheduleDate(options.schedule); + const common = buildCommonSendArgs(options, timeoutMs); + retries = common.retries; const { result, attempts } = await runSendOperation(globalFlags, { op: 'sendText', method, retries, invokeTimeoutMs: timeoutMs, args: { - chat: options.to, + ...common.args, text: options.message, - topicId, - replyToMessageId, - parseMode, noPreview: options.noPreview, - silent: options.silent || false, - noforwards: options.forwards === false, - scheduleDate, - retries, - retryBackoff, - timeoutMs, + scheduleDate: parseScheduleDate(options.schedule), }, }); const payload = { channelId: options.to, ...result }; @@ -2704,35 +2720,23 @@ async function runSendPhoto(globalFlags, options = {}) { try { if (!options.to) throw new Error('--to is required'); if (!options.photo) throw new Error('--photo is required'); - const parseMode = parseSendParseMode(options.parseMode); - if (parseMode && !(typeof options.caption === 'string' && options.caption.trim())) { + if (parseSendParseMode(options.parseMode) && !(typeof options.caption === 'string' && options.caption.trim())) { throw new Error('--parse-mode requires --caption for send photo'); } - retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES; - const retryBackoff = parseRetryBackoff(options.retryBackoff); - const topicId = parsePositiveInt(options.topic, '--topic'); - const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to'); - const scheduleDate = parseScheduleDate(options.schedule); + const common = buildCommonSendArgs(options, timeoutMs); + retries = common.retries; const { result, attempts } = await runSendOperation(globalFlags, { op: 'sendPhoto', method, retries, invokeTimeoutMs: timeoutMs, args: { - chat: options.to, + ...common.args, photo: options.photo, caption: options.caption, - topicId, - replyToMessageId, - parseMode, - silent: options.silent || false, - noforwards: options.forwards === false, captionAbove: options.captionAbove || false, spoiler: options.spoiler || false, - scheduleDate, - retries, - retryBackoff, - timeoutMs, + scheduleDate: parseScheduleDate(options.schedule), }, }); if (globalFlags.json) { @@ -2754,37 +2758,25 @@ async function runSendFile(globalFlags, options = {}) { try { if (!options.to) throw new Error('--to is required'); if (!options.file) throw new Error('--file is required'); - const parseMode = parseSendParseMode(options.parseMode); - if (parseMode && !(typeof options.caption === 'string' && options.caption.trim())) { + if (parseSendParseMode(options.parseMode) && !(typeof options.caption === 'string' && options.caption.trim())) { throw new Error('--parse-mode requires --caption for send file'); } - retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES; - const retryBackoff = parseRetryBackoff(options.retryBackoff); - const topicId = parsePositiveInt(options.topic, '--topic'); - const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to'); - const scheduleDate = parseScheduleDate(options.schedule); + const common = buildCommonSendArgs(options, timeoutMs); + retries = common.retries; const { result, attempts } = await runSendOperation(globalFlags, { op: 'sendFile', method, retries, invokeTimeoutMs: timeoutMs, args: { - chat: options.to, + ...common.args, file: options.file, caption: options.caption, filename: options.filename, - topicId, - replyToMessageId, - parseMode, - silent: options.silent || false, - noforwards: options.forwards === false, captionAbove: options.captionAbove || false, spoiler: options.spoiler || false, - scheduleDate, forceDocument: options.forceDocument || false, - retries, - retryBackoff, - timeoutMs, + scheduleDate: parseScheduleDate(options.schedule), }, }); const payload = { channelId: options.to, ...result }; diff --git a/core/command-context.js b/core/command-context.js index f445965..b6a0838 100644 --- a/core/command-context.js +++ b/core/command-context.js @@ -1,7 +1,7 @@ import { resolveStoreDir } from './store.js'; import { acquireReadLock, acquireStoreLock } from '../store-lock.js'; import { createMessageSyncService, createTelegramClient, resolveValidatedConfig } from './services.js'; -import { ensureServer, invoke } from './control-client.js'; +import { ensureServer, invoke, ServerUnavailableError } from './control-client.js'; import { OPERATIONS } from './operations.js'; // Runs fn(ctx) with exactly the services a command needs, owning the surrounding @@ -84,20 +84,30 @@ export async function withCommand(globalFlags, opts, fn) { // MTProto connection, the open DB, auth, and store locking, so the CLI is a thin // client here. // -// - Auto-start the server when it is not already running (ensureServer), then -// ask it to run OPERATIONS[op] against its warm services via POST -// /control/invoke and return that result. -// - invokeTimeoutMs bounds the client's wait on that request (e.g. the 30s -// send default); it is independent of the global --timeout. +// The common case — a server already running — costs a single round-trip: invoke +// the op directly. Only when that fails because the server is unreachable +// (ServerUnavailableError: no control.json or a refused loopback connection) do +// we start one (ensureServer) and retry the invoke once. An operation that runs +// and fails (a 4xx/5xx, including a send error) never triggers a server start. // -// A failure to reach or start the server surfaces as a clear error. +// invokeTimeoutMs bounds the client's wait on each request (e.g. the 30s send +// default); it is independent of the global --timeout. A request timeout means +// the server was reached but did not reply in time, so it surfaces to the caller +// rather than starting a second server. export async function runOperation(globalFlags, { op, args, invokeTimeoutMs } = {}) { if (!OPERATIONS[op]) { throw new Error(`runOperation: unknown operation "${op}"`); } const storeDir = resolveStoreDir(); - await ensureServer(storeDir, { idleExit: '60s' }); - return invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs }); + try { + return await invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs }); + } catch (error) { + if (!(error instanceof ServerUnavailableError)) { + throw error; + } + await ensureServer(storeDir, { idleExit: '60s' }); + return invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs }); + } } const VALID_NEEDS = new Set(['telegram', 'archive', 'full', 'worker']); diff --git a/core/control-client.js b/core/control-client.js index d550446..d2a7854 100644 --- a/core/control-client.js +++ b/core/control-client.js @@ -23,6 +23,34 @@ const ENQUEUE_TIMEOUT_MS = 30000; const SERVER_START_TIMEOUT_MS = 10000; const SERVER_POLL_INTERVAL_MS = 250; +// Raised when the control server cannot be reached at all: no control.json, a +// refused/failed loopback connection, etc. This is distinct from an operation +// that ran on a live server and failed (a 4xx/5xx). Callers use it to decide +// whether to start a server and retry, versus surfacing the operation's error. +export class ServerUnavailableError extends Error { + constructor(message, { cause } = {}) { + super(message); + this.name = 'ServerUnavailableError'; + if (cause !== undefined) { + this.cause = cause; + } + } +} + +// True for a network-level fetch failure (e.g. ECONNREFUSED) where the server +// never answered. An AbortSignal.timeout fires a TimeoutError and an explicit +// abort fires an AbortError; those mean the server was reached but did not reply +// in time, so they are NOT connection failures. +function isFetchConnectionFailure(error) { + if (!error) { + return false; + } + if (error.name === 'TimeoutError' || error.name === 'AbortError') { + return false; + } + return error instanceof TypeError || typeof error.code === 'string'; +} + // Parse /control.json, returning the descriptor or null when absent. // Shape: { pid, port, token, startedAt, version }. export function readControlFile(storeDir) { @@ -153,14 +181,26 @@ export async function retryBackfill(storeDir, { jobId, channelId, allErrors } = export async function invoke(storeDir, { op, args, timeoutMs } = {}) { const control = readControlFile(storeDir); if (!control) { - throw new Error('No control server is running.'); + throw new ServerUnavailableError('No control server is running.'); } const effectiveTimeoutMs = timeoutMs === undefined ? ENQUEUE_TIMEOUT_MS : timeoutMs; - const { status, json } = await controlFetch(control, '/control/invoke', { - method: 'POST', - body: { op, args }, - timeoutMs: effectiveTimeoutMs > 0 ? effectiveTimeoutMs : undefined, - }); + let response; + try { + response = await controlFetch(control, '/control/invoke', { + method: 'POST', + body: { op, args }, + timeoutMs: effectiveTimeoutMs > 0 ? effectiveTimeoutMs : undefined, + }); + } catch (error) { + // A refused/failed loopback connection means the server is not actually up; + // surface it as unavailable so the caller can start one and retry. A request + // timeout (TimeoutError/AbortError) reaches a live server and is re-thrown. + if (isFetchConnectionFailure(error)) { + throw new ServerUnavailableError('Could not reach the control server.', { cause: error }); + } + throw error; + } + const { status, json } = response; if (status !== 200) { if (json?.sendError) { throw new SendCommandError(json.sendError); diff --git a/core/operations.js b/core/operations.js index b7855b7..c06a980 100644 --- a/core/operations.js +++ b/core/operations.js @@ -28,6 +28,47 @@ function resolveSource(source) { return resolved; } +// Resolve a single result (a message or a context bundle) from one source, +// shared by the message ops that return one item. fetchLive/fetchArchive each +// return the formatted result or null when absent; the whole result comes from a +// single source so live and archive data never mix: +// live -> live only +// both -> live first, then archive +// archive -> archive first, then live as a fallback (flags usedLiveFallback) +// Returns { result, source, usedLiveFallback }, or null when nothing is found. +async function resolveFromSource({ source, fetchLive, fetchArchive }) { + let result = null; + let resolvedFrom = null; + let usedLiveFallback = false; + + if (source === 'live' || source === 'both') { + result = await fetchLive(); + if (result) { + resolvedFrom = 'live'; + } + } + + if (!result && (source === 'archive' || source === 'both')) { + result = await fetchArchive(); + if (result) { + resolvedFrom = 'archive'; + } + } + + if (!result && source === 'archive') { + result = await fetchLive(); + if (result) { + resolvedFrom = 'live'; + usedLiveFallback = true; + } + } + + if (!result) { + return null; + } + return { result, source: resolvedFrom ?? source, usedLiveFallback }; +} + function parseDateMs(value, label) { if (!value) { return null; @@ -126,6 +167,20 @@ function mergeMessageSets(sets, limit) { return limit && limit > 0 ? merged.slice(0, limit) : merged; } +// Pick the final message set and the source label it reports, shared by the +// list/search ops. A 'both' request merges the two sets; otherwise the result is +// drawn from a single source so live and archive rows never mix. An archive +// request that fell back to live reports 'live'. Returns { messages, source }. +function selectMessageResults({ source, archivedResults, liveResults, usedLiveFallback, limit }) { + if (source === 'both') { + return { messages: mergeMessageSets([archivedResults, liveResults], limit), source }; + } + if (source === 'live' || usedLiveFallback) { + return { messages: liveResults, source: 'live' }; + } + return { messages: archivedResults, source }; +} + function buildMessageListPayload(source, messages, requestedLimit) { const hasMore = requestedLimit ? messages.length === requestedLimit : false; const payload = { @@ -231,11 +286,6 @@ async function channelSetSync(ctx, args = {}) { }; } -// args: { chat, messageId }. Marks the channel read up to messageId. -async function channelMarkRead(ctx, args = {}) { - return ctx.telegramClient.markChannelRead(args.chat, args.messageId); -} - // --- messages --- async function fetchLiveMessagesForChannel(ctx, id, options = {}) { @@ -340,16 +390,13 @@ async function messagesList(ctx, args = {}) { usedLiveFallback = true; } - let messages = []; - let outputSource = resolvedSource; - if (resolvedSource === 'both') { - messages = mergeMessageSets([archivedResults, liveResults], limit); - } else if (resolvedSource === 'live' || usedLiveFallback) { - messages = liveResults; - outputSource = 'live'; - } else { - messages = archivedResults; - } + const { messages, source: outputSource } = selectMessageResults({ + source: resolvedSource, + archivedResults, + liveResults, + usedLiveFallback, + limit, + }); return { ...buildMessageListPayload(outputSource, messages, limit), usedLiveFallback }; } @@ -462,16 +509,13 @@ async function messagesSearch(ctx, args = {}) { } } - let messages = []; - let outputSource = resolvedSource; - if (resolvedSource === 'both') { - messages = mergeMessageSets([archivedResults, liveResults], limit); - } else if (resolvedSource === 'live' || usedLiveFallback) { - messages = liveResults; - outputSource = 'live'; - } else { - messages = archivedResults; - } + const { messages, source: outputSource } = selectMessageResults({ + source: resolvedSource, + archivedResults, + liveResults, + usedLiveFallback, + limit, + }); return { source: outputSource, returned: messages.length, messages, usedLiveFallback }; } @@ -481,9 +525,6 @@ async function messagesGet(ctx, args = {}) { const resolvedSource = resolveSource(args.source); const channelId = args.channelId; const messageId = args.messageId; - let message = null; - let resolvedFrom = null; - let usedLiveFallback = false; const fetchLive = async () => { const live = await ctx.telegramClient.getMessageById(channelId, messageId); @@ -497,34 +538,17 @@ async function messagesGet(ctx, args = {}) { }; }; - if (resolvedSource === 'live' || resolvedSource === 'both') { - message = await fetchLive(); - if (message) { - resolvedFrom = 'live'; - } - } - - if (!message && (resolvedSource === 'archive' || resolvedSource === 'both')) { + const fetchArchive = async () => { const archived = ctx.messageSyncService.getArchivedMessage({ channelId, messageId }); - if (archived) { - message = { ...archived, source: 'archive' }; - resolvedFrom = 'archive'; - } - } - - if (!message && resolvedSource === 'archive') { - message = await fetchLive(); - if (message) { - resolvedFrom = 'live'; - usedLiveFallback = true; - } - } + return archived ? { ...archived, source: 'archive' } : null; + }; - if (!message) { + const resolved = await resolveFromSource({ source: resolvedSource, fetchLive, fetchArchive }); + if (!resolved) { throw new Error('Message not found.'); } - return { source: resolvedFrom ?? resolvedSource, message, usedLiveFallback }; + return { source: resolved.source, message: resolved.result, usedLiveFallback: resolved.usedLiveFallback }; } // args: { channelId, messageId, before?, after?, source? }. @@ -535,9 +559,6 @@ async function messagesContext(ctx, args = {}) { const messageId = args.messageId; const safeBefore = Number.isFinite(args.before) ? args.before : 20; const safeAfter = Number.isFinite(args.after) ? args.after : 20; - let context = null; - let resolvedFrom = null; - let usedLiveFallback = false; const fetchLive = async () => { const liveContext = await ctx.telegramClient.getMessageContext(channelId, messageId, { @@ -559,43 +580,29 @@ async function messagesContext(ctx, args = {}) { }; }; - if (resolvedSource === 'live' || resolvedSource === 'both') { - context = await fetchLive(); - if (context) { - resolvedFrom = 'live'; - } - } - - if (!context && (resolvedSource === 'archive' || resolvedSource === 'both')) { + const fetchArchive = async () => { const archiveContext = ctx.messageSyncService.getArchivedMessageContext({ channelId, messageId, before: safeBefore, after: safeAfter, }); - if (archiveContext.target) { - context = { - target: { ...archiveContext.target, source: 'archive' }, - before: archiveContext.before.map((message) => ({ ...message, source: 'archive' })), - after: archiveContext.after.map((message) => ({ ...message, source: 'archive' })), - }; - resolvedFrom = 'archive'; - } - } - - if (!context && resolvedSource === 'archive') { - context = await fetchLive(); - if (context) { - resolvedFrom = 'live'; - usedLiveFallback = true; + if (!archiveContext.target) { + return null; } - } + return { + target: { ...archiveContext.target, source: 'archive' }, + before: archiveContext.before.map((message) => ({ ...message, source: 'archive' })), + after: archiveContext.after.map((message) => ({ ...message, source: 'archive' })), + }; + }; - if (!context) { + const resolved = await resolveFromSource({ source: resolvedSource, fetchLive, fetchArchive }); + if (!resolved) { throw new Error('Message not found.'); } - return { source: resolvedFrom ?? resolvedSource, ...context, usedLiveFallback }; + return { source: resolved.source, ...resolved.result, usedLiveFallback: resolved.usedLiveFallback }; } // --- send --- @@ -672,15 +679,6 @@ async function sendFile(ctx, args = {}) { ); } -// --- media --- - -// args: { channelId, messageId, outputPath? }. Returns the download descriptor. -async function mediaDownload(ctx, args = {}) { - return ctx.telegramClient.downloadMessageMedia(args.channelId, args.messageId, { - outputPath: args.outputPath, - }); -} - // --- topics --- // args: { channelId, query?, limit? }. Lists/searches forum topics and refreshes @@ -706,20 +704,6 @@ async function tagsSet(ctx, args = {}) { return { channelId: args.chat, tags: finalTags }; } -// args: { chat, source? }. Lists a channel's tags. -async function tagsList(ctx, args = {}) { - return ctx.messageSyncService.listChannelTags(args.chat, { source: args.source }); -} - -// args: { tag, source?, limit? }. Lists channels carrying a tag. -async function tagsSearch(ctx, args = {}) { - const limit = args.limit ?? 100; - return ctx.messageSyncService.listTaggedChannels(args.tag, { - source: args.source, - limit, - }); -} - // args: { channelIds?, limit?, source?, refreshMetadata? }. Auto-tags channels. async function tagsAuto(ctx, args = {}) { const channelIds = normalizeChannelIds(args.channelIds ?? args.channelId); @@ -821,19 +805,6 @@ async function contactsNotesSet(ctx, args = {}) { // --- groups --- -// args: { query?, limit? }. Lists group chats and supergroups. -async function groupsList(ctx, args = {}) { - return ctx.telegramClient.listGroups({ - query: args.query, - limit: args.limit ?? 100, - }); -} - -// args: { chat }. Returns group info and metadata. -async function groupsInfo(ctx, args = {}) { - return ctx.telegramClient.getGroupInfo(args.chat); -} - // args: { chat, name }. Renames a group. async function groupsRename(ctx, args = {}) { await ctx.telegramClient.renameGroup(args.chat, args.name); @@ -852,22 +823,12 @@ async function groupMembersRemove(ctx, args = {}) { return { channelId: args.chat, ...result }; } -// args: { chat }. Returns the primary invite-link descriptor. -async function getGroupInviteLink(ctx, args = {}) { - return ctx.telegramClient.getGroupInviteLink(args.chat); -} - // args: { chat }. Revokes the primary invite link, returning the new descriptor. async function revokeGroupInviteLink(ctx, args = {}) { const existing = await ctx.telegramClient.getGroupInviteLink(args.chat); return ctx.telegramClient.revokeGroupInviteLink(args.chat, existing); } -// args: { invite }. Joins a group via invite link/code. -async function groupsJoin(ctx, args = {}) { - return ctx.telegramClient.joinGroup(args.invite); -} - // args: { chat }. Leaves a group. async function groupsLeave(ctx, args = {}) { await ctx.telegramClient.leaveGroup(args.chat); @@ -876,16 +837,6 @@ async function groupsLeave(ctx, args = {}) { // --- folders --- -// args: {}. Lists all chat folders. -async function foldersList(ctx) { - return ctx.telegramClient.getFolders(); -} - -// args: { folder, resolve? }. Shows folder details. -async function foldersShow(ctx, args = {}) { - return ctx.telegramClient.showFolder(args.folder, { resolve: args.resolve }); -} - // args: folder definition fields. Creates a folder. async function foldersCreate(ctx, args = {}) { return ctx.telegramClient.createFolder({ @@ -905,41 +856,45 @@ async function foldersCreate(ctx, args = {}) { }); } -// args: { folder, modification }. Edits a folder. -async function foldersEdit(ctx, args = {}) { - return ctx.telegramClient.editFolder(args.folder, args.modification ?? {}); -} - -// args: { folder }. Deletes a folder. -async function foldersDelete(ctx, args = {}) { - return ctx.telegramClient.deleteFolder(args.folder); -} - -// args: { ids }. Reorders folders. -async function foldersReorder(ctx, args = {}) { - return ctx.telegramClient.setFoldersOrder(args.ids); -} - -// args: { folder, chat }. Adds a chat to a folder. -async function foldersChatsAdd(ctx, args = {}) { - return ctx.telegramClient.addChatToFolder(args.folder, args.chat); -} - -// args: { folder, chat }. Removes a chat from a folder. -async function foldersChatsRemove(ctx, args = {}) { - return ctx.telegramClient.removeChatFromFolder(args.folder, args.chat); -} +// --- passthrough ops --- + +// Simple ops are a 1:1 forward to a warm service method whose result is returned +// unchanged. Each entry maps the op's args object to that method's call +// arguments; PASSTHROUGHS expands into handlers with the same (ctx, args) => +// result contract as the hand-written ops. Anything that reshapes a result, +// applies defaults beyond a plain map, or calls more than one method stays a +// hand-written composite above. +const PASSTHROUGHS = { + channelMarkRead: ['telegramClient', 'markChannelRead', (a) => [a.chat, a.messageId]], + mediaDownload: ['telegramClient', 'downloadMessageMedia', (a) => [a.channelId, a.messageId, { outputPath: a.outputPath }]], + tagsList: ['messageSyncService', 'listChannelTags', (a) => [a.chat, { source: a.source }]], + tagsSearch: ['messageSyncService', 'listTaggedChannels', (a) => [a.tag, { source: a.source, limit: a.limit ?? 100 }]], + groupsList: ['telegramClient', 'listGroups', (a) => [{ query: a.query, limit: a.limit ?? 100 }]], + groupsInfo: ['telegramClient', 'getGroupInfo', (a) => [a.chat]], + getGroupInviteLink: ['telegramClient', 'getGroupInviteLink', (a) => [a.chat]], + groupsJoin: ['telegramClient', 'joinGroup', (a) => [a.invite]], + foldersList: ['telegramClient', 'getFolders', () => []], + foldersShow: ['telegramClient', 'showFolder', (a) => [a.folder, { resolve: a.resolve }]], + foldersEdit: ['telegramClient', 'editFolder', (a) => [a.folder, a.modification ?? {}]], + foldersDelete: ['telegramClient', 'deleteFolder', (a) => [a.folder]], + foldersReorder: ['telegramClient', 'setFoldersOrder', (a) => [a.ids]], + foldersChatsAdd: ['telegramClient', 'addChatToFolder', (a) => [a.folder, a.chat]], + foldersChatsRemove: ['telegramClient', 'removeChatFromFolder', (a) => [a.folder, a.chat]], + foldersJoin: ['telegramClient', 'joinChatlist', (a) => [a.link]], +}; -// args: { link }. Joins a shared folder via invite link. -async function foldersJoin(ctx, args = {}) { - return ctx.telegramClient.joinChatlist(args.link); +function buildPassthroughOps(table) { + const ops = {}; + for (const [name, [target, method, mapArgs]] of Object.entries(table)) { + ops[name] = (ctx, args = {}) => ctx[target][method](...mapArgs(args)); + } + return ops; } export const OPERATIONS = { listChannels, channelShow, channelSetSync, - channelMarkRead, messagesList, messagesSearch, messagesGet, @@ -947,11 +902,8 @@ export const OPERATIONS = { sendText, sendPhoto, sendFile, - mediaDownload, topicsList, tagsSet, - tagsList, - tagsSearch, tagsAuto, metadataGet, metadataRefresh, @@ -962,28 +914,18 @@ export const OPERATIONS = { contactsTagsAdd, contactsTagsRemove, contactsNotesSet, - groupsList, - groupsInfo, groupsRename, groupMembersAdd, groupMembersRemove, - getGroupInviteLink, revokeGroupInviteLink, - groupsJoin, groupsLeave, - foldersList, - foldersShow, foldersCreate, - foldersEdit, - foldersDelete, - foldersReorder, - foldersChatsAdd, - foldersChatsRemove, - foldersJoin, + ...buildPassthroughOps(PASSTHROUGHS), }; -// Re-exported so the CLI and MCP layers can render live/merged message sets -// without re-deriving the shared formatting the ops produce. +// Shared building blocks the ops use, re-exported so other layers can render +// live/merged message sets with the same formatting the ops produce rather than +// re-deriving it. export { buildSendSuccessPayload, formatLiveMessage, diff --git a/tests/backfill-rename.test.js b/tests/backfill-rename.test.js index 8ee078d..b7538a5 100644 --- a/tests/backfill-rename.test.js +++ b/tests/backfill-rename.test.js @@ -27,6 +27,13 @@ vi.mock('../core/services.js', () => ({ // Commands routed through the warm server reach it via ensureServer + invoke; // stub both so `channels watch/unwatch` resolve to a server invoke without any // real process or network. +class ServerUnavailableError extends Error { + constructor(message) { + super(message); + this.name = 'ServerUnavailableError'; + } +} + vi.mock('../core/control-client.js', () => ({ ensureServer: (...args) => control.ensureServer(...args), invoke: (...args) => control.invoke(...args), @@ -34,6 +41,7 @@ vi.mock('../core/control-client.js', () => ({ enqueueBackfill: (...args) => control.enqueueBackfill(...args), cancelBackfill: vi.fn(), retryBackfill: vi.fn(), + ServerUnavailableError, })); vi.mock('../store-lock.js', () => ({ @@ -173,7 +181,8 @@ describe('channels watch / unwatch', () => { it('`channels watch --chat X` routes channelSetSync(enable) to the server', async () => { await runProgram(['channels', 'watch', '--chat', '@chan']); - expect(control.ensureServer).toHaveBeenCalledTimes(1); + // The server is reachable, so the op invokes directly without a spawn. + expect(control.ensureServer).not.toHaveBeenCalled(); expect(control.invoke).toHaveBeenCalledWith('/tmp/tgcli-test-store', { op: 'channelSetSync', args: { chat: '@chan', enable: true }, diff --git a/tests/control-client.test.js b/tests/control-client.test.js index 1fecc0f..1584f60 100644 --- a/tests/control-client.test.js +++ b/tests/control-client.test.js @@ -19,6 +19,7 @@ import { pingServer, readControlFile, retryBackfill, + ServerUnavailableError, } from '../core/control-client.js'; import { CONTROL_TOKEN_HEADER } from '../core/control-server.js'; import { SendCommandError } from '../core/send-utils.js'; @@ -157,7 +158,35 @@ describe('invoke', () => { it('throws the server error message on a non-200 response', async () => { global.fetch.mockResolvedValue(jsonResponse(400, { error: 'Unknown operation: nope' })); - await expect(invoke(storeDir, { op: 'nope', args: {} })).rejects.toThrow('Unknown operation: nope'); + const error = await invoke(storeDir, { op: 'nope', args: {} }).catch((e) => e); + expect(error.message).toBe('Unknown operation: nope'); + // An op that ran and failed is NOT a connection failure. + expect(error).not.toBeInstanceOf(ServerUnavailableError); + }); + + it('throws ServerUnavailableError when no control file is present', async () => { + fs.rmSync(path.join(storeDir, 'control.json')); + const error = await invoke(storeDir, { op: 'listChannels', args: {} }).catch((e) => e); + expect(error).toBeInstanceOf(ServerUnavailableError); + expect(global.fetch).not.toHaveBeenCalled(); + }); + + it('throws ServerUnavailableError on a refused loopback connection', async () => { + const refused = new TypeError('fetch failed'); + refused.cause = Object.assign(new Error('connect ECONNREFUSED'), { code: 'ECONNREFUSED' }); + global.fetch.mockRejectedValue(refused); + const error = await invoke(storeDir, { op: 'listChannels', args: {} }).catch((e) => e); + expect(error).toBeInstanceOf(ServerUnavailableError); + expect(error.cause).toBe(refused); + }); + + it('re-throws a request timeout (not a connection failure, so no retry/spawn)', async () => { + const timeout = new Error('The operation was aborted due to timeout'); + timeout.name = 'TimeoutError'; + global.fetch.mockRejectedValue(timeout); + const error = await invoke(storeDir, { op: 'sendText', args: {} }).catch((e) => e); + expect(error).toBe(timeout); + expect(error).not.toBeInstanceOf(ServerUnavailableError); }); it('reconstructs a SendCommandError from a sendError payload', async () => { diff --git a/tests/operations-seam.test.js b/tests/operations-seam.test.js index 510c8b9..064c2a3 100644 --- a/tests/operations-seam.test.js +++ b/tests/operations-seam.test.js @@ -12,9 +12,17 @@ const hooks = vi.hoisted(() => ({ invoke: null, })); +class ServerUnavailableError extends Error { + constructor(message) { + super(message); + this.name = 'ServerUnavailableError'; + } +} + vi.mock('../core/control-client.js', () => ({ ensureServer: (...args) => hooks.ensureServer(...args), invoke: (...args) => hooks.invoke(...args), + ServerUnavailableError, })); // command-context.js still imports services.js for the retained withCommand seam; @@ -116,13 +124,103 @@ describe('OPERATIONS registry', () => { }); }); +describe('generic passthrough ops', () => { + it('foldersShow forwards args to the warm client method and returns the result', async () => { + const showFolder = vi.fn().mockResolvedValue({ id: 4, title: 'Work' }); + const result = await OPERATIONS.foldersShow( + { telegramClient: { showFolder } }, + { folder: 'Work', resolve: true }, + ); + expect(showFolder).toHaveBeenCalledWith('Work', { resolve: true }); + expect(result).toEqual({ id: 4, title: 'Work' }); + }); + + it('tagsSearch forwards to the archive service with the default limit', async () => { + const listTaggedChannels = vi.fn().mockReturnValue([{ channelId: '1' }]); + const result = await OPERATIONS.tagsSearch( + { messageSyncService: { listTaggedChannels } }, + { tag: 'news', source: 'user' }, + ); + expect(listTaggedChannels).toHaveBeenCalledWith('news', { source: 'user', limit: 100 }); + expect(result).toEqual([{ channelId: '1' }]); + }); + + it('groupsInfo forwards the chat through unchanged', async () => { + const getGroupInfo = vi.fn().mockResolvedValue({ id: '-100', title: 'G' }); + const result = await OPERATIONS.groupsInfo({ telegramClient: { getGroupInfo } }, { chat: '@g' }); + expect(getGroupInfo).toHaveBeenCalledWith('@g'); + expect(result).toEqual({ id: '-100', title: 'G' }); + }); +}); + +describe('messagesGet source resolution', () => { + function buildCtx({ archived = null, live = null } = {}) { + return { + telegramClient: { + getMessageById: vi.fn().mockResolvedValue(live), + getPeerMetadata: vi.fn().mockResolvedValue({ peerTitle: 'Live Title', username: 'liveuser' }), + }, + messageSyncService: { + getArchivedMessage: vi.fn().mockReturnValue(archived), + getChannelMetadata: vi.fn().mockReturnValue(null), + }, + }; + } + + it('source live returns only the live message', async () => { + const ctx = buildCtx({ live: { id: 5, message: 'live text', date: 1 } }); + const result = await OPERATIONS.messagesGet(ctx, { channelId: '-100', messageId: 5, source: 'live' }); + expect(result.source).toBe('live'); + expect(result.message.source).toBe('live'); + expect(result.usedLiveFallback).toBe(false); + expect(ctx.messageSyncService.getArchivedMessage).not.toHaveBeenCalled(); + }); + + it('source archive returns only the archived message (no live call)', async () => { + const ctx = buildCtx({ archived: { messageId: 5, text: 'archived', date: '2026-01-01' } }); + const result = await OPERATIONS.messagesGet(ctx, { channelId: '-100', messageId: 5, source: 'archive' }); + expect(result.source).toBe('archive'); + expect(result.message.source).toBe('archive'); + expect(result.usedLiveFallback).toBe(false); + expect(ctx.telegramClient.getMessageById).not.toHaveBeenCalled(); + }); + + it('source archive falls back to live and flags usedLiveFallback when absent in archive', async () => { + const ctx = buildCtx({ archived: null, live: { id: 5, message: 'live text', date: 1 } }); + const result = await OPERATIONS.messagesGet(ctx, { channelId: '-100', messageId: 5, source: 'archive' }); + expect(result.source).toBe('live'); + expect(result.message.source).toBe('live'); + expect(result.usedLiveFallback).toBe(true); + }); + + it('source both prefers live and never mixes sources', async () => { + const ctx = buildCtx({ + archived: { messageId: 5, text: 'archived', date: '2026-01-01' }, + live: { id: 5, message: 'live text', date: 1 }, + }); + const result = await OPERATIONS.messagesGet(ctx, { channelId: '-100', messageId: 5, source: 'both' }); + expect(result.source).toBe('live'); + expect(result.message.source).toBe('live'); + expect(ctx.messageSyncService.getArchivedMessage).not.toHaveBeenCalled(); + }); + + it('throws when the message is absent everywhere', async () => { + const ctx = buildCtx({ archived: null, live: null }); + await expect( + OPERATIONS.messagesGet(ctx, { channelId: '-100', messageId: 5, source: 'archive' }), + ).rejects.toThrow('Message not found.'); + }); +}); + describe('runOperation seam', () => { - it('auto-starts the server then invokes the op, returning its result', async () => { + it('invokes the op directly when a server is already up (no spawn)', async () => { hooks.invoke.mockResolvedValue([{ id: 'warm' }]); const result = await runOperation({}, { op: 'listChannels', args: { limit: 3 } }); - expect(hooks.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-seam-store', { idleExit: '60s' }); + // The warm path costs a single round-trip: invoke, no ensureServer. + expect(hooks.ensureServer).not.toHaveBeenCalled(); + expect(hooks.invoke).toHaveBeenCalledTimes(1); expect(hooks.invoke).toHaveBeenCalledWith('/tmp/tgcli-seam-store', { op: 'listChannels', args: { limit: 3 }, @@ -131,6 +229,30 @@ describe('runOperation seam', () => { expect(result).toEqual([{ id: 'warm' }]); }); + it('starts a server and retries once when the first invoke finds none running', async () => { + hooks.invoke + .mockRejectedValueOnce(new ServerUnavailableError('No control server is running.')) + .mockResolvedValueOnce([{ id: 'warm' }]); + + const result = await runOperation({}, { op: 'listChannels', args: { limit: 3 } }); + + expect(hooks.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-seam-store', { idleExit: '60s' }); + expect(hooks.invoke).toHaveBeenCalledTimes(2); + expect(result).toEqual([{ id: 'warm' }]); + }); + + it('does NOT start a server when the op runs and fails (a 4xx/5xx)', async () => { + // A live server ran the op and it failed; that is not a connection failure, + // so we surface the error without spawning a second server. + hooks.invoke.mockRejectedValue(new Error('Operation failed (HTTP 500)')); + + await expect( + runOperation({}, { op: 'listChannels', args: {} }), + ).rejects.toThrow('Operation failed (HTTP 500)'); + expect(hooks.ensureServer).not.toHaveBeenCalled(); + expect(hooks.invoke).toHaveBeenCalledTimes(1); + }); + it('forwards invokeTimeoutMs as the invoke wait budget', async () => { hooks.invoke.mockResolvedValue({ ok: true }); @@ -150,10 +272,12 @@ describe('runOperation seam', () => { }); it('surfaces a server start failure as a clear error', async () => { + hooks.invoke.mockRejectedValue(new ServerUnavailableError('No control server is running.')); hooks.ensureServer.mockRejectedValue(new Error('Timed out waiting for the control server to start.')); await expect( runOperation({}, { op: 'listChannels', args: {} }), ).rejects.toThrow('Timed out waiting for the control server to start.'); - expect(hooks.invoke).not.toHaveBeenCalled(); + // Only the initial invoke ran; the retry never happened because start failed. + expect(hooks.invoke).toHaveBeenCalledTimes(1); }); }); diff --git a/tests/send-timeout.test.js b/tests/send-timeout.test.js index 46df6c0..067a0ac 100644 --- a/tests/send-timeout.test.js +++ b/tests/send-timeout.test.js @@ -21,12 +21,20 @@ vi.mock('../core/services.js', () => ({ // Send commands route through the warm server (ensureServer + invoke). The CLI's // timeoutMs bounds its wait on invoke; modeling that wait here lets us assert the // CLI surfaces a clear send-timeout without any real process or network. +class ServerUnavailableError extends Error { + constructor(message) { + super(message); + this.name = 'ServerUnavailableError'; + } +} + vi.mock('../core/control-client.js', () => ({ ensureServer: (...args) => control.ensureServer(...args), invoke: (...args) => control.invoke(...args), pingServer: vi.fn(), enqueueBackfill: vi.fn(), cancelBackfill: vi.fn(), + ServerUnavailableError, })); vi.mock('../store-lock.js', () => ({