25 Jul 2016
Since Camel 2.15.0 we have had a component for integration with Apache Cassandra. With this post I’d like to show you what you can do with this component on a real Cassandra cluster. We will use the Docker images from my personal account on Docker Hub. As a first step we need to set up our cluster.
Spinning up an Apache Cassandra cluster with Docker
For this post we will use Docker to spin up our Apache Cassandra cluster. Once everything is up and running we will run a simple example to show how the camel-cassandraql component can be used to interact with the cluster.
As a first step we need to run a single-node cluster:
docker run --name master_node -dt oscerd/cassandra
This command runs a single container with an Apache Cassandra instance (the Cassandra version is one of the latest tick-tock releases, 3.6). We could stick with a single-node cluster, but that way we wouldn’t exploit the Cassandra features. So let’s add two more nodes!
docker run --name node1 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra
docker run --name node2 -d -e SEED="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)" oscerd/cassandra
Now we have two more nodes in our cluster. The environment variable SEED is used to make each new node aware of at least one of the existing nodes in the cluster. In the cassandra.yaml file there is a seeds entry to track this information. More information can be found in the Cassandra documentation.
To be sure everything is up and running we can use the nodetool utility, by running the following command:
docker exec -ti master_node /opt/cassandra/bin/nodetool status
and we should get the Cluster status as output:
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.3 102.67 KiB 256 65.9% 1a985c48-33a1-44aa-b7e9-f1a3620a6482 rack1
UN 172.17.0.2 107.64 KiB 256 68.2% da54ce5e-6433-4ea0-b2c3-fbc6c63ea955 rack1
UN 172.17.0.4 15.42 KiB 256 65.8% 0f2ba25a-37b0-4f27-a10a-d9a44655396a rack1
To run the example we need to create a keyspace and a table. Locally I have downloaded the Apache Cassandra 3.7 package. Run the cqlsh command:
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
You should see the cqlsh prompt:
Connected to Test Cluster at 172.17.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.6 | CQL spec 3.4.2 | Native protocol v4]
Use HELP for help.
cqlsh>
Let’s create a keyspace test with a table users:
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
and check that everything is fine by running a simple query:
cqlsh> use test;
cqlsh:test> select * from users;
id | name
----+--------
1 | oscerd
(1 rows)
cqlsh:test>
You can do the same on the other two nodes, to check that your cluster is working as you expect.
Following these steps we now have a three-node cluster up and running. Let’s use it in a simple camel-cassandraql example.
Run the example from Apache Camel
In Apache Camel I added a little example showing how to query (a select and an insert operation) an Apache Cassandra cluster with the related component. You can find the example here: Camel-example-cdi-cassandraql.
You can run the example with:
mvn clean compile
mvn camel:run
and you should have the following output:
2016-07-24 15:33:50,812 [cdi.Main.main()] INFO Version - WELD-000900: 2.3.5 (Final)
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.impl.config.EnvironmentPropertyConfigSourceProvider <init>
INFO: Custom config found by DeltaSpike. Name: 'META-INF/apache-deltaspike.properties', URL: 'file:/home/oscerd/workspace/apache-camel/camel/examples/camel-example-cdi-cassandraql/target/classes/META-INF/apache-deltaspike.properties'
Jul 24, 2016 3:33:50 PM org.apache.deltaspike.core.util.ProjectStageProducer initProjectStage
INFO: Computed the following DeltaSpike ProjectStage: Production
2016-07-24 15:33:51,064 [cdi.Main.main()] INFO Bootstrap - WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2016-07-24 15:33:51,170 [cdi.Main.main()] INFO Event - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.message.MessageBundleExtension.detectInterfaces(@Observes ProcessAnnotatedType) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,174 [cdi.Main.main()] INFO Event - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.interceptor.GlobalInterceptorExtension.promoteInterceptors(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,189 [cdi.Main.main()] INFO Event - WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,195 [cdi.Main.main()] INFO Event - WELD-000411: Observer method [BackedAnnotatedMethod] protected org.apache.deltaspike.core.impl.exclude.extension.ExcludeExtension.vetoBeans(@Observes ProcessAnnotatedType, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN Validator - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN Validator - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.lock.LockedInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:51,491 [cdi.Main.main()] WARN Validator - WELD-001478: Interceptor class org.apache.deltaspike.core.impl.future.FutureableInterceptor is enabled for the application and for the bean archive /home/oscerd/.m2/repository/org/apache/deltaspike/core/deltaspike-core-impl/1.7.1/deltaspike-core-impl-1.7.1.jar. It will only be invoked in the @Priority part of the chain.
2016-07-24 15:33:52,244 [cdi.Main.main()] INFO CdiCamelExtension - Camel CDI is starting Camel context [camel-example-cassandraql-cdi]
2016-07-24 15:33:52,245 [cdi.Main.main()] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) is starting
2016-07-24 15:33:52,246 [cdi.Main.main()] INFO ManagedManagementStrategy - JMX is enabled
2016-07-24 15:33:52,352 [cdi.Main.main()] INFO DefaultTypeConverter - Loaded 189 type converters
2016-07-24 15:33:52,367 [cdi.Main.main()] INFO DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-07-24 15:33:52,465 [cdi.Main.main()] INFO DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-07-24 15:33:52,547 [cdi.Main.main()] INFO NettyUtil - Did not find Netty's native epoll transport in the classpath, defaulting to NIO.
2016-07-24 15:33:52,789 [cdi.Main.main()] INFO DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,790 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,791 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.3:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.2:9042 added
2016-07-24 15:33:52,914 [cdi.Main.main()] INFO Cluster - New Cassandra host /172.17.0.4:9042 added
2016-07-24 15:33:52,985 [cdi.Main.main()] INFO DefaultCamelContext - Route: route1 started and consuming from: timer://stream?repeatCount=1
2016-07-24 15:33:52,986 [cdi.Main.main()] INFO DefaultCamelContext - Total 1 routes, of which 1 are started.
2016-07-24 15:33:52,987 [cdi.Main.main()] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-cassandraql-cdi) started in 0.742 seconds
2016-07-24 15:33:53,018 [cdi.Main.main()] INFO Bootstrap - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
2016-07-24 15:33:54,041 [ timer://stream] INFO route1 - Result from query [Row[1, oscerd]]
Running the query again, you should see another entry in the users table:
cqlsh> use test;
cqlsh:test> select * from users;
id | name
----+-----------
1 | oscerd
2 | davsclaus
(2 rows)
cqlsh:test>
The route of the example is simple:
@ContextName("camel-example-cassandraql-cdi")
static class KubernetesRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("timer:stream?repeatCount=1")
            .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-select-query}}&consistencyLevel=quorum")
            .log("Result from query ${body}")
            .process(exchange -> {
                exchange.getIn().setBody(Arrays.asList("davsclaus"));
            })
            .to("cql://{{cassandra-master-ip}},{{cassandra-node1-ip}},{{cassandra-node2-ip}}/test?cql={{cql-insert-query}}&consistencyLevel=quorum");
    }
}
As you may see, we are using the QUORUM consistency level for both the select and the insert operation. You can find more information about it in the Cassandra consistency level documentation.
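For comparison, outside of Camel the same consistency level can be set directly with the DataStax Java driver. This is only a sketch, assuming the 3.x driver is on the classpath; the contact point reuses the master node IP from the nodetool output above and the query is the one we created earlier:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

public class QuorumSelect {
    public static void main(String[] args) {
        // Use QUORUM as the default consistency level for every statement
        Cluster cluster = Cluster.builder()
                .addContactPoint("172.17.0.2")
                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
                .build();
        try (Session session = cluster.connect("test")) {
            ResultSet rs = session.execute("select * from users");
            rs.forEach(row -> System.out.println(row.getInt("id") + " | " + row.getString("name")));
        } finally {
            cluster.close();
        }
    }
}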
Conclusions
Using Docker to spin up a Cassandra Cluster can be useful during integration and performance testing. In this post we saw a little example of the camel-cassandraql features in a real Apache Cassandra cluster.
Before the camel-cassandraql pull request was submitted, I was working on a Camel-Cassandra component too. This component is a bit different from the cassandraql one and you can find more information in its GitHub repository. It is aligned with the latest released Apache Camel version, 2.17.2.
11 Jul 2016
Since Camel 2.17.0 we have had a Kubernetes component to interact with a Kubernetes cluster. The aim of this post is to provide a first introduction to the component: how it works, what you can do with it and a little example of a producer endpoint. The article is based on the Camel-Kubernetes 2.18-SNAPSHOT version.
Main Features
Camel-Kubernetes is based on the popular Kubernetes/Openshift client from the Fabric8 team. The component provides both Producer and Consumer endpoints and it has the following options (taken directly from the new automatically generated documentation of Apache Camel 2.18). To better understand the component you may want to take a look at the Kubernetes site.
| Name | Group | Default | Java Type | Description |
|------|-------|---------|-----------|-------------|
| masterUrl | common |  | String | Required Kubernetes Master url |
| apiVersion | common |  | String | The Kubernetes API Version to use |
| category | common |  | String | Required Kubernetes Producer and Consumer category |
| dnsDomain | common |  | String | The dns domain used for ServiceCall EIP |
| kubernetesClient | common |  | DefaultKubernetesClient | Default KubernetesClient to use if provided |
| portName | common |  | String | The port name used for ServiceCall EIP |
| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. |
| labelKey | consumer |  | String | The Consumer Label key when watching at some resources |
| labelValue | consumer |  | String | The Consumer Label value when watching at some resources |
| namespace | consumer |  | String | The namespace |
| poolSize | consumer | 1 | int | The Consumer pool size |
| resourceName | consumer |  | String | The Consumer Resource Name we would like to watch |
| exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. |
| operation | producer |  | String | Producer operation to do on Kubernetes |
| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange |
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). |
| caCertData | security |  | String | The CA Cert Data |
| caCertFile | security |  | String | The CA Cert File |
| clientCertData | security |  | String | The Client Cert Data |
| clientCertFile | security |  | String | The Client Cert File |
| clientKeyAlgo | security |  | String | The Key Algorithm used by the client |
| clientKeyData | security |  | String | The Client Key data |
| clientKeyFile | security |  | String | The Client Key file |
| clientKeyPassphrase | security |  | String | The Client Key Passphrase |
| oauthToken | security |  | String | The Auth Token |
| password | security |  | String | Password to connect to Kubernetes |
| trustCerts | security |  | Boolean | Define if the certs we used are trusted anyway or not |
| username | security |  | String | Username to connect to Kubernetes |
The headers used by the component are the following:
| Name | Type | Description |
|------|------|-------------|
| CamelKubernetesOperation | String | The Producer operation |
| CamelKubernetesNamespaceName | String | The Namespace name |
| CamelKubernetesNamespaceLabels | Map | The Namespace Labels |
| CamelKubernetesServiceLabels | Map | The Service labels |
| CamelKubernetesServiceName | String | The Service name |
| CamelKubernetesServiceSpec | io.fabric8.kubernetes.api.model.ServiceSpec | The Spec for a Service |
| CamelKubernetesReplicationControllersLabels | Map | Replication controller labels |
| CamelKubernetesReplicationControllerName | String | Replication controller name |
| CamelKubernetesReplicationControllerSpec | io.fabric8.kubernetes.api.model.ReplicationControllerSpec | The Spec for a Replication Controller |
| CamelKubernetesReplicationControllerReplicas | Integer | The number of replicas for a Replication Controller during the Scale operation |
| CamelKubernetesPodsLabels | Map | Pod labels |
| CamelKubernetesPodName | String | Pod name |
| CamelKubernetesPodSpec | io.fabric8.kubernetes.api.model.PodSpec | The Spec for a Pod |
| CamelKubernetesPersistentVolumesLabels | Map | Persistent Volume labels |
| CamelKubernetesPersistentVolumesName | String | Persistent Volume name |
| CamelKubernetesPersistentVolumesClaimsLabels | Map | Persistent Volume Claim labels |
| CamelKubernetesPersistentVolumesClaimsName | String | Persistent Volume Claim name |
| CamelKubernetesPersistentVolumesClaimsSpec | io.fabric8.kubernetes.api.model.PersistentVolumeClaimSpec | The Spec for a Persistent Volume claim |
| CamelKubernetesSecretsLabels | Map | Secret labels |
| CamelKubernetesSecretsName | String | Secret name |
| CamelKubernetesSecret | io.fabric8.kubernetes.api.model.Secret | A Secret Object |
| CamelKubernetesResourcesQuotaLabels | Map | Resource Quota labels |
| CamelKubernetesResourcesQuotaName | String | Resource Quota name |
| CamelKubernetesResourceQuotaSpec | io.fabric8.kubernetes.api.model.ResourceQuotaSpec | The Spec for a Resource Quota |
| CamelKubernetesServiceAccountsLabels | Map | Service Account labels |
| CamelKubernetesServiceAccountName | String | Service Account name |
| CamelKubernetesServiceAccount | io.fabric8.kubernetes.api.model.ServiceAccount | A Service Account object |
| CamelKubernetesNodesLabels | Map | Node labels |
| CamelKubernetesNodeName | String | Node name |
| CamelKubernetesBuildsLabels | Map | Openshift Build labels |
| CamelKubernetesBuildName | String | Openshift Build name |
| CamelKubernetesBuildConfigsLabels | Map | Openshift Build Config labels |
| CamelKubernetesBuildConfigName | String | Openshift Build Config name |
| CamelKubernetesEventAction | io.fabric8.kubernetes.client.Watcher.Action | Action watched by the consumer |
| CamelKubernetesEventTimestamp | String | Timestamp of the action watched by the consumer |
| CamelKubernetesConfigMapName | String | ConfigMap name |
| CamelKubernetesConfigMapsLabels | Map | ConfigMap labels |
| CamelKubernetesConfigData | Map | ConfigMap Data |
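To give an idea of how these headers are typically used on the producer side, here is a hedged sketch (the endpoint URL, token and pod name are made up; the category and operation options used in the URI are described just below) that asks for a single pod by name:
import org.apache.camel.builder.RouteBuilder;

public class GetPodRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:getPod")
            // tell the producer which namespace and pod we are interested in
            .setHeader("CamelKubernetesNamespaceName", constant("default"))
            .setHeader("CamelKubernetesPodName", constant("my-pod"))
            .to("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&operation=getPod")
            .to("mock:result");
    }
}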
The component divides its features into a set of categories:
- namespaces
- services
- replicationControllers
- pods
- persistentVolumes
- persistentVolumesClaims
- secrets
- resourcesQuota
- serviceAccounts
- nodes
- configMaps
- builds
- buildConfigs
The producer endpoints are based on a set of Operations:
- Namespaces
- listNamespaces
- listNamespacesByLabels
- getNamespace
- createNamespace
- deleteNamespace
- Services
- listServices
- listServicesByLabels
- getService
- createService
- deleteService
- Replication Controllers
- listReplicationControllers
- listReplicationControllersByLabels
- getReplicationController
- createReplicationController
- deleteReplicationController
- scaleReplicationController
- Pods
- listPods
- listPodsByLabels
- getPod
- createPod
- deletePod
- Persistent Volumes
- listPersistentVolumes
- listPersistentVolumesByLabels
- getPersistentVolume
- Persistent Volumes Claims
- listPersistentVolumesClaims
- listPersistentVolumesClaimsByLabels
- getPersistentVolumeClaim
- createPersistentVolumeClaim
- deletePersistentVolumeClaim
- Secrets
- listSecrets
- listSecretsByLabels
- getSecret
- createSecret
- deleteSecret
- Resources quota
- listResourcesQuota
- listResourcesQuotaByLabels
- getResourceQuota
- createResourceQuota
- deleteResourceQuota
- Service Accounts
- listServiceAccounts
- listServiceAccountsByLabels
- getServiceAccount
- createServiceAccount
- deleteServiceAccount
- Nodes
- listNodes
- listNodesByLabels
- getNode
- Config Maps
- listConfigMaps
- listConfigMapsByLabels
- getConfigMap
- createConfigMap
- deleteConfigMap
- Builds
- listBuilds
- listBuildsByLabels
- getBuild
- Build Configs
- listBuildConfigs
- listBuildConfigsByLabels
- getBuildConfig
The documentation part is usually a bit boring, so let’s see how you can declare a producer or a consumer to interact with a Kubernetes cluster:
from("direct:list")
.to("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&operation=listPods")
.to("mock:result")
In this producer snippet you ask for the list of pods in any namespace of your Kubernetes cluster. The operation returns a list of Pod objects (io.fabric8.kubernetes.api.model.Pod).
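To give an idea of what you can do with that result, here is a hedged sketch that iterates over the returned list in a processor (the endpoint is the same assumed one as above; the accessors on the fabric8 Pod model are standard getters):
import java.util.List;

import io.fabric8.kubernetes.api.model.Pod;
import org.apache.camel.builder.RouteBuilder;

public class ListPodsRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:list")
            .to("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&operation=listPods")
            .process(exchange -> {
                // the producer puts the list of pods in the message body
                @SuppressWarnings("unchecked")
                List<Pod> pods = exchange.getIn().getBody(List.class);
                for (Pod pod : pods) {
                    System.out.println("Pod " + pod.getMetadata().getName()
                        + " is in phase " + pod.getStatus().getPhase());
                }
            })
            .to("mock:result");
    }
}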
from("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&namespace=default&labelKey=kind&labelValue=http")
.to("mock:result")
In this consumer snippet you will consume events from the Kubernetes cluster related to pods in the default namespace, labeled with key kind and value http. The exchange will contain the Pod object (in the body) with all the related metadata, a timestamp (in the CamelKubernetesEventTimestamp header) and an action (in the CamelKubernetesEventAction header).
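A hedged sketch of a consumer route that reads the body and those two headers (the endpoint is the same assumed one as above, and the header names and types come from the table earlier in this post):
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Watcher;
import org.apache.camel.builder.RouteBuilder;

public class PodWatchRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kubernetes://https://localhost:8443?oauthToken=xxxxxxxx&category=pods&namespace=default&labelKey=kind&labelValue=http")
            .process(exchange -> {
                Pod pod = exchange.getIn().getBody(Pod.class);
                Watcher.Action action = exchange.getIn().getHeader("CamelKubernetesEventAction", Watcher.Action.class);
                String timestamp = exchange.getIn().getHeader("CamelKubernetesEventTimestamp", String.class);
                System.out.println(action + " on pod " + pod.getMetadata().getName() + " at " + timestamp);
            })
            .to("mock:result");
    }
}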
A Little example
Over the last few days I worked on a little example that lists pods from a Kubernetes cluster. I added the example to the Apache Camel examples folder.
The Camel CDI Kubernetes example assumes you have a Kubernetes Cluster running in your environment.
Personally, for testing I prefer to use the Vagrant Openshift image from the Fabric8 team: to run the image and get started you can follow the guide in the Fabric8 documentation.
After the vagrant up command has completed, remember to run the following commands:
fabric8-installer/vagrant/openshift$ export KUBERNETES_DOMAIN=vagrant.f8
fabric8-installer/vagrant/openshift$ export DOCKER_HOST=tcp://vagrant.f8:2375
and log in to Openshift:
fabric8-installer/vagrant/openshift$ oc login https://172.28.128.4:8443
You will be asked for a user/password (admin/admin). Once the login is successful you will be able to get your OAuth token (this will be useful for the Camel-Kubernetes example):
fabric8-installer/vagrant/openshift$ oc whoami --token
XXxnZi2YghQyke-eARXBZ38K-p5zIpco0ShdzO8gK6U
Now we have all the elements to run our example. Edit the apache-deltaspike.properties file with the correct OAuth token from Openshift.
oscerd@localhost:~/workspace/apache-camel/camel/examples/camel-example-cdi-kubernetes$ mvn compile camel:run
The route is triggered by a timer with a repeatCount option equal to 3. The output of the example will be:
2016-07-12 12:00:42,220 [cdi.Main.main()] INFO CdiCamelExtension - Camel CDI is starting Camel context [camel-example-kubernetes-cdi]
2016-07-12 12:00:42,220 [cdi.Main.main()] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is starting
2016-07-12 12:00:42,222 [cdi.Main.main()] INFO ManagedManagementStrategy - JMX is enabled
2016-07-12 12:00:42,355 [cdi.Main.main()] INFO DefaultTypeConverter - Loaded 188 type converters
2016-07-12 12:00:42,376 [cdi.Main.main()] INFO DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-07-12 12:00:42,468 [cdi.Main.main()] INFO DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-07-12 12:00:42,828 [cdi.Main.main()] INFO DefaultCamelContext - Route: route1 started and consuming from: timer://stream?repeatCount=3
2016-07-12 12:00:42,831 [cdi.Main.main()] INFO DefaultCamelContext - Total 1 routes, of which 1 are started.
2016-07-12 12:00:42,832 [cdi.Main.main()] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) started in 0.611 seconds
2016-07-12 12:00:42,878 [cdi.Main.main()] INFO Bootstrap - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
We currently have 13 pods
Pod name docker-registry-1-c6ie5 with status Running
Pod name fabric8-docker-registry-pgo5y with status Running
Pod name fabric8-forge-wvyw7 with status Running
Pod name fabric8-tr0b9 with status Running
Pod name gogs-2p4mn with status Running
Pod name grafana-754y7 with status Running
Pod name infinispan-client-a7z3k with status Running
Pod name infinispan-client-iubag with status Running
Pod name infinispan-server-wl0in with status Running
Pod name jenkins-cr2ez with status Running
Pod name nexus-aarks with status Running
Pod name prometheus-mp0kr with status Running
Pod name router-1-dkjsb with status Running
^C2016-07-12 12:00:50,946 [Thread-1 ] INFO MainSupport$HangupInterceptor - Received hang up - stopping the main instance.
2016-07-12 12:00:50,950 [Thread-1 ] INFO CamelContextProducer - Camel CDI is stopping Camel context [camel-example-kubernetes-cdi]
2016-07-12 12:00:50,950 [Thread-1 ] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is shutting down
2016-07-12 12:00:50,951 [Thread-1 ] INFO DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
2016-07-12 12:00:50,953 [ - ShutdownTask] INFO DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: timer://stream?repeatCount=3
2016-07-12 12:00:50,953 [Thread-1 ] INFO DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
2016-07-12 12:00:50,965 [Thread-1 ] INFO MainLifecycleStrategy - CamelContext: camel-example-kubernetes-cdi has been shutdown, triggering shutdown of the JVM.
2016-07-12 12:00:50,968 [Thread-1 ] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) uptime 8.748 seconds
2016-07-12 12:00:50,968 [Thread-1 ] INFO DefaultCamelContext - Apache Camel 2.18-SNAPSHOT (CamelContext: camel-example-kubernetes-cdi) is shutdown in 0.018 seconds
2016-07-12 12:00:50,977 [Thread-1 ] INFO Bootstrap - WELD-ENV-002001: Weld SE container STATIC_INSTANCE shut down
As you can see, 13 pods are returned in my environment and all of them are in Running status.
Conclusions
This post was just a little introduction to what Camel-Kubernetes can do. I think the most interesting part is the consumer side anyway, and I will focus on the different types of consumer in one of the next Camel-related posts. Meanwhile, please try the component yourself and, if you find something wrong, think something can be improved or think there is something else we should focus on, don’t hesitate to post on the Camel dev mailing list or the Camel users mailing list, or, if you are sure there is a bug or an improvement is truly needed, raise an issue in the Apache Camel JIRA.
You can find examples of the different operations you can do with producer and consumer endpoints in the component unit tests.
Stay tuned for the next Camel-Kubernetes consumers post.
06 Jul 2016
Writing a new Camel component is simple, but sometimes it’s not so easy for a beginner to understand how it can be integrated into the current Camel codebase.
With this post I’d like to show you how you can do this step-by-step.
A simple component
In this guide I will add a simple component: camel-square. This is just a simple example, useful to show the steps needed to integrate and contribute your component to the Apache Camel codebase.
Using the Archetype
The latest Camel release is 2.17.2 at the moment of writing, so we will use the archetype from this version.
Be sure you’ve cloned the Apache Camel repository from GitHub and that your local copy is aligned with the upstream codebase.
Open a terminal and enter the components folder:
~/workspace/apache-camel/camel/components$
Generate the skeleton for the camel-square component
~/workspace/apache-camel/camel/components$ mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-component -DarchetypeVersion=2.17.2 -DgroupId=org.apache.camel -DartifactId=camel-square -Dname=Square -Dscheme=square
You will be asked interactively for the version of the component and a few other things. Since the Camel version currently in development is 2.18, use 2.18-SNAPSHOT as the version of your component.
Cleaning up the generated component
The first thing to do is to clean up the POM of the newly generated component a bit. So:
- Change the packaging from bundle to jar.
- Remove all the version tags from dependencies
- In the properties section of the POM remove everything and add the following properties:
<properties>
  <camel.osgi.export.pkg>org.apache.camel.component.square.*</camel.osgi.export.pkg>
  <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=square</camel.osgi.export.service>
</properties>
- Set the name tag to <name>Camel :: Square</name>
- Since the groupId is inherited from the parent POM of the components folder, you can remove the groupId tag from your component POM.
- Remove camel-apt from the dependencies
Finally your POM should look like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>components</artifactId>
    <groupId>org.apache.camel</groupId>
    <version>2.18-SNAPSHOT</version>
  </parent>

  <artifactId>camel-square</artifactId>
  <packaging>jar</packaging>
  <name>Camel :: Square</name>

  <properties>
    <camel.osgi.export.pkg>org.apache.camel.component.square.*</camel.osgi.export.pkg>
    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=square</camel.osgi.export.service>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
    </dependency>

    <!-- logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- testing -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test</artifactId>
      <scope>test</scope>
    </dependency>

  </dependencies>

</project>
You now need to create the right package structure. Move the generated classes from src/main/java/org/apache/camel to src/main/java/org/apache/camel/component/square and do the same for the test package. Don’t forget to also align the file src/main/resources/META-INF/services/org/apache/camel/component/square to point to the component class:
class=org.apache.camel.component.square.SquareComponent
We are now ready to write our component.
Writing the component
This is a simple example: the component won’t have a lot of features and we will only provide a Producer endpoint, so you can remove the generated SquareConsumer class.
Your SquareComponent may look like this:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.square;

import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;

/**
 * Represents the component that manages {@link SquareEndpoint}.
 */
public class SquareComponent extends UriEndpointComponent {

    public SquareComponent() {
        super(SquareEndpoint.class);
    }

    public SquareComponent(CamelContext context) {
        super(context, SquareEndpoint.class);
    }

    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        Endpoint endpoint = new SquareEndpoint(uri, this);
        setProperties(endpoint, parameters);
        return endpoint;
    }
}
and your SquareEndpoint class like this:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.square;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriPath;

/**
 * Represents a Square endpoint.
 */
@UriEndpoint(scheme = "square", title = "Square", syntax = "square:name", label = "Square")
public class SquareEndpoint extends DefaultEndpoint {

    @UriPath @Metadata(required = "true")
    private String name;

    public SquareEndpoint() {
    }

    public SquareEndpoint(String uri, SquareComponent component) {
        super(uri, component);
    }

    public Producer createProducer() throws Exception {
        return new SquareProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("The Square endpoint doesn't support consumers.");
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * Some description of this option, and what it does
     */
    public void setName(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
Since the component won’t support Consumer Endpoints, we throw an UnsupportedOperationException in that particular case.
Let’s write our SquareProducer. The producer endpoint will get the body of the message and calculate its square.
At the beginning we will have a producer of this form:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.square;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Square producer.
 */
public class SquareProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SquareProducer.class);
    private SquareEndpoint endpoint;

    public SquareProducer(SquareEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public void process(Exchange exchange) throws Exception {
        System.out.println(exchange.getIn().getBody());
    }
}
The Math.pow() method is overkill here; for this example we will simply multiply the number by itself.
Once again: this is just an example to show how you can contribute your component to the Apache Camel community, so there won’t be any validation on the body type and so on.
We may end up with a producer like this:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.square;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Square producer.
 */
public class SquareProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SquareProducer.class);
    private SquareEndpoint endpoint;

    public SquareProducer(SquareEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public void process(Exchange exchange) throws Exception {
        LOG.debug("Getting value from exchange");
        Integer value = exchange.getIn().getBody(Integer.class);
        LOG.debug("Computing square");
        Integer square = value * value;
        LOG.info("The square is " + square);
        if (exchange.getPattern().isOutCapable()) {
            Message out = exchange.getOut();
            out.copyFrom(exchange.getIn());
            out.setBody(square);
        } else {
            Message in = exchange.getIn();
            in.setBody(square);
        }
    }
}
It’s very simple: it just computes the square and, based on the exchange pattern, puts the result in the in or out message.
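Since the result is copied to the out message when the exchange is out-capable, you can exercise the component in request/reply mode with a plain ProducerTemplate. This is just a sketch; the route mirrors the one used in the test below:
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class SquareQuickTry {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("direct:square").to("square://bar");
            }
        });
        context.start();
        ProducerTemplate template = context.createProducerTemplate();
        // requestBody uses the InOut pattern, so the producer writes the result to the out message
        Integer square = template.requestBody("direct:square", 4, Integer.class);
        System.out.println("4 squared is " + square); // prints 16
        context.stop();
    }
}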
Let’s take a look at the testing part. You should have a test class SquareComponentTest: let’s modify it a bit. The result could look like this:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.square;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SquareComponentTest extends CamelTestSupport {

    @Test
    public void testSquare() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedMinimumMessageCount(1);
        mock.expectedBodiesReceived(9);

        template.sendBody("direct:square", 3);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() {
                from("direct:square")
                    .to("square://bar")
                    .to("mock:result");
            }
        };
    }
}
Now the component is ready, apart from a few last little things.
Try to install it:
~/workspace/apache-camel/camel/components/camel-square$ mvn clean install
Check for code-style errors and fix them if needed:
~/workspace/apache-camel/camel/components/camel-square$ mvn -Psourcecheck
Integrating the component in the Apache Camel codebase
The component is now ready to be integrated into the current Camel codebase.
We need to add a dependency in apache-camel/pom.xml:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-square</artifactId>
</dependency>
Include the component in apache-camel/src/main/descriptors/common-bin.xml
<include>org.apache.camel:camel-square</include>
Include the component in parent/pom.xml
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-square</artifactId>
  <version>${project.version}</version>
</dependency>
Now you’re able to build your new Camel 2.18-SNAPSHOT with camel-square included:
~/workspace/apache-camel/camel/$ mvn clean install -DskipTests
At the end of the build you should see the camel-square component listed.
Component documentation
Starting from Camel 2.18-SNAPSHOT we have documentation generated from code inside the codebase. The default directory inside the new component folder is src/main/docs and the documentation file is an .adoc file with the same name as the component, in this case square.adoc.
Create the src/main/docs/square.adoc file with some initial content and the following placeholders:
// component options: START
// component options: END
// endpoint options: START
// endpoint options: END
The placeholders are just for the automatic documentation generation.
Run a clean install on the component once again
~/workspace/apache-camel/camel/components/camel-square$ mvn clean install
Your .adoc file will now contain the updated documentation for the component and endpoint options.
Now add a link to your component in the file docs/user-manual/en/SUMMARY.md. This way the component will be added to the Gitbook generated from the AsciiDoc component files.
Integrating with Apache Karaf
Apache Karaf is an important ally for Apache Camel.
Usually a Camel component should also work in an OSGi environment. For each component in Camel there is a Karaf feature definition. Let’s add the one for camel-square.
In platform/karaf/features/src/main/resources/features.xml add the following code:
<feature name='camel-square' version='${project.version}' resolver='(obr)' start-level='50'>
  <feature version='${project.version}'>camel-core</feature>
  <bundle>mvn:org.apache.camel/camel-square/${project.version}</bundle>
</feature>
Since the component is very simple we don’t need any external bundles in this case. To test that our feature works in an OSGi environment we have to add an integration test.
Let’s create it in tests/camel-itest-karaf with the name CamelSquareTest. The content of the test will be:
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.itest.karaf;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;

@RunWith(PaxExam.class)
public class CamelSquareTest extends BaseKarafTest {

    public static final String COMPONENT = extractName(CamelSquareTest.class);

    @Test
    public void test() throws Exception {
        testComponent(COMPONENT);
    }
}
To run the test we have to follow two steps. First, we need to build our Karaf features
~/workspace/apache-camel/camel/platform/karaf/features$ mvn clean install
and second run the integration test
~/workspace/apache-camel/camel/tests/camel-itest-karaf$ mvn clean test -Dtest=CamelSquareTest
or
~/workspace/apache-camel/camel/tests/camel-itest-karaf$ run-tests CamelSquareTest
If everything is fine and the test passes, the component installs correctly in Karaf.
Contributions
The camel-square component is now part of Camel. What is still missing? The pull request, of course :-)
This surely is the most satisfying part :-)
You should have everything committed locally, and you may need to align with the current Apache Camel codebase:
~/workspace/apache-camel/camel$ git pull --rebase <remote_name> master
If you have multiple commits for your component, it’s a good idea to squash them into a single one.
At this point you just need to open the pull request and the Apache Camel team will review it for you. You’ll receive feedback about improvements you can make and, of course, thanks from the community :-)
Conclusions
In this post I’ve shown how a Camel component can be added to the Apache Camel codebase step-by-step.
I think writing components is one of the best ways to understand the Camel architecture and features and to improve your Camel skills.
The Camel community loves contributions, you just need to start :-)
18 Apr 2016
In the latest version of Apache Camel (2.17.0) we released camel-nats. NATS is a cloud-native messaging system from Apcera.
The first version of the component was based on java_nats; this library has been deprecated for a while and we decided to switch to the brand new JNats client in the next major release (Camel 2.18.0).
The component
Camel-Nats provides both producer and consumer endpoints. These are the options you can define:
- servers - a comma-separated list of gnatsd servers
- topic - the topic name you want to use
- reconnect - whether or not to use the reconnection feature (default true)
- pedantic - whether or not to run in pedantic mode (default false)
- verbose - whether or not to run in verbose mode (default false)
- ssl - whether or not to use SSL (default false)
- reconnectTimeWait - waiting time before a reconnection attempt (in milliseconds, default 3000)
- maxReconnectAttempts - max reconnection attempts (default 3)
- pingInterval - ping interval to check whether the connection is still alive (in milliseconds, default 4000)
- noRandomizeServers - whether or not to randomize the order of servers for connection attempts (default false)
- queueName - the queue name if we are using NATS in a queue configuration (consumer only)
- maxMessages - stop receiving messages from a topic after maxMessages have been received (default unlimited, consumer only)
- poolSize - consumer pool size (default 10, consumer only)
For more information about Nats configuration take a look at the docs
Examples
From the producer perspective:
from("direct:send").to("nats://localhost:4222?topic=test");
while from the consumer perspective you can have:
from("nats://localhost:4222?topic=test&maxMessages=5").to("mock:result")
In this case you will consume messages from the topic test until you have received 5 messages.
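Combining a few of the options listed above, a consumer that connects to more than one gnatsd server and joins a queue group could look like this (hostnames and option values here are made up):
import org.apache.camel.builder.RouteBuilder;

public class NatsQueueConsumerRoute extends RouteBuilder {
    @Override
    public void configure() {
        // two servers, a queue group named "workers" and some tuned reconnection settings
        from("nats://nats1.example.com:4222,nats2.example.com:4222"
                + "?topic=orders&queueName=workers&maxReconnectAttempts=5&pingInterval=2000")
            .to("mock:result");
    }
}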
The message from the consumer will have two headers:
- CamelNatsMessageTimestamp, the timestamp of the consumed message
- CamelNatsSubscriptionId, the Subscription Id of the consumer
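A hedged sketch of a route that reads those two headers (their value types are not documented here, so they are read as plain Objects):
import org.apache.camel.builder.RouteBuilder;

public class NatsHeadersRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("nats://localhost:4222?topic=test")
            .process(exchange -> {
                Object timestamp = exchange.getIn().getHeader("CamelNatsMessageTimestamp");
                Object subscriptionId = exchange.getIn().getHeader("CamelNatsSubscriptionId");
                System.out.println("Received at " + timestamp + " on subscription " + subscriptionId);
            })
            .to("mock:result");
    }
}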