Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2cfd1f2
feat(table): chunked dispatcher for workflow-column runs
TheodoreSpeaks May 16, 2026
6b30f14
fix(table): eager bulk clear on column run so cells flip immediately
TheodoreSpeaks May 16, 2026
b315324
fix(table): bulk clear honors in-flight execs under mode: 'incomplete'
TheodoreSpeaks May 16, 2026
e76469a
refactor(table): dispatcher uses batchTriggerAndWait + tag-based cancel
TheodoreSpeaks May 18, 2026
af07b16
fix(table): show Stop button on optimistic-pending row cells
TheodoreSpeaks May 18, 2026
8fa3568
refactor(table): loop-in-cell cascade + dispatcher-everywhere routing
TheodoreSpeaks May 19, 2026
bd99d6c
fix(table): SQL cancellation guard allows worker to claim a null-exec…
TheodoreSpeaks May 19, 2026
125ae7e
fix(table): dispatcher cursor starts at -1 so position 0 is included
TheodoreSpeaks May 19, 2026
99e623a
refactor(table): align optimistic UI with new dispatcher; sticky canc…
TheodoreSpeaks May 19, 2026
563fc37
refactor(table): unify trigger.dev and inline dispatcher paths
TheodoreSpeaks May 19, 2026
757033a
feat(table): backend running counter, dep-aware retrigger, sidebar po…
TheodoreSpeaks May 20, 2026
e57380d
fix(table): paused workflow cells route through executeResumeJob; ren…
TheodoreSpeaks May 20, 2026
7279172
feat(table): typewriter reveal for SSE-driven workflow cell values
TheodoreSpeaks May 20, 2026
b2b1a83
fix(table): address bugbot/greptile review feedback
TheodoreSpeaks May 20, 2026
01bb233
refactor(table): row executions sidecar + left-to-right dep retrigger…
TheodoreSpeaks May 20, 2026
bfb847b
fix(table): address remaining cursor/greptile review feedback
TheodoreSpeaks May 20, 2026
0b14dbb
fix(table): cancel prior runs, scope batch insert dispatch, recover o…
TheodoreSpeaks May 20, 2026
ec0b73e
fix(table): row-scoped Refresh cancels in-flight; counter includes qu…
TheodoreSpeaks May 20, 2026
5c657e7
fix(table): per-row Stop tombstones ahead-of-cursor rows during Run-all
TheodoreSpeaks May 20, 2026
655269e
fix(table): seed dispatch overlay on Run; surface batch-enqueue failu…
TheodoreSpeaks May 20, 2026
9f30cb7
fix(table): seed dispatch overlay on Run; surface batch-enqueue failu…
TheodoreSpeaks May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions apps/sim/app/api/resume/poll/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,21 @@ async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
})

