This post assumes a basic understanding of the Spring Integration framework. If you are just beginning with Spring Integration, you may find the short book Just Spring Integration helpful to quickly grasp the core concepts, the different types of channels and their endpoints. Do not hesitate to refer to the official documentation if a term is new to you.
Spring Integration is published under the Apache License, Version 2.0. The code presented in this post has been simplified (robustness, performance, validation, etc.) and should not be used outside of this context. This post is based on version 4.2.5.RELEASE of Spring Integration.
A basic example
To illustrate the inner workings of Spring Integration, we will use the following basic example. This example is inspired by the official Spring Enterprise Training and highlights common misunderstandings (message immutability, the Proxy Gateway, the different types of channels, the right use of gateway and service activator to keep our code free from dependencies on the Spring Integration API, …).
The OrderService definition follows:
The flow begins with a proxy Gateway. When the method submitOrder is called, a new message containing the order passed as an argument is created and sent to the channel newOrders. This channel is defined as a publish-subscribe channel, so that multiple endpoints can consume the message. The first endpoint to consume the message is a Service Activator that implements the business logic. The service activator delegates to an instance of the following class:
This code creates a Confirmation object after generating a confirmation number. The confirmation is then sent to the temporary channel created by the Gateway (as there is no output-channel configured on the service activator).
In addition to the service activator, there is another endpoint of type Bridge that listens to new orders. This component is often used to connect a subscribable channel to a pollable channel so that we can use the MessagingTemplate in our tests. This is exactly what we will use to test our Spring Integration flow, as demonstrated by the following JUnit test:
We inject the Gateway implementation provided by Spring Integration into our test by using the @Autowired annotation. Then, we call the only method offered by this interface to start the flow and check that the confirmation number was correctly assigned on the returned object. We finish by checking that the order is available in our pollableChannel to demonstrate the classic use of the Bridge endpoint when using the MessagingTemplate to test our code.
When run, the test displays a message similar to the following in the console:
We will not go further with our use of Spring Integration. Instead, we completely remove the dependency from our pom.xml. In this post, we will implement the minimal code to make the test pass again, trying to stay as close as possible to the original implementation.
Only the spring-integration-core artifact was commented out. We will continue to use the other Spring Core projects (spring-beans and spring-context).
Core Abstractions
As a messaging framework, Spring Integration can be described by three core abstractions: Message, MessageChannel and MessageEndpoint.
Message Abstraction
A message, whether you use JMS, Kafka, SOAP, and so on, always consists of a payload representing the data we want to send and a collection of headers (key-value pairs) used by the messaging infrastructure to route the message among the different destinations. This way, a messaging framework does not have to concern itself with the content of the message (whose size varies, unlike headers, whose values have simple types).
As of release 4.0, the core Spring Integration interfaces migrated to a new module, spring-messaging, included in the Spring Framework. The aim is to reuse these abstractions in other modules that use the concept of message too.
Here is the definition of the Message interface:
Where MessageHeaders is:
Several points are worth noting about this definition:
A message in Spring Integration is immutable (inherently thread-safe), so Spring Integration developers can write lock-free code. If we want to add a new header, we have to copy the message first.
As each message has a unique ID, the duplication will create a new message with its own ID. Internally, Spring uses the class java.util.Random to generate them.
The two abstractions (Message and MessageHeaders) are core classes inside the Spring Integration source. Most of the time, the messages are already created by a Gateway or an Adapter, but sometimes we need to create a message ourselves (to customize the headers or for testing purposes). In this post, we have to provide an implementation. The main implementation of Message is the class GenericMessage, but it is recommended to use the MessageBuilder API to construct the message. Here is an implementation of these classes:
With the MessageBuilder fluent API, it’s easy to create a new message:
Now that we know how to create a message, let’s see how to send messages between components.
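As a rough illustration of the points above (immutability, a fresh ID per message, a fluent builder), here is a minimal, dependency-free sketch. It is not the spring-messaging source: the class bodies are simplified, headers are held in a plain read-only map rather than a MessageHeaders class, and UUID.randomUUID() is an assumed stand-in for the framework's ID generation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Simplified stand-ins for the Message abstractions discussed above.
interface Message<T> {
    T getPayload();
    Map<String, Object> getHeaders();
}

final class GenericMessage<T> implements Message<T> {
    private final T payload;
    private final Map<String, Object> headers;

    GenericMessage(T payload, Map<String, Object> customHeaders) {
        Map<String, Object> copy = new HashMap<>(customHeaders);
        // Every message gets its own id and timestamp, even when built from another one.
        // Assumption: UUID.randomUUID() stands in for the framework's id generator.
        copy.put("id", UUID.randomUUID());
        copy.put("timestamp", System.currentTimeMillis());
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(copy);
    }

    @Override public T getPayload() { return payload; }
    @Override public Map<String, Object> getHeaders() { return headers; }
}

final class MessageBuilder<T> {
    private final T payload;
    private final Map<String, Object> headers = new HashMap<>();

    private MessageBuilder(T payload) { this.payload = payload; }

    static <T> MessageBuilder<T> withPayload(T payload) {
        return new MessageBuilder<>(payload);
    }

    // Starts from an existing message: the way to "add" a header to an immutable message.
    static <T> MessageBuilder<T> fromMessage(Message<T> message) {
        MessageBuilder<T> builder = new MessageBuilder<>(message.getPayload());
        builder.headers.putAll(message.getHeaders());
        return builder;
    }

    MessageBuilder<T> setHeader(String name, Object value) {
        headers.put(name, value);
        return this;
    }

    Message<T> build() { return new GenericMessage<>(payload, headers); }
}

class MessageBuilderDemo {
    public static void main(String[] args) {
        Message<String> original = MessageBuilder.withPayload("order-1")
                .setHeader("priority", "high")
                .build();
        Message<String> copy = MessageBuilder.fromMessage(original)
                .setHeader("customer", "acme")
                .build();
        System.out.println(original.getHeaders().containsKey("customer")); // false
        System.out.println(copy.getHeaders().get("priority")); // high
    }
}
```

The copy carries the original headers plus the new one, while the original message stays untouched and keeps its own ID.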
MessageChannel Abstraction
Components exchange messages through what is called a channel. A channel is used to send and/or receive messages. Spring Integration defines many types of channels whose characteristics differ: does the receiver run in the same thread as the sender (synchronous call)? Can multiple receivers consume a message (point-to-point vs publish-subscribe)? Should the receiver wait for new messages to arrive (passive endpoint) or poll regularly for new messages (active endpoint)? To keep this post (relatively) short, we will implement the main ones:
| Channel | Pattern | Mode |
|---|---|---|
| DirectChannel | Point-to-Point | Synchronous |
| QueueChannel | Point-to-Point | Asynchronous |
| PublishSubscribeChannel | Publish-subscribe | Synchronous |
| PublishSubscribeChannel with executor | Publish-subscribe | Asynchronous |
All of these channels implement the MessageChannel interface:
What may surprise you is that this interface defines only methods for sending messages. Why? The answer depends on the channel type: PollableChannel or SubscribableChannel (not to be confused with PublishSubscribeChannel). Should the target endpoint poll to receive a message (active endpoint), or should the channel push the message to the endpoint (passive endpoint)? Let’s draw a diagram to clarify the class hierarchy:
For example, when using a DirectChannel, I should first subscribe to the channel to be notified automatically when a new message comes. When using a QueueChannel, I do not have to subscribe but I should poll regularly (for example, every second) to check if a new message is present. Given the polling interval, there is a latency between the sending and the receiving of a message.
Here are the definitions of the interfaces PollableChannel and SubscribableChannel:
Where MessageHandler is defined like this:
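The hierarchy described above can be sketched with a handful of interfaces. This is a simplified outline under the assumptions of this post (Message is reduced to its payload, and handlers throw unchecked exceptions only), not the exact spring-messaging declarations.

```java
// Simplified stand-in: only the payload is needed to illustrate the hierarchy.
interface Message<T> {
    T getPayload();
}

// Callback invoked with each message delivered to an endpoint.
interface MessageHandler {
    void handleMessage(Message<?> message);
}

// Common contract: a channel only knows how to accept messages.
interface MessageChannel {
    boolean send(Message<?> message);
    boolean send(Message<?> message, long timeoutMillis);
}

// Active side: the consumer asks the channel for the next message.
interface PollableChannel extends MessageChannel {
    Message<?> receive();
    Message<?> receive(long timeoutMillis);
}

// Passive side: the channel pushes messages to the registered handlers.
interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}
```

Receiving is deliberately absent from MessageChannel: how a message leaves the channel is exactly what distinguishes the two sub-interfaces.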
The MessageHandler interface will interest us later when we implement our first endpoints. For now, let’s focus on the channel implementations, starting with the DirectChannel.
A DirectChannel is a Subscribable Point-to-Point channel. It means that a DirectChannel should send the message to one of the registered handlers, in the same thread as the sender. Concretely, a DirectChannel is nothing more and nothing less than a method call:
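To make the "method call" idea concrete, here is a deliberately naive sketch. NaiveDirectChannel is a hypothetical name, and Message and MessageHandler are reduced to the bare minimum so the snippet stands alone.

```java
import java.util.ArrayList;
import java.util.List;

interface Message<T> {
    T getPayload();
}

interface MessageHandler {
    void handleMessage(Message<?> message);
}

// Naive version: no thread-safety, no failover, and it fails when nobody has subscribed.
class NaiveDirectChannel {
    private final List<MessageHandler> handlers = new ArrayList<>();

    boolean subscribe(MessageHandler handler) {
        return handlers.add(handler);
    }

    boolean send(Message<?> message) {
        // A plain method call, executed in the sender's thread.
        handlers.get(0).handleMessage(message);
        return true;
    }
}
```

Because send() simply invokes the handler, the caller is blocked until the handler returns, and any exception thrown by the handler propagates straight back to the sender.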
This implementation illustrates perfectly the main idea behind the DirectChannel but presents some flaws:
Every SubscribableChannel should store a list of subscribers.
What if there is no subscriber?
What if a handler fails? Should I try the next one to see if it succeeds?
The first problem is easily solved. We create a superclass to contain the list of subscribers:
We replaced java.util.ArrayList with an instance of java.util.concurrent.CopyOnWriteArrayList. This implementation is thread-safe and best suited for applications in which the collection generally stays small, read-only operations vastly outnumber mutative operations, and you need to prevent interference among threads during traversal.
The two remaining problems are solved by slightly more complex code because we need to iterate over the handlers and correctly manage exceptions. Here is the final implementation of DirectChannel:
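One way to address the three flaws is sketched below. It assumes a failover policy (try the next handler when one fails) and a simplified MessagingException; both are choices made for this sketch rather than a copy of the framework code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

interface Message<T> {
    T getPayload();
}

interface MessageHandler {
    void handleMessage(Message<?> message);
}

// Simplified root exception for this sketch.
class MessagingException extends RuntimeException {
    MessagingException(String description, Throwable cause) {
        super(description, cause);
    }
}

// Shared bookkeeping for every subscribable channel.
abstract class AbstractSubscribableChannel {
    protected final List<MessageHandler> handlers = new CopyOnWriteArrayList<>();

    boolean subscribe(MessageHandler handler) { return handlers.add(handler); }
    boolean unsubscribe(MessageHandler handler) { return handlers.remove(handler); }

    abstract boolean send(Message<?> message);
}

class DirectChannel extends AbstractSubscribableChannel {
    @Override
    boolean send(Message<?> message) {
        if (handlers.isEmpty()) {
            throw new MessagingException("No subscriber registered on this channel", null);
        }
        RuntimeException lastFailure = null;
        for (MessageHandler handler : handlers) {
            try {
                handler.handleMessage(message);
                return true; // point-to-point: stop at the first handler that succeeds
            } catch (RuntimeException e) {
                lastFailure = e; // assumed policy: fall through to the next handler
            }
        }
        throw new MessagingException("All handlers failed", lastFailure);
    }
}
```

With two subscribers where the first one throws, the second still receives the message; the sender only sees an exception when every handler has failed or when there is no subscriber at all.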
Spring Integration uses a well-designed exception hierarchy, centered around the MessagingException. For this post, we choose to only use the root exception, whose definition follows:
Like the DirectChannel, the PublishSubscribeChannel is another example of a synchronous channel. All message handlers are called successively in the sender thread. Here is an implementation reusing the utility AbstractSubscribableChannel:
Blocking the sender until all handlers have processed the message limits the scalability of our application. With Spring Integration, if the task-executor attribute is used, the actual handling of the message is performed asynchronously. With the standard Java Executor framework, it’s easy to support this attribute:
If an Executor is passed to the constructor, we use it to execute the handlers. Depending on the concrete implementation (see java.util.concurrent.Executors for the available factory methods), the handlers could be executed successively, or simultaneously using, for example, a pool of threads. If no executor is provided, we should preserve the synchronous behavior. To do that, we create a simple Executor implementation that executes the task in the same thread as the caller. With a Java lambda expression, this definition is a one-liner:
This syntax is exactly the same as this more verbose definition:
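For reference, both forms of the same-thread executor, together with a publish-subscribe channel that uses them, might look like the following sketch. The constant names and the constructor layout are assumptions of this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

interface Message<T> {
    T getPayload();
}

interface MessageHandler {
    void handleMessage(Message<?> message);
}

class PublishSubscribeChannel {
    // Lambda form: runs each task directly in the caller's thread.
    private static final Executor SAME_THREAD = task -> task.run();

    // Equivalent anonymous-class form, kept only for comparison.
    private static final Executor SAME_THREAD_VERBOSE = new Executor() {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    };

    private final List<MessageHandler> handlers = new CopyOnWriteArrayList<>();
    private final Executor executor;

    PublishSubscribeChannel() {
        this(null);
    }

    PublishSubscribeChannel(Executor executor) {
        // Without an executor, keep the synchronous behavior.
        this.executor = (executor != null) ? executor : SAME_THREAD;
    }

    boolean subscribe(MessageHandler handler) {
        return handlers.add(handler);
    }

    boolean send(Message<?> message) {
        // Publish-subscribe: every handler gets the message.
        for (MessageHandler handler : handlers) {
            executor.execute(() -> handler.handleMessage(message));
        }
        return true;
    }
}
```

Passing, for instance, Executors.newFixedThreadPool(4) to the constructor would let handlers run concurrently, at the cost of the sender no longer seeing handler exceptions directly.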
One channel remains to be defined: the QueueChannel. The QueueChannel is by definition an asynchronous channel. Each new message is stored in a queue, waiting for a consumer to retrieve it. As with the DirectChannel, multiple consumers could read from the channel, but only one should consume a given message. So, we need a concurrent implementation of the java.util.Queue interface. We will use a LinkedBlockingQueue:
The implementation relies heavily on the BlockingQueue API to support the different possibilities: the caller agrees to wait indefinitely, for a given amount of time, or not at all. If the timeout expires or if the thread is interrupted, our code should respond properly, so we need to catch InterruptedException and report that the operation could not be performed.
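The three waiting modes map naturally onto BlockingQueue methods. The sketch below assumes a convention where a negative timeout means "wait indefinitely" and zero means "do not wait"; that convention is a choice of this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

interface Message<T> {
    T getPayload();
}

class QueueChannel {
    private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<>();

    boolean send(Message<?> message) {
        return send(message, -1);
    }

    // timeout < 0: wait indefinitely; 0: do not wait; > 0: wait up to the given milliseconds.
    boolean send(Message<?> message, long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (timeoutMillis == 0) {
                return queue.offer(message);
            }
            queue.put(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return false;
        }
    }

    Message<?> receive() {
        return receive(-1);
    }

    // Returns null when no message could be obtained within the timeout.
    Message<?> receive(long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            if (timeoutMillis == 0) {
                return queue.poll();
            }
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Since the queue here is unbounded, send() never actually blocks; the timeout variants of send() only matter if the queue is given a capacity.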
This concludes the channel implementations. Before delving into the next abstraction, let’s recap what we have seen.
MessageEndpoint Abstraction
We have just seen how to send and receive messages from a channel. In practice, we will not use these APIs directly. Messages come from different sources (JMS, file, application…) and we do not want our application code tightly coupled with the Spring Integration API (the Spring philosophy). We use endpoints instead. Endpoints are used for many tasks: receiving a message from an external resource (JMS broker), sending messages to other applications (HTTP call), supporting complex flows with Router, Filter, Bridge and many other components. In this post, we will confine ourselves to just three endpoints: Service Activator, Proxy Gateway and Bridge. The first two are used to keep our code free from any Spring Integration dependency and the last one is mainly used when writing unit tests. Here we go!
The class hierarchy
Unlike Message and MessageChannel interfaces, there isn’t a MessageEndpoint interface. There is an AbstractEndpoint class but not all endpoints extend this class. Although there is a one-to-one mapping between the EIP patterns and the component names in Spring Integration, each endpoint does not necessarily have a corresponding class. Maybe a class diagram could help us make things clearer.
Both consumer types delegate internally to an instance of MessageHandler:
We will now describe each one of these classes.
Message consumption
Regardless of the channel type (Pollable vs Subscribable), we should execute some code when the Spring ApplicationContext starts. An endpoint associated with a PollableChannel should start a timer to periodically check if a new message is present. An endpoint associated with a SubscribableChannel should register itself as a subscriber to be notified when a new message comes. To avoid duplicating this logic, we will create the abstract superclass AbstractEndpoint.
This class implements SmartLifecycle. This interface is an extension of the Lifecycle interface for those objects that require to be notified upon ApplicationContext starting/refresh/shutdown. The isAutoStartup() return value indicates whether this object should be started at the time of a context refresh (true for our endpoints). With this hook, we can now implement the two main endpoint types: EventDrivenConsumer and PollingConsumer.
What about synchronization?
This class demonstrates common multithreading idioms, not really required for our basic implementation, but a good opportunity to take a look at common Spring code.
The volatile keyword tells the JVM that the variable will be accessed by multiple threads and guarantees that a write made by one thread is visible to subsequent reads by other threads, instead of staying in registers or a local processor cache. Since Java 5, volatile reads and writes establish a happens-before relationship, much like acquiring and releasing a mutex (like a synchronized block), a guarantee required for the double-checked locking pattern to work in Java (more on this subject later). volatile variables are typically used for indicating that an important lifecycle event (such as initialization or shutdown) has occurred (as in our example).
Check the excellent article by Brian Goetz to know more about the volatile keyword.
volatile variables are great for status flags but do not prevent multiple threads from executing the start() method simultaneously; for that, we have to resort to locking. In Java, we could declare our method as synchronized to use an intrinsic lock, or use an explicit lock, represented by the java.util.concurrent.locks.Lock interface. ReentrantLock is the most widely used implementation and behaves similarly to the synchronized keyword. In the previous code, we could have used synchronized methods instead of explicit locking, as the whole method is guarded by the lock. In practice, often only part of a method needs to be protected, and an explicit lock offers finer control over what is guarded.
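The combination of a volatile flag and an explicit lock can be sketched as follows. To stay dependency-free, this version does not implement Spring's SmartLifecycle; it only mirrors the start/stop/isRunning/isAutoStartup methods mentioned above, and the field names are assumptions.

```java
import java.util.concurrent.locks.ReentrantLock;

// Dependency-free sketch: the version in this post would implement SmartLifecycle.
abstract class AbstractEndpoint {
    // volatile: isRunning() can read the flag without taking the lock.
    private volatile boolean running;
    private final ReentrantLock lifecycleLock = new ReentrantLock();

    final void start() {
        lifecycleLock.lock();
        try {
            if (!running) {
                doStart();
                running = true;
            }
        } finally {
            lifecycleLock.unlock(); // always release, even if doStart() throws
        }
    }

    final void stop() {
        lifecycleLock.lock();
        try {
            if (running) {
                doStop();
                running = false;
            }
        } finally {
            lifecycleLock.unlock();
        }
    }

    final boolean isRunning() {
        return running;
    }

    boolean isAutoStartup() {
        return true;
    }

    // Subclasses subscribe to a channel or schedule a poller here.
    protected abstract void doStart();

    protected abstract void doStop();
}
```

The lock makes start() and stop() idempotent under concurrency: calling start() twice triggers doStart() only once.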
The first one we will implement is the EventDrivenConsumer:
The passive endpoint code is really simple. We just have to subscribe to the channel when the ApplicationContext starts. The core logic will be present in the message handler.
The code for the PollingConsumer is more complicated:
This code deserves some explanations.
The method doStart is called when the ApplicationContext is starting. A task represented by the Poller class is scheduled using the Spring Scheduling API. This task runs every 10 milliseconds:
The Poller class is defined as an inner class and implements the Runnable interface. This class regularly polls the message channel, waiting for a new message:
The method to retrieve the message is similar to what we have done in previous examples:
The message handling is performed by the method handleMessage:
As for the EventDrivenConsumer, we delegate to an instance of MessageHandler. The only thing we have to do is wrap the exception if it is not an instance of MessagingException.
The last thing to do is stop the scheduled task. We exploit the doStop() defined in the superclass and simply call the method cancel on the instance of ScheduledFuture.
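Putting these pieces together, a polling consumer could be sketched as below. Note the substitution: this version uses the JDK's ScheduledExecutorService instead of the Spring Scheduling API so that it runs without any dependency, and it omits the AbstractEndpoint superclass for brevity.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

interface Message<T> { T getPayload(); }
interface MessageHandler { void handleMessage(Message<?> message); }
interface PollableChannel { Message<?> receive(long timeoutMillis); }

class MessagingException extends RuntimeException {
    MessagingException(String description, Throwable cause) { super(description, cause); }
}

class PollingConsumer {
    private final PollableChannel inputChannel;
    private final MessageHandler handler;
    // Assumption: a JDK scheduler replaces the Spring TaskScheduler of the original code.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> task;

    PollingConsumer(PollableChannel inputChannel, MessageHandler handler) {
        this.inputChannel = inputChannel;
        this.handler = handler;
    }

    void start() {
        // Poll every 10 milliseconds, the interval mentioned in the text.
        task = scheduler.scheduleWithFixedDelay(new Poller(), 0, 10, TimeUnit.MILLISECONDS);
    }

    void stop() {
        ScheduledFuture<?> current = task;
        if (current != null) {
            current.cancel(true);
        }
        scheduler.shutdownNow();
    }

    private void handleMessage(Message<?> message) {
        try {
            handler.handleMessage(message);
        } catch (MessagingException e) {
            throw e;
        } catch (RuntimeException e) {
            throw new MessagingException("Handler failed", e); // wrap foreign exceptions
        }
    }

    private class Poller implements Runnable {
        @Override
        public void run() {
            try {
                Message<?> message = inputChannel.receive(0); // do not block the scheduler
                if (message != null) {
                    handleMessage(message);
                }
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs of the periodic task.
                System.err.println("Polling failed: " + e);
            }
        }
    }
}
```

The catch block inside run() matters: with scheduleWithFixedDelay, a task that throws is never rescheduled, so one failing message would silently stop the consumer.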
So, we know how the messages are received from the channel but we still haven’t seen how the messages are processed by the different message endpoints. The processing happens in an instance of the interface MessageHandler:
There is little to say about this interface. Let’s see its implementations!
Message production
Spring Integration provides an abstract class extended by all message producers to factorize common attributes and common methods. Here is its implementation:
To see how this class is used by subclasses, let’s consider the Bridge implementation. A Bridge simply forwards the input message directly to the output channel without modifying it. The main purpose of this handler is to bridge a PollableChannel to a SubscribableChannel or vice-versa.
The Service Activator implementation is a little more complex because we need to use reflection to delegate to a Spring bean method:
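A minimal sketch of a producing handler and a bridge built on it is shown below. The method name sendReply is hypothetical, and the fallback to a replyChannel header anticipates the gateway behavior described later in this post.

```java
import java.util.HashMap;
import java.util.Map;

final class Message<T> {
    private final T payload;
    private final Map<String, Object> headers;

    Message(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = new HashMap<>(headers);
    }

    T getPayload() { return payload; }
    Map<String, Object> getHeaders() { return headers; }
}

interface MessageChannel { boolean send(Message<?> message); }
interface MessageHandler { void handleMessage(Message<?> message); }

// Common base for handlers that produce an output message.
abstract class AbstractMessageProducingHandler implements MessageHandler {
    private MessageChannel outputChannel;

    void setOutputChannel(MessageChannel outputChannel) {
        this.outputChannel = outputChannel;
    }

    // Hypothetical helper: uses the configured output channel, else the replyChannel header.
    protected void sendReply(Message<?> reply, Message<?> request) {
        MessageChannel target = outputChannel;
        if (target == null) {
            Object header = request.getHeaders().get("replyChannel");
            if (header instanceof MessageChannel) {
                target = (MessageChannel) header;
            }
        }
        if (target == null) {
            throw new IllegalStateException("No output channel or replyChannel header");
        }
        target.send(reply);
    }
}

// Forwards the incoming message unchanged.
class BridgeHandler extends AbstractMessageProducingHandler {
    @Override
    public void handleMessage(Message<?> message) {
        sendReply(message, message);
    }
}
```

The bridge contains no logic of its own; everything interesting (where the message goes) lives in the shared superclass.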
Compared to the real code, this class ignores many concerns: methods defined by superclasses or methods accepting the Message class are ignored by our implementation.
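The reflective delegation can be illustrated with the following sketch, which makes the same simplifications: only single-argument methods declared on the bean class itself are considered, and the payload is passed as the argument. OrderServiceImpl is a hypothetical bean used for the demonstration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

interface Message<T> { T getPayload(); }

class ServiceActivatingHandler {
    private final Object target;
    private final Method method;

    ServiceActivatingHandler(Object target, String methodName) {
        this.target = target;
        this.method = findMethod(target.getClass(), methodName);
    }

    // Only looks at single-argument methods declared on the class itself.
    private static Method findMethod(Class<?> type, String name) {
        for (Method candidate : type.getDeclaredMethods()) {
            if (candidate.getName().equals(name) && candidate.getParameterCount() == 1) {
                candidate.setAccessible(true);
                return candidate;
            }
        }
        throw new IllegalArgumentException("No single-argument method named " + name);
    }

    // Invokes the bean method with the payload and returns its result.
    Object handle(Message<?> message) {
        try {
            return method.invoke(target, message.getPayload());
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("Target method failed", e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Target method not accessible", e);
        }
    }
}

// Hypothetical business bean with no messaging dependency.
class OrderServiceImpl {
    String submitOrder(String order) {
        return "confirmed:" + order;
    }
}

class ServiceActivatorDemo {
    public static void main(String[] args) {
        ServiceActivatingHandler handler =
                new ServiceActivatingHandler(new OrderServiceImpl(), "submitOrder");
        Message<String> message = () -> "42";
        System.out.println(handler.handle(message)); // confirmed:42
    }
}
```

The business class never sees a Message; in a full handler, the returned value would then be wrapped in a new message and sent to the output channel.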
We still have one endpoint to implement: the Proxy Gateway.
The Proxy Gateway exception
Spring Integration follows the Enterprise Integration Patterns (EIP) book to the letter, with just a few exceptions, such as the Proxy Gateway. In messaging, a Gateway is a two-way component. For example, a JMS inbound gateway consumes a message from a queue, processes it and publishes another JMS message. The Proxy Gateway is an adaptation of this pattern. The Proxy Gateway is very convenient in practice because it keeps our code loosely coupled.
Let’s go back to the example with the OrderService interface:
And the Proxy Gateway declaration:
At startup, Spring Integration will create for us an implementation similar to the following code:
When the method is called, a new message is created containing the method parameter as the payload. A temporary channel is also defined in the header replyChannel. This channel will be used by the first endpoint in the flow that does not have the output-channel attribute specified. This behavior is implemented in the previously covered AbstractMessageProducingHandler class:
The Gateway implementation ends by waiting for a message on this temporary channel, before extracting the payload and returning it to the caller.
The problem with this Gateway implementation is that the code is statically generated and highly coupled with our code (for example, the dependency on Order). A framework like Spring Integration needs a more flexible solution: the combination of a dynamic Proxy and a FactoryBean implementation to instantiate it.
When using Spring AOP, a proxy could be created as simply as:
When using Spring Core, a FactoryBean is a simple bean, registered in the ApplicationContext like any other bean definition, whose task is to create another bean. A factory is often used when you have complex initialization code that is better expressed in Java (like creating a proxy) as opposed to a potentially verbose amount of XML. Here is an example:
When the ApplicationContext starts, Spring calls the different methods to register a new bean of type OrderService. If we combine this class with the proxy creation code, we obtain an implementation of the Gateway endpoint:
Our implementation of the Proxy Gateway is almost done. There is only one concern remaining to address. What if an exception is thrown during the message processing? The answer depends on the method signature. Are we allowed to rethrow this exception or should we wrap it into a runtime exception? This is exactly what the method rethrowExceptionCauseIfPossible does:
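The dynamic part of the gateway can be illustrated without Spring at all. The sketch below uses java.lang.reflect.Proxy in place of Spring AOP and a static factory method in place of a FactoryBean; GatewayProxyFactory, the one-second reply timeout and the simplified OrderService signature are assumptions of this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class Message {
    private final Object payload;
    private final Map<String, Object> headers;

    Message(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = new HashMap<>(headers);
    }

    Object getPayload() { return payload; }
    Map<String, Object> getHeaders() { return headers; }
}

interface MessageChannel { boolean send(Message message); }

// Hypothetical, simplified service interface standing in for the OrderService of the example.
interface OrderService { String submitOrder(String order); }

class GatewayProxyFactory {
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> serviceInterface, MessageChannel requestChannel) {
        InvocationHandler handler = (proxy, method, args) -> {
            // This sketch only supports single-argument interface methods.
            if (method.getDeclaringClass() == Object.class || args == null || args.length != 1) {
                throw new UnsupportedOperationException(method.getName());
            }
            // Temporary reply channel, private to this invocation.
            BlockingQueue<Message> replies = new LinkedBlockingQueue<>();
            MessageChannel replyChannel = replies::offer;
            Map<String, Object> headers = new HashMap<>();
            headers.put("replyChannel", replyChannel);
            requestChannel.send(new Message(args[0], headers));
            // Assumed timeout: give up after one second without a reply.
            Message reply = replies.poll(1, TimeUnit.SECONDS);
            return (reply != null) ? reply.getPayload() : null;
        };
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
    }
}

class GatewayDemo {
    public static void main(String[] args) {
        // Stand-in for the whole flow: replies directly on the temporary channel.
        MessageChannel newOrders = request -> {
            MessageChannel reply = (MessageChannel) request.getHeaders().get("replyChannel");
            return reply.send(new Message("confirmed:" + request.getPayload(), new HashMap<>()));
        };
        OrderService orders = GatewayProxyFactory.create(OrderService.class, newOrders);
        System.out.println(orders.submitOrder("42")); // confirmed:42
    }
}
```

Nothing in the factory refers to OrderService or Order: the same code can back any single-argument service interface, which is the flexibility the static implementation lacked.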
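The decision can be sketched as follows. This is a simplified take on the idea, not the framework's implementation: unchecked exceptions and checked exceptions declared by the method are rethrown as-is, anything else is wrapped. GatewayExceptions and the OrderService signature with a throws clause are hypothetical.

```java
import java.io.IOException;
import java.lang.reflect.Method;

// Hypothetical service interface declaring one checked exception.
interface OrderService {
    String submitOrder(String order) throws IOException;
}

class GatewayExceptions {
    // Rethrows the failure unchanged when the method signature allows it, otherwise wraps it.
    static void rethrowExceptionCauseIfPossible(Throwable failure, Method method)
            throws Throwable {
        if (failure instanceof RuntimeException || failure instanceof Error) {
            throw failure; // unchecked: always allowed
        }
        for (Class<?> declared : method.getExceptionTypes()) {
            if (declared.isInstance(failure)) {
                throw failure; // checked, but declared by the method
            }
        }
        // Checked and undeclared: the caller could not catch it by type, so wrap it.
        throw new IllegalStateException("Undeclared checked exception", failure);
    }
}
```

With the interface above, an IOException reaches the caller unchanged, whereas a java.util.concurrent.TimeoutException would surface as an IllegalStateException with the original exception as its cause.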
We now have three perfectly operational endpoints. Finally, we could implement our initial use case using our version directly in XML like this:
Clearly, this code lacks expressiveness and we understand why Spring Integration adds syntactic sugar through the Spring Integration namespace. Let’s try to do the same thing!
A little bit of XML sugar
Almost every Spring module comes with its own namespace to facilitate the configuration of common beans. Spring Integration is no exception (Spring Integration defines more than 30 namespaces, one for each supported technology!).
The implementation of a custom namespace is well documented in the official Spring Core documentation.
Creating new XML configuration extensions can be done by following these (relatively) simple steps:
Authoring an XML schema to describe your custom element(s).
Coding a custom NamespaceHandler implementation (this is an easy step, don’t worry).
Coding one or more BeanDefinitionParser implementations (this is where the real work is done).
Registering the above artifacts with Spring (this too is an easy step).
What follows is a description of each of these steps.
Authoring the schema
We start with authoring an XML Schema to describe the extension. What follows is the schema we’ll use to configure our simple use case.
Coding a NamespaceHandler
In addition to the schema, we need a NamespaceHandler that will parse all elements of this specific namespace Spring encounters while parsing configuration files. We just have to define a class implementing the NamespaceHandler interface and associate a BeanDefinitionParser for each element in our namespace:
The observant reader will notice that there isn’t actually a whole lot of parsing logic in this class.
Indeed, most of this work happens in the NamespaceHandlerSupport class. This Spring Core provided class does most of the work and delegates to a BeanDefinitionParser when it needs to parse an element in the new namespace.
BeanDefinitionParser
The BeanDefinitionParser is responsible for parsing one distinct top-level XML element defined in the schema. Inside the parser, we’ll have access to the XML element (and thus its subelements too) so that we can parse our custom XML content, as can be seen in the following example:
This example handles the element <channel>. In this simple case, this is all that we need to do. We check if the queue subelement is present to determine if we need to create a DirectChannel or a QueueChannel.
Let’s inspect the <service-activator> element:
The code seems more complex but it is only because we need to register two beans: the service-activator handler (delegates to a bean) and a consumer (read new messages from the input channel). We have to use the org.springframework.beans.factory.support.BeanDefinitionReaderUtils class to generate a bean name to link the two beans together. The remaining code is classic BeanDefinitionParser code.
What is interesting is how the consumer is instantiated. How do we determine whether we need to create an Event-Driven Consumer (to read from a PublishSubscribeChannel, for example) or a Polling Consumer (to read from a QueueChannel, for example)? We can’t. We need to defer that decision until the application context actually starts. So, we create an instance of FactoryBean (like the previous GatewayProxyFactoryBean). Here is its implementation:
We use an instance of ChannelResolver to retrieve the MessageChannel instance corresponding to the name specified in the XML file. This utility class simply delegates to a BeanFactory:
Then, we test the type of the channel to instantiate the right consumer, passing the handler as a constructor argument:
This code has a serious flaw. If we run our program now, the handler will never receive any message. Can you guess why?
If we go back in this post, the superclass of PollingConsumer and EventDrivenConsumer, AbstractEndpoint, implements the SmartLifecycle interface to auto-start the consumers. This only works on beans instantiated by Spring. In the previous code:
As we instantiate the consumer ourselves, we have the responsibility to call the lifecycle methods. This is simple to implement by implementing the same interface, and delegating to the inner endpoint:
We will not describe the remaining channels and endpoints. The code is very similar to the code presented here. (You could check the full source code here).
Registering the handler and the schema
The coding part is finished! All that remains to be done is to make the Spring XML parsing infrastructure aware of our custom namespace. For our example, we need to write the following two files:
META-INF/spring.handlers
The first part (the key) of the key-value pair is the URI associated with your custom namespace extension, and needs to match exactly the value of the ‘targetNamespace’ attribute as specified in your custom XSD schema.
META-INF/spring.schemas
This file is needed to prevent Spring from connecting to the Internet to retrieve the schema file. If you specify the mapping in this properties file, Spring will search for the schema on the classpath (in this case my-spring-integration-1.0.xsd in the my.springframework.integration.config.xml package).
Why should I not specify the version of the XSD?
Spring recommends never specifying the schema version when using a namespace. For example:
But not: (even if it works)
The previous file reveals how this is implemented. The spring.schemas file contains a definition for the two versions. In practice, this file contains all previous versions too!
This explains why the following code continues to work even when we upgrade the version of the Spring Framework:
Using a custom extension
Our namespace could be used in the same way as we used the official namespace at the start of this post, except we should update the namespace URI:
We re-run the test. Green. Done.
Congratulations!
The implementation of our own Spring Integration framework is finished. We now have a basic but working solution implementing core components of the framework. The full source code of this post is available on GitHub.
To remember
volatile variables are useful as lifecycle flags (initialization, shutdown). ReentrantLock provides an explicit locking mechanism similar to the synchronized keyword, with finer-grained control.
The Java Executor framework should be preferred over using the Thread API directly.
Creating a new XML namespace with Spring is easy. The parsing code is completely hidden behind well designed interfaces. Moreover, adding syntactic sugar increases the chance of adoption of a framework.
A FactoryBean instance could be used when creating an object in XML is too complicated.
The SmartLifecycle interface could be used to auto start-up your bean.
Try for yourself!
There is so much to cover about Spring Integration. Why not try to analyze other features of the framework? Here are some ideas:
Channel Interceptors: Try to implement your own Wire-Tap pattern.
Unicast vs Multicast: Our current SubscribableChannel implementations send messages to endpoints directly. This differs slightly from the official implementations, where these channels use two abstractions: UnicastingDispatcher for point-to-point and BroadcastingDispatcher for publish-subscribe. Why not inspect how these classes work internally (load-balancing and fail-over support)?
Jms Inbound Adapter vs Jms Inbound Gateway: One of the least understood points when beginning with Spring Integration, but an important one to grasp. Inspect the code and see how the JmsTemplate is used to create one-way (Adapter) and two-way (Gateway) communications.
Java DSL: The Spring Integration JavaConfig and DSL extension provides a set of convenient Builders and a fluent API to configure Spring Integration message flows from Spring @Configuration classes. Try to reuse our code to offer a similar API instead of the verbose XML configuration.
About the author
Julien Sobczak works as a software developer for Scaleway, a French cloud provider. He is a passionate reader who likes to see the world differently to measure the extent of his ignorance. His main areas of interest are productivity (doing less and better), human potential, and everything that contributes to being a better person (including a better dad and a better developer).