Queue System & Auto Retries#944
Conversation
📝 WalkthroughWalkthroughThis PR fundamentally refactors the job management system from in-memory storage to a persistent database-backed queue using a new Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Handler as HTTP Handler
participant DB as Database
participant Queue as Job Queue<br/>(DB-backed)
participant Worker as Background<br/>Worker
participant Processor as OCR/Tag<br/>Processor
Client->>Handler: POST /api/submitOCR
activate Handler
Handler->>Handler: Create QueuedJob<br/>(JobType: "manual_ocr")
Handler->>DB: EnqueueJob(job)
activate DB
DB->>Queue: Insert with Status=pending
DB-->>Handler: Success
deactivate DB
Handler-->>Client: HTTP 200
deactivate Handler
Worker->>DB: Poll for next job
activate DB
DB->>Queue: SELECT * WHERE Status IN<br/>(pending, failed) ORDER BY CreatedAt
DB->>Queue: UPDATE Status=in_progress<br/>(atomic claim)
DB-->>Worker: QueuedJob
deactivate DB
activate Worker
Worker->>Worker: processJob(job)<br/>by JobType
Worker->>Processor: processOCRJob() or<br/>processAutoTagJob()
activate Processor
Processor-->>Worker: Result
deactivate Processor
Worker->>DB: UpdateJobStatus<br/>(completed, result)
activate DB
DB->>Queue: UPDATE Status, Result
DB-->>Worker: Success
deactivate DB
deactivate Worker
Client->>Handler: GET /api/job/{jobID}
activate Handler
Handler->>DB: GetJob(jobID)
activate DB
DB->>Queue: SELECT * WHERE ID=jobID
DB-->>Handler: QueuedJob with Status
deactivate DB
Handler-->>Client: HTTP 200 {status, result}
deactivate Handler
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (2)
jobs.go (2)
178-180: Silent discard of JSON unmarshal error may hide configuration issues.If
job.OptionsJSONcontains malformed JSON, the error is silently ignored and the job proceeds with default options. This could mask bugs or user errors.Proposed fix: log the error
if job.OptionsJSON != "" { - _ = json.Unmarshal([]byte(job.OptionsJSON), &options) + if err := json.Unmarshal([]byte(job.OptionsJSON), &options); err != nil { + logger.Warnf("Failed to parse OptionsJSON for job %s: %v, using defaults", job.ID, err) + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@jobs.go` around lines 178 - 180, The code silently discards errors from json.Unmarshal of job.OptionsJSON; change the block that calls json.Unmarshal(&options) to capture the returned error and log it (including job identifier and the OptionsJSON or summary) using the project's logger instead of ignoring it, e.g., err := json.Unmarshal(...); if err != nil { logger.Errorf("failed to unmarshal job.OptionsJSON for job %s: %v", job.ID, err) } so malformed JSON is visible in logs and can be diagnosed; keep falling back to defaults only after logging the error.
207-234: Potential duplicate tag addition whenpdfOCRTaggingis enabled.When
app.pdfOCRTaggingis true,pdfOCRCompleteTagis added both viaAddTags(Line 217) andSuggestedTags(Line 224). Depending on howUpdateDocumentshandles these fields, the tag could be added twice or cause unexpected behavior.Suggested consolidation
if job.JobType == "auto_ocr" { doc, err := app.Client.GetDocument(ctx, job.DocumentID) if err == nil { documentSuggestion := DocumentSuggestion{ ID: job.DocumentID, OriginalDocument: doc, SuggestedContent: processedDoc.Text, RemoveTags: []string{autoOcrTag}, - AddTags: func() []string { - if app.pdfOCRTagging && !options.UploadPDF { - return []string{app.pdfOCRCompleteTag} - } - return nil - }(), } - if (app.pdfOCRTagging) && app.pdfOCRCompleteTag != "" { - documentSuggestion.SuggestedTags = []string{app.pdfOCRCompleteTag} + if app.pdfOCRTagging && app.pdfOCRCompleteTag != "" && !options.UploadPDF { + documentSuggestion.AddTags = []string{app.pdfOCRCompleteTag} documentSuggestion.KeepOriginalTags = true }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@jobs.go` around lines 207 - 234, The code may add pdfOCRCompleteTag twice when app.pdfOCRTagging is true because AddTags in the DocumentSuggestion literal and the later assignment to documentSuggestion.SuggestedTags both include the same tag; modify the construction of DocumentSuggestion (and the subsequent conditional that sets SuggestedTags/KeepOriginalTags) to ensure the tag is only set once—either populate AddTags or SuggestedTags but not both—by checking app.pdfOCRTagging, options.UploadPDF and processedDoc state and only assigning the pdfOCRCompleteTag to one of AddTags or SuggestedTags before calling app.Client.UpdateDocuments (refer to DocumentSuggestion, AddTags, SuggestedTags, app.pdfOCRTagging, app.pdfOCRCompleteTag, and UpdateDocuments).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app_http_handlers.go`:
- Around line 252-256: GetJob's error is always translated to 404; change the
handler to distinguish not-found vs other DB errors by using errors.Is(err,
gorm.ErrRecordNotFound) (import "errors" and "gorm.io/gorm" if missing). If
errors.Is(..., gorm.ErrRecordNotFound) return c.JSON(http.StatusNotFound, ...)
otherwise return c.JSON(http.StatusInternalServerError, gin.H{"error":
err.Error()}) so database/connectivity errors produce 500.
In `@background.go`:
- Around line 80-104: The enqueueIfNotExists function has a TOCTOU race: two
callers can both see count==0 and both call EnqueueJob, creating duplicates; add
a defensive check by ensuring the DB has a unique constraint on (document_id,
job_type, status) for active statuses and then catch and treat the resulting
unique-constraint/constraint-violation error from EnqueueJob as a benign
"already exists" case (return false, nil) instead of failing, while still
returning other errors; keep generateJobID, QueuedJob construction and the log
intact, but handle the DB uniqueness violation from app.Database/EnqueueJob and
document in comments that the DB-level unique index is required.
In `@docker-compose.yml`:
- Around line 26-29: The docker-compose.yml contains Windows-specific absolute
volume mounts (e.g., the D:\paperless\paperless-ngx\data /media /export /consume
entries) and hardcoded private IPs referenced elsewhere (lines indicated in the
review); replace those absolute paths with portable alternatives by using
relative paths or environment variable substitution (e.g., ${PAPERLESS_DATA},
${PAPERLESS_MEDIA}, etc.) in the volume definitions and replace hardcoded IPs
with variables or network names, or remove this personal dev config from the PR
if not intended for the project; update the docker-compose service volume
entries and any IP references to use the new env vars (or document defaults in a
.env.example) so the file is portable across environments.
- Around line 48-51: Remove the hardcoded PAPERLESS_API_TOKEN from the
docker-compose environment block and replace it with a secure injection method:
reference the environment variable (e.g., use ${PAPERLESS_API_TOKEN}) or
configure a Docker secret for PAPERLESS_API_TOKEN, and add an .env file
(excluded from VCS) or secrets files to store the real token; update the
environment block that currently contains PAPERLESS_BASE_URL,
PAPERLESS_API_TOKEN and MANUAL_TAG to use the chosen secure mechanism instead of
the plaintext token.
In `@jobs.go`:
- Line 31: The call logger.WithField("prefix", "JOB_QUEUE") is discarding the
returned *logrus.Entry so the prefix is never applied; capture the returned
entry (e.g. jobLogger := logger.WithField(...) or reassign logger =
logger.WithFields(...)) and use that entry (jobLogger) instead of logger across
this file (or change logger's type to *logrus.Entry where you reassign) so
subsequent calls from the job queue include the "prefix":"JOB_QUEUE" field.
In `@local_db.go`:
- Around line 36-51: The QueuedJob model allows duplicates because there’s no
composite unique constraint on (DocumentID, JobType), enabling race conditions
in enqueueIfNotExists; add a partial unique index for active statuses after
AutoMigrate (e.g., create a unique index named idx_queued_jobs_active on
document_id and job_type with a WHERE clause limiting statuses to pending,
in_progress, failed) via a migration or db.Exec call so concurrent inserts fail
atomically and enqueueIfNotExists can handle the unique-constraint error.
---
Nitpick comments:
In `@jobs.go`:
- Around line 178-180: The code silently discards errors from json.Unmarshal of
job.OptionsJSON; change the block that calls json.Unmarshal(&options) to capture
the returned error and log it (including job identifier and the OptionsJSON or
summary) using the project's logger instead of ignoring it, e.g., err :=
json.Unmarshal(...); if err != nil { logger.Errorf("failed to unmarshal
job.OptionsJSON for job %s: %v", job.ID, err) } so malformed JSON is visible in
logs and can be diagnosed; keep falling back to defaults only after logging the
error.
- Around line 207-234: The code may add pdfOCRCompleteTag twice when
app.pdfOCRTagging is true because AddTags in the DocumentSuggestion literal and
the later assignment to documentSuggestion.SuggestedTags both include the same
tag; modify the construction of DocumentSuggestion (and the subsequent
conditional that sets SuggestedTags/KeepOriginalTags) to ensure the tag is only
set once—either populate AddTags or SuggestedTags but not both—by checking
app.pdfOCRTagging, options.UploadPDF and processedDoc state and only assigning
the pdfOCRCompleteTag to one of AddTags or SuggestedTags before calling
app.Client.UpdateDocuments (refer to DocumentSuggestion, AddTags, SuggestedTags,
app.pdfOCRTagging, app.pdfOCRCompleteTag, and UpdateDocuments).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5cccdd9e-5070-455a-8087-fbcf5f083b3d
📒 Files selected for processing (5)
app_http_handlers.gobackground.godocker-compose.ymljobs.golocal_db.go
| job, err := GetJob(app.Database, jobID) | ||
| if err != nil { | ||
| c.JSON(http.StatusNotFound, gin.H{"error": "Job not found"}) | ||
| return | ||
| } |
There was a problem hiding this comment.
All GetJob errors return 404, but database errors should return 500.
Currently, any error from GetJob (including database connectivity issues) results in a 404 response. Distinguish between "not found" and other errors:
Proposed fix
job, err := GetJob(app.Database, jobID)
if err != nil {
+ if errors.Is(err, gorm.ErrRecordNotFound) {
+ c.JSON(http.StatusNotFound, gin.H{"error": "Job not found"})
+ return
+ }
+ log.Errorf("Failed to retrieve job %s: %v", jobID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve job"})
- c.JSON(http.StatusNotFound, gin.H{"error": "Job not found"})
return
}This requires importing "gorm.io/gorm" and "errors" if not already present.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| job, err := GetJob(app.Database, jobID) | |
| if err != nil { | |
| c.JSON(http.StatusNotFound, gin.H{"error": "Job not found"}) | |
| return | |
| } | |
| job, err := GetJob(app.Database, jobID) | |
| if err != nil { | |
| if errors.Is(err, gorm.ErrRecordNotFound) { | |
| c.JSON(http.StatusNotFound, gin.H{"error": "Job not found"}) | |
| return | |
| } | |
| log.Errorf("Failed to retrieve job %s: %v", jobID, err) | |
| c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve job"}) | |
| return | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app_http_handlers.go` around lines 252 - 256, GetJob's error is always
translated to 404; change the handler to distinguish not-found vs other DB
errors by using errors.Is(err, gorm.ErrRecordNotFound) (import "errors" and
"gorm.io/gorm" if missing). If errors.Is(..., gorm.ErrRecordNotFound) return
c.JSON(http.StatusNotFound, ...) otherwise return
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) so
database/connectivity errors produce 500.
| func enqueueIfNotExists(app *App, docID int, jobType string) (bool, error) { | ||
| var count int64 | ||
| err := app.Database.Model(&QueuedJob{}). | ||
| Where("document_id = ? AND job_type = ? AND status IN ?", docID, jobType, []string{"pending", "in_progress", "failed"}). | ||
| Count(&count).Error | ||
|
|
||
| if err != nil { | ||
| return false, err | ||
| } | ||
| if count > 0 { | ||
| return false, nil // Already queued | ||
| } | ||
|
|
||
| job := &QueuedJob{ | ||
| ID: generateJobID(), | ||
| DocumentID: docID, | ||
| JobType: jobType, | ||
| Status: "pending", | ||
| } | ||
| if err := EnqueueJob(app.Database, job); err != nil { | ||
| return false, err | ||
| } | ||
| log.Infof("Enqueued new %s job for Document %d (Job ID: %s)", jobType, docID, job.ID) | ||
| return true, nil | ||
| } |
There was a problem hiding this comment.
TOCTOU race condition allows duplicate job creation.
The check-then-insert pattern has a race window: two concurrent calls for the same (docID, jobType) can both observe count == 0 and both proceed to insert, creating duplicate jobs.
Options to fix:
- Add a unique index on
(document_id, job_type)for active statuses (seelocal_db.gocomment) and handle the constraint violation error gracefully here. - Use a database transaction with
SELECT ... FOR UPDATE(though SQLite support is limited). - Use
INSERT ... ON CONFLICT DO NOTHINGpattern.
Minimal fix: handle duplicate key error gracefully
func enqueueIfNotExists(app *App, docID int, jobType string) (bool, error) {
- var count int64
- err := app.Database.Model(&QueuedJob{}).
- Where("document_id = ? AND job_type = ? AND status IN ?", docID, jobType, []string{"pending", "in_progress", "failed"}).
- Count(&count).Error
-
- if err != nil {
- return false, err
- }
- if count > 0 {
- return false, nil // Already queued
- }
-
job := &QueuedJob{
ID: generateJobID(),
DocumentID: docID,
JobType: jobType,
Status: "pending",
}
if err := EnqueueJob(app.Database, job); err != nil {
+ // If unique constraint violation, job already exists
+ if strings.Contains(err.Error(), "UNIQUE constraint failed") {
+ return false, nil
+ }
return false, err
}
log.Infof("Enqueued new %s job for Document %d (Job ID: %s)", jobType, docID, job.ID)
return true, nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func enqueueIfNotExists(app *App, docID int, jobType string) (bool, error) { | |
| var count int64 | |
| err := app.Database.Model(&QueuedJob{}). | |
| Where("document_id = ? AND job_type = ? AND status IN ?", docID, jobType, []string{"pending", "in_progress", "failed"}). | |
| Count(&count).Error | |
| if err != nil { | |
| return false, err | |
| } | |
| if count > 0 { | |
| return false, nil // Already queued | |
| } | |
| job := &QueuedJob{ | |
| ID: generateJobID(), | |
| DocumentID: docID, | |
| JobType: jobType, | |
| Status: "pending", | |
| } | |
| if err := EnqueueJob(app.Database, job); err != nil { | |
| return false, err | |
| } | |
| log.Infof("Enqueued new %s job for Document %d (Job ID: %s)", jobType, docID, job.ID) | |
| return true, nil | |
| } | |
| func enqueueIfNotExists(app *App, docID int, jobType string) (bool, error) { | |
| job := &QueuedJob{ | |
| ID: generateJobID(), | |
| DocumentID: docID, | |
| JobType: jobType, | |
| Status: "pending", | |
| } | |
| if err := EnqueueJob(app.Database, job); err != nil { | |
| // If unique constraint violation, job already exists | |
| if strings.Contains(err.Error(), "UNIQUE constraint failed") { | |
| return false, nil | |
| } | |
| return false, err | |
| } | |
| log.Infof("Enqueued new %s job for Document %d (Job ID: %s)", jobType, docID, job.ID) | |
| return true, nil | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@background.go` around lines 80 - 104, The enqueueIfNotExists function has a
TOCTOU race: two callers can both see count==0 and both call EnqueueJob,
creating duplicates; add a defensive check by ensuring the DB has a unique
constraint on (document_id, job_type, status) for active statuses and then catch
and treat the resulting unique-constraint/constraint-violation error from
EnqueueJob as a benign "already exists" case (return false, nil) instead of
failing, while still returning other errors; keep generateJobID, QueuedJob
construction and the log intact, but handle the DB uniqueness violation from
app.Database/EnqueueJob and document in comments that the DB-level unique index
is required.
| - D:\paperless\paperless-ngx\data:/usr/src/paperless/data | ||
| - D:\paperless\paperless-ngx\media:/usr/src/paperless/media | ||
| - D:\paperless\paperless-ngx\export:/usr/src/paperless/export | ||
| - D:\paperless\paperless-ngx\consume:/usr/src/paperless/consume |
There was a problem hiding this comment.
Windows-specific absolute paths make this configuration non-portable.
The hardcoded D:\paperless\... paths and private IP addresses (Lines 42, 61, 91) suggest this is a personal development configuration. Consider using relative paths or environment variable substitution, or exclude this file from the PR if it's not intended for general use.
Also applies to: 77-80
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docker-compose.yml` around lines 26 - 29, The docker-compose.yml contains
Windows-specific absolute volume mounts (e.g., the
D:\paperless\paperless-ngx\data /media /export /consume entries) and hardcoded
private IPs referenced elsewhere (lines indicated in the review); replace those
absolute paths with portable alternatives by using relative paths or environment
variable substitution (e.g., ${PAPERLESS_DATA}, ${PAPERLESS_MEDIA}, etc.) in the
volume definitions and replace hardcoded IPs with variables or network names, or
remove this personal dev config from the PR if not intended for the project;
update the docker-compose service volume entries and any IP references to use
the new env vars (or document defaults in a .env.example) so the file is
portable across environments.
| environment: | ||
| PAPERLESS_BASE_URL: http://paperless-ngx:8000 | ||
| PAPERLESS_API_TOKEN: b195eaf1d339bbd7b838b51c5d7e456e9190ebb9 | ||
| MANUAL_TAG: paperless-gpt |
There was a problem hiding this comment.
Hardcoded API token should be removed or externalized.
The PAPERLESS_API_TOKEN is committed in plaintext, which is a security risk. Use Docker secrets, environment variable substitution, or an .env file excluded from version control instead.
environment:
PAPERLESS_BASE_URL: http://paperless-ngx:8000
- PAPERLESS_API_TOKEN: b195eaf1d339bbd7b838b51c5d7e456e9190ebb9
+ PAPERLESS_API_TOKEN: ${PAPERLESS_API_TOKEN}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| environment: | |
| PAPERLESS_BASE_URL: http://paperless-ngx:8000 | |
| PAPERLESS_API_TOKEN: b195eaf1d339bbd7b838b51c5d7e456e9190ebb9 | |
| MANUAL_TAG: paperless-gpt | |
| environment: | |
| PAPERLESS_BASE_URL: http://paperless-ngx:8000 | |
| PAPERLESS_API_TOKEN: ${PAPERLESS_API_TOKEN} | |
| MANUAL_TAG: paperless-gpt |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docker-compose.yml` around lines 48 - 51, Remove the hardcoded
PAPERLESS_API_TOKEN from the docker-compose environment block and replace it
with a secure injection method: reference the environment variable (e.g., use
${PAPERLESS_API_TOKEN}) or configure a Docker secret for PAPERLESS_API_TOKEN,
and add an .env file (excluded from VCS) or secrets files to store the real
token; update the environment block that currently contains PAPERLESS_BASE_URL,
PAPERLESS_API_TOKEN and MANUAL_TAG to use the chosen secure mechanism instead of
the plaintext token.
| }) | ||
| logger.SetLevel(logrus.InfoLevel) | ||
| logger.WithField("prefix", "OCR_JOB") | ||
| logger.WithField("prefix", "JOB_QUEUE") |
There was a problem hiding this comment.
Logger prefix is not applied; WithField returns a new logger.
logger.WithField(...) returns a new *logrus.Entry but the result is discarded. The prefix is never applied to subsequent log calls.
Proposed fix
func init() {
logger.SetOutput(os.Stdout)
logger.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
logger.SetLevel(logrus.InfoLevel)
- logger.WithField("prefix", "JOB_QUEUE")
}
+
+var jobLogger = logger.WithField("prefix", "JOB_QUEUE")Then use jobLogger instead of logger throughout the file, or reassign:
logger = logger.WithFields(logrus.Fields{"prefix": "JOB_QUEUE"})Note: This requires changing logger type from *logrus.Logger to *logrus.Entry.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@jobs.go` at line 31, The call logger.WithField("prefix", "JOB_QUEUE") is
discarding the returned *logrus.Entry so the prefix is never applied; capture
the returned entry (e.g. jobLogger := logger.WithField(...) or reassign logger =
logger.WithFields(...)) and use that entry (jobLogger) instead of logger across
this file (or change logger's type to *logrus.Entry where you reassign) so
subsequent calls from the job queue include the "prefix":"JOB_QUEUE" field.
| // QueuedJob represents a persistent asynchronous background task | ||
| type QueuedJob struct { | ||
| ID string `gorm:"primaryKey"` | ||
| DocumentID int `gorm:"index;not null"` | ||
| JobType string `gorm:"index;not null"` // "auto_tag", "auto_ocr", "manual_ocr" | ||
| Status string `gorm:"index;not null;default:'pending'"` // "pending", "in_progress", "failed", "completed", "cancelled" | ||
| Attempts int `gorm:"not null;default:0"` | ||
| MaxRetries int `gorm:"not null;default:3"` | ||
| NextRetryAt time.Time `gorm:"index"` | ||
| Result string `gorm:"type:TEXT"` | ||
| PagesDone int `gorm:"default:0"` | ||
| TotalPages int `gorm:"default:0"` | ||
| OptionsJSON string `gorm:"type:TEXT"` // JSON serialized OCROptions | ||
| CreatedAt time.Time | ||
| UpdatedAt time.Time | ||
| } |
There was a problem hiding this comment.
Missing composite unique index enables duplicate job creation.
The QueuedJob model has separate indexes on DocumentID and JobType, but no composite unique constraint. This allows the TOCTOU race condition in enqueueIfNotExists (background.go) to create duplicate jobs when concurrent workers check and insert simultaneously.
Consider adding a partial unique index on (document_id, job_type) for active statuses:
Proposed fix
type QueuedJob struct {
ID string `gorm:"primaryKey"`
- DocumentID int `gorm:"index;not null"`
- JobType string `gorm:"index;not null"` // "auto_tag", "auto_ocr", "manual_ocr"
+ DocumentID int `gorm:"not null"`
+ JobType string `gorm:"not null"` // "auto_tag", "auto_ocr", "manual_ocr"
Status string `gorm:"index;not null;default:'pending'"` // "pending", "in_progress", "failed", "completed", "cancelled"Then add a unique index via a migration or db.Exec after AutoMigrate:
db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_queued_jobs_active
ON queued_jobs(document_id, job_type)
WHERE status IN ('pending', 'in_progress', 'failed')`)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@local_db.go` around lines 36 - 51, The QueuedJob model allows duplicates
because there’s no composite unique constraint on (DocumentID, JobType),
enabling race conditions in enqueueIfNotExists; add a partial unique index for
active statuses after AutoMigrate (e.g., create a unique index named
idx_queued_jobs_active on document_id and job_type with a WHERE clause limiting
statuses to pending, in_progress, failed) via a migration or db.Exec call so
concurrent inserts fail atomically and enqueueIfNotExists can handle the
unique-constraint error.
If an error occurs, it causes a global backoff for the entire loop. This plan will redesign the architecture to use a robust Producer/Consumer Queue System. It will decouple document discovery from document processing, allowing parallel processing, task persistence, and individual task-level Auto-Retries with exponential backoff on failure (API rate limits, timeout, etc.).
Summary by CodeRabbit
New Features
Chores