if (enqueueResult.status === 'starting') {
PauseResumeManager.startResumeExecution({
// Route through `executeResumeJob` (not `PauseResumeManager.startResumeExecution`
// directly) so cell-context restoration + cascade-loop continuation
// fires. This is the same primitive the trigger.dev `resumeExecutionTask`
// wraps — calling it directly handles both trigger.dev-disabled local
// dev and trigger.dev-enabled prod identically.
const { executeResumeJob } = await import('@/background/resume-execution')
void executeResumeJob({
resumeEntryId: enqueueResult.resumeEntryId,
resumeExecutionId: enqueueResult.resumeExecutionId,
pausedExecution: enqueueResult.pausedExecution,
pausedExecutionId: enqueueResult.pausedExecution.id,
contextId: enqueueResult.contextId,
resumeInput: enqueueResult.resumeInput,
userId: enqueueResult.userId,
workflowId: row.workflowId,
parentExecutionId: row.executionId,
}).catch((error) => {
logger.error('Background time-pause resume failed', {
executionId: row.executionId,
Expand Down
10 changes: 2 additions & 8 deletions apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { type NextRequest, NextResponse } from 'next/server'
import { runColumnContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
Expand Down Expand Up @@ -30,21 +29,16 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

// Dispatch in the background — large fan-outs (thousands of rows) issue
// sequential trigger.dev calls and would otherwise hold the HTTP response
// open for minutes, blocking the AI/copilot tool span and the UI mutation.
void runWorkflowColumn({
Comment thread
TheodoreSpeaks marked this conversation as resolved.
const { dispatchId } = await runWorkflowColumn({
tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
requestId,
}).catch((err) => {
logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message)
})

return NextResponse.json({ success: true, data: { triggered: null } })
return NextResponse.json({ success: true, data: { dispatchId } })
} catch (error) {
if (error instanceof Error && error.message === 'Invalid workspace ID') {
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
Expand Down
65 changes: 65 additions & 0 deletions apps/sim/app/api/table/[tableId]/dispatches/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { type ActiveDispatch, listActiveDispatchesContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher'
import { accessError, checkAccess } from '@/app/api/table/utils'

const logger = createLogger('TableDispatchesAPI')

interface RouteParams {
params: Promise<{ tableId: string }>
}

/**
* GET /api/table/[tableId]/dispatches
*
* Returns active (`pending` / `dispatching`) dispatches for the table. Drives
* the client's "about to run" overlay so refresh during a long Run-all keeps
* the queued indicators on rows the dispatcher hasn't reached yet.
*/
export const GET = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
const requestId = generateRequestId()

try {
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
}

const parsed = await parseRequest(listActiveDispatchesContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params

const result = await checkAccess(tableId, authResult.userId, 'read')
if (!result.ok) return accessError(result, requestId, tableId)

const [rows, running] = await Promise.all([
listActiveDispatches(tableId),
countRunningCells(tableId),
])
const dispatches: ActiveDispatch[] = rows.map((r) => ({
id: r.id,
status: r.status as 'pending' | 'dispatching',
mode: r.mode,
isManualRun: r.isManualRun,
cursor: r.cursor,
scope: r.scope,
}))

return NextResponse.json({
success: true,
data: {
dispatches,
runningCellCount: running.total,
runningByRowId: running.byRowId,
},
})
} catch (error) {
logger.error(`[${requestId}] list-dispatches failed:`, error)
return NextResponse.json({ error: 'Failed to list active dispatches' }, { status: 500 })
}
})
5 changes: 5 additions & 0 deletions apps/sim/app/api/table/[tableId]/rows/[rowId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: RowR
// Only `null` when a `cancellationGuard` is supplied and the SQL guard
// rejects the write — this route doesn't pass one, so reaching null is a bug.
if (!updatedRow) throw new Error('updateRow returned null without a cancellationGuard')
// Auto-dispatch for user edits is handled inside `updateRow` (mode: 'new').
// Firing a second mode: 'incomplete' dispatch here would race with the
// `mode: 'new'` one AND bulk-clear sibling-group outputs (the incomplete
// bulk-clear wipes ALL targeted columns when any one column on the row
// is empty).

return NextResponse.json({
success: true,
Expand Down
51 changes: 46 additions & 5 deletions apps/sim/app/api/table/[tableId]/rows/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { db } from '@sim/db'
import { userTableRows } from '@sim/db/schema'
import { tableRowExecutions, userTableRows } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, sql } from 'drizzle-orm'
import { and, eq, inArray, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import {
type BatchInsertTableRowsBodyInput,
Expand All @@ -17,7 +17,14 @@ import { isZodError, validationErrorResponse } from '@/lib/api/server/validation
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import type { Filter, RowData, Sort, TableSchema } from '@/lib/table'
import type {
Filter,
RowData,
RowExecutionMetadata,
RowExecutions,
Sort,
TableSchema,
} from '@/lib/table'
import {
batchInsertRows,
batchUpdateRows,
Expand Down Expand Up @@ -277,7 +284,6 @@ export const GET = withRouteHandler(
.select({
id: userTableRows.id,
data: userTableRows.data,
executions: userTableRows.executions,
position: userTableRows.position,
createdAt: userTableRows.createdAt,
updatedAt: userTableRows.updatedAt,
Expand Down Expand Up @@ -308,6 +314,41 @@ export const GET = withRouteHandler(

const rows = await query.limit(validated.limit).offset(validated.offset)

// Sidecar: fetch per-(row, group) execution state and group into a map
// so the response preserves the legacy `row.executions[groupId]` wire
// shape. One indexed-IN scan against table_row_executions.
const executionsByRow = new Map<string, RowExecutions>()
if (rows.length > 0) {
const execRows = await db
.select()
.from(tableRowExecutions)
.where(
inArray(
tableRowExecutions.rowId,
rows.map((r) => r.id)
)
)
for (const e of execRows) {
const existing = executionsByRow.get(e.rowId) ?? {}
const meta: RowExecutionMetadata = {
status: e.status as RowExecutionMetadata['status'],
executionId: e.executionId ?? null,
jobId: e.jobId ?? null,
workflowId: e.workflowId,
error: e.error ?? null,
...(e.runningBlockIds && e.runningBlockIds.length > 0
? { runningBlockIds: e.runningBlockIds }
: {}),
...(e.blockErrors && Object.keys(e.blockErrors as Record<string, string>).length > 0
? { blockErrors: e.blockErrors as Record<string, string> }
: {}),
...(e.cancelledAt ? { cancelledAt: e.cancelledAt.toISOString() } : {}),
}
existing[e.groupId] = meta
executionsByRow.set(e.rowId, existing)
}
}

logger.info(
`[${requestId}] Queried ${rows.length} rows from table ${tableId} (total: ${totalCount ?? 'n/a'})`
)
Expand All @@ -318,7 +359,7 @@ export const GET = withRouteHandler(
rows: rows.map((r) => ({
id: r.id,
data: r.data,
executions: r.executions ?? {},
executions: executionsByRow.get(r.id) ?? {},
position: r.position,
createdAt:
r.createdAt instanceof Date ? r.createdAt.toISOString() : String(r.createdAt),
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/app/api/v1/tables/[tableId]/rows/[rowId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: RowR
if (!updatedRow) {
return NextResponse.json({ error: 'Row not found' }, { status: 404 })
}
// Auto-dispatch for user edits is handled inside `updateRow` (mode: 'new').
// Firing a second mode: 'incomplete' dispatch here would race with it AND
// bulk-clear sibling-group outputs.

return NextResponse.json({
success: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use client'

import type React from 'react'
import { useEffect, useRef, useState } from 'react'
import { parse } from 'tldts'
import { Badge, Checkbox, Tooltip } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
Expand Down Expand Up @@ -60,6 +61,14 @@ export function resolveCellRender({
if (!isNull) return { kind: 'value', text: stringifyValue(value) }

if (inFlight && !(groupHasBlockErrors && !blockRunning)) {
// A `pending` cell whose jobId starts with `paused-` is mid-pause
// (workflow yielded for human-in-the-loop). Render as Pending rather
// than Queued so the user can tell it's not just waiting to start.
const isPaused =
exec?.status === 'pending' &&
typeof exec.jobId === 'string' &&
exec.jobId.startsWith('paused-')
if (isPaused) return { kind: 'pending-upstream' }
if (exec?.status === 'queued' || exec?.status === 'pending') return { kind: 'queued' }
return { kind: 'pending-upstream' }
}
Expand Down Expand Up @@ -119,6 +128,9 @@ interface CellRenderProps {
}

export function CellRender({ kind, isEditing }: CellRenderProps): React.ReactElement | null {
const valueText = kind.kind === 'value' ? kind.text : null
const revealedValueText = useTypewriter(valueText)

switch (kind.kind) {
case 'value':
return (
Expand All @@ -128,7 +140,7 @@ export function CellRender({ kind, isEditing }: CellRenderProps): React.ReactEle
isEditing && 'invisible'
)}
>
{kind.text}
{revealedValueText ?? kind.text}
</span>
)

Expand Down Expand Up @@ -275,3 +287,45 @@ function Wrap({ isEditing, children }: { isEditing: boolean; children: React.Rea
if (!isEditing) return <>{children}</>
return <div className='invisible'>{children}</div>
}

const TYPEWRITER_MS_PER_CHAR = 15

/**
* Reveals `text` character-by-character whenever it changes after the first
* render. Initial render (page hydration or virtualization remount) shows the
* value statically — animation fires only for subsequent updates, which in
* practice means SSE-driven workflow completions arriving via
* `useTableEventStream → applyCell()`.
*/
function useTypewriter(text: string | null): string | null {
const [revealed, setRevealed] = useState<string | null>(text)
const isFirstRunRef = useRef(true)
const prevTextRef = useRef<string | null>(text)

useEffect(() => {
if (isFirstRunRef.current) {
isFirstRunRef.current = false
prevTextRef.current = text
setRevealed(text)
return
}
if (prevTextRef.current === text) return
prevTextRef.current = text

if (text === null || text.length === 0) {
setRevealed(text)
return
}

setRevealed('')
let i = 0
const id = window.setInterval(() => {
i++
setRevealed(text.slice(0, i))
if (i >= text.length) window.clearInterval(id)
}, TYPEWRITER_MS_PER_CHAR)
return () => window.clearInterval(id)
}, [text])

return revealed
}
Loading
Loading