Add metadata column support for Kafka CDC connector #7315

Open
nickdelnano wants to merge 27 commits into apache:master from nickdelnano:nickdelnano/kafka-metadata

@nickdelnano nickdelnano commented Feb 27, 2026

This PR supersedes #6353. I took over this work from my colleague @gmdfalk.

Description

Add --metadata_column support to the Paimon Kafka CDC connector, similar to the existing options added for MySQL and Postgres: #2077
Also add an optional --metadata_column_prefix to avoid conflicts with existing Paimon fields such as topic, timestamp, etc.

Supported metadata columns are the fields available on org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:

  • topic
  • partition
  • offset
  • timestamp
  • timestampType: This is the name of the enum i.e. NoTimestampType, CreateTime or LogAppendTime
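
As a rough illustration of the list above, the following sketch maps the listed record fields to output column names with an optional prefix. It assumes the prefix is simply prepended to the column name and that values are rendered as strings; neither is confirmed by this description. KafkaMetadataSketch, RecordMeta, toColumns and the "__kafka_" prefix are hypothetical names, and RecordMeta is a simplified stand-in for ConsumerRecord, not the connector's actual code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KafkaMetadataSketch {

    /** Hypothetical, simplified stand-in for the ConsumerRecord fields listed above. */
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final String timestampType;

        RecordMeta(String topic, int partition, long offset, long timestamp, String timestampType) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.timestampType = timestampType;
        }
    }

    /** Builds prefixed column names for the requested metadata columns (assumption: prefix + name). */
    static Map<String, String> toColumns(RecordMeta meta, List<String> requested, String prefix) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String name : requested) {
            columns.put(prefix + name, read(meta, name));
        }
        return columns;
    }

    /** Reads one supported metadata value as a string; rejects unknown column names. */
    private static String read(RecordMeta meta, String name) {
        switch (name) {
            case "topic":
                return meta.topic;
            case "partition":
                return String.valueOf(meta.partition);
            case "offset":
                return String.valueOf(meta.offset);
            case "timestamp":
                return String.valueOf(meta.timestamp);
            case "timestampType":
                return meta.timestampType;
            default:
                throw new IllegalArgumentException("Unsupported metadata column: " + name);
        }
    }

    public static void main(String[] args) {
        RecordMeta meta = new RecordMeta("orders", 3, 42L, 1700000000000L, "CreateTime");
        // With a prefix, a source field named "topic" would no longer collide with the metadata column.
        System.out.println(toColumns(meta, Arrays.asList("topic", "partition", "offset"), "__kafka_"));
    }
}
```

Passing an empty prefix in this sketch yields the bare column names (topic, partition, ...), which is where a clash with an identically named source field could occur.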

The feature is backwards compatible: it is only active when --metadata_column is supplied or, when using the API, when SynchronizationActionBase.withMetadataColumns is called.

Motivation

This is a requested feature: #3210

We primarily use this feature for two purposes:

  1. Troubleshooting and data lineage (e.g. where in our Kafka infrastructure does this Paimon row come from?)
  2. Mapping large Kafka topic partitions 1:1 to Paimon buckets to avoid reshuffling (this would solve [Feature] 1-1 mapping between paimon buckets and kafka partitions #3249)

Tests

Unit and Integration Tests

API and Format

No changes to public APIs or storage format.

The changes here are confined to the Flink CDC package, but I did have to update CdcSourceRecord, since it previously provided no way to surface arbitrary metadata for a record.

The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can later be used to add metadata support for other connectors, such as Pulsar or Mongo, for which it is not yet implemented.
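
To illustrate the idea of a connector-agnostic metadata map, here is a minimal sketch. SourceRecordSketch and its members are hypothetical and greatly simplified; the real CdcSourceRecord has a different shape, and the Map<String, Object> value type is an assumption. The point is only that a record built without metadata behaves as before, while a connector that has metadata can attach it under arbitrary keys.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SourceRecordSketch {

    private final Object value;
    // Generic on purpose: each connector decides which keys it populates (assumption for illustration).
    private final Map<String, Object> metadata;

    /** Pre-existing style of construction: no metadata, so existing callers are unaffected. */
    SourceRecordSketch(Object value) {
        this(value, Collections.<String, Object>emptyMap());
    }

    /** New style of construction: the connector attaches whatever metadata it has. */
    SourceRecordSketch(Object value, Map<String, Object> metadata) {
        this.value = value;
        // Defensive copy so later changes to the caller's map do not leak into the record.
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    Object getValue() {
        return value;
    }

    Map<String, Object> getMetadata() {
        return metadata;
    }

    public static void main(String[] args) {
        Map<String, Object> kafkaMeta = new HashMap<>();
        kafkaMeta.put("topic", "orders");
        kafkaMeta.put("partition", 3);

        SourceRecordSketch plain = new SourceRecordSketch("{\"id\": 1}");
        SourceRecordSketch enriched = new SourceRecordSketch("{\"id\": 1}", kafkaMeta);

        System.out.println(plain.getMetadata().isEmpty());
        System.out.println(enriched.getMetadata().get("topic"));
    }
}
```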

Documentation

Added the new --metadata_column and --metadata_column_prefix parameters to the Kafka CDC docs.

Dev notes

To run the integration tests on macOS with Rancher Desktop, I had to expose the Docker socket to Testcontainers, e.g. system-wide via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.

gmdfalk and others added 24 commits September 29, 2025 11:34
Signed-off-by: Max Falk <gfalk@yelp.com>
@nickdelnano nickdelnano force-pushed the nickdelnano/kafka-metadata branch 2 times, most recently from 9c59111 to 1dcf9f2 Compare March 3, 2026 00:09
AbstractDataFormat.createParser(3-arg) silently dropped the
metadataConverters array by delegating to the 2-arg version. Fix by
adding a withMetadataConverters() setter on AbstractRecordParser and
chaining it in AbstractDataFormat, so all formats receive metadata
converters uniformly. This also makes DebeziumAvroDataFormat's
createParser override redundant, so remove it along with the 3-arg
constructor from DebeziumAvroRecordParser.

Revert testComputedColumn to master state since metadata testing belongs
in testMetadataColumn. Fix testMetadataColumn column order in expected
RowType and data assertion to match the full 8-column row. Update
KafkaMetadataE2ETest to construct parsers via the 2-arg constructor +
withMetadataConverters(), matching the production code path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@nickdelnano nickdelnano force-pushed the nickdelnano/kafka-metadata branch from 1dcf9f2 to 1cbd850 Compare March 3, 2026 00:13
Remove the unused 3-arg constructor from AbstractRecordParser — all
callers use the 2-arg constructor + withMetadataConverters() setter.
Move the 3-arg createParser default implementation from
AbstractDataFormat up into the DataFormat interface, making the 2-arg
method the primary abstract contract. This eliminates the inverted
delegation between the interface and implementation and removes a
redundant override from AbstractDataFormat.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
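
The delegation described in the commit message above can be sketched roughly as follows. All types here are hypothetical, simplified stand-ins (plain strings replace the real type-mapping and computed-column types, and the nested names only mirror the roles of DataFormat, AbstractRecordParser and the metadata converters); this is an illustration of the pattern, not the PR's actual code.

```java
import java.util.Collections;
import java.util.List;

class ParserWiringSketch {

    /** Hypothetical stand-in for a metadata converter. */
    interface MetadataConverter {
        String columnName();
    }

    /** Hypothetical stand-in for the record parser base class with a chaining setter. */
    static class RecordParser {
        private MetadataConverter[] metadataConverters = new MetadataConverter[0];

        RecordParser withMetadataConverters(MetadataConverter[] converters) {
            this.metadataConverters = converters;
            return this;
        }

        MetadataConverter[] metadataConverters() {
            return metadataConverters;
        }
    }

    /** Hypothetical stand-in for the data format interface. */
    interface DataFormat {
        // The 2-arg method is the primary abstract contract that each format implements.
        RecordParser createParser(String typeMapping, List<String> computedColumns);

        // The 3-arg variant is a default method: build via the 2-arg method, then chain the setter,
        // so every format receives the converters without overriding anything.
        default RecordParser createParser(
                String typeMapping, List<String> computedColumns, MetadataConverter[] converters) {
            return createParser(typeMapping, computedColumns).withMetadataConverters(converters);
        }
    }

    public static void main(String[] args) {
        DataFormat format = (typeMapping, computedColumns) -> new RecordParser();
        MetadataConverter topic = () -> "topic";

        RecordParser parser =
                format.createParser("default", Collections.emptyList(), new MetadataConverter[] {topic});
        System.out.println(parser.metadataConverters().length);
    }
}
```

Because the converters are attached after construction in this sketch, a format only has to implement the 2-arg method and cannot silently drop them.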
@nickdelnano nickdelnano force-pushed the nickdelnano/kafka-metadata branch from dced44f to d53a2e5 Compare March 4, 2026 19:37
… AWS DMS

Previously only KafkaDebeziumSyncTableActionITCase exercised metadata
columns. Add the same test to the four other format-specific IT case
classes, each delegating to the base testMetadataColumn(format) method.
Create corresponding test data fixtures under each format's
metadatacolumn/ resource directory. The Canal fixture omits mysqlType
and sqlType fields so columns are inferred as STRING, matching the base
test's expected RowType.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@nickdelnano nickdelnano force-pushed the nickdelnano/kafka-metadata branch from d53a2e5 to 4fdea2d Compare March 4, 2026 21:30
@nickdelnano nickdelnano marked this pull request as ready for review March 4, 2026 22:51
@nickdelnano nickdelnano changed the title Add kafka metadata Add metadata column support for Kafka CDC connector Mar 4, 2026
@nickdelnano

@JingsongLi could you please review this PR, or identify someone who can? Thank you
