"Spring Integration in Action" Mark Fisher, Jonas Partner, Marius Bogoevici and Iwein Fuld

Posted by Monik, 12 January 2015.

These are the notes I took while reading Manning’s “Spring Integration in Action” (Mark Fisher, Jonas Partner, Marius Bogoevici, and Iwein Fuld) before taking the Spring Integration exam.

I found the book to be a nice guide to some of the topics, before actually starting to read the Java documentation.

More notes about preparation for the exam itself are here.


1. Background

1.1. Enterprise Integration Patterns - introduction

EIP are made of three base patterns:

Message

Message consists of:

Message types:

The message is a representation of the contract between the sender and the receiver.

Message Channel

Manages how (e.g. async or sync) and where the message is delivered, but does not interact with its content. It also decouples the sender from the receiver. There are two types of channels: point-to-point and publish-subscribe.

Message Endpoints

They actually do something with the message.

  1. Channel Adapter - connects an application to the messaging system. The message flow is uni-directional.
            |\      _____
    APP --> || --> |_____| --> ...
            |/
  2. Messaging Gateway - used for bi-directional messaging, e.g. where we want to do asynchronous stuff behind the curtain, and the caller is aware only of a sync invocation.
               sync _____  async
        --> |\ --> |_____| --> ...
    APP     || sync _____  async
        <-- |/ <-- |_____| <-- ...

    (inbound gateway)
  3. Service Activator - invokes a service based on the message and sends back the result. It's like an outbound gateway, but for a specific purpose and within the same Spring context.
             _____       ___
    ... --> |_____| --> | o | -->
             _____      |   |     SERVICE
    ... <-- |_____| <-- | o | <--
                        '---'
  4. Router - everyone knows, but two points to note: it does not change the message and it is aware of other channels
            ,-----,
            | / *-| --> ...
    ... --> |*  *-| --> ...
            |___*-| --> ...
  5. Splitter - splits the message
            ,---------,
            |       []|
    ... --> |[] --> []| --> ...
            |       []|
            '---------'
  6. Aggregator - waits for group of correlated messages and merges them when the group is complete (needs to know correlation id of each message and group size)
            ,---------,
            |[]       |
    ... --> |[] --> []| --> ...
            |[]       |
            '---------'

1.2 Event-driven architecture

SEDA - Staged Event-Driven Architecture, i.e. when the events are not only exchanged but also buffered, e.g. in a queue; this can be a good idea in case of fluctuating load on the system; it's up to you to choose between EDA and SEDA style.

(S)EDA helps reduce all kinds of coupling, that's why Spring Integration builds around it. But what is coupling? There are two types: type-level coupling (the one connected with dependency injection, which everyone knows), and system-level coupling, e.g. temporal coupling (an external service which is not available freezes the whole system).

1.3. Integration styles

Actually Spring focuses on number 4, but also supports the other ones.

  1. File based - no transactions, no message metadata, no atomicity; though sometimes it can be used
  2. Shared database - atomic operations, data consistency, domain model specified => extra coupling
    - staging tables, for transferring data in steps
    - sharing data
  3. Remote procedure calls - tries to hide the fact that different services are running on different systems; has to serialize objects (XML, Java), network is not always reliable, there is nothing in the middle to take care of assuring delivery
  4. Message based integration - small data packets are exchanged frequently between endpoints via channels, in async manner

2. Messaging in Spring Integration

2.1. Message

2.2. Channels

The main interface goes like this:

<MessageChannel>
send (m)
send (m, timeout)

It's extended with interfaces of two types:

<SubscribableChannel>
subscribe (messageHandler)
unsubscribe (messageHandler)

<PollableChannel>
receive ()
receive (timeout)

The default configuration is sync message transmission and subscribable channels (don't confuse with pub-sub channel). You should adjust it to your needs. For example, in the book they have a workflow which finishes with sending an email, which can be done async, as the email does not have to be sent immediately. Making it async is just changing the channel type from direct to queue.
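
To make the direct-versus-queue difference a bit more tangible, here is a minimal plain-Java sketch. It is not the Spring Integration API: `ChannelHandoffSketch`, `DirectChannel` and `QueueChannel` below are hypothetical stand-ins that only model the hand-off. In the direct style `send` runs the handler in the caller's thread; in the queue style `send` only buffers the message and somebody picks it up later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-Java model of the two hand-off styles; NOT the Spring Integration classes.
class ChannelHandoffSketch {

    // "Direct" style: send() invokes the subscriber in the caller's thread.
    static class DirectChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        boolean send(String message) {
            if (handlers.isEmpty()) {
                return false; // nobody subscribed, nothing was handled
            }
            handlers.get(0).accept(message); // synchronous: returns after handling
            return true;
        }
    }

    // "Queue" style: send() only buffers; a consumer polls later.
    static class QueueChannel {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        boolean send(String message) {
            return queue.offer(message);
        }

        String receive() {
            return queue.poll(); // null when the queue is empty
        }
    }

    public static void main(String[] args) {
        DirectChannel direct = new DirectChannel();
        direct.subscribe(m -> System.out.println("handled right away: " + m));
        direct.send("order confirmed");

        QueueChannel queue = new QueueChannel();
        queue.send("send confirmation email"); // returns immediately
        // ... later, typically on a poller thread:
        System.out.println("polled: " + queue.receive());
    }
}
```

The point of the sketch is only that the sender's code stays the same (`send`), while the thread that does the work changes with the channel type.
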
Bridge

With the email example we also want to have more consumers, so we change the channel which puts stuff in the queue to a publish-subscribe-channel, and add a bridge right after it and before the queue. This is done so that the pub-sub channel can hand over the messages to another thread in the bridge, which guarantees the pub-sub channel does not run out of threads in its thread pool. (I don’t get why the threads would get exhausted but ok)

Priority queue

It’s easy to change a channel into a priority queue by adding “priority-queue” XML tag inside. It has a reference to PriorityComparator.
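
As a rough illustration of what a priority queue channel does, here is a plain-Java sketch built on `PriorityBlockingQueue`. The names `PriorityChannelSketch` and `Msg`, and the numeric `priority` field standing in for a priority header, are my assumptions; this is not how Spring Integration implements it.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Plain-Java model of a priority channel; names are hypothetical.
class PriorityChannelSketch {

    // Minimal message: a payload plus a numeric priority "header".
    static final class Msg {
        final String payload;
        final int priority;

        Msg(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // The comparator decides the delivery order: higher priority first.
    static final Comparator<Msg> BY_PRIORITY_DESC =
            Comparator.comparingInt((Msg m) -> m.priority).reversed();

    private final PriorityBlockingQueue<Msg> queue =
            new PriorityBlockingQueue<>(11, BY_PRIORITY_DESC);

    boolean send(Msg m) {
        return queue.offer(m);
    }

    Msg receive() {
        return queue.poll(); // null when empty
    }

    public static void main(String[] args) {
        PriorityChannelSketch channel = new PriorityChannelSketch();
        channel.send(new Msg("newsletter", 1));
        channel.send(new Msg("password reset", 9));
        channel.send(new Msg("invoice", 5));
        Msg m;
        while ((m = channel.receive()) != null) {
            System.out.println(m.priority + " " + m.payload);
        }
    }
}
```

Swapping the comparator is all it takes to change the ordering rule, which is the same idea as pointing the channel at a different comparator bean.
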

Channel collaborators

They are MessageDispatchers and ChannelInterceptors, which says nothing now, but I promise it’s the last additional piece of information on component types.

Message Dispatcher

Is a thing which decides what happens once a message arrives at a channel which was a subscribable channel.

<MessageDispatcher>
bool addHandler (messageHandler)
bool removeHandler (messageHandler)
bool dispatch (m)

It can either do the “competing consumers” thing or broadcasting. Correspondingly, we have UnicastingDispatcher and BroadcastingDispatcher. If we choose competing consumers, we have to define the competition rules by adding a reference to a LoadBalancingStrategy.

<LoadBalancingStrategy>
Iterator<MessageHandler> getHandlerIterator(m, List<handlers>)

The available implementation choice is one: RoundRobinLoadBalancingStrategy. Yes, in round robin the message is not really important in the method, but if you want to implement crazy rules, with this interface you can (though they say you typically shouldn’t).
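
Just to picture the round-robin idea, a small plain-Java sketch follows. It is not the real RoundRobinLoadBalancingStrategy; the class name `RoundRobinSketch` and the generic handler type are assumptions made for illustration. Each call returns the handlers in an order that starts one position further than the previous call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of round-robin handler selection; not the Spring Integration class.
class RoundRobinSketch {

    private final AtomicInteger next = new AtomicInteger();

    // Returns the handlers starting at the next index and wrapping around,
    // so a dispatcher could fall through to later handlers if the first one fails.
    <H> Iterator<H> getHandlerIterator(List<H> handlers) {
        int size = handlers.size();
        if (size == 0) {
            return handlers.iterator();
        }
        int start = Math.floorMod(next.getAndIncrement(), size);
        List<H> ordered = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            ordered.add(handlers.get((start + i) % size));
        }
        return ordered.iterator();
    }

    public static void main(String[] args) {
        RoundRobinSketch strategy = new RoundRobinSketch();
        List<String> handlers = Arrays.asList("A", "B", "C");
        for (int i = 0; i < 4; i++) {
            // the first handler of each iterator is the one that gets the message
            System.out.println(strategy.getHandlerIterator(handlers).next());
        }
    }
}
```

Note that the message itself never appears in the sketch, which matches the remark above: round robin does not need it.
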

They didn’t show XML definition. I think normally it’s done under the hood so you don’t touch this. It’s just to know how things work internally.

Channel Interceptor

Seems like extra piece of functionality. You can intercept or filter messages.

<ChannelInterceptor>
   m preSend (m, channel)
     postSend (m, channel, wasSent)
bool preReceive (channel)
   m postReceive (m, channel)

The “receive” methods are available only for pollable channels. In general, return null instead of the message to break the processing. In “preReceive”, return false to break the processing; it is called before the message is even read, which is why it does not deal with the message.
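
The "return null to break the processing" rule can be modelled in a few lines of plain Java. This is only a sketch of the preSend contract, not the real ChannelInterceptor interface; `InterceptorSketch` and its use of `UnaryOperator<String>` as the interceptor type are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java model of the preSend contract; not the Spring Integration ChannelInterceptor.
class InterceptorSketch {

    private final List<UnaryOperator<String>> preSend = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void addInterceptor(UnaryOperator<String> interceptor) {
        preSend.add(interceptor);
    }

    // Runs every interceptor in order; a null result stops the processing
    // and the message is not delivered.
    boolean send(String message) {
        String current = message;
        for (UnaryOperator<String> interceptor : preSend) {
            current = interceptor.apply(current);
            if (current == null) {
                return false;
            }
        }
        delivered.add(current);
        return true;
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        InterceptorSketch channel = new InterceptorSketch();
        channel.addInterceptor(m -> m.isEmpty() ? null : m); // filter: drop empty messages
        channel.addInterceptor(String::trim);                // modify: pass on a changed message
        System.out.println(channel.send("  hello "));
        System.out.println(channel.send(""));
        System.out.println(channel.delivered());
    }
}
```
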

We add interceptors like this:
<channel ..>
  <interceptors>
    <beans:ref bean="…"/>
  </interceptors>
</channel>


There are Spring examples of interceptors.

2.3 Message Endpoints

Each message endpoint is an implementation of MessageHandler (handleMessage(m)), wrapped in an adapter which connects the endpoint to the channel. Depending on what kind of channel it is, the adapter will have to add appropriate capabilities.

Below is a nice table from the book, which gives an overview of all the options:

Endpoint                  Polling/      Inbound/   Direction  Internal/
                          Event driven  Outbound              External
inbound-channel-adapter   poll          in         uni        int
outbound-channel-adapter  both          out        uni        int
gateway                   event         in         bi         int
service-activator         both          out        bi         int
http:outbound-gateway     both          out        bi         ext
amqp:inb-channel-adapter  event         in         uni        ext

  • inbound/outbound - from perspective of Spring Integration application
  • internal/external - with respect to application context
Polling vs not polling
Remember that: 
For example a web collaboration tool (like Google Docs) should use async hand-off with polling, so that sending an event never blocks the typing (as seeing my own typing is more important than seeing the changes of others).
* gateway is always performing sync communication; if you need an async gateway, combine two async channel adapters;
* REPLY_CHANNEL header is used to know who to return the message to, in case of bidirectional communication;

Transaction boundaries
Always when there is an async hand-off (queue, task executor, aggregator), the transaction boundaries and security context continuity are broken. 

* Don't try to workaround by putting transaction context in message header, this limitation is there for a reason of promoting good design (but you can use this trick with security context).

Transactions usually start at the poller (if it was configured with a transaction manager), when it polls from:
The transaction lasts until the message is sent to something that does not stay on the same thread.

Under the hood
You don't have to know all these things, as Spring will choose the right endpoint adapter configuration automatically. It has an AbstractConsumerEndpointParser, which parses the XML definition and puts factories (ConsumerEndpointFactoryBean) in place of endpoints, and those factories will know at runtime what kind of adapter is needed for an endpoint. We have PollingConsumer and EventDrivenConsumer adapters.
We also have a Lifecycle object that has upgraded version called SmartLifecycle, which automatically starts the endpoints by calling "subscribe" on the channel, or schedules a poller task. The Lifecycle is bound to Spring Application Context.

3. Building Messaging Systems

3.1. Separation of concerns

Is important. AOP did that, IoC did, so Spring Integration also strives to do it: separation of business logic from integration concerns. In short, what they point out:
Transformers
Service Activators
Interceptors that publish messages
Messaging Gateways
Chaining endpoints

3.2. Routing and filtering

Filter
<filter id="cancellationFilter"
        input-channel="input"
        ref="cancellationsFilterBean"
        method="accept"
        discard-channel="rejected"
        output-channel="validated"
        throw-exception-on-rejection="true"
        expression="payload?.reservationCode matches 'GOLD[A-Z0-9]{6}'"/>

cancellationsFilterBean->accept(m|payload)

Router
<router method="routePaymentSettlement"
        input-channel="payments"
        expression="payload.creditCardType"
        channel-resolver="creditCardChannelResolver">
    <beans:bean class="....PaymentSettlementRouter"/>
</router>

PaymentSettlementRouter
  ->String|String[]routePaymentSettlement(m|payload|@Header...)
Payload-type router
<payload-type-router input-channel="payments">
  <mapping type="....CreditCardPayment"
           channel="credit"/>
  <mapping type="....Invoice"
           channel="invoices"/>
  ...
</payload-type-router>
Header value router
<header-value-router input-channel="payments"
                     header-name="PAYMENT_INFO"/>

<header-enricher input-channel="payments"
                 output-channel="enriched-payments">
   <header name="PAYMENT_INFO"
           ref="enricher"
           method="determineProcessingDestination"/>
</header-enricher>
Recipient list router
<recipient-list-router input-channel="notifications">
  <recipient channel="sms"/>
  <recipient channel="email"/>
  <recipient channel="phone"/>
</recipient-list-router>

3.3 Splitting and aggregating

Aggregator and resequencer are stateful endpoints. Both aggregator and splitter change the number of messages: the splitter turns one into many, the aggregator many into one.

<chain id="splitRecipesIntoIngrdients"
       input-channel="recipes"
       output-channel="ingredients">
  <header-enricher>
     <header name="recipe" expression="payload"/>
  </header-enricher>
  <splitter expression="payload.ingredients">
    <beans:bean class="....MySplitter"/>
  </splitter>
</chain>

Splitter will automatically set group size, sequence number and correlation id on the split messages, but you don't have to use them in your aggregation logic. In our example above we copied the whole payload into the "recipe" header, as we will use it as the correlation key.
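
The bookkeeping a splitter does can be sketched in plain Java like this. It is a model only: `SplitterSketch`, `Part` and the field names are hypothetical, and I am assuming 1-based sequence numbers for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the metadata a splitter attaches; names are hypothetical.
class SplitterSketch {

    static final class Part {
        final String payload;
        final String correlationId; // shared by all parts of one split
        final int sequenceNumber;   // position within the group (1-based in this sketch)
        final int sequenceSize;     // total number of parts in the group

        Part(String payload, String correlationId, int sequenceNumber, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    // Turns one list of items into one Part per item, all tagged with the same group data.
    static List<Part> split(String correlationId, List<String> items) {
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            parts.add(new Part(items.get(i), correlationId, i + 1, items.size()));
        }
        return parts;
    }

    public static void main(String[] args) {
        for (Part p : split("recipe-42", Arrays.asList("flour", "eggs", "milk"))) {
            System.out.println(p.correlationId + " " + p.sequenceNumber + "/" + p.sequenceSize
                    + " " + p.payload);
        }
    }
}
```

With these three values on every part, a downstream aggregator can tell which group a message belongs to and when the group is complete, without any custom logic.
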

<aggregator id="kitchen"
            input-channel="products"
            output-channel="meals"
            ref="cook"
            method="prepareMeal"
            correlation-strategy="cook"
            correlation-strategy-method="getCorrelationKey"
            release-strategy="cook"
            release-strategy-method="canCookMeal"/>

Every arriving message from the same group is internally appended by the aggregator to a MessageGroupStore.
The aggregator internally also uses a CorrelatingMessageHandler. It holds references to Correlation- and ReleaseStrategy, as well as to MessageGroupProcessor, which is processing the released group.
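
The correlate / store / release cycle described above can be sketched in plain Java. This is not the real CorrelatingMessageHandler: `AggregatorSketch` is a hypothetical class, the map stands in for the message group store, and the release rule (a fixed group size) and the merge step (joining with "+") are assumptions picked to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of correlate / store / release; not the Spring Integration classes.
class AggregatorSketch {

    // Stands in for the message group store: correlation key -> payloads seen so far.
    private final Map<String, List<String>> groups = new HashMap<>();
    private final int groupSize;

    AggregatorSketch(int groupSize) {
        this.groupSize = groupSize;
    }

    // Adds the payload to its group. Returns the merged result once the release
    // rule (here: the group reached the expected size) is satisfied, otherwise null.
    String handle(String correlationKey, String payload) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() < groupSize) {
            return null; // group is not complete yet, keep waiting
        }
        groups.remove(correlationKey);  // the released group leaves the store
        return String.join("+", group); // "process" the released group
    }

    public static void main(String[] args) {
        AggregatorSketch kitchen = new AggregatorSketch(2);
        System.out.println(kitchen.handle("meal-1", "pasta"));
        System.out.println(kitchen.handle("meal-2", "rice"));
        System.out.println(kitchen.handle("meal-1", "sauce"));
    }
}
```
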

The same CorrelatingMessageHandler is also used by the resequencer endpoint. Resequencer buffers incoming messages, waits and tries to assure that the messages are in right order, but since this can be tricky, it can do partial releases, which can be based e.g. on a timeout (releasePartialSequences flag). You can also customize the comparator used to determine the order.
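
A resequencer that releases partial sequences can be modelled like this in plain Java. `ResequencerSketch` is a hypothetical class, not the Spring Integration one, and it covers only the ordering part: it releases whatever is next in order as soon as it is available, and it leaves out timeouts and custom comparators.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a resequencer with partial releases; names are hypothetical.
class ResequencerSketch {

    private final Map<Integer, String> buffer = new HashMap<>();
    private int nextExpected = 1;

    // Buffers the message, then releases every buffered message that is next in order.
    List<String> handle(int sequenceNumber, String payload) {
        buffer.put(sequenceNumber, payload);
        List<String> released = new ArrayList<>();
        while (buffer.containsKey(nextExpected)) {
            released.add(buffer.remove(nextExpected));
            nextExpected++;
        }
        return released;
    }

    public static void main(String[] args) {
        ResequencerSketch resequencer = new ResequencerSketch();
        System.out.println(resequencer.handle(2, "b")); // nothing yet, 1 is missing
        System.out.println(resequencer.handle(1, "a")); // 1 and 2 go out together
        System.out.println(resequencer.handle(3, "c")); // 3 goes out immediately
    }
}
```
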
Scatter-gather algorithm
It is about copying the same message to many processors, each specialised in something else, and then aggregating the results.

4. Integrating Existing Systems

4.1. XML

Sometimes it is worth avoiding conversion from and to XML, if both input and output are in XML and the processing is relatively simple. It's better to use XPath or XSLT directly in such a case. Or even, like in the example in the book, if we have to divide an object into a number of small parts and send them as separate XML documents, it may be a better idea to first convert it into XML and split/transform only the XML.

Marshalling and unmarshalling

It's conversion between XML and Object. In Spring it's called OXM (it's just an aggregator of existing solutions).

To marshall, annotate class with:
and annotate fields with:
If you have a non-standard java types inside your class, you have to add on the field level:
But Spring OXM has a ready marshaller for this, which you configure like this:

<bean id="myMarshaller" class="....Jaxb2Marshaller">
  <property name="classesToBeBound" value="....ClassToMarshall"/>
</bean>

Spring Integration Support for XML

After you have configured the Spring OXM marshaller and unmarshaller, you can add an endpoint:

<si-xml:marshalling-transformer 
       input-channel="input"
       output-channel="xmlOut"
       marshaller="myMarshaller"
       result-transformer="resultToDocumentTransformer"/>

You need the result transformer, as the marshaller returns an object of type Result, which you can further convert to String or Document, or whatever. There are two ready result transformers in Spring Integration, ResultToDocumentTransformer and ResultToStringTransformer.

In case you want to do an XSLT transformation, use:

<si-xml:xslt-transformer
    input-channel="inputXml"
    output-channel="transformedXml"
    xsl-resource="classpath:/xsl/blablah.xsl"/>

You may also want to split XML using XPath; you can do it using the XPath splitter:

<si-xml:xpath-splitter create-documents="true"
                       input-channel, output-channel>
   <si-xml:xpath-expression 
           expression="parentNodeName"
           ns-prefix="hb"
           ns-uri="http://www.example.com/blablah"
           namespace-map="namespaceMap"/>
</si-xml:xpath-splitter>

<util:map id="namespaceMap">
   <entry key="hb" value="http://www.example.com/blablah"/>
   ...
</util:map>

  • if create-documents is set to true, each part will be wrapped in a separate XML document, otherwise it will be just raw content

You can also route messages based on XPath expression:

<si-xml:xpath-router id="myRouter"
                     input-channel="splitXml"
                     evaluate-as-string="true">
   <si-xml:xpath-expression expression="local-name(/*)"/>
   <si-xml:mapping value="carQuote" channel="carQuoChannel"/>
   <si-xml:mapping value="...
</si-xml:xpath-router>

Or validate them:

<si-xml:validating-filter id, input-channel, output-channel
   discard-channel="invalidReqs"
   schema-location="classpath:xsd/fligthQuote.xsd"/>

Spring Integration supports DOM, but not SAX parser, as streaming would introduce problems when combined with messaging. So big XML support is not there.

4.2. JMS

JMS terminology
JMS message has a lot in common with Spring Integration message. JMS body = Spring payload and JMS properties = Spring headers. Additionally, JMS destination = Spring channel, and point-to-point channel is called Queue, and pub-sub is called Topic.

JMS refresh

  1. JmsTemplate

    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setDefaultDestination(new ActiveMQQueue("siisa.queue"));
    jmsTemplate.convertAndSend("Helloo");
    String res = (String) jmsTemplate.receiveAndConvert();

    If a transaction is already active (opened by some other process), this executes in the same transaction. The transaction is bound to the JMS session.

    The conversion is done by default by SimpleMessageConverter, which maps the Java type to the message type (TextMessage, MapMessage, BytesMessage, ObjectMessage, etc.), but it can be customized. It can even be replaced by the Spring OXM MarshallingMessageConverter, to have conversion from and to XML.

    JmsTemplate is sync.
  2. MessageListener

    This is async. And provides transaction handling which would be tricky with async.

    You'd actually use MessageListenerAdapter, as it eliminates some boilerplate code. And even better, define it in XML config:

    <jms:listener-container>
       <jms:listener
             destination="myQueue"
             ref="aPojo"
             method="someMethod"/>
    </jms:listener-container>

Spring Integration & JMS - one way communication
For sending to JMS use an outbound channel adapter:
<int-jms:outbound-channel-adapter channel="toJms"
          destination-name="samples.queue"
          destination="fromSi"
          pub-sub-domain="true"/>

<jee:jndi-lookup id="fromSi" jndi-name="jms/queue.fromSi"/>
For receiving use an inbound channel adapter (sync polling):
<int-jms:inbound-channel-adapter 
          id="pollingJmsAdapter"
          channel="jmsMessage"
          destination-name="myQueue"
          pub-sub-domain="true">
   <int:poller fixed-delay="3000" max-messages-per-poll="1"/>
</int-jms:inbound-channel-adapter>
To do it asynchronously, use:

<int-jms:message-driven-channel-adapter
    id, channel, destination-name/>
Spring Integration & JMS - two ways communication

If you need bi-directional communication, use a gateway:

<int-jms:outbound-gateway
        request-channel="toJms"
        reply-channel="jmsReplies"
        request-destination-name="examples.queue"
        request-pub-sub-domain="true"/>

<int-jms:inbound-gateway id
        request-channel="fromJms"
        request-destination-name="examples.queue"
        default-reply-destination="examples.replies"
        default-reply-queue-name="examples.replies"
        default-reply-topic-name="examples.replies"
        request-pub-sub-domain="true"
        concurrent-consumers="5"
        max-concurrent-consumers="25"
        idle-task-execution-limit="3"/>

Spring Integration & JMS - “tunnelling”
If two Spring Integration apps want to communicate via JMS, we can reduce some complexity by passing whole Spring message as the JMS message body, instead of converting payload to body and headers to properties and back. In such case add to your both gateways: extract-request-payload="false". It's also a good idea to set the message-converter property, so that you don't use Java Serialisation but e.g. XML. You can use e.g. the Spring's MarshallingMessageConverter.

But in general, this approach is not recommended - :^)
Transactions with JMS Integration
No matter how you do it, you will either have lost messages or duplicates.

The JMS Session object can be created by the JMS connection with two mutually exclusive settings: transacted (bool) and acknowledgeMode (AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE). DUPS_OK will also call acknowledge() automatically, but lazily. I skip the details as it was in Spring Core already.

MessageListener has a property "acknowledge", which can take one of 4 values: auto (default), client, dups-ok and transacted. In XML you set it on <jms:listener-container ..>.

Transactions are in general managed by PlatformTransactionManager (JmsTransactionManager or DataSourceTransactionManager). If you want to include a database operation in the same transaction, you do not necessarily have to switch to the complex XA transactions. Often it is possible to just order your calls properly, e.g. in the same method receive the JMS message and call the database at the end. The tradeoff is that very rarely you may still end up with duplicates, but it's sometimes easier to compensate for that than to introduce global transactions.

4.3. Email

Sending Email
<mail:outbound-channel-adapter 
      channel="outboundMail"
      host="${host}"
      username="${username}"
      password="${password}"
      java-mail-properties="properties"/>
Receiving Email - polling
 <mail:inbound-channel-adapter
      id="mailAdapter"
      store-uri="imaps://..."
      java-mail-properties="properties"
      channel="emails"
      should-delete-messages="true"
      should-mark-messages-as-read="true">
   <poller max-messages-per-poll="1" fixed-rate="5000"/>
</mail:inbound-channel-adapter>

Receiving Email - event driven
It's not that beautiful, as the connection works for at most 30 minutes of the client being idle, because of a timeout.

 <mail:imap-idle-channel-adapter
      id="mailAdapter"
      store-uri="imaps://..."
      java-mail-properties="properties"
      channel="emails"
      should-delete-messages="false"
      should-mark-messages-as-read="true"
      auto-startup="true">
   <poller max-messages-per-poll="1" fixed-rate="5000"/>
</mail:imap-idle-channel-adapter>
You may find this handy:

<mail:mail-to-string-transformer/>

it will also copy the headers. You can also implement a custom AbstractMailMessageTransformer, but the headers will be copied as well (the abstract method returns MessageBuilder).

4.4. Filesystem Integration

The advantages are that they are fairly simple, disk space is big enough, and data is persisted. Disadvantages: slow, no ACID, and pain in the ass with locking. Still it's simple so should be used when possible (simple!=easy:P).
Spring Integration provides abstraction over all the stupid new BufferedReader(new FileReader(new ...)). This is already cool.
Writing the file
<file:outbound-channel-adapter
    channel="outgoingChanges"
    directory="#{config.diary.store}"
    auto-create-directory="true"
    filename-generator="nameGenerator"
    delete-source-files="false"/>
Reading the file
<file:inbound-channel-adapter
    channel="incomingChanges"
    directory="#{config.diary.store}"
    filter="myFilter"
    scanner="myScanner"
    comparator="myComparator"
    filename-pattern="..."
    filename-regex="...">
  <poller />
</file:inbound-channel-adapter>
To prevent reading unfinished files you can also use a locking mechanism, but avoid it if you can. If you use Spring's file moving strategy you don't need locking.
Transformers

4.5. Web Services Integration

The good side is that the information exchange protocol and format is clear: HTTP and XML (or JSON). And no firewalls in between.

Well, ok maybe not everything is exactly clear, namely how to use HTTP to transfer XML: some argue on SOAP and WSDL, others do not give a shit about those technologies anymore and use REST. Spring does give a shit about both approaches.
POX and SOAP Web Services
POX means not fox, but Plain Old XML. It is still not the same as REST: the main difference is that these services use only the POST and/or GET method for all operations. In other words, they don't attach semantics to the HTTP method. Another characteristic is that they are "contract-first" Web Services, and are described by WSDL.

Spring WS already gives support (webServiceClient) for creating e.g. SOAP message, which would be otherwise crazy to do manually (envelope, body, this stuff).

Spring Integration builds on top of Spring WS, and lets you create endpoints which behave like a WS, or are able to consume a WS. Minimal configuration for receiving requests is as follows:

web.xml:
<servlet>
  <servlet-name>si-ws-gateway</servlet-name>
  <servlet-class>....MessageDispatcherServlet</servlet-class>
   <init-param>
     <param-name>contextConfigLocation</param-name>
     <param-value>si-ws-gateway-config.xml</param-value>
   </init-param>
  <load-on-startup>1</load-on-startup>
</servlet>

<servlet-mapping>
  <servlet-name>si-ws-gateway</servlet-name>
  <url-pattern>/quoteservice</url-pattern>
</servlet-mapping>

si-ws-gateway-config.xml:
<bean class="....UriEndpointMapping">
  <property name="defaultEndpoint" ref="ws-inbound-gateway"/>
</bean>

<int-ws:inbound-gateway id="ws-inbound-gateway"
                        request-channel="ws-requests"
                        extract-payload="false"/>

If you need to make requests, do it like this:

<int-ws:outbound-gateway
   uri="http://blblah"
   request-channel="requests"
   reply-channel="responses"/>
HTTP Web Services (meaning REST)
They are also the ones that Spring MVC exposes.

web.xml:
<servlet>
  <servlet-name>http-ws-gateway</servlet-name>
  <servlet-class>....HttpRequestHandlerServlet</servlet-class>
   <init-param>
     <param-name>contextConfigLocation</param-name>
     <param-value>http-ws-gateway.xml</param-value>
   </init-param>
</servlet>

<servlet-mapping>
  <servlet-name>http-ws-gateway</servlet-name>
  <url-pattern>/httpquote</url-pattern>
</servlet-mapping>

http-ws-gateway.xml:

<int-http:inbound-gateway 
  id="http-inbound-gateway"
  request-channel="http-request"
  reply-channel="http-response"
  extract-reply-payload="false"
  view-name="about"
  reply-key, reply-timeout,message-converters, 
  supported-methods, convert-exceptions, 
  request-payload-type, error-code, errors-key,
  header-mapper, name/>
  • name - e.g. "/subscribe", so that it allows it to be used with DispatcherServlet
  • view-name - is the Spring MVC view name
  • you can also use inbound-channel-adapter if you don't need two way communication, it uses MessageTemplate
If you need to make requests, do it like this:

<int-http:outbound-gateway
    url="http://blblah"
    request-channel="requests"
    http-method="GET"
    expected-response-type="java.lang.String">
      <int-http:uri-variable 
        name="location" 
        expression="payload"/>
</int-http:outbound-gateway>
  • you can use outbound-channel-adapter if you don't need two way communication, it uses RestTemplate;
  • in the case above it's better to override the error handler, as the default one treats only 4** and 5** responses as errors

4.6. XMPP and Twitter

XMPP (Extensible Messaging and Presence Protocol) is a protocol made especially for chatting. It can send messages (in both directions at the same time) and presence notifications. The original server implementation is called Jabber.

And Twitter has an exposed API.

So Spring Integration is providing endpoints for these as well. The namespaces are int-xmpp and int-twitter, respectively. I will not go into details. But some things I learned by the way:

5. Advanced Topics

(means the ones he could not find a way to group under common name:P)

5.1. Monitoring

Message history
If you add to your spring config this:

<int:message-history/>

then every message will have a header called "history" with entries for each endpoint it visited, each entry contains endpoint name, type and timestamp. Neat!

Wire tap

Can be used to record e.g. endpoint statistics. It was described above how to use it, it’s an interceptor. Remember about transaction boundaries, e.g. if you use this after the wire tap:
<channel …>
   <dispatcher task-executor="someExecutor"/>
</channel>

then bye bye transaction.

JMX
I skip now, many adapters
Control Bus
I skip now, it's for sending a command message which manages endpoints

5.2. Scheduling and concurrency

Pollers

Used for receiving messages transmitted asynchronously, as well as publishing messages, in time intervals. You can define a general poller definition and then only override what you need, e.g.:

<poller id="defaultPoller" fixed-delay="500" default="true"/>

...

<file:inbound-channel-adapter id, ...>
   <poller fixed-rate="10000"/>
</file:inbound-channel-adapter>
You can also define the poller with cron expression:

<poller cron="0 0 0 * * *"/>

which stands for:

sec, min, hour, dayofthe-month, month, dayofthe-week (, year)

other options:

<poller fixed-rate="10000"
        max-messages-per-poll="2" 
        receive-timeout="2000"/>
You can use a poller for publishing, e.g. by including it inside an inbound-channel-adapter. No one calls the adapter, but the poller does.

Task Executors

By default everything is done in a single thread. To change that, either change a channel to a queue channel, or add a task executor:
<channel ...>
   <dispatcher task-executor="someExecutor"/>
</channel>

An executor will introduce less lag, but is also simpler than a queue (no prioritization, persistence, blah).

<task:executor id="someExecutor"
               pool-size="2-5"
               queue-capacity="100"/>


You can also plug your existing executor into other places, like:

1. pub sub channel:

<publish-subscribe-channel id, task-executor="executor"/>
2. Poller

<poller id, fixed-delay, task-executor="executor"/>
  • why would you do that: if polling takes too much time, the other scheduled polls may have to wait; with this solution they will be executed in other threads;
Task Scheduler

public interface TaskScheduler {
  ScheduledFuture schedule(Runnable task, Trigger trigger);
  ScheduledFuture schedule(Runnable task, Date startTime);
  ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
  ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
  ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
  ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}

public interface Trigger {
  Date nextExecutionTime(TriggerContext context);
}

public interface TriggerContext {
  Date lastScheduledExecutionTime();
  Date lastActualExecutionTime();
  Date lastCompletionTime();
}

(no XML example..)

5.3. Spring Batch

Job - requires no manual intervention, but its status should be visible, a restart should be possible from the place where it stopped, and stream processing is also important (memory limitations). Spring Batch is the solution.

JobParameters - identify the job instance; the same instance cannot be run twice, that's why you add some counter there.

Chunks are an important concept: we read and write data in chunks of optimal size, and a transaction spans from the beginning to the end of a chunk.
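
To see what "a transaction per chunk" means in practice, here is a plain-Java sketch of the read-then-write-in-chunks loop. It is not the Spring Batch API: `ChunkSketch` and `process` are hypothetical names, and each call to the writer merely stands in for one committed transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of chunk-oriented processing; not the Spring Batch API.
class ChunkSketch {

    // Reads items one by one and hands them to the writer in chunks of
    // commitInterval items. Returns how many chunks ("commits") were written.
    static <T> int process(Iterator<T> reader, Consumer<List<T>> writer, int commitInterval) {
        int commits = 0;
        List<T> chunk = new ArrayList<>(commitInterval);
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == commitInterval) {
                writer.accept(new ArrayList<>(chunk)); // one chunk = one transaction
                chunk.clear();
                commits++;
            }
        }
        if (!chunk.isEmpty()) { // the last chunk may be smaller
            writer.accept(new ArrayList<>(chunk));
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<String> payments = Arrays.asList("p1", "p2", "p3", "p4", "p5", "p6", "p7");
        int commits = process(payments.iterator(),
                chunk -> System.out.println("writing " + chunk), 5);
        System.out.println(commits + " commits");
    }
}
```

If writing one chunk fails, only that chunk would have to be rolled back and retried, which is why the chunk size (the commit-interval in the XML) is worth tuning.
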

<batch:job id="bla">
  <batch:step id="loadPayments">
    <batch:tasklet>
      <batch:chunk 
            reader="itemReader" 
            writer="itemWriter" 
            commit-interval="5"/>
    </batch:tasklet>
  </batch:step>
</batch:job>

<bean id="itemReader" class="....FlatFileItemReader" scope="step">
  <property name="resource" value="file://#{jobParameters['filena']}"/>
  <property name="lineMapper">

    <bean class="....DefaultLineMapper">

      <property name="lineTokenizer">
        <bean class="....DelimitedLineTokenizer">
          <property name="names" value="source,dest,amount,date"/>
        </bean>
      </property>

      <property name="fieldSetMapper">
        <bean class="....MyMapper"/>
      </property>

    </bean>

  </property>
</bean>

public class MyMapper implements FieldSetMapper<Payment>{
  @Override
  public Payment mapFieldSet(FieldSet fieldSet) 
       throws BindException {
     ... = fieldSet.readString("source");
     ... = fieldSet.readBigDecimal("amount");
     ... = fieldSet.readDate("date");
  }
}
Launching the job
<bean id="jobLauncher" class="....SimpleJobLauncher">
   <property name="jobRepository" ref="jobRepository"/>
</bean>

<batch:job-repository 
        data-source="dataSource" 
        id="jobRepository" 
        transaction-manager="transactionManager" 
        table-prefix="BATCH_"/>

To actually launch it you can do:

JobParametersBuilder jpb = new JobParametersBuilder();
jpb.addString("filena", "payment.xml");
JobExecution execution = jobLauncher.run(job, jpb.toJobParameters());

The launching is sync or async; in case of async you can immediately check the JobExecution for the status of the job, as this is persisted in the db you provided in the job repository definition.

There is also the Spring Batch Admin web app where you can view job statuses and restart failed ones.
Spring Batch and Spring Integration
Spring Integration provides support for everything you can see in Spring Batch Admin.

<si:service-activator 
         method="launch" 
         input-channel="requests" 
         output-channel="statuses">
   <bean class="....JobLaunchingMessageHandler"/>
</si:service-activator>
...
@Transformer
public JobLaunchRequest toRequest(Message<File> message){
   JobParametersBuilder jpb = new JobParametersBuilder();
   jpb.addString(fileParameterName, message.getPayload().getAbsolutePath());
   return new JobLaunchRequest(job, jpb.toJobParameters());
}
Event driven integration
Spring Batch provides three listeners:
You use them as gateway and register the gateway at the job:

<batch:job id>
   ...
   <batch:listeners>
      <batch:listener ref="notificationListener"/>
   </batch:listeners>
</batch:job>

<si:gateway id="notificationListener"
            service-interface="....JobExecutionListener"
            default-request-channel="jobExecutions"/>

5.4. OSGi

I skip also, not required for the exam.
