This article was written on Rickard Öberg’s blog, 6 Nov 2010
The past week I’ve had to deal with a lot of data shuffling, both in raw form as bytes and strings, and as SPI- and domain-level objects. What struck me is that it is notoriously hard to shuffle things from one place to another in a way that is scalable, performant, and handles errors correctly. And I had to do some things over and over again, like reading strings from files.
So the thought occurred: there must be a general pattern to how this thing works, which can be extracted and put into a library. "Reading lines from a text file" should only have to be done once, and then used in whatever scenario requires it. Let’s take a look at a typical example of reading from one file and writing to another to see if we can find out what the possible pieces could be:
1: File source = new File( getClass().getResource( "/iotest.txt" ).getFile() );
1: File destination = File.createTempFile( "test", ".txt" );
1: destination.deleteOnExit();
2: BufferedReader reader = new BufferedReader( new FileReader( source ) );
3: long count = 0;
2: try
2: {
4:     BufferedWriter writer = new BufferedWriter( new FileWriter( destination ) );
4:     try
4:     {
2:         String line = null;
2:         while( (line = reader.readLine()) != null )
2:         {
3:             count++;
4:             writer.append( line ).append( '\n' );
2:         }
4:         writer.close();
4:     } catch( IOException e )
4:     {
4:         writer.close();
4:         destination.delete();
4:     }
2: } finally
2: {
2:     reader.close();
2: }
1: System.out.println( count );
As the numbers to the left indicate, I’ve identified four parts in this type of code that could be separated from each other.
1) is the client code that initiates a transfer, and which has to know the input and output source.
2) is the code that reads lines from an input.
3) is helper code that I use to keep track of what’s going on, and which I’d like to reuse no matter what kind of transfer is being done.
4) receives the data and writes it down. In this code, if I wanted to implement batching on the read and write side I could do so by changing parts 2 and 4 to read/write multiple lines at a time.
If you want to reproduce what’s explained in this tutorial, remember to depend on the Core Runtime artifact, which depends on Core API, Core SPI, Core Bootstrap and the Core Functional & I/O APIs.
See the "Depend on Zest™ in your build" tutorial for details.
Once these parts were identified it was mostly just a matter of putting interfaces on these pieces, and making sure they can be easily used in many different situations. The result is as follows.
To start with we have Input:
public interface Input<T, SenderThrowableType extends Throwable>
{
    <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
        throws SenderThrowableType, ReceiverThrowableType;
}
Inputs, like Iterables, can be used over and over again to initiate transfers of data from one place to another, in this case to an Output. Since I want this to be generic, the type of thing being sent is T, so it can be anything (byte[], String, EntityState, MyDomainObject). I also want the sender and receiver of data to be able to throw their own exceptions, and this is marked by declaring these as generic exception types. For example, the input may want to throw SQLException and the output IOException, if anything goes wrong. This should be strongly typed, and both sender and receiver must know when either side screws up, so that they can recover properly and close any resources they have opened.
On the receiving side we then have Output:
public interface Output<T, ReceiverThrowableType extends Throwable>
{
    [...snip...]

    <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
        throws ReceiverThrowableType, SenderThrowableType;
}
When receiveFrom is invoked by an Input, as a result of invoking transferTo on the Input, the Output should open whatever resources it needs to write to, and then expect data to come from a Sender. Both the Input and Output must have the same type T, so that they agree on what is being sent. We will see later how this can be handled if this is not the case.
Next we have Sender:
public interface Sender<T, SenderThrowableType extends Throwable>
{
    [...snip...]

    <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super T, ReceiverThrowableType> receiver )
        throws ReceiverThrowableType, SenderThrowableType;
}
The Output invokes sendTo and passes in a Receiver that the Sender will use to send individual items. The sender at this point can start transferring data of type T to the receiver, one at a time. The Receiver looks like this:
public interface Receiver<T, ReceiverThrowableType extends Throwable>
{
    [...snip...]

    void receive( T item ) throws ReceiverThrowableType;
}
When the receiver gets the individual items from the sender it can either immediately write them to its underlying resource, or batch them up. Since the receiver will know when the transfer is done (sendTo returns) it can write the remaining batches properly and close any resource it holds.
This simple pattern, with two interfaces on the sending side and two on the receiving side, gives us the potential to do scalable, performant and fault-tolerant transfers of data.
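To make the interplay concrete, here is a minimal sketch of what a line-reading Input and a line-writing Output could look like against these four interfaces. This is illustrative only (the actual implementations behind Inputs.text and Outputs.text differ in their details), and the class names LineInput and LineOutput are made up for the example:

import java.io.*;

// Illustrative only: an Input that reads lines from a file
class LineInput implements Input<String, IOException>
{
    private final File file;

    LineInput( File file )
    {
        this.file = file;
    }

    public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
        throws IOException, ReceiverThrowableType
    {
        // Hand the Output a Sender; the Output calls back when it is ready to receive
        output.receiveFrom( new Sender<String, IOException>()
        {
            public <RTT extends Throwable> void sendTo( Receiver<? super String, RTT> receiver )
                throws RTT, IOException
            {
                BufferedReader reader = new BufferedReader( new FileReader( file ) );
                try
                {
                    String line;
                    while( (line = reader.readLine()) != null )
                    {
                        receiver.receive( line ); // push one item at a time
                    }
                } finally
                {
                    reader.close(); // the sending side cleans up its own resource
                }
            }
        } );
    }
}

// Illustrative only: an Output that writes lines and deletes a partial result on failure
class LineOutput implements Output<String, IOException>
{
    private final File file;

    LineOutput( File file )
    {
        this.file = file;
    }

    public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
        throws IOException, SenderThrowableType
    {
        final BufferedWriter writer = new BufferedWriter( new FileWriter( file ) );
        boolean completed = false;
        try
        {
            sender.sendTo( new Receiver<String, IOException>()
            {
                public void receive( String item ) throws IOException
                {
                    writer.append( item ).append( '\n' ); // write each incoming item
                }
            } );
            completed = true;
        } finally
        {
            writer.close();
            if( !completed )
            {
                file.delete(); // failed transfer: remove the partial destination
            }
        }
    }
}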
So now that the above API defines the contract of sending and receiving data, I can create a couple of standard inputs and outputs. Let’s say, reading lines of text from a file, and writing lines of text to a file. These implementations I can then put in static methods so they are easy to use. In the end, making a copy of a text file looks like this:
File source = new File( "source.txt" );
File destination = new File( "destination.txt" );
Inputs.text( source ).transferTo( Outputs.text( destination ) );
One line of code that handles the reading, the writing, the cleaning up, buffering, and whatnot. Pretty nifty! The transferTo method will throw IOException, which I can catch if I want to present any errors to the user. But actually dealing with those errors, i.e. closing the files and potentially deleting the destination if the transfer failed, is already handled by the Input and Output. I will never have to deal with the details of reading text from a file ever again!
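For completeness, a caller that wants to surface the error might do something like this sketch (the reporting itself is up to the application):

try
{
    Inputs.text( source ).transferTo( Outputs.text( destination ) );
} catch( IOException e )
{
    // By the time we get here the files are closed and the partial
    // destination has been deleted; all that is left is reporting
    System.err.println( "Copy failed: " + e.getMessage() );
}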
While the above handles the basic input/output of a transfer, there are usually other things that I want to do as well. I may want to count how many items were transferred, do some filtering, or log every 1000 items or so to see what’s going on. Since input and output are now separated this becomes simply a matter of inserting something in the middle that mediates the input and output. Since many of these mediations have a similar character I can put these into standard utility methods to make them easier to use.
The first standard decorator is a filter. I will implement this by means of supplying a Specification:
public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> filter(
    final Specification<T> specification,
    final Output<T, ReceiverThrowableType> output )
{
    ... create an Output that filters items based on the Specification<T> ...
}
Where Specification is:
interface Specification<T>
{
    boolean test( T item );
}
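For the curious, the elided body of filter could be sketched roughly like this: wrap the given Output in a new Output that interposes a filtering Receiver. This is a sketch of the idea, not necessarily the exact Transforms implementation:

public static <T, ReceiverThrowableType extends Throwable> Output<T, ReceiverThrowableType> filter(
    final Specification<T> specification,
    final Output<T, ReceiverThrowableType> output )
{
    return new Output<T, ReceiverThrowableType>()
    {
        public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
            throws ReceiverThrowableType, SenderThrowableType
        {
            // Delegate to the wrapped Output, handing it a Sender that
            // drops items failing the specification
            output.receiveFrom( new Sender<T, SenderThrowableType>()
            {
                public <RTT extends Throwable> void sendTo( final Receiver<? super T, RTT> receiver )
                    throws RTT, SenderThrowableType
                {
                    sender.sendTo( new Receiver<T, RTT>()
                    {
                        public void receive( T item ) throws RTT
                        {
                            if( specification.test( item ) )
                            {
                                receiver.receive( item ); // forward only matching items
                            }
                        }
                    } );
                }
            } );
        }
    };
}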
With this simple construct I can now perform transfers and easily filter out items I don’t want on the receiving side. This example removes empty lines from a text file.
File source = ...
File destination = ...
Inputs.text( source ).transferTo( Transforms.filter( new Specification<String>()
{
    public boolean test( String string )
    {
        return string.length() != 0;
    }
}, Outputs.text( destination ) ) );
The second common operation is mapping from one type to another. This deals with the case where the Input you have does not match the Output you want to send to, but there is a way to map from the input type to the output type. An example would be mapping from String to JSONObject. The operation itself looks like this:
public static <From, To, ReceiverThrowableType extends Throwable> Output<From, ReceiverThrowableType> map(
    Function<From, To> function,
    Output<To, ReceiverThrowableType> output )
Where Function is defined as:
interface Function<From, To>
{
    To map( From from );
}
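The body of map (not shown above) can be sketched the same way as filter, wrapping the Output and converting each item before forwarding it; again this is illustrative, not necessarily the exact implementation:

public static <From, To, ReceiverThrowableType extends Throwable> Output<From, ReceiverThrowableType> map(
    final Function<From, To> function,
    final Output<To, ReceiverThrowableType> output )
{
    return new Output<From, ReceiverThrowableType>()
    {
        public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender )
            throws ReceiverThrowableType, SenderThrowableType
        {
            output.receiveFrom( new Sender<To, SenderThrowableType>()
            {
                public <RTT extends Throwable> void sendTo( final Receiver<? super To, RTT> receiver )
                    throws RTT, SenderThrowableType
                {
                    sender.sendTo( new Receiver<From, RTT>()
                    {
                        public void receive( From item ) throws RTT
                        {
                            receiver.receive( function.map( item ) ); // convert, then forward
                        }
                    } );
                }
            } );
        }
    };
}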
With this I can then connect an Input of Strings to an Output of JSONObject like so:
Input<String, IOException> input = ...;
Output<JSONObject, RuntimeException> output = ...;
input.transferTo( Transforms.map( new String2JSON(), output ) );
Where String2JSON implements Function and its map method converts the String into a JSONObject.
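String2JSON itself is not shown here; a minimal sketch, assuming the org.json library, might look like this:

import org.json.JSONException;
import org.json.JSONObject;

class String2JSON implements Function<String, JSONObject>
{
    public JSONObject map( String from )
    {
        try
        {
            return new JSONObject( from ); // parse one line of JSON text
        } catch( JSONException e )
        {
            // Function.map declares no checked exceptions, so wrap
            throw new IllegalArgumentException( e );
        }
    }
}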
At this point we can deal with the last part of the initial example: the counting of items. This can be implemented as a generic map Function that has the same input and output type, and that just maintains an internal count which is updated on every call to map(). The example can then be written as:
File source = ...
File destination = ...
Counter<String> counter = new Counter<String>();
Inputs.text( source ).transferTo( Transforms.map( counter, Outputs.text( destination ) ) );
System.out.println( "Nr of lines:" + counter.getCount() );
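The Counter class is not spelled out here; it could be sketched as a Function with identical input and output types that counts as a side effect:

class Counter<T> implements Function<T, T>
{
    private long count = 0;

    public T map( T item )
    {
        count++;      // count every item passing through
        return item;  // hand it on unchanged
    }

    public long getCount()
    {
        return count;
    }
}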
Now I can finally get back to my initial problem that led me to look into this: how to implement a good way to access EntityStates in a Zest™ EntityStore, and perform restores of backups. The current version of accessing EntityStates looks like this:
<ThrowableType extends Throwable> void visitEntityStates( EntityStateVisitor<ThrowableType> visitor, ModuleSPI module )
    throws ThrowableType;

interface EntityStateVisitor<ThrowableType extends Throwable>
{
    void visitEntityState( EntityState entityState ) throws ThrowableType;
}
This can now be replaced with:
Input<EntityState, EntityStoreException> entityStates(ModuleSPI module);
Because of the pattern outlined above, users of this will get more information about what’s happening in the traversal, such as whether the EntityStore raised an EntityStoreException during the traversal, which they can then handle gracefully. It also becomes easy to add decorators such as maps and filters for users of this. Let’s say you are only interested in EntityStates of a given type: then add a filter for this, without changing the consumer.
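As a sketch of that last point: "store" is assumed to be an EntityStore exposing the entityStates method above, and both isOfType and myOutput are hypothetical stand-ins for whatever type check and receiving Output the application actually has.

store.entityStates( module ).transferTo( Transforms.filter( new Specification<EntityState>()
{
    public boolean test( EntityState state )
    {
        // isOfType is a stand-in for the SPI's real type check
        return isOfType( state, "com.example.PersonEntity" );
    }
}, myOutput ) );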
For importing backup data into an EntityStore, the interface used to look like this:
interface ImportSupport
{
    ImportResult importFrom( Reader in ) throws IOException;
}
This ties the EntityStore to only being able to read JSON lines from Readers, the client cannot tell whether an IOException raised is due to errors in the Reader or in writing to the store, and the ImportResult, which contains a list of exceptions and a count of imported items, is quite ugly to create and use. With the I/O API at hand this can now be replaced with:
interface ImportSupport
{
    Output<String, EntityStoreException> importJSON();
}
Using this, given the helpers outlined above, is as simple as:
File backup = ...
ImportSupport entityStore = ...
Inputs.text( backup ).transferTo( entityStore.importJSON() );
If the client wants any "extras", such as counting how many objects were imported, this can be done by adding filters as previously shown. If you only want to, say, import entities modified before a particular date (let’s say you know some junk was introduced after a given time), then add a specification filter that performs this check. And so on.
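A sketch of that date filter, assuming each backup line is a JSON object carrying a "modified" timestamp (the property name and parsing are illustrative only):

File backup = ...
ImportSupport entityStore = ...
final long cutoff = ...; // only import entries modified before this point in time
Inputs.text( backup ).transferTo( Transforms.filter( new Specification<String>()
{
    public boolean test( String line )
    {
        try
        {
            return new JSONObject( line ).getLong( "modified" ) < cutoff;
        } catch( JSONException e )
        {
            return false; // skip lines we cannot parse
        }
    }
}, entityStore.importJSON() ) );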
It is quite common while developing software that you have to shuffle data or objects from one input to another output, possibly with some transformations in the middle. Usually these things have to be done from scratch, which opens the door to errors and badly applied patterns. By introducing a generic Input/Output API that encapsulates and separates these concerns properly, it becomes easier to perform these tasks in a scalable, performant and error-free way, while still allowing these tasks to be decorated with extra features when needed.
This article has outlined one way to do this, and the API and helpers that I’ve described are available in the current Zest™ Core 1.3-SNAPSHOT in Git (see Zest™ homepage for access details). The idea is to start using it throughout Zest wherever we need to do I/O of the type described here.