Introducing Idempotency Support in Camel Kafka Connector

In the next Camel Kafka Connector release (0.7.0) there will be a new feature: idempotency support on both source and sink connectors. The aim of this post is to give some hints on how and when to use the idempotency feature.

What is Idempotency?

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages: it essentially acts like a Message Filter for duplicates, as reported in the Camel documentation.

From the Enterprise Integration Patterns documentation: Sometimes the same message gets delivered more than once, either because the messaging system is not certain the message has been successfully delivered yet, or because the Message Channel’s quality-of-service has been lowered to improve performance. Message receivers, on the other hand, tend to assume that each message will be delivered exactly once, and tend to cause problems when they repeat processing because of repeat messages. A receiver designed as an Idempotent Receiver handles duplicate messages and prevents them from causing problems in the receiver application.

This is a very useful feature in the integration world and it is an important addition to the camel-kafka-connector project. Apache Camel provides multiple implementations of the Idempotent Consumer; in camel-kafka-connector we’ll support the in-memory and Kafka implementations.

When to use idempotency

Suppose you’re using a source connector of any kind. By using the idempotency feature you’ll be able to avoid consuming the same message multiple times.

This means, in Kafka terms, that you won’t ingest the same payload multiple times into the target Kafka topic. This is critical in the cloud, for example, where you pay for each API operation and for the storage you consume.

Now let’s think about the sink connector scenario.

In this case, we’ll stream records out of a Kafka topic, transform/convert/manipulate them and send them to an external system, such as a message broker, a storage system or a database.

In the Kafka topic used as source we may have multiple repeated records with the same payload or the same metadata. Based on this information we can choose to skip duplicate records while sending data to the external system, and for this we can leverage the idempotency feature of camel-kafka-connector (ckc).

Camel-Kafka-connector idempotency configuration

The idempotency feature can be enabled through a number of configuration options available in ckc starting with the 0.7.0 release. In particular we are talking about:

  • camel.idempotency.enabled: whether idempotency must be enabled or not (default: false)
  • camel.idempotency.repository.type: the idempotent repository type to use; possible values are memory and kafka (default: memory)
  • camel.idempotency.expression.type: how idempotency will be evaluated; possible values are body and header (default: body)
  • camel.idempotency.expression.header: the header name that will be evaluated when camel.idempotency.expression.type is set to header (default: null)
  • camel.idempotency.memory.dimension: the dimension of the in-memory idempotent repository (default: 100)
  • camel.idempotency.kafka.topic: the Kafka topic name to use for the idempotent repository (default: kafka_idempotent_repository)
  • camel.idempotency.kafka.bootstrap.servers: a comma-separated list of host/port pairs for the Kafka brokers where the idempotent repository should live (default: localhost:9092)
  • camel.idempotency.kafka.max.cache.size: the maximum size of the local key cache (default: 1000)
  • camel.idempotency.kafka.poll.duration.ms: the poll duration (in milliseconds) of the Kafka consumer (default: 100)

The in-memory approach is provided for short-running connector workloads, while the Kafka one is meant for long-running/interruptible connectors.

These options are mostly self-explanatory.

A typical configuration for the Kafka idempotent repository approach could be:

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic
camel.idempotency.kafka.bootstrap.servers=localhost:9092
camel.idempotency.kafka.max.cache.size=1500
camel.idempotency.kafka.poll.duration.ms=150

Some of these options could be left at their default values; in this example we list them explicitly for a Kafka idempotent repository configuration.
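
For a short-running connector you could instead use the in-memory repository. Here is a minimal sketch for the sink scenario described above, deduplicating on a header; my-dedup-header is a hypothetical header name used only for illustration:

camel.idempotency.enabled=true
camel.idempotency.repository.type=memory
camel.idempotency.expression.type=header
camel.idempotency.expression.header=my-dedup-header
camel.idempotency.memory.dimension=500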

A real example

The best way to show how the idempotency feature works in camel-kafka-connector is through an example. We’ll use the AWS2-S3 source connector.

As a first step you’ll need to fully build the camel-kafka-connector project and install the connectors/camel-aws2-s3-kafka-connector zip package in the plugin.path location of your Kafka Connect worker. Once the connector is in the plugin.path location, just unzip it. We describe how to build and unpack it in the next steps.

You’ll need to set the plugin.path property in your Kafka installation: open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your chosen location. In this example we’ll use /home/connectors/.
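
With the location used in this post, the relevant line in connect-standalone.properties would be the following:

plugin.path=/home/connectors/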

> cd <ckc_project> 
> mvn clean package
> cp <ckc_project>/connectors/camel-aws2-s3-kafka-connector/target/camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip /home/connectors/
> cd /home/connectors/
> unzip camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip

The configuration for the source connector should look like the following:

name=CamelAWS2S3SourceConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

camel.source.maxPollDuration=10000

topics=s3.source.topic

camel.source.path.bucketNameOrArn=camel-kafka-connector

camel.source.endpoint.deleteAfterRead=false

camel.component.aws2-s3.access-key=xxxx
camel.component.aws2-s3.secret-key=yyyy
camel.component.aws2-s3.region=eu-west-1

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic

Don’t forget to add the correct credentials for your AWS account.

We can call the configuration file s3-source.properties, for example.

At this point we can run the connector.

> $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties s3-source.properties

You’ll need a running Kafka cluster for this purpose.

In your camel-kafka-connector bucket, upload two files (test1.txt and test2.txt) with the same content, for example “Camel-Kafka-connector rocks”.
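
If you prefer the command line over the AWS Console, the upload could look like this; a sketch assuming the AWS CLI is configured with the same credentials:

> echo "Camel-Kafka-connector rocks" > test1.txt
> cp test1.txt test2.txt
> aws s3 cp test1.txt s3://camel-kafka-connector/
> aws s3 cp test2.txt s3://camel-kafka-connector/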

Then consume from s3.source.topic through kafkacat:

> kafkacat -b localhost:9092 -t s3.source.topic
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic s3.source.topic [0] at offset 0
Camel-Kafka-connector rocks
% Reached end of topic s3.source.topic [0] at offset 1

The body of the second file was discarded and you just have one message in the topic.

You can also have a look at the my.idempotency.topic content:

> kafkacat -b localhost:9092 -t my.idempotency.topic -f 'Value:%s\nKey:%k\n'
% Auto-selecting Consumer mode (use -P or -C to override)
Value:add
Key:Camel-Kafka-connector rocks

We have just one add operation, with the body of the message as the key.

Conclusion

This is just a short introduction to the camel-kafka-connector idempotency support. There are more cases to cover and probably more work to be done. I just wanted to show something new in the camel-kafka-connector world. Feedback is welcome, as always.

Introducing Camel AWS2 Eventbridge

In Camel 3.6.0 we will introduce camel-aws2-eventbridge, among other new cool components. The aim of this blog post is to show what you can do with the AWS EventBridge service and the related Camel component.

What is AWS Eventbridge?

The definition from the AWS official website is the following:

Amazon EventBridge is a serverless event bus that makes it easy to connect applications together using data from your own applications, integrated Software-as-a-Service (SaaS) applications, and AWS services. EventBridge delivers a stream of real-time data from event sources, such as Zendesk, Datadog, or Pagerduty, and routes that data to targets like AWS Lambda. You can set up routing rules to determine where to send your data to build application architectures that react in real time to all of your data sources.

So basically you can listen for events on your bus, which can be the default event bus (the AWS one) or a custom event bus. The events coming from the bus can be sent to AWS services, like an SQS queue, an SNS topic or an S3 bucket. Obviously, events can trigger actions.

For each event bus, you can set rules: each rule specifies the action to take when EventBridge receives a matching event. When an event matches a rule, EventBridge sends the event to the specified target and triggers the action defined in the rule.

All of this can be done from the AWS Console UI or from code, interchangeably.

What is the component structure?

The AWS2-Eventbridge component acts as a producer-only component. At the time of writing, the operations you can perform are the following:

  • putRule
  • putTargets
  • removeTargets
  • deleteRule
  • enableRule
  • disableRule
  • listRules
  • describeRule
  • listTargetsByRule
  • listRuleNamesByTarget

The most interesting operations are for sure putRule and putTargets.

I set up a little example in the camel-examples repository, showing what can be done, but we’ll talk about this later.

Eventbridge Rules, Events Pattern and Targets

The important parts of an Eventbridge rule are the event pattern and the targets.

An event in AWS looks like this:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/ i-1234567890abcdef0"
],
"detail": {
  "instance-id": " i-1234567890abcdef0",
  "state": "terminated"
  }
}

This is, for example, an event notifying the state-change of an EC2 instance.

An event pattern is similar to an event: they share the same structure, and patterns look much like the events they filter.

So you can write your own event pattern, like this one:

{
  "source": [ "aws.ec2" ],
  "detail-type": [ "EC2 Instance State-change Notification" ],
  "detail": {
    "state": [ "terminated" ]
  }
}

In this case we want to get a notification of all the EC2 instances termination in our account.

But you might also want to know when an EC2 instance is running again, for example:

{
  "source": [ "aws.ec2" ],
  "detail-type": [ "EC2 Instance State-change Notification" ],
  "detail": {
    "state": [ "terminated", "running" ]
  }
}

Now that we know what kind of events we want, we need to set up targets to be able to consume them.

The request syntax for adding a target to a rule is the following:

{
   "EventBusName": "string",
   "Rule": "string",
   "Targets": [ 
      { 
         "Arn": "string",
         "BatchParameters": { 
            "ArrayProperties": { 
               "Size": number
            },
            "JobDefinition": "string",
            "JobName": "string",
            "RetryStrategy": { 
               "Attempts": number
            }
         },
         "DeadLetterConfig": { 
            "Arn": "string"
         },
         "EcsParameters": { 
            "Group": "string",
            "LaunchType": "string",
            "NetworkConfiguration": { 
               "awsvpcConfiguration": { 
                  "AssignPublicIp": "string",
                  "SecurityGroups": [ "string" ],
                  "Subnets": [ "string" ]
               }
            },
            "PlatformVersion": "string",
            "TaskCount": number,
            "TaskDefinitionArn": "string"
         },
         "HttpParameters": { 
            "HeaderParameters": { 
               "string" : "string" 
            },
            "PathParameterValues": [ "string" ],
            "QueryStringParameters": { 
               "string" : "string" 
            }
         },
         "Id": "string",
         "Input": "string",
         "InputPath": "string",
         "InputTransformer": { 
            "InputPathsMap": { 
               "string" : "string" 
            },
            "InputTemplate": "string"
         },
         "KinesisParameters": { 
            "PartitionKeyPath": "string"
         },
         "RedshiftDataParameters": { 
            "Database": "string",
            "DbUser": "string",
            "SecretManagerArn": "string",
            "Sql": "string",
            "StatementName": "string",
            "WithEvent": boolean
         },
         "RetryPolicy": { 
            "MaximumEventAgeInSeconds": number,
            "MaximumRetryAttempts": number
         },
         "RoleArn": "string",
         "RunCommandParameters": { 
            "RunCommandTargets": [ 
               { 
                  "Key": "string",
                  "Values": [ "string" ]
               }
            ]
         },
         "SqsParameters": { 
            "MessageGroupId": "string"
         }
      }
   ]
}

As you can see there are many parameters, but we just need to set the rule name and add our target, as shown in the sketch below.
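
A minimal sketch of the request, reusing the rule name, target id and queue ARN that appear later in this post:

{
   "EventBusName": "default",
   "Rule": "s3-events-rule",
   "Targets": [
      {
         "Id": "sqs-queue",
         "Arn": "arn:aws:sqs:eu-west-1:780410022477:camel-connector-test"
      }
   ]
}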

Creating a rule through Camel

As I said, there is a little example in the camel-examples repository based on camel-aws2-eventbridge, camel-aws2-s3 and camel-aws2-sqs.

Creating a rule in Camel is as easy as writing this little snippet:

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
import org.apache.camel.component.aws2.eventbridge.EventbridgeConstants;
import org.apache.camel.component.aws2.eventbridge.EventbridgeOperations;

import software.amazon.awssdk.services.eventbridge.model.Target;

public class MyRouteBuilder extends EndpointRouteBuilder {

    @Override
    public void configure() throws Exception {

        from(timer("fire").repeatCount("1"))
            // create the rule from the event pattern file
            .setHeader(EventbridgeConstants.RULE_NAME, constant("s3-events-rule"))
            .to(aws2Eventbridge("default")
                .operation(EventbridgeOperations.putRule)
                .eventPatternFile("file:src/main/resources/eventpattern.json"))
            // attach an SQS queue as the target of the rule
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    exchange.getIn().setHeader(EventbridgeConstants.RULE_NAME, "s3-events-rule");
                    Target target = Target.builder().id("sqs-queue")
                            .arn("arn:aws:sqs:eu-west-1:780410022477:camel-connector-test")
                            .build();
                    List<Target> targets = new ArrayList<Target>();
                    targets.add(target);
                    exchange.getIn().setHeader(EventbridgeConstants.TARGETS, targets);
                }
            })
            .to(aws2Eventbridge("default")
                .operation(EventbridgeOperations.putTargets))
            .log("All set, enjoy!");
    }
}

In this route we are creating a single rule, called s3-events-rule, by using an eventpattern.json file. The target of this rule is arn:aws:sqs:eu-west-1:780410022477:camel-connector-test and the target id is sqs-queue. This means we are pointing to an SQS queue called camel-connector-test. All of this will be done on the default event bus, which is the AWS one.

This is what we have in the eventpattern.json file:

{
  "source": [
    "aws.s3"
  ],
  "detail": {
    "eventSource": [
      "s3.amazonaws.com"
    ],
    "eventName": [
      "DeleteBucket",
      "DeleteBucketCors",
      "DeleteBucketLifecycle",
      "DeleteBucketPolicy",
      "DeleteBucketReplication",
      "DeleteBucketTagging",
      "DeleteBucketWebsite",
      "CreateBucket",
      "PutBucketAcl",
      "PutBucketCors",
      "PutBucketLifecycle",
      "PutBucketPolicy",
      "PutBucketLogging",
      "PutBucketNotification",
      "PutBucketReplication",
      "PutBucketTagging",
      "PutBucketRequestPayment",
      "PutBucketVersioning",
      "PutBucketWebsite",
      "PutBucketEncryption",
      "DeleteBucketEncryption",
      "DeleteBucketPublicAccessBlock",
      "PutBucketPublicAccessBlock"
    ]
  }
}

We want to be notified of all the events in this list. The event pattern JSON can be built by hand, but also through the AWS Console UI, via a series of dropdown menus during rule creation.

One important note on the usage of AWS EventBridge: to create a rule that triggers on an action by an AWS service that does not emit events, you can base the rule on the API calls made by that service. The API calls are recorded by AWS CloudTrail, so you’ll need to have CloudTrail enabled; this way you’ll be notified anyway.
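
If CloudTrail is not enabled yet, a trail can be created from the AWS Console or from the CLI. A minimal sketch, where my-trail and my-cloudtrail-bucket are placeholder names (the bucket must already exist with a suitable policy):

> aws cloudtrail create-trail --name my-trail --s3-bucket-name my-cloudtrail-bucket
> aws cloudtrail start-logging --name my-trail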

Through an AWS SQS consumer, we should be able to consume the events coming in from EventBridge:

import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

public class MyRouteBuilder extends EndpointRouteBuilder {

    @Override
    public void configure() throws Exception {

        // consume the events routed by EventBridge to our SQS queue
        from(aws2Sqs("{{sqs-queue-name}}").deleteAfterRead(true))
            .log("${body}");
    }
}

The sqs-queue-name property is set to camel-connector-test in this example, and it is defined in an application.properties file. Everything is explained in detail in the example anyway.
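
As a sketch, the application.properties file for this consumer could contain just the queue name, assuming the route references it as {{sqs-queue-name}}:

sqs-queue-name=camel-connector-test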

We can now try to create events to consume, through the following route:

import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

public class MyRouteBuilder extends EndpointRouteBuilder {

    @Override
    public void configure() throws Exception {

        // upload a small payload to the S3 bucket, generating a CreateBucket event first
        from(timer("fire").repeatCount("1"))
            .setBody(constant("Camel rocks"))
            .to(aws2S3("{{bucketName}}").keyName("firstfile"));
    }
}

In this case the bucketName property must point to a bucket that does not exist yet. In my example I was using camel-bucket-12567. The aws2-s3 component has the autocreateBucket option set to true by default, so the bucket will be created during the route execution and the corresponding event will be generated.
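
The corresponding entry in the application.properties file would then be:

bucketName=camel-bucket-12567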

In the terminal of the SQS consumer you should see a CreateBucket event logged.

14:08:16.585 [Camel (AWS2-SQS-Consumer) thread #0 - aws2-sqs://camel-connector-test] INFO  route1 - {"version":"0","id":"a79c33f3-fd64-481c-7964-8929b26ac2ae","detail-type":"AWS API Call via CloudTrail","source":"aws.s3","account":"xxxx","time":"2020-10-16T12:08:12Z","region":"eu-west-1","resources":[],"detail":{"eventVersion":"1.05","userIdentity":{"type":"xxx","principalId":"xxx","arn":"arn:xxx","accountId":"xxx","accessKeyId":"xxx"},"eventTime":"2020-10-16T12:08:12Z","eventSource":"s3.amazonaws.com","eventName":"CreateBucket","awsRegion":"eu-west-1","sourceIPAddress":"xxx","userAgent":"[aws-sdk-java/2.15.8 Linux/3.10.0-1127.19.1.el7.x86_64 OpenJDK_64-Bit_Server_VM/25.252-b09 Java/1.8.0_252 vendor/AdoptOpenJDK io/sync http/Apache]","requestParameters":{"CreateBucketConfiguration":{"LocationConstraint":"eu-west-1","xmlns":"http://s3.amazonaws.com/doc/2006-03-01/"},"bucketName":"camel-bucket-12567","Host":"camel-bucket-12567.s3.eu-west-1.amazonaws.com"}
.
.
.

You can also try to delete the bucket from the AWS Console, and you should get a message like this one:

13:42:55.560 [Camel (AWS2-SQS-Consumer) thread #0 - aws2-sqs://camel-connector-test] INFO  route1 - {"version":"0","id":"f8f289ab-bb8f-65c5-0bf6-a4929333bc4c","detail-type":"AWS API Call via CloudTrail","source":"aws.s3","account":"xxx","time":"2020-10-16T11:42:33Z","region":"eu-west-1","resources":[],"detail":{"eventVersion":"1.05","userIdentity":{"type":"xxx","principalId":"xxx","arn":"arn:xxx","accountId":"xxx","accessKeyId":"xxxx","sessionContext":{"sessionIssuer":{},"webIdFederationData":{}}},"eventTime":"2020-10-16T11:42:33Z","eventSource":"s3.amazonaws.com","eventName":"DeleteBucket","awsRegion":"eu-west-1","sourceIPAddress":"xxxx","userAgent":"[S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.783 Linux/4.9.217-0.3.ac.206.84.332.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 vendor/Oracle_Corporation]","requestParameters":{"bucketName":"camel-bucket-12567","Host":"s3-eu-west-1.amazonaws.com"},"responseElements":null ...
.
.
.

As you can see, the userAgent is different in this case.

Conclusion

This is just a really basic example, but you could also trigger an action on receiving the events, for example adding a bucket policy to the newly created bucket. AWS EventBridge is for sure an interesting service. The Camel component can be improved by supporting not only the default event bus of the AWS services but also external buses. We are working on that, and on expanding the set of supported AWS services at the same time: you’re welcome to help!

Camel AWS2 Components are here, what are the changes for end users?

In Camel 3.2.0 we’ll release the complete set of Camel AWS2 components. In Camel 3.1.0 we already have a bunch of AWS2 components living together with the original AWS components. The aim of this post is to give a full perspective on what will change for end users and on the roadmap for new features.

New components

Except for camel-aws-xray, which is a particular component needing much more work to be migrated, all the original AWS components have been migrated to AWS SDK v2. The original set of supported components was:

  • camel-aws-cw
  • camel-aws-ddb
  • camel-aws-ec2
  • camel-aws-ecs
  • camel-aws-eks
  • camel-aws-iam
  • camel-aws-kinesis
  • camel-aws-kms
  • camel-aws-lambda
  • camel-aws-mq
  • camel-aws-msk
  • camel-aws-s3
  • camel-aws-sdb
  • camel-aws-ses
  • camel-aws-sns
  • camel-aws-sqs
  • camel-aws-swf
  • camel-aws-translate
  • camel-aws-xray

We now have also:

  • camel-aws2-cw
  • camel-aws2-ddb
  • camel-aws2-ec2
  • camel-aws2-ecs
  • camel-aws2-eks
  • camel-aws2-iam
  • camel-aws2-kinesis
  • camel-aws2-kms
  • camel-aws2-lambda
  • camel-aws2-mq
  • camel-aws2-msk
  • camel-aws2-s3
  • camel-aws2-ses
  • camel-aws2-sns
  • camel-aws2-sqs
  • camel-aws2-translate

Moving from v1 to v2

For end users, nothing will change except the model classes, which now come from SDK v2. Migrating should therefore be a matter of changing imports and the URI scheme. The first effort for the community was obtaining feature parity between the old and new components, and we did that. The AWS2-S3 component is still under heavy development, but the basic features are the same. The Camel community really expects feedback from end users before deprecating the original AWS components: we won’t deprecate them until we are sure the community is totally happy with the AWS2 support.
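
As an illustration of what the migration looks like in practice, here is a minimal sketch; the bucket name and credentials are placeholders, and the exact option values should be checked against the component documentation:

import org.apache.camel.builder.RouteBuilder;

public class MigrationExample extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Old AWS SDK v1 based component (scheme aws-s3, region as enum name):
        // from("aws-s3://mybucket?accessKey=xxx&secretKey=yyy&region=EU_WEST_1")
        //     .log("${body}");

        // New AWS SDK v2 based component: same route, new scheme,
        // and any model classes now come from software.amazon.awssdk packages.
        from("aws2-s3://mybucket?accessKey=xxx&secretKey=yyy&region=eu-west-1")
            .log("${body}");
    }
}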

Supported Platforms

Currently the AWS2 components are not supported in OSGi containers. We need to work on the needed bundles first, but this will be an optional effort and we’ll focus on it only if there is real interest from the community. Camel Spring Boot starters for the AWS2 components are already developed and generated in the camel-spring-boot repository. There is an ongoing effort to support them on Quarkus too, through the Camel-Quarkus project, and we are creating new examples for the camel-kafka-connector project.

Future and roadmap

As already said, our plan is to deprecate the original AWS components, but we’ll do this only after the community confirms feature parity and we have enough feedback. So, please, test them and report bugs, improvements and whatever you have in mind while using them. We are working on more components to add to the AWS2 family, like Athena, RDS, DocumentDB and more. At the same time, we’d like to improve the existing AWS2 components by supporting the new features coming in the new SDK, like non-blocking I/O and the ability to plug in a different HTTP implementation at runtime (currently we are using only the Apache client one). So there is still a lot of work to do, and this is a good area for getting started and beginning your contribution history to Apache Camel. We’re waiting for you!

Webinar - What's new in Apache Camel 3?

On March 3rd, at 3pm CET (Central European Time), I will co-host, with my good friend and colleague Claus Ibsen, a one-hour webinar session titled ‘Apache Camel 3 is here: What’s new?’.

Together with Claus, we will go deep into the details of Apache Camel’s high-level goals, and focus on the optimizations done to make Camel lighter and faster for the cloud. So we’ll have a look at the internal details.

We’ll also talk about the ecosystem we’re building around Apache Camel, with Camel-K, Camel-Quarkus and the latest addition, Camel-Kafka-connector: there will be 4 different little demos in the webinar.

As you may expect at the end of the webinar there will be a Q&A session with us, so you’ll get answers to a lot of the questions you have.

Apache Camel 3.1.0 was released last week, and the webinar will focus on that release and on the roadmap of Camel’s ecosystem.

You’re very welcome to attend :-)

The webinar is free but requires a registration. You can find more details at the Registration page.

Apache Camel 2.22.0 release, what's new

The Camel community released the new Apache Camel 2.22.0. This is a big release that arrives close to the 2.21.0 one, for a specific reason: supporting Spring Boot 2. Here is a list of the most important features:

  • Spring Boot Support

    Camel switched from Spring Boot v1 to v2 and therefore v1 is no longer supported.

  • Spring Support

    Upgraded to Spring Framework 5. Camel is currently able to work with Spring 4.3.x as well, but going forward Spring 5.x will be the minimum Spring version in future releases.

  • Karaf Support

    Upgraded to Karaf 4.2. Camel is able to work with Karaf 4.1.x too, but we suggest switching to Karaf 4.2.0.

  • Rest DSL client request validation

    Rest DSL now supports client request validation, which validates that the Content-Type/Accept headers are acceptable for the REST service.

  • Optimised using toD DSL

    Allows endpoints and producers to be reused for components where it’s possible. For example, HTTP based components will now reuse the producer (HTTP client) for dynamic URIs sending to the same host; see the sketch after this list. More details in the toD documentation.

  • New components

    • camel-as2 - Component used for transferring data securely and reliably over the internet using the AS2 protocol
    • camel-google-mail-stream - Component used for consuming email from your Gmail account in streaming mode.
    • camel-mybatis - Now has a mybatis-bean component that supports using MyBatis annotations on POJO beans to specify the SQL queries and mappings.
    • camel-micrometer - Component created for gathering metrics.
    • camel-rxjava2 - Component for supporting RxJava2 in Camel.
    • camel-service - Represents an endpoint which only becomes active when the CamelClusterView has the leadership.
    • camel-testcontainers - Component for testing purposes through containers.
    • camel-web3j - The web3j component uses the Web3j client API and allows you to add/read nodes to/from web3j compliant content repositories.
  • Important change to consider

    • Unit testing Camel with Spring Boot by extending the base classes CamelTestSupport or CamelSpringTestSupport now throws an exception, as this has never been intended/supported. Instead use the CamelSpringBootRunner JUnit runner, and do not extend a base class.
    • The file consumer has changed to use readLock=none as default instead of readLock=markerFile. Documentation already indicated that readLock=none was the default.
    • The SEDA component now has a default queue size of 1000 instead of unlimited.
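
Going back to the toD optimization, here is a minimal sketch of a dynamic HTTP endpoint that benefits from producer reuse; the myHost header and the target path are just illustrations:

import org.apache.camel.builder.RouteBuilder;

public class ToDExample extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // The target URI is computed per message from the myHost header,
        // but the underlying HTTP producer (client) is now reused whenever
        // the dynamic URIs resolve to the same host.
        from("direct:start")
            .toD("http://${header.myHost}/api/orders");
    }
}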

For more information about the release you can read the Camel 2.22.0 Release page. We hope you’ll enjoy this release; feedback is more than welcome, as are contributions. Feel free to test it and report bugs or enhancements on JIRA.