# What's new in Camel-Kafka-connector 0.7.0

Apache Camel Kafka Connector 0.7.0 has just been released.

This is based on the LTS release of Apache Camel 3.7.0, this means we will provide patch releases, as Camel 3.7.x is an LTS release.

## So what’s in this release?

This release introduce bug fixes, improvements, new features and new connectors obviously

### New connectors

The new connectors introduced in this release are the following:

• AtlasMap: Transforms the message using an AtlasMap transformation
• Kubernetes Custom Resources: Perform operations on Kubernetes Custom Resources and get notified on Deployment changes
• Vert.X Kafka: Sent and receive messages to/from an Apache Kafka broker using vert.x Kafka client
• JSON JSON-B: Marshal POJOs to JSON and back using JSON-B
• CSimple: Evaluate a compile simple expression language
• DataSonnet: To use DataSonnet scripts in Camel expressions or predicates
• jOOR: Evaluate a jOOR (Java compiled once at runtime) expression language

### Support for idempotent repository

In this release we introduced the idempotency support.

The initial support will provide:

• in-memory idempotent repository
• kafka idempotent repository

The idempotency will be supported on both source and sink connector, this means:

• From a source point of view, by enabling the idempotency, you’ll be able to avoid ingesting the same payload to the target Kafka topic. This can be done at body or header level.
• From a sink point of view, by enabling the idempotency, you’ll be able to avoid sending the same payload to an external system, through checking body or header of the Kafka record from the Kafka source topic.

### Support for remove headers option

There are situation in which you’ll be using a Camel-Kafka-Connector source connector and sink connector together. Sometimes you’ll need to remove some of the Camel headers to achieve a particular behavior. This option should be used in this case. The value of this option can be a list of headers or a regexp.

In the camel-kafka-connector-examples repository we provided a little example based on 0.7.0 SNAPSHOT version.

### New Archetypes

• Apicurio Archetype: To be able to leveraging the apicurio service registry
• Dataformat Archetype: To extend a connector by adding a dataformat to it

### Improved integration tests

We improved the integration tests by leveraging the really good work by Otavio R. Piske on the camel-test-infra platform. We are now basing the integration on the camel-test-infra modules coming from the Camel release.

### Documentation

On the Archetypes side we added more documentation about the base extensible connector and on the apicurio archetype and dataformat archetype (added in this release)

### Strimzi and Kafka Version

This release has been based on Kafka 2.6.0 and Strimzi 0.20.1

### Examples

We added a bunch of new examples to the camel-kafka-connector-examples repository

### What to expect

We are already working on the next release based on the upcoming 3.8.0 Camel release. We are working on improving the documentation and adding new stuff.

# Introducing Idempotency Support in Camel Kafka Connector

In the next Camel Kafka connector release (0.7.0) there will be a new feature: the idempotency support on both source and sink connectors. The aim of this post is giving some hints on how and when to use the idempotency feature.

### What is Idempotency?

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages: it essentially acts like a Message Filter to filter out duplicates, as reported in the Camel documentation

From the Enterprise Integration Patterns documentation: Sometimes the same message gets delivered more than once, either because the messaging system is not certain the message has been successfully delivered yet, or because the Message Channel’s quality-of-service has been lowered to improve performance. Message receivers, on the other hand, tend to assume that each message will be delivered exactly once, and tend to cause problems when they repeat processing because of repeat messages. A receiver designed as an Idempotent Receiver handles duplicate messages and prevents them from causing problems in the receiver application.

This is a very useful feature in the integration world and it is an important new feature in the camel-kafka-connector project. Apache Camel provides multiple implementation of the Idempotent Consumer, in Camel-Kafka-connector we’ll support the in Memory and Kafka implementations.

### When to use idempotency

Suppose you’re using a source connector of any kind. By using the idempotency feature you’ll be able to avoid consuming the same message multiple times.

