Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 42 additions & 50 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -2601,6 +2601,34 @@ function resolveSendAliases(options) {
if (!options.message && options.text) options.message = options.text;
}

// Parse and build the send args shared by every send command (delivery target,
// threading, parse mode, flood/timeout knobs), in the same validation order
// across commands. The schedule is parsed last by the caller so a media command
// can run its --parse-mode/--caption check first. Per-command payload fields
// (text, photo + caption, file + filename, ...) are added by the caller. Returns
// the parsed retries alongside the args so the handler can classify a later error.
function buildCommonSendArgs(options, timeoutMs) {
// Parse in the same order the per-command handlers always have, so a malformed
// flag reports the same first error regardless of command.
const parseMode = parseSendParseMode(options.parseMode);
const retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES;
const retryBackoff = parseRetryBackoff(options.retryBackoff);
const topicId = parsePositiveInt(options.topic, '--topic');
const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to');
const args = {
chat: options.to,
topicId,
replyToMessageId,
parseMode,
silent: options.silent || false,
noforwards: options.forwards === false,
retries,
retryBackoff,
timeoutMs,
};
return { args, retries };
}

function normalizeSendCommandError(error, { method, retries, attempt = 1 } = {}) {
if (error instanceof SendCommandError) return error;
if (error instanceof TypeError || error instanceof ReferenceError || error instanceof SyntaxError || error instanceof RangeError) return error;
Expand Down Expand Up @@ -2655,30 +2683,18 @@ async function runSendText(globalFlags, options = {}) {
try {
if (!options.to) throw new Error('--to is required');
if (!options.message) throw new Error('--message is required');
const parseMode = parseSendParseMode(options.parseMode);
retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES;
const retryBackoff = parseRetryBackoff(options.retryBackoff);
const topicId = parsePositiveInt(options.topic, '--topic');
const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to');
const scheduleDate = parseScheduleDate(options.schedule);
const common = buildCommonSendArgs(options, timeoutMs);
retries = common.retries;
const { result, attempts } = await runSendOperation(globalFlags, {
op: 'sendText',
method,
retries,
invokeTimeoutMs: timeoutMs,
args: {
chat: options.to,
...common.args,
text: options.message,
topicId,
replyToMessageId,
parseMode,
noPreview: options.noPreview,
silent: options.silent || false,
noforwards: options.forwards === false,
scheduleDate,
retries,
retryBackoff,
timeoutMs,
scheduleDate: parseScheduleDate(options.schedule),
},
});
const payload = { channelId: options.to, ...result };
Expand All @@ -2704,35 +2720,23 @@ async function runSendPhoto(globalFlags, options = {}) {
try {
if (!options.to) throw new Error('--to is required');
if (!options.photo) throw new Error('--photo is required');
const parseMode = parseSendParseMode(options.parseMode);
if (parseMode && !(typeof options.caption === 'string' && options.caption.trim())) {
if (parseSendParseMode(options.parseMode) && !(typeof options.caption === 'string' && options.caption.trim())) {
throw new Error('--parse-mode requires --caption for send photo');
}
retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES;
const retryBackoff = parseRetryBackoff(options.retryBackoff);
const topicId = parsePositiveInt(options.topic, '--topic');
const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to');
const scheduleDate = parseScheduleDate(options.schedule);
const common = buildCommonSendArgs(options, timeoutMs);
retries = common.retries;
const { result, attempts } = await runSendOperation(globalFlags, {
op: 'sendPhoto',
method,
retries,
invokeTimeoutMs: timeoutMs,
args: {
chat: options.to,
...common.args,
photo: options.photo,
caption: options.caption,
topicId,
replyToMessageId,
parseMode,
silent: options.silent || false,
noforwards: options.forwards === false,
captionAbove: options.captionAbove || false,
spoiler: options.spoiler || false,
scheduleDate,
retries,
retryBackoff,
timeoutMs,
scheduleDate: parseScheduleDate(options.schedule),
},
});
if (globalFlags.json) {
Expand All @@ -2754,37 +2758,25 @@ async function runSendFile(globalFlags, options = {}) {
try {
if (!options.to) throw new Error('--to is required');
if (!options.file) throw new Error('--file is required');
const parseMode = parseSendParseMode(options.parseMode);
if (parseMode && !(typeof options.caption === 'string' && options.caption.trim())) {
if (parseSendParseMode(options.parseMode) && !(typeof options.caption === 'string' && options.caption.trim())) {
throw new Error('--parse-mode requires --caption for send file');
}
retries = parseNonNegativeInt(options.retries, '--retries') ?? DEFAULT_SEND_RETRIES;
const retryBackoff = parseRetryBackoff(options.retryBackoff);
const topicId = parsePositiveInt(options.topic, '--topic');
const replyToMessageId = parsePositiveInt(options.replyTo, '--reply-to');
const scheduleDate = parseScheduleDate(options.schedule);
const common = buildCommonSendArgs(options, timeoutMs);
retries = common.retries;
const { result, attempts } = await runSendOperation(globalFlags, {
op: 'sendFile',
method,
retries,
invokeTimeoutMs: timeoutMs,
args: {
chat: options.to,
...common.args,
file: options.file,
caption: options.caption,
filename: options.filename,
topicId,
replyToMessageId,
parseMode,
silent: options.silent || false,
noforwards: options.forwards === false,
captionAbove: options.captionAbove || false,
spoiler: options.spoiler || false,
scheduleDate,
forceDocument: options.forceDocument || false,
retries,
retryBackoff,
timeoutMs,
scheduleDate: parseScheduleDate(options.schedule),
},
});
const payload = { channelId: options.to, ...result };
Expand Down
28 changes: 19 additions & 9 deletions core/command-context.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { resolveStoreDir } from './store.js';
import { acquireReadLock, acquireStoreLock } from '../store-lock.js';
import { createMessageSyncService, createTelegramClient, resolveValidatedConfig } from './services.js';
import { ensureServer, invoke } from './control-client.js';
import { ensureServer, invoke, ServerUnavailableError } from './control-client.js';
import { OPERATIONS } from './operations.js';

// Runs fn(ctx) with exactly the services a command needs, owning the surrounding
Expand Down Expand Up @@ -84,20 +84,30 @@ export async function withCommand(globalFlags, opts, fn) {
// MTProto connection, the open DB, auth, and store locking, so the CLI is a thin
// client here.
//
// - Auto-start the server when it is not already running (ensureServer), then
// ask it to run OPERATIONS[op] against its warm services via POST
// /control/invoke and return that result.
// - invokeTimeoutMs bounds the client's wait on that request (e.g. the 30s
// send default); it is independent of the global --timeout.
// The common case — a server already running — costs a single round-trip: invoke
// the op directly. Only when that fails because the server is unreachable
// (ServerUnavailableError: no control.json or a refused loopback connection) do
// we start one (ensureServer) and retry the invoke once. An operation that runs
// and fails (a 4xx/5xx, including a send error) never triggers a server start.
//
// A failure to reach or start the server surfaces as a clear error.
// invokeTimeoutMs bounds the client's wait on each request (e.g. the 30s send
// default); it is independent of the global --timeout. A request timeout means
// the server was reached but did not reply in time, so it surfaces to the caller
// rather than starting a second server.
export async function runOperation(globalFlags, { op, args, invokeTimeoutMs } = {}) {
if (!OPERATIONS[op]) {
throw new Error(`runOperation: unknown operation "${op}"`);
}
const storeDir = resolveStoreDir();
await ensureServer(storeDir, { idleExit: '60s' });
return invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs });
try {
return await invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs });
} catch (error) {
if (!(error instanceof ServerUnavailableError)) {
throw error;
}
await ensureServer(storeDir, { idleExit: '60s' });
return invoke(storeDir, { op, args, timeoutMs: invokeTimeoutMs });
}
}

const VALID_NEEDS = new Set(['telegram', 'archive', 'full', 'worker']);
Expand Down
52 changes: 46 additions & 6 deletions core/control-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ const ENQUEUE_TIMEOUT_MS = 30000;
const SERVER_START_TIMEOUT_MS = 10000;
const SERVER_POLL_INTERVAL_MS = 250;

// Raised when the control server cannot be reached at all: no control.json, a
// refused/failed loopback connection, etc. This is distinct from an operation
// that ran on a live server and failed (a 4xx/5xx). Callers use it to decide
// whether to start a server and retry, versus surfacing the operation's error.
export class ServerUnavailableError extends Error {
constructor(message, { cause } = {}) {
super(message);
this.name = 'ServerUnavailableError';
if (cause !== undefined) {
this.cause = cause;
}
}
}

// True for a network-level fetch failure (e.g. ECONNREFUSED) where the server
// never answered. An AbortSignal.timeout fires a TimeoutError and an explicit
// abort fires an AbortError; those mean the server was reached but did not reply
// in time, so they are NOT connection failures.
function isFetchConnectionFailure(error) {
if (!error) {
return false;
}
if (error.name === 'TimeoutError' || error.name === 'AbortError') {
return false;
}
return error instanceof TypeError || typeof error.code === 'string';
}

// Parse <store>/control.json, returning the descriptor or null when absent.
// Shape: { pid, port, token, startedAt, version }.
export function readControlFile(storeDir) {
Expand Down Expand Up @@ -153,14 +181,26 @@ export async function retryBackfill(storeDir, { jobId, channelId, allErrors } =
export async function invoke(storeDir, { op, args, timeoutMs } = {}) {
const control = readControlFile(storeDir);
if (!control) {
throw new Error('No control server is running.');
throw new ServerUnavailableError('No control server is running.');
}
const effectiveTimeoutMs = timeoutMs === undefined ? ENQUEUE_TIMEOUT_MS : timeoutMs;
const { status, json } = await controlFetch(control, '/control/invoke', {
method: 'POST',
body: { op, args },
timeoutMs: effectiveTimeoutMs > 0 ? effectiveTimeoutMs : undefined,
});
let response;
try {
response = await controlFetch(control, '/control/invoke', {
method: 'POST',
body: { op, args },
timeoutMs: effectiveTimeoutMs > 0 ? effectiveTimeoutMs : undefined,
});
} catch (error) {
// A refused/failed loopback connection means the server is not actually up;
// surface it as unavailable so the caller can start one and retry. A request
// timeout (TimeoutError/AbortError) reaches a live server and is re-thrown.
if (isFetchConnectionFailure(error)) {
throw new ServerUnavailableError('Could not reach the control server.', { cause: error });
}
throw error;
}
const { status, json } = response;
if (status !== 200) {
if (json?.sendError) {
throw new SendCommandError(json.sendError);
Expand Down
Loading