View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.jetspeed.statistics.impl;
18  
19  import java.sql.Connection;
20  import java.sql.PreparedStatement;
21  import java.sql.SQLException;
22  import java.util.Iterator;
23  import java.util.LinkedList;
24  import java.util.List;
25  
26  import javax.sql.DataSource;
27  
/**
 * <p>
 * BatchedStatistics
 * </p>
 * <p>
 * Buffers {@link LogRecord}s in memory and writes them to the database as a
 * single JDBC batch. A flush is triggered when the buffer holds at least
 * <code>batchSize</code> records or when more than
 * <code>msElapsedTimeThreshold</code> milliseconds have passed since the
 * previous flush attempt. A background thread (see {@link #startThread()})
 * re-evaluates the time condition periodically so that buffered records are
 * written even when no new ones arrive.
 * </p>
 * <p>
 * All access to the record buffer is guarded by synchronizing on the buffer
 * itself. Subclasses supply the SQL statement and the record-to-parameter
 * mapping.
 * </p>
 * 
 * @author <a href="mailto:chris@bluesunrise.com">Chris Schaefer </a>
 * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
 * @version $Id: TestPortletEntityDAO.java,v 1.3 2005/05/24 14:43:19 ate Exp $
 */
public abstract class BatchedStatistics implements Runnable
{

    /**
     * Creates the batcher and its (not yet started) worker thread.
     * 
     * @param ds
     *            data source used to obtain a connection for every flush
     * @param batchSize
     *            number of buffered records that triggers a flush
     * @param msElapsedTimeThreshold
     *            maximum time in milliseconds between two flush attempts
     * @param name
     *            name of the worker thread; when <code>null</code> the
     *            runtime class name is used instead
     */
    public BatchedStatistics(DataSource ds, int batchSize,
            long msElapsedTimeThreshold, String name)
    {
        this.ds = ds;
        this.msElapsedTimeThreshold = msElapsedTimeThreshold;
        this.batchSize = batchSize;
        this.name = name;
        if (this.name == null)
        {
            this.name = this.getClass().getName();
        }
        msLastFlushTime = System.currentTimeMillis();
        // Use the resolved field rather than the raw parameter: Thread rejects
        // a null name with a NullPointerException.
        thread = new Thread(this, this.name);
    }

