Skip to content

kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12680

Open
ti-chi-bot wants to merge 1 commit into
pingcap:release-7.5from
ti-chi-bot:cherry-pick-12112-to-release-7.5
Open

kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12680
ti-chi-bot wants to merge 1 commit into
pingcap:release-7.5from
ti-chi-bot:cherry-pick-12112-to-release-7.5

Conversation

@ti-chi-bot

Copy link
Copy Markdown
Member

This is an automated cherry-pick of #12112

What problem does this PR solve?

Issue Number: close #12666, close #12096

What is changed and how it works?

  • owner/ddl_sink should close if meet any error, and rebuilt by the retry.
  • kafka factory implementations should wrap error immediately, and caller should just return error, no need to trace the error.
  • kafka ddl sink use encoder directly, instead of encoderBuilder
  • syncProducer no need to close in a goroutine, since it only deliver one message at a time, won't be blocked if the downstream kafka cluster unavailable.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

`None`

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@ti-chi-bot ti-chi-bot added do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. type/cherry-pick-for-release-7.5 This PR is cherry-picked to release-7.5 from a source PR. labels Jun 5, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

This cherry pick PR is for a release branch and has not yet been approved by triage owners.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick:

  1. It must be LGTMed and approved by the reviewers firstly.
  2. For pull requests to TiDB-x branches, it must have no failed tests.
  3. AFTER it has lgtm and approved labels, please wait for the cherry-pick merging approval from triage owners.
Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign okjiang for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

Copy link
Copy Markdown
Member Author

@3AceShowHand This PR has conflicts, I have hold it.
Please resolve them or ask others to resolve them, then comment /unhold to remove the hold label.

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the DDL sink to close the underlying sink if sending DDL or checkpoint events fails, and refactors the Kafka DDL sink. However, the changes contain several unresolved git conflict markers across multiple files, including kafka_ddl_producer.go, kafka_ddl_sink.go, pulsar_ddl_sink.go, factory.go, and sarama_factory.go. The review comments correctly point out these unresolved conflicts and provide actionable suggestions to resolve them, which is critical to prevent compilation errors and ensure interface compatibility.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +72 to +78
<<<<<<< HEAD
err := k.syncProducer.SendMessages(ctx, topic,
totalPartitionsNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Please resolve the conflict by keeping the correct call to SendMessages with message.Key and message.Value to match the SyncProducer interface, and return the error directly since the factory implementation already wraps it.

		return k.syncProducer.SendMessages(ctx, topic,
			totalPartitionsNum, message.Key, message.Value)

Comment on lines +96 to +102
<<<<<<< HEAD
err := k.syncProducer.SendMessage(ctx, topic,
partitionNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Please resolve the conflict by keeping the correct call to SendMessage with message.Key and message.Value to match the SyncProducer interface, and return the error directly since the factory implementation already wraps it.

		return k.syncProducer.SendMessage(ctx, topic,
			partitionNum, message.Key, message.Value)

Comment on lines +115 to +119
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Please resolve the conflict by passing encoderBuilder.Build() and keeping the ctx parameter as required by newDDLSink.

	s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment on lines +103 to +107
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Please resolve the conflict by passing encoderBuilder.Build() and keeping the ctx parameter as required by newDDLSink.

	s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment thread pkg/sink/kafka/factory.go
Comment on lines +106 to +113
<<<<<<< HEAD
func (p *saramaSyncProducer) SendMessages(ctx context.Context,
topic string, partitionNum int32,
key []byte, value []byte,
) error {
=======
func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error {
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Please resolve the conflict by keeping the original signature of SendMessages that takes key []byte, value []byte to match the SyncProducer interface.

func (p *saramaSyncProducer) SendMessages(ctx context.Context,
	topic string, partitionNum int32,
	key []byte, value []byte,
) error {

Comment on lines +111 to 124
<<<<<<< HEAD

client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}
p, err := newSaramaSyncProducerFromClientImpl(client)
=======
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))
if err != nil {
closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after sync producer init failed")
return nil, errors.Trace(err)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Also, the client variable is no longer defined if we use sarama.NewSyncProducer directly, which would cause a compilation error in closeSaramaClientOnFailure. Please resolve the conflict by using sarama.NewSyncProducer and removing the redundant closeSaramaClientOnFailure call on failure.

	p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
	if err != nil {
		return nil, errors.Trace(err)
	}

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-verify 409d3b0 link true /test pull-verify
pull-cdc-integration-pulsar-test 409d3b0 link true /test pull-cdc-integration-pulsar-test
pull-cdc-integration-storage-test 409d3b0 link true /test pull-cdc-integration-storage-test
pull-cdc-integration-mysql-test 409d3b0 link true /test pull-cdc-integration-mysql-test
pull-cdc-integration-kafka-test 409d3b0 link true /test pull-cdc-integration-kafka-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/cherry-pick-not-approved do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. type/cherry-pick-for-release-7.5 This PR is cherry-picked to release-7.5 from a source PR.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants