kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12680
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
@3AceShowHand This PR has conflicts, I have hold it. |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
There was a problem hiding this comment.
Code Review
This pull request refactors the DDL sink to close the underlying sink if sending DDL or checkpoint events fails, and refactors the Kafka DDL sink. However, the changes contain several unresolved git conflict markers across multiple files, including kafka_ddl_producer.go, kafka_ddl_sink.go, pulsar_ddl_sink.go, factory.go, and sarama_factory.go. The review comments correctly point out these unresolved conflicts and provide actionable suggestions to resolve them, which is critical to prevent compilation errors and ensure interface compatibility.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| <<<<<<< HEAD | ||
| err := k.syncProducer.SendMessages(ctx, topic, | ||
| totalPartitionsNum, message.Key, message.Value) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | ||
| ======= | ||
| return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
There are unresolved git conflict markers here. Please resolve the conflict by keeping the correct call to SendMessages with message.Key and message.Value to match the SyncProducer interface, and return the error directly since the factory implementation already wraps it.
return k.syncProducer.SendMessages(ctx, topic,
totalPartitionsNum, message.Key, message.Value)| <<<<<<< HEAD | ||
| err := k.syncProducer.SendMessage(ctx, topic, | ||
| partitionNum, message.Key, message.Value) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | ||
| ======= | ||
| return k.syncProducer.SendMessage(ctx, topic, partitionNum, message) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
There are unresolved git conflict markers here. Please resolve the conflict by keeping the correct call to SendMessage with message.Key and message.Value to match the SyncProducer interface, and return the error directly since the factory implementation already wraps it.
return k.syncProducer.SendMessage(ctx, topic,
partitionNum, message.Key, message.Value)| <<<<<<< HEAD | ||
| s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol) | ||
| ======= | ||
| s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
| <<<<<<< HEAD | ||
| s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) | ||
| ======= | ||
| s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
| <<<<<<< HEAD | ||
| func (p *saramaSyncProducer) SendMessages(ctx context.Context, | ||
| topic string, partitionNum int32, | ||
| key []byte, value []byte, | ||
| ) error { | ||
| ======= | ||
| func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error { | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
There are unresolved git conflict markers here. Please resolve the conflict by keeping the original signature of SendMessages that takes key []byte, value []byte to match the SyncProducer interface.
func (p *saramaSyncProducer) SendMessages(ctx context.Context,
topic string, partitionNum int32,
key []byte, value []byte,
) error {| <<<<<<< HEAD | ||
|
|
||
| client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config) | ||
| if err != nil { | ||
| return nil, errors.Trace(err) | ||
| } | ||
| p, err := newSaramaSyncProducerFromClientImpl(client) | ||
| ======= | ||
| p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) | ||
| if err != nil { | ||
| closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after sync producer init failed") | ||
| return nil, errors.Trace(err) | ||
| } |
There was a problem hiding this comment.
There are unresolved git conflict markers here. Also, the client variable is no longer defined if we use sarama.NewSyncProducer directly, which would cause a compilation error in closeSaramaClientOnFailure. Please resolve the conflict by using sarama.NewSyncProducer and removing the redundant closeSaramaClientOnFailure call on failure.
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}|
@ti-chi-bot: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
This is an automated cherry-pick of #12112
What problem does this PR solve?
Issue Number: close #12666, close #12096
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note