Introducing Infinispan-Kafka, connect your Kafka cluster with Infinispan

Over the last couple of months I have been working on a side project: Infinispan-Kafka. This project is based on Kafka Connect, a tool for streaming data between Apache Kafka and other systems. Data can be streamed in two directions: from Kafka to a different system (Sink Connector) and from a different system to Kafka (Source Connector). The Infinispan-Kafka project implements only the Sink Connector (for the moment).

Basic Idea

In Infinispan, the ProtoStream library and Infinispan Remote Querying allow an end user to query Infinispan remotely in a language-neutral manner, using Protobuf. The Infinispan client must be configured with a dedicated marshaller, ProtoStreamMarshaller, which uses the ProtoStream library to encode objects. The ProtoStream library has to be instructed on how to marshall the message types; see the documentation for details.

So the idea behind the Infinispan-Kafka project is to use the ProtoStream library and the ProtoStreamMarshaller to define what kind of data can be saved in an Infinispan cache from Kafka, and to let Infinispan manage the marshalling and storing, creating a contract between Infinispan and Kafka.
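To make that contract concrete, here is a rough sketch (based on the Infinispan 9.x Hot Rod client API; an illustration of the mechanism, not the connector's actual code) of how a ProtoStream-annotated class such as the demo's Author class gets registered with a remote cache:

```java
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.SerializationContext;
import org.infinispan.protostream.annotations.ProtoSchemaBuilder;

public class ProtoSetupSketch {
   public static void main(String[] args) throws Exception {
      // Configure the Hot Rod client to use the ProtoStream marshaller
      ConfigurationBuilder builder = new ConfigurationBuilder();
      builder.addServer().host("127.0.0.1").port(11222)
             .marshaller(new ProtoStreamMarshaller());
      RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());

      // Generate a Protobuf schema from the annotated class and register it
      // with the client's serialization context
      SerializationContext ctx = ProtoStreamMarshaller.getSerializationContext(cacheManager);
      String schema = new ProtoSchemaBuilder()
            .fileName("author.proto")
            .addClass(Author.class)
            .build(ctx);

      // The schema must also be registered on the server side
      // (the ___protobuf_metadata cache) so the server can index and query it
      RemoteCache<String, String> metadataCache =
            cacheManager.getCache("___protobuf_metadata");
      metadataCache.put("author.proto", schema);

      cacheManager.stop();
   }
}
```

With a setup like this, any Author instance put into the cache is stored as Protobuf and becomes remotely queryable.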

The Infinispan-Kafka connector uses the following configuration properties (for the moment):

  • infinispan.connection.hosts — List of comma-separated Infinispan hosts (type: string, default: localhost, importance: high)
  • infinispan.connection.hotrod.port — Infinispan Hot Rod port (type: int, default: 11222, importance: high)
  • infinispan.connection.cache.name — Name of the Infinispan cache to use (type: string, default: default, importance: medium)
  • infinispan.use.proto — If true, the Remote Cache Manager will be configured to use ProtoStream schemas (type: boolean, default: false, importance: medium)
  • infinispan.proto.marshaller.class — If infinispan.use.proto is true, this option has to contain an annotated ProtoStream class (type: class, default: String.class, importance: medium)
  • infinispan.cache.force.return.values — By default, previously existing values for Map operations are not returned; if set to true, they will be returned (type: boolean, default: false, importance: low)

An example of an annotated ProtoStream class is the following:

package org.infinispan.kafka;

import java.io.Serializable;

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;

@ProtoDoc("@Indexed")
public class Author implements Serializable {

   private String name;

   @ProtoField(number = 1, required = true)
   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   @Override
   public String toString() {
      return "Author [name=" + name + "]";
   }
}

A running example

Let's see how the connector works in a real example. You'll need:

  • Infinispan server 9.1.0.Final
  • Kafka 0.11.0.0 running

and these three projects:

  • https://github.com/oscerd/infinispan-kafka-demo
  • https://github.com/oscerd/infinispan-kafka-producer
  • https://github.com/oscerd/camel-infinispan-kafka-demo

But, since infinispan-kafka is not yet released, you'll need to build the project to have it available in your local Maven repository.

So fork or clone the Infinispan-Kafka project and run:

> mvn clean install

Now let's see what is inside the different projects.

The Infinispan-Kafka demo

In the repository you’ll find a properties file for the Infinispan-Kafka sink connector and a ProtoStream-annotated class named Author.

The configuration is the following:

name=InfinispanSinkConnector
topics=test
tasks.max=1
connector.class=org.infinispan.kafka.InfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

infinispan.connection.hosts=127.0.0.1
infinispan.connection.hotrod.port=11222
infinispan.connection.cache.name=default
infinispan.cache.force.return.values=true
infinispan.use.proto=true
infinispan.proto.marshaller.class=org.infinispan.kafka.Author

We will use the topic test from our Kafka cluster.

The Infinispan-Kafka producer

In this project we define a simple producer that will send records to the topic test on our Kafka cluster. We will send five Author records.
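A producer like that can be sketched as follows (a hypothetical minimal version, assuming the Author records are sent as JSON strings since the connector is configured with StringConverter; the real code is in the linked repository):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleAuthorProducer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", StringSerializer.class.getName());
      props.put("value.serializer", StringSerializer.class.getName());

      try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
         // Send five Author records to the "test" topic as JSON strings
         for (int i = 1; i <= 5; i++) {
            String value = "{\"name\":\"author-" + i + "\"}";
            producer.send(new ProducerRecord<>("test", "author-" + i, value));
         }
         producer.flush();
      }
   }
}
```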

The Camel-Infinispan-Kafka demo

In this project we define a standalone Camel route. This route runs every 10 seconds and queries the Infinispan cache with a remote query defined here.
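The kind of remote query such a route performs can be sketched with the Hot Rod client's query DSL (an illustrative example built on the Author class above, assuming the Infinispan 9.x query API; the actual query is in the linked repository):

```java
import java.util.List;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;

public class RemoteQuerySketch {
   // Query all Author entries whose name matches a pattern;
   // this only works because the cache stores ProtoStream-encoded,
   // indexed Author objects
   public static List<Author> findAuthors(RemoteCache<String, Author> cache) {
      QueryFactory queryFactory = Search.getQueryFactory(cache);
      Query query = queryFactory.from(Author.class)
            .having("name").like("%author%")
            .build();
      return query.list();
   }
}
```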

Run the example now!

Now that we have a full view of this demo we can run it!

Let’s start from the servers.

First we need to start the Infinispan server, in this case in standalone mode.

>infinispan-server-9.1.0.Final/bin$ ./standalone.sh 
=========================================================================

  JBoss Bootstrap Environment

  JBOSS_HOME: /home/oscerd/playground/infinispan-server-9.1.0.Final

  JAVA: /usr/lib/jvm/jdk1.8.0_65//bin/java

  JAVA_OPTS:  -server  -server -Xms64m -Xmx512m -Djava.net.preferIPv4Stack=true -Djboss.modules.system.pkgs=org.jboss.byteman -Djava.awt.headless=true

=========================================================================

11:36:28,886 INFO  [org.jboss.modules] (main) JBoss Modules version 1.5.2.Final
11:36:29,046 INFO  [org.jboss.msc] (main) JBoss MSC version 1.2.6.Final
11:36:29,104 INFO  [org.jboss.as] (MSC service thread 1-6) WFLYSRV0049: Infinispan Server 9.1.0.Final (WildFly Core 2.2.0.Final) starting
11:36:29,788 INFO  [org.jboss.as.server] (Controller Boot Thread) WFLYSRV0039: Creating http management service using socket-binding (management-http)
11:36:29,801 INFO  [org.xnio] (MSC service thread 1-6) XNIO version 3.4.0.Final
11:36:29,806 INFO  [org.xnio.nio] (MSC service thread 1-6) XNIO NIO Implementation Version 3.4.0.Final
11:36:29,823 INFO  [org.jboss.as.clustering.infinispan] (ServerService Thread Pool -- 20) Activating Infinispan subsystem.
11:36:29,823 INFO  [org.wildfly.extension.io] (ServerService Thread Pool -- 19) WFLYIO001: Worker 'default' has auto-configured to 16 core threads with 128 task threads based on your 8 available processors
11:36:29,835 INFO  [org.jboss.as.connector.subsystems.datasources] (ServerService Thread Pool -- 18) WFLYJCA0004: Deploying JDBC-compliant driver class org.h2.Driver (version 1.3)
11:36:29,843 INFO  [org.jboss.as.naming] (ServerService Thread Pool -- 25) WFLYNAM0001: Activating Naming Subsystem
11:36:29,848 WARN  [org.jboss.as.txn] (ServerService Thread Pool -- 29) WFLYTX0013: Node identifier property is set to the default value. Please make sure it is unique.
11:36:29,849 INFO  [org.jboss.as.connector] (MSC service thread 1-8) WFLYJCA0009: Starting JCA Subsystem (WildFly/IronJacamar 1.3.4.Final)
11:36:29,851 INFO  [org.jboss.as.connector.deployers.jdbc] (MSC service thread 1-3) WFLYJCA0018: Started Driver service with driver-name = h2
11:36:29,860 INFO  [org.jboss.as.security] (ServerService Thread Pool -- 27) WFLYSEC0002: Activating Security Subsystem
11:36:29,861 INFO  [org.jboss.remoting] (MSC service thread 1-6) JBoss Remoting version 4.0.21.Final
11:36:29,872 INFO  [org.jboss.as.security] (MSC service thread 1-2) WFLYSEC0001: Current PicketBox version=4.9.6.Final
11:36:29,876 INFO  [org.jboss.as.naming] (MSC service thread 1-7) WFLYNAM0003: Starting Naming Service
11:36:30,079 INFO  [org.jboss.as.server.deployment.scanner] (MSC service thread 1-4) WFLYDS0013: Started FileSystemDeploymentService for directory /home/oscerd/playground/infinispan-server-9.1.0.Final/standalone/deployments
11:36:30,394 INFO  [org.infinispan.factories.GlobalComponentRegistry] (MSC service thread 1-5) ISPN000128: Infinispan version: Infinispan 'Bastille' 9.1.0.Final
11:36:30,535 INFO  [org.jboss.as.connector.subsystems.datasources] (MSC service thread 1-8) WFLYJCA0001: Bound data source [java:jboss/datasources/ExampleDS]
11:36:30,745 INFO  [org.jboss.as.clustering.infinispan] (MSC service thread 1-6) DGISPN0001: Started default cache from local container
11:36:30,746 INFO  [org.jboss.as.clustering.infinispan] (MSC service thread 1-7) DGISPN0001: Started namedCache cache from local container
11:36:30,763 INFO  [org.infinispan.server.endpoint] (MSC service thread 1-3) DGENDPT10000: HotRodServer starting
11:36:30,766 INFO  [org.infinispan.server.endpoint] (MSC service thread 1-3) DGENDPT10001: HotRodServer listening on 127.0.0.1:11222
11:36:30,770 INFO  [org.infinispan.server.endpoint] (MSC service thread 1-5) DGENDPT10000: REST starting
11:36:31,072 INFO  [org.infinispan.server.endpoint] (MSC service thread 1-5) DGENDPT10002: REST listening on 127.0.0.1:8080 (mapped to rest)
11:36:31,258 INFO  [org.jboss.as] (Controller Boot Thread) WFLYSRV0060: Http management interface listening on http://127.0.0.1:9990/management
11:36:31,259 INFO  [org.jboss.as] (Controller Boot Thread) WFLYSRV0051: Admin console listening on http://127.0.0.1:9990
11:36:31,259 INFO  [org.jboss.as] (Controller Boot Thread) WFLYSRV0025: Infinispan Server 9.1.0.Final (WildFly Core 2.2.0.Final) started in 2602ms - Started 152 of 163 services (49 services are lazy, passive or on-demand)

Next we need to start our Kafka server; as always, we'll start ZooKeeper first and then the broker.

>kafka_2.12-0.11.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2017-07-31 11:38:55,401] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-07-31 11:38:55,404] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-07-31 11:38:55,417] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-07-31 11:38:55,417] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-07-31 11:38:55,728] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-07-31 11:38:55,728] INFO Server environment:host.name=ghost (org.apache.zookeeper.server.ZooKeeperServer)
[2017-07-31 11:38:55,728] INFO Server environment:java.version=1.8.0_65 (org.apache.zookeeper.server.ZooKeeperServer)
.
.
.
.

And then the server:

>kafka_2.12-0.11.0.0$ bin/kafka-server-start.sh config/server.properties
[2017-07-31 11:40:31,244] INFO KafkaConfig values: 
	advertised.host.name = null
	advertised.listeners = null
	advertised.port = null
	alter.config.policy.class.name = null
	authorizer.class.name = 
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = 0
	broker.id.generation.enable = true
	broker.rack = null
	compression.type = producer
	connections.max.idle.ms = 600000
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 1
.
.
.
.
.
.
.

Next we’ll need to create a new topic named test.

>kafka_2.12-0.11.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

The servers are now up and running. At this point we need to start the Infinispan-Kafka connector demo.

>infinispan-kafka-demo$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building infinispan-kafka-demo 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ infinispan-kafka-demo ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ infinispan-kafka-demo ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO] 
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ infinispan-kafka-demo ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ infinispan-kafka-demo ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ infinispan-kafka-demo ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ infinispan-kafka-demo ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ infinispan-kafka-demo ---
[INFO] Building jar: /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT.jar
[INFO] 
[INFO] --- maven-assembly-plugin:2.5.3:single (make-assembly) @ infinispan-kafka-demo ---
[INFO] Reading assembly descriptor: src/main/assembly/package.xml
[WARNING] The following patterns were never triggered in this artifact exclusion filter:
o  'org.apache.kafka:connect-api'

[INFO] Copying files to /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT-package
[WARNING] Assembly file: /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT-package is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.799 s
[INFO] Finished at: 2017-07-31T11:50:11+02:00
[INFO] Final Memory: 30M/530M
[INFO] ------------------------------------------------------------------------
>infinispan-kafka-demo$ export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"

We can now start the connector with the demo project's configuration, like this:

>infinispan-kafka-demo$ kafka_2.12-0.11.0.0/bin/connect-standalone.sh kafka_2.12-0.11.0.0/config/connect-standalone.properties config/InfinispanSinkConnector.properties 
[2017-07-31 11:58:37,893] INFO Registered loader: sun.misc.Launcher$AppClassLoader@18b4aac2 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-07-31 11:58:37,895] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,895] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.infinispan.kafka.InfinispanSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,899] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'InfinispanSinkConnector' and 'InfinispanSink' to plugin 'org.infinispan.kafka.InfinispanSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,901] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,901] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,912] INFO StandaloneConfig values: 
	access.control.allow.methods = 
	access.control.allow.origin = 
	bootstrap.servers = [localhost:9092]
	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
	key.converter = class org.apache.kafka.connect.json.JsonConverter
	offset.flush.interval.ms = 10000
	offset.flush.timeout.ms = 5000
	offset.storage.file.filename = /tmp/connect.offsets
	plugin.path = null
	rest.advertised.host.name = null
	rest.advertised.port = null
	rest.host.name = null
	rest.port = 8083
	task.shutdown.graceful.timeout.ms = 5000
	value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:223)
[2017-07-31 11:58:38,004] INFO Logging initialized @2593ms (org.eclipse.jetty.util.log:186)
[2017-07-31 11:58:38,153] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:49)
[2017-07-31 11:58:38,153] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-07-31 11:58:38,153] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:144)
[2017-07-31 11:58:38,154] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:59)
[2017-07-31 11:58:38,155] INFO Worker started (org.apache.kafka.connect.runtime.Worker:149)
[2017-07-31 11:58:38,155] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-07-31 11:58:38,155] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-07-31 11:58:38,218] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Jul 31, 2017 11:58:38 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-07-31 11:58:38,561] INFO Started o.e.j.s.ServletContextHandler@5dab9949{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-07-31 11:58:38,568] INFO Started ServerConnector@740c2895{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-07-31 11:58:38,569] INFO Started @3158ms (org.eclipse.jetty.server.Server:379)
[2017-07-31 11:58:38,569] INFO REST server listening at http://192.168.1.10:8083/, advertising URL http://192.168.1.10:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-07-31 11:58:38,569] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-07-31 11:58:38,575] INFO ConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-07-31 11:58:38,576] INFO EnrichedConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,576] INFO Creating connector InfinispanSinkConnector of type org.infinispan.kafka.InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-07-31 11:58:38,576] INFO Instantiated connector InfinispanSinkConnector with version 0.0.1-SNAPSHOT of type class org.infinispan.kafka.InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-07-31 11:58:38,577] INFO Finished creating connector InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:225)
[2017-07-31 11:58:38,578] INFO SinkConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	topics = [test]
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-07-31 11:58:38,578] INFO EnrichedConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	topics = [test]
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,578] INFO Setting task configurations for 1 workers. (org.infinispan.kafka.InfinispanSinkConnector:50)
[2017-07-31 11:58:38,579] INFO Creating task InfinispanSinkConnector-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-07-31 11:58:38,579] INFO ConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-07-31 11:58:38,579] INFO EnrichedConnectorConfig values: 
	connector.class = org.infinispan.kafka.InfinispanSinkConnector
	key.converter = class org.apache.kafka.connect.storage.StringConverter
	name = InfinispanSinkConnector
	tasks.max = 1
	transforms = null
	value.converter = class org.apache.kafka.connect.storage.StringConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,580] INFO TaskConfig values: 
	task.class = class org.infinispan.kafka.InfinispanSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-07-31 11:58:38,580] INFO Instantiated task InfinispanSinkConnector-0 with version 0.0.1-SNAPSHOT of type org.infinispan.kafka.InfinispanSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-07-31 11:58:38,587] INFO ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = connect-InfinispanSinkConnector
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-07-31 11:58:38,631] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-07-31 11:58:38,631] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-07-31 11:58:38,633] INFO Created connector InfinispanSinkConnector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-07-31 11:58:38,634] INFO InfinispanSinkConnectorConfig values: 
	infinispan.cache.force.return.values = true
	infinispan.connection.cache.name = default
	infinispan.connection.hosts = 127.0.0.1
	infinispan.connection.hotrod.port = 11222
	infinispan.proto.marshaller.class = class org.infinispan.kafka.Author
	infinispan.use.proto = true
 (org.infinispan.kafka.InfinispanSinkConnectorConfig:223)
[2017-07-31 11:58:38,661] INFO Adding protostream (org.infinispan.kafka.InfinispanSinkTask:90)
[2017-07-31 11:58:38,749] INFO ISPN004021: Infinispan version: 9.1.0.Final (org.infinispan.client.hotrod.RemoteCacheManager:212)
[2017-07-31 11:58:38,908] INFO Sink task WorkerSinkTask{id=InfinispanSinkConnector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-07-31 11:58:38,996] INFO Discovered coordinator ghost:9092 (id: 2147483647 rack: null) for group connect-InfinispanSinkConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-07-31 11:58:38,998] INFO Revoking previously assigned partitions [] for group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-07-31 11:58:38,998] INFO (Re-)joining group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-07-31 11:58:39,056] INFO Successfully joined group connect-InfinispanSinkConnector with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-07-31 11:58:39,057] INFO Setting newly assigned partitions [test-0] for group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

Let’s run the Camel route to query the Infinispan cache and see the result at this point.

>camel-infinispan-kafka-demo$ mvn clean compile exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Camel example with Infinispan 2.20.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ camel-infinispan-kafka-demo ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ camel-infinispan-kafka-demo ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ camel-infinispan-kafka-demo ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/target/classes
[WARNING] /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java: /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java uses unchecked or unsafe operations.
[WARNING] /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java: Recompile with -Xlint:unchecked for details.
[INFO] 
[INFO] --- exec-maven-plugin:1.5.0:java (default-cli) @ camel-infinispan-kafka-demo ---
[.kafka.demo.Application.main()] RemoteCacheManager             INFO  ISPN004021: Infinispan version: 9.1.0.Final
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-1) is starting
[.kafka.demo.Application.main()] ManagedManagementStrategy      INFO  JMX is enabled
[.kafka.demo.Application.main()] DefaultTypeConverter           INFO  Type converters loaded (core: 192, classpath: 0)
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://foo?period=10000&repeatCount=0
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Total 1 routes, of which 1 are started.
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-1) started in 0.221 seconds
Starting Camel. Use ctrl + c to terminate the JVM.

[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-2) is starting
[.kafka.demo.Application.main()] ManagedManagementStrategy      INFO  JMX is enabled
[.kafka.demo.Application.main()] DefaultTypeConverter           INFO  Type converters loaded (core: 192, classpath: 0)
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Total 0 routes, of which 0 are started.
[.kafka.demo.Application.main()] DefaultCamelContext            INFO  Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-2) started in 0.012 seconds
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content []
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content []
.
.
.
.
.

We don’t have any data because no records have been sent to the Kafka topic yet. Let’s add some data through the Infinispan-Kafka producer.

>infinispan-kafka-producer$ mvn clean compile exec:exec
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Infinispan Kafka Producer 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ infinispan-kafka-producer ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/infinispan-kafka-producer/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ infinispan-kafka-producer ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.2:compile (default-compile) @ infinispan-kafka-producer ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/oscerd/workspace/miscellanea/infinispan-kafka-producer/target/classes
[INFO] 
[INFO] --- exec-maven-plugin:1.5.0:exec (default-cli) @ infinispan-kafka-producer ---
2017-07-31 12:03:12 INFO  ProducerConfig:223 - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2017-07-31 12:03:12 INFO  AppInfoParser:83 - Kafka version : 0.11.0.0
2017-07-31 12:03:12 INFO  AppInfoParser:84 - Kafka commitId : cb8625948210849f
2017-07-31 12:03:13 INFO  KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.864 s
[INFO] Finished at: 2017-07-31T12:03:13+02:00
[INFO] Final Memory: 18M/296M
[INFO] ------------------------------------------------------------------------
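
The producer itself is not shown here, but conceptually it serializes Author objects to JSON strings and sends them to the test topic with incrementing keys; the record coordinates printed by the connector, such as test-1-{"name":"Andrea Cosentino"}, reflect exactly that. A minimal Python sketch of building such record values follows — the helper name and key scheme are illustrative, not the actual producer code, which is Java:

```python
import json

def build_records(names, topic="test"):
    """Build (topic, key, value) triples shaped like the records the demo
    producer sends: the value is a JSON document holding the author name."""
    records = []
    for i, name in enumerate(names, start=1):
        value = json.dumps({"name": name}, separators=(",", ":"))
        records.append((topic, str(i), value))
    return records

if __name__ == "__main__":
    for topic, key, value in build_records(["Andrea Cosentino", "Jonathan Anstey"]):
        # Mirrors the "kafka coordinates" format printed by the sink task log
        print("%s-%s-%s" % (topic, key, value))
```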

In the connector log we should see something like this:

[2017-07-31 12:03:13,261] INFO Received 5 records (org.infinispan.kafka.InfinispanSinkTask:65)
[2017-07-31 12:03:13,261] INFO Record kafka coordinates:(test-1-{"name":"Andrea Cosentino"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,290] INFO Record kafka coordinates:(test-2-{"name":"Jonathan Anstey"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,292] INFO Record kafka coordinates:(test-3-{"name":"Claus Ibsen"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,294] INFO Record kafka coordinates:(test-4-{"name":"Normam Maurer"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,295] INFO Record kafka coordinates:(test-5-{"name":"Philip Roth"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:20,015] INFO WorkerSinkTask{id=InfinispanSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)

Meanwhile, in the Camel-Infinispan-Kafka demo logs, we should see a different result from the query:

[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content []
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute           INFO  Query Result content [Author [name=Andrea Cosentino]]

As you can see, the Infinispan cache has been populated with the data coming from the Kafka topic test.

Conclusion

This blog post introduces the new Infinispan-Kafka connector and shows a little demo involving Kafka, Infinispan and Camel. Obviously there is much more work to do on the connector, and contributions are more than welcome. Follow the developments on the GitHub repo.

Cryptographic Pills Episode 1 - Modern Cryptography

In the last few days I started to look at cryptographic theory again. It’s an interesting subject for anyone involved in the IT world: for developers and security specialists, understanding some key concepts is essential for managing particular scenarios. Yesterday Google announced the first SHA-1 collision: hash functions will be the subject of one of the articles I currently have in mind. I’ll try to write about cryptographic pills in episodes, and I hope you’ll appreciate the subject. Let’s start!

In the beginning

Until the late 20th century cryptography was considered an art form. Constructing good codes, breaking existing ones and understanding the algorithm behind a code relied on creativity and personal skill. There wasn’t much theory around the subject. As cryptography spread into areas beyond the military and defense, the whole picture radically changed: a rich theory and rigorous study turned cryptography into an applied science. Today cryptography is everywhere.

Private-key Encryption

Cryptography was historically concerned with secret communication. In the past we talked about ciphers; today we talk about encryption schemes, but the concepts are the same. Two parties want to communicate in a secret manner and decide to use a particular encryption scheme. An assumption in this scenario is that the communicating parties have a way to initially share a key in a secret manner.

A private-key encryption scheme is based on three algorithms [1]:

  1. The key-generation algorithm Gen is a probabilistic algorithm that outputs a key k chosen according to some distribution determined by the scheme.
  2. The encryption algorithm Enc takes as input a key k and a plaintext message m and outputs a ciphertext c. We denote the encryption of the plaintext m using the key k by
$$Enc_k(m)$$
  3. The decryption algorithm Dec takes as input a key k and a ciphertext c and outputs a plaintext m. We denote the decryption of the ciphertext c using the key k by
$$Dec_k(c)$$
  • The set of all possible keys generated by the Gen algorithm is denoted by K and is called the key space.
  • The set of all “legal” messages (those supported by the encryption algorithm Enc) is denoted by M and is called the message space.
  • Since any ciphertext is obtained by encrypting some plaintext under some key, the sets K and M together define the set of all possible ciphertexts, denoted by C.
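
To make the three algorithms concrete, here is a toy Python sketch of the one-time pad, probably the simplest private-key scheme: Gen outputs a uniformly random key as long as the message, Enc XORs key and plaintext byte by byte, and Dec applies the same XOR. It only illustrates the (Gen, Enc, Dec) interface; the key-length requirement and key management make it impractical for real use.

```python
import secrets

def gen(n):
    """Gen: output a uniformly random n-byte key."""
    return secrets.token_bytes(n)

def enc(k, m):
    """Enc_k(m): XOR the key with the plaintext, byte by byte."""
    assert len(k) == len(m), "one-time pad needs a key as long as the message"
    return bytes(kb ^ mb for kb, mb in zip(k, m))

def dec(k, c):
    """Dec_k(c): XOR with the same key recovers the plaintext."""
    return enc(k, c)

if __name__ == "__main__":
    m = b"attack at dawn"
    k = gen(len(m))
    c = enc(k, m)
    # Correctness requirement of any encryption scheme: Dec_k(Enc_k(m)) == m
    print(dec(k, c) == m)
```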

Kerckhoffs’s principle

From the definitions above it’s clear that if an eavesdropping adversary knows the Dec algorithm and the chosen key k, he will be able to decrypt the messages exchanged between the two communicating parties. The key must be kept secret, and this is a fundamental point, but do we need to keep the encryption and decryption algorithms secret too? In the late 19th century Auguste Kerckhoffs gave his opinion:

The cipher method must not be required to be secret, and it must be able to fall into the hands of the enemy without inconvenience

This is related to open source and open standards too. Today any encryption scheme is usually expected to be made public. The main reasons for this can be summarized in the following points:

  1. Public scrutiny makes the design stronger.
  2. If flaws exist, they can be found by someone hacking on the scheme.
  3. A public design enables the establishment of standards.

Kerckhoffs’s principle may seem obvious, but in some cases it was ignored, with disastrous results. Only publicly tried and tested algorithms should be used by organizations. Kerckhoffs was one of the first people with an open vision about these things.

What’s next

In the next articles I would like to talk about the theory behind the strength of cryptographic schemes. How can we say a scheme is secure? What are the attack scenarios? What is the definition of a secure scheme?

[1] Jonathan Katz and Yehuda Lindell, Introduction to Modern Cryptography, Chapman & Hall/CRC, Chapter 1.

Using Camel-cassandraql on a Kubernetes Cassandra cluster with Fabric8

In recent months I blogged about Testing Camel-cassandraql on a Dockerized cluster. This time I’d like to show you a similar example running on Kubernetes. I’ve worked on a fabric8 quickstart split into two different projects: a Cassandra server and a Cassandra client. The server will spin up a Cassandra cluster with three nodes and the client will run a camel-cassandraql route querying the keyspace in the cluster. You’ll need a running Kubernetes cluster to follow this example. I decided to use Fabric8 on Minikube, but you can also try to run the example using Fabric8 on Minishift. Once your environment is up and running, you can start executing the commands in the following sections.

Spinning up an Apache Cassandra cluster on Kubernetes

For this post we will use Kubernetes to spin up our Apache Cassandra Cluster. To start the cluster you can use the Fabric8 IPaas Cassandra app. Since we are using Minikube, you’ll need to follow the related instructions.

This project will spin up a Cassandra Cluster with three nodes.

As a first step, in the cassandra folder, we need to run

mvn clean install

and

mvn fabric8:deploy

Once we’re done with these commands we can watch the Kubernetes pod state for our specific app, and after a while we should see our pods running:

oscerd@localhost:~/workspace/fabric8-universe/fabric8-ipaas/cassandra$ kubectl get pods -l app=cassandra --watch
NAME                         READY     STATUS    RESTARTS   AGE
cassandra-2459930792-lfh93   1/1       Running   0          15s
cassandra-2459930792-m8c9p   1/1       Running   0          15s
cassandra-2459930792-zxiv3   1/1       Running   0          15s

I usually prefer the command line, but you can also watch the status of your pods in the wonderful Fabric8 console, and with Minikube/Minishift it’s super easy to access the console; simply run the following command:

minikube service fabric8

or on Minishift

minishift service fabric8

In Apache Cassandra, nodetool is one of the most important commands for managing a cluster and executing commands on a single node.

It is interesting to look at the way the Kubernetes team implemented seed discovery in their Cassandra example. The class to look at is KubernetesSeedProvider, which implements the SeedProvider interface from the Apache Cassandra project. The getSeeds method is implemented there: if the KubernetesSeedProvider is unable to find seeds, it falls back to the createDefaultSeeds method, which is exactly the SimpleSeedProvider implementation from the Apache Cassandra project.
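
The getSeeds fallback described above can be pictured with a small sketch — Python here, purely illustrative, since the real KubernetesSeedProvider is Java: prefer dynamically discovered pod addresses, and fall back to the statically configured seeds entry, which is what SimpleSeedProvider would return.

```python
def parse_seeds(seeds_value):
    """Parse a cassandra.yaml style comma-separated seeds string."""
    return [s.strip() for s in seeds_value.split(",") if s.strip()]

def get_seeds(discovered, configured_seeds="127.0.0.1"):
    """Prefer dynamically discovered seed addresses; otherwise fall back
    to the static configuration (the default-seeds behaviour)."""
    if discovered:
        return list(discovered)
    return parse_seeds(configured_seeds)

if __name__ == "__main__":
    # No pods discovered yet: fall back to the configured seeds entry
    print(get_seeds([], "10.0.0.1, 10.0.0.2"))
    # Discovery succeeded: the discovered addresses win
    print(get_seeds(["172.17.0.5"]))
```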

The kubernetes-cassandra jar is used in the Cassandra Dockerfile from the Google example and is added to the classpath in the run.sh script.

In the Cassandra fabric8-ipaas project we used the deployment.yml and service.yml configuration. You can find the details in the repository. You can also scale up/down your cluster with this simple command:

kubectl scale --replicas=2 deployment/cassandra

and after a while you’ll see the scaled deployment:

oscerd@localhost:~/workspace/fabric8-universe/fabric8-ipaas/cassandra$ kubectl get pods -l app=cassandra --watch
NAME                         READY     STATUS    RESTARTS   AGE
cassandra-2459930792-lfh93   1/1       Running   0          28m
cassandra-2459930792-m8c9p   1/1       Running   0          28m

Our cluster is up and running now, and we can run our camel-cassandraql route from the Cassandra client.

Running the Cassandra-client Fabric8 Quickstart

In the Fabric8 Quickstarts organization I’ve added a little Cassandra-client example. The route will run a simple query against the keyspace we’ve just created on our cluster. This quickstart requires the Camel 2.18.0 release to work; now that 2.18.0 is out, we can cover the example. The quickstart first executes a bean to pre-populate the Cassandra keyspace as an init-method, and it defines a main route. It’s a very simple quickstart (a single query, printing the result set), but it shows how easy it is to interact with the Cassandra cluster. You don’t have to add the contact points of single nodes to your Cassandra endpoint URI or use a complex configuration; you can simply use the service name (“cassandra”) and Kubernetes will do the work for you.

You can run the example with:

mvn clean install
mvn fabric8:run

You should be able to see your cassandra-client running:

oscerd@localhost:~/workspace/fabric8-universe/fabric8-quickstarts/cassandra-client$ kubectl get pods
NAME                                      READY     STATUS    RESTARTS   AGE
cassandra-2459930792-lfh93                1/1       Running   0          45m
cassandra-2459930792-m8c9p                1/1       Running   0          45m
cassandra-2459930792-ms265                1/1       Running   0          15m
cassandra-client-3789635050-e1f2g         1/1       Running   0          1m

With the fabric8:run command you’ll see the logs of the application directly in the console:

2016-10-14 09:06:15,786 [main           ] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-10-14 09:06:15,789 [main           ] INFO  Cluster                        - New Cassandra host cassandra/10.0.0.67:9042 added
2016-10-14 09:06:15,790 [main           ] INFO  Cluster                        - New Cassandra host /172.17.0.10:9042 added
2016-10-14 09:06:15,790 [main           ] INFO  Cluster                        - New Cassandra host /172.17.0.12:9042 added
2016-10-14 09:06:20,701 [main           ] INFO  SpringCamelContext             - Apache Camel 2.18.0 (CamelContext: camel-1) is starting
2016-10-14 09:06:20,703 [main           ] INFO  ManagedManagementStrategy      - JMX is enabled
2016-10-14 09:06:20,928 [main           ] INFO  DefaultTypeConverter           - Loaded 190 type converters
2016-10-14 09:06:20,977 [main           ] INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-10-14 09:06:21,228 [main           ] INFO  SpringCamelContext             - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-10-14 09:06:21,231 [main           ] INFO  ClockFactory                   - Using native clock to generate timestamps.
2016-10-14 09:06:21,503 [main           ] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-10-14 09:06:21,548 [main           ] INFO  Cluster                        - New Cassandra host cassandra/10.0.0.67:9042 added
2016-10-14 09:06:21,548 [main           ] INFO  Cluster                        - New Cassandra host /172.17.0.11:9042 added
2016-10-14 09:06:21,548 [main           ] INFO  Cluster                        - New Cassandra host /172.17.0.10:9042 added
2016-10-14 09:06:22,047 [main           ] INFO  SpringCamelContext             - Route: cassandra-route started and consuming from: timer://foo?period=5000
2016-10-14 09:06:22,075 [main           ] INFO  SpringCamelContext             - Total 1 routes, of which 1 are started.
2016-10-14 09:06:22,083 [main           ] INFO  SpringCamelContext             - Apache Camel 2.18.0 (CamelContext: camel-1) started in 1.374 seconds
2016-10-14 09:06:23,133 [0 - timer://foo] INFO  cassandra-route                - Query result set [Row[1, oscerd]]
2016-10-14 09:06:28,092 [0 - timer://foo] INFO  cassandra-route                - Query result set [Row[1, oscerd]]

As you can see everything is straightforward, and you can think about more complex architectures. For example: an Infinispan cache with a persistent cache store based on an Apache Cassandra cluster, with Java applications, Camel routes or Spring Boot applications interacting with the Infinispan platform.

The route of the example is super simple:

  <camelContext xmlns="http://camel.apache.org/schema/spring" depends-on="populate">
    <route id="cassandra-route">
      <from uri="timer:foo?period=5000"/>
      <to uri="cql://cassandra/test?cql=select * from users;&amp;consistencyLevel=quorum" />
      <log message="Query result set ${body}"/>
    </route>

  </camelContext>

and it completely abstracts away the details of the Cassandra cluster (how many nodes there are, where they are, what their addresses are, and so on).
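
To see what the endpoint URI carries, here is a small sketch — Python, illustrative only, since the real camel-cassandraql component is Java — that decomposes a cql:// URI like the one in the route above into the pieces the component needs: a contact point, which here is just the Kubernetes service name, a keyspace, and query options.

```python
from urllib.parse import urlsplit

def parse_cql_uri(uri):
    """Split a cql://host/keyspace?options endpoint URI into its parts."""
    parts = urlsplit(uri)
    # Split options on '&' only; the CQL text itself may contain ';'.
    options = dict(kv.split("=", 1) for kv in parts.query.split("&") if kv)
    return {
        "contact_point": parts.hostname,   # "cassandra": the k8s service name
        "keyspace": parts.path.lstrip("/"),
        "options": options,
    }

if __name__ == "__main__":
    uri = "cql://cassandra/test?cql=select * from users;&consistencyLevel=quorum"
    print(parse_cql_uri(uri))
```

Kubernetes DNS then resolves the "cassandra" service name to a live pod, which is why the route never mentions individual node addresses.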

Conclusions

This example shows only a bit of the power of the Kubernetes/OpenShift/Fabric8/microservices world, and in the future I would like to create a more complex architecture quickstart. Stay tuned!

Apache Camel 2.18.0 release, what is coming

The Camel community is currently in the process of releasing the new Apache Camel 2.18.0. This is a big release with a lot of new features and new components. Let’s take a deeper look at what is coming.

  • Java 8

    This is the first release requiring Java 8. We worked hard on this, and we are adopting a Java 8 code style in the codebase.

  • Automatic Documentation

    In Apache Camel 2.18.0 you’ll find new documentation. The components, endpoints, data formats and languages are now documented in a completely automatic way. This is important because in the past the Confluence documentation was often misaligned with the codebase. The documentation is regenerated during the component/endpoint/dataformat/language build whenever a modification has been made. This feature will be the base for a new website including this material. You can see the generated docs in the camel-website folder in the Apache Camel repository.

  • Spring-boot and Wildfly-Swarm support

    Camel is now present on the Spring starter site and on the WildFly Swarm one. All the new features related to Spring Boot can be found in the article on Nicola Ferraro’s blog, Apache Camel meets Spring-boot. Running Camel on Spring Boot has never been so easy.

  • Hystrix Circuit Breaker and Netflix OSS

    This release will have a circuit breaker implementation using the Netflix Hystrix project (with dashboard too).

  • Distributed message tracing with camel-zipkin component

    This component is used for tracing and timing incoming and outgoing Camel messages using Zipkin. For more information take a look at the documentation or the Asciidoc file in the Apache Camel repository.

  • Service Call

    This provides the ability to call a remote service in a distributed system, where the service is looked up from a service registry of some sort. Camel won’t need to know where the service is hosted; it will look up the service from Kubernetes, OpenShift, Cloud Foundry, Zuul, Consul, ZooKeeper, etc.

  • New components

    • camel-asterisk - For interacting with Asterisk PBX servers.
    • camel-cm-sms - For sending SMS messages using the CM SMS Gateway.
    • camel-consul - For integrating your application with Consul.
    • camel-ehcache - For interacting with the Ehcache 3 cache.
    • camel-flink - Bridges Camel connectors with Apache Flink tasks.
    • camel-lumberjack - For receiving logs over the lumberjack protocol (used by Filebeat, for instance).
    • camel-ribbon - To use Netflix Ribbon with the Service Call EIP.
    • camel-servicenow - For cloud management with ServiceNow.
    • camel-telegram - For messaging with Telegram.
    • camel-zipkin - For tracking Camel message flows/timings using Zipkin.
    • camel-chronicle - For interacting with OpenHFT’s Chronicle-Engine.
  • New Dataformats

    • camel-johnzon - Apache Johnzon is an implementation of JSR-353 (the Java API for JSON Processing).
  • New examples

    In this release there will be some new examples. To name a few:

    • camel-example-cdi-kubernetes - An example of the camel-kubernetes component used to retrieve a list of pods from your Kubernetes cluster. The example is CDI-based.
    • camel-example-java8 - Demonstrates the Java DSL with the experimental new Java 8 lambda support for expressions/predicates/processors. We love feedback on this DSL and expect to improve the API over the next couple of releases.
    • camel-example-java8-rx - Demonstrates the Java DSL with the experimental new Java 8 lambda support for typesafe filtering and transforming of messages with RxJava. We love feedback on this DSL and expect to improve the API over the next couple of releases.
  • Important changes

    • Karaf 2.4.x is no longer supported. Karaf 4.x is the primary supported OSGi platform.
    • Jetty 8.x is no longer supported and camel-jetty8 has been removed
    • Spring 4.0 is no longer supported and camel-test-spring40 has been removed
    • Spring 3.x is no longer supported
    • Upgraded to Spring 4.3.x and Spring Boot 1.4.x (only spring-dm using spring 3.2.x as part of camel-spring in osgi/karaf is still in use - but spring-dm is deprecated and we recommend using blueprint)
    • Camel-gae has been removed (was not working anyway)
    • MongoDB component is migrated to MongoDB 3.
    • The camel-cache module is deprecated, you should use camel-ehcache instead.
    • The camel-docker module has been removed from Karaf features as it does not work in OSGi

For more information about the upcoming release you can read the Camel 2.18.0 Release page. We hope you’ll enjoy this release, and obviously feedback is more than welcome, as are contributions.

Testing camel-cassandraql on a Dockerized Apache Cassandra cluster

Since Camel 2.15.0 we have a component for integrating with Apache Cassandra. With this post I’d like to show you what you can do with this component on a real Cassandra cluster. We will use the Docker images from my personal account on Docker Hub. As a first step we need to set up our cluster.

Spinning up an Apache Cassandra cluster with Docker

For this post we will use Docker to spin up our Apache Cassandra cluster. Once everything is up and running we will run a simple example showing how the camel-cassandraql component can be used to interact with the cluster.

As a first step we need to run a single-node cluster:

docker run --name master_node -dt oscerd/cassandra

This command runs a single container with an Apache Cassandra instance (the Cassandra version is the latest tick-tock release, 3.6). We could stick with a single-node cluster, but that way we wouldn’t exploit Cassandra’s distributed features. So let’s add two other nodes!

docker run --name node1 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra
docker run --name node2 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra

Now we have two more nodes in our cluster. The environment variable SEED is used to make each new node aware of at least one existing node in the cluster. In the cassandra.yaml file there is a seeds entry tracking this information; more information can be found in the Cassandra documentation. To be sure everything is up and running we can use the Nodetool utility, by running the following command:
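Inside each node, the address passed through SEED ends up in the seed_provider section of cassandra.yaml. With the master node’s address it would look roughly like the fragment below (the IP is illustrative; it depends on what Docker assigned to master_node):

```
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "172.17.0.2"
```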

docker exec -ti master_node /opt/cassandra/bin/nodetool status

and we should get the Cluster status as output:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.17.0.3  102.67 KiB  256          65.9%             1a985c48-33a1-44aa-b7e9-f1a3620a6482  rack1
UN  172.17.0.2  107.64 KiB  256          68.2%             da54ce5e-6433-4ea0-b2c3-fbc6c63ea955  rack1
UN  172.17.0.4  15.42 KiB  256          65.8%             0f2ba25a-37b0-4f27-a10a-d9a44655396a  rack1

To run the example we need to create a keyspace and a table. Locally I have downloaded the Apache Cassandra 3.7 package. Run the cqlsh command:

<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)

You should see the cqlsh prompt:

Connected to Test Cluster at 172.17.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.6 | CQL spec 3.4.2 | Native protocol v4]
Use HELP for help.
cqlsh>

Let’s create a keyspace test with a table users:

create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;

and check everything is fine by running a simple query:

cqlsh> use test;
cqlsh:test> select * from users;

 id | name
----+--------
  1 | oscerd

(1 rows)
cqlsh:test> 

You can do the same on the other two nodes, to check your cluster is working as you expect. Following these steps we now have a three-node cluster up and running. Let’s use a simple camel-cassandraql example.

Run the example from Apache Camel

In Apache Camel I added a little example showing how to query (a select and an insert operation) an Apache Cassandra cluster with the related component; you can find the example here: Camel-example-cdi-cassandraql. You can run the example with:

mvn clean compile
mvn camel:run

and you should have the following output:

2016-07-24 15:33:50,812 [cdi.Main.main()] INFO  Version                        - WELD-000900: 2.3.5 (Final)
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.impl.config.EnvironmentPropertyConfigSourceProvider <init>
INFO: Custom config found by DeltaSpike. Name: 'META-INF/apache-deltaspike.properties', URL: 'file:/home/oscerd/workspace/apache-camel/camel/examples/camel-example-cdi-cassandraql/target/classes/META-INF/apache-deltaspike.properties'
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.util.ProjectStageProducer initProjectStage
INFO: Computed the following DeltaSpike ProjectStage: Production
2016-07-24 15:33:51,064 [cdi.Main.main()] INFO  Bootstrap                      - WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2016-07-24 15:33:51,170 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.message.MessageBundleExtension.detectInterfaces(@Observes ProcessAnnotatedType) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,174 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.interceptor.GlobalInterceptorExtension.promoteInterceptors(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,189 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,195 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.exclude.extension.ExcludeExtension.vetoBeans(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.lock.LockedInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.future.FutureableInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:52,244 [cdi.Main.main()] INFO  CdiCamelExtension              - Camel CDI is starting Camel context [camel-example-cassandraql-cdi]
2016-07-24 15:33:52,245 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) is starting
2016-07-24 15:33:52,246 [cdi.Main.main()] INFO  ManagedManagementStrategy      - JMX is enabled
2016-07-24 15:33:52,352 [cdi.Main.main()] INFO  DefaultTypeConverter           - Loaded 189 type converters
2016-07-24 15:33:52,367 [cdi.Main.main()] INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-07-24 15:33:52,465 [cdi.Main.main()] INFO  DefaultCamelContext            - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-07-24 15:33:52,547 [cdi.Main.main()] INFO  NettyUtil                      - Did not find Netty's native epoll transport in the classpath, defaulting to NIO.
2016-07-24 15:33:52,789 [cdi.Main.main()] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,790 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,985 [cdi.Main.main()] INFO  DefaultCamelContext            - Route: route1 started and consuming from: timer://stream?repeatCount=1
2016-07-24 15:33:52,986 [cdi.Main.main()] INFO  DefaultCamelContext            - Total 1 routes, of which 1 are started.
2016-07-24 15:33:52,987 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) started in 0.742 seconds
2016-07-24 15:33:53,018 [cdi.Main.main()] INFO  Bootstrap                      - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
2016-07-24 15:33:54,041 [ timer://stream] INFO  route1                         - Result from query [Row[1, oscerd]]

Running the query again you should see another entry in the users table:

cqlsh> use test;
cqlsh:test> select * from users;

 id | name
----+-----------
  1 |    oscerd
  2 | davsclaus

(2 rows)
cqlsh:test>

The route of the example is simple:

    @ContextName("camel-example-cassandraql-cdi")
    static class KubernetesRoute extends RouteBuilder {

        @Override
        public void configure() {
            from("timer:stream?repeatCount=1")
                .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-select-query}}&consistencyLevel=quorum")
                .log("Result from query ${body}")
                .process(exchange -> {
                    exchange.getIn().setBody(Arrays.asList("davsclaus"));
                })
                .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-insert-query}}&consistencyLevel=quorum");
        }
    }
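The {{...}} placeholders in the endpoint URIs are resolved from the example’s properties. A hypothetical set of values matching the cluster above might look like the following (the queries, in particular the parameterized insert producing the second row, are my guess; check the example’s actual properties file):

```
cassandra-master-ip = 172.17.0.2
cassandra-node1-ip = 172.17.0.3
cassandra-node2-ip = 172.17.0.4
cql-select-query = select * from users
cql-insert-query = insert into users (id, name) values (2, ?)
```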

As you can see, we are using a QUORUM consistency level for both the select and the insert operations, and the List body set in the processor is used as the bound parameters of the insert statement. You can find more information in the Cassandra consistency level documentation.
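With the keyspace created earlier using replication_factor = 3, QUORUM means a majority of the replicas must acknowledge each operation. A minimal sketch of the arithmetic (the class and method names are mine, not part of Camel or the Cassandra driver):

```java
public class QuorumLevel {

    // Cassandra computes QUORUM as floor(replicationFactor / 2) + 1
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    public static void main(String[] args) {
        // With replication_factor = 3, QUORUM needs 2 of the 3 replicas,
        // so reads and writes keep working with one node down.
        System.out.println(quorum(3)); // 2
        // With replication_factor = 5, QUORUM would need 3 replicas.
        System.out.println(quorum(5)); // 3
    }
}
```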

Conclusions

Using Docker to spin up a Cassandra Cluster can be useful during integration and performance testing. In this post we saw a little example of the camel-cassandraql features in a real Apache Cassandra cluster.

Before the camel-cassandraql pull request submission, I was working on a Camel-Cassandra component too. This component is a bit different from the cassandraql one and you can find more information in its GitHub repository. It is aligned with the latest released Apache Camel version, 2.17.2.