A UIMA Collection Processing Engine (CPE) is a combination of UIMA components assembled to analyze a collection of artifacts. A CPE is an instantiation of the UIMA Collection Processing Architecture, which defines the collection processing components, interfaces, and APIs. A CPE is executed by a UIMA framework component called the Collection Processing Manager (CPM), which provides a number of services for deploying CPEs, running CPEs, and handling errors.
A CPE can be assembled programmatically within a Java application, or it can be assembled declaratively via a CPE configuration specification, called a CPE Descriptor. This chapter describes the format of the CPE Descriptor.
Details about the CPE, including its function, sub-components, APIs, and related tools, can be found in Chapter 5, Collection Processing Engine Developer's Guide. Here we briefly summarize the CPE to define terms and provide context for the later sections that describe the CPE Descriptor.
An illustration of the CPE runtime is shown in Figure nn. Some of the CPE components, such as the queues and processing pipelines, are internal to the CPE, but their behavior and deployment may be configured using the CPE Descriptor. Other CPE components, such as the Collection Reader and CAS Processors, are defined and configured externally from the CPE and then plugged in to the CPE to create the overall engine. The parts of a CPE are:
Collection Reader – understands the native data collection format and iterates over the collection producing subjects of analysis
CAS Initializer – initializes a CAS with a subject of analysis
Artifact Producer – asynchronously pulls CASes from the Collection Reader, creates batches of CASes and puts them into the work queue
Work Queue – shared queue containing batches of CASes queued by the Artifact Producer for analysis by Analysis Engines
B1-Bn – individual batches containing 1 or more CASes
AE1-AEn – Analysis Engines arranged by a CPE descriptor
Processing Pipelines – each pipeline runs in a separate thread and contains a replicated set of the Analysis Engines running in the defined sequence
Output Queue – holds batches of CASes with analysis results intended for CAS Consumers
CAS Consumers – perform collection level analysis over the CASes and extract analysis results, e.g., creating indexes or databases
CPE Descriptors are XML files. This chapter uses an informal notation to specify the syntax of CPE Descriptors.
The notation used in this chapter is:
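An ellipsis (...) inside an element body indicates that the substructure of that element has been omitted and is described elsewhere in this chapter, e.g.:
<collectionReader>
...
</collectionReader>
An ellipsis immediately after an element indicates that the element may be repeated. For example,
<parameter>[String]</parameter>
<parameter>[String]</parameter>
...
indicates that there may be arbitrarily many parameter elements in this context.
An ellipsis inside an element's start tag indicates that the attributes of that element are described later, e.g.:
<casProcessor ...>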
Bracketed expressions (e.g., [String]) indicate the type of value that may be used at that location.
A vertical bar, as in true|false, indicates alternatives. This can be applied to literal values, bracketed type names, and elements.
Which elements are optional and which are required is specified in prose, not in the syntax definition.
A CPE Descriptor uses the following notation to reference descriptors for other components that are incorporated into the defined CPE:
<descriptor> <include href="[File]"/> </descriptor>
The [File] attribute is a filename for the descriptor of the incorporated component. A fully qualified filename may be provided, or the filename may be relative to a directory specified using the CPM_HOME variable, e.g.,
<descriptor> <include href="${CPM_HOME}/desc_dir/descriptor.xml"/> </descriptor>
In this case, the value for the CPM_HOME variable must be provided to the CPE by specifying it on the Java command line, e.g.,
java -DCPM_HOME="C:/Program Files/apache-uima/cpm" ...
Note that this mechanism for referencing other component descriptor files is different from, and in no way related to, either of the two import mechanisms described in Chapter 23.
A CPE Descriptor consists of information describing the following four main elements.
The CPE Descriptor has the following high level skeleton:
<?xml version="1.0" encoding="UTF-8"?>
<cpeDescription>
  <collectionReader>
    ...
  </collectionReader>
  <casProcessors>
    ...
  </casProcessors>
  <cpeConfig>
    ...
  </cpeConfig>
  <resourceManagerConfiguration>
    ...
  </resourceManagerConfiguration>
</cpeDescription>
Details of each of the four main elements are described in the sections that follow.
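To make the skeleton more concrete, here is a minimal sketch of what a small filled-in descriptor might look like. It is an illustration under stated assumptions, not a definitive template: the descriptor file names, the CAS Processor name, and all numeric values are hypothetical, and the <resourceManagerConfiguration> section and several <cpeConfig> elements are left out on the assumption that they are optional in the UIMA version in use.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<cpeDescription>
  <collectionReader>
    <!-- No casInitializer element: assumes the reader initializes the CAS itself -->
    <collectionIterator>
      <descriptor>
        <!-- Hypothetical file name; CPM_HOME must be supplied on the Java command line -->
        <include href="${CPM_HOME}/desc_dir/MyCollectionReader.xml"/>
      </descriptor>
    </collectionIterator>
  </collectionReader>
  <casProcessors casPoolSize="2" processingUnitThreadCount="1">
    <casProcessor deployment="integrated" name="My Annotator">
      <descriptor>
        <!-- Hypothetical Analysis Engine descriptor -->
        <include href="${CPM_HOME}/desc_dir/MyAnnotator.xml"/>
      </descriptor>
      <filter/>
      <errorHandling>
        <maxConsecutiveRestarts action="terminate" value="3"/>
        <errorRateThreshold action="terminate" value="10/1000"/>
        <timeout max="100000"/>
      </errorHandling>
    </casProcessor>
  </casProcessors>
  <cpeConfig>
    <!-- -1 means process all entities in the collection -->
    <numToProcess>-1</numToProcess>
    <deployAs>immediate</deployAs>
  </cpeConfig>
</cpeDescription>
```

Each element used here is explained in the sections that follow.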
The <collectionReader>
section identifies the Collection Reader and optional CAS Initializer that are
to be used in the CPE. The Collection
Reader is responsible for retrieval of artifacts from a collection outside of
the CPE, and the optional CAS Initializer is responsible for initializing the
CAS with the artifact.
A Collection Reader may initialize the CAS itself, in which case it does not require a CAS Initializer. This should be clearly specified in the documentation for the Collection Reader. Specifying a CAS Initializer for a Collection Reader that does not make use of a CAS Initializer will not cause an error, but the specified CAS Initializer will not be used.
The complete structure of the <collectionReader>
section is:
<collectionReader>
  <collectionIterator>
    <descriptor>
      <include href="[File]"/>
    </descriptor>
    <configurationParameterSettings>...</configurationParameterSettings>
    <sofaNameMappings>...</sofaNameMappings>
  </collectionIterator>
  <casInitializer>
    <descriptor>
      <include href="[File]"/>
    </descriptor>
    <configurationParameterSettings>...</configurationParameterSettings>
    <sofaNameMappings>...</sofaNameMappings>
  </casInitializer>
</collectionReader>
The <collectionIterator> identifies the descriptor for the Collection Reader, and the <casInitializer> identifies the descriptor for the CAS Initializer. The format and details of the Collection Reader and CAS Initializer descriptors are described in Chapter 23. The <configurationParameterSettings> and <sofaNameMappings> elements are described in the next section.
The CPM will abort if the Collection Reader throws a large number of consecutive exceptions (default = 100). This default can be changed by using the Java initialization parameter -DMaxCRErrorThreshold xxx.
The <casProcessors>
section identifies the components that perform the analysis on the input data,
including CAS analysis (Analysis Engines) and analysis results extraction (CAS
Consumers). The CAS Consumers may also
perform collection level analysis, where the analysis is performed (or
aggregated) over multiple CASes. The
basic structure of the CAS Processors section is:
<casProcessors dropCasOnException="true|false" casPoolSize="[Number]" processingUnitThreadCount="[Number]">
  <casProcessor ...>
    ...
  </casProcessor>
  <casProcessor ...>
    ...
  </casProcessor>
  ...
</casProcessors>
The <casProcessors> section has two mandatory attributes and one optional attribute that configure the characteristics of the CAS Processor flow in the CPE. The first mandatory attribute is casPoolSize, which defines the fixed number of CAS instances that the CPM will create and use during processing. All CAS instances are maintained in a CAS Pool with check-in and check-out access. Each CAS is checked out from the CAS Pool by the Collection Reader and initialized with an initial subject of analysis. The CAS is checked back into the CAS Pool when it is completely processed, at the end of the processing chain. A larger CAS Pool size will result in more memory being used by the CPM. CAS objects can be large, and care should be taken to determine the optimum size of the CAS Pool, weighing memory tradeoffs against performance.
The second mandatory <casProcessors> attribute is processingUnitThreadCount, which specifies the number of replicated Processing Pipelines. Each Processing Pipeline runs in its own thread. The CPM takes CASes from the work queue and submits each CAS to one of the Processing Pipelines for analysis. A Processing Pipeline contains one or more Analysis Engines invoked in a given sequence. If more than one Processing Pipeline is specified, the CPM replicates instances of each Analysis Engine defined in the CPE descriptor. Each Processing Pipeline thread runs independently, consuming CASes from the work queue and depositing CASes with analysis results onto the output queue. On multiprocessor machines, multiple Processing Pipelines can run in parallel, improving overall throughput of the CPM.
The final, optional, <casProcessors> attribute is dropCasOnException. It defines a policy that determines what happens to the CAS when an exception occurs during processing. If the value of this attribute is set to true and an exception occurs, the CPM will notify all registered listeners of the exception (see Using Listeners), clear the CAS, and check the CAS back into the CAS Pool so that it can be re-used. The presumption is that an exception may leave the CAS in an inconsistent state and therefore that CAS should not be allowed to move through the processing chain. When this attribute is omitted, the CPM’s default is the same as specifying dropCasOnException="false".
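As a rough illustration of how the three <casProcessors> attributes fit together, the sketch below configures four replicated Processing Pipelines that share a pool of eight CAS instances and drop any CAS whose processing raises an exception. The specific numbers are arbitrary example values, not recommendations; suitable values depend on available memory and on the number of processors.

```xml
<!-- Example values only: 8 CASes in the pool, 4 pipeline threads -->
<casProcessors casPoolSize="8" processingUnitThreadCount="4" dropCasOnException="true">
  <!-- one casProcessor element per CAS Processor goes here, in execution order -->
</casProcessors>
```

With these settings, each Analysis Engine defined in the descriptor would be instantiated once per pipeline, so memory use grows with both the pool size and the thread count.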
The CAS Processors that make up the Processing Pipeline and the CAS Consumer pipeline are specified with the <casProcessor> entity, which appears within the <casProcessors> entity. It may appear multiple times, once for each CAS Processor specified for this CPE.
The order of the <casProcessor> entities within the <casProcessors> section specifies the order in which the CAS Processors will run. Although CAS Consumers are usually put at the end of the pipeline, they need not be. Also, Aggregate Analysis Engines may include CAS Consumers.
The overall format of the <casProcessor>
entity is:
<casProcessor deployment="local|remote|integrated" name="[String]">
  <descriptor>
    <include href="[File]"/>
  </descriptor>
  <configurationParameterSettings>...</configurationParameterSettings>
  <sofaNameMappings>...</sofaNameMappings>
  <runInSeparateProcess>...</runInSeparateProcess>
  <deploymentParameters>...</deploymentParameters>
  <filter/>
  <errorHandling>...</errorHandling>
  <checkpoint batch="[Number]"/>
</casProcessor>
The <casProcessor>
element
has two mandatory attributes, deployment
and name.
The mandatory name
attribute specifies a unique
string identifying the CAS Processor.
The mandatory deployment
attribute specifies the CAS
Processor deployment mode. Currently,
three deployment options are supported:
integrated – indicates integrated deployment of the CAS Processor. The CPM deploys and collocates the CAS Processor in the same process space as the CPM. This type of deployment is recommended to increase the performance of the CPE. However, it is NOT recommended to deploy annotators containing JNI this way. Such CAS Processors may cause a fatal exception and force the JVM to exit without cleanup (bringing down the CPM). Any UIMA SDK compliant pure Java CAS Processor may be safely deployed this way.
remote – indicates non-managed deployment of the CAS Processor. The CAS Processor descriptor referenced in the <descriptor> element must be a Vinci Service Client Descriptor, which identifies a remotely deployed CAS Processor service (see Section 6.6, Working with Analysis Engine and CAS Consumer Services). The CPM assumes that the CAS Processor is already running as a remote service and will connect to it using the URI provided in the client service descriptor. The lifecycle of a remotely deployed CAS Processor is not managed by the CPM, so appropriate infrastructure should be in place to start/restart such CAS Processors when necessary. This deployment provides fault isolation and is implementation (i.e., programming language) neutral.
local – indicates managed deployment of the CAS Processor. The CAS Processor descriptor referenced in the <descriptor> element must be a Vinci Service Deployment Descriptor, which configures a CAS Processor for deployment as a Vinci service (see Section 6.6, Working with Analysis Engine and CAS Consumer Services). The CPM deploys the CAS Processor in a separate process and manages the life cycle (start/stop) of the CAS Processor. Communication between the CPM and the CAS Processor is done with Vinci. When the CPM completes processing, the process containing the CAS Processor is terminated. This deployment mode insulates the CPM from the CAS Processor, creating a more robust deployment at the cost of a small communication overhead. On multiprocessor machines, the separate processes may run concurrently and improve overall throughput.
A number of elements may appear within the <casProcessor>
element.
The <descriptor> element is mandatory. It identifies the descriptor for the referenced CAS Processor using the syntax described in Section 23.2 above.
For remote CAS Processors, the referenced descriptor must be a Vinci Service Client Descriptor, which identifies a remotely deployed CAS Processor service. See Section 6.6, Working with Analysis Engine and CAS Consumer Services, for more information on creating these descriptors and deploying services.
The <configurationParameterSettings> element provides a way to override the contained Analysis Engine's parameter settings. Any entry specified here must already be defined; values specified replace the corresponding values for each parameter. For CAS Processors, this mechanism is only available when they are deployed in "integrated" mode. For Collection Readers and CAS Initializers, it is always available.
The content of this element is identical to the component descriptor for specifying parameters (in the case where no parameter groups are specified), except that the names for the primitive types have a "_p" suffixed to them: string_p, integer_p, float_p. Here is an example:
<configurationParameterSettings>
  <nameValuePair>
    <name>CivilianTitles</name>
    <value>
      <array>
        <string_p>Mr.</string_p>
        <string_p>Ms.</string_p>
        <string_p>Mrs.</string_p>
        <string_p>Dr.</string_p>
      </array>
    </value>
  </nameValuePair>
  ...
</configurationParameterSettings>
The optional <sofaNameMappings> element maps Sofa names defined in the component, or the default Sofa name (if the component does not declare any Sofa names), to Sofa names used in the CPE. The form of this element is:
<sofaNameMappings>
  <sofaNameMapping cpeSofaName="a_CPE_name" componentSofaName="a_component_Name"/>
  ...
</sofaNameMappings>
There can be any number of
<sofaNameMapping>
elements contained in the <sofaNameMappings>
element. The componentSofaName
attribute is optional; leave it out to
specify a mapping for the _InitialView - that is, for Single-View components.
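The two cases described above can be sketched as follows. All Sofa names here (GermanDocument, EnglishDocument, SourceText, TranslatedText) are hypothetical and chosen only for illustration. The first fragment is for a multi-view component that declares two Sofa names of its own:

```xml
<sofaNameMappings>
  <!-- The component's SourceText Sofa is bound to the CPE's GermanDocument Sofa -->
  <sofaNameMapping cpeSofaName="GermanDocument" componentSofaName="SourceText"/>
  <!-- The component's TranslatedText Sofa is bound to the CPE's EnglishDocument Sofa -->
  <sofaNameMapping cpeSofaName="EnglishDocument" componentSofaName="TranslatedText"/>
</sofaNameMappings>
```

The second fragment is for a Single-View component; componentSofaName is omitted, so the mapping applies to the _InitialView:

```xml
<sofaNameMappings>
  <!-- No componentSofaName: the component's default view maps to EnglishDocument -->
  <sofaNameMapping cpeSofaName="EnglishDocument"/>
</sofaNameMappings>
```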
The <runInSeparateProcess>
element is mandatory for local
CAS Processors, but
should not appear for remote
or integrated
CAS Processors. It enables the CPM to
create external processes using the provided runtime environment. Applications launched this way communicate
with the CPM using the Vinci protocol and connectivity is enabled by a local instance
of the VNS that the CPM manages. Since
communication is based on Vinci, the application need not be implemented in Java.
Any language for which Vinci provides support may be used to create an
application, and the CPM will seamlessly communicate with it. The overall structure of this element is:
<runInSeparateProcess>
  <exec dir="[String]" executable="[String]">
    <env key="[String]" value="[String]"/>
    ...
    <arg>[String]</arg>
    ...
  </exec>
</runInSeparateProcess>
The <exec> element provides information about how to execute the referenced CAS Processor. Two attributes are defined for the <exec> element. The dir attribute is currently not used; it is reserved for future functionality. The executable attribute specifies the actual Vinci service executable that will be run by the CPM, e.g., java, a batch script, an application (.exe), etc. The executable must be specified with a fully qualified path, or be found in the PATH of the CPM.
The <exec>
element has
two elements within it that define parameters used to construct the command
line for executing the CAS Processor. These elements must be listed in the order in which they should be
defined for the CAS Processor.
The optional <env> element is used to set an environment variable. The variable key will be set to value. For example,
<env key="CLASSPATH" value="C:\Java\lib"/>
will set the environment variable CLASSPATH to the value C:\Java\lib. The <env> element may be repeated to set multiple environment variables. All of the key/value pairs will be added to the environment by the CPM prior to launching the executable.
Note: The CPM actually adds ALL system environment variables when it launches the program. It queries the Operating System for its current system variables and one by one adds them to the program's process configuration.
The <arg>
element is used
to specify arbitrary string arguments that will appear on the command line when
the CPM runs the command specified in the executable
attribute.
For example, the following would be used to invoke the UIMA Java implementation of the Vinci service wrapper on a Java CAS Processor:
<runInSeparateProcess>
  <exec executable="java">
    <arg>-DVNS_HOST=localhost</arg>
    <arg>-DVNS_PORT=9099</arg>
    <arg>com.ibm.uima.reference_impl.analysis_engine.service.vinci.VinciAnalysisEngineService_impl</arg>
    <arg>C:\uima\desc\deployCasProcessor.xml</arg>
  </exec>
</runInSeparateProcess>
This will cause the CPM to run the following command line when starting the CAS Processor:
java -DVNS_HOST=localhost -DVNS_PORT=9099 com.ibm.uima.reference_impl.analysis_engine.service.vinci.VinciAnalysisEngineService_impl C:\uima\desc\deployCasProcessor.xml
The first argument specifies that the Vinci Naming Service is running on the localhost. The second argument specifies that the Vinci Naming Service port number is 9099. The third argument identifies the UIMA implementation of the Vinci service wrapper. This class contains the main method that will execute. That main method in turn takes a single argument: the filename for the CAS Processor service deployment descriptor. Thus the last argument identifies the Vinci service deployment descriptor file for the CAS Processor. Since this is the same descriptor file specified earlier in the <descriptor> element, the string ${descriptor} can be used to refer to the descriptor, e.g.:
<arg>${descriptor}</arg>
The CPM will expand this out to the service deployment
descriptor file referenced in the <descriptor>
element.
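Putting the <env>, <arg>, and ${descriptor} pieces together, a <runInSeparateProcess> element for a local Java CAS Processor might look like the sketch below. It only recombines values already used in this section; the CLASSPATH value is the illustrative one from the <env> example and would need to point at the actual UIMA and annotator libraries in a real deployment.

```xml
<runInSeparateProcess>
  <exec executable="java">
    <!-- Environment variables are added before the executable is launched -->
    <env key="CLASSPATH" value="C:\Java\lib"/>
    <!-- Arguments appear on the command line in the order listed -->
    <arg>-DVNS_HOST=localhost</arg>
    <arg>-DVNS_PORT=9099</arg>
    <arg>com.ibm.uima.reference_impl.analysis_engine.service.vinci.VinciAnalysisEngineService_impl</arg>
    <!-- Expanded by the CPM to the file referenced in the descriptor element -->
    <arg>${descriptor}</arg>
  </exec>
</runInSeparateProcess>
```

Using ${descriptor} avoids repeating the deployment descriptor path in two places, so the two references cannot drift apart.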
The <deploymentParameters>
element defines a number of deployment parameters that control how the CPM will
interact with the CAS Processor. This
element has the following overall form:
<deploymentParameters>
  <parameter name="[String]" value="..." type="string|integer"/>
  ...
</deploymentParameters>
The name attribute identifies the parameter, the value attribute specifies the value that will be assigned to the parameter, and the type attribute indicates the type of the parameter, either string or integer. The available parameters include:
vnsHost – (Deprecated) string parameter specifying the VNS host, e.g., localhost for local CAS Processors, or the host name or IP address of the VNS host for remote CAS Processors. This parameter is deprecated; specify the host inside the Vinci Service Client Descriptor instead, if needed. It is ignored for integrated and local deployments. If present, for remote deployments, it specifies the VNS host to use, unless that is specified in the Vinci Service Client Descriptor.
vnsPort – (Deprecated) integer parameter specifying the VNS port number. This parameter is deprecated; specify the port inside the Vinci Service Client Descriptor instead, if needed. It is ignored for integrated and local deployments. If present, for remote deployments, it specifies the VNS port number to use, unless that is specified in the Vinci Service Client Descriptor.
The number of attempts to reconnect to a failing service is specified using the <maxConsecutiveRestarts value="10" action="kill-pipeline" waitTimeBetweenRetries="50"/> XML element. The "value" attribute is the number of reconnection tries; the "action" attribute says what to do if the retries exceed the limit. The "kill-pipeline" action stops the pipeline that was associated with the failing service (other pipelines will continue to work). The CAS in process within a killed pipeline will be dropped. These events are communicated to the application using the normal event listener mechanism. The waitTimeBetweenRetries attribute says how many milliseconds to wait between attempts to reconnect.
For example, the following parameters might be used with a CAS Processor deployed in local mode:
<deploymentParameters>
  <parameter name="service-access" value="exclusive" type="string"/>
</deploymentParameters>
The <filter> element is a required element but currently should be left empty. This element is reserved for future use.
The mandatory <errorHandling> element defines error and restart policies for the CAS Processor. Each CAS Processor may define different actions in the event of errors and restarts. The CPM monitors and logs errant behaviors and attempts to recover the component based on the policies specified in this element.
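There are two kinds of faults. The first kind occurs only with non-integrated CAS Processors, for example a failure to launch or connect to the component, a lost connection, or a timeout. The second kind is an exception thrown by the CAS Processor itself. The <errorHandling> element has specifications for each of these kinds of faults. The format of this element is: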
<errorHandling>
  <maxConsecutiveRestarts action="continue|disable|terminate" value="[Number]"/>
  <errorRateThreshold action="continue|disable|terminate" value="[Rate]"/>
  <timeout max="[Number]"/>
</errorHandling>
The mandatory <maxConsecutiveRestarts> element applies only to faults of the first kind, and therefore only applies to non-integrated deployments. If such a fault occurs, a retry is attempted, up to value="[Number]" times. This retry resets the connection (if one was made) and attempts to reconnect and perhaps re-launch (see below for details). The original CAS (not a partially updated one) is sent to the CAS Processor as part of the retry, once the deployed component has been successfully restarted or reconnected to.
The action attribute specifies the action to take when the threshold specified by value="[Number]" is exceeded. The possible actions are:
continue – skip any further processing for this CAS by this CAS Processor, and pass the CAS to the next CAS Processor in the Pipeline. If dropCasOnException="true", the CPM will NOT pass the CAS to the next CAS Processor in the chain. Instead, the CPM will abort processing of this CAS, release the CAS back to the CAS Pool, and process the next CAS in the queue.
disable – the current CAS is handled just as in the continue case, but in addition, the CAS Processor is marked so that its process() method will not be called again (i.e., it will be "skipped" for future CASes).
terminate – the CPM will terminate all processing and exit.
The definition of an error for the <maxConsecutiveRestarts> element differs slightly for each of the three CAS Processor deployment modes:
local – Local CAS Processors experience two general error types. A launch error is defined by a failure of the process to successfully register with the local VNS within a default time window. The current timeout is 15 minutes. Multiple local CAS Processors are launched sequentially, with a subsequent processor launched immediately after its previous processor successfully registers with the VNS. A processing error is detected if a connection to the CAS Processor is lost or if the processing time exceeds a specified timeout value. For local CAS Processors, the <maxConsecutiveRestarts> element specifies the number of consecutive attempts made to launch the CAS Processor at CPM startup or after the CPM has lost a connection to the CAS Processor.
remote – For remote CAS Processors, the <maxConsecutiveRestarts> element applies to errors from sending Vinci commands. An error is detected if a connection to the CAS Processor is lost, or if the processing time exceeds the timeout value specified in the <timeout> element (see below).
integrated – Although mandatory, the <maxConsecutiveRestarts> element is NOT used for integrated CAS Processors, because integrated CAS Processors are not re-instantiated/restarted on exceptions. This setting is ignored by the CPM for integrated CAS Processors, but it is required. A future version of the CPM will make this element mandatory for remote and local CAS Processors only.
The mandatory <errorRateThreshold> element is used for all faults: both those above, and exceptions thrown by the CAS Processor itself. It specifies the number of retries for exceptions thrown by the CAS Processor itself, a maximum error rate, and the corresponding action to take when this rate is exceeded. The value attribute specifies the error rate in terms of errors per sample size in the form "N/M", where N is the number of errors and M is the sample size, defined in terms of the number of documents.
The first number is also used to indicate the maximum number of retries. If this number is less than the <maxConsecutiveRestarts value="[Number]">, it takes precedence, reducing the number of "restarts" attempted. A retry is done only if dropCasOnException is false. If it is set to true, no retry occurs, but the error is counted.
When the number of errors counted within a sample exceeds N, the action specified by the action attribute is taken. The possible actions and their meanings are the same as described above for the <maxConsecutiveRestarts> element:
continue
disable
terminate
The dropCasOnException="true" attribute of the <casProcessors> element modifies the action taken for continue and disable, in the same manner as above. For example:
<errorRateThreshold value="3/1000" action="disable" />
specifies that each error thrown by the CAS Processor
itself will be retried up to 3 times (if dropCasOnException
is false) and the CAS Processor will be disabled if the error rate exceeds 3
errors in 1000 documents.
If a document causes an error and the error rate threshold
for the CAS Processor is not exceeded, the CPM increments the CAS Processor’s
error count and retries processing that document (if dropCasOnException
is false). The retry means that the CPM
calls the CAS Processor’s process() method again, passing in as an argument the
same CAS that previously caused an exception.
Errors are accumulated across documents. For example, assume the error rate threshold is 3/1000. The same document may fail three times before finally succeeding on the fourth try, but the error count is now 3. If one more error occurs within the current sample of 1000 documents, the error rate threshold will be exceeded and the specified action will be taken. If no more errors occur within the current sample, the error counter is reset to 0 for the next sample of 1000 documents.
The <timeout>
element is
a mandatory element. Although mandatory for all CAS Processors, this element is
only relevant for local and remote CAS Processors. For integrated CAS
Processors, this element is ignored. In the current CPM implementation the
integrated CAS Processor process() method is not subject to timeouts.
The max attribute specifies the maximum amount of time in milliseconds the CPM will wait for a process() method to complete. When exceeded, the CPM will generate an exception and will treat this as an error subject to the threshold defined in the <errorRateThreshold> element above, including doing retries.
The action taken depends on whether the CAS Processor is local (managed) or remote (unmanaged). Local CAS Processors (which are services) are killed and restarted, and a new connection to them is established. For remote CAS Processors, the connection to them is dropped, and a new connection is reestablished (which may actually connect to a different instance of the remote services, if it has multiple instances).
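The three error-handling elements can be read together. The sketch below shows one possible policy for a remote CAS Processor; the numbers and the chosen actions are illustrative assumptions, not recommended defaults.

```xml
<errorHandling>
  <!-- Connection faults: retry up to 3 consecutive times; if that limit is
       exceeded, skip this CAS Processor for the current CAS -->
  <maxConsecutiveRestarts action="continue" value="3"/>
  <!-- All faults: disable this CAS Processor once more than 5 errors are
       counted within a sample of 1000 documents -->
  <errorRateThreshold action="disable" value="5/1000"/>
  <!-- Treat a process() call as failed if it does not return within 60000 ms -->
  <timeout max="60000"/>
</errorHandling>
```

Because the first number of the error rate (5) is not less than the <maxConsecutiveRestarts> value (3), it does not reduce the number of restarts attempted in this configuration.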
The <checkpoint> element is an optional element used to improve the performance of CAS Consumers. It has a single attribute, batch, which specifies the number of CASes in a batch, e.g.:
<checkpoint batch="1000"/>
sets the batch size to 1000 CASes. The batch size is the interval used to mark a point in processing requiring special handling. The CAS Processor's batchProcessComplete() method will be called by the CPM when this mark is reached so that the processor can take appropriate action. This mark could be used as a mechanism to buffer up results in CAS Consumers and perform time-consuming operations, such as check-pointing, that should not be done on a per-document basis.
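As a sketch of how the elements described in this section combine, the following <casProcessor> entry declares an integrated CAS Consumer that is notified after every 500 CASes. The CAS Processor name, the descriptor file name, and all numeric values are hypothetical; optional elements that are not needed here are simply left out.

```xml
<casProcessor deployment="integrated" name="Index Writer">
  <descriptor>
    <!-- Hypothetical CAS Consumer descriptor -->
    <include href="${CPM_HOME}/desc_dir/IndexWriterCasConsumer.xml"/>
  </descriptor>
  <!-- Required but currently left empty -->
  <filter/>
  <errorHandling>
    <!-- Mandatory even though ignored for integrated deployments -->
    <maxConsecutiveRestarts action="terminate" value="3"/>
    <errorRateThreshold action="terminate" value="10/1000"/>
    <!-- Mandatory even though ignored for integrated deployments -->
    <timeout max="100000"/>
  </errorHandling>
  <!-- batchProcessComplete() is called after each batch of 500 CASes -->
  <checkpoint batch="500"/>
</casProcessor>
```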
The parameters for configuring the overall CPE and CPM are
specified in the <cpeConfig>
section. The overall format of this section is:
<cpeConfig>
  <startAt>[NumberOrID]</startAt>
  <numToProcess>[Number]</numToProcess>
  <outputQueue dequeueTimeout="[Number]" queueClass="[ClassName]"/>
  <checkpoint file="[File]" time="[Number]" batch="[Number]"/>
  <timerImpl>[ClassName]</timerImpl>
  <deployAs>vinciService|interactive|immediate|single-threaded</deployAs>
</cpeConfig>
This section of the CPE descriptor allows for defining the starting entity, the number of entities to process, a checkpoint file and frequency, a pluggable timer, an optional output queue implementation, and finally a mode of operation. The mode of operation determines how the CPM interacts with users and other systems.
The <startAt>
element is an optional argument. It defines the starting entity in the collection at which the CPM should
start processing.
The implementation in the CPM passes this argument to the Collection Reader as the value of the parameter "startNumber". The CPM does not do anything else with this parameter; in particular, the CPM has no ability to skip to a specific document. That function, if available, is only provided by a particular Collection Reader implementation.
If the <startAt> element is used, the Collection Reader descriptor must define a single-valued configuration parameter with the name startNumber. It can declare this value to be of any type; the value passed in this XML element must be convertible to that type.
A typical use is to declare this to be an integer type, and to pass the sequential document number where processing should start. An alternative implementation might take a specific document ID; the collection reader could search through its collection until it reaches this ID and then start there.
This parameter will only make sense if the particular
collection reader is implemented to use the startNumber
configuration parameter.
The <numToProcess>
element is an optional element. It specifies the total number of entities to process. Use -1 to indicate ALL. If not defined, the number of entities to
process will be taken from the Collection Reader configuration. If present, this value overrides the
Collection Reader configuration.
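For instance, a run that should begin at document 101 and stop after 500 documents might be configured as in the sketch below. This assumes a Collection Reader that declares an integer startNumber parameter and interprets it as a sequential document number; the values are arbitrary, and the other <cpeConfig> elements are omitted for brevity even though a given UIMA version may require some of them.

```xml
<cpeConfig>
  <!-- Passed to the Collection Reader as its startNumber parameter -->
  <startAt>101</startAt>
  <!-- Overrides any count in the Collection Reader configuration; -1 would mean all -->
  <numToProcess>500</numToProcess>
</cpeConfig>
```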
The <outputQueue> element is an optional element. It enables plugging in a custom implementation for the Output Queue. When omitted, the CPM will use a default output queue that is based on a First-in First-out (FIFO) model.
The UIMA SDK provides a second implementation for the Output Queue that can be plugged in to the CPM, named "com.ibm.uima.reference_impl.collection.cpm.engine.SequencedQueue". This implementation supports handling very large documents that are split into "chunks"; it provides a delivery mechanism that ensures the sequential order of the chunks using information carried in the CAS metadata. This metadata, which is required for this implementation to work correctly, must be added as an instance of a Feature Structure of type com.ibm.es.tt.DocumentMetaData and referred to by an additional feature named esDocumentMetaData in the special instance of uima.tcas.DocumentAnnotation that is associated with the CAS. This is usually done by the Collection Reader; the instance contains the following features:
This implementation of a sequenced queue supports proper sequencing of CASes in CPM deployments that use document chunking. Chunking is a technique of splitting large documents into pieces to reduce overall memory consumption. Chunking does not depend on the number of CASes in the CAS Pool. It works equally well with one or more CASes in the CAS Pool. Each chunk is packaged in a separate CAS and placed in the Work Queue. If the CAS Pool is depleted, the CollectionReader thread is suspended until a CAS is released back to the pool by the processing threads. A document may be split into 1, 2, 3 or more chunks that are analyzed independently. In order to reconstruct the document correctly, the CAS Consumer can depend on receiving the chunks in the same sequential order that the chunks were "produced", when this sequenced queue implementation is used. To plug in this sequenced queue to the CPM use the following specification:
<outputQueue
dequeueTimeout="100000"
queueClass="com.ibm.uima.reference_impl.collection.cpm.engine.SequencedQueue"/>
where the mandatory queueClass attribute defines the name of the class and the second mandatory attribute, dequeueTimeout, specifies the maximum number of milliseconds to wait for the expected chunk.
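As a hedged sketch of where this element might sit, the fragment below places <outputQueue> inside a <cpeConfig> element next to other CPE settings. The placement inside <cpeConfig>, the ordering of the child elements, and the sibling values are assumptions made for illustration; only the <outputQueue> attributes come from the specification above.

```xml
<!-- Sketch only: placement and ordering inside <cpeConfig> are assumed. -->
<cpeConfig>
  <numToProcess>1</numToProcess>
  <!-- Wait up to 100000 ms for the next expected chunk of a document. -->
  <outputQueue
      dequeueTimeout="100000"
      queueClass="com.ibm.uima.reference_impl.collection.cpm.engine.SequencedQueue"/>
  <deployAs>immediate</deployAs>
</cpeConfig>
```

Omitting the <outputQueue> element altogether leaves the CPM on its default FIFO output queue.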
If the chunk doesn’t arrive in the configured time window, the entire document is presumed to be invalid and the CAS is dropped from further processing. This action occurs regardless of any other error action specification. The SequencedQueue invalidates the document, adding the offending document’s metadata to a local cache of invalid documents.
If the timeout occurs, the CPM notifies all registered listeners (see Using Listeners) by calling entityProcessComplete(). As part of this call, the SequencedQueue passes null instead of a CAS as the first argument, along with a special exception, CPMChunkTimeoutException. Null is passed because the expected chunk was not received within the configured timeout window, so no CAS is available when the timeout event occurs.
The CPMChunkTimeoutException object exposes an API that allows the listener to retrieve the offending document id as well as the other metadata attributes as defined above. These attributes are part of each chunk’s metadata and are added by the Collection Reader.
Each chunk that the SequencedQueue works on is tested to determine whether it belongs to an invalid document. This test checks the chunk’s metadata against the data in the local cache. If there is a match, the chunk is dropped. The check is performed only for chunks; complete documents are not subject to it.
If there is an exception during the processing of a chunk, the CPM sends a notification to all registered listeners. The notification includes the CAS and an exception. When the listener notification is completed, the CPM also sends separate notifications, containing the CAS, to the Artifact Producer and the SequencedQueue. The intent is to stop adding new chunks to the Work Queue that belong to an "invalid" document and also to deal with chunks that are en-route, being processed by the processing threads.
In response to the notification, the Artifact Producer will drop and release back to the CAS Pool all CASes that belong to an "invalid" document. Currently, there is no support in the CollectionReader’s API to tell it to stop generating chunks. The CollectionReader keeps producing the chunks, but the Artifact Producer immediately drops them and releases them to the CAS Pool. Before the CAS is released back to the CAS Pool, the Artifact Producer sends a notification to all registered listeners. This notification includes the CAS and an exception, SkipCasException.
In response to the notification of an exception involving a chunk, the SequencedQueue retrieves the metadata from the CAS and adds it to its local cache of "invalid" documents. All chunks de-queued from the OutputQueue and belonging to "invalid" documents will be dropped and released back to the CAS Pool. Before dropping the CAS, the CPM sends a notification to all registered listeners. The notification includes the CAS and a SkipCasException.
The <checkpoint> element is optional. It specifies a CPE checkpoint file, checkpoint frequency, and strategy for checkpoints (time or count based). At checkpoint time, the CPM saves status information and statistics to the checkpoint file. The checkpoint file is specified in the file attribute, which has the same form as the href attribute of the <include> element described in Section 23.2. The time attribute indicates that a checkpoint should be taken every [Number] seconds, and the batch attribute indicates that a checkpoint should be taken every [Number] batches.
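The two checkpoint strategies could be written as in the following sketch. The file name cpe_checkpoint.dat and the numeric values are hypothetical, chosen only to illustrate the attributes described above.

```xml
<!-- Sketch only: "cpe_checkpoint.dat" is a hypothetical file name. -->

<!-- Count-based strategy: take a checkpoint every 10 batches. -->
<checkpoint file="cpe_checkpoint.dat" batch="10"/>

<!-- Time-based strategy: take a checkpoint at the interval given by time. -->
<checkpoint file="cpe_checkpoint.dat" time="60"/>
```

A descriptor would normally carry only one of these two forms, depending on which strategy is wanted.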
The <timerImpl> element is optional. It is used to identify a custom timer plug-in class to generate time stamps during the CPM execution. The value of the element is a Java class name.
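A minimal sketch of this element follows. The class name com.example.cpm.MyTimer is hypothetical and stands in for whatever timer plug-in class an application supplies.

```xml
<!-- Sketch only: com.example.cpm.MyTimer is a hypothetical class name. -->
<timerImpl>com.example.cpm.MyTimer</timerImpl>
```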
The <deployAs> element indicates the type of CPM deployment. Valid contents for this element include:
vinciService – Vinci service exposing APIs for stop, pause, resume, and getStats
interactive – provide command line menus (start, stop, pause, resume)
immediate – run the CPM without menus or a service API
single-threaded – run the CPM in a single threaded mode. In this mode, the Collection Reader, the Processing Pipeline, and the CAS Consumer Pipeline are all running in one thread without the work queue and the output queue.
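For illustration, a <cpeConfig> fragment selecting one of these deployment types might look like the sketch below. The <numToProcess> value is an arbitrary placeholder, and the other optional children of <cpeConfig> are left out.

```xml
<!-- Sketch only: sibling values are placeholders. -->
<cpeConfig>
  <numToProcess>1</numToProcess>
  <!-- One of: vinciService, interactive, immediate, single-threaded -->
  <deployAs>single-threaded</deployAs>
</cpeConfig>
```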
External resource bindings for the CPE may optionally be specified in an element:
<resourceManagerConfiguration href="..."/>
For an introduction to external resources, refer to Section 4.5.4. In the resourceManagerConfiguration element, the value of the href attribute refers to another file that contains definitions and bindings for the external resources used by the CPE. The following example illustrates the format of this file. In a CPE containing an aggregate analysis engine with two annotators and a CAS Consumer, this resource manager configuration file would bind the external resource dependencies in all three components to the same physical resource:
<resourceManagerConfiguration>
  <!-- Declare Resource -->
  <externalResources>
    <externalResource>
      <name>ExampleResource</name>
      <fileResourceSpecifier>
        <fileUrl>file:MyResourceFile.dat</fileUrl>
      </fileResourceSpecifier>
    </externalResource>
  </externalResources>
  <!-- Bind component resource dependencies to ExampleResource -->
  <externalResourceBindings>
    <externalResourceBinding>
      <key>MyAE/annotator1/myResourceKey</key>
      <resourceName>ExampleResource</resourceName>
    </externalResourceBinding>
    <externalResourceBinding>
      <key>MyAE/annotator2/someResourceKey</key>
      <resourceName>ExampleResource</resourceName>
    </externalResourceBinding>
    <externalResourceBinding>
      <key>MyCasConsumer/otherResourceKey</key>
      <resourceName>ExampleResource</resourceName>
    </externalResourceBinding>
  </externalResourceBindings>
</resourceManagerConfiguration>
In this example, MyAE and MyCasConsumer are the names of the Analysis Engine and CAS Consumer, as specified by the name attributes of the CPE's <casProcessor> elements. annotator1 and annotator2 are the annotator keys specified within the Aggregate AE Descriptor, and myResourceKey, someResourceKey, and otherResourceKey are the keys of the resource dependencies declared in the individual annotator and CAS Consumer descriptors.
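To make the key structure concrete, the fragment below sketches the <casProcessor> elements that such bindings would correspond to. The descriptor paths are hypothetical, and child elements other than <descriptor> are omitted for brevity, so this is an outline and not a complete descriptor.

```xml
<!-- Sketch only: descriptor paths are hypothetical; other child elements are omitted. -->
<casProcessors casPoolSize="1" processingUnitThreadCount="1">
  <!-- name="MyAE" is the first segment of the keys
       MyAE/annotator1/myResourceKey and MyAE/annotator2/someResourceKey -->
  <casProcessor deployment="integrated" name="MyAE">
    <descriptor>
      <include href="descriptors/MyAggregateAE.xml"/>
    </descriptor>
  </casProcessor>
  <!-- name="MyCasConsumer" is the first segment of the key
       MyCasConsumer/otherResourceKey -->
  <casProcessor deployment="integrated" name="MyCasConsumer">
    <descriptor>
      <include href="descriptors/MyCasConsumer.xml"/>
    </descriptor>
  </casProcessor>
</casProcessors>
```

Each binding key is read left to right: the CAS Processor name, then the annotator key inside an aggregate (if any), then the resource dependency key.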
<?xml version="1.0" encoding="UTF-8"?>
<cpeDescription>
  <collectionReader>
    <collectionIterator>
      <descriptor>
        <include href="C:\Program Files\IBM\uima\docs\examples\descriptors\collection_reader\XMLFileCollectionReader.xml"/>
      </descriptor>
    </collectionIterator>
    <casInitializer>
      <descriptor>
        <include href="C:\Program Files\IBM\uima\docs\examples\descriptors\cas_initializer\XMLCasInitializer.xml"/>
      </descriptor>
    </casInitializer>
  </collectionReader>
  <casProcessors dropCasOnException="true" casPoolSize="1" processingUnitThreadCount="1">
    <casProcessor deployment="integrated" name="Aggregate TAE - Name Recognizer and Person Title Annotator">
      <descriptor>
        <include href="C:\Program Files\IBM\uima\docs\examples\descriptors\analysis_engine\NamesAndPersonTitles_TAE.xml"/>
      </descriptor>
      <deploymentParameters/>
      <filter/>
      <errorHandling>
        <errorRateThreshold action="terminate" value="100/1000"/>
        <maxConsecutiveRestarts action="terminate" value="30"/>
        <timeout max="100000"/>
      </errorHandling>
      <checkpoint batch="1"/>
    </casProcessor>
    <casProcessor deployment="integrated" name="Annotation Printer">
      <descriptor>
        <include href="C:\Program Files\IBM\uima\docs\examples\descriptors\cas_consumer\AnnotationPrinter.xml"/>
      </descriptor>
      <deploymentParameters/>
      <filter/>
      <errorHandling>
        <errorRateThreshold action="terminate" value="100/1000"/>
        <maxConsecutiveRestarts action="terminate" value="30"/>
        <timeout max="100000"/>
      </errorHandling>
      <checkpoint batch="1"/>
    </casProcessor>
  </casProcessors>
  <cpeConfig>
    <numToProcess>1</numToProcess>
    <deployAs>immediate</deployAs>
    <checkpoint file="" time="3000"/>
    <timerImpl/>
  </cpeConfig>
</cpeDescription>