Skip to content

Commit baee05d

Browse files
authored
Add --force option to reprocess items ignoring processed state (#76)
1 parent 51005e0 commit baee05d

8 files changed

Lines changed: 135 additions & 83 deletions

File tree

src/cli/groups/bootstrap.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export function addBootstrapCommands(program: Command): void {
2929
.option("--skip-download", "Skip the bulk download step", false)
3030
.option("--skip-ingest", "Skip the ingest step", false)
3131
.option("--skip-forms", "Skip the forms processing step", false)
32+
.option("--force", "Reprocess all items, ignoring processed state", false)
3233
.action(async (options) => {
3334
await runCommand(async () => {
3435
if (!options.skipDownload) {
@@ -41,12 +42,12 @@ export function addBootstrapCommands(program: Command): void {
4142
if (!options.skipIngest) {
4243
const cikWf = pipe([new FetchAllCikNamesTask(), new StoreCikNamesTask()]);
4344
await runWorkflow(cikWf);
44-
await runTasks(new BootstrapSubmissionsTask());
45-
await runTasks(new BootstrapCompanyFactsTask());
45+
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
46+
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
4647
}
4748

4849
if (!options.skipForms) {
49-
await runTasks(new UpdateAllFormsTask({ form: ["D", "C"] }));
50+
await runTasks(new UpdateAllFormsTask({ form: ["D", "C"], force: options.force }));
5051
}
5152
});
5253
});
@@ -82,7 +83,8 @@ export function addBootstrapCommands(program: Command): void {
8283
bootstrap
8384
.command("ingest [domain]")
8485
.description("Ingest pre-downloaded SEC data (submissions, facts, cik-names, or all)")
85-
.action(async (domain?: string) => {
86+
.option("--force", "Reprocess all items, ignoring processed state", false)
87+
.action(async (domain: string | undefined, options) => {
8688
await runCommand(async () => {
8789
const target = domain ?? "all";
8890

@@ -92,11 +94,11 @@ export function addBootstrapCommands(program: Command): void {
9294
}
9395

9496
if (target === "submissions" || target === "all") {
95-
await runTasks(new BootstrapSubmissionsTask());
97+
await runTasks(new BootstrapSubmissionsTask({ force: options.force }));
9698
}
9799

98100
if (target === "facts" || target === "all") {
99-
await runTasks(new BootstrapCompanyFactsTask());
101+
await runTasks(new BootstrapCompanyFactsTask({ force: options.force }));
100102
}
101103

102104
if (

src/cli/groups/sync.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@ export function addSyncCommand(program: Command): void {
1313
.command("sync")
1414
.description("Daily sync — fetch index, update submissions, facts, and forms")
1515
.option("--forms <types>", "Comma-separated form types to process", "D,C,1-A,1-K,1-Z")
16+
.option("--force", "Reprocess all items, ignoring processed state", false)
1617
.action(async (options) => {
1718
await runCommand(async () => {
1819
const indexFlow = pipe([new FetchDailyIndexTask({}), new StoreCikLastUpdatedTask()]);
1920
await runTasks(indexFlow);
2021

21-
await runTasks(new UpdateAllSubmissionsTask());
22-
await runTasks(new UpdateAllCompanyFactsTask());
22+
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
23+
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
2324

2425
const formTypes = (options.forms as string).split(",");
25-
await runTasks(new UpdateAllFormsTask({ form: formTypes }));
26+
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
2627
});
2728
});
2829
}

src/cli/groups/update.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,31 @@ export function addUpdateCommands(program: Command): void {
1313
update
1414
.command("submissions")
1515
.description("Update all submissions for all companies")
16-
.action(async () => {
16+
.option("--force", "Reprocess all items, ignoring processed state", false)
17+
.action(async (options) => {
1718
await runCommand(async () => {
18-
await runTasks(new UpdateAllSubmissionsTask());
19+
await runTasks(new UpdateAllSubmissionsTask({ force: options.force }));
1920
});
2021
});
2122

2223
update
2324
.command("facts")
2425
.description("Update all company facts")
25-
.action(async () => {
26+
.option("--force", "Reprocess all items, ignoring processed state", false)
27+
.action(async (options) => {
2628
await runCommand(async () => {
27-
await runTasks(new UpdateAllCompanyFactsTask());
29+
await runTasks(new UpdateAllCompanyFactsTask({ force: options.force }));
2830
});
2931
});
3032

3133
update
3234
.command("forms <types>")
3335
.description("Update forms for all companies (comma-separated form types)")
34-
.action(async (types: string) => {
36+
.option("--force", "Reprocess all items, ignoring processed state", false)
37+
.action(async (types: string, options) => {
3538
await runCommand(async () => {
3639
const formTypes = types.split(",");
37-
await runTasks(new UpdateAllFormsTask({ form: formTypes }));
40+
await runTasks(new UpdateAllFormsTask({ form: formTypes, force: options.force }));
3841
});
3942
});
4043
}

src/task/facts/BootstrapCompanyFactsTask.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import { SEC_RAW_DATA_FOLDER } from "../../config/tokens";
1414
import { todayYYYYdMMdDD } from "../../util/dataCleaningUtils";
1515
import { fetchAndStoreCompanyFacts } from "./fetchAndStoreCompanyFacts";
1616

17-
export type BootstrapCompanyFactsTaskInput = {};
17+
export type BootstrapCompanyFactsTaskInput = {
18+
readonly force?: boolean;
19+
};
1820

1921
export type BootstrapCompanyFactsTaskOutput = {
2022
success: boolean;
@@ -55,22 +57,27 @@ export class BootstrapCompanyFactsTask extends Task<
5557
}
5658
}
5759

58-
const processedFactsRepo = globalServiceRegistry.get(PROCESSED_FACTS_REPOSITORY_TOKEN);
59-
const allProcessedFacts = (await processedFactsRepo.getAll()) ?? [];
60-
const processedSet = new Set<number>();
61-
for (const pf of allProcessedFacts) {
62-
processedSet.add(pf.cik);
63-
}
60+
let ciksToProcess: number[];
6461

65-
const unprocessedCiks = ciks.filter((cik) => !processedSet.has(cik));
62+
if (input.force) {
63+
ciksToProcess = ciks;
64+
} else {
65+
const processedFactsRepo = globalServiceRegistry.get(PROCESSED_FACTS_REPOSITORY_TOKEN);
66+
const allProcessedFacts = (await processedFactsRepo.getAll()) ?? [];
67+
const processedSet = new Set<number>();
68+
for (const pf of allProcessedFacts) {
69+
processedSet.add(pf.cik);
70+
}
71+
ciksToProcess = ciks.filter((cik) => !processedSet.has(cik));
72+
}
6673

67-
if (unprocessedCiks.length) {
74+
if (ciksToProcess.length) {
6875
const wf = context.own(new Workflow());
6976
const loop = wf.map({ concurrencyLimit: 2 });
7077
loop.pipe(fetchAndStoreCompanyFacts);
7178
loop.endMap();
7279
await wf.run({
73-
cik: unprocessedCiks,
80+
cik: ciksToProcess,
7481
date: todayYYYYdMMdDD(),
7582
});
7683
}

src/task/facts/UpdateAllCompanyFactsTask.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import {
1414
} from "../../storage/processing/ProcessedFactsSchema";
1515
import { fetchAndStoreCompanyFacts } from "./fetchAndStoreCompanyFacts";
1616

17-
export type UpdateAllCompanyFactsTaskInput = {};
17+
export type UpdateAllCompanyFactsTaskInput = {
18+
readonly force?: boolean;
19+
};
1820

1921
export type UpdateAllCompanyFactsTaskOutput = {
2022
success: boolean;
@@ -49,23 +51,30 @@ export class UpdateAllCompanyFactsTask extends Task<
4951
{},
5052
{ orderBy: [{ column: "last_update", direction: "DESC" }] }
5153
)) ?? [];
52-
const allProcessedFacts = (await processedFactsRepo.getAll()) ?? [];
53-
54-
const processedMap = new Map<number, ProcessedFacts>();
55-
for (const pf of allProcessedFacts) {
56-
processedMap.set(pf.cik, pf);
57-
}
5854

5955
const needsUpdating: { cik: number; last_update: string }[] = [];
6056
const needsProcessing: { cik: number; last_update: string }[] = [];
6157

62-
for (const clu of allCikUpdates) {
63-
const pf = processedMap.get(clu.cik);
64-
if (!pf) {
65-
needsProcessing.push({ cik: clu.cik, last_update: clu.last_update });
66-
} else if (clu.last_update > pf.last_processed) {
58+
if (input.force) {
59+
for (const clu of allCikUpdates) {
6760
needsUpdating.push({ cik: clu.cik, last_update: clu.last_update });
6861
}
62+
} else {
63+
const allProcessedFacts = (await processedFactsRepo.getAll()) ?? [];
64+
65+
const processedMap = new Map<number, ProcessedFacts>();
66+
for (const pf of allProcessedFacts) {
67+
processedMap.set(pf.cik, pf);
68+
}
69+
70+
for (const clu of allCikUpdates) {
71+
const pf = processedMap.get(clu.cik);
72+
if (!pf) {
73+
needsProcessing.push({ cik: clu.cik, last_update: clu.last_update });
74+
} else if (clu.last_update > pf.last_processed) {
75+
needsUpdating.push({ cik: clu.cik, last_update: clu.last_update });
76+
}
77+
}
6978
}
7079

7180
if (needsUpdating.length) {

src/task/forms/UpdateAllFormsTask.ts

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import {
1717
import { ProcessAccessionDocFormTask } from "./ProcessAccessionDocFormTask";
1818

1919
export type UpdateAllFormsTaskInput = {
20-
form: string[];
20+
readonly form: string[];
21+
readonly force?: boolean;
2122
};
2223

2324
export type UpdateAllFormsTaskOutput = {
@@ -47,40 +48,53 @@ export class UpdateAllFormsTask extends Task<UpdateAllFormsTaskInput, UpdateAllF
4748

4849
const formSet = new Set(input.form);
4950

50-
// Build a set of already-processed (cik:accession_number) keys per form
51-
const processedSet = new Set<string>();
52-
for (const form of formSet) {
53-
const processed = await processedFilingsRepo.query({ form });
54-
if (processed) {
55-
for (const pf of processed) {
56-
processedSet.add(`${pf.cik}:${pf.accession_number}`);
51+
const formsToProcess: Filing[] = [];
52+
53+
if (input.force) {
54+
// Reprocess all filings for requested forms
55+
for (const form of formSet) {
56+
const filings = await filingRepo.query({ form });
57+
if (filings) {
58+
for (const f of filings) {
59+
formsToProcess.push(f);
60+
}
61+
}
62+
}
63+
} else {
64+
// Build a set of already-processed (cik:accession_number) keys per form
65+
const processedSet = new Set<string>();
66+
for (const form of formSet) {
67+
const processed = await processedFilingsRepo.query({ form });
68+
if (processed) {
69+
for (const pf of processed) {
70+
processedSet.add(`${pf.cik}:${pf.accession_number}`);
71+
}
5772
}
5873
}
59-
}
6074

61-
// Query filings per form and collect only unprocessed ones
62-
const missingForms: Filing[] = [];
63-
for (const form of formSet) {
64-
const filings = await filingRepo.query({ form });
65-
if (filings) {
66-
for (const f of filings) {
67-
if (!processedSet.has(`${f.cik}:${f.accession_number}`)) {
68-
missingForms.push(f);
75+
// Query filings per form and collect only unprocessed ones
76+
for (const form of formSet) {
77+
const filings = await filingRepo.query({ form });
78+
if (filings) {
79+
for (const f of filings) {
80+
if (!processedSet.has(`${f.cik}:${f.accession_number}`)) {
81+
formsToProcess.push(f);
82+
}
6983
}
7084
}
7185
}
7286
}
7387

74-
if (missingForms.length) {
88+
if (formsToProcess.length) {
7589
const wf = context.own(new Workflow());
7690
const loop = wf.map({ concurrencyLimit: 10 });
7791
loop.pipe(new ProcessAccessionDocFormTask());
7892
loop.endMap();
7993
await wf.run({
80-
accessionNumber: missingForms.map((f) => f.accession_number),
81-
cik: missingForms.map((f) => f.cik),
82-
form: missingForms.map((f) => f.form!),
83-
fileName: missingForms.map((f) => f.primary_doc.replaceAll(/^(xsl[^\/]+\/)/g, "")),
94+
accessionNumber: formsToProcess.map((f) => f.accession_number),
95+
cik: formsToProcess.map((f) => f.cik),
96+
form: formsToProcess.map((f) => f.form!),
97+
fileName: formsToProcess.map((f) => f.primary_doc.replaceAll(/^(xsl[^\/]+\/)/g, "")),
8498
});
8599
}
86100
return { success: true };

src/task/submissions/BootstrapSubmissionsTask.ts

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import { PROCESSED_SUBMISSIONS_REPOSITORY_TOKEN } from "../../storage/processing
1313
import { SEC_RAW_DATA_FOLDER } from "../../config/tokens";
1414
import { fetchAndStoreSubmission } from "./fetchAndStoreSubmission";
1515

16-
export type BootstrapSubmissionsTaskInput = {};
16+
export type BootstrapSubmissionsTaskInput = {
17+
readonly force?: boolean;
18+
};
1719

1820
export type BootstrapSubmissionsTaskOutput = {
1921
success: boolean;
@@ -54,24 +56,29 @@ export class BootstrapSubmissionsTask extends Task<
5456
}
5557
}
5658

57-
const processedSubmissionsRepo = globalServiceRegistry.get(
58-
PROCESSED_SUBMISSIONS_REPOSITORY_TOKEN
59-
);
60-
const allProcessedSubmissions = (await processedSubmissionsRepo.getAll()) ?? [];
61-
const processedSet = new Set<number>();
62-
for (const ps of allProcessedSubmissions) {
63-
processedSet.add(ps.cik);
64-
}
59+
let ciksToProcess: number[];
6560

66-
const unprocessedCiks = ciks.filter((cik) => !processedSet.has(cik));
61+
if (input.force) {
62+
ciksToProcess = ciks;
63+
} else {
64+
const processedSubmissionsRepo = globalServiceRegistry.get(
65+
PROCESSED_SUBMISSIONS_REPOSITORY_TOKEN
66+
);
67+
const allProcessedSubmissions = (await processedSubmissionsRepo.getAll()) ?? [];
68+
const processedSet = new Set<number>();
69+
for (const ps of allProcessedSubmissions) {
70+
processedSet.add(ps.cik);
71+
}
72+
ciksToProcess = ciks.filter((cik) => !processedSet.has(cik));
73+
}
6774

68-
if (unprocessedCiks.length) {
75+
if (ciksToProcess.length) {
6976
const wf = context.own(new Workflow());
7077
const loop = wf.map({ concurrencyLimit: 2 });
7178
loop.pipe(fetchAndStoreSubmission);
7279
loop.endMap();
7380
await wf.run({
74-
cik: unprocessedCiks,
81+
cik: ciksToProcess,
7582
});
7683
}
7784

src/task/submissions/UpdateAllSubmissionsTask.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ import {
1616
} from "../../storage/processing/ProcessedSubmissionsSchema";
1717
import { fetchAndStoreSubmission } from "./fetchAndStoreSubmission";
1818

19-
export type UpdateAllSubmissionsTaskInput = {};
19+
export type UpdateAllSubmissionsTaskInput = {
20+
readonly force?: boolean;
21+
};
2022

2123
export type UpdateAllSubmissionsTaskOutput = {
2224
success: boolean;
@@ -53,23 +55,30 @@ export class UpdateAllSubmissionsTask extends Task<
5355
{},
5456
{ orderBy: [{ column: "last_update", direction: "DESC" }] }
5557
)) ?? [];
56-
const allProcessedSubmissions = (await processedSubmissionsRepo.getAll()) ?? [];
57-
58-
const processedMap = new Map<number, ProcessedSubmissions>();
59-
for (const ps of allProcessedSubmissions) {
60-
processedMap.set(ps.cik, ps);
61-
}
6258

6359
const needsUpdating: { cik: number; last_update: string }[] = [];
6460
const needsInitialProcessing: { cik: number; last_update: string }[] = [];
6561

66-
for (const clu of allCikUpdates) {
67-
const ps = processedMap.get(clu.cik);
68-
if (!ps) {
69-
needsInitialProcessing.push({ cik: clu.cik, last_update: clu.last_update });
70-
} else if (clu.last_update > ps.last_processed) {
62+
if (input.force) {
63+
for (const clu of allCikUpdates) {
7164
needsUpdating.push({ cik: clu.cik, last_update: clu.last_update });
7265
}
66+
} else {
67+
const allProcessedSubmissions = (await processedSubmissionsRepo.getAll()) ?? [];
68+
69+
const processedMap = new Map<number, ProcessedSubmissions>();
70+
for (const ps of allProcessedSubmissions) {
71+
processedMap.set(ps.cik, ps);
72+
}
73+
74+
for (const clu of allCikUpdates) {
75+
const ps = processedMap.get(clu.cik);
76+
if (!ps) {
77+
needsInitialProcessing.push({ cik: clu.cik, last_update: clu.last_update });
78+
} else if (clu.last_update > ps.last_processed) {
79+
needsUpdating.push({ cik: clu.cik, last_update: clu.last_update });
80+
}
81+
}
7382
}
7483

7584
if (needsUpdating.length) {

0 commit comments

Comments
 (0)