Skip to content

Commit e5e8f24

Browse files
committed
Enhance CLI commands with --force option and dry run support
- Added a --force option to various CLI commands to allow reprocessing of items, ignoring their processed state. - Integrated dry run functionality across tasks to log actions without executing them, providing feedback on what would be processed. - Updated relevant tasks to handle the new options and ensure consistent behavior during dry runs.
1 parent baee05d commit e5e8f24

10 files changed

Lines changed: 137 additions & 58 deletions

CLAUDE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Uses `@workglow/util`'s `globalServiceRegistry` with typed tokens. Production us
3838
### Schema Pattern
3939

4040
Schemas use TypeBox (v1, imported as `typebox`). Each storage module exports:
41+
4142
- A TypeBox schema (e.g., `AddressSchema`)
4243
- Primary key name constants (e.g., `AddressPrimaryKeyNames`)
4344
- A DI token (e.g., `ADDRESS_REPOSITORY_TOKEN`)
@@ -46,6 +47,7 @@ Schemas use TypeBox (v1, imported as `typebox`). Each storage module exports:
4647
### Environment Variables
4748

4849
Set in `.env.local` (see `.env.test` for test defaults):
50+
4951
- `SEC_RAW_DATA_FOLDER` — path to raw downloaded data
5052
- `SEC_DB_FOLDER` — path to SQLite database directory
5153
- `SEC_DB_NAME` — database name (default: `edgar`)
@@ -60,6 +62,7 @@ Set in `.env.local` (see `.env.test` for test defaults):
6062
## TypeScript Conventions
6163

6264
From `.cursor/rules/`:
65+
6366
- Use **Bun** runtime, not Node.js (`bun test`, `bun run`, etc.)
6467
- **No default exports** (use named exports)
6568
- **No enums** — use `as const` objects instead

