Skip to content

Commit e1566a6

Browse files
oracledb_cdc: improve error handling
1 parent cbc4a55 commit e1566a6

1 file changed

Lines changed: 9 additions & 7 deletions

File tree

internal/impl/oracledb/logminer/logminer.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ func (lm *LogMiner) processRedoEvent(ctx context.Context, redoEvent *sqlredo.Red
288288
}
289289

290290
if lm.cfg.LOBEnabled && (redoEvent.Operation == sqlredo.OpInsert || redoEvent.Operation == sqlredo.OpUpdate) {
291-
lm.drainPendingLOBWrites(ctx, redoEvent.TransactionID)
291+
if err = lm.drainPendingLOBWrites(ctx, redoEvent.TransactionID); err != nil {
292+
return fmt.Errorf("draining pending lob writes: %w", err)
293+
}
292294
}
293295

294296
case sqlredo.OpSelectLobLocator:
@@ -420,8 +422,7 @@ func (lm *LogMiner) processRedoEvent(ctx context.Context, redoEvent *sqlredo.Red
420422
// not as HEXTORAW. Only BLOB uses binary/hex encoding.
421423
writeInfo, err := sqlredo.ParseLobWrite(redoEvent.SQLRedo.String, acc.IsBinary)
422424
if err != nil {
423-
lm.log.Warnf("Failed to parse LOB_WRITE SQL (scn=%d, txn=%s): %v\nSQL: %.500s", redoEvent.SCN, redoEvent.TransactionID, err, redoEvent.SQLRedo.String)
424-
return nil
425+
return fmt.Errorf("parsing LOB_WRITE SQL (scn=%d, txn=%s): %v\nSQL: %.500s", redoEvent.SCN, redoEvent.TransactionID, err, redoEvent.SQLRedo.String)
425426
}
426427
acc.AddFragment(writeInfo.Offset, writeInfo.Data)
427428

@@ -766,10 +767,10 @@ func (lm *LogMiner) inferLOBLocator(ctx context.Context, event *sqlredo.RedoEven
766767
// drainPendingLOBWrites retries LOB_WRITE events that were buffered because their
767768
// INSERT had not yet been processed. Called after each INSERT or UPDATE is added
768769
// to the transaction cache, at which point inferLOBLocator should succeed.
769-
func (lm *LogMiner) drainPendingLOBWrites(ctx context.Context, txnID sqlredo.TransactionID) {
770+
func (lm *LogMiner) drainPendingLOBWrites(ctx context.Context, txnID sqlredo.TransactionID) error {
770771
pending := lm.pendingLOBWrites[txnID]
771772
if len(pending) == 0 {
772-
return
773+
return nil
773774
}
774775

775776
var remaining []*sqlredo.RedoEvent
@@ -792,8 +793,7 @@ func (lm *LogMiner) drainPendingLOBWrites(ctx context.Context, txnID sqlredo.Tra
792793
}
793794
writeInfo, err := sqlredo.ParseLobWrite(event.SQLRedo.String, acc.IsBinary)
794795
if err != nil {
795-
lm.log.Warnf("Failed to parse buffered LOB_WRITE SQL (scn=%d, txn=%s): %v\nSQL: %.500s", event.SCN, txnID, err, event.SQLRedo.String)
796-
continue
796+
return fmt.Errorf("parsing buffered LOB_WRITE SQL (scn=%d, txn=%s): %v\nSQL: %.500s", event.SCN, txnID, err, event.SQLRedo.String)
797797
}
798798
acc.AddFragment(writeInfo.Offset, writeInfo.Data)
799799
lm.log.Debugf("Drained buffered LOB_WRITE for %s.%s.%s (txn=%s)", acc.Schema, acc.Table, acc.Column, txnID)
@@ -804,6 +804,8 @@ func (lm *LogMiner) drainPendingLOBWrites(ctx context.Context, txnID sqlredo.Tra
804804
} else {
805805
lm.pendingLOBWrites[txnID] = remaining
806806
}
807+
808+
return nil
807809
}
808810

809811
func (lm *LogMiner) queryLogMinerContents(ctx context.Context, conn *sql.Conn, startSCN, endSCN uint64, processEvent func(context.Context, *sqlredo.RedoEvent) error) error {

0 commit comments

Comments
 (0)