Add metadata column support for Kafka CDC connector #7315
Open
nickdelnano wants to merge 27 commits into apache:master
Conversation
Signed-off-by: Max Falk <gfalk@yelp.com>
nickdelnano force-pushed the …no/kafka-metadata branch from 9c59111 to 1dcf9f2
AbstractDataFormat.createParser(3-arg) silently dropped the metadataConverters array by delegating to the 2-arg version. Fix by adding a withMetadataConverters() setter on AbstractRecordParser and chaining it in AbstractDataFormat, so all formats receive metadata converters uniformly. This also makes DebeziumAvroDataFormat's createParser override redundant, so remove it along with the 3-arg constructor from DebeziumAvroRecordParser.

Revert testComputedColumn to master state, since metadata testing belongs in testMetadataColumn. Fix the testMetadataColumn column order in the expected RowType and data assertion to match the full 8-column row. Update KafkaMetadataE2ETest to construct parsers via the 2-arg constructor + withMetadataConverters(), matching the production code path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
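The fix described in this commit can be sketched roughly as follows. The class and method names mirror the PR (a fluent withMetadataConverters() setter on the parser, chained by the format factory), but the bodies are simplified illustrations, not the actual Paimon code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch, not the actual Paimon implementation.
abstract class RecordParserSketch {
    protected List<String> metadataConverters = new ArrayList<>();

    // Fluent setter added by the fix: returns this so the format factory
    // can chain it onto any parser it constructs.
    RecordParserSketch withMetadataConverters(List<String> converters) {
        this.metadataConverters = converters;
        return this;
    }
}

class DebeziumParserSketch extends RecordParserSketch {}

class DataFormatSketch {
    // Before the fix, a 3-arg variant delegated to a 2-arg factory and
    // silently dropped the converters. With the chained setter, every
    // format receives its metadata converters uniformly.
    static RecordParserSketch createParser(List<String> metadataConverters) {
        return new DebeziumParserSketch().withMetadataConverters(metadataConverters);
    }
}
```

The key design point is that attaching converters becomes a post-construction step shared by all formats, instead of an extra constructor argument each format must remember to accept.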
nickdelnano force-pushed from 1dcf9f2 to 1cbd850
nickdelnano commented Mar 3, 2026
...on-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java (review thread resolved)
...k-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java (review thread resolved)
Remove the unused 3-arg constructor from AbstractRecordParser: all callers use the 2-arg constructor + withMetadataConverters() setter.

Move the 3-arg createParser default implementation from AbstractDataFormat up into the DataFormat interface, making the 2-arg method the primary abstract contract. This eliminates the inverted delegation between the interface and implementation and removes a redundant override from AbstractDataFormat.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
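The resulting delegation direction can be illustrated with a minimal sketch: the interface owns the 3-arg default and delegates down to the 2-arg primary method. The names follow the PR, but the bodies are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the parser type; not the actual Paimon class.
class ParserSketch {
    final List<String> metadataConverters = new ArrayList<>();

    ParserSketch withMetadataConverters(List<String> converters) {
        metadataConverters.addAll(converters);
        return this;
    }
}

interface DataFormatIface {
    // The 2-arg method is now the primary abstract contract that every
    // concrete format implements.
    ParserSketch createParser(List<String> computedColumns);

    // The 3-arg default lives on the interface and delegates downward,
    // so no implementation has to override it.
    default ParserSketch createParser(
            List<String> computedColumns, List<String> metadataConverters) {
        return createParser(computedColumns).withMetadataConverters(metadataConverters);
    }
}
```

With a single abstract method, a format implementation only supplies the 2-arg factory; callers that have metadata converters use the 3-arg default and get them attached automatically.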
nickdelnano force-pushed from dced44f to d53a2e5
… AWS DMS

Previously only KafkaDebeziumSyncTableActionITCase exercised metadata columns. Add the same test to the four other format-specific IT case classes, each delegating to the base testMetadataColumn(format) method. Create corresponding test data fixtures under each format's metadatacolumn/ resource directory. The Canal fixture omits the mysqlType and sqlType fields, so columns are inferred as STRING, matching the base test's expected RowType.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
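The shared-base-test pattern this commit uses can be sketched as follows; the class and method names are illustrative placeholders, not the actual test classes, and the body only hints at what the real test asserts.

```java
// Hypothetical sketch: each format-specific IT case delegates to one
// parameterized testMetadataColumn(format) on the base class, where
// 'format' selects the fixture directory (e.g. .../canal/metadatacolumn/).
abstract class KafkaSyncTableTestBaseSketch {
    // Shared logic: load the format's fixtures, sync the table, and
    // assert the expected RowType including metadata columns.
    protected String testMetadataColumn(String format) {
        return "ran metadata-column test for format: " + format;
    }
}

class KafkaCanalSyncTableTestSketch extends KafkaSyncTableTestBaseSketch {
    // Format-specific entry point; all assertions live in the base class.
    String runMetadataColumnTest() {
        return testMetadataColumn("canal");
    }
}
```

Keeping all assertions in the base method means the five format classes stay one-liners, and any future metadata column is verified across every format automatically.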
nickdelnano force-pushed from d53a2e5 to 4fdea2d
nickdelnano (Contributor, Author) commented:

@JingsongLi could you please review this PR, or identify someone who can? Thank you
This PR supersedes #6353; I took over this work from my colleague @gmdfalk
Description
Add --metadata_column support to Paimon Kafka CDC connector, similar to the already existing options added for MySQL and Postgres: #2077
Also add optional --metadata_column_prefix to avoid conflicts with existing Paimon fields like topic, timestamp etc.
Supported metadata columns are those exposed on org.apache.kafka.clients.consumer.ConsumerRecord (e.g. topic, partition, offset, timestamp).
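As a rough illustration of how --metadata_column_prefix avoids collisions with existing table fields, a prefix is prepended to each metadata-derived column name before it is added to the schema. The class and method names here are hypothetical, not the connector's actual code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the prefix idea: with a prefix like
// "kafka_", a metadata column "topic" becomes "kafka_topic" and can no
// longer clash with an existing Paimon field named "topic".
class MetadataColumnNames {
    static List<String> applyPrefix(List<String> metadataColumns, String prefix) {
        return metadataColumns.stream()
                .map(name -> prefix + name)
                .collect(Collectors.toList());
    }
}
```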
The feature is backwards compatible: it is only active when --metadata_column is supplied or when SynchronizationActionBase.withMetadataColumns is used.
Motivation
This is a requested feature: #3210
We primarily use this feature for two purposes:
Tests
Unit and Integration Tests
API and Format
No changes to public apis or storage format.
The changes here are contained to the Flink CDC package, but I did have to update CdcSourceRecord, since it previously didn't provide a way to surface arbitrary metadata for a record.
The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can potentially be used to add metadata support for other connectors like Pulsar or Mongo that are not yet implemented.
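A minimal sketch of the record shape this describes is below: a record value plus a generic metadata map. The class, field, and method names are illustrative, not the actual CdcSourceRecord API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified shape of a source record carrying a generic
// metadata map, as described above. Not the actual Paimon class.
class SourceRecordSketch {
    private final Object value;
    // A generic Map, so non-Kafka connectors (Pulsar, Mongo, ...) could
    // later surface their own metadata keys without API changes.
    private final Map<String, String> metadata;

    SourceRecordSketch(Object value, Map<String, String> metadata) {
        this.value = value;
        this.metadata = Collections.unmodifiableMap(metadata);
    }

    Map<String, String> metadata() {
        return metadata;
    }

    Object value() {
        return value;
    }
}
```

Using a plain Map keeps the record type connector-agnostic: each connector decides which keys it populates, and downstream metadata converters look up only the keys they understand.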
Documentation
Added the new --metadata_column and --metadata_column_prefix parameters to the Kafka CDC docs.
Dev notes
For running integration tests on macOS with Rancher Desktop, I had to properly expose the Docker socket to Testcontainers, e.g. system-wide via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.