Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions apps/mesh/migrations/109-channels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { type Kysely, sql } from "kysely";

/**
* Org-chat channels. Each row is one configured chat-platform integration
* (Microsoft Teams, Discord, ...) that registers a synthetic bot org-member.
* Inbound platform messages run a Decopilot agent turn and the reply is posted
* back to the platform.
*
* Mirrors the AI-provider-keys shape: org-scoped, secrets vault-encrypted into a
* single opaque blob (`encrypted_credentials`), never columnized. `metadata`
* carries only NON-secret display info (bot display name, etc.).
*
* Lifecycle: a channel is created as a `draft` (no credentials yet) so the
* inbound webhook URL — which embeds the channel id — exists before the admin
* configures the platform portal. `CHANNEL_TEST` flips it to `active`.
*/
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("channels")
.addColumn("id", "text", (col) => col.primaryKey())
.addColumn("organization_id", "text", (col) =>
col.notNull().references("organization.id").onDelete("cascade"),
)
// 'teams' | 'discord' — enforced at app level, not DB level.
.addColumn("channel_type", "text", (col) => col.notNull())
.addColumn("label", "text", (col) => col.notNull())
// Vault-encrypted JSON blob of the per-platform secret credentials.
// Nullable: a draft channel has no credentials until the configure step.
.addColumn("encrypted_credentials", "text")
// virtual_mcp_id of the Decopilot agent the bot runs. Nullable: bound during
// setup; runChannelTurn falls back to the org default home agent when unset.
.addColumn("agent_id", "text")
// Synthetic bot org-member (user.id). Managed by the app (no FK cascade so
// the bot user/member teardown stays explicit in CHANNEL_DELETE).
.addColumn("bot_user_id", "text", (col) => col.notNull())
// JSON, non-secret display metadata (e.g. bot display name surfaced by TEST).
.addColumn("metadata", "text")
// 'draft' | 'active' | 'error' | 'disabled'
.addColumn("status", "text", (col) => col.notNull().defaultTo("draft"))
.addColumn("created_by", "text", (col) => col.notNull())
.addColumn("created_at", "timestamptz", (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.execute();

await db.schema
.createIndex("idx_channels_org")
.on("channels")
.column("organization_id")
.execute();

await db.schema
.createIndex("idx_channels_org_type")
.on("channels")
.columns(["organization_id", "channel_type"])
.execute();
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable("channels").execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ import * as migration105orgfs from "./105-org-fs.ts";
import * as migration106automationtools from "./106-automation-tools.ts";
import * as migration107orgfspublicorg from "./107-org-fs-public-org.ts";
import * as migration108automationmaxagentsteps from "./108-automation-max-agent-steps.ts";
import * as migration109channels from "./109-channels.ts";

/**
* Core migrations for the Mesh application.
Expand Down Expand Up @@ -236,6 +237,7 @@ const migrations: Record<string, Migration> = {
"106-automation-tools": migration106automationtools,
"107-org-fs-public-org": migration107orgfspublicorg,
"108-automation-max-agent-steps": migration108automationmaxagentsteps,
"109-channels": migration109channels,
};

export default migrations;
5 changes: 5 additions & 0 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ import {
sweepOrphanedWorkflows,
} from "../dispatch-queue/dbos-orphan-recovery";
import { backfillStudioPackForAllOrgs } from "../auth/install-studio-pack-workflow";
import { setChannelRuntime } from "../channels/runtime";
import { DBOS } from "@dbos-inc/dbos-sdk";
import {
dispatchRunAndWait,
Expand Down Expand Up @@ -1424,6 +1425,10 @@ export async function createApp(options: CreateAppOptions = {}) {
meshContextFactory: automationContextFactory,
});

// Channel inbound webhooks build a bot-scoped context the same way
// automations do (background context factory, no HTTP session).
setChannelRuntime({ meshContextFactory: automationContextFactory });

// Same deps shape as automations — the per-thread gate calls
// `dispatchRunAndWait` once the queue lets a message through. Wiring
// happens before `DBOS.launch()` for the same reasons.
Expand Down
165 changes: 165 additions & 0 deletions apps/mesh/src/api/routes/channel-webhooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Channel Inbound Webhook Endpoints
*
* Receives platform callbacks for configured chat channels and drives the
* conversational loop: verify the platform signature, ACK within the platform
* deadline, then (asynchronously) run a Decopilot agent turn and post the reply
* back into the conversation.
*
* Routes (mounted under /api/:org by org-scoped.ts):
* POST /:channelId/teams
* POST /:channelId/discord
*
* Auth is per-platform: Discord verifies an Ed25519 signature against the
* stored public key; Teams verifies the Bot Framework JWT. The :channelId path
* segment + resolved org locate the channel; the signature authenticates.
*/

import { createHash } from "node:crypto";
import { Hono, type Context } from "hono";
import { bodyLimit } from "hono/body-limit";
import { getChannelAdapter } from "@/channels/registry";
import { runChannelTurn } from "@/channels/run-channel-turn";
import type { ChannelType } from "@/storage/types";
import type { Env } from "../hono-env";

const MAX_BODY_SIZE = 1_048_576; // 1MB

/** Deterministic, stable thread id for a channel conversation. */
function threadIdFor(channelId: string, conversationKey: string): string {
const hash = createHash("sha1")
.update(`${channelId}:${conversationKey}`)
.digest("hex")
.slice(0, 24);
return `thrd_chan_${hash}`;
}

export function createChannelWebhookRoutes() {
const app = new Hono<Env>();

const limit = bodyLimit({
maxSize: MAX_BODY_SIZE,
onError: (c) => c.json({ error: "Payload too large" }, 413),
});

const handle = async (c: Context<Env>, channelType: ChannelType) => {
const ctx = c.get("meshContext");
if (!ctx?.organization) {
return c.json({ error: "Organization context missing" }, 500);
}
const orgId = ctx.organization.id;
const channelId = c.req.param("channelId");
if (!channelId) {
return c.json({ error: "channelId required" }, 400);
}

// Load + decrypt the channel credentials (needed for signature verify).
let resolved: Awaited<ReturnType<typeof ctx.storage.channels.resolve>>;
try {
resolved = await ctx.storage.channels.resolve(channelId, orgId);
} catch {
return c.json({ error: "Channel not found" }, 404);
}
const { info, credentials } = resolved;
if (info.channelType !== channelType) {
return c.json({ error: "Channel type mismatch" }, 404);
}
if (!credentials) {
return c.json({ error: "Channel not configured" }, 409);
}

const adapter = getChannelAdapter(channelType);

// Signatures are computed over the raw bytes — read them before parsing.
const rawBody = await c.req.arrayBuffer();
const verified = await adapter.verifySignature({
rawBody,
headers: c.req.raw.headers,
credentials,
});
if (!verified) {
return c.json({ error: "Signature verification failed" }, 401);
}

let payload: unknown;
try {
payload = JSON.parse(new TextDecoder().decode(rawBody));
} catch {
payload = null;
}

const parsed = adapter.parseInbound(payload);
if (parsed.kind === "ack") {
return parsed.response !== undefined
? c.json(parsed.response as object, 200)
: c.body(null, 200);
}

// It's a real message. If the channel isn't fully set up we can't run the
// agent — acknowledge so the platform doesn't retry, and stop.
const agentId = info.agentId;
if (!agentId) {
console.warn(
`[channel-webhook] channel ${channelId} has no agent bound — skipping`,
);
return ackResponse(c, parsed.ackResponse);
}

const { message } = parsed;
const threadId = threadIdFor(channelId, message.conversationKey);

// Run the agent turn AFTER acking (platform deadlines are short; the agent
// loop can take minutes). Fire-and-forget with error logging + best-effort
// error reply, mirroring the automation dispatch fire-and-forget pattern.
void (async () => {
try {
const { replyText } = await runChannelTurn({
organizationId: orgId,
botUserId: info.botUserId,
agentId,
threadId,
userText: message.text,
sender: {
platform: channelType,
senderId: message.senderId,
senderName: message.senderName,
},
});
await adapter.sendOutbound({
credentials,
conversationRef: message.conversationRef,
text:
replyText ||
"I wasn't able to produce a response. Please try again.",
});
} catch (err) {
console.error(
`[channel-webhook] turn failed for channel ${channelId}:`,
err instanceof Error ? err.message : err,
);
try {
await adapter.sendOutbound({
credentials,
conversationRef: message.conversationRef,
text: "Something went wrong while handling your message.",
});
} catch {
// best-effort
}
}
})();

return ackResponse(c, parsed.ackResponse);
};

app.post("/:channelId/teams", limit, (c) => handle(c, "teams"));
app.post("/:channelId/discord", limit, (c) => handle(c, "discord"));

return app;
}

function ackResponse(c: Context<Env>, response: unknown) {
return response !== undefined
? c.json(response as object, 200)
: c.body(null, 200);
}
2 changes: 1 addition & 1 deletion apps/mesh/src/api/routes/decopilot/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ function toModelInfo(resolved: Awaited<ReturnType<typeof resolveTier>>) {
* can compose a ModelsConfig the same way HTTP chat does, instead of
* duplicating the tier-resolution + tryResolve fallback logic.
*/
async function resolvePerRequestModels(
export async function resolvePerRequestModels(
ctx: StudioContext,
tier: SimpleModeTier | undefined,
harnessId: HarnessId | null | undefined,
Expand Down
2 changes: 2 additions & 0 deletions apps/mesh/src/api/routes/org-scoped.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { resolveOrgFromPath } from "../middleware/resolve-org-from-path";
import type { Env } from "../hono-env";

import { createAutomationWebhookRoutes } from "./automation-webhooks";
import { createChannelWebhookRoutes } from "./channel-webhooks";
import { createDecoSitesOrgRoutes } from "./deco-sites";
import { createDevAssetsRoutes } from "./dev-assets";
import { createDownstreamTokenRoutes } from "./downstream-token";
Expand Down Expand Up @@ -97,6 +98,7 @@ export const createOrgScopedApi = (deps: OrgScopedDeps) => {
}),
); // /api/:org/trigger-callback
app.route("/webhooks", createAutomationWebhookRoutes()); // /api/:org/webhooks/:triggerId[/:token]
app.route("/channels", createChannelWebhookRoutes()); // /api/:org/channels/:channelId/{teams,discord}
app.route(
"/",
createLinkIngestRoutes({
Expand Down
93 changes: 93 additions & 0 deletions apps/mesh/src/channels/adapters/adapters.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { describe, expect, it } from "bun:test";
import { discordAdapter } from "./discord";
import { teamsAdapter } from "./teams";

describe("discordAdapter", () => {
it("answers a PING with a PONG and does no work", () => {
expect(discordAdapter.parseInbound({ type: 1 })).toEqual({
kind: "ack",
response: { type: 1 },
});
});

it("parses a slash command into a deferred message", () => {
const parsed = discordAdapter.parseInbound({
type: 2,
id: "i1",
token: "tok",
application_id: "app1",
channel_id: "chan1",
member: { user: { id: "u1", global_name: "Ada" } },
data: { name: "ask", options: [{ name: "message", value: "hi there" }] },
});
expect(parsed.kind).toBe("message");
if (parsed.kind !== "message") return;
expect(parsed.message.text).toBe("hi there");
expect(parsed.message.senderName).toBe("Ada");
expect(parsed.message.conversationKey).toBe("chan1");
expect(parsed.message.conversationRef.interactionToken).toBe("tok");
expect(parsed.ackResponse).toEqual({ type: 5 });
});

it("rejects a bad signature without throwing", async () => {
const ok = await discordAdapter.verifySignature({
rawBody: new TextEncoder().encode("{}").buffer,
headers: new Headers({
"x-signature-ed25519": "00".repeat(64),
"x-signature-timestamp": "1",
}),
credentials: {
publicKey: "aa".repeat(32),
applicationId: "a",
botToken: "b",
},
});
expect(ok).toBe(false);
});

it("masks the bot token, leaving the application id visible", () => {
const masked = discordAdapter.maskCredentials({
applicationId: "12345",
publicKey: "abcdef0123456789",
botToken: "supersecrettoken",
});
expect(masked.applicationId).toBe("12345");
expect(masked.botToken).toMatch(/oken$/);
expect(masked.botToken).not.toContain("supersecret");
});
});

describe("teamsAdapter", () => {
it("acks non-message activities", () => {
expect(teamsAdapter.parseInbound({ type: "conversationUpdate" })).toEqual({
kind: "ack",
});
});

it("parses a message activity with its conversation reference", () => {
const parsed = teamsAdapter.parseInbound({
type: "message",
text: "hello bot",
from: { id: "29:abc", name: "Grace" },
serviceUrl: "https://smba.trafficmanager.net/teams/",
conversation: { id: "conv-1" },
});
expect(parsed.kind).toBe("message");
if (parsed.kind !== "message") return;
expect(parsed.message.text).toBe("hello bot");
expect(parsed.message.senderName).toBe("Grace");
expect(parsed.message.conversationKey).toBe("conv-1");
expect(parsed.message.conversationRef.serviceUrl).toBe(
"https://smba.trafficmanager.net/teams/",
);
});

it("rejects an inbound request with no bearer token", async () => {
const ok = await teamsAdapter.verifySignature({
rawBody: new TextEncoder().encode("{}").buffer,
headers: new Headers(),
credentials: { appId: "a", appPassword: "p" },
});
expect(ok).toBe(false);
});
});
Loading
Loading