Load properties from Vault/Secrets cloud services: an update

In Camel 3.16.0 we introduced the ability to load properties from vault services and use them in the Camel context.

This post covers the updates and improvements we've made over the last two releases.

Supported Services

In 3.16.0 we’re supporting two of the main services available in the cloud space:

  • AWS Secrets Manager
  • Google Cloud Secret Manager

In 3.19.0, soon to be released, we're going to have four services available:

  • AWS Secrets Manager
  • Google Cloud Secret Manager
  • Azure Key Vault
  • HashiCorp Vault

Setting up the Properties Function

Each of the secret management cloud services requires different parameters to complete authentication and authorization.

For all of the Properties Functions we provide two different configuration approaches:

  • Environment variables
  • Main Configuration properties

The information for AWS and GCP is already available in the old blog post.

Let’s explore Azure Key Vault and HashiCorp Vault.
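As with AWS and GCP, the corresponding component JAR needs to be on the classpath for each Properties Function. As a hint, the Maven dependencies might look like the following sketch; the artifact IDs are assumed to follow the usual camel-<component> naming, so verify them against the Camel release you're using:

<!-- artifact IDs assumed to follow the usual camel-<component> naming; verify against your Camel version -->
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-azure-key-vault</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-hashicorp-vault</artifactId>
    <version>${camel.version}</version>
</dependency>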

Azure Key Vault

The Azure Key Vault Properties Function can be configured through the following environment variables:

export CAMEL_VAULT_AZURE_TENANT_ID=tenantId
export CAMEL_VAULT_AZURE_CLIENT_ID=clientId
export CAMEL_VAULT_AZURE_CLIENT_SECRET=clientSecret
export CAMEL_VAULT_AZURE_VAULT_NAME=vaultName

Alternatively, it is possible to define the credentials as Main Configuration properties:

camel.vault.azure.tenantId = tenantId
camel.vault.azure.clientId = clientId
camel.vault.azure.clientSecret = clientSecret
camel.vault.azure.vaultName = vaultName

To retrieve a secret from Azure Key Vault you might run something like the following, where the secret named route is expected to contain an endpoint URI:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <to uri="{{azure:route}}"/>
    </route>
</camelContext>

HashiCorp Vault

The HashiCorp Vault Properties Function can be configured through the following environment variables:

export CAMEL_VAULT_HASHICORP_TOKEN=token
export CAMEL_VAULT_HASHICORP_ENGINE=engine
export CAMEL_VAULT_HASHICORP_HOST=host
export CAMEL_VAULT_HASHICORP_PORT=port
export CAMEL_VAULT_HASHICORP_SCHEME=http/https

Alternatively, it is possible to define the credentials as Main Configuration properties:

camel.vault.hashicorp.token = token
camel.vault.hashicorp.engine = engine
camel.vault.hashicorp.host = host
camel.vault.hashicorp.port = port
camel.vault.hashicorp.scheme = scheme

To retrieve a secret from HashiCorp Vault you might run something like the following, where again the secret named route is expected to contain an endpoint URI:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <to uri="{{hashicorp:route}}"/>
    </route>
</camelContext>

Multi-field Secrets and Default Values

As with AWS Secrets Manager and Google Cloud Secret Manager, multi-field secrets and default values are both supported by the Azure Key Vault and HashiCorp Vault Properties Functions, as shown in the sketch below.
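For example, assuming a HashiCorp Vault secret named database with a username field (hypothetical names), a route could reference a single field and fall back to a default value when it is missing:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{hashicorp:database/username:admin}}"/>
    </route>
</camelContext>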

Versioning

In the next Camel version we are going to release support for retrieving a secret at a particular version. This will be supported by all the vault services we currently support in Camel.

In particular, you'll be able to retrieve a specific version of a secret with the following syntax:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{hashicorp:database/username:admin@2}}"/>
    </route>
</camelContext>

In this example we're going to retrieve the field username from the secret database, at version 2. If that version is not available, the default value admin will be used.

Future

We plan to work on the ability to reload the whole Camel context once a secret has been rotated or updated. This is still in the design phase, but we would really like to see it implemented soon.

Stay tuned for more news!

Camel 3.16.0 new feature: Load properties from Vault/Secrets cloud services

Over the last few weeks, together with Claus, we've been working on a new feature: loading properties from Vault/Secrets cloud services.

It will arrive with Camel 3.16.0, currently under vote and due to be released by the end of this week (24/3).

This post introduces the new features and provides some examples.

Secrets Management in Camel

In the past there were many discussions around the possibility of managing secrets in Camel through Vault Services.

There are a lot of hidden troubles when we talk about secrets management:

  • Ability to automatically retrieve secrets after a secret rotation has been completed
  • Writing the function (script, serverless function etc.) to operate the rotation
  • Being notified once a rotation happens

We chose to start from the beginning: retrieving secrets from a vault service and using them as properties in the Camel configuration.

Supported Services

In 3.16.0 we’re supporting two of the main services available in the cloud space:

  • AWS Secrets Manager
  • Google Cloud Secret Manager

How it works

The Vault feature works by specifying a particular prefix while using the Properties component.

For example for AWS:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{aws:username}}"/>
    </route>
</camelContext>

or

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{gcp:username}}"/>
    </route>
</camelContext>

This notation will run the following workflow when starting a Camel route:

  • Connect and authenticate to AWS Secret Manager (or GCP)
  • Retrieve the value related to the secret named username
  • Substitute the property with the secret value just returned

To use one of these Properties Functions there are two requirements: adding the camel-aws-secrets-manager JAR (for AWS) or the camel-google-secret-manager JAR (for GCP) to your classpath, and setting up the credentials to access the cloud service.
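As a hint, the corresponding Maven dependencies might look like the following sketch; the artifact IDs mirror the JAR names above, so double-check them against the Camel release you're using:

<!-- artifact IDs mirror the JAR names mentioned above; verify against your Camel version -->
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-aws-secrets-manager</artifactId>
    <version>${camel.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-google-secret-manager</artifactId>
    <version>${camel.version}</version>
</dependency>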

Setting up the Properties Function

Each of the secret management cloud services requires different parameters to complete authentication and authorization.

For both of the Properties Functions currently available we provide two different approaches:

  • Environment variables
  • Main Configuration properties

AWS Secrets Manager

The AWS Secrets Manager Properties Function can be configured through the following environment variables:

export CAMEL_VAULT_AWS_ACCESS_KEY=accessKey
export CAMEL_VAULT_AWS_SECRET_KEY=secretKey
export CAMEL_VAULT_AWS_REGION=region

Alternatively, it is possible to define the credentials as Main Configuration properties:

camel.vault.aws.accessKey = accessKey
camel.vault.aws.secretKey = secretKey
camel.vault.aws.region = region

The examples above don't use the default credentials provider chain from the AWS SDK, but the Properties Function can be configured that way as well. Through environment variables:

export CAMEL_VAULT_AWS_USE_DEFAULT_CREDENTIALS_PROVIDER=true
export CAMEL_VAULT_AWS_REGION=region

The same can be done with Main Configuration properties:

camel.vault.aws.defaultCredentialsProvider = true
camel.vault.aws.region = region

GCP Secret Manager

The GCP Secret Manager Properties Function can be configured through the following environment variables:

export CAMEL_VAULT_GCP_SERVICE_ACCOUNT_KEY=file:///path/to/service.accountkey
export CAMEL_VAULT_GCP_PROJECT_ID=projectId

Alternatively, it is possible to define the credentials as Main Configuration properties:

camel.vault.gcp.serviceAccountKey = file:///path/to/service.accountkey
camel.vault.gcp.projectId = projectId

The examples above don't use the default credentials provider coming from GCP, but the Properties Function can be configured that way as well. Through environment variables:

export CAMEL_VAULT_GCP_USE_DEFAULT_INSTANCE=true
export CAMEL_VAULT_GCP_PROJECT_ID=projectId

The same can be done with Main Configuration properties:

camel.vault.gcp.useDefaultInstance = true
camel.vault.gcp.projectId = projectId

Multi-field Secrets

Some of the secret manager services allow users to create multiple fields in a secret, for example:

{
  "username": "admin",
  "password": "password123",
  "engine": "postgres",
  "host": "127.0.0.1",
  "port": "3128",
  "dbname": "db"
}

Usually the secret will be in JSON format. With the secrets-related Properties Functions we can retrieve a single field of the secret and use it. For example, you can get a single field's value in your route:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{gcp:database/username}}"/>
    </route>
</camelContext>

In this route the property will be replaced by the username field of the secret named database.

Default Values

It is possible to fall back to a default value. Building on the example above, you could specify a default value to use in case the particular field of the secret is not present in GCP Secret Manager:

<camelContext>
    <route>
        <from uri="direct:start"/>
        <log message="Username is {{gcp:database/username:admin}}"/>
    </route>
</camelContext>

And in case something isn't working, such as authentication failing, the secret not existing or the service being down, the returned value will be admin.

Future

In the next Camel version we are planning to work on more secrets management services. In particular, we want to add two main components to the list:

  • Azure Key Vault
  • HashiCorp Vault

Follow Camel's development to learn more about the work in progress.

Once the 3.16.0 release is out (it's under vote these days), use the Properties Functions in your projects and give us feedback.

Stay tuned!

Camel-AWS-S3 - New Streaming upload feature

In the last weeks I was focused on a particular feature for the Camel AWS S3 component: the streaming upload feature.

In this post I'm going to summarize what it is and how to use it.

Streaming upload

The AWS S3 component already had a multipart upload feature among its producer operations; the main “problem” with it was the need to know the size of the upload ahead of time.

The streaming upload feature coming in Camel 3.10.0 won’t need to know the size before starting the upload.

How it works

Obviously this feature has been implemented on the S3 component producer side.

The idea is to continuously send data to the producer while batching the messages. On the endpoint you'll have three possible ways of stopping the batching:

  • timeout
  • buffer size
  • batch size

Buffer size and batch size work together: the batch is completed when the batch size is reached or when the configured buffer size has been exceeded.

With the timeout in the picture, the batching is also stopped and the upload completed once the timeout is reached, as in the sketch below.
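As a sketch of how these options might be combined on a single endpoint (the option names follow the camel-aws2-s3 producer options; the bucket and key name here are hypothetical):

<camelContext>
    <route>
        <from uri="direct:start"/>
        <!-- complete a batch after 25 messages, 1 MB of data, or 10 seconds, whichever comes first -->
        <to uri="aws2-s3://mycamel-1?streamingUploadMode=true&amp;batchMessageNumber=25&amp;batchSize=1000000&amp;streamingUploadTimeout=10000&amp;keyName=file_upload_part.txt"/>
    </route>
</camelContext>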

S3 Files naming

In the streaming upload producer two different naming strategies are provided:

  • progressive
  • random

The progressive one will add a progressive suffix to the uploaded part, while the random one will add a random id as keyname suffix.

If the S3 key name you specify on the endpoint is “file_upload_part.txt”, during the upload you can expect a list like:

  • file_upload_part.txt
  • file_upload_part-1.txt
  • file_upload_part-2.txt

and so on.

The progressive naming strategy raises an obvious question: how does it work when you stop and restart the route?

Restarting Strategies

The restarting strategies provided in the S3 Streaming upload producer are:

  • lastPart
  • override

The lastPart strategy obviously makes sense only in combination with the progressive naming strategy.

When restarting the route, the producer will check for the S3 key name prefix in the specified bucket and get the last uploaded index.

The index will be used to start again from the same point.
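As a sketch, combining the progressive naming strategy with the lastPart restarting policy on the endpoint might look like this (again using the camel-aws2-s3 option names; the bucket and key name are hypothetical):

<camelContext>
    <route>
        <from uri="direct:start"/>
        <!-- on restart, resume the progressive suffix from the last part found in the bucket -->
        <to uri="aws2-s3://mycamel-1?streamingUploadMode=true&amp;namingStrategy=progressive&amp;restartingPolicy=lastPart&amp;keyName=file_upload_part.txt"/>
    </route>
</camelContext>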

Sample

This feature is very nice to see in action.

In the camel-examples repository I added an example of the feature with Kafka as consumer.

The example polls a Kafka topic (s3.topic.1) and uploads batches of 25 messages (or 1 MB of data) as single files into an S3 bucket (mycamel-1).

The “how to run” section of the README explains in detail how to ingest data into your Kafka broker.

Conclusion

The streaming upload feature will be useful in situations where users don't know ahead of time how much data they want to upload to S3, but also when they just want to ingest data continuously without having to care about the size.

There is probably more work to do, but this can be a feature to introduce even in other storage components we have in Apache Camel.

What's new in Camel-Kafka-connector 0.7.0

Apache Camel Kafka Connector 0.7.0 has just been released.

It is based on Apache Camel 3.7.0; since Camel 3.7.x is an LTS release, we will provide patch releases for this branch.

So what’s in this release?

This release introduces bug fixes, improvements, new features and, obviously, new connectors.

New connectors

The new connectors introduced in this release are the following:

  • AtlasMap: Transforms the message using an AtlasMap transformation
  • Kubernetes Custom Resources: Perform operations on Kubernetes Custom Resources and get notified on Deployment changes
  • Vert.X Kafka: Send and receive messages to/from an Apache Kafka broker using the Vert.x Kafka client
  • JSON JSON-B: Marshal POJOs to JSON and back using JSON-B
  • CSimple: Evaluate a csimple (compiled simple) expression
  • DataSonnet: To use DataSonnet scripts in Camel expressions or predicates
  • jOOR: Evaluate a jOOR (Java compiled once at runtime) expression language

Support for idempotent repository

In this release we introduced idempotency support.

The initial support will provide:

  • in-memory idempotent repository
  • kafka idempotent repository

Idempotency will be supported on both source and sink connectors, which means (see the sketch after this list):

  • From a source point of view, by enabling the idempotency, you’ll be able to avoid ingesting the same payload to the target Kafka topic. This can be done at body or header level.
  • From a sink point of view, by enabling the idempotency, you’ll be able to avoid sending the same payload to an external system, through checking body or header of the Kafka record from the Kafka source topic.
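As a minimal sketch, enabling body-based idempotency with the in-memory repository in a connector configuration might look like this (the option names are covered in detail in the dedicated idempotency post below):

camel.idempotency.enabled=true
camel.idempotency.repository.type=memory
camel.idempotency.expression.type=body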

Support for remove headers option

There are situations in which you'll be using a Camel-Kafka-Connector source connector and a sink connector together. Sometimes you'll need to remove some of the Camel headers to achieve a particular behavior, and this option covers that case. Its value can be a list of headers or a regexp.
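As a hedged sketch, removing all the AWS S3 headers before a sink connector might look like this; the option key shown is my recollection of the 0.7.0 name, so verify it against the release documentation, and the pattern is hypothetical:

# assumed option key; verify against the 0.7.0 documentation
camel.remove.headers.pattern=CamelAwsS3.*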

In the camel-kafka-connector-examples repository we provided a little example based on the 0.7.0-SNAPSHOT version.

New Archetypes

We added two new archetypes:

  • Apicurio Archetype: To leverage the Apicurio service registry
  • Dataformat Archetype: To extend a connector by adding a dataformat to it

Improved integration tests

We improved the integration tests by leveraging the really good work by Otavio R. Piske on the camel-test-infra platform. The integration tests are now based on the camel-test-infra modules coming from the Camel release.

Documentation

On the archetypes side we added more documentation about the base extensible connector and about the Apicurio and dataformat archetypes added in this release.

Strimzi and Kafka Version

This release is based on Kafka 2.6.0 and Strimzi 0.20.1.

Examples

We added a bunch of new examples to the camel-kafka-connector-examples repository.

What to expect

We are already working on the next release based on the upcoming 3.8.0 Camel release. We are working on improving the documentation and adding new stuff.

Introducing Idempotency Support in Camel Kafka Connector

In the next Camel Kafka connector release (0.7.0) there will be a new feature: idempotency support on both source and sink connectors. The aim of this post is to give some hints on how and when to use the idempotency feature.

What is Idempotency?

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages: it essentially acts like a Message Filter to filter out duplicates, as reported in the Camel documentation.

From the Enterprise Integration Patterns documentation: Sometimes the same message gets delivered more than once, either because the messaging system is not certain the message has been successfully delivered yet, or because the Message Channel’s quality-of-service has been lowered to improve performance. Message receivers, on the other hand, tend to assume that each message will be delivered exactly once, and tend to cause problems when they repeat processing because of repeat messages. A receiver designed as an Idempotent Receiver handles duplicate messages and prevents them from causing problems in the receiver application.

This is a very useful feature in the integration world and it is an important new feature in the camel-kafka-connector project. Apache Camel provides multiple implementations of the Idempotent Consumer; in Camel-Kafka-connector we'll support the in-memory and Kafka implementations.

When to use idempotency

Suppose you’re using a source connector of any kind. By using the idempotency feature you’ll be able to avoid consuming the same message multiple times.

This means, in the Kafkish language, that you won't ingest the same payload multiple times into the target Kafka topic. This is critical in the cloud, for example, where you pay for each API operation and for ever-growing storage.

Now let's think about the sink connector scenario.

In this case, we’ll stream out of a Kafka topic multiple records, transform/convert/manipulate them and send them to an external system, like a messaging broker, a storage infra, a database etc.

In the Kafka topic used as a source we may have multiple repeated records with the same payload or the same metadata. Based on this information we can choose to skip those records while sending data to the external system, and to do this we can leverage the idempotency feature of ckc.

Camel-Kafka-connector idempotency configuration

The idempotency feature can be enabled through a number of configuration options available in ckc with the 0.7.0 release. In particular we are talking about:

  • camel.idempotency.enabled: whether idempotency must be enabled or not (default: false)
  • camel.idempotency.repository.type: the idempotent repository type to use, possible values are memory and kafka (default: memory)
  • camel.idempotency.expression.type: how the idempotency will be evaluated, possible values are body and header (default: body)
  • camel.idempotency.expression.header: the header name that will be evaluated when camel.idempotency.expression.type equals header (default: null)
  • camel.idempotency.memory.dimension: the dimension of the in-memory idempotent repository (default: 100)
  • camel.idempotency.kafka.topic: the Kafka topic name to use for the idempotent repository (default: kafka_idempotent_repository)
  • camel.idempotency.kafka.bootstrap.servers: a comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live (default: localhost:9092)
  • camel.idempotency.kafka.max.cache.size: the maximum size of the local key cache (default: 1000)
  • camel.idempotency.kafka.poll.duration.ms: the poll duration (in milliseconds) of the Kafka consumer (default: 100)

The in-memory repository has been provided for short-running connector workloads, while the Kafka one is meant for long-running or interruptible connectors.

The options are mostly self-explanatory.
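For the in-memory repository, a minimal header-based configuration built from the options above might look like this (MyCustomHeader is a hypothetical header name):

camel.idempotency.enabled=true
camel.idempotency.repository.type=memory
camel.idempotency.expression.type=header
camel.idempotency.expression.header=MyCustomHeader
camel.idempotency.memory.dimension=200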

A typical configuration for the kafka idempotent repository approach could be:

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic
camel.idempotency.kafka.bootstrap.servers=localhost:9092
camel.idempotency.kafka.max.cache.size=1500
camel.idempotency.kafka.poll.duration.ms=150

Some of the options could simply be left at their default values; in this example we're listing them all for a Kafka idempotent repository configuration.

A real example

The best way to show how the idempotency feature works in camel-kafka-connector is through an example. We'll use the AWS2-S3 source connector.

As a first step you'll need to fully build the Camel-Kafka-connector project and place the connectors/camel-aws2-s3-kafka-connector zip package in your Kafka broker's plugin.path. Once the connector is in the plugin.path location, just unzip it. The next steps describe how to build and unpack it.

You'll need to set the plugin.path property in your Kafka installation: open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your chosen location. In this example we'll use /home/connectors/:

> cd <ckc_project> 
> mvn clean package
> cp <ckc_project>/connectors/camel-aws2-s3-kafka-connector/target/camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip /home/connectors/
> cd /home/connectors/
> unzip camel-aws2-s3-kafka-connector-0.7.0-SNAPSHOT-package.zip

The configuration for the source connector should look like this:

name=CamelAWS2S3SourceConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

camel.source.maxPollDuration=10000

topics=s3.source.topic

camel.source.path.bucketNameOrArn=camel-kafka-connector

camel.source.endpoint.deleteAfterRead=false

camel.component.aws2-s3.access-key=xxxx
camel.component.aws2-s3.secret-key=yyyy
camel.component.aws2-s3.region=eu-west-1

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic

Don’t forget to add the correct credentials for your AWS account.

We can call the configuration file s3-source.properties, for example.

At this point we can run the connector.

> $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties s3-source.properties

You need a running Kafka cluster for this purpose.

In your camel-kafka-connector bucket, try to upload two files (test1.txt and test2.txt) with the same content, for example “Camel-Kafka-connector rocks”.

Then consume from the s3.source.topic topic through kafkacat:

> kafkacat -b localhost:9092 -t s3.source.topic
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic s3.source.topic [0] at offset 0
Camel-Kafka-connector rocks
% Reached end of topic s3.source.topic [0] at offset 1

The body of the second file was discarded and you just have one message in the topic.

You can also have a look at the my.idempotency.topic content:

> kafkacat -b localhost:9092 -t my.idempotency.topic -f 'Value:%s\nKey:%k\n'
% Auto-selecting Consumer mode (use -P or -C to override)
Value:add
Key:Camel-Kafka-connector rocks

There is just one add operation, with the body of the message as the key.

Conclusion

This is just a little introduction to the camel-kafka-connector idempotency support. There are more cases to cover and probably more work to be done; I just wanted to show something new in the camel-kafka-connector world. Feedback is welcome, as always.