    /**
     * Starts the worker thread and blocks until {@link #run()} has been
     * entered. If the calling thread is interrupted while waiting, its
     * interrupt status is restored and the method returns immediately.
     */
    public void startThread()
    {
        thread.start();

        // Wait on a dedicated "started" flag rather than on "done": a worker
        // that starts and finishes very quickly would flip "done" back to
        // true and leave this loop spinning forever.
        while (!started)
        {
            try
            {
                Thread.sleep(1);
            }
            catch (InterruptedException ie)
            {
                // Hand the interrupt back to the caller instead of spinning
                // on a sleep that would now fail on every iteration.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Obtains a connection from the configured data source.
     * 
     * @return a connection that the caller is responsible for releasing
     * @throws SQLException
     *             if the data source cannot provide a connection
     */
    protected Connection getConnection() throws SQLException
    {
        return ds.getConnection();
    }

    /**
     * Flushes the buffer when either the size or the elapsed-time condition
     * is met. The last-flush timestamp is advanced after every attempt, even
     * a failed one. Should only be called from code synchronized to the
     * linked list.
     */
    private void checkAndDoFlush()
    {
        long msCurrentTime = System.currentTimeMillis();
        if ((logRecords.size() >= batchSize)
                || (msCurrentTime - msLastFlushTime > msElapsedTimeThreshold))
        {
            flush();
            msLastFlushTime = msCurrentTime;
        }
    }

    /**
     * Appends a record to the buffer and flushes immediately if a flush
     * condition is now met.
     * 
     * @param logRecord
     *            the record to buffer
     */
    public void addStatistic(LogRecord logRecord)
    {
        synchronized (logRecords)
        {
            logRecords.add(logRecord);
            checkAndDoFlush();
        }
    }

    /**
     * @return <code>true</code> while the worker has not been started or
     *         after it has finished; <code>false</code> while {@link #run()}
     *         is executing
     */
    public boolean isDone()
    {
        return done;
    }

    /**
     * Asks the worker to stop. The worker is woken up if it is currently
     * waiting, performs one last flush and then terminates.
     */
    public void tellThreadToStop()
    {
        keepRunning = false;
        synchronized (wakeUp)
        {
            wakeUp.notifyAll();
        }
    }

    /** Monitor the worker waits on between periodic flush checks. */
    private final Object wakeUp = new Object();

    // The three flags below are written by one thread and read by another,
    // hence volatile.

    /** Set once run() has been entered. */
    private volatile boolean started = false;

    private volatile boolean done = true;

    private volatile boolean keepRunning = true;

    /**
     * Worker loop: waits a quarter of the elapsed-time threshold (at least
     * one millisecond), re-evaluates the flush conditions and repeats until
     * asked to stop or interrupted. The buffer is flushed unconditionally on
     * the way out.
     */
    public void run()
    {
        done = false;
        started = true;
        boolean interrupted = false;
        try
        {
            while (keepRunning)
            {
                // A timeout of 0 would mean "wait forever" and a negative one
                // is rejected by wait(), so clamp to one millisecond.
                long msWait = Math.max(1L, msElapsedTimeThreshold / 4);
                try
                {
                    synchronized (wakeUp)
                    {
                        // Re-check under the monitor so a stop request issued
                        // just before we got here is not slept through.
                        if (keepRunning)
                        {
                            wakeUp.wait(msWait);
                        }
                    }
                }
                catch (InterruptedException ie)
                {
                    interrupted = true;
                    keepRunning = false;
                }
                synchronized (logRecords)
                {
                    checkAndDoFlush();
                }
            }
            // force a flush on the way out even if the constraints have not
            // been met
            synchronized (logRecords)
            {
                flush();
            }
        }
        finally
        {
            done = true;
            if (interrupted)
            {
                // Restored only now so that the final flush is not disturbed
                // by a pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Writes all buffered records to the database in one batch and one
     * transaction. The buffer is cleared only after a successful commit; on
     * an SQL failure the transaction is rolled back and the records stay
     * buffered for the next attempt. The connection's original auto-commit
     * mode is restored before the connection is released.
     * <p>
     * The method synchronizes on the buffer itself, so it may be called with
     * or without that lock already held.
     * </p>
     */
    public void flush()
    {
        synchronized (logRecords)
        {
            if (logRecords.isEmpty()) return;

            Connection con = null;
            PreparedStatement stm = null;
            boolean autoCommit = true;
            boolean autoCommitChanged = false;
            boolean committed = false;

            try
            {
                con = getConnection();
                autoCommit = con.getAutoCommit();
                con.setAutoCommit(false);
                autoCommitChanged = true;

                stm = getPreparedStatement(con);
                Iterator recordIterator = logRecords.iterator();
                while (recordIterator.hasNext())
                {
                    LogRecord record = (LogRecord) recordIterator.next();

                    loadOneRecordToStatement(stm, record);

                    stm.addBatch();
                }
                stm.executeBatch();
                con.commit();
                committed = true;
                // only clear the records if we actually store them...
                logRecords.clear();
            }
            catch (SQLException e)
            {
                // todo log to standard Jetspeed logger
                e.printStackTrace();
            }
            finally
            {
                try
                {
                    if (stm != null) stm.close();
                }
                catch (SQLException ignored)
                {
                    // best effort: nothing useful can be done here
                }
                if (con != null && autoCommitChanged)
                {
                    if (!committed)
                    {
                        try
                        {
                            con.rollback();
                        }
                        catch (SQLException ignored)
                        {
                            // best effort: the connection is released anyway
                        }
                    }
                    try
                    {
                        // Do not hand back a connection whose auto-commit
                        // mode we changed.
                        con.setAutoCommit(autoCommit);
                    }
                    catch (SQLException ignored)
                    {
                        // best effort: the connection is released anyway
                    }
                }
                releaseConnection(con);
            }
        }
    }

    /**
     * Creates the statement that one flush builds its batch on. Called once
     * per flush.
     * 
     * @param con
     *            the connection the flush runs on
     * @return the statement whose parameters are filled per record
     * @throws SQLException
     *             if the statement cannot be prepared
     */
    abstract protected PreparedStatement getPreparedStatement(Connection con)
            throws SQLException;

    /**
     * Binds the values of one record to the statement. Called once per
     * buffered record, immediately before the statement is added to the
     * batch.
     * 
     * @param stm
     *            the statement obtained from
     *            {@link #getPreparedStatement(Connection)}
     * @param rec
     *            the record to bind
     * @throws SQLException
     *             if a parameter cannot be set
     */
    abstract protected void loadOneRecordToStatement(PreparedStatement stm,
            LogRecord rec) throws SQLException;

    /**
     * Closes the connection, ignoring <code>null</code> and any failure to
     * close.
     * 
     * @param con
     *            the connection to close, may be <code>null</code>
     */
    void releaseConnection(Connection con)
    {
        try
        {
            if (con != null) con.close();
        }
        catch (SQLException ignored)
        {
            // best effort: nothing useful can be done here
        }
    }

    /** Worker thread created by the constructor. */
    protected Thread thread;

    /** Time of the last flush attempt, in milliseconds since the epoch. */
    protected long msLastFlushTime = 0;

    /** Number of buffered records that triggers a flush. */
    protected int batchSize = 10;

    /** Maximum time in milliseconds between two flush attempts. */
    protected long msElapsedTimeThreshold = 5000;

    /** Buffered records; also the lock guarding every access to them. */
    protected List logRecords = new LinkedList();

    /** Source of the connections used for flushing. */
    protected DataSource ds = null;

    /** Name given to the worker thread. */
    protected String name;

    /**
     * Reports whether this batcher accepts the given record; the criteria
     * are defined by the subclass.
     * 
     * @param rec
     *            the record to test
     * @return <code>true</code> if the record can be handled
     */
    public abstract boolean canDoRecordType(LogRecord rec);

}