These are the notes I took while reading the Manning book “Spring Integration in Action” (Mark Fisher, Jonas Partner, Marius Bogoevici and Iwein Fuld) before taking the Spring Integration exam.
I found the book to be a nice guide to some of the topics before actually starting to read the Java documentation.
More notes about preparing for the exam itself are here.
Table of contents
- 1. Background
- 2. Messaging in Spring Integration
- 3. Building Messaging Systems
- 4. Integrating Existing Systems
- 5. Advanced Topics
1. Background
1.1. Enterprise Integration Patterns - introduction
EIP are built on three basic patterns:
- Message
- Message Channel
- Message Endpoint
Message
Message consists of:
- header - data relevant only to the messaging system
- payload - data to be processed
Message types:
- command message - tells the receiver to do something
- event message - notifies that something has happened
- document message - transfers data
The message is a representation of the contract between the sender and the receiver.
Message Channel
Manages how (e.g. async or sync) and where the message is delivered, but does not interact with its content. It also decouples the sender from the receiver. There are two types of channels:
- point-to-point - one message is received by exactly one receiver (but not necessarily always the same one)
- publish-subscribe - one message is received by all subscribers (zero or more)
Message Endpoints
They actually do something with the message.
- Channel Adapter - connects an application to the messaging system. The message flow is uni-directional.
          |\       _____
  APP --> ||  --> |_____| --> ...
          |/
- Messaging Gateway - used for bi-directional messaging, e.g. where we want to do asynchronous stuff behind the curtain while the caller is aware only of a sync invocation.
           sync     _____   async
      -->  |\  --> |_____| --> ...
  APP      ||  sync _____   async
      <--  |/  <-- |_____| <-- ...
  (inbound gateway)
- Service Activator - invokes a service based on the message and sends back the result. It's like an outbound gateway, but with a specific purpose and within the same Spring context.
           _____       ___
  ... --> |_____| --> | o | -->
           _____      |   | SERVICE
  ... <-- |_____| <-- | o | <--
                      '---'
- Router - everyone knows it, but two points to note: it does not change the message, and it is aware of other channels
          ,-----,
          | / *-| --> ...
  ... --> |*  *-| --> ...
          |___*-| --> ...
- Splitter - splits the message
          ,---------,
          |       []|
  ... --> |[] --> []| --> ...
          |       []|
          '---------'
- Aggregator - waits for a group of correlated messages and merges them when the group is complete (it needs to know the correlation id of each message and the group size)
          ,---------,
          |[]       |
  ... --> |[] --> []| --> ...
          |[]       |
          '---------'
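The gateway idea above (a synchronous call on the outside, an asynchronous hand-off behind the curtain) can be sketched in plain Java. This is not the Spring Integration API: `SyncGatewaySketch`, its request queue and the `CompletableFuture` used as a private reply channel are all made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a synchronous facade over an asynchronous worker.
// Not the Spring Integration API; all names are made up for illustration.
class SyncGatewaySketch {
    // A request carries its payload plus a future that acts as a private reply channel.
    record Request(String payload, CompletableFuture<String> replyChannel) {}

    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>();

    SyncGatewaySketch() {
        // The "async stuff behind the curtain": a daemon thread consuming requests.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = requests.take();
                    request.replyChannel().complete(request.payload().toUpperCase());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // What the caller sees: an ordinary blocking method call.
    String send(String payload) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        requests.offer(new Request(payload, reply));        // async hand-off (unbounded queue)
        return reply.orTimeout(5, TimeUnit.SECONDS).join(); // block until the reply arrives
    }

    public static void main(String[] args) {
        System.out.println(new SyncGatewaySketch().send("hello")); // prints HELLO
    }
}
```

The caller only sees `send(...)` returning a value; the worker thread could be replaced by any asynchronous flow without the caller noticing, which is the point of a gateway.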
1.2. Event-driven architecture
1.3. Integration styles
Actually Spring focuses on number 4 (message based integration), but also supports the other ones.
- File based - no transactions, no message metadata, no atomicity; though it can sometimes still be used
- Shared database - atomic operations, data consistency, domain model specified => extra coupling
  - staging tables, for transferring data in steps
  - sharing data
- Remote procedure calls - tries to hide the fact that different services are running on different systems; has to serialize objects (XML, Java), the network is not always reliable, and there is nothing in the middle to take care of assuring delivery
- Message based integration - small data packets are exchanged frequently between endpoints via channels, in async manner
2. Messaging in Spring Integration
2.1. Message
- metadata is in message's headers
- the message is immutable! use MessageBuilder
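To make the immutability point concrete, here is a plain-Java model of a message (headers + payload) that can only be "changed" by copying it through a builder. The class and its methods are hypothetical stand-ins; in Spring Integration the real ones are `Message` and `MessageBuilder` (e.g. `MessageBuilder.fromMessage(m).setHeader(...).build()`).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of an immutable message with a builder.
// Illustrative only; the real classes are Spring's Message and MessageBuilder.
final class ImmutableMessageSketch {
    final Map<String, Object> headers; // read-only view, cannot be modified
    final Object payload;

    private ImmutableMessageSketch(Map<String, Object> headers, Object payload) {
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); // defensive copy
        this.payload = payload;
    }

    static Builder withPayload(Object payload) { return new Builder(payload, Map.of()); }

    // "Changing" a message means copying it into a builder and building a new one.
    static Builder fromMessage(ImmutableMessageSketch m) { return new Builder(m.payload, m.headers); }

    static final class Builder {
        private final Object payload;
        private final Map<String, Object> headers;

        private Builder(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }

        Builder setHeader(String name, Object value) {
            headers.put(name, value);
            return this;
        }

        ImmutableMessageSketch build() { return new ImmutableMessageSketch(headers, payload); }
    }

    public static void main(String[] args) {
        ImmutableMessageSketch original = withPayload("order-42").setHeader("priority", 1).build();
        ImmutableMessageSketch copy = fromMessage(original).setHeader("priority", 9).build();
        // The original is untouched; the copy carries the new header value.
        System.out.println(original.headers.get("priority") + " " + copy.headers.get("priority")); // prints "1 9"
    }
}
```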
2.2. Channels
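The main interfaces go like this:
MessageChannel: send (m), send (m, timeout)
PollableChannel: receive (), receive (timeout)
SubscribableChannel: subscribe (messageHandler), unsubscribe (messageHandler)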
Bridge
With the email example we also want to have more consumers, so we change the channel which puts stuff in the queue to a publish-subscribe-channel, and add a bridge right after it and before the queue. This is done so that the pub-sub channel can hand over the messages to another thread in the bridge, which guarantees the pub-sub channel does not run out of threads from its thread pool. (I don't get why the threads would get exhausted, but ok)
Priority queue
It's easy to change a channel into a priority queue by adding a “priority-queue” XML tag inside it. The tag can reference a custom comparator; without one, messages are ordered by their priority header.
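A rough plain-JDK model of what a priority queue channel does: receivers get messages in priority order instead of arrival order. The sketch orders by a `priority` header (higher first, missing header treated as 0); this mirrors my understanding of the default behaviour, but the comparator, the map-based messages and the class name are assumptions.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;

// Plain-Java model of a priority queue channel. Messages are modelled as maps holding
// headers plus a "payload" entry (a simplification for this sketch).
class PriorityChannelSketch {
    // Higher "priority" header first; a missing header counts as 0 (an assumption).
    static final Comparator<Map<String, Object>> BY_PRIORITY =
        Comparator.comparingInt((Map<String, Object> m) -> (Integer) m.getOrDefault("priority", 0)).reversed();

    private final PriorityBlockingQueue<Map<String, Object>> queue = new PriorityBlockingQueue<>(11, BY_PRIORITY);

    void send(Map<String, Object> message) { queue.offer(message); }

    Map<String, Object> receive() { return queue.poll(); } // null when empty

    public static void main(String[] args) {
        PriorityChannelSketch channel = new PriorityChannelSketch();
        channel.send(Map.of("payload", "low", "priority", 1));
        channel.send(Map.of("payload", "urgent", "priority", 10));
        channel.send(Map.of("payload", "none"));
        for (Map<String, Object> m; (m = channel.receive()) != null; ) {
            System.out.println(m.get("payload")); // urgent, low, none (one per line)
        }
    }
}
```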
Channel collaborators
They are MessageDispatchers and ChannelInterceptors, which says nothing now, but I promise it's the last additional piece of information on component types.
Message Dispatcher
It decides what happens once a message arrives at a subscribable channel.
<MessageDispatcher>
bool addHandler (messageHandler)
bool removeHandler (messageHandler)
bool dispatch (m)
It can either do the “competing consumers” thing or broadcasting. Correspondingly, we have UnicastingDispatcher and BroadcastingDispatcher. If we choose competing consumers, we have to define the competition rules by adding a reference to a LoadBalancingStrategy.
<LoadBalancingStrategy>
Iterator<MessageHandler> getHandlerIterator(m, List<handlers>)
The only available implementation is RoundRobinLoadBalancingStrategy. Yes, in round robin the message is not really used by the method, but if you wanna implement crazy rules, with this interface you can (though they say you typically shouldn't).
They didn’t show XML definition. I think normally it’s done under the hood so you don’t touch this. It’s just to know how things work internally.
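Since the dispatcher is normally hidden, a small plain-Java model helps to see the difference between the two strategies. `DispatcherSketch` is hypothetical; it only mimics the idea behind UnicastingDispatcher with round robin (point-to-point) and BroadcastingDispatcher (publish-subscribe).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the two dispatching styles; not Spring's dispatcher classes.
class DispatcherSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private int next = 0; // round-robin position

    boolean addHandler(Consumer<String> handler) { return handlers.add(handler); }

    boolean removeHandler(Consumer<String> handler) { return handlers.remove(handler); }

    // Unicasting with round robin: exactly one handler gets each message, taking turns.
    boolean dispatchRoundRobin(String message) {
        if (handlers.isEmpty()) return false; // nobody to deliver to
        int index = next % handlers.size();
        next = index + 1;
        handlers.get(index).accept(message);
        return true;
    }

    // Broadcasting: every handler gets the message; zero handlers is not an error.
    boolean dispatchBroadcast(String message) {
        for (Consumer<String> handler : handlers) handler.accept(message);
        return true;
    }

    public static void main(String[] args) {
        DispatcherSketch d = new DispatcherSketch();
        d.addHandler(m -> System.out.println("A got " + m));
        d.addHandler(m -> System.out.println("B got " + m));
        d.dispatchRoundRobin("1"); // A got 1
        d.dispatchRoundRobin("2"); // B got 2
        d.dispatchBroadcast("3");  // A got 3, then B got 3
    }
}
```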
Channel Interceptor
Seems like an extra piece of functionality: you can intercept or filter messages.
<ChannelInterceptor>
m preSend (m, channel)
postSend (m, channel, wasSent)
bool preReceive (channel)
m postReceive (m, channel)
The “receive” methods are available only for pollable channels. In general, return null instead of the message to break the processing. In “preReceive”, return false to break the processing; it is called before the message is even read, that's why it does not deal with the message.
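The preSend contract (return a possibly modified message, or null to stop) can be modelled as a chain of functions. This is a plain-Java sketch with made-up names, not Spring's `ChannelInterceptor`; only the preSend path is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java model of preSend interception: each interceptor may replace the message,
// or return null to stop the send. Names are illustrative, not Spring's.
class InterceptedChannelSketch {
    final List<UnaryOperator<String>> preSendInterceptors = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    boolean send(String message) {
        String current = message;
        for (UnaryOperator<String> interceptor : preSendInterceptors) {
            current = interceptor.apply(current);
            if (current == null) return false; // an interceptor vetoed the message
        }
        delivered.add(current); // stands in for the real dispatch / enqueue
        return true;
    }

    public static void main(String[] args) {
        InterceptedChannelSketch channel = new InterceptedChannelSketch();
        channel.preSendInterceptors.add(String::trim);               // transforming interceptor
        channel.preSendInterceptors.add(m -> m.isEmpty() ? null : m); // filtering interceptor
        System.out.println(channel.send("  hi  ") + " " + channel.send("   ")); // true false
        System.out.println(channel.delivered);                                // [hi]
    }
}
```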
We add interceptors like this:
<channel ..>
<interceptors>
<beans:ref bean="…"/>
…
</interceptors>
</channel>
Spring provides some ready-made interceptors:
- WireTap - non-invasively copies the message to another processing path (async); it is also possible to define it globally, but they didn't show how;
- MessageSelectingInterceptor, which uses a MessageSelector (bool accept(m)), e.g. PayloadTypeSelector, which allows implementing the Datatype Channel EIP; (don't confuse it with MessageFilter, which does exactly the same but as a message endpoint)
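A Datatype Channel built from a selector boils down to "accept only payloads of the expected types". The sketch below is plain Java with hypothetical names; throwing on rejection is an assumption meant to resemble a selecting interceptor, not Spring's exact exception type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of a payload-type selector guarding a channel (Datatype Channel).
// "TypedChannelSketch" is a hypothetical name for this illustration.
class TypedChannelSketch {
    final List<Object> delivered = new ArrayList<>();
    private final Predicate<Object> selector;

    TypedChannelSketch(Class<?>... acceptedTypes) {
        // accept(m): true if the payload is an instance of any accepted type
        this.selector = payload -> {
            for (Class<?> type : acceptedTypes) {
                if (type.isInstance(payload)) return true;
            }
            return false;
        };
    }

    void send(Object payload) {
        if (!selector.test(payload)) {
            throw new IllegalArgumentException("rejected payload type: " + payload.getClass().getSimpleName());
        }
        delivered.add(payload);
    }

    public static void main(String[] args) {
        TypedChannelSketch orders = new TypedChannelSketch(String.class);
        orders.send("order-1");
        try {
            orders.send(42);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // rejected payload type: Integer
        }
    }
}
```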
2.3. Message Endpoints
- if the channel is subscribable, the endpoint will be invoked by the channel's TaskExecutor - not much to do
- if the channel is pollable, it needs to be polled by the endpoint
- if the channel is bidirectional, i.e. expects a reply, then the endpoint should provide it of course
- ...
- inbound/outbound - from perspective of Spring Integration application
- internal/external - with respect to application context
Polling vs not polling
- polling endpoint = asynchronous message hand-off (you don't know when the message arrives, so you have to poll at time intervals)
- event-driven endpoint = synchronous message hand-off
- polling endpoint - polls for new messages periodically, and manages the thread(s) which do(es) that; Spring automatically wraps any passive endpoint connected to a PollableChannel into a polling endpoint;
- event-driven endpoint - used in most cases; such an endpoint does not take responsibility for thread management; if the subscribable channel sending to this endpoint does not want to wait for the message hand-off, it can still use a thread pool, but then the transaction boundary will be broken
Transaction boundaries
- a message source - if the message source is transactional, it participates in the same transaction
- a queue channel
Under the hood
3. Building Messaging Systems
3.1. Separation of concerns
Transformers
- when you design your business domain, don't keep the integration in mind, it should be independent; a DTO should not have a "toXml()" method, rather add an appropriate transformer in your configuration later;
- later you can even build transformers that will call other services to build the message, that's perfectly fine;
- such a transformer you'd annotate with @MessageEndpoint, which is itself meta-annotated with @Component (so it is picked up by component scanning);
- testing of a workflow is done by @Autowiring the channels and sending a message built with MessageBuilder;
- normally, for every endpoint that produces a reply, one of the two is required: output-channel, or a replyChannel header on the message; if neither is set, an exception is thrown;
- <mail:header-enricher><mail:to expression="payload?.emailAddress"/>..
Service Activators
- if the output-channel is not defined, then the service activator will use the value of the message's replyChannel header to send the reply;
- so switching between chained services and request-reply model is a matter of adding/removing the output-channel from XML
- you can provide output-channel="nullChannel" to ignore the response
Interceptors that publish messages
- @Publisher(channel="targetChannel") - method will additionally publish result to that channel; it's an interceptor;
Messaging Gateways
- if we wanna publish to multiple channels, we use a gateway; we set the default-request-channel to the one from which the messages will be picked up and passed to a transformer, and next to a pub-sub channel; that's just the example configuration;
Chaining endpoints
- we can do <chain> .... </chain> and put the endpoints inside in proper order, without specifying the channels at all;
- the channels will be sync
- all the endpoints but the last must return output (returning null is fine)
- the last endpoint must have an output-channel, or the message must have a replyChannel header defined
- a router can only be last
3.2. Routing and filtering
Filter
- if both "discard channel" and "throw exception" are set, the message will still be sent to the discard channel (and then the exception is thrown);
Router
- if router's method returns null, the message won't be processed anymore
- if it returns list of strings, the message will be sent to all these channels
- all routers are content-based routers
- if the resolver is specified, it maps the result name to the real channel name
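The router rules above (null means stop, a list means send to all of them, the resolver translates names) fit in a few lines of plain Java. `RouterSketch`, its routing function and the in-memory "channels" map are hypothetical; note that the message itself is passed on unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a content-based router whose routing method returns channel names.
// The resolver map translates logical names into real channel names. Illustrative only.
class RouterSketch {
    final Map<String, List<Object>> channels = new HashMap<>(); // channel name -> received messages
    final Map<String, String> resolver = new HashMap<>();       // logical name -> real channel name
    private final Function<Object, List<String>> routingMethod;

    RouterSketch(Function<Object, List<String>> routingMethod) { this.routingMethod = routingMethod; }

    void route(Object message) {
        List<String> targets = routingMethod.apply(message);
        if (targets == null) return; // null: the message is not processed any further
        for (String name : targets) {
            String real = resolver.getOrDefault(name, name);
            channels.computeIfAbsent(real, k -> new ArrayList<>()).add(message); // message unchanged
        }
    }

    public static void main(String[] args) {
        RouterSketch router = new RouterSketch(payload -> {
            int n = (Integer) payload;
            if (n < 0) return null;                                      // drop negative numbers
            return n % 2 == 0 ? List.of("even", "all") : List.of("all"); // one or many channels
        });
        router.resolver.put("even", "evenNumbersChannel");
        for (int n : new int[] {4, 7, -1}) router.route(n);
        System.out.println(router.channels.get("all") + " " + router.channels.get("evenNumbersChannel")); // [4, 7] [4]
    }
}
```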
Payload-type router
- routing based on payload type:
Header value router
- routing based on a specific header's value:
- often used with a header enricher:
Recipient list router
- sends message to multiple channels:
3.3. Splitting and aggregating
- Meal prepareMeal(List<m>) - returns an aggregated message based on contents of the message group
- Object getCorrelationKey(m) - returns an identifier of this group
- bool canCookMeal(List<m>) - returns true if the group is complete
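The three aggregator methods can be wired together like this. It is a plain-Java sketch: the `Ingredient` record, carrying the group size inside each message, and the "+"-joined meal are assumptions chosen to match the meal example, not Spring's aggregator API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java model of an aggregator: group by correlation key, release when complete.
class AggregatorSketch {
    record Ingredient(String orderId, String name, int groupSize) {}

    private final Map<Object, List<Ingredient>> groups = new HashMap<>();

    // Identifies the group this message belongs to.
    Object getCorrelationKey(Ingredient m) { return m.orderId(); }

    // Release check: the group is complete once it has the expected number of messages.
    boolean canCookMeal(List<Ingredient> group) { return group.size() >= group.get(0).groupSize(); }

    // Merges the complete group into a single result.
    String prepareMeal(List<Ingredient> group) {
        return group.stream().map(Ingredient::name).collect(Collectors.joining("+"));
    }

    // Returns the aggregated result when this message completes its group, otherwise empty.
    Optional<String> receive(Ingredient m) {
        Object key = getCorrelationKey(m);
        List<Ingredient> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(m);
        if (!canCookMeal(group)) return Optional.empty();
        groups.remove(key); // the group is released and forgotten
        return Optional.of(prepareMeal(group));
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        System.out.println(aggregator.receive(new Ingredient("order-1", "rice", 2))); // Optional.empty
        System.out.println(aggregator.receive(new Ingredient("order-2", "soup", 1))); // Optional[soup]
        System.out.println(aggregator.receive(new Ingredient("order-1", "fish", 2))); // Optional[rice+fish]
    }
}
```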
Scatter-gather algorithm
4. Integrating Existing Systems
4.1. XML
Marshalling and unmarshalling
- @javax.xml.bind.annotation.XmlRootElement(name = "rootTagName")
- @XmlElement
- @XmlJavaTypeAdapter(YourAdapter.class)
<si-xml:marshalling-transformer
input-channel="input"
output-channel="xmlOut"
marshaller="myMarshaller"
result-transformer="resultToDocumentTransformer"/>
You need the result transformer, as the marshaller returns an object of type Result, which you can further convert to String or Document, or whatever. There are two ready result transformers in Spring Integration, ResultToDocumentTransformer and ResultToStringTransformer.
In case you wanted to do an XSLT transformation, use:
<si-xml:xslt-transformer
input-channel="inputXml"
output-channel="transformedXml"
xsl-resource="classpath:/xsl/blablah.xsl"/>
You may also wanna split XML using XPath; you can do it using the XPath splitter:
<si-xml:xpath-splitter create-documents="true"
input-channel="…" output-channel="…">
<si-xml:xpath-expression
expression="parentNodeName"
namespace-map="namespaceMap"/>
</si-xml:xpath-splitter>
<util:map id="namespaceMap">
<entry key="hb" value="http://www.example.com/blablah"/>
...
</util:map>
- instead of namespace-map, a single namespace can also be declared directly on xpath-expression with ns-prefix="hb" and ns-uri="http://www.example.com/blablah"
- if create-documents is set to true, each part will be wrapped in a separate XML document, otherwise it will be just raw content
4.2. JMS
- general abstraction over MOM (Message Oriented Middleware)
- Spring Integration is doing well already without JMS, but it's worth being able to connect to an existing JMS system; or to have some persistent storage in an independent queue, or between different JVMs, or to actually have transactions, or implicit load balancing
- ActiveMQ is an open-source JMS implementation
JMS terminology
JMS refresh
- JmsTemplate
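import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsTemplate;
// connectionFactory is a javax.jms.ConnectionFactory defined elsewhere
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setDefaultDestination(new ActiveMQQueue("siisa.queue"));
jmsTemplate.convertAndSend("Helloo");
String res = (String) jmsTemplate.receiveAndConvert(); // returns Object, hence the cast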
If a transaction is already active (opened by some other process), this executes in the same transaction. The transaction is bound to the JMS session.
The conversion is done by default by SimpleMessageConverter, which maps the Java type to the message type (TextMessage, MapMessage, BytesMessage, ObjectMessage, etc.), but it can be customized. It can even be replaced by MarshallingMessageConverter (built on Spring OXM) to have the conversion from and to XML.
JmsTemplate is sync.
- MessageListener
This is async, and it provides transaction handling, which would otherwise be tricky with async processing.
You'd actually use MessageListenerAdapter, as it eliminates some boilerplate code. And even better, define it in XML config:
<jms:listener-container>
<jms:listener
destination="myQueue"
ref="aPojo"
method="someMethod"/>
</jms:listener-container>
Spring Integration & JMS - one-way communication
destination="fromSi"
<jee:jndi-lookup id="fromSi" jndi-name="jms/queue.fromSi"/>
- the responsibilities are handled internally by JmsTemplate
- set pub-sub-domain to true if the destination is a topic
- specify either destination or destination-name
<int-jms:inbound-channel-adapter
destination-name="myQueue"
pub-sub-domain="true">
<int:poller fixed-delay="3000" max-messages-per-poll="1"/>
</int-jms:inbound-channel-adapter>
- connection-factory should be set if its name is different from "connectionFactory"
<int-jms:message-driven-channel-adapter
id, channel, destination-name/>
Spring Integration & JMS - two-way communication
If you need bi-directional communication, use a gateway:
<int-jms:outbound-gateway
reply-channel="jmsReplies"
request-destination-name="examples.queue"
request-pub-sub-domain="true"/>
- use request-pub-sub-domain if you use JMS topic
- notice that reply-destination-name is not required; the gateway will set the JMSReplyTo property of each message to a temporary queue which it creates for you
<int-jms:inbound-gateway id="…"
request-channel="fromJms"
request-destination-name="examples.queue"
default-reply-destination="examples.replies"
default-reply-queue-name="examples.replies"
default-reply-topic-name="examples.replies"
request-pub-sub-domain="true"
concurrent-consumers="5"
max-concurrent-consumers="25"
idle-task-execution-limit="3"/>
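- if the incoming JMS message has JMSReplyTo set, it takes precedence over the default-reply-destination
- default-reply-destination, default-reply-queue-name and default-reply-topic-name are alternative ways of setting the default reply destination; normally only one of them is used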
- reply-channel is not required; if it is not set, the gateway will create implicit channel and set it as the "replyChannel" header on the message
- connection-factory should be set if its name is different from "connectionFactory"
- the last three attrs are for concurrency settings
Spring Integration & JMS - “tunnelling”
Transactions with JMS Integration
The JMS Session object is created by a JMS connection with two flags: transacted (bool) and acknowledgeMode (AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE); when transacted is true, the acknowledge mode is ignored. DUPS_OK will also call acknowledge() automatically, but lazily. I skip the details as they were already covered in Spring Core.
The message listener container has a property "acknowledge", which can take one of 4 values: auto (default), client, dups-ok and transacted. In XML you set it on <jms:listener-container ..>.
Transactions are in general managed by a PlatformTransactionManager (JmsTransactionManager or DataSourceTransactionManager). If you want to include a database operation in the same transaction, you do not necessarily have to switch to complex XA transactions. Often it is possible to just order your calls properly, e.g. receive the JMS message and call the database at the end of the same method, so that the database work commits just before the JMS session does. The tradeoff is that, very rarely, you may still end up with duplicates (if the JMS commit fails after the database commit, the message is redelivered), but it's sometimes easier to compensate for that than to introduce global transactions.
4.3. Email
Sending Email
- supported payload types: String, byte array (for attachments), MailMessage, MimeMessage
- the from and to, and even email subject, are in message headers; we should fill them in using mail:header-enricher
- internally it uses JavaMail API, more specifically the Spring JavaMailSender which builds on top of it
- Spring's mail support also introduced MailMessage, next to the low-level MimeMessage
- the java-mail-properties are for setting extra properties specific to JavaMail, like mail.store.protocol=imap (it will be important)
Receiving Email - polling
- POP3 is designed for downloading; therefore it will work only with polling; there is no session, so messages can be downloaded multiple times, that's why we have the option to delete them on read; should-mark-messages-as-read will be ignored here
- IMAP is designed for keeping emails on server; the mailbox maintains state, so duplicates are not a risk; you can use it also with event driven reception.
- SMTP is also supported
Receiving Email - event driven
- the differences are the name and auto-startup property
4.4. Filesystem Integration
- there's file: namespace in Spring context, use it
- remember that Java's File is not a physical file; only the path of the file is immutable, the rest is read from the filesystem on request
Writing the file
- message payload can be of type String, File or byte[]
- delete-source-file - if the payload type is File, it tells whether to delete that file
- filename-generator should be clever enough; if it generates non-unique names, it's the programmer's fault
- writing the file is first performed to another temp file with a special extension, and only at the end is the file renamed (moved)
- there's also an analogous <file:outbound-gateway/>
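The write-to-temp-then-rename trick is easy to show with `java.nio.file`. The `.writing` suffix is an assumption for this sketch (I believe it matches Spring's default temporary suffix, but check the docs), and whether `ATOMIC_MOVE` is really atomic depends on the filesystem.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Plain-Java sketch of "write to a temporary file, then rename", so that a reader polling
// the directory never sees a half-written file. Not Spring's FileWritingMessageHandler.
class TempThenRenameWriter {
    static final String TEMP_SUFFIX = ".writing"; // assumption for this sketch

    static Path write(Path directory, String fileName, String content) throws IOException {
        Path temp = directory.resolve(fileName + TEMP_SUFFIX);
        Path target = directory.resolve(fileName);
        // The slow part happens under the temporary name, which readers are expected to skip.
        Files.writeString(temp, content, StandardCharsets.UTF_8);
        // The move makes the finished file appear in one step.
        return Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("outbox");
        Path written = write(dir, "report.txt", "hello");
        System.out.println(Files.readString(written)); // hello
    }
}
```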
Reading the file
- by overriding the filter, you override the default FileListFilters, which are set up so that they match the outbound channel adapter and do not pick up temporary files
- by overriding scanner you also override the filter, as the scanner uses the filter (defines how to scan e.g. subdirectories)
- comparator says in which order to read files
- filename-pattern and regex should not be used together with the custom filter
- by default, in the default filter, Spring remembers all the read files not to read them twice
- the reader maintains an internal queue, which it populates on calling .receive() by listing the directory content; the queue is prioritized using the comparator, if present
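A simplified plain-Java model of that reading behaviour: the internal queue is refilled from a directory listing only when it is empty, temporary files and already-seen files are skipped, and a comparator decides the order. All names here are hypothetical; in Spring the accept-once behaviour lives in a filter such as `AcceptOnceFileListFilter`, and the real reader is more elaborate.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Plain-Java model of a polling directory reader with an accept-once filter and a
// comparator-ordered internal queue. Illustrative only.
class DirectoryReaderSketch {
    private final File directory;
    private final Set<File> seen = new HashSet<>(); // grows forever, like a naive accept-once filter
    private final PriorityQueue<File> queue;

    DirectoryReaderSketch(File directory, Comparator<File> comparator) {
        this.directory = directory;
        this.queue = new PriorityQueue<>(comparator);
    }

    File receive() {
        if (queue.isEmpty()) { // refill only when everything listed so far was handed out
            File[] listed = directory.listFiles();
            if (listed != null) {
                for (File f : listed) {
                    boolean temporary = f.getName().endsWith(".writing"); // assumed temp suffix
                    if (f.isFile() && !temporary && seen.add(f)) queue.add(f);
                }
            }
        }
        return queue.poll(); // null if there is nothing new
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("inbox");
        Files.writeString(dir.resolve("b.txt"), "b");
        Files.writeString(dir.resolve("a.txt"), "a");
        Files.writeString(dir.resolve("c.txt.writing"), "still being written");
        DirectoryReaderSketch reader = new DirectoryReaderSketch(dir.toFile(), Comparator.comparing(File::getName));
        for (File f; (f = reader.receive()) != null; ) {
            System.out.println(f.getName()); // a.txt, then b.txt
        }
    }
}
```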
Transformers
- FileToByteArrayTransformer (<file:file-to-bytes-transformer/>)
- FileToStringTransformer (<file:file-to-string-transformer/>)
- they both have "delete-files" property for deleting file on consumption
4.5. Web Services Integration
POX and SOAP Web Services
HTTP Web Services (meaning REST)
- name - e.g. "/subscribe", so that it allows it to be used with DispatcherServlet
- view-name - is the Spring MVC view name
- you can also use inbound-channel-adapter if you don't need two-way communication, it uses MessageTemplate
- you can use outbound-channel-adapter if you don't need two way communication, it uses RestTemplate;
- in the case above it's better to override the error handler, as the default one treats only 4xx and 5xx responses as errors
4.6. XMPP and Twitter
- annotating a method with @Publisher("channelName"), and enabling <int:annotation-config/> will make the return value of that method be additionally published on that channel
- it's possible to set the global publishing channel by setting:
<int:annotation-config default-publisher-channel="channelName"/>
- an alternative to the above is defining an <int:gateway/> with service-interface set to our custom interface; such a bean will be created automagically, and we can autowire it and call our method.. cool.. and in this method we can even add a @Header(XmppHeaders.TO) String username input parameter; but there are also XMPP header enrichers for that;
- there's something like an xmppConnection bean
- the Twitter API needs authentication, and it is done by configuring the twitterTemplate bean
5. Advanced Topics
(means the ones he could not find a way to group under a common name :P)
5.1. Monitoring
Message history
Wire tap
Can be used to record e.g. endpoint statistics. It was described above how to use it; it's an interceptor. Remember about transaction boundaries, e.g. if you use this after a wire tap:
<channel …>
<dispatcher task-executor="someExecutor"/>
</channel>
then bye bye transaction.
JMX
Control Bus
5.2. Scheduling and concurrency
Pollers
- fixed-delay and fixed-rate are different ways to specify the polling interval; with rate, a poll starts every x milliseconds, while delay ensures there is a gap of x milliseconds between the end of one poll and the start of the next;
- the example above will use "rate" in the end
- max messages per poll - how many messages will be processed in one poll operation
- receive timeout - if nothing is there for that amount of time, give up; watch out, also: if you already started, give up;
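The rate/delay difference is the same one the JDK's `ScheduledExecutorService` makes between `scheduleAtFixedRate` and `scheduleWithFixedDelay`, so it can be demonstrated without Spring. The 60 ms "poll" and the 100 ms interval are arbitrary values for this sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of fixed-rate vs fixed-delay polling; not Spring's poller.
class PollingStylesSketch {
    // Returns the start times (ms since start) of each 60 ms "poll" within the window.
    static List<Long> run(boolean fixedRate, long windowMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        long t0 = System.nanoTime();
        Runnable poll = () -> {
            starts.add((System.nanoTime() - t0) / 1_000_000);
            try {
                Thread.sleep(60); // pretend the poll takes 60 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        if (fixedRate) {
            scheduler.scheduleAtFixedRate(poll, 0, 100, TimeUnit.MILLISECONDS);    // a poll starts every 100 ms
        } else {
            scheduler.scheduleWithFixedDelay(poll, 0, 100, TimeUnit.MILLISECONDS); // 100 ms gap after each poll ends
        }
        Thread.sleep(windowMillis);
        scheduler.shutdownNow();
        return starts;
    }

    public static void main(String[] args) throws InterruptedException {
        // Roughly every 100 ms vs roughly every 160 ms (60 ms of work + 100 ms gap).
        System.out.println("fixed-rate starts:  " + run(true, 1000));
        System.out.println("fixed-delay starts: " + run(false, 1000));
    }
}
```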
Task Executors
<channel …>
<dispatcher task-executor="someExecutor"/>
</channel>
Executor will introduce less lag, but it is also simpler than a queue (no prioritization, persistence, blah).
<task:executor id="someExecutor"
pool-size="2-5"
queue-capacity="100"/>
- pool-size - number of threads available: 2 core threads, which can grow up to 5, but only once the queue is full
- queue-capacity - how many tasks can be queued waiting for free thread
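Roughly, this executor definition corresponds to a JDK `ThreadPoolExecutor` with 2 core threads, 5 maximum threads and a bounded queue of 100. The sketch below (the 60 s keep-alive is an assumption) shows the slightly surprising consequence: threads beyond the core 2 are only started once the queue is full, and with the JDK's default AbortPolicy a 106th concurrent task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK counterpart of pool-size="2-5" queue-capacity="100". Illustrative only.
class ExecutorSketch {
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    }

    // Submits n tasks that block until released, then reports how many threads were started.
    static int threadsUsedFor(int n) {
        ThreadPoolExecutor executor = create();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < n; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return executor.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsedFor(50));  // 2: the queue absorbs the extra tasks
        System.out.println(threadsUsedFor(105)); // 5: queue full, pool grows to the maximum
    }
}
```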
You can push your existing executor also into another places, like:
1. pub sub channel:
<publish-subscribe-channel id, task-executor="executor"/>
- this will cause each subscriber to process the message in a separate thread
- why would you do that: if polling takes too much time, the other scheduled polls may have to wait; with this solution they will be executed in other threads;
5.3. Spring Batch
<bean id="itemReader" class="....FlatFileItemReader" scope="step">
<property name="resource" value="file://#{jobParameters['filena']}"/>
<property name="lineMapper">
<bean class="....DefaultLineMapper">
<property name="lineTokenizer">
<bean class="....DelimitedLineTokenizer">
<property name="names" value="source,dest,amount,date"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="....MyMapper"/>
</property>
</bean>
</property>
</bean>
- delegating to the tasklet eliminates the boilerplate code connected to events, maintaining state, etc.
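What the DelimitedLineTokenizer + field set mapper pair does for one line can be sketched in plain Java: split the line, treat the tokens as the fields source, dest, amount and date, and map them to a domain object. The `Transfer` record, the comma delimiter and the ISO date format are assumptions; in the configuration above that mapping would live in `MyMapper`.

```java
import java.math.BigDecimal;
import java.time.LocalDate;

// Plain-Java sketch of tokenizing one delimited line and mapping it to a domain object.
// Not Spring Batch's API; names and formats are assumptions.
class LineMappingSketch {
    record Transfer(String source, String dest, BigDecimal amount, LocalDate date) {}

    static final String[] NAMES = {"source", "dest", "amount", "date"};

    static Transfer mapLine(String line) {
        // 1. tokenize: split on the delimiter, keeping empty trailing fields (limit -1)
        String[] tokens = line.split(",", -1);
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                "expected fields " + String.join(",", NAMES) + " but got " + tokens.length + " tokens");
        }
        // 2. map: tokens[i] is the field named NAMES[i], converted to its domain type
        return new Transfer(tokens[0], tokens[1], new BigDecimal(tokens[2]), LocalDate.parse(tokens[3]));
    }

    public static void main(String[] args) {
        System.out.println(mapLine("acc-1,acc-2,12.50,2011-01-31"));
        // Transfer[source=acc-1, dest=acc-2, amount=12.50, date=2011-01-31]
    }
}
```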
Launching the job
Spring Batch and Spring Integration
<si:service-activator
method="launch"
input-channel="requests"
output-channel="statuses">
<bean class="....JobLaunchingMessageHandler"/>
</si:service-activator>
- JobLaunchingMessageHandler expects a message payload of type JobLaunchRequest, which you can provide for example like this:
Event driven integration
- StepListener
- ChunkListener
- JobExecutionListener
5.4. OSGi