This means, in the Kafkish language, you won’t ingest the same payload multiple times in the target Kafka topic. This is something critical on the cloud for example, where you’ll pay for each API operation and for using increasing storage.

Now lets think about the sink connector scenario.

In this case, we’ll stream out of a Kafka topic multiple records, transform/convert/manipulate them and send them to an external system, like a messaging broker, a storage infra, a database etc.

In the Kafka topic used as source we may have multiple repeated records with the same payload or same metadata. Based on this information we can choose to skip the same records while sending data to the external system and for doing this we can leverage the idempotency feature of ckc.

### Camel-Kafka-connector idempotency configuration

The idempotency feature can be enabled through a number of configuration options available in ckc with the 0.7.0 release. In particular we are talking about:

Name Description Default
camel.idempotency.enabled If idempotency must be enabled or not false
camel.idempotency.repository.type The idempotent repository type to use, possible values are memory and kafka memory
camel.idempotency.expression.type How the idempotency will be evaluated: possible values are body and header body
camel.idempotency.memory.dimension The Memory dimension of the in memory idempotent Repository 100
camel.idempotency.kafka.topic The Kafka topic name to use for the idempotent repository kafka_idempotent_repository
camel.idempotency.kafka.bootstrap.servers A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live localhost:9092
camel.idempotency.kafka.max.cache.size Sets the maximum size of the local key cache 1000
camel.idempotency.kafka.poll.duration.ms Sets the poll duration (in milliseconds) of the Kafka consumer 100

The in-memory approach has been provided for short running connector workload, while the kafka one is for long running/interruptable connector.

The table is self-explaining.

A typical configuration for the kafka idempotent repository approach could be:

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic
camel.idempotency.kafka.bootstrap.servers=localhost:9092
camel.idempotency.kafka.max.cache.size=1500
camel.idempotency.kafka.poll.duration.ms=150


Some of the options can be used with their default value, in this example we’re just listing them for a Kafka idempotent repository configuration.

### A real example

The best way to show how the idempotency feature works, in camel-kafka-connector, it’s through an example. We’ll use the AWS2-S3 Source connector

As first step you’ll need to fully build the Camel-Kafka-connector project and install the connectors/camel-aws2-s3-kafka-connector zip package in your Kafka Broker plugin.path. Once the connector is in the plugin.path location, just unzip it. We describe how to build and unpack in the next steps:

You’ll need to setup the plugin.path property in your kafka

