Testing camel-cassandraql on a Dockerized Apache Cassandra cluster

Since Camel 2.15.0 there has been a component for integrating with Apache Cassandra. In this post I’d like to show you what you can do with this component on a real Cassandra cluster. We will use the Docker images from my personal account on Docker Hub. The first step is to set up our cluster.

Spinning up an Apache Cassandra cluster with Docker

For this post we will use Docker to spin up our Apache Cassandra cluster. Once everything is up and running, we will run a simple example to show how the camel-cassandraql component can be used to interact with the cluster.

First, we need to start a single-node cluster:

docker run --name master_node -dt oscerd/cassandra

This command runs a single container with an Apache Cassandra instance (the Cassandra version is 3.6, one of the latest tick-tock releases). We could use a single-node cluster, but that way we wouldn’t exploit Cassandra’s features. So let’s add two more nodes!

docker run --name node1 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra
docker run --name node2 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra

Now we have two more nodes in our cluster. The SEED environment variable makes each new node aware of at least one of the nodes already in the cluster. In the cassandra.yaml file there will be a seeds entry holding this information. More information can be found in the Cassandra documentation. To be sure everything is up and running, we can use the nodetool utility by running the following command:

docker exec -ti master_node /opt/cassandra/bin/nodetool status

and we should get the Cluster status as output:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.17.0.3  102.67 KiB  256          65.9%             1a985c48-33a1-44aa-b7e9-f1a3620a6482  rack1
UN  172.17.0.2  107.64 KiB  256          68.2%             da54ce5e-6433-4ea0-b2c3-fbc6c63ea955  rack1
UN  172.17.0.4  15.42 KiB  256          65.8%             0f2ba25a-37b0-4f27-a10a-d9a44655396a  rack1

To run the example we need to create a keyspace and a table. Locally I have downloaded the Apache Cassandra 3.7 package. Run the cqlsh command:

<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)

You should see the cqlsh prompt:

Connected to Test Cluster at 172.17.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.6 | CQL spec 3.4.2 | Native protocol v4]
Use HELP for help.
cqlsh>

Let’s create a keyspace test with a table users:

create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;

and check that everything is fine by running a simple query:

cqlsh> use test;
cqlsh:test> select * from users;

 id | name
----+--------
  1 | oscerd

(1 rows)
cqlsh:test> 

You can do the same on the other two nodes to check that your cluster is working as you expect. Following these steps, we now have a three-node cluster up and running. Let’s try a simple camel-cassandraql example.

Run the example from Apache Camel

In Apache Camel I added a little example showing how to query an Apache Cassandra cluster (a select and an insert operation) with the related component. You can find the example here: camel-example-cdi-cassandraql. You can run the example with:

mvn clean compile
mvn camel:run

and you should have the following output:

2016-07-24 15:33:50,812 [cdi.Main.main()] INFO  Version                        - WELD-000900: 2.3.5 (Final)
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.impl.config.EnvironmentPropertyConfigSourceProvider <init>
INFO: Custom config found by DeltaSpike. Name: 'META-INF/apache-deltaspike.properties', URL: 'file:/home/oscerd/workspace/apache-camel/camel/examples/camel-example-cdi-cassandraql/target/classes/META-INF/apache-deltaspike.properties'
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.util.ProjectStageProducer initProjectStage
INFO: Computed the following DeltaSpike ProjectStage: Production
2016-07-24 15:33:51,064 [cdi.Main.main()] INFO  Bootstrap                      - WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2016-07-24 15:33:51,170 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.message.MessageBundleExtension.detectInterfaces(@Observes ProcessAnnotatedType) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,174 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.interceptor.GlobalInterceptorExtension.promoteInterceptors(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,189 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,195 [cdi.Main.main()] INFO  Event                          - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.exclude.extension.ExcludeExtension.vetoBeans(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.lock.LockedInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN  Validator                      - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.future.FutureableInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:52,244 [cdi.Main.main()] INFO  CdiCamelExtension              - Camel CDI is starting Camel context [camel-example-cassandraql-cdi]
2016-07-24 15:33:52,245 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) is starting
2016-07-24 15:33:52,246 [cdi.Main.main()] INFO  ManagedManagementStrategy      - JMX is enabled
2016-07-24 15:33:52,352 [cdi.Main.main()] INFO  DefaultTypeConverter           - Loaded 189 type converters
2016-07-24 15:33:52,367 [cdi.Main.main()] INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-07-24 15:33:52,465 [cdi.Main.main()] INFO  DefaultCamelContext            - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-07-24 15:33:52,547 [cdi.Main.main()] INFO  NettyUtil                      - Did not find Netty's native epoll transport in the classpath, defaulting to NIO.
2016-07-24 15:33:52,789 [cdi.Main.main()] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,790 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  DCAwareRoundRobinPolicy        - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO  Cluster                        - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,985 [cdi.Main.main()] INFO  DefaultCamelContext            - Route: route1 started and consuming from: timer://stream?repeatCount=1
2016-07-24 15:33:52,986 [cdi.Main.main()] INFO  DefaultCamelContext            - Total 1 routes, of which 1 are started.
2016-07-24 15:33:52,987 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) started in 0.742 seconds
2016-07-24 15:33:53,018 [cdi.Main.main()] INFO  Bootstrap                      - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
2016-07-24 15:33:54,041 [ timer://stream] INFO  route1                         - Result from query [Row[1, oscerd]]

Running the select query in cqlsh again, you should see another entry in the users table:

cqlsh> use test;
cqlsh:test> select * from users;

 id | name
----+-----------
  1 |    oscerd
  2 | davsclaus

(2 rows)
cqlsh:test>

The route of the example is simple:

    @ContextName("camel-example-cassandraql-cdi")
    static class KubernetesRoute extends RouteBuilder {

        @Override
        public void configure() {
            from("timer:stream?repeatCount=1")
                .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-select-query}}&consistencyLevel=quorum")
                .log("Result from query ${body}")
                .process(exchange -> {
                    exchange.getIn().setBody(Arrays.asList("davsclaus"));
                })
                .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-insert-query}}&consistencyLevel=quorum");
        }
    }

As you can see, we are using the QUORUM consistency level for both the select and insert operations. You can find more information about it in the Cassandra Consistency Level documentation.
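To make the quorum setting concrete, here is a minimal sketch of the arithmetic behind it, assuming the usual Cassandra definition of a quorum as a majority of replicas (floor(RF / 2) + 1). The class and method names are hypothetical helpers for illustration and are not part of Camel or of the Cassandra driver.

```java
/** Illustrative quorum arithmetic; a hypothetical helper, not part of Camel or Cassandra. */
final class QuorumMath {

    /** Replicas that must acknowledge a QUORUM request: floor(rf / 2) + 1. */
    static int quorum(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be >= 1");
        }
        return replicationFactor / 2 + 1; // integer division rounds down
    }

    /** Replicas that may be unavailable while QUORUM requests can still succeed. */
    static int tolerableFailures(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        int rf = 3; // replication_factor of the test keyspace created earlier
        System.out.println("quorum=" + quorum(rf)
                + ", tolerable failures=" + tolerableFailures(rf));
    }
}
```

With the replication factor of 3 used for the test keyspace, two replicas have to answer each request, so the select and the insert should still work if one of the three containers is stopped.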

Conclusions

Using Docker to spin up a Cassandra Cluster can be useful during integration and performance testing. In this post we saw a little example of the camel-cassandraql features in a real Apache Cassandra cluster.

Before the camel-cassandraql pull request was submitted, I was working on a Camel-Cassandra component too. This component is a bit different from the cassandraql one and you can find more information in the GitHub repository. It is aligned with the latest released Apache Camel version, 2.17.2.

An introduction to Camel Kubernetes component

Since Camel 2.17.0 we have a Kubernetes component to interact with a Kubernetes cluster. The aim of this post is to provide a first introduction to the component: how it works, what you can do with it, and a little example of a producer endpoint. The article is based on Camel-Kubernetes 2.18-SNAPSHOT.

Main Features

Camel-Kubernetes is based on the popular Kubernetes/OpenShift client from the Fabric8 team. The component provides both producer and consumer endpoints and it has the following options (taken directly from the brand new automatically generated documentation of Apache Camel 2.18). To better understand the component you may want to take a look at the Kubernetes site.

| Name | Group | Default | Java Type | Description |
|------|-------|---------|-----------|-------------|
| masterUrl | common | | String | Required Kubernetes Master url |
| apiVersion | common | | String | The Kubernetes API Version to use |
| category | common | | String | Required Kubernetes Producer and Consumer category |
| dnsDomain | common | | String | The dns domain used for ServiceCall EIP |
| kubernetesClient | common | | DefaultKubernetesClient | Default KubernetesClient to use if provided |
| portName | common | | String | The port name used for ServiceCall EIP |
| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. |
| labelKey | consumer | | String | The Consumer Label key when watching at some resources |
| labelValue | consumer | | String | The Consumer Label value when watching at some resources |
| namespace | consumer | | String | The namespace |
| poolSize | consumer | 1 | int | The Consumer pool size |
| resourceName | consumer | | String | The Consumer Resource Name we would like to watch |
| exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. |
| operation | producer | | String | Producer operation to do on Kubernetes |
| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange |
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). |
| caCertData | security | | String | The CA Cert Data |
| caCertFile | security | | String | The CA Cert File |
| clientCertData | security | | String | The Client Cert Data |
| clientCertFile | security | | String | The Client Cert File |
| clientKeyAlgo | security | | String | The Key Algorithm used by the client |
| clientKeyData | security | | String | The Client Key data |
| clientKeyFile | security | | String | The Client Key file |
| clientKeyPassphrase | security | | String | The Client Key Passphrase |
| oauthToken | security | | String | The Auth Token |
| password | security | | String | Password to connect to Kubernetes |
| trustCerts | security | | Boolean | Define if the certs we used are trusted anyway or not |
| username | security | | String | Username to connect to Kubernetes |

The headers used by the component are the following:

| Name | Type | Description |
|------|------|-------------|
| CamelKubernetesOperation | String | The Producer operation |
| CamelKubernetesNamespaceName | String | The Namespace name |
| CamelKubernetesNamespaceLabels | Map | The Namespace Labels |
| CamelKubernetesServiceLabels | Map | The Service labels |
| CamelKubernetesServiceName | String | The Service name |
| CamelKubernetesServiceSpec | io.fabric8.kubernetes.api.model.ServiceSpec | The Spec for a Service |
| CamelKubernetesReplicationControllersLabels | Map | Replication controller labels |
| CamelKubernetesReplicationControllerName | String | Replication controller name |
| CamelKubernetesReplicationControllerSpec | io.fabric8.kubernetes.api.model.ReplicationControllerSpec | The Spec for a Replication Controller |
| CamelKubernetesReplicationControllerReplicas | Integer | The number of replicas for a Replication Controller during the Scale operation |
| CamelKubernetesPodsLabels | Map | Pod labels |
| CamelKubernetesPodName | String | Pod name |
| CamelKubernetesPodSpec | io.fabric8.kubernetes.api.model.PodSpec | The Spec for a Pod |
| CamelKubernetesPersistentVolumesLabels | Map | Persistent Volume labels |
| CamelKubernetesPersistentVolumesName | String | Persistent Volume name |
| CamelKubernetesPersistentVolumesClaimsLabels | Map | Persistent Volume Claim labels |
| CamelKubernetesPersistentVolumesClaimsName | String | Persistent Volume Claim name |
| CamelKubernetesPersistentVolumesClaimsSpec | io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec | The Spec for a Persistent Volume claim |
| CamelKubernetesSecretsLabels | Map | Secret labels |
| CamelKubernetesSecretsName | String | Secret name |
| CamelKubernetesSecret | io.fabric8.kubernetes.api.model.Secret | A Secret Object |
| CamelKubernetesResourcesQuotaLabels | Map | Resource Quota labels |
| CamelKubernetesResourcesQuotaName | String | Resource Quota name |
| CamelKubernetesResourceQuotaSpec | io.fabric8.kubernetes.api.model.ResourceQuotaSpec | The Spec for a Resource Quota |
| CamelKubernetesServiceAccountsLabels | Map | Service Account labels |
| CamelKubernetesServiceAccountName | String | Service Account name |
| CamelKubernetesServiceAccount | io.fabric8.kubernetes.api.model.ServiceAccount | A Service Account object |
| CamelKubernetesNodesLabels | Map | Node labels |
| CamelKubernetesNodeName | String | Node name |
| CamelKubernetesBuildsLabels | Map | Openshift Build labels |
| CamelKubernetesBuildName | String | Openshift Build name |
| CamelKubernetesBuildConfigsLabels | Map | Openshift Build Config labels |
| CamelKubernetesBuildConfigName | String | Openshift Build Config name |
| CamelKubernetesEventAction | io.fabric8.kubernetes.client.Watcher.Action | Action watched by the consumer |
| CamelKubernetesEventTimestamp | String | Timestamp of the action watched by the consumer |
| CamelKubernetesConfigMapName | String | ConfigMap name |
| CamelKubernetesConfigMapsLabels | Map | ConfigMap labels |
| CamelKubernetesConfigData | Map | ConfigMap Data |

The component groups its features into a set of categories:

  • namespaces
  • services
  • replicationControllers
  • pods
  • persistentVolumes
  • persistentVolumesClaims
  • secrets
  • resourcesQuota
  • serviceAccounts
  • nodes
  • configMaps
  • builds
  • buildConfigs

The producer endpoints are based on a set of operations, grouped by category:

  • Namespaces
    • listNamespaces
    • listNamespacesByLabels
    • getNamespace
    • createNamespace
    • deleteNamespace
  • Services
    • listServices
    • listServicesByLabels
    • getService
    • createService
    • deleteService
  • Replication Controllers
    • listReplicationControllers
    • listReplicationControllersByLabels
    • getReplicationController
    • createReplicationController
    • deleteReplicationController
    • scaleReplicationController
  • Pods
    • listPods
    • listPodsByLabels
    • getPod
    • createPod
    • deletePod
  • Persistent Volumes
    • listPersistentVolumes
    • listPersistentVolumesByLabels
    • getPersistentVolume
  • Persistent Volumes Claims
    • listPersistentVolumesClaims
    • listPersistentVolumesClaimsByLabels
    • getPersistentVolumeClaim
    • createPersistentVolumeClaim
    • deletePersistentVolumeClaim
  • Secrets
    • listSecrets
    • listSecretsByLabels
    • getSecret
    • createSecret
    • deleteSecret
  • Resources quota
    • listResourcesQuota
    • listResourcesQuotaByLabels
    • getResourceQuota
    • createResourceQuota
    • deleteResourceQuota
  • Service Accounts
    • listServiceAccounts
    • listServiceAccountsByLabels
    • getServiceAccount
    • createServiceAccount
    • deleteServiceAccount
  • Nodes
    • listNodes
    • listNodesByLabels
    • getNode
  • Config Maps
    • listConfigMaps
    • listConfigMapsByLabels
    • getConfigMap
    • createConfigMap
    • deleteConfigMap
  • Builds
    • listBuilds
    • listBuildsByLabels
    • getBuild
  • Build Configs
    • listBuildConfigs
    • listBuildConfigsByLabels
    • getBuildConfig

Documentation is usually rather boring, so let’s see how you can declare a producer or a consumer to interact with a Kubernetes cluster:

    from("direct:list")
        .to("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&operation=listPods")
        .to("mock:result")

In this producer snippet you ask for the list of pods in any namespace of your Kubernetes cluster. The operation returns a list of Pod objects (io.fabric8.kubernetes.api.model.Pod).
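The endpoint URI is simply the master URL followed by the options as query parameters. As a rough sketch of that shape, the hypothetical helper below assembles such a URI from a map of options. It uses only the JDK, is not part of Camel, and does no URL encoding or validation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper (not part of Camel): assembles a kubernetes:// endpoint URI. */
final class KubernetesUriSketch {

    static String endpointUri(String masterUrl, Map<String, String> options) {
        // Join the options as key=value pairs, in the map's iteration order.
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kubernetes://" + masterUrl + (query.isEmpty() ? "" : "?" + query);
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("oauthToken", "xxxxxxxx");               // placeholder token
        options.put("category", "pods");
        options.put("operation", "listPods");
        System.out.println(endpointUri("https://localhost:8443", options));
    }
}
```

Swapping the category and operation values for another pair from the lists above (for example services and listServices) gives the URI for a different producer.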

    from("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&namespace=default&labelKey=kind&labelValue=http")
        .to("mock:result")

In this Consumer snippet you will consume events from the Kubernetes cluster related to Pods, in the default namespace, labeled with key kind and value http. The exchange will contain the Pod object (in the body) with all the related metadata, a timestamp (in the CamelKubernetesEventTimestamp header) and an action (in the CamelKubernetesEventAction header).
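A downstream processor would typically branch on the action header. The following is only a sketch of that idea, kept free of Camel and Fabric8 dependencies so it can run on its own: the Action enum is a local stand-in that I assume mirrors io.fabric8.kubernetes.client.Watcher.Action, and the plain Map stands in for the message headers. The class name, method name and the sample pod name and timestamp are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: reacting to the headers set by the Kubernetes consumer. */
final class PodEventSketch {

    /** Local stand-in, assumed to mirror io.fabric8.kubernetes.client.Watcher.Action. */
    enum Action { ADDED, MODIFIED, DELETED, ERROR }

    /** Builds a one-line description of an event from the (stand-in) message headers. */
    static String describe(Map<String, Object> headers, String podName) {
        Action action = (Action) headers.get("CamelKubernetesEventAction");
        Object timestamp = headers.get("CamelKubernetesEventTimestamp");
        if (action == null) {
            return "Pod " + podName + ": no event action header present";
        }
        String verb;
        switch (action) {
            case ADDED:    verb = "was added";         break;
            case MODIFIED: verb = "was modified";      break;
            case DELETED:  verb = "was deleted";       break;
            default:       verb = "reported an error"; break;
        }
        return "Pod " + podName + " " + verb + " at " + timestamp;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("CamelKubernetesEventAction", Action.ADDED);
        headers.put("CamelKubernetesEventTimestamp", "1468317642000"); // made-up value
        System.out.println(describe(headers, "http-pod")); // made-up pod name
    }
}
```

In a real route the same logic would read the headers from the exchange and the pod name from the Pod object in the body.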

A little example

Over the last few days I worked on a little example that lists pods from a Kubernetes cluster. I added the example to the Apache Camel examples folder. The Camel CDI Kubernetes example assumes you have a Kubernetes cluster running in your environment. Personally, for testing I prefer to use the Vagrant OpenShift image from the Fabric8 team: to run the image and get started, you can follow the guide in the Fabric8 documentation.

After the vagrant up command has completed, remember to run the following commands:

fabric8-installer/vagrant/openshift$ export KUBERNETES_DOMAIN=vagrant.f8
fabric8-installer/vagrant/openshift$ export DOCKER_HOST=tcp://vagrant.f8:2375

and log in to OpenShift:

fabric8-installer/vagrant/openshift$ oc login https://172.28.128.4:8443

You will be asked for a user and password (admin/admin). Once the login is successful, you will be able to get your OAuth token (this will be useful for the Camel-Kubernetes example):

fabric8-installer/vagrant/openshift$ oc whoami --token
XXxnZi2YghQyke-eARXBZ38K-p5zIpco0ShdzO8gK6U

Now we have all the elements to run our example. Edit the apache-deltaspike.properties file with the correct OAuth token from OpenShift, then run the example:

oscerd@localhost:~/workspace/apache-camel/camel/examples/camel-example-cdi-kubernetes$ mvn compile camel:run

The route is triggered by a timer with the repeatCount option set to 3. The output of the example will be:

2016-07-12 12:00:42,220 [cdi.Main.main()] INFO  CdiCamelExtension              - Camel CDI is starting Camel context [camel-example-kubernetes-cdi]
2016-07-12 12:00:42,220 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is starting
2016-07-12 12:00:42,222 [cdi.Main.main()] INFO  ManagedManagementStrategy      - JMX is enabled
2016-07-12 12:00:42,355 [cdi.Main.main()] INFO  DefaultTypeConverter           - Loaded 188 type converters
2016-07-12 12:00:42,376 [cdi.Main.main()] INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-07-12 12:00:42,468 [cdi.Main.main()] INFO  DefaultCamelContext            - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-07-12 12:00:42,828 [cdi.Main.main()] INFO  DefaultCamelContext            - Route: route1 started and consuming from: timer://stream?repeatCount=3
2016-07-12 12:00:42,831 [cdi.Main.main()] INFO  DefaultCamelContext            - Total 1 routes, of which 1 are started.
2016-07-12 12:00:42,832 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) started in 0.611 seconds
2016-07-12 12:00:42,878 [cdi.Main.main()] INFO  Bootstrap                      - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
^C2016-07-12 12:00:50,946 [Thread-1       ] INFO  MainSupport$HangupInterceptor  - Received hang up - stopping the main instance.
2016-07-12 12:00:50,950 [Thread-1       ] INFO  CamelContextProducer           - Camel CDI is stopping Camel context [camel-example-kubernetes-cdi]
2016-07-12 12:00:50,950 [Thread-1       ] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is shutting down
2016-07-12 12:00:50,951 [Thread-1       ] INFO  DefaultShutdownStrategy        - Starting to graceful shutdown 1 routes (timeout 300 seconds)
2016-07-12 12:00:50,953 [ - ShutdownTask] INFO  DefaultShutdownStrategy        - Route: route1 shutdown complete, was consuming from: timer://stream?repeatCount=3
2016-07-12 12:00:50,953 [Thread-1       ] INFO  DefaultShutdownStrategy        - Graceful shutdown of 1 routes completed in 0 seconds
2016-07-12 12:00:50,965 [Thread-1       ] INFO  MainLifecycleStrategy          - CamelContext: camel-example-kubernetes-cdi has been shutdown, triggering shutdown of the JVM.
2016-07-12 12:00:50,968 [Thread-1       ] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) uptime 8.748 seconds
2016-07-12 12:00:50,968 [Thread-1       ] INFO  DefaultCamelContext            - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is shutdown in 0.018 seconds
2016-07-12 12:00:50,977 [Thread-1       ] INFO  Bootstrap                      - WELD-ENV-002001: Weld SE container STATIC_INSTANCE shut down

As you can see, 13 pods are returned in my environment and all of them are in Running status.
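The report lines in this output come from a processor that walks the list returned by listPods. The sketch below shows the idea only and is not the code of the example itself: PodInfo is a hypothetical stand-in for io.fabric8.kubernetes.api.model.Pod, reduced to a name and a status so that the snippet runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch: formatting a pod list like the output shown above. */
final class PodReportSketch {

    /** Hypothetical stand-in for the Fabric8 Pod model: just a name and a status. */
    static final class PodInfo {
        final String name;
        final String status;

        PodInfo(String name, String status) {
            this.name = name;
            this.status = status;
        }
    }

    /** Builds the report text: one summary line, then one line per pod. */
    static String report(List<PodInfo> pods) {
        StringBuilder sb = new StringBuilder();
        sb.append("We currently have ").append(pods.size()).append(" pods\n");
        for (PodInfo pod : pods) {
            sb.append("Pod name ").append(pod.name)
              .append(" with status ").append(pod.status).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two of the pod names from the log above, used as sample data.
        List<PodInfo> pods = Arrays.asList(
                new PodInfo("gogs-2p4mn", "Running"),
                new PodInfo("jenkins-cr2ez", "Running"));
        System.out.print(report(pods));
    }
}
```

Because the timer fires three times, the same report is printed three times in the log.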

Conclusions

This post was just a little introduction to what Camel-Kubernetes can do. I think the most interesting part is the consumer side, and I will focus on the different types of consumer in one of the next Camel-related posts. Meanwhile, please try the component yourself. If you find something wrong, think something can be improved, or think there is something else we should focus on, don’t hesitate to post on the Camel Dev mailing list or the Camel Users mailing list. If you are sure there is a bug, or you think an improvement is truly needed, raise an issue on the Apache Camel JIRA. You can find examples of the different operations available with producer and consumer endpoints in the component unit tests.

Stay tuned for the next Camel-Kubernetes consumers post.

Contributing new components to Apache Camel project

Writing a new Camel component is simple, but it is not always easy for a beginner to understand how to integrate it into the current Camel codebase.

In this post I’d like to show you how to do this step by step.

A simple component

In this guide I will add a simple component: camel-square. It is just an example, useful to show the steps needed to integrate and contribute your component to the Apache Camel codebase.

Using the Archetype

At the time of writing, the latest Camel release is 2.17.2, so we will use the archetype from this version.

Be sure you’ve cloned the Apache Camel repository from GitHub and that your codebase is aligned with upstream.

Open a terminal and enter the components folder:

~/workspace/apache-camel/camel/components$ 

Generate the skeleton for the camel-square component:

~/workspace/apache-camel/camel/components$ mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-component -DarchetypeVersion=2.17.2  -DgroupId=org.apache.camel -DartifactId=camel-square -Dname=Square -Dscheme=square

You will be asked interactively for the version of the component and a few other things. Since the Camel version currently in development is 2.18, use 2.18-SNAPSHOT as the version of your component.

Cleaning up the generated component

The first thing to do is clean up the POM of the newly generated component a bit:

  • Change the packaging from bundle to jar.
  • Remove all the version tags from the dependencies.
  • In the properties section of the POM, remove everything and add the following properties:
    <properties>
      <camel.osgi.export.pkg>org.apache.camel.component.square.*</camel.osgi.export.pkg>
      <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=square</camel.osgi.export.service>
    </properties>
  • Add the name tag:
    <name>Camel :: Square</name>
  • Since the groupId is inherited from the parent POM of the components folder, you can remove the groupId tag from your component POM.
  • Remove camel-apt from the dependencies.

In the end, your POM should look like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>components</artifactId>
    <groupId>org.apache.camel</groupId>
    <version>2.18-SNAPSHOT</version>
  </parent>

  <artifactId>camel-square</artifactId>
  <packaging>jar</packaging>
  <name>Camel :: Square</name>

   <properties>
      <camel.osgi.export.pkg>org.apache.camel.component.square.*</camel.osgi.export.pkg>
      <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=square</camel.osgi.export.service>
   </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
    </dependency>

    <!-- logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- testing -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>

You now need to create the right package structure. Move the generated classes from src/main/java/org/apache/camel to src/main/java/org/apache/camel/component/square and do the same for the test package. Don’t forget to update the file src/main/resources/META-INF/services/org/apache/camel/component/square so that it points to the component class: class=org.apache.camel.component.square.SquareComponent
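The descriptor file mentioned above is a plain key=value file with a single class entry. As a minimal sketch of that format (ComponentDescriptor is a hypothetical helper written for this post, not Camel’s own resolver code), the entry can be read with java.util.Properties:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Camel: it only illustrates the
// key=value format of the component descriptor file.
final class ComponentDescriptor {

    // Parses the descriptor text and returns the value of the "class" key.
    static String componentClassName(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        String className = props.getProperty("class");
        if (className == null) {
            throw new IllegalArgumentException("descriptor has no 'class' entry");
        }
        return className.trim();
    }

    public static void main(String[] args) {
        String text = "class=org.apache.camel.component.square.SquareComponent\n";
        System.out.println(componentClassName(text));
    }
}
```

If the class name in the file still points to the old package, the component will not be found under the square scheme, so it is worth checking this file right after moving the classes.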

We are now ready to write our component.

Writing the component

This is a simple example: the component won’t have many features and will only provide a Producer Endpoint, so you can remove the SquareConsumer class.

Your SquareComponent may look like this:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.square;

import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;

import org.apache.camel.impl.UriEndpointComponent;

/**
 * Represents the component that manages {@link SquareEndpoint}.
 */
public class SquareComponent extends UriEndpointComponent {
    
    public SquareComponent() {
        super(SquareEndpoint.class);
    }

    public SquareComponent(CamelContext context) {
        super(context, SquareEndpoint.class);
    }

    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        Endpoint endpoint = new SquareEndpoint(uri, this);
        setProperties(endpoint, parameters);
        return endpoint;
    }
}

and your SquareEndpoint class like this:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.square;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriPath;

/**
 * Represents a Square endpoint.
 */
@UriEndpoint(scheme = "square", title = "Square", syntax="square:name", label = "Square")
public class SquareEndpoint extends DefaultEndpoint {
    @UriPath @Metadata(required = "true")
    private String name;

    public SquareEndpoint() {
    }

    public SquareEndpoint(String uri, SquareComponent component) {
        super(uri, component);
    }

    public Producer createProducer() throws Exception {
        return new SquareProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("The Square endpoint doesn't support consumers.");
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * Some description of this option, and what it does
     */
    public void setName(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

Since the component won’t support Consumer Endpoints, we throw an UnsupportedOperationException in that particular case. Let’s write our SquareProducer. The Producer Endpoint will take the body of the message and compute its square. We start with a producer of this form, which only prints the body:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.square;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Square producer.
 */
public class SquareProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SquareProducer.class);
    private SquareEndpoint endpoint;

    public SquareProducer(SquareEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public void process(Exchange exchange) throws Exception {
        System.out.println(exchange.getIn().getBody());    
    }

}

Math.pow() works on double values and returns a double, which is more than we need to square an integer. For this example we will simply multiply the number by itself. Once again: this is just an example to show how you can contribute your component to the Apache Camel community, so there won’t be validation on the Body type and so on. We may think about a Producer like this:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.square;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Square producer.
 */
public class SquareProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SquareProducer.class);
    private SquareEndpoint endpoint;

    public SquareProducer(SquareEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public void process(Exchange exchange) throws Exception {
        LOG.debug("Getting value from exchange");
        Integer value = exchange.getIn().getBody(Integer.class);
        LOG.debug("Computing square");
        Integer square = value * value;
        LOG.info("The square is {}", square);
        if (exchange.getPattern().isOutCapable()) {
            Message out = exchange.getOut();
            out.copyFrom(exchange.getIn());
            out.setBody(square);
        } else {
            Message in = exchange.getIn();
            in.setBody(square);
        }
        
    }

}
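
Since the producer does no validation, it may help to see what plain multiplication does near the limits of int. The following is only a sketch: SquareMath and its two methods are hypothetical names introduced here, not part of the component. It compares value * value with the overflow-checked Math.multiplyExact from the JDK.

```java
// Hypothetical helper for illustration; the producer above multiplies directly.
final class SquareMath {

    // Plain multiplication: silently wraps around when the result overflows int.
    static int square(int value) {
        return value * value;
    }

    // Checked variant: throws ArithmeticException instead of wrapping around.
    static int squareExact(int value) {
        return Math.multiplyExact(value, value);
    }

    public static void main(String[] args) {
        System.out.println(square(3));       // 9
        System.out.println(Math.pow(3, 2));  // 9.0, a double rather than an int

        // 46341 * 46341 does not fit in an int, so the plain version wraps
        // around to a negative number while the checked version throws.
        System.out.println(square(46341) < 0); // true
        try {
            squareExact(46341);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```

For a real component you would probably want the checked variant, or a wider type such as long, but for this example the simple multiplication is enough.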

It’s very simple. It just computes the square and, based on the Exchange Pattern, puts the result in the out message (if the exchange is out-capable) or back in the in message. Let’s take a look at the testing part. You should have a test class SquareComponentTest: let’s modify it a bit. The result can be:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.square;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SquareComponentTest extends CamelTestSupport {

    @Test
    public void testSquare() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(1);
        mock.expectedBodiesReceived(9);
        
        template.sendBody("direct:square", 3);
        
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() {
                from("direct:square")
                  .to("square://bar")
                  .to("mock:result");
            }
        };
    }
}

Now the component is ready for the finishing touches.

Try to install it:

~/workspace/apache-camel/camel/components/camel-square$ mvn clean install

Check for code-style errors and fix any that are reported:

~/workspace/apache-camel/camel/components/camel-square$ mvn -Psourcecheck

Integrating the component in the Apache Camel codebase

The component is now ready to be integrated in the current Camel codebase.

We need to add a dependency in apache-camel/pom.xml

     <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-square</artifactId>
     </dependency>

Include the component in apache-camel/src/main/descriptors/common-bin.xml

     <include>org.apache.camel:camel-square</include>

Include the component in parent/pom.xml

       <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-square</artifactId>
          <version>${project.version}</version>
       </dependency>

Now you’re able to build your new Camel 2.18-SNAPSHOT with camel-square included:

~/workspace/apache-camel/camel/$ mvn clean install -DskipTests

At the end of the build you should see the camel-square component listed.

Component documentation

From Camel 2.18-SNAPSHOT we will have documentation generated from code inside our codebase.

The default directory inside the new component folder is src/main/docs, and the documentation file is an .adoc file with the same name as the component; in this case it will be square.adoc.

Create the src/main/docs/square.adoc file with your content and the following placeholders:

// component options: START
// component options: END

// endpoint options: START
// endpoint options: END

The placeholders mark where the automatically generated documentation will be written.
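
To make the idea of the START/END markers concrete, here is a small sketch of how text between two marker lines can be replaced while the markers themselves are kept. PlaceholderFiller is a hypothetical class written for this post and the generated text is made up; this is not the code of the Camel build tooling, only an illustration of the mechanism.

```java
// Hypothetical illustration only: this is not the Camel build plugin.
final class PlaceholderFiller {

    // Replaces whatever sits between the START and END marker lines of the
    // given section with the generated text; the marker lines are kept.
    static String fill(String doc, String section, String generated) {
        String start = "// " + section + " options: START";
        String end = "// " + section + " options: END";
        int from = doc.indexOf(start);
        int to = doc.indexOf(end);
        if (from < 0 || to < from) {
            return doc; // markers missing or out of order: leave the text untouched
        }
        return doc.substring(0, from + start.length())
                + "\n" + generated + "\n"
                + doc.substring(to);
    }

    public static void main(String[] args) {
        String doc = "Square Component\n\n"
                + "// endpoint options: START\n"
                + "// endpoint options: END\n";
        // The generated line below is invented for the example.
        System.out.println(fill(doc, "endpoint", "name: some generated description"));
    }
}
```

Because the markers survive each run, the same file can be regenerated on every build without duplicating the generated section.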

Run a clean install on the component once again

~/workspace/apache-camel/camel/components/camel-square$ mvn clean install 

Your .adoc file will now contain the updated documentation for the component and endpoint options.

Now add a link to your component in the file docs/user-manual/en/SUMMARY.md. This way the component will be added to the Gitbook generated from the Asciidoc component files.

Integrating with Apache Karaf

Apache Karaf is an important ally for Apache Camel. Usually a Camel component should also work in an OSGi environment. For each component in Camel there is a Karaf feature definition. Let’s add the one for camel-square. In platform/karaf/features/src/main/resources/features.xml add the following code:

  <feature name='camel-square' version='${project.version}' resolver='(obr)' start-level='50'>
    <feature version='${project.version}'>camel-core</feature>
    <bundle>mvn:org.apache.camel/camel-square/${project.version}</bundle>
  </feature>

Since the component is very simple, we don’t need any external bundles in this case. To test whether our feature works in an OSGi environment we have to add an integration test. Let’s create it in tests/camel-itest-karaf with the name CamelSquareTest. The content of the test will be:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.itest.karaf;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;

@RunWith(PaxExam.class)
public class CamelSquareTest extends BaseKarafTest {

    public static final String COMPONENT = extractName(CamelSquareTest.class);

    @Test
    public void test() throws Exception {
        testComponent(COMPONENT);
    }
}

To run the test we have to follow two steps. First, we need to build our Karaf features

~/workspace/apache-camel/camel/platform/karaf/features$ mvn clean install 

and second run the integration test

~/workspace/apache-camel/camel/tests/camel-itest-karaf$ mvn clean test -Dtest=CamelSquareTest

or

~/workspace/apache-camel/camel/tests/camel-itest-karaf$ run-tests CamelSquareTest

If everything is fine and the test passes, the component installs correctly in Karaf.

Contributions

The camel-square component is now part of Camel. What is still missing? The Pull Request, of course :-)

This surely is the most satisfying part :-)

You should have everything committed locally, and you may need to align with the current Apache Camel codebase:

~/workspace/apache-camel/camel$ git pull --rebase <remote_name> master

If you have multiple commits for your component, it’s a good idea to squash them into a single one.

At this point you just need to open the Pull Request and the Apache Camel team will review it for you. You’ll receive feedback about improvements you can make and, of course, thanks from the community :-)

Conclusions

In this post I’ve shown, step by step, how a Camel component can be added to the Apache Camel codebase. I think writing components is one of the best ways to understand Camel’s architecture and features and to improve your Camel skills. The Camel community loves contributions, you just need to start :-)

Introducing Camel-Nats

In the latest version of Apache Camel (2.17.0) we released camel-nats. NATS is a cloud-native messaging system from Apcera. The first version of the component was based on java_nats; this library has been deprecated for a while, so we decided to switch to the brand new client JNats in the next major release (Camel 2.18.0).

The component

Camel-Nats provides both producer and consumer endpoints. These are the options you can define:

  • servers - A comma-separated list of gnatsd servers
  • topic - The topic name you want to use
  • reconnect - Whether or not to use the reconnection feature (default true)
  • pedantic - Whether or not to run in pedantic mode (default false)
  • verbose - Whether or not to run in verbose mode (default false)
  • ssl - Whether or not to use SSL (default false)
  • reconnectTimeWait - Waiting time before a reconnection attempt (in milliseconds, default 3000)
  • maxReconnectAttempts - Maximum number of reconnection attempts (default 3)
  • pingInterval - Ping interval used to check whether the connection is still alive (in milliseconds, default 4000)
  • noRandomizeServers - Whether or not to randomize the order of servers for the connection attempts (default false)
  • queueName - The queue name if we are using NATS in a queue configuration (consumer only)
  • maxMessages - Stop receiving messages from the subscribed topic after maxMessages (default unlimited, consumer only)
  • poolSize - Consumer pool size (default 10, consumer only)

For more information about NATS configuration, take a look at the docs.

Examples

From the producer perspective:

from("direct:send").to("nats://localhost:4222?topic=test");

while from the consumer perspective you can have:

from("nats://localhost:4222?topic=test&maxMessages=5").to("mock:result");

In this case you will consume messages from the topic test until you have received 5 messages.
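
The options listed earlier are passed as query parameters of the endpoint URI. As a rough sketch of how such a URI breaks down into option names and values (EndpointUriOptions is a hypothetical helper built on java.net.URI, not the parser Camel uses internally):

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits the query part of an endpoint URI into options.
final class EndpointUriOptions {

    // Returns the options in the order they appear in the URI.
    static Map<String, String> parse(String endpointUri) {
        Map<String, String> options = new LinkedHashMap<>();
        String query = URI.create(endpointUri).getQuery();
        if (query == null || query.isEmpty()) {
            return options; // no options given
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, ""); // option without a value
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options =
                parse("nats://localhost:4222?topic=test&maxMessages=5");
        System.out.println(options.get("topic"));
        System.out.println(options.get("maxMessages"));
    }
}
```

In the consumer example above this yields two options, topic and maxMessages, which correspond to the entries of the option list with the same names.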

The message from the consumer will have two headers:

  • CamelNatsMessageTimestamp, the timestamp of the consumed message
  • CamelNatsSubscriptionId, the Subscription Id of the consumer