001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sync;
022
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.sql.Statement;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.List;
033import java.util.concurrent.Callable;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Future;
037
038import net.sf.hajdbc.Dialect;
039import net.sf.hajdbc.Messages;
040import net.sf.hajdbc.SynchronizationContext;
041import net.sf.hajdbc.SynchronizationStrategy;
042import net.sf.hajdbc.TableProperties;
043import net.sf.hajdbc.UniqueConstraint;
044import net.sf.hajdbc.util.SQLExceptionFactory;
045import net.sf.hajdbc.util.Strings;
046
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Database-independent synchronization strategy that only updates differences between two databases.
052 * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync).
053 * The following algorithm is used:
054 * <ol>
055 *  <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
056 *  <li>For each database table:
057 *   <ol>
058 *    <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li>
059 *    <li>Find the primary key(s) of the table</li>
060 *    <li>Query all rows in the inactive database table, sorting by the primary key(s)</li>
061 *    <li>Query all rows on the active database table</li>
062 *    <li>For each row in table:
063 *     <ol>
064 *      <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li>
065 *      <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li>
066 *     </ol>
067 *    </li>
068 *    <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li>
069 *   </ol>
070 *  </li>
071 *  <li>Re-create the foreign keys on the inactive database</li>
072 *  <li>Synchronize sequences</li>
073 * </ol>
074 * @author  Paul Ferraro
075 */
076public class DifferentialSynchronizationStrategy implements SynchronizationStrategy
077{
078        private static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class);
079
080        private int fetchSize = 0;
081        private int maxBatchSize = 100;
082        
083        /**
084         * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext)
085         */
086        @Override
087        public <D> void synchronize(SynchronizationContext<D> context) throws SQLException
088        {
089                Connection sourceConnection = context.getConnection(context.getSourceDatabase());
090                Connection targetConnection = context.getConnection(context.getTargetDatabase());
091
092                Dialect dialect = context.getDialect();
093                ExecutorService executor = context.getExecutor();
094                
095                boolean autoCommit = targetConnection.getAutoCommit();
096                
097                targetConnection.setAutoCommit(true);
098                
099                SynchronizationSupport.dropForeignKeys(context);
100                SynchronizationSupport.dropUniqueConstraints(context);
101                
102                targetConnection.setAutoCommit(false);
103                
104                try
105                {
106                        for (TableProperties table: context.getSourceDatabaseProperties().getTables())
107                        {
108                                String tableName = table.getName();
109                                
110                                UniqueConstraint primaryKey = table.getPrimaryKey();
111                                
112                                if (primaryKey == null)
113                                {
114                                        throw new SQLException(Messages.getMessage(Messages.PRIMARY_KEY_REQUIRED, this.getClass().getName(), tableName));
115                                }
116                                
117                                List<String> primaryKeyColumnList = primaryKey.getColumnList();
118                                
119                                Collection<String> columns = table.getColumns();
120                                
121                                // List of colums for select statement - starting with primary key
122                                List<String> columnList = new ArrayList<String>(columns.size());
123                                
124                                columnList.addAll(primaryKeyColumnList);
125                                
126                                for (String column: columns)
127                                {
128                                        if (!primaryKeyColumnList.contains(column))
129                                        {
130                                                columnList.add(column);
131                                        }
132                                }
133                                
134                                List<String> nonPrimaryKeyColumnList = columnList.subList(primaryKeyColumnList.size(), columnList.size());
135                                
136                                String commaDelimitedColumns = Strings.join(columnList, Strings.PADDED_COMMA);
137                                
138                                // Retrieve table rows in primary key order
139                                final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName + " ORDER BY " + Strings.join(primaryKeyColumnList, Strings.PADDED_COMMA); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
140                                
141                                final Statement targetStatement = targetConnection.createStatement();
142
143                                targetStatement.setFetchSize(this.fetchSize);
144        
145                                logger.debug(selectSQL);
146                                
147                                Callable<ResultSet> callable = new Callable<ResultSet>()
148                                {
149                                        public ResultSet call() throws SQLException
150                                        {
151                                                return targetStatement.executeQuery(selectSQL);
152                                        }
153                                };
154        
155                                Future<ResultSet> future = executor.submit(callable);
156                                
157                                Statement sourceStatement = sourceConnection.createStatement();
158                                sourceStatement.setFetchSize(this.fetchSize);
159                                
160                                ResultSet sourceResultSet = sourceStatement.executeQuery(selectSQL);
161
162                                ResultSet inactiveResultSet = future.get();
163                                
164                                String primaryKeyWhereClause = " WHERE " + Strings.join(primaryKeyColumnList, " = ? AND ") + " = ?"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
165                                
166                                // Construct DELETE SQL
167                                String deleteSQL = "DELETE FROM " + tableName + primaryKeyWhereClause; //$NON-NLS-1$
168                                
169                                logger.debug(deleteSQL);
170                                
171                                PreparedStatement deleteStatement = targetConnection.prepareStatement(deleteSQL);
172                                
173                                // Construct INSERT SQL
174                                String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columnList.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
175                                
176                                logger.debug(insertSQL);
177                                
178                                PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL);
179                                
180                                // Construct UPDATE SQL
181                                PreparedStatement updateStatement = null;
182                                
183                                if (!nonPrimaryKeyColumnList.isEmpty())
184                                {
185                                        String updateSQL = "UPDATE " + tableName + " SET " + Strings.join(nonPrimaryKeyColumnList, " = ?, ") + " = ?" + primaryKeyWhereClause; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
186                                        
187                                        logger.debug(updateSQL);
188                                        
189                                        updateStatement = targetConnection.prepareStatement(updateSQL);
190                                }
191                                
192                                boolean hasMoreActiveResults = sourceResultSet.next();
193                                boolean hasMoreInactiveResults = inactiveResultSet.next();
194                                
195                                int insertCount = 0;
196                                int updateCount = 0;
197                                int deleteCount = 0;
198                                
199                                while (hasMoreActiveResults || hasMoreInactiveResults)
200                                {
201                                        int compare = 0;
202                                        
203                                        if (!hasMoreActiveResults)
204                                        {
205                                                compare = 1;
206                                        }
207                                        else if (!hasMoreInactiveResults)
208                                        {
209                                                compare = -1;
210                                        }
211                                        else
212                                        {
213                                                for (int i = 1; i <= primaryKeyColumnList.size(); ++i)
214                                                {
215                                                        Object activeObject = sourceResultSet.getObject(i);
216                                                        Object inactiveObject = inactiveResultSet.getObject(i);
217                                                        
218                                                        // We assume that the primary keys column types are Comparable
219                                                        compare = this.compare(activeObject, inactiveObject);
220                                                        
221                                                        if (compare != 0)
222                                                        {
223                                                                break;
224                                                        }
225                                                }
226                                        }
227                                        
228                                        if (compare > 0)
229                                        {
230                                                deleteStatement.clearParameters();
231                                                
232                                                for (int i = 1; i <= primaryKeyColumnList.size(); ++i)
233                                                {
234                                                        int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1)));
235                                                        
236                                                        deleteStatement.setObject(i, inactiveResultSet.getObject(i), type);
237                                                }
238                                                
239                                                deleteStatement.addBatch();
240                                                
241                                                deleteCount += 1;
242                                                
243                                                if ((deleteCount % this.maxBatchSize) == 0)
244                                                {
245                                                        deleteStatement.executeBatch();
246                                                        deleteStatement.clearBatch();
247                                                }
248                                        }
249                                        else if (compare < 0)
250                                        {
251                                                insertStatement.clearParameters();
252
253                                                for (int i = 1; i <= columnList.size(); ++i)
254                                                {
255                                                        int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1)));
256
257                                                        Object object = SynchronizationSupport.getObject(sourceResultSet, i, type);
258                                                        
259                                                        if (sourceResultSet.wasNull())
260                                                        {
261                                                                insertStatement.setNull(i, type);
262                                                        }
263                                                        else
264                                                        {
265                                                                insertStatement.setObject(i, object, type);
266                                                        }
267                                                }
268                                                
269                                                insertStatement.addBatch();
270                                                
271                                                insertCount += 1;
272                                                
273                                                if ((insertCount % this.maxBatchSize) == 0)
274                                                {
275                                                        insertStatement.executeBatch();
276                                                        insertStatement.clearBatch();
277                                                }
278                                        }
279                                        else if (updateStatement != null) // if (compare == 0)
280                                        {
281                                                updateStatement.clearParameters();
282                                                
283                                                boolean updated = false;
284                                                
285                                                for (int i = primaryKeyColumnList.size() + 1; i <= columnList.size(); ++i)
286                                                {
287                                                        int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1)));
288                                                        
289                                                        Object activeObject = SynchronizationSupport.getObject(sourceResultSet, i, type);
290                                                        Object inactiveObject = SynchronizationSupport.getObject(inactiveResultSet, i, type);
291                                                        
292                                                        int index = i - primaryKeyColumnList.size();
293                                                        
294                                                        if (sourceResultSet.wasNull())
295                                                        {
296                                                                updateStatement.setNull(index, type);
297                                                                
298                                                                updated |= !inactiveResultSet.wasNull();
299                                                        }
300                                                        else
301                                                        {
302                                                                updateStatement.setObject(index, activeObject, type);
303                                                                
304                                                                updated |= inactiveResultSet.wasNull();
305                                                                updated |= !equals(activeObject, inactiveObject);
306                                                        }
307                                                }
308                                                
309                                                if (updated)
310                                                {
311                                                        for (int i = 1; i <= primaryKeyColumnList.size(); ++i)
312                                                        {
313                                                                int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1)));
314                                                                
315                                                                updateStatement.setObject(i + nonPrimaryKeyColumnList.size(), inactiveResultSet.getObject(i), type);
316                                                        }
317                                                        
318                                                        updateStatement.addBatch();
319                                                        
320                                                        updateCount += 1;
321                                                        
322                                                        if ((updateCount % this.maxBatchSize) == 0)
323                                                        {
324                                                                updateStatement.executeBatch();
325                                                                updateStatement.clearBatch();
326                                                        }
327                                                }
328                                        }
329                                        
330                                        if (hasMoreActiveResults && (compare <= 0))
331                                        {
332                                                hasMoreActiveResults = sourceResultSet.next();
333                                        }
334                                        
335                                        if (hasMoreInactiveResults && (compare >= 0))
336                                        {
337                                                hasMoreInactiveResults = inactiveResultSet.next();
338                                        }
339                                }
340                                
341                                if ((deleteCount % this.maxBatchSize) > 0)
342                                {
343                                        deleteStatement.executeBatch();
344                                }
345                                
346                                deleteStatement.close();
347                                
348                                if ((insertCount % this.maxBatchSize) > 0)
349                                {
350                                        insertStatement.executeBatch();
351                                }
352                                
353                                insertStatement.close();
354                                
355                                if (updateStatement != null)
356                                {
357                                        if ((updateCount % this.maxBatchSize) > 0)
358                                        {
359                                                updateStatement.executeBatch();
360                                        }
361                                        
362                                        updateStatement.close();
363                                }
364                                
365                                targetStatement.close();
366                                sourceStatement.close();
367                                
368                                targetConnection.commit();
369                                
370                                logger.info(Messages.getMessage(Messages.INSERT_COUNT, insertCount, tableName));
371                                logger.info(Messages.getMessage(Messages.UPDATE_COUNT, updateCount, tableName));
372                                logger.info(Messages.getMessage(Messages.DELETE_COUNT, deleteCount, tableName));                        
373                        }
374                }
375                catch (ExecutionException e)
376                {
377                        SynchronizationSupport.rollback(targetConnection);
378                        
379                        throw SQLExceptionFactory.createSQLException(e.getCause());
380                }
381                catch (InterruptedException e)
382                {
383                        SynchronizationSupport.rollback(targetConnection);
384                        
385                        throw SQLExceptionFactory.createSQLException(e.getCause());
386                }
387                catch (SQLException e)
388                {
389                        SynchronizationSupport.rollback(targetConnection);
390                        
391                        throw e;
392                }
393                
394                targetConnection.setAutoCommit(true);
395                
396                SynchronizationSupport.restoreUniqueConstraints(context);
397                SynchronizationSupport.restoreForeignKeys(context);
398                
399                SynchronizationSupport.synchronizeIdentityColumns(context);
400                SynchronizationSupport.synchronizeSequences(context);
401                
402                targetConnection.setAutoCommit(autoCommit);
403        }
404
405        private boolean equals(Object object1, Object object2)
406        {
407                if ((object1 instanceof byte[]) && (object2 instanceof byte[]))
408                {
409                        byte[] bytes1 = (byte[]) object1;
410                        byte[] bytes2 = (byte[]) object2;
411                        
412                        if (bytes1.length != bytes2.length)
413                        {
414                                return false;
415                        }
416                        
417                        return Arrays.equals(bytes1, bytes2);
418                }
419                
420                return object1.equals(object2);
421        }
422        
423        @SuppressWarnings("unchecked")
424        private int compare(Object object1, Object object2)
425        {
426                return ((Comparable) object1).compareTo(object2);
427        }
428
429        /**
430         * @return the fetchSize.
431         */
432        public int getFetchSize()
433        {
434                return this.fetchSize;
435        }
436
437        /**
438         * @param fetchSize the fetchSize to set.
439         */
440        public void setFetchSize(int fetchSize)
441        {
442                this.fetchSize = fetchSize;
443        }
444
445        /**
446         * @return Returns the maxBatchSize.
447         */
448        public int getMaxBatchSize()
449        {
450                return this.maxBatchSize;
451        }
452
453        /**
454         * @param maxBatchSize The maxBatchSize to set.
455         */
456        public void setMaxBatchSize(int maxBatchSize)
457        {
458                this.maxBatchSize = maxBatchSize;
459        }
460}