Have you ever wanted to sleep for 7 days within a Convex function? Find yourself in callback hell chaining together function calls through queues? Sick of manual state management and scheduling in long-lived workflows? Convex workflows might just be what you're looking for.
import { WorkflowManager } from "@convex-dev/workflow";
import { v } from "convex/values";
import { components } from "./_generated/server";
import { internal } from "./_generated/api";

export const workflow = new WorkflowManager(components.workflow);

export const exampleWorkflow = workflow.define({
  args: {
    storageId: v.id("_storage"),
  },
  handler: async (step, args) => {
    const transcription = await step.runAction(
      internal.index.computeTranscription,
      { storageId: args.storageId },
    );
    // Sleep for 30 days after computing the transcription.
    await step.sleep(30 * 24 * 60 * 60 * 1000);
    const embedding = await step.runAction(internal.index.computeEmbedding, {
      transcription,
    });
    console.log(embedding);
  },
});
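The example above passes step.sleep() a hand-multiplied duration, which appears to be in milliseconds. A minimal sketch of small duration helpers can make such values easier to read and check. The names seconds, minutes, hours, and days are illustrative only and are not part of @convex-dev/workflow.

```typescript
// Hypothetical helpers (not part of @convex-dev/workflow) for building
// millisecond durations from larger units.
const MS_PER_SECOND = 1000;
const MS_PER_MINUTE = 60 * MS_PER_SECOND;
const MS_PER_HOUR = 60 * MS_PER_MINUTE;
const MS_PER_DAY = 24 * MS_PER_HOUR;

function seconds(n: number): number {
  return n * MS_PER_SECOND;
}

function minutes(n: number): number {
  return n * MS_PER_MINUTE;
}

function hours(n: number): number {
  return n * MS_PER_HOUR;
}

function days(n: number): number {
  return n * MS_PER_DAY;
}
```

With these in scope, the sleep in the example could be written as step.sleep(days(30)), assuming the argument is indeed a millisecond count.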
This component adds durably executed workflows to Convex. Combine Convex queries, mutations, and actions into long-lived workflows, and the system will always fully execute a workflow to completion.
This component is currently in beta and may have some rough edges. Open a GitHub issue with any feedback or bugs you find.
First, add @convex-dev/workflow to your Convex project:
npm install @convex-dev/workflow
Then, install the component within your convex/convex.config.ts file:
// convex/convex.config.ts
import workflow from "@convex-dev/workflow/convex.config";
import { defineApp } from "convex/server";
const app = defineApp();
app.use(workflow);
export default app;
Finally, create a workflow manager within your convex/ folder, and point it to the installed component:
// convex/index.ts
import { WorkflowManager } from "@convex-dev/workflow";
import { components } from "./_generated/server";
export const workflow = new WorkflowManager(components.workflow);
The first step is to define a workflow using workflow.define(). This function is designed to feel like a Convex action but with a few restrictions. In particular, side effects such as Math.random(), Date.now(), and fetch are not currently available within our workflow environment.
export const exampleWorkflow = workflow.define({
  args: { name: v.string() },
  handler: async (step, args) => {
    const queryResult = await step.runQuery(
      internal.example.exampleQuery,
      args,
    );
    const actionResult = await step.runAction(
      internal.example.exampleAction,
      args,
    );
    console.log(queryResult, actionResult);
  },
});

export const exampleQuery = internalQuery({
  args: { name: v.string() },
  handler: async (ctx, args) => {
    return `The query says... Hi ${args.name}!`;
  },
});

export const exampleAction = internalAction({
  args: { name: v.string() },
  handler: async (ctx, args) => {
    return `The action says... Hi ${args.name}!`;
  },
});
Once you've defined a workflow, you can start it from a mutation or action using workflow.start().
export const kickoffWorkflow = mutation({
  handler: async (ctx) => {
    const workflowId = await workflow.start(
      ctx,
      internal.example.exampleWorkflow,
      {
        name: "James",
      },
    );
  },
});
The workflow.start() method returns a WorkflowId, which can then be used for querying a workflow's status.
export const kickoffWorkflow = action({
  handler: async (ctx) => {
    const workflowId = await workflow.start(
      ctx,
      internal.example.exampleWorkflow,
      {
        name: "James",
      },
    );
    await new Promise((resolve) => setTimeout(resolve, 1000));
    const status = await workflow.status(ctx, workflowId);
    console.log("Workflow status after 1s", status);
  },
});
You can also cancel a workflow with workflow.cancel(), halting the workflow's execution immediately. In-progress calls to step.runAction(), however, only have best-effort cancellation.
export const kickoffWorkflow = action({
  handler: async (ctx) => {
    const workflowId = await workflow.start(
      ctx,
      internal.example.exampleWorkflow,
      {
        name: "James",
      },
    );
    await new Promise((resolve) => setTimeout(resolve, 1000));
    // Cancel the workflow after 1 second.
    await workflow.cancel(ctx, workflowId);
  },
});
After a workflow has completed, you can clean up its storage with workflow.cleanup(). Completed workflows are not automatically cleaned up by the system.
export const kickoffWorkflow = action({
  handler: async (ctx) => {
    const workflowId = await workflow.start(
      ctx,
      internal.example.exampleWorkflow,
      {
        name: "James",
      },
    );
    try {
      while (true) {
        const status = await workflow.status(ctx, workflowId);
        if (status.type === "inProgress") {
          await new Promise((resolve) => setTimeout(resolve, 1000));
          continue;
        }
        console.log("Workflow completed with status:", status);
        break;
      }
    } finally {
      await workflow.cleanup(ctx, workflowId);
    }
  },
});
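The polling loop above runs until the workflow leaves the "inProgress" state, with no upper bound. One possible refinement is a small helper that caps the number of attempts. The sketch below is self-contained and uses a hypothetical pollUntilDone function and StatusLike type. Only the "inProgress" status variant comes from the example above; nothing here is part of @convex-dev/workflow.

```typescript
// Hypothetical status shape: only the "inProgress" variant is taken from the
// example above; any other `type` value is treated as "done".
type StatusLike = { type: string };

// Poll `getStatus` until it reports something other than "inProgress",
// waiting `intervalMs` between attempts and giving up after `maxAttempts`.
async function pollUntilDone<S extends StatusLike>(
  getStatus: () => Promise<S>,
  options: { intervalMs: number; maxAttempts: number },
): Promise<S> {
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    const status = await getStatus();
    if (status.type !== "inProgress") {
      return status;
    }
    // Skip the wait after the final attempt so we fail promptly.
    if (attempt < options.maxAttempts) {
      await new Promise<void>((resolve) =>
        setTimeout(resolve, options.intervalMs),
      );
    }
  }
  throw new Error(
    `Workflow still in progress after ${options.maxAttempts} attempts`,
  );
}
```

Inside an action, this might be called as pollUntilDone(() => workflow.status(ctx, workflowId), { intervalMs: 1000, maxAttempts: 60 }), still wrapped in the try/finally so that workflow.cleanup() runs either way.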
Convex workflows is a beta product currently under active development. Here are a few limitations to keep in mind:
console.log() isn't currently captured, so you may see duplicate log lines within your Convex dashboard.
If you need side effects like fetch, Math.random(), or Date.now(), you'll need to define a separate Convex action, perform the side effects there, and then call that action from the workflow with step.runAction().
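To see why side effects belong in separate steps, here is a toy model of replay. It assumes, as is common in durable execution systems, that completed step results are recorded and the handler is re-run from the top after a restart. The source does not describe the component's internals, so this is an illustration of the idea rather than how @convex-dev/workflow works. The names Journal, RunStep, makeRunStep, exampleHandler, and replay are all hypothetical.

```typescript
// Toy model of journal-based replay. Hypothetical names throughout; this is
// not the @convex-dev/workflow implementation.
type Journal = unknown[];
type RunStep = <T>(fn: () => T | Promise<T>) => Promise<T>;

// Build a step runner that replays recorded results before executing new work.
function makeRunStep(journal: Journal): RunStep {
  let cursor = 0;
  return async <T>(fn: () => T | Promise<T>): Promise<T> => {
    if (cursor < journal.length) {
      // This step finished in an earlier run: reuse its recorded result.
      return journal[cursor++] as T;
    }
    // First time reaching this step: execute it and record the result.
    const result = await fn();
    journal.push(result);
    cursor++;
    return result;
  };
}

// The handler body is deterministic: the random value is produced inside a
// step, so it is recorded once and replayed on every later run.
async function exampleHandler(runStep: RunStep): Promise<string> {
  const token = await runStep(() => Math.random());
  return runStep(() => `token-${token}`);
}

// Run the handler from the top against a journal, as a restart would.
async function replay(journal: Journal): Promise<string> {
  return exampleHandler(makeRunStep(journal));
}
```

In this model, calling replay() twice with the same journal returns the same string, because the second run reads both step results from the journal. Had Math.random() been called directly in the handler body instead of inside a step, each re-run would see a different value.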