Diff of /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Parent Directory
| Revision Log
| Patch
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java 2006/03/07 20:19:47 383996
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java 2006/03/07 20:24:06 383997
@@ -15,17 +15,26 @@
*/
package org.apache.catalina.tribes.tipis;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.util.UUIDGenerator;
/**
* A channel to handle RPC messaging
* @author Filip Hanik
*/
-public class RpcChannel {
+public class RpcChannel implements ChannelListener{
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
@@ -33,7 +42,9 @@ public class RpcChannel {
private Channel channel;
private RpcCallback callback;
- private String rpcId;
+ private byte[] rpcId;
+
+ private HashMap responseMap = new HashMap();
/**
* Create an RPC channel. You can have several RPC channels attached to a group
@@ -42,10 +53,11 @@ public class RpcChannel {
* @param channel Channel
* @param callback RpcCallback
*/
- public RpcChannel(String rpcId, Channel channel, RpcCallback callback) {
+ public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
this.channel = channel;
this.callback = callback;
this.rpcId = rpcId;
+ //channel.addChannelListener(this);
}
@@ -54,15 +66,47 @@ public class RpcChannel {
* @param destination Member[] - the destination for the message, and the members you request a reply from
* @param message Serializable - the message you are sending out
* @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
- * @param timeout long - timeout in milliseconds, if no reply is received within this time an exception is thrown
+ * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned
* @return Response[] - an array of response objects.
* @throws ChannelException
*/
public Response[] send(Member[] destination,
Serializable message,
int options,
- long timeout) throws ChannelException {
- throw new UnsupportedOperationException();
+ long timeout) throws ChannelException, InterruptedException {
+
+ RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
+ RpcCollector collector = new RpcCollector(key,options,destination.length,timeout);
+ synchronized (collector) {
+ responseMap.put(key,collector);
+ RpcMessage rmsg = new RpcMessage(rpcId,key.id,message);
+ channel.send(destination,rmsg);
+ collector.wait(timeout);
+ }
+ return collector.getResponses();
+ }
+
+
+ public void messageReceived(Serializable msg, Member sender) {
+ RpcMessage rmsg = (RpcMessage)msg;
+ RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
+ RpcCollector collector = (RpcCollector)responseMap.get(key);
+ if ( collector == null ) {
+ callback.leftOver(rmsg.message,sender);
+ } else {
+ synchronized (collector) {
+ collector.addResponse(rmsg.message,sender);
+ if ( collector.isComplete() ) collector.notifyAll();
+ }
+ }
+
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ if ( msg instanceof RpcMessage ) {
+ RpcMessage rmsg = (RpcMessage)msg;
+ return Arrays.equals(rmsg.rpcId,rpcId);
+ }else return false;
}
public Channel getChannel() {
@@ -73,7 +117,7 @@ public class RpcChannel {
return callback;
}
- public String getRpcId() {
+ public byte[] getRpcId() {
return rpcId;
}
@@ -85,8 +129,116 @@ public class RpcChannel {
this.callback = callback;
}
- public void setRpcId(String rpcId) {
+ public void setRpcId(byte[] rpcId) {
this.rpcId = rpcId;
}
+
+ public static class RpcMessage implements Externalizable {
+
+ private Serializable message;
+ private byte[] uuid;
+ private byte[] rpcId;
+
+ public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
+ this.rpcId = rpcId;
+ this.uuid = uuid;
+ this.message = message;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
+ int length = in.readInt();
+ uuid = new byte[length];
+ in.read(uuid, 0, length);
+ length = in.readInt();
+ rpcId = new byte[length];
+ in.read(rpcId, 0, length);
+ message = (Serializable)in.readObject();
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(uuid.length);
+ out.write(uuid, 0, uuid.length);
+ out.writeInt(rpcId.length);
+ out.write(rpcId, 0, rpcId.length);
+ out.writeObject(message);
+ }
+
+ }
+
+ /**
+ *
+ * Class that holds all response.
+ * @author not attributable
+ * @version 1.0
+ */
+ public static class RpcCollector {
+ public ArrayList responses = new ArrayList();
+ public RpcCollectorKey key;
+ public int options;
+ public int destcnt;
+ public long timeout;
+
+ public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) {
+ this.key = key;
+ this.options = options;
+ this.destcnt = destcnt;
+ this.timeout = timeout;
+ }
+
+ public void addResponse(Serializable message, Member sender){
+ Response resp = new Response(sender,message);
+ responses.add(resp);
+ }
+
+ public boolean isComplete() {
+ switch (options) {
+ case ALL_REPLY:
+ return destcnt == responses.size();
+ case MAJORITY_REPLY:
+ {
+ float perc = ((float)responses.size()) / ((float)destcnt);
+ return perc >= 50f;
+ }
+ case FIRST_REPLY:
+ return responses.size()>0;
+ default:
+ return false;
+ }
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if ( o instanceof RpcCollector ) {
+ RpcCollector r = (RpcCollector)o;
+ return r.key.equals(this.key);
+ } else return false;
+ }
+
+ public Response[] getResponses() {
+ return (Response[])responses.toArray(new Response[responses.size()]);
+ }
+ }
+
+ public static class RpcCollectorKey {
+ byte[] id;
+ public RpcCollectorKey(byte[] id) {
+ this.id = id;
+ }
+
+ public int hashCode() {
+ return id[0]+id[1]+id[2]+id[3];
+ }
+
+ public boolean equals(Object o) {
+ if ( o instanceof RpcCollectorKey ) {
+ RpcCollectorKey r = (RpcCollectorKey)o;
+ return Arrays.equals(id,r.id);
+ } else return false;
+ }
+
+ }
}
\ No newline at end of file