Open the $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your choosen location In this example we’ll use /home/connectors/ > cd <ckc_project> > mvn clean package > cp <ckc_project>/connectors/camel-aws2-s3-kafka-connector/target/camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip /home/connectors/ > cd /home/connectors/ > unzip camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip  The configuration for the source connector should be like: name=CamelAWS2S3SourceConnector connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter camel.source.maxPollDuration=10000 topics=s3.source.topic camel.source.path.bucketNameOrArn=camel-kafka-connector camel.source.endpoint.deleteAfterRead=false camel.component.aws2-s3.access-key=xxxx camel.component.aws2-s3.secret-key=yyyy camel.component.aws2-s3.region=eu-west-1 camel.idempotency.enabled=true camel.idempotency.repository.type=kafka camel.idempotency.expression.type=body camel.idempotency.kafka.topic=my.idempotency.topic  Don’t forget to add the correct credentials for your AWS account. We can call the configuration file s3-source.properties for example At this point we can run the connector. >$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties s3-source.properties  You have to have a running kafka cluster for this purpose. In your camel-kafka-connector bucket, try to load two files (test1.txt and test2.txt) with the same content, for example “Camel-Kafka-connector rocks”. And consume from the s3.source.topic through kafkacat > kafkacat -b localhost:9092 -t s3.source.topic % Auto-selecting Consumer mode (use -P or -C to override) % Reached end of topic s3.source.topic [0] at offset 0 Camel-Kafka-connector rocks % Reached end of topic s3.source.topic [0] at offset 1  The body of the second file was discarded and you just have one message in the topic. You can also have a look at the my.idempotency.topic content > kafkacat -b localhost:9092 -t my.idempotency.topic -f 'Value:%s\nKey:%k\n' % Auto-selecting Consumer mode (use -P or -C to override) Value:add Key:Camel-Kafka-connector rocks  We have just one operation of add with the body of the message as key. ### Conclusion This is just a little introduction on the camel-kafka-connector idempotency support. There are more case to cover and probably more work to be done. I just wanted to show something new in the camel-kafka-connector world. Feedback are welcome as always. # Introducing Camel AWS2 Eventbridge In Camel 3.6.0 we will introduce the camel-aws2-eventbridge among others new cool components. The aim of this blog post is showing what you can do with the Eventbridge AWS Service and the related camel component. ### What is AWS Eventbridge? The definition from the AWS official website is the following: Amazon EventBridge is a serverless event bus that makes it easy to connect applications together using data from your own applications, integrated Software-as-a-Service (SaaS) applications, and AWS services. EventBridge delivers a stream of real-time data from event sources, such as Zendesk, Datadog, or Pagerduty, and routes that data to targets like AWS Lambda. You can set up routing rules to determine where to send your data to build application architectures that react in real time to all of your data sources. So basically you can listen for events on your bus, which can be the default event bus (the AWS one) or a custom event bus. The events coming from the bus can be send to AWS services, like an SQS queue, SNS topic or an S3 bucket. Obviously events can trigger actions. For each event bus, you can set rules: each rule specify a target action to take when EventBridge receives an event that matches the rule. When an event matches the rule, EventBridge sends the event to the specified target and triggers the action defined in the rule. All of this can be done from AWS Console UI or from code, indifferently. ### What is the component structure? The AWS2-Eventbridge component act as producer-only component. At the moment of writing the operations you can do are the following: • putRule • putTargets • removeTargets • deleteRule • enableRule • disableRule • listRules • describeRule • listTargetsByRule • listRuleNamesByTarget The interesting operations are for sure the putRule and putTargets rule. I set up a little example in the camel-examples repository, showing what can be done, but we’ll talk about this later. ### Eventbridge Rules, Events Pattern and Targets The important parts of an Eventbridge rule are the event pattern and the targets. An event in AWS looks in this way: { "version": "0", "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718", "detail-type": "EC2 Instance State-change Notification", "source": "aws.ec2", "account": "111122223333", "time": "2017-12-22T18:43:48Z", "region": "us-west-1", "resources": [ "arn:aws:ec2:us-west-1:123456789012:instance/ i-1234567890abcdef0" ], "detail": { "instance-id": " i-1234567890abcdef0", "state": "terminated" } }  This is, for example, an event notifying the state-change of an EC2 instance. An event pattern is similar to an event. They have the same structure. Event patterns look much like the events they are filtering. So you can write your own event pattern like this one { "source": [ "aws.ec2" ], "detail-type": [ "EC2 Instance State-change Notification" ], "detail": { "state": [ "terminated" ] } }  In this case we want to get a notification of all the EC2 instances termination in our account. But you can also would like to know when an EC2 is running again for example. { "source": [ "aws.ec2" ], "detail-type": [ "EC2 Instance State-change Notification" ], "detail": { "state": [ "terminated", "running" ] } }  Now that we know what kind of events we want, we need to set up targets to be able to consume them. The request syntax for adding a target to a rule is the following { "EventBusName": "string", "Rule": "string", "Targets": [ { "Arn": "string", "BatchParameters": { "ArrayProperties": { "Size": number }, "JobDefinition": "string", "JobName": "string", "RetryStrategy": { "Attempts": number } }, "DeadLetterConfig": { "Arn": "string" }, "EcsParameters": { "Group": "string", "LaunchType": "string", "NetworkConfiguration": { "awsvpcConfiguration": { "AssignPublicIp": "string", "SecurityGroups": [ "string" ], "Subnets": [ "string" ] } }, "PlatformVersion": "string", "TaskCount": number, "TaskDefinitionArn": "string" }, "HttpParameters": { "HeaderParameters": { "string" : "string" }, "PathParameterValues": [ "string" ], "QueryStringParameters": { "string" : "string" } }, "Id": "string", "Input": "string", "InputPath": "string", "InputTransformer": { "InputPathsMap": { "string" : "string" }, "InputTemplate": "string" }, "KinesisParameters": { "PartitionKeyPath": "string" }, "RedshiftDataParameters": { "Database": "string", "DbUser": "string", "SecretManagerArn": "string", "Sql": "string", "StatementName": "string", "WithEvent": boolean }, "RetryPolicy": { "MaximumEventAgeInSeconds": number, "MaximumRetryAttempts": number }, "RoleArn": "string", "RunCommandParameters": { "RunCommandTargets": [ { "Key": "string", "Values": [ "string" ] } ] }, "SqsParameters": { "MessageGroupId": "string" } } ] }  As you may see there are many parameters. We just need to set the rule name and add our target. ### Creating a rule through Camel As I said there is a little example in the camel-examples repository based on camel-aws2-eventbridge, camel-aws2-s3 and camel-aws2-sqs. Creating a rule in Camel is as easy as writing this little snippet. public class MyRouteBuilder extends EndpointRouteBuilder { @Override public void configure() throws Exception { from(timer("fire").repeatCount("1")) .setHeader(EventbridgeConstants.RULE_NAME, constant("s3-events-rule")) .to(aws2Eventbridge("default") .operation(EventbridgeOperations.putRule) .eventPatternFile("file:src/main/resources/eventpattern.json")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setHeader(EventbridgeConstants.RULE_NAME, "s3-events-rule"); Target target = Target.builder().id("sqs-queue").arn("arn:aws:sqs:eu-west-1:780410022477:camel-connector-test") .build(); List<Target> targets = new ArrayList<Target>(); targets.add(target); exchange.getIn().setHeader(EventbridgeConstants.TARGETS, targets); } }) .to(aws2Eventbridge("default") .operation(EventbridgeOperations.putTargets)) .log("All set, enjoy!"); } }  In this route we are creating a single rule, called s3-events-rule, by using an eventpattern.json file. The target of this rule is the arn:aws:sqs:eu-west-1:780410022477:camel-connector-test and the target Id is sqs-queue. This means we are pointing an SQS queue called camel-connector-test. All of this will be done on the default event bus, which is the AWS event bus. What we have in the eventpattern.json file: { "source": [ "aws.s3" ], "detail": { "eventSource": [ "s3.amazonaws.com" ], "eventName": [ "DeleteBucket", "DeleteBucketCors", "DeleteBucketLifecycle", "DeleteBucketPolicy", "DeleteBucketReplication", "DeleteBucketTagging", "DeleteBucketWebsite", "CreateBucket", "PutBucketAcl", "PutBucketCors", "PutBucketLifecycle", "PutBucketPolicy", "PutBucketLogging", "PutBucketNotification", "PutBucketReplication", "PutBucketTagging", "PutBucketRequestPayment", "PutBucketVersioning", "PutBucketWebsite", "PutBucketEncryption", "DeleteBucketEncryption", "DeleteBucketPublicAccessBlock", "PutBucketPublicAccessBlock" ] } }  We want to be informed on the list of events. The eventpattern json can be built by hand, but also through the AWS console UI, through a series of dropdown menus during the rule creation. One important note on the usage of AWS Eventbridge is the following: to create a rule that triggers on an action by an AWS service that does not emit events, you can base the rule on API calls made by that service. The API calls are recorded by AWS CloudTrail, so you’ll need to have CloudTrail enabled. In this way you’ll be notified anyway. Through an AWS SQS consumer, we should be able to consume the events coming in from the eventbridge. public class MyRouteBuilder extends EndpointRouteBuilder { @Override public void configure() throws Exception { from(aws2Sqs("").deleteAfterRead(true)) .log("${body}");
}
}


