-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathindex.ts
More file actions
122 lines (103 loc) · 4.22 KB
/
index.ts
File metadata and controls
122 lines (103 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import type { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler'
import { StarbasePlugin } from '../../src/plugin'
import type { DataSource } from '../../src/types'
import { createResponse } from '../../src/utils'
import { CronPlugin } from '../cron'
import {
deleteDataSyncTask,
ensureDataSyncTables,
listDataSyncTasks,
runDataSyncTask,
upsertDataSyncTask,
} from './service'
/**
 * Plugin that exposes HTTP endpoints under `/data-sync` for listing,
 * creating/updating, deleting and manually running data sync tasks.
 *
 * When a {@link CronPlugin} is supplied, every upserted task is also
 * registered as a cron event named `data-sync:<task name>`, and cron events
 * carrying that name (or a string `taskName` in their payload) trigger a run
 * of the matching task.
 */
export class DataSyncPlugin extends StarbasePlugin {
    /** Prefix shared by every cron event name this plugin registers. */
    private static readonly EVENT_PREFIX = 'data-sync:'

    public pathPrefix = '/data-sync'
    private cronPlugin?: CronPlugin
    // Captured from the most recent request so that cron callbacks, which do
    // not receive a request context here, still have something to run against.
    private dataSource?: DataSource
    private config?: StarbaseDBConfiguration

    /**
     * @param opts Optional settings. `cronPlugin`, when provided, is used to
     *   schedule and unschedule tasks and to receive cron events.
     */
    constructor(opts?: { cronPlugin?: CronPlugin }) {
        super('starbasedb:data-sync', { requiresAuth: true })
        this.cronPlugin = opts?.cronPlugin
    }

    /**
     * Installs the plugin's middleware, cron listener and routes on `app`.
     *
     * Routes:
     * - `GET    /data-sync`            — list tasks.
     * - `POST   /data-sync`            — upsert a task from a JSON object body.
     * - `DELETE /data-sync/:name`      — delete a task (404 if it was not deleted).
     * - `POST   /data-sync/run/:name`  — run a task immediately.
     *
     * @param app Application to register the middleware and routes on.
     */
    override async register(app: StarbaseApp): Promise<void> {
        // Remember the request's data source and config for later cron runs,
        // and ensure the sync tables before the request proceeds.
        app.use(async (c, next) => {
            const dataSource = c.get('dataSource') as DataSource
            this.dataSource = dataSource
            this.config = c.get('config') as StarbaseDBConfiguration
            await ensureDataSyncTables(dataSource)
            await next()
        })

        if (this.cronPlugin) {
            this.cronPlugin.onEvent(async ({ name, payload }) => {
                const eventTaskName =
                    typeof payload?.taskName === 'string'
                        ? payload.taskName
                        : undefined

                // Prefer the payload's task name; otherwise derive it from an
                // event name carrying this plugin's prefix.
                const taskName =
                    eventTaskName ||
                    (name.startsWith(DataSyncPlugin.EVENT_PREFIX)
                        ? name.slice(DataSyncPlugin.EVENT_PREFIX.length)
                        : undefined)

                const dataSource = this.dataSource
                const config = this.config

                // Nothing to do for unrelated events, or before any request
                // has populated the data source and config.
                if (!taskName || !dataSource || !config) {
                    return
                }

                try {
                    await runDataSyncTask(dataSource, taskName, config)
                } catch (error) {
                    // Log instead of rethrowing so a failed run does not
                    // propagate out of the cron callback.
                    console.error(
                        `Data sync cron run failed for ${taskName}:`,
                        error
                    )
                }
            })
        }

        app.get(this.pathPrefix, async (c) => {
            const dataSource = c.get('dataSource') as DataSource
            const tasks = await listDataSyncTasks(dataSource)
            return createResponse({ tasks }, undefined, 200)
        })

        app.post(this.pathPrefix, async (c) => {
            const dataSource = c.get('dataSource') as DataSource

            let body: any = {}
            try {
                body = await c.req.json()
            } catch {
                return createResponse(undefined, 'Invalid JSON payload', 400)
            }

            // `null`, arrays and primitives are valid JSON but not a task
            // definition; `null` in particular would throw on property access.
            if (
                body === null ||
                typeof body !== 'object' ||
                Array.isArray(body)
            ) {
                return createResponse(
                    undefined,
                    'JSON payload must be an object',
                    400
                )
            }

            const task = await upsertDataSyncTask(dataSource, {
                name: body.name,
                sourceTable: body.sourceTable,
                targetTable: body.targetTable,
                cursorColumn: body.cursorColumn,
                sourceSchema: body.sourceSchema,
                intervalCron: body.intervalCron,
                batchSize: body.batchSize,
            })

            if (this.cronPlugin) {
                await this.cronPlugin.addEvent(
                    task.cronTab,
                    `${DataSyncPlugin.EVENT_PREFIX}${task.name}`,
                    { taskName: task.name },
                    new URL(c.req.url).origin
                )
            }

            return createResponse({ task }, undefined, 201)
        })

        app.delete(`${this.pathPrefix}/:name`, async (c) => {
            const dataSource = c.get('dataSource') as DataSource
            const name = c.req.param('name')

            const deleted = await deleteDataSyncTask(dataSource, name)
            if (!deleted) {
                return createResponse(undefined, 'Task not found', 404)
            }

            // Only unschedule once the task itself has been deleted.
            if (this.cronPlugin) {
                await this.cronPlugin.removeEvent(
                    `${DataSyncPlugin.EVENT_PREFIX}${name}`
                )
            }

            return createResponse({ deleted: true, name }, undefined, 200)
        })

        app.post(`${this.pathPrefix}/run/:name`, async (c) => {
            const dataSource = c.get('dataSource') as DataSource
            const config = c.get('config') as StarbaseDBConfiguration
            const name = c.req.param('name')
            const summary = await runDataSyncTask(dataSource, name, config)
            return createResponse({ summary }, undefined, 200)
        })
    }
}