Camel 2.3 - Overhaul of Aggregator EIP
The Aggregator EIP in Camel needs an overhaul to add a few new features and improvements, and to fix an issue with using a completion predicate together with a timeout.
Current issues
- Built on top of BatchProcessor, which has overly complex logic for concurrent locking and accepting new messages.
- BatchProcessor was originally designed to help with batch and transactions
- Either BatchProcessor needs to be reworked or a new base class implemented
- Completion predicate and batch timeout do not work together
- Batch timeout is actually an interval check timeout
- AggregationStrategy is invoked too late, as it is only invoked when the batch timeout triggers
- BatchSender is single threaded
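The difference between an interval check and a true per-group timeout can be made concrete with a little arithmetic. The sketch below is a simplified model, not Camel code: the class `TimeoutModels` and both method names are hypothetical, and it assumes the interval check fires at every multiple of the configured interval, starting from time 0.

```java
/** Hypothetical, simplified model of the two timeout behaviours (not Camel code). */
final class TimeoutModels {

    private TimeoutModels() {
    }

    /**
     * Interval check: a single global timer fires at every multiple of
     * intervalMillis, and a group is completed at the first tick at or
     * after the arrival of its first message.
     */
    static long intervalCheckCompletion(long arrivalMillis, long intervalMillis) {
        long ticks = (arrivalMillis + intervalMillis - 1) / intervalMillis; // ceiling division
        return Math.max(1, ticks) * intervalMillis;
    }

    /**
     * Per-group timeout: each group gets its own deadline, counted from
     * the arrival of its first message.
     */
    static long perKeyCompletion(long arrivalMillis, long timeoutMillis) {
        return arrivalMillis + timeoutMillis;
    }
}
```

Under this model, with a 1000 ms setting, a group that starts at 950 ms is flushed by the interval check after only 50 ms, while a group that starts at 100 ms waits 900 ms. With a per-group timeout both groups wait the full 1000 ms.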
New Design
- BatchSender should use an ExecutorService, which ensures that if a thread dies a new thread is started, i.e. the task is always running. DONE
- BatchSender is currently only a single thread. We should support concurrency, so you can have multiple outputs, one per correlation key. DONE
- BatchSender should have simpler logic for enqueuing and locking if possible. DONE
- BatchProcessor should support persistence, so aggregated exchanges can be stored in a database using JPA. See how we do this with the Idempotent Consumer and/or Tracer. DONE
- If possible: AggregationStrategy should be invoked on the fly, so that in the future you can use it to determine whether the aggregation is complete. DONE
- Batch timeout and completion predicate should work together, so that completion is triggered by whichever occurs first. DONE
- Batch timeout should be per correlation key, and not a single global timeout. DONE
- Add a POJO-based aggregation strategy so you can avoid using the low-level Exchange APIs
- Add an option for what to do in case the correlation key cannot be evaluated, e.g. to ignore the exchange or throw an exception back to the caller. DONE
- Add an option to ignore exchanges that arrive too late, after their correlation key has been closed. DONE
- Look through all the JIRA tickets we have at Apache for the Aggregator. DONE
- Resequencer is also using the BatchProcessor, so we should ensure it works as well, and maybe it can benefit from some of the new features too.
- Add persistent AggregationRepository out of the box DONE
- Configuration of completion options should be more dynamic such as timeout and size DONE
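Several of the design points above (aggregating on the fly, a timeout per correlation key, and completion by whichever of predicate or timeout comes first) can be illustrated with a small standalone sketch. This is not Camel's implementation: `MiniAggregator`, `Group`, and all method names are hypothetical, the clock is passed in explicitly to keep the example deterministic, and the deadline is counted from the first message of each key, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

/** Hypothetical sketch of the intended behaviour; not Camel's implementation. */
class MiniAggregator<T> {

    /** State held per correlation key. */
    private static final class Group<T> {
        T aggregated;
        long deadlineMillis;
    }

    private final BinaryOperator<T> strategy;        // stands in for an AggregationStrategy
    private final Predicate<T> completionPredicate;
    private final long timeoutMillis;
    private final Map<String, Group<T>> open = new LinkedHashMap<>();
    private final List<T> completed = new ArrayList<>();

    MiniAggregator(BinaryOperator<T> strategy, Predicate<T> completionPredicate, long timeoutMillis) {
        this.strategy = strategy;
        this.completionPredicate = completionPredicate;
        this.timeoutMillis = timeoutMillis;
    }

    /** Aggregates the message immediately, then checks the completion predicate. */
    synchronized void add(String key, T message, long nowMillis) {
        Group<T> group = open.get(key);
        if (group == null) {
            group = new Group<>();
            group.aggregated = message;
            group.deadlineMillis = nowMillis + timeoutMillis; // deadline is per key, not global
            open.put(key, group);
        } else {
            group.aggregated = strategy.apply(group.aggregated, message); // invoked on the fly
        }
        if (completionPredicate.test(group.aggregated)) {
            open.remove(key);
            completed.add(group.aggregated);
        }
    }

    /** Completes every open group whose own deadline has passed. */
    synchronized void checkTimeouts(long nowMillis) {
        Iterator<Map.Entry<String, Group<T>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Group<T> group = it.next().getValue();
            if (nowMillis >= group.deadlineMillis) {
                it.remove();
                completed.add(group.aggregated);
            }
        }
    }

    /** Returns a copy of the completed aggregates, in completion order. */
    synchronized List<T> completed() {
        return new ArrayList<>(completed);
    }
}
```

Because the strategy runs as each message arrives, the predicate can inspect the current aggregate and close a group early. A group the predicate never closes is still released by `checkTimeouts` once its own deadline passes.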