The sqs-queue-name is camel-connector-test in this example. The property is defined in an application.properties file. All is well explained in the example anyway.

We can now try to create events to consume. Through the following route:

public class MyRouteBuilder extends EndpointRouteBuilder {

@Override
public void configure() throws Exception {

from(timer("fire").repeatCount("1"))
.setBody(constant("Camel rocks"))
.to(aws2S3("").keyName("firstfile"));
}
}


In this case the bucketName will be the name of a not already created bucket. In my example I was using camel-bucket-12567. The aws2-s3 has the autocreateBucket option set to true by default, so it will be created during the route execution and the event will be created.

In the terminal of the SQS consumer you should see a CreateBucket event logged.

14:08:16.585 [Camel (AWS2-SQS-Consumer) thread #0 - aws2-sqs://camel-connector-test] INFO  route1 - {"version":"0","id":"a79c33f3-fd64-481c-7964-8929b26ac2ae","detail-type":"AWS API Call via CloudTrail","source":"aws.s3","account":"xxxx","time":"2020-10-16T12:08:12Z","region":"eu-west-1","resources":[],"detail":{"eventVersion":"1.05","userIdentity":{"type":"xxx","principalId":"xxx","arn":"arn:xxx","accountId":"xxx","accessKeyId":"xxx"},"eventTime":"2020-10-16T12:08:12Z","eventSource":"s3.amazonaws.com","eventName":"CreateBucket","awsRegion":"eu-west-1","sourceIPAddress":"xxx","userAgent":"[aws-sdk-java/2.15.8 Linux/3.10.0-1127.19.1.el7.x86_64 OpenJDK_64-Bit_Server_VM/25.252-b09 Java/1.8.0_252 vendor/AdoptOpenJDK io/sync http/Apache]","requestParameters":{"CreateBucketConfiguration":{"LocationConstraint":"eu-west-1","xmlns":"http://s3.amazonaws.com/doc/2006-03-01/"},"bucketName":"camel-bucket-12567","Host":"camel-bucket-12567.s3.eu-west-1.amazonaws.com"}
.
.
.


You can also try to delete the bucket from the AWS Console too and you should get a message like this one:

13:42:55.560 [Camel (AWS2-SQS-Consumer) thread #0 - aws2-sqs://camel-connector-test] INFO  route1 - {"version":"0","id":"f8f289ab-bb8f-65c5-0bf6-a4929333bc4c","detail-type":"AWS API Call via CloudTrail","source":"aws.s3","account":"xxx","time":"2020-10-16T11:42:33Z","region":"eu-west-1","resources":[],"detail":{"eventVersion":"1.05","userIdentity":{"type":"xxx","principalId":"xxx","arn":"arn:xxx","accountId":"xxx","accessKeyId":"xxxx","sessionContext":{"sessionIssuer":{},"webIdFederationData":{}}},"eventTime":"2020-10-16T11:42:33Z","eventSource":"s3.amazonaws.com","eventName":"DeleteBucket","awsRegion":"eu-west-1","sourceIPAddress":"xxxx","userAgent":"[S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.783 Linux/4.9.217-0.3.ac.206.84.332.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 vendor/Oracle_Corporation]","requestParameters":{"bucketName":"camel-bucket-12567","Host":"s3-eu-west-1.amazonaws.com"},"responseElements":null ...
.
.
.


As you may see the userAgent is different in this case.

### Conclusion

This is just a really basic example, but you may have triggered an action on receiving the events, like for example adding a bucketPolicy to the newly created bucket. AWS Eventbridge is for sure an interesting service. The camel component can be improved by better supporting not only the default event bus of AWS Services but also external buses. We are working on that and on expanding the AWS services supported at the same time: you’re welcome to help!

# Camel AWS2 Components are here, what are the changes for end users?

In Camel 3.2.0 we’ll release the complete set of Camel AWS2 components. In Camel 3.1.0 we already have a bunch of AWS2 components living together with the original AWS components. The aim of this post is giving a full perspective of what will change for the end users and the roadmap for new features.

### New components

Except camel-aws-xray, which is a particular component needing much more work to be migrated, all the original AWS components have been migrated to AWS SDK v2. The original set of supported components was:

• camel-aws-cw
• camel-aws-ddb
• camel-aws-ec2
• camel-aws-ecs
• camel-aws-eks
• camel-aws-iam
• camel-aws-kinesis
• camel-aws-kms
• camel-aws-lambda
• camel-aws-mq
• camel-aws-msk
• camel-aws-s3
• camel-aws-sdb
• camel-aws-ses
• camel-aws-sns
• camel-aws-sqs
• camel-aws-swf
• camel-aws-translate
• camel-aws-xray

We now have also:

• camel-aws2-cw
• camel-aws2-ddb
• camel-aws2-ec2
• camel-aws2-ecs
• camel-aws2-eks
• camel-aws2-iam
• camel-aws2-kinesis
• camel-aws2-kms
• camel-aws2-lambda
• camel-aws2-mq
• camel-aws2-msk
• camel-aws2-s3
• camel-aws2-ses
• camel-aws2-sns
• camel-aws2-sqs
• camel-aws2-translate

### Moving from v1 to v2

For the end users, nothing will change, except the needed model classes coming from the SDK v2. So migrating should be a matter of changing imports and scheme. The first effort for the community was being able to obtain features parity between old and new components, and we did that. The AWS2-S3 components is still under heavy development, but the basic features are still the same. The Camel community really expects feedback from end users before deprecating the original AWS components. We won’t deprecate them until we’ll be totally sure the community is totally happy with the AWS2 support.

### Supported Platforms

Actually the AWS2 components are not supported in OSGi containers. We need to work on the needed bundles first but this will be an optional effort and we’ll focus on it if there will be real interest from the community. Camel Spring Boot starters for AWS2 components are already developed and generated in the Camel-spring-boot Repository. There is an ongoing effort for supporting them on Quarkus too, through the Camel-Quarkus project and we are creating new examples for the Camel-kafka-connector project.

As already said, our plan is to deprecate the original AWS components, but we’ll do this only after the community will confirm the features parity and we’ll have enough feedback. So, please, test them and report bugs, improvements and whatever you’ve in mind while using them. We are working on more components to add for AWS2 components family, like Athena, RDS, DocumentDB and more. At the same time, we’d like to improve the existing AWS2 components, by supporting the new features coming in the new SDK, like the non-blocking I/O and the ability to plug in a different HTTP implementation at run time (actually we are using only the Apache-client one). So there is still a lot of work to do and this is a good area for getting started and starting your contributions history to Apache Camel. We’re waiting for you!

# Webinar - What's new in Apache Camel 3?

On March 3rd, at 3pm CET (Central European Timezone) I will co-host, with my good friend and colleague Claus Ibsen, a webinar session of an 1 hour with title ‘Apache Camel 3 is here: What’s new?’

Together with Claus we will going deep in to the details of Apache Camel high level goals, and focus on the optimizations done for making Camel lighter and faster for the cloud. So we’ll have a look at the internal details.

We’ll talk also about the ecosystem we’re building around Apache Camel, with Camel-K, Camel-Quarkus and latest Camel-Kafka-connector: there will be 4 different little demos in the webinar.

As you may expect at the end of the webinar there will be a Q&A session with us, so you’ll get answers to a lot of the questions you have.

Apache Camel 3.1.0 has been released last week and the webinar will be focused on that and on the Camel’s ecosystem roadmap.

You’re very welcome to attend :-)

The webinar is free but requires a registration. You can find more details at the Registration page.