28 Jun 2018
Over the last few months I have worked on the Syndesis project. Syndesis is a hybrid integration platform based on Apache Camel. During this time I needed to build the platform against a Camel snapshot version to test some new features I had added to Apache Camel, and it wasn’t easy. Being able to build the platform against different Camel snapshots and versions is very useful for testing Camel master and for seeing how new or updated Camel components behave in the platform. I think it would be useful for end users too.
Normal Workflow
Building the Syndesis platform does not look easy at first sight, but the build tooling is complete and very well documented.
> syndesis/tools/bin$ ./syndesis build --help
Run Syndesis builds
Usage: syndesis build [... options ...]
Options for build:
-b --backend Build only backend modules (core, extension, integration, connectors, server, meta)
--images Build only modules with Docker images (ui, server, meta, s2i)
-m --module <m1>,<m2>, .. Build modules
Modules: ui, server, connector, s2i, meta, integration, extension, common
-d --dependencies Build also all project the specified module depends on
--skip-tests Skip unit and system test execution
--skip-checks Disable all checks
-f --flash Skip checks and tests execution (fastest mode)
-i --image-mode <mode> <mode> can be
- "none" : No images are build (default)
- "openshift" : Build for OpenShift image streams
- "docker" : Build against a plain Docker daemon
- "auto" : Automatically detect whether to use
"openshift" or "docker"
--docker == --image-mode docker
--openshift == --image-mode openshift
-p --project <project> Specifies the project to create images in when using '--openshift'
-k --kill-pods Kill pods after the image has been created.
Useful when building with image-mode docker
-c --clean Run clean builds (mvn clean)
--batch-mode Run mvn in batch mode
--camel-snapshot Run a build with a specific Camel snapshot. You'll need to set an environment variable CAMEL_SNAPSHOT_VERSION with the SNAPSHOT version you want to use.
--man Open HTML documentation in the Syndesis Developer Handbook
I use Minishift to play with Syndesis and my normal workflow is the following:
First I spin up a Minishift instance
> minishift start --memory 8384
You may also need to specify a VM driver with the --vm-driver flag.
Then I set the docker environment coming from Minishift
> eval $(minishift docker-env)
At this point I’m able to build
> syndesis/tools/bin$ ./syndesis build --openshift --clean
Once the build is done (it may take a while), we can deploy the Syndesis platform on Minishift
> syndesis/tools/bin$ ./syndesis minishift --install --openshift
Once the deployment is finished we can start using the Syndesis platform.
Building with a Camel Snapshot
The option you need in this case is --camel-snapshot, in combination with an environment variable called CAMEL_SNAPSHOT_VERSION.
In my case I needed to test a new feature in a component from Camel 2.21.2-SNAPSHOT, which I had built locally beforehand. The workflow to obtain a running Syndesis instance based on Camel 2.21.2-SNAPSHOT is the following (assuming you have a running Minishift).
Set the docker environment coming from Minishift
> eval $(minishift docker-env)
Export the CAMEL_SNAPSHOT_VERSION environment variable
> export CAMEL_SNAPSHOT_VERSION="2.21.2-SNAPSHOT"
Run a Syndesis build with the --camel-snapshot flag enabled
> syndesis/tools/bin$ ./syndesis build --openshift --clean --camel-snapshot
Once the build has finished, you can deploy Syndesis on Minishift
> syndesis/tools/bin$ ./syndesis minishift --install --openshift
This is all you need to test a Syndesis platform based on a Camel snapshot.
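The snapshot steps above can be wrapped in a small script. This is only a sketch: the function names is_snapshot_version and build_with_camel_snapshot are made up for illustration, and the check that the version ends in -SNAPSHOT is my own guard, not something the syndesis tool is known to enforce. The commands themselves are the ones shown above, and the script is assumed to run from syndesis/tools/bin.

```shell
#!/bin/sh
# Hypothetical helper: succeeds when the argument looks like a Maven
# snapshot version such as 2.21.2-SNAPSHOT.
is_snapshot_version() {
  case "$1" in
    ?*-SNAPSHOT) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical wrapper around the steps from the post.
# Assumes a running Minishift and syndesis/tools/bin as the working directory.
build_with_camel_snapshot() {
  if ! is_snapshot_version "${CAMEL_SNAPSHOT_VERSION:-}"; then
    echo "CAMEL_SNAPSHOT_VERSION must be set to a -SNAPSHOT version" >&2
    return 1
  fi
  # Point the Docker client at the Minishift daemon, then build and deploy.
  eval "$(minishift docker-env)" &&
    ./syndesis build --openshift --clean --camel-snapshot &&
    ./syndesis minishift --install --openshift
}
```

After sourcing the script, the whole workflow becomes export CAMEL_SNAPSHOT_VERSION="2.21.2-SNAPSHOT" followed by build_with_camel_snapshot. Failing early on a missing variable avoids waiting for a long build that would not use the intended Camel version.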
Conclusion
In the future I’ll blog about the Syndesis platform a bit more. If you want to contribute you can start from the GitHub project, the site or an extension.
31 Jul 2017
In the last couple of months I worked on a side project: Infinispan-Kafka. This project is based on Kafka Connect, a tool for streaming data between Apache Kafka and other systems. Data can be streamed in two directions: from Kafka to a different system (Sink Connector) and from a different system to Kafka (Source Connector). The Infinispan-Kafka project implements only the Sink Connector (for the moment).
Basic Idea
In Infinispan, ProtoStream and Infinispan Remote Querying let an end user query Infinispan remotely in a language-neutral manner by using Protobuf. The Infinispan client must be configured to use a dedicated marshaller, ProtoStreamMarshaller, which uses the ProtoStream library to encode objects. The ProtoStream library has to be instructed on how to marshal the message types, as described in the documentation.
So the idea behind the Infinispan-Kafka project is to use the ProtoStream library and the ProtoStreamMarshaller to define what kind of data can be saved in an Infinispan cache from Kafka, and to let Infinispan manage the marshalling and storing. This creates a contract between Infinispan and Kafka.
The Infinispan-Kafka connector uses the following configuration properties (for the moment):
Name | Description | Type | Default | Importance |
infinispan.connection.hosts | List of comma separated Infinispan hosts | string | localhost | high |
infinispan.connection.hotrod.port | Infinispan Hot Rod port | int | 11222 | high |
infinispan.connection.cache.name | Name of the Infinispan cache to use | string | default | medium |
infinispan.use.proto | If true, the Remote Cache Manager will be configured to use protostream schemas | boolean | false | medium |
infinispan.proto.marshaller.class | If infinispan.use.proto is true, this option has to contain an annotated protostream class to be used | Class | String.class | medium |
infinispan.cache.force.return.values | By default, previously existing values for Map operations are not returned; if set to true the values will be returned | boolean | false | low |
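To make the defaults in the table concrete, here is a small sketch that reads a key from a properties file and falls back to a default when the key is absent. The helper name prop_or_default and the temporary file are made up for illustration, and this only mimics the fallback behaviour described by the table; it is not the connector's own code.

```shell
#!/bin/sh
# Hypothetical helper: print the value of key $2 in properties file $1,
# or the default $3 when the key is missing (or has an empty value).
prop_or_default() {
  value="$(awk -F= -v k="$2" '$1 == k { sub(/^[^=]*=/, ""); v = $0 } END { print v }' "$1")"
  if [ -n "$value" ]; then
    echo "$value"
  else
    echo "$3"
  fi
}

# A throwaway properties file that sets only some of the options.
conf="$(mktemp)"
printf '%s\n' \
  'infinispan.connection.hosts=127.0.0.1' \
  'infinispan.use.proto=true' > "$conf"

# The host is set explicitly; the Hot Rod port falls back to the table default.
hosts="$(prop_or_default "$conf" infinispan.connection.hosts localhost)"
port="$(prop_or_default "$conf" infinispan.connection.hotrod.port 11222)"
echo "hosts=$hosts port=$port"

rm -f "$conf"
```

With this file the host resolves to 127.0.0.1 and the port to 11222, which matches what the table says about an unset infinispan.connection.hotrod.port.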
An example of an annotated ProtoStream class is the following:
package org.infinispan.kafka;

import java.io.Serializable;

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;

@ProtoDoc("@Indexed")
public class Author implements Serializable {

    private String name;

    @ProtoField(number = 1, required = true)
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Author [name=" + name + "]";
    }
}
A running example
Let’s see how the connector works in a real example. You’ll need:
- Infinispan server 9.1.0.Final
- Kafka 0.11.0.0 running
and these three projects
- https://github.com/oscerd/infinispan-kafka-demo
- https://github.com/oscerd/infinispan-kafka-producer
- https://github.com/oscerd/camel-infinispan-kafka-demo
But, since infinispan-kafka is not yet released, you’ll need to build the project to have it available in your local Maven repository.
So fork or clone the Infinispan-Kafka project and install it into your local Maven repository, typically with mvn clean install.
Now let’s see what is inside the different projects.
The Infinispan-kafka demo
In the repository you’ll find a properties file for the Infinispan-Kafka sink connector and a Protostream annotated class named Author.
The configuration is the following:
name=InfinispanSinkConnector
topics=test
tasks.max=1
connector.class=org.infinispan.kafka.InfinispanSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
infinispan.connection.hosts=127.0.0.1
infinispan.connection.hotrod.port=11222
infinispan.connection.cache.name=default
infinispan.cache.force.return.values=true
infinispan.use.proto=true
infinispan.proto.marshaller.class=org.infinispan.kafka.Author
We will use the topic test from our Kafka cluster.
The Infinispan-kafka producer
In this project we define a simple producer that sends records to the topic test in our Kafka cluster. We will send five Author records.
The Camel-infinispan-kafka demo
In this project we define a standalone Camel route. The route runs every 10 seconds and queries the Infinispan cache with a remote query defined in the project.
Run the example now!
Now that we have a full view of this demo we can run it!
Let’s start from the servers.
First we need to start the Infinispan server, in this case in standalone mode.
>infinispan-server-9.1.0.Final/bin$ ./standalone.sh
=========================================================================
JBoss Bootstrap Environment
JBOSS_HOME: /home/oscerd/playground/infinispan-server-9.1.0.Final
JAVA: /usr/lib/jvm/jdk1.8.0_65//bin/java
JAVA_OPTS: -server -server -Xms64m -Xmx512m -Djava.net.preferIPv4Stack=true -Djboss.modules.system.pkgs=org.jboss.byteman -Djava.awt.headless=true
=========================================================================
11:36:28,886 INFO [org.jboss.modules] (main) JBoss Modules version 1.5.2.Final
11:36:29,046 INFO [org.jboss.msc] (main) JBoss MSC version 1.2.6.Final
11:36:29,104 INFO [org.jboss.as] (MSC service thread 1-6) WFLYSRV0049: Infinispan Server 9.1.0.Final (WildFly Core 2.2.0.Final) starting
11:36:29,788 INFO [org.jboss.as.server] (Controller Boot Thread) WFLYSRV0039: Creating http management service using socket-binding (management-http)
11:36:29,801 INFO [org.xnio] (MSC service thread 1-6) XNIO version 3.4.0.Final
11:36:29,806 INFO [org.xnio.nio] (MSC service thread 1-6) XNIO NIO Implementation Version 3.4.0.Final
11:36:29,823 INFO [org.jboss.as.clustering.infinispan] (ServerService Thread Pool -- 20) Activating Infinispan subsystem.
11:36:29,823 INFO [org.wildfly.extension.io] (ServerService Thread Pool -- 19) WFLYIO001: Worker 'default' has auto-configured to 16 core threads with 128 task threads based on your 8 available processors
11:36:29,835 INFO [org.jboss.as.connector.subsystems.datasources] (ServerService Thread Pool -- 18) WFLYJCA0004: Deploying JDBC-compliant driver class org.h2.Driver (version 1.3)
11:36:29,843 INFO [org.jboss.as.naming] (ServerService Thread Pool -- 25) WFLYNAM0001: Activating Naming Subsystem
11:36:29,848 WARN [org.jboss.as.txn] (ServerService Thread Pool -- 29) WFLYTX0013: Node identifier property is set to the default value. Please make sure it is unique.
11:36:29,849 INFO [org.jboss.as.connector] (MSC service thread 1-8) WFLYJCA0009: Starting JCA Subsystem (WildFly/IronJacamar 1.3.4.Final)
11:36:29,851 INFO [org.jboss.as.connector.deployers.jdbc] (MSC service thread 1-3) WFLYJCA0018: Started Driver service with driver-name = h2
11:36:29,860 INFO [org.jboss.as.security] (ServerService Thread Pool -- 27) WFLYSEC0002: Activating Security Subsystem
11:36:29,861 INFO [org.jboss.remoting] (MSC service thread 1-6) JBoss Remoting version 4.0.21.Final
11:36:29,872 INFO [org.jboss.as.security] (MSC service thread 1-2) WFLYSEC0001: Current PicketBox version=4.9.6.Final
11:36:29,876 INFO [org.jboss.as.naming] (MSC service thread 1-7) WFLYNAM0003: Starting Naming Service
11:36:30,079 INFO [org.jboss.as.server.deployment.scanner] (MSC service thread 1-4) WFLYDS0013: Started FileSystemDeploymentService for directory /home/oscerd/playground/infinispan-server-9.1.0.Final/standalone/deployments
11:36:30,394 INFO [org.infinispan.factories.GlobalComponentRegistry] (MSC service thread 1-5) ISPN000128: Infinispan version: Infinispan 'Bastille' 9.1.0.Final
11:36:30,535 INFO [org.jboss.as.connector.subsystems.datasources] (MSC service thread 1-8) WFLYJCA0001: Bound data source [java:jboss/datasources/ExampleDS]
11:36:30,745 INFO [org.jboss.as.clustering.infinispan] (MSC service thread 1-6) DGISPN0001: Started default cache from local container
11:36:30,746 INFO [org.jboss.as.clustering.infinispan] (MSC service thread 1-7) DGISPN0001: Started namedCache cache from local container
11:36:30,763 INFO [org.infinispan.server.endpoint] (MSC service thread 1-3) DGENDPT10000: HotRodServer starting
11:36:30,766 INFO [org.infinispan.server.endpoint] (MSC service thread 1-3) DGENDPT10001: HotRodServer listening on 127.0.0.1:11222
11:36:30,770 INFO [org.infinispan.server.endpoint] (MSC service thread 1-5) DGENDPT10000: REST starting
11:36:31,072 INFO [org.infinispan.server.endpoint] (MSC service thread 1-5) DGENDPT10002: REST listening on 127.0.0.1:8080 (mapped to rest)
11:36:31,258 INFO [org.jboss.as] (Controller Boot Thread) WFLYSRV0060: Http management interface listening on http://127.0.0.1:9990/management
11:36:31,259 INFO [org.jboss.as] (Controller Boot Thread) WFLYSRV0051: Admin console listening on http://127.0.0.1:9990
11:36:31,259 INFO [org.jboss.as] (Controller Boot Thread) WFLYSRV0025: Infinispan Server 9.1.0.Final (WildFly Core 2.2.0.Final) started in 2602ms - Started 152 of 163 services (49 services are lazy, passive or on-demand)
Next we need to start our Kafka server. As always, we start ZooKeeper first and then the broker.
>kafka_2.12-0.11.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2017-07-31 11:38:55,401] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-07-31 11:38:55,404] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-07-31 11:38:55,404] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-07-31 11:38:55,417] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-07-31 11:38:55,417] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-07-31 11:38:55,728] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-07-31 11:38:55,728] INFO Server environment:host.name=ghost (org.apache.zookeeper.server.ZooKeeperServer)
[2017-07-31 11:38:55,728] INFO Server environment:java.version=1.8.0_65 (org.apache.zookeeper.server.ZooKeeperServer)
.
.
.
.
And the server
>kafka_2.12-0.11.0.0$ bin/kafka-server-start.sh config/server.properties
[2017-07-31 11:40:31,244] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
.
.
.
.
.
.
.
Next we’ll need to create a new topic named test.
>kafka_2.12-0.11.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
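Before starting the connector it can be handy to confirm that the topic really exists. A minimal sketch, assuming the same Kafka 0.11 layout and ZooKeeper address as above: kafka-topics.sh --list prints one topic name per line, and a small helper (topic_in_list is a made-up name) checks for an exact match.

```shell
#!/bin/sh
# Hypothetical helper: reads topic names on stdin, one per line, and
# succeeds only if $1 matches a whole line exactly.
topic_in_list() {
  grep -qxF -- "$1"
}

# Intended usage from the Kafka directory (not executed here):
#   bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_in_list test \
#     && echo "topic test is available"
```

The -x and -F options make grep compare whole lines literally, so a topic called testing does not count as test.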
The servers are now up and running. At this point we need to start the Infinispan-Kafka connector demo.
>infinispan-kafka-demo$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building infinispan-kafka-demo 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ infinispan-kafka-demo ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ infinispan-kafka-demo ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ infinispan-kafka-demo ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ infinispan-kafka-demo ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ infinispan-kafka-demo ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ infinispan-kafka-demo ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ infinispan-kafka-demo ---
[INFO] Building jar: /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.5.3:single (make-assembly) @ infinispan-kafka-demo ---
[INFO] Reading assembly descriptor: src/main/assembly/package.xml
[WARNING] The following patterns were never triggered in this artifact exclusion filter:
o 'org.apache.kafka:connect-api'
[INFO] Copying files to /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT-package
[WARNING] Assembly file: /home/oscerd/workspace/miscellanea/infinispan-kafka-demo/target/infinispan-kafka-demo-0.0.1-SNAPSHOT-package is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.799 s
[INFO] Finished at: 2017-07-31T11:50:11+02:00
[INFO] Final Memory: 30M/530M
[INFO] ------------------------------------------------------------------------
>infinispan-kafka-demo$ export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
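To see what the export above puts on the classpath, the same pipeline can be tried on a throwaway directory. This is a sketch with a made-up file layout (the real contents of the -package directory may differ): find lists every jar under target/, grep keeps only the paths inside the -package assembly directory, and tr joins them with colons. The sort step is an addition that only makes the output order predictable, and grep -- '-package' is an equivalent spelling of the pattern used above.

```shell
#!/bin/sh
# Build a temporary tree that imitates the Maven output (hypothetical layout).
demo_dir="$(mktemp -d)"
mkdir -p "$demo_dir/target/demo-0.0.1-SNAPSHOT-package/share/java"
touch "$demo_dir/target/demo-0.0.1-SNAPSHOT.jar"
touch "$demo_dir/target/demo-0.0.1-SNAPSHOT-package/share/java/a.jar"
touch "$demo_dir/target/demo-0.0.1-SNAPSHOT-package/share/java/b.jar"

# Same idea as the export: keep only jars that live under the -package directory.
demo_classpath="$(cd "$demo_dir" && find target/ -type f -name '*.jar' \
  | grep -- '-package' | sort | tr '\n' ':')"

# The plain project jar directly under target/ is filtered out.
echo "$demo_classpath"

rm -rf "$demo_dir"
```

Only the jars copied by the assembly plugin end up on the classpath, which is what Kafka Connect needs to find the connector and its dependencies when plugin.path is not set.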
We can start the connector now with the configuration of the demo project, in this way:
>infinispan-kafka-demo$ kafka_2.12-0.11.0.0/bin/connect-standalone.sh kafka_2.12-0.11.0.0/config/connect-standalone.properties config/InfinispanSinkConnector.properties
[2017-07-31 11:58:37,893] INFO Registered loader: sun.misc.Launcher$AppClassLoader@18b4aac2 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-07-31 11:58:37,895] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,895] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.infinispan.kafka.InfinispanSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,896] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,897] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,898] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-07-31 11:58:37,899] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,899] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'InfinispanSinkConnector' and 'InfinispanSink' to plugin 'org.infinispan.kafka.InfinispanSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-07-31 11:58:37,900] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,901] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,901] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-07-31 11:58:37,912] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
plugin.path = null
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:223)
[2017-07-31 11:58:38,004] INFO Logging initialized @2593ms (org.eclipse.jetty.util.log:186)
[2017-07-31 11:58:38,153] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:49)
[2017-07-31 11:58:38,153] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-07-31 11:58:38,153] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:144)
[2017-07-31 11:58:38,154] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:59)
[2017-07-31 11:58:38,155] INFO Worker started (org.apache.kafka.connect.runtime.Worker:149)
[2017-07-31 11:58:38,155] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-07-31 11:58:38,155] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-07-31 11:58:38,218] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Jul 31, 2017 11:58:38 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-07-31 11:58:38,561] INFO Started o.e.j.s.ServletContextHandler@5dab9949{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-07-31 11:58:38,568] INFO Started ServerConnector@740c2895{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-07-31 11:58:38,569] INFO Started @3158ms (org.eclipse.jetty.server.Server:379)
[2017-07-31 11:58:38,569] INFO REST server listening at http://192.168.1.10:8083/, advertising URL http://192.168.1.10:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-07-31 11:58:38,569] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-07-31 11:58:38,575] INFO ConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-07-31 11:58:38,576] INFO EnrichedConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,576] INFO Creating connector InfinispanSinkConnector of type org.infinispan.kafka.InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-07-31 11:58:38,576] INFO Instantiated connector InfinispanSinkConnector with version 0.0.1-SNAPSHOT of type class org.infinispan.kafka.InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-07-31 11:58:38,577] INFO Finished creating connector InfinispanSinkConnector (org.apache.kafka.connect.runtime.Worker:225)
[2017-07-31 11:58:38,578] INFO SinkConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
topics = [test]
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-07-31 11:58:38,578] INFO EnrichedConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
topics = [test]
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,578] INFO Setting task configurations for 1 workers. (org.infinispan.kafka.InfinispanSinkConnector:50)
[2017-07-31 11:58:38,579] INFO Creating task InfinispanSinkConnector-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-07-31 11:58:38,579] INFO ConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-07-31 11:58:38,579] INFO EnrichedConnectorConfig values:
connector.class = org.infinispan.kafka.InfinispanSinkConnector
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = InfinispanSinkConnector
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-07-31 11:58:38,580] INFO TaskConfig values:
task.class = class org.infinispan.kafka.InfinispanSinkTask
(org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-07-31 11:58:38,580] INFO Instantiated task InfinispanSinkConnector-0 with version 0.0.1-SNAPSHOT of type org.infinispan.kafka.InfinispanSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-07-31 11:58:38,587] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-InfinispanSinkConnector
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-07-31 11:58:38,631] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-07-31 11:58:38,631] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-07-31 11:58:38,633] INFO Created connector InfinispanSinkConnector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-07-31 11:58:38,634] INFO InfinispanSinkConnectorConfig values:
infinispan.cache.force.return.values = true
infinispan.connection.cache.name = default
infinispan.connection.hosts = 127.0.0.1
infinispan.connection.hotrod.port = 11222
infinispan.proto.marshaller.class = class org.infinispan.kafka.Author
infinispan.use.proto = true
(org.infinispan.kafka.InfinispanSinkConnectorConfig:223)
[2017-07-31 11:58:38,661] INFO Adding protostream (org.infinispan.kafka.InfinispanSinkTask:90)
[2017-07-31 11:58:38,749] INFO ISPN004021: Infinispan version: 9.1.0.Final (org.infinispan.client.hotrod.RemoteCacheManager:212)
[2017-07-31 11:58:38,908] INFO Sink task WorkerSinkTask{id=InfinispanSinkConnector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-07-31 11:58:38,996] INFO Discovered coordinator ghost:9092 (id: 2147483647 rack: null) for group connect-InfinispanSinkConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-07-31 11:58:38,998] INFO Revoking previously assigned partitions [] for group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-07-31 11:58:38,998] INFO (Re-)joining group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-07-31 11:58:39,056] INFO Successfully joined group connect-InfinispanSinkConnector with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-07-31 11:58:39,057] INFO Setting newly assigned partitions [test-0] for group connect-InfinispanSinkConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
Let’s run the Camel route to query the Infinispan cache and see the result at this point.
>camel-infinispan-kafka-demo$ mvn clean compile exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Camel example with Infinispan 2.20.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ camel-infinispan-kafka-demo ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ camel-infinispan-kafka-demo ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ camel-infinispan-kafka-demo ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/target/classes
[WARNING] /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java: /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java uses unchecked or unsafe operations.
[WARNING] /home/oscerd/workspace/miscellanea/camel-infinispan-kafka-demo/src/main/java/com/github/oscerd/camel/infinispan/kafka/demo/CamelInfinispanRoute.java: Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- exec-maven-plugin:1.5.0:java (default-cli) @ camel-infinispan-kafka-demo ---
[.kafka.demo.Application.main()] RemoteCacheManager INFO ISPN004021: Infinispan version: 9.1.0.Final
[.kafka.demo.Application.main()] DefaultCamelContext INFO Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-1) is starting
[.kafka.demo.Application.main()] ManagedManagementStrategy INFO JMX is enabled
[.kafka.demo.Application.main()] DefaultTypeConverter INFO Type converters loaded (core: 192, classpath: 0)
[.kafka.demo.Application.main()] DefaultCamelContext INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[.kafka.demo.Application.main()] DefaultCamelContext INFO Route: route1 started and consuming from: timer://foo?period=10000&repeatCount=0
[.kafka.demo.Application.main()] DefaultCamelContext INFO Total 1 routes, of which 1 are started.
[.kafka.demo.Application.main()] DefaultCamelContext INFO Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-1) started in 0.221 seconds
Starting Camel. Use ctrl + c to terminate the JVM.
[.kafka.demo.Application.main()] DefaultCamelContext INFO Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-2) is starting
[.kafka.demo.Application.main()] ManagedManagementStrategy INFO JMX is enabled
[.kafka.demo.Application.main()] DefaultTypeConverter INFO Type converters loaded (core: 192, classpath: 0)
[.kafka.demo.Application.main()] DefaultCamelContext INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[.kafka.demo.Application.main()] DefaultCamelContext INFO Total 0 routes, of which 0 are started.
[.kafka.demo.Application.main()] DefaultCamelContext INFO Apache Camel 2.20.0-SNAPSHOT (CamelContext: camel-2) started in 0.012 seconds
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content []
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content []
.
.
.
.
.
We don’t have any data because no records have been sent to the Kafka topic yet. Let’s add some data through the Infinispan-Kafka producer.
>infinispan-kafka-producer$ mvn clean compile exec:exec
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Infinispan Kafka Producer 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ infinispan-kafka-producer ---
[INFO] Deleting /home/oscerd/workspace/miscellanea/infinispan-kafka-producer/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ infinispan-kafka-producer ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.2:compile (default-compile) @ infinispan-kafka-producer ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/oscerd/workspace/miscellanea/infinispan-kafka-producer/target/classes
[INFO]
[INFO] --- exec-maven-plugin:1.5.0:exec (default-cli) @ infinispan-kafka-producer ---
2017-07-31 12:03:12 INFO ProducerConfig:223 - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2017-07-31 12:03:12 INFO AppInfoParser:83 - Kafka version : 0.11.0.0
2017-07-31 12:03:12 INFO AppInfoParser:84 - Kafka commitId : cb8625948210849f
2017-07-31 12:03:13 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.864 s
[INFO] Finished at: 2017-07-31T12:03:13+02:00
[INFO] Final Memory: 18M/296M
[INFO] ------------------------------------------------------------------------
In the connector log we should see something like this:
[2017-07-31 12:03:13,261] INFO Received 5 records (org.infinispan.kafka.InfinispanSinkTask:65)
[2017-07-31 12:03:13,261] INFO Record kafka coordinates:(test-1-{"name":"Andrea Cosentino"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,290] INFO Record kafka coordinates:(test-2-{"name":"Jonathan Anstey"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,292] INFO Record kafka coordinates:(test-3-{"name":"Claus Ibsen"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,294] INFO Record kafka coordinates:(test-4-{"name":"Normam Maurer"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:13,295] INFO Record kafka coordinates:(test-5-{"name":"Philip Roth"}). Writing it to Infinispan... (org.infinispan.kafka.InfinispanSinkTask:69)
[2017-07-31 12:03:20,015] INFO WorkerSinkTask{id=InfinispanSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:278)
While in the Camel-Infinispan-Kafka demo logs we should see a different result from the query:
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 0
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content []
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result size 1
[mel-1) thread #1 - timer://foo] CamelInfinispanRoute INFO Query Result content [Author [name=Andrea Cosentino]]
As you can see, the Infinispan cache has been populated with the data coming from the Kafka topic test.
Conclusion
This blog post introduces the new Infinispan-Kafka connector and shows a little demo involving Kafka, Infinispan and Camel. There is still much more work to do on the connector, and contributions are more than welcome. Follow the developments on the GitHub repo.
24 Feb 2017
In the last few days I started to look at cryptography theory again. I think it’s an interesting subject for anyone involved in the IT world: for developers and security specialists, understanding some key concepts is essential to handle particular scenarios. Yesterday Google announced the first SHA-1 collision: hash functions will be the subject of one of the articles I currently have in mind. I’ll try to write these Cryptographic pills in episodes and I hope you’ll appreciate the subject. Let’s start!
In the beginning
Until the late 20th century cryptography was considered an art form. Constructing good codes, breaking existing ones and understanding the algorithm behind a code relied on creativity and personal skill. There wasn’t much theory around the subject. With the spread of cryptography into areas other than military and defense, the whole picture changed radically: a rich theory and rigorous study turned cryptography into an applied science. Today cryptography is everywhere.
Private-key Encryption
Cryptography was historically concerned with secret communication. In the past we talked about ciphers, today we talk about encryption schemes: the concepts are the same. Two parties want to communicate secretly and they decide to use a particular encryption scheme. An assumption in this scenario is that the communicating parties have a way to initially share a key in a secret manner.
A private-key encryption scheme is based on three algorithms [1]:
- The key-generation algorithm Gen is a probabilistic algorithm that outputs a key k chosen according to some distribution that is determined by the scheme.
- The encryption algorithm Enc takes as input a key k and a plaintext message m and outputs a ciphertext c. We denote the encryption of the plaintext m using the key k by
$$Enc_k(m)$$
- The decryption algorithm Dec takes as input a key k and a ciphertext c and outputs a plaintext m. We denote the decryption of the ciphertext c using the key k by
$$Dec_k(c)$$
- The set of all possible keys generated by the Gen algorithm is denoted as K and is called the key space.
- The set of all “legal” messages (supported by the encryption algorithm Enc) is denoted as M and is called the message space.
- Since any ciphertext is obtained by encrypting some plaintext under some key, the sets K and M together define the set of all possible ciphertexts, denoted by C.
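To make the three algorithms a bit more concrete, here is a minimal sketch in Java. It is a toy one-time-pad style scheme that I picked only because it is short: the class name `ToyPrivateKeyScheme` and the method names are my own, the key must be as long as the message, and it is not meant for real use.

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;

/** Toy private-key scheme (XOR with a random key); illustrative only. */
class ToyPrivateKeyScheme {

    private static final SecureRandom RANDOM = new SecureRandom();

    /** Gen: outputs a key k chosen uniformly from the key space K (all byte strings of the given length). */
    static byte[] gen(int length) {
        byte[] k = new byte[length];
        RANDOM.nextBytes(k);
        return k;
    }

    /** Enc_k(m): XORs the plaintext m with the key k and returns the ciphertext c. */
    static byte[] enc(byte[] k, byte[] m) {
        return xor(k, m);
    }

    /** Dec_k(c): XORs the ciphertext c with the same key k and returns the plaintext m. */
    static byte[] dec(byte[] k, byte[] c) {
        return xor(k, c);
    }

    private static byte[] xor(byte[] k, byte[] data) {
        // In this toy scheme the message space M is "all byte strings as long as the key".
        if (k.length != data.length) {
            throw new IllegalArgumentException("key and data must have the same length");
        }
        byte[] out = new byte[data.length];
        for (int i = 0; i < data.length; i++) {
            out[i] = (byte) (k[i] ^ data[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] m = "attack at dawn".getBytes(StandardCharsets.UTF_8);
        byte[] k = gen(m.length);
        byte[] c = enc(k, m);
        // Correctness requirement of any scheme: Dec_k(Enc_k(m)) = m
        System.out.println(Arrays.equals(m, dec(k, c)));
    }
}
```

The only property checked in `main` is correctness, that is, decrypting with the same key gives back the original plaintext. Nothing here says anything yet about security.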
Kerckhoffs’s principle
From the definitions above it’s clear that if an eavesdropping adversary knows the Dec algorithm and the chosen key k, he will be able to decrypt the messages exchanged between the two communicating parties. The key must be secret and this is a fundamental point, but do we need to keep the encryption and decryption algorithms secret too? In the late 19th century Auguste Kerckhoffs gave his opinion:
The cipher method must not be required to be secret, and it must be able to fall into the hands of the enemy without inconvenience.
It’s something related to Open Source and Open Standards too. Today encryption schemes are usually expected to be made public. The main reasons for this can be summarized in the following points:
- Public scrutiny makes the design stronger.
- If flaws exist, they can be found by someone hacking on the scheme.
- Public design enables the establishment of standards.
Kerckhoffs’s principle may seem obvious, but in some cases it was ignored with disastrous results. Only publicly tried and tested algorithms should be used by organizations. Kerckhoffs was one of the first people with an open vision about these things.
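As a small illustration of the principle, the sketch below uses AES in GCM mode from the standard Java crypto API: the algorithm is a public standard and anyone can read how it works, while the only secret is the key. The class name `KerckhoffsSketch` and its helper methods are my own, and the choices of a 128-bit key and a 12-byte nonce are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/** Public algorithm, secret key: a sketch with the JDK's AES-GCM implementation. */
class KerckhoffsSketch {

    // The algorithm name is public knowledge; nothing about it is hidden.
    private static final String ALGORITHM = "AES/GCM/NoPadding";

    /** Generates a fresh random 128-bit AES key: this is the only secret. */
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Generates a random 12-byte nonce; it is not secret but must not be reused with the same key. */
    static byte[] newNonce() {
        byte[] nonce = new byte[12];
        new SecureRandom().nextBytes(nonce);
        return nonce;
    }

    static byte[] encrypt(SecretKey key, byte[] nonce, byte[] plaintext) {
        return run(Cipher.ENCRYPT_MODE, key, nonce, plaintext);
    }

    /** Throws IllegalStateException if the key is wrong or the ciphertext was modified. */
    static byte[] decrypt(SecretKey key, byte[] nonce, byte[] ciphertext) {
        return run(Cipher.DECRYPT_MODE, key, nonce, ciphertext);
    }

    private static byte[] run(int mode, SecretKey key, byte[] nonce, byte[] input) {
        try {
            Cipher cipher = Cipher.getInstance(ALGORITHM);
            cipher.init(mode, key, new GCMParameterSpec(128, nonce));
            return cipher.doFinal(input);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] nonce = newNonce();
        byte[] m = "the algorithm is public".getBytes(StandardCharsets.UTF_8);
        byte[] c = encrypt(key, nonce, m);

        System.out.println("round trip ok: " + Arrays.equals(m, decrypt(key, nonce, c)));

        // An adversary who knows the algorithm and the nonce, but not the key, cannot decrypt.
        try {
            decrypt(newKey(), nonce, c);
            System.out.println("wrong key accepted");
        } catch (IllegalStateException e) {
            System.out.println("wrong key rejected");
        }
    }
}
```

The adversary in `main` has everything except the key (algorithm, nonce and ciphertext) and still gets nothing out of it, which is exactly the situation Kerckhoffs had in mind.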
What’s next
In the next articles I would like to talk about the theory behind the strength of cryptographic schemes. How can we say a scheme is secure? What are the attack scenarios? What is the definition of a secure scheme?
[1] Jonathan Katz, Yehuda Lindell, Introduction to Modern Cryptography, Chapman & Hall/CRC, Chapter 1.
14 Oct 2016
In the past few months I blogged about Testing Camel-cassandraql on a Dockerized cluster. This time I’d like to show you a similar example running on Kubernetes. I’ve worked on a fabric8 quickstart split into two different projects: Cassandra server and Cassandra client. The server will spin up a Cassandra cluster with three nodes and the client will run a camel-cassandraql route querying the keyspace in the cluster. You’ll need a running Kubernetes cluster to follow this example. I decided to use Fabric8 on Minikube, but you can also try to run the example using Fabric8 on Minishift. Once your environment is up and running you can start executing the commands in the following sections.
Spinning up an Apache Cassandra cluster on Kubernetes
For this post we will use Kubernetes to spin up our Apache Cassandra Cluster. To start the cluster you can use the Fabric8 IPaas Cassandra app. Since we are using Minikube, you’ll need to follow the related instructions.
This project will spin up a Cassandra Cluster with three nodes.
As a first step, in the cassandra folder, we will need to run
and
Once we’re done with these commands we can watch the state of the Kubernetes pods for our specific app, and after a while we should see our pods running:
oscerd@localhost:~/workspace/fabric8-universe/fabric8-ipaas/cassandra$ kubectl get pods -l app=cassandra --watch
NAME READY STATUS RESTARTS AGE
cassandra-2459930792-lfh93 1/1 Running 0 15s
cassandra-2459930792-m8c9p 1/1 Running 0 15s
cassandra-2459930792-zxiv3 1/1 Running 0 15s
I usually prefer the command line, but you can also watch the status of your pods on the wonderful Fabric8 Console. With Minikube/Minishift it is super easy to access the console; simply run the following command:
or on Minishift
minishift service fabric8
In Apache Cassandra, nodetool is one of the most important commands for managing a cluster and executing commands on a single node.
It is interesting to look at the way the Kubernetes team developed seed discovery in their Cassandra example. The class to look at is KubernetesSeedProvider, which implements the SeedProvider interface from the Apache Cassandra project. The getSeeds method is implemented here: if the KubernetesSeedProvider is unable to find seeds it will fall back to the createDefaultSeeds method, which is exactly the SimpleSeedProvider implementation from the Apache Cassandra project.
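The fallback idea can be sketched in a few lines of plain Java. This is not the actual KubernetesSeedProvider code: the `SeedSource` interface is a hypothetical stand-in for Cassandra’s SeedProvider, and the two sources passed in `main` are made up for the example.

```java
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;

/** Sketch of "discover seeds, otherwise fall back to defaults"; names are hypothetical. */
class FallbackSeedsSketch {

    /** Hypothetical stand-in for a seed provider: anything that can return a list of seed addresses. */
    interface SeedSource {
        List<InetAddress> getSeeds();
    }

    /**
     * Asks the discovery source first; if it fails or finds nothing,
     * returns the seeds from the default source instead.
     */
    static List<InetAddress> seedsWithFallback(SeedSource discovery, SeedSource defaults) {
        List<InetAddress> seeds;
        try {
            seeds = discovery.getSeeds();
        } catch (RuntimeException e) {
            // Discovery is best effort: treat a failure like an empty result.
            seeds = Collections.emptyList();
        }
        if (seeds == null || seeds.isEmpty()) {
            return defaults.getSeeds();
        }
        return seeds;
    }

    public static void main(String[] args) {
        // Made-up sources: discovery finds nothing, defaults point to the local node.
        SeedSource emptyDiscovery = Collections::emptyList;
        SeedSource localDefault = () -> Collections.singletonList(InetAddress.getLoopbackAddress());

        System.out.println(seedsWithFallback(emptyDiscovery, localDefault));
    }
}
```

In the real class the discovery step asks Kubernetes for the seeds and the default step is the createDefaultSeeds method mentioned above; the sketch only shows the control flow between the two.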
The kubernetes-cassandra jar is used in the Cassandra Dockerfile from the Google example and it is added to the classpath in the run.sh script.
In the Cassandra fabric8-ipaas project we used the deployment.yml and service.yml configuration. You can find the details in the repository. You can also scale up/down your cluster with this simple command:
kubectl scale --replicas=2 deployment/cassandra
and after a while you’ll see the scaled deployment:
oscerd@localhost:~/workspace/fabric8-universe/fabric8-ipaas/cassandra$ kubectl get pods -l app=cassandra --watch
NAME READY STATUS RESTARTS AGE
cassandra-2459930792-lfh93 1/1 Running 0 28m
cassandra-2459930792-m8c9p 1/1 Running 0 28m
Our cluster is up and running now and we can run our camel-cassandraql route from Cassandra client.
Running the Cassandra-client Fabric8 Quickstart
In the Fabric8 Quickstarts organization I’ve added a little Cassandra-client example. The route runs a simple query on the keyspace we’ve just created on our cluster. This quickstart requires the Camel 2.18.0 release; now that 2.18.0 is out we can cover the example. The quickstart first executes a bean, as init-method, to pre-populate the Cassandra keyspace, and then defines a main route. It’s a very simple quickstart (single query/print result set), but it shows how easy it is to interact with the Cassandra cluster. You don’t have to add the contact points of single nodes to your Cassandra endpoint URI or use a complex configuration: you can simply use the service name (“cassandra”) and Kubernetes will do the work for you.
You can run the example with:
mvn clean install
mvn fabric8:run
You should be able to see your cassandra-client running:
oscerd@localhost:~/workspace/fabric8-universe/fabric8-quickstarts/cassandra-client$ kubectl get pods
NAME READY STATUS RESTARTS AGE
cassandra-2459930792-lfh93 1/1 Running 0 45m
cassandra-2459930792-m8c9p 1/1 Running 0 45m
cassandra-2459930792-ms265 1/1 Running 0 15m
cassandra-client-3789635050-e1f2g 1/1 Running 0 1m
With the fabric8:run command you’ll see the logs of the application in the console directly:
2016-10-14 09:06:15,786 [main ] INFO DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-10-14 09:06:15,789 [main ] INFO Cluster - New Cassandra host cassandra/10.0.0.67:9042 added
2016-10-14 09:06:15,790 [main ] INFO Cluster - New Cassandra host /172.17.0.10:9042 added
2016-10-14 09:06:15,790 [main ] INFO Cluster - New Cassandra host /172.17.0.12:9042 added
2016-10-14 09:06:20,701 [main ] INFO SpringCamelContext - Apache Camel 2.18.0 (CamelContext: camel-1) is starting
2016-10-14 09:06:20,703 [main ] INFO ManagedManagementStrategy - JMX is enabled
2016-10-14 09:06:20,928 [main ] INFO DefaultTypeConverter - Loaded 190 type converters
2016-10-14 09:06:20,977 [main ] INFO DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-10-14 09:06:21,228 [main ] INFO SpringCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-10-14 09:06:21,231 [main ] INFO ClockFactory - Using native clock to generate timestamps.
2016-10-14 09:06:21,503 [main ] INFO DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-10-14 09:06:21,548 [main ] INFO Cluster - New Cassandra host cassandra/10.0.0.67:9042 added
2016-10-14 09:06:21,548 [main ] INFO Cluster - New Cassandra host /172.17.0.11:9042 added
2016-10-14 09:06:21,548 [main ] INFO Cluster - New Cassandra host /172.17.0.10:9042 added
2016-10-14 09:06:22,047 [main ] INFO SpringCamelContext - Route: cassandra-route started and consuming from: timer://foo?period=5000
2016-10-14 09:06:22,075 [main ] INFO SpringCamelContext - Total 1 routes, of which 1 are started.
2016-10-14 09:06:22,083 [main ] INFO SpringCamelContext - Apache Camel 2.18.0 (CamelContext: camel-1) started in 1.374 seconds
2016-10-14 09:06:23,133 [0 - timer://foo] INFO cassandra-route - Query result set [Row[1, oscerd]]
2016-10-14 09:06:28,092 [0 - timer://foo] INFO cassandra-route - Query result set [Row[1, oscerd]]
As you can see everything is straightforward and you can think about more complex architectures. For example: an Infinispan cache with a persistent cache store based on an Apache Cassandra cluster, with Java applications, Camel routes or Spring Boot applications interacting with the Infinispan platform.
The route of the example is super simple:
<camelContext xmlns="http://camel.apache.org/schema/spring" depends-on="populate">
  <route id="cassandra-route">
    <from uri="timer:foo?period=5000"/>
    <to uri="cql://cassandra/test?cql=select * from users;&amp;consistencyLevel=quorum"/>
    <log message="Query result set ${body}"/>
  </route>
</camelContext>
and it completely abstracts away the details of the Cassandra cluster (how many nodes are there? Where are they? What are their addresses? And so on).
Conclusions
This example shows only a bit of the power of the Kubernetes/OpenShift/Fabric8/microservices world, and in the future I would like to create a quickstart with a more complex architecture. Stay tuned!
03 Oct 2016
The Camel community is currently in the process of releasing the new Apache Camel 2.18.0. This is a big release with a lot of new features and new components. Let’s take a deeper look at what is coming.
-
Java 8
This is the first release requiring Java 8. We worked hard on this and we are working on Java 8 code style in the codebase.
-
Automatic Documentation
In Apache Camel 2.18.0 you’ll find new documentation. The components, endpoints, dataformats and languages are now documented completely in automatic way. This is important because in the past the Confluence documentation was often misaligned with respect to the codebase. The documentation is generated, if some modification has been done, during the component/endpoint/dataformat/language build. This feature will be the base for a new website including this material. You can see the generated docs in the camel-website
folder in the Apache Camel Repository.
-
Spring-boot and Wildfly-Swarm support
Camel is now present on the Spring-starter site and on the Wildfly-swarm one. All the new feature related to Spring-boot can be found in the article from Nicola Ferraro’s blog Apache Camel meets Spring-boot. Running Camel on Spring-boot has never been so easy.
-
Hystrix Circuit Breaker and Netflix OSS
This release will have a circuit breaker implementation using the Netflix Hystrix project (with dashboard too).
-
Distributed message tracing with camel-zipkin component
This component is used for tracing and timing incoming and outgoing Camel messages using Zipkin.
For more information take a look at the documentation or the Asciidoc file in the Apache Camel repository.
-
Service Call
This provide the feature of calling a remote service in a distributed system where the service is looked up from a service registry of some sorts. Camel won’t need to know where the service is hosted. Camel will lookup the service from from kubernetes, openshift, cloud foundry, zuul, consul, zookeeper etc.
-
New components
- camel-asterisk - For interacting with Asterisk PBX Servers
- camel-cm-sms - For sending SMS messages using SM SMS Gateway.
- camel-consul - For integrating your application with Consul.
- camel-ehcache - For interacting with Ehcache 3 cache.
- camel-flink - Bridges Camel connectors with Apache Flink tasks.
- camel-lumberjack - For receiving logs over the lumberjack protocol (used by Filebeat for instance)
- camel-ribbon - To use Netflix Ribbon with the Service Call EIP.
- camel-servicenow - For cloud management with ServiceNow.
- camel-telegram - For messaging with Telegram.
- camel-zipkin - For tracking Camel message flows/timings using zipkin.
- camel-chronicle - For interacting with OpenHFT’s Chronicle-Engine.
-
New Dataformats
- camel-johnzon - Apache Johnzon is an implementation of JSR-353 (Java™ API for JSON Processing).
-
New examples
In this release there will be some new examples. To name a few:
- camel-example-cdi-kubernetes - An example of the camel-kubernetes component used to retrieve a list of pods from your Kubernetes cluster. The example is CDI-based.
- camel-example-java8 - Demonstrates the Java DSL with experimental new Java 8 lambda support for expressions/predicates/processors. We love feedback on this DSL and expect to improve the API over the next couple of releases.
- camel-example-java8-rx - Demonstrates the Java DSL with experimental new Java 8 lambda support for typesafe filtering and transforming of messages with Rx-Java. We love feedback on this DSL and expect to improve the API over the next couple of releases.
-
Important changes
- Karaf 2.4.x is no longer supported. Karaf 4.x is the primary supported OSGi platform.
- Jetty 8.x is no longer supported and camel-jetty8 has been removed
- Spring 4.0 is no longer supported and camel-test-spring40 has been removed
- Spring 3.x is no longer supported
- Upgraded to Spring 4.3.x and Spring Boot 1.4.x (only spring-dm using spring 3.2.x as part of camel-spring in osgi/karaf is still in use - but spring-dm is deprecated and we recommend using blueprint)
- Camel-gae has been removed (was not working anyway)
- MongoDB component is migrated to MongoDB 3.
- The camel-cache module is deprecated, you should use camel-ehcache instead.
- The camel-docker module has been removed from Karaf features as it does not work in OSGi
For more information about the upcoming release you can read the Camel 2.18.0 Release page. We hope you’ll enjoy this release. Feedback is more than welcome, and so are contributions.