View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.writer;
20  
21  
22  import org.apache.log4j.Logger;
23  
24  public class ClientAck {
25    static Logger log = Logger.getLogger(ClientAck.class);
26  
27    // TODO move all constant to config
28  
29    public static final int OK = 100;
30    public static final int KO = -100;
31    public static final int KO_LOCK = -200;
32  
33    private long ts = 0;
34  
35    private Object lock = new Object();
36    private int status = 0;
37    private Throwable exception = null;
38    private int waitTime = 6 * 1000;// 6 secs
39    private int timeOut = 15 * 1000;
40  
41    public ClientAck() {
42      this.ts = System.currentTimeMillis() + timeOut;
43    }
44  
45    public int getTimeOut() {
46      return timeOut;
47    }
48  
49    public void wait4Ack() {
50      synchronized (lock) {
51        // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
52        while (this.status == 0) {
53          // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
54          try {
55            lock.wait(waitTime);
56          } catch (InterruptedException e) {
57          }
58          long now = System.currentTimeMillis();
59          if (now > ts) {
60            this.status = KO_LOCK;
61            this.exception = new RuntimeException("More than maximum time lock ["
62                + this.toString() + "]");
63          }
64        }
65        // log.info("[" + Thread.currentThread().getName() +
66        // "] >>>>>>>>>>>>>>>>> Client after wait status [" + status + "] [" +
67        // this.toString() + "]");
68      }
69    }
70  
71    public void releaseLock(int status, Throwable exception) {
72      this.exception = exception;
73      this.status = status;
74  
75      // log.info("[" + Thread.currentThread().getName() +
76      // "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" +
77      // this.toString() + "]");
78      synchronized (lock) {
79        // log.info("<<<<<<<<<<<<<<< Server before notify");
80        lock.notifyAll();
81      }
82      // log.info("<<<<<<<<<<<<<<< Server after notify");
83    }
84  
85    public int getStatus() {
86      return status;
87    }
88  
89    public void setStatus(int status) {
90      this.status = status;
91    }
92  
93    public Throwable getException() {
94      return exception;
95    }
96  
97    public void setException(Throwable exception) {
98      this.exception = exception;
99    }
100 }