src/cli/groups/bootstrap.ts

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,28 @@ export function addBootstrapCommands(program: Command): void {
3131
.option("--skip-forms", "Skip the forms processing step", false)
3232
.option("--force", "Reprocess all items, ignoring processed state", false)
3333
.action(async (options) => {
34-
await runCommand(async () => {
35-
if (!options.skipDownload) {
36-
for (const config of Object.values(BULK_DOWNLOADS)) {
37-
const task = new BootstrapDownloadTask(config);
38-
await runTasks(task);
34+
await runCommand(
35+
async () => {
36+
if (!options.skipDownload) {
37+
for (const config of Object.values(BULK_DOWNLOADS)) {
38+
const task = new BootstrapDownloadTask(config);
39+
await runTasks(task);
40+
}
3941
}
40-
}
4142

42-
if (!options.skipIngest) {
43-
const cikWf = pipe([new FetchAllCikNamesTask(), new StoreCikNamesTask()]);
44-
await runWorkflow(cikWf);
45-
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
46-
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
47-
}
43+
if (!options.skipIngest) {
44+
const cikWf = pipe([new FetchAllCikNamesTask(), new StoreCikNamesTask()]);
45+
await runWorkflow(cikWf);
46+
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
47+
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
48+
}
4849

49-
if (!options.skipForms) {
50-
await runTasks(new UpdateAllFormsTask({ form: ["D", "C"], force: options.force }));
51-
}
52-
});
50+
if (!options.skipForms) {
51+
await runTasks(new UpdateAllFormsTask({ form: ["D", "C"], force: options.force }));
52+
}
53+
},
54+
{ force: options.force }
55+
);
5356
});
5457

5558
bootstrap
@@ -85,32 +88,35 @@ export function addBootstrapCommands(program: Command): void {
8588
.description("Ingest pre-downloaded SEC data (submissions, facts, cik-names, or all)")
8689
.option("--force", "Reprocess all items, ignoring processed state", false)
8790
.action(async (domain: string | undefined, options) => {
88-
await runCommand(async () => {
89-
const target = domain ?? "all";
91+
await runCommand(
92+
async () => {
93+
const target = domain ?? "all";
9094

91-
if (target === "cik-names" || target === "all") {
92-
const wf = pipe([new FetchAllCikNamesTask(), new StoreCikNamesTask()]);
93-
await runWorkflow(wf);
94-
}
95+
if (target === "cik-names" || target === "all") {
96+
const wf = pipe([new FetchAllCikNamesTask(), new StoreCikNamesTask()]);
97+
await runWorkflow(wf);
98+
}
9599

96-
if (target === "submissions" || target === "all") {
97-
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
98-
}
100+
if (target === "submissions" || target === "all") {
101+
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
102+
}
99103

100-
if (target === "facts" || target === "all") {
101-
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
102-
}
104+
if (target === "facts" || target === "all") {
105+
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
106+
}
103107

104-
if (
105-
target !== "all" &&
106-
target !== "submissions" &&
107-
target !== "facts" &&
108-
target !== "cik-names"
109-
) {
110-
throw new Error(
111-
`Invalid domain "${target}". Must be submissions, facts, cik-names, or all.`
112-
);
113-
}
114-
});
108+
if (
109+
target !== "all" &&
110+
target !== "submissions" &&
111+
target !== "facts" &&
112+
target !== "cik-names"
113+
) {
114+
throw new Error(
115+
`Invalid domain "${target}". Must be submissions, facts, cik-names, or all.`
116+
);
117+
}
118+
},
119+
{ force: options.force }
120+
);
115121
});
116122
}

src/cli/groups/sync.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@ export function addSyncCommand(program: Command): void {
1515
.option("--forms <types>", "Comma-separated form types to process", "D,C,1-A,1-K,1-Z")
1616
.option("--force", "Reprocess all items, ignoring processed state", false)
1717
.action(async (options) => {
18-
await runCommand(async () => {
19-
const indexFlow = pipe([new FetchDailyIndexTask({}), new StoreCikLastUpdatedTask()]);
20-
await runTasks(indexFlow);
18+
await runCommand(
19+
async () => {
20+
const indexFlow = pipe([new FetchDailyIndexTask({}), new StoreCikLastUpdatedTask()]);
21+
await runTasks(indexFlow);
2122

22-
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
23-
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
23+
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
24+
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
2425

25-
const formTypes = (options.forms as string).split(",");
26-
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
27-
});
26+
const formTypes = (options.forms as string).split(",");
27+
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
28+
},
29+
{ force: options.force }
30+
);
2831
});
2932
}

src/cli/groups/update.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,38 @@ export function addUpdateCommands(program: Command): void {
1515
.description("Update all submissions for all companies")
1616
.option("--force", "Reprocess all items, ignoring processed state", false)
1717
.action(async (options) => {
18-
await runCommand(async () => {
19-
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
20-
});
18+
await runCommand(
19+
async () => {
20+
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
21+
},
22+
{ force: options.force }
23+
);
2124
});
2225

2326
update
2427
.command("facts")
2528
.description("Update all company facts")
2629
.option("--force", "Reprocess all items, ignoring processed state", false)
2730
.action(async (options) => {
28-
await runCommand(async () => {
29-
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
30-
});
31+
await runCommand(
32+
async () => {
33+
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
34+
},
35+
{ force: options.force }
36+
);
3137
});
3238

3339
update
3440
.command("forms <types>")
3541
.description("Update forms for all companies (comma-separated form types)")
3642
.option("--force", "Reprocess all items, ignoring processed state", false)
3743
.action(async (types: string, options) => {
38-
await runCommand(async () => {
39-
const formTypes = types.split(",");
40-
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
41-
});
44+
await runCommand(
45+
async () => {
46+
const formTypes = types.split(",");
47+
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
48+
},
49+
{ force: options.force }
50+
);
4251
});
4352
}

src/cli/runCommand.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { isDryRun } from "./isDryRun";
33

44
export interface RunCommandOptions {
55
readonly onError?: (error: unknown) => void;
6+
readonly force?: boolean;
67
}
78

89
export async function runCommand(
@@ -11,7 +12,8 @@ export async function runCommand(
1112
): Promise<number> {
1213
try {
1314
if (isDryRun()) {
14-
console.log(statusMessage("info", "Dry run — no data will be written"));
15+
const suffix = options?.force ? " (force — all items will be reprocessed)" : "";
16+
console.log(statusMessage("info", `Dry run${suffix} — no data will be written`));
1517
}
1618
await action();
1719
process.exitCode = 0;

src/task/facts/BootstrapCompanyFactsTask.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { PROCESSED_FACTS_REPOSITORY_TOKEN } from "../../storage/processing/Proce
1313
import { SEC_RAW_DATA_FOLDER } from "../../config/tokens";
1414
import { todayYYYYdMMdDD } from "../../util/dataCleaningUtils";
1515
import { fetchAndStoreCompanyFacts } from "./fetchAndStoreCompanyFacts";
16+
import { isDryRun } from "../../cli/isDryRun";
1617

1718
export type BootstrapCompanyFactsTaskInput = {
1819
readonly force?: boolean;
@@ -71,6 +72,14 @@ export class BootstrapCompanyFactsTask extends Task<
7172
ciksToProcess = ciks.filter((cik) => !processedSet.has(cik));
7273
}
7374

75+
if (isDryRun()) {
76+
const label = input.force ? "all" : "unprocessed";
77+
console.log(
78+
`Would bootstrap ${ciksToProcess.length} ${label} CIK company facts (${ciks.length} total)`
79+
);
80+
return { success: true };
81+
}
82+
7483
if (ciksToProcess.length) {
7584
const wf = context.own(new Workflow());
7685
const loop = wf.map({ concurrencyLimit: 2 });

src/task/facts/UpdateAllCompanyFactsTask.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
type ProcessedFacts,
1414
} from "../../storage/processing/ProcessedFactsSchema";
1515
import { fetchAndStoreCompanyFacts } from "./fetchAndStoreCompanyFacts";
16+
import { isDryRun } from "../../cli/isDryRun";
1617

1718
export type UpdateAllCompanyFactsTaskInput = {
1819
readonly force?: boolean;
@@ -77,6 +78,19 @@ export class UpdateAllCompanyFactsTask extends Task<
7778
}
7879
}
7980

81+
if (isDryRun()) {
82+
if (input.force) {
83+
console.log(
84+
`Would update ${needsUpdating.length} company facts (force — reprocessing all)`
85+
);
86+
} else {
87+
console.log(
88+
`Would update ${needsUpdating.length} changed and ${needsProcessing.length} new company facts`
89+
);
90+
}
91+
return { success: true };
92+
}
93+
8094
if (needsUpdating.length) {
8195
const wf = context.own(new Workflow());
8296
const loop = wf.map({ concurrencyLimit: 1 });

src/task/forms/UpdateAllFormsTask.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
PROCESSED_FILINGS_REPOSITORY_TOKEN,
1616
} from "../../storage/processing/ProcessedFilingsSchema";
1717
import { ProcessAccessionDocFormTask } from "./ProcessAccessionDocFormTask";
18+
import { isDryRun } from "../../cli/isDryRun";
1819

1920
export type UpdateAllFormsTaskInput = {
2021
readonly form: string[];
@@ -85,6 +86,15 @@ export class UpdateAllFormsTask extends Task<UpdateAllFormsTaskInput, UpdateAllF
8586
}
8687
}
8788

89+
if (isDryRun()) {
90+
const forms = [...formSet].join(", ");
91+
const label = input.force ? "all" : "unprocessed";
92+
console.log(
93+
`Would process ${formsToProcess.length} ${label} filings for forms: ${forms}`
94+
);
95+
return { success: true };
96+
}
97+
8898
if (formsToProcess.length) {
8999
const wf = context.own(new Workflow());
90100
const loop = wf.map({ concurrencyLimit: 10 });

src/task/submissions/BootstrapSubmissionsTask.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { resolve } from "node:path";
1212
import { PROCESSED_SUBMISSIONS_REPOSITORY_TOKEN } from "../../storage/processing/ProcessedSubmissionsSchema";
1313
import { SEC_RAW_DATA_FOLDER } from "../../config/tokens";
1414
import { fetchAndStoreSubmission } from "./fetchAndStoreSubmission";
15+
import { isDryRun } from "../../cli/isDryRun";
1516

1617
export type BootstrapSubmissionsTaskInput = {
1718
readonly force?: boolean;
@@ -72,6 +73,14 @@ export class BootstrapSubmissionsTask extends Task<
7273
ciksToProcess = ciks.filter((cik) => !processedSet.has(cik));
7374
}
7475

76+
if (isDryRun()) {
77+
const label = input.force ? "all" : "unprocessed";
78+
console.log(
79+
`Would bootstrap ${ciksToProcess.length} ${label} CIK submissions (${ciks.length} total)`
80+
);
81+
return { success: true };
82+
}
83+
7584
if (ciksToProcess.length) {
7685
const wf = context.own(new Workflow());
7786
const loop = wf.map({ concurrencyLimit: 2 });

src/task/submissions/UpdateAllSubmissionsTask.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
type ProcessedSubmissions,
1616
} from "../../storage/processing/ProcessedSubmissionsSchema";
1717
import { fetchAndStoreSubmission } from "./fetchAndStoreSubmission";
18+
import { isDryRun } from "../../cli/isDryRun";
1819

1920
export type UpdateAllSubmissionsTaskInput = {
2021
readonly force?: boolean;
@@ -81,6 +82,19 @@ export class UpdateAllSubmissionsTask extends Task<
8182
}
8283
}
8384

85+
if (isDryRun()) {
86+
if (input.force) {
87+
console.log(
88+
`Would update ${needsUpdating.length} submissions (force — reprocessing all)`
89+
);
90+
} else {
91+
console.log(
92+
`Would update ${needsUpdating.length} changed and ${needsInitialProcessing.length} new submissions`
93+
);
94+
}
95+
return { success: true };
96+
}
97+
8498
if (needsUpdating.length) {
8599
const wf = context.own(new Workflow());
86100
const loop = wf.map({ concurrencyLimit: 1 });

0 commit comments

Comments
 (0)