001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.distributable;
022
023import java.lang.reflect.Method;
024import java.text.MessageFormat;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.TreeMap;
029import java.util.Vector;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.Lock;
034
035import net.sf.hajdbc.DatabaseCluster;
036import net.sf.hajdbc.LockManager;
037
038import org.jgroups.Address;
039import org.jgroups.Message;
040import org.jgroups.MessageListener;
041import org.jgroups.blocks.GroupRequest;
042import org.jgroups.blocks.MethodCall;
043import org.jgroups.blocks.RpcDispatcher;
044import org.jgroups.util.Rsp;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * LockManager implementation that leverages a JGroups 2-phase voting adapter for obtain remote write locks.
050 * 
051 * @author Paul Ferraro
052 */
053public class DistributableLockManager extends AbstractMembershipListener implements LockManager, MessageListener
054{
055        private static final String CHANNEL = "{0}-lock"; //$NON-NLS-1$
056        
057        static Logger logger = LoggerFactory.getLogger(DistributableLockManager.class);
058        
059        protected RpcDispatcher dispatcher;
060        protected int timeout;
061        private LockManager lockManager;
062        private Map<Address, Map<String, Lock>> addressMap = new ConcurrentHashMap<Address, Map<String, Lock>>();
063        
064        /**
065         * Constructs a new DistributableLock.
066         * @param <D> either java.sql.Driver or javax.sql.Datasource
067         * @param databaseCluster a database cluster
068         * @param decorator a decorator
069         * @throws Exception
070         */
071        public <D> DistributableLockManager(DatabaseCluster<D> databaseCluster, DistributableDatabaseClusterDecorator decorator) throws Exception
072        {
073                super(decorator.createChannel(MessageFormat.format(CHANNEL, databaseCluster.getId())));
074                
075                this.lockManager = databaseCluster.getLockManager();
076                
077                this.timeout = decorator.getTimeout();
078
079                this.dispatcher = new RpcDispatcher(this.channel, this, this, this);
080        }
081
082        /**
083         * @see net.sf.hajdbc.Lifecycle#start()
084         */
085        @Override
086        public void start() throws Exception
087        {
088                this.channel.connect(this.channel.getClusterName());
089                
090                this.lockManager.start();
091        }
092        
093        /**
094         * @see net.sf.hajdbc.Lifecycle#stop()
095         */
096        @Override
097        public void stop()
098        {
099                this.channel.close();
100
101                this.lockManager.stop();
102        }
103        
104        /**
105         * Read locks are local.
106         * @see net.sf.hajdbc.LockManager#readLock(java.lang.String)
107         */
108        @Override
109        public Lock readLock(String object)
110        {
111                return this.lockManager.readLock(object);
112        }
113
114        /**
115         * Write locks are distributed.
116         * @see net.sf.hajdbc.LockManager#writeLock(java.lang.String)
117         */
118        @Override
119        public Lock writeLock(String object)
120        {
121                return new DistributableLock(object, this.lockManager.writeLock(object));
122        }
123
124        /**
125         * Votes on the specified decree.
126         * @param decree a lock decree
127         * @return true, if success, false if failure
128         */
129        public boolean vote(LockDecree decree)
130        {
131                Map<String, Lock> lockMap = this.addressMap.get(decree.getAddress());
132                
133                // Vote negatively for decrees from non-members
134                if (lockMap == null)
135                {
136                        return false;
137                }
138                
139                return decree.vote(this.lockManager, lockMap);
140        }
141        
142        /**
143         * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address)
144         */
145        @Override
146        protected void memberJoined(Address address)
147        {
148                this.addressMap.put(address, new HashMap<String, Lock>());
149        }
150
151        /**
152         * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address)
153         */
154        @Override
155        protected void memberLeft(Address address)
156        {
157                Map<String, Lock> lockMap = this.addressMap.remove(address);
158                
159                for (Lock lock: lockMap.values())
160                {
161                        lock.unlock();
162                }
163        }
164
165        /**
166         * @see org.jgroups.MessageListener#getState()
167         */
168        @Override
169        public byte[] getState()
170        {
171                return null;
172        }
173
174        /**
175         * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
176         */
177        @Override
178        public void receive(Message message)
179        {
180                // Do nothing
181        }
182
183        /**
184         * @see org.jgroups.MessageListener#setState(byte[])
185         */
186        @Override
187        public void setState(byte[] arg0)
188        {
189                // Do nothing
190        }
191
192        private class DistributableLock implements Lock
193        {
194                private LockDecree acquireDecree;
195                private LockDecree releaseDecree;
196                private Lock lock;
197                
198                /**
199                 * @param object
200                 * @param lock
201                 */
202                public DistributableLock(String object, Lock lock)
203                {
204                        Address address = DistributableLockManager.this.channel.getLocalAddress();
205                        
206                        this.acquireDecree = new AcquireLockDecree(object, address);
207                        this.releaseDecree = new ReleaseLockDecree(object, address);
208                        
209                        this.lock = lock;
210                }
211                
212                /**
213                 * @see java.util.concurrent.locks.Lock#lock()
214                 */
215                @Override
216                public void lock()
217                {
218                        while (!DistributableLockManager.this.isMembershipEmpty())
219                        {
220                                if (this.tryLockFairly()) return;
221                                
222                                Thread.yield();
223                        }
224                        
225                        this.lock.lock();
226                }
227
228                /**
229                 * @see java.util.concurrent.locks.Lock#lockInterruptibly()
230                 */
231                @Override
232                public void lockInterruptibly() throws InterruptedException
233                {
234                        while (!DistributableLockManager.this.isMembershipEmpty())
235                        {
236                                if (this.tryLockFairly()) return;
237
238                                if (Thread.currentThread().isInterrupted())
239                                {
240                                        throw new InterruptedException();
241                                }
242                                
243                                Thread.yield();
244                        }
245                        
246                        this.lock.lockInterruptibly();
247                }
248
249                /**
250                 * @see java.util.concurrent.locks.Lock#tryLock()
251                 */
252                @Override
253                public boolean tryLock()
254                {
255                        if (this.lock.tryLock())
256                        {
257                                if (this.tryRemoteLock())
258                                {
259                                        return true;
260                                }
261                                
262                                this.lock.unlock();
263                        }
264                        
265                        return false;
266                }
267
268                /**
269                 * Like {@link #tryLock()}, but do not barge on other waiting threads
270                 * @return true, if lock acquired, false otherwise
271                 * @throws InterruptedException
272                 */
273                private boolean tryLockFairly()
274                {
275                        try
276                        {
277                                if (this.lock.tryLock(0, TimeUnit.SECONDS))
278                                {
279                                        if (this.tryRemoteLock())
280                                        {
281                                                return true;
282                                        }
283                                        
284                                        this.lock.unlock();
285                                }
286                        }
287                        catch (InterruptedException e)
288                        {
289                                Thread.currentThread().interrupt();
290                        }
291                        
292                        return false;
293                }
294                
295                /**
296                 * @see java.util.concurrent.locks.Lock#tryLock(long, java.util.concurrent.TimeUnit)
297                 */
298                @Override
299                public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
300                {
301                        // Convert timeout to milliseconds
302                        long ms = unit.toMillis(timeout);
303                        
304                        long stopTime = System.currentTimeMillis() + ms;
305                        
306                        do
307                        {
308                                if (DistributableLockManager.this.isMembershipEmpty())
309                                {
310                                        return this.lock.tryLock(ms, TimeUnit.MILLISECONDS);
311                                }
312                                
313                                if (this.tryLockFairly())
314                                {
315                                        return true;
316                                }
317
318                                if (Thread.currentThread().isInterrupted())
319                                {
320                                        throw new InterruptedException();
321                                }
322                                
323                                ms = stopTime - System.currentTimeMillis();
324                        }
325                        while (ms >= 0);
326                        
327                        return false;
328                }
329
330                /**
331                 * @see java.util.concurrent.locks.Lock#unlock()
332                 */
333                @Override
334                public void unlock()
335                {
336                        this.remoteUnlock();
337
338                        this.lock.unlock();
339                }
340                
341                private boolean tryRemoteLock()
342                {
343                        Map<Boolean, Vector<Address>> map = null;
344                        
345                        try
346                        {
347                                map = this.remoteLock();
348                                
349                                return map.get(false).isEmpty();
350                        }
351                        finally
352                        {
353                                if (map != null)
354                                {
355                                        this.remoteUnlock(map.get(true));
356                                }
357                        }
358                }
359                
360                private Map<Boolean, Vector<Address>> remoteLock()
361                {
362                        return DistributableLockManager.this.remoteVote(this.acquireDecree, null, DistributableLockManager.this.timeout);
363                }
364
365                private Map<Boolean, Vector<Address>> remoteUnlock()
366                {
367                        return this.remoteUnlock(null);
368                }
369                
370                private Map<Boolean, Vector<Address>> remoteUnlock(Vector<Address> address)
371                {
372                        return DistributableLockManager.this.remoteVote(this.releaseDecree, address, 0);
373                }
374                
375                /**
376                 * @see java.util.concurrent.locks.Lock#newCondition()
377                 */
378                @Override
379                public Condition newCondition()
380                {
381                        throw new UnsupportedOperationException();
382                }
383        }
384        
385        Map<Boolean, Vector<Address>> remoteVote(LockDecree decree, Vector<Address> addresses, long timeout)
386        {
387                Map<Boolean, Vector<Address>> map = new TreeMap<Boolean, Vector<Address>>();
388                
389                int size = (addresses != null) ? addresses.size() : this.getMembershipSize();
390                
391                map.put(true, new Vector<Address>(size));
392                map.put(false, new Vector<Address>(size));
393                
394                if (size > 0)
395                {
396                        try
397                        {
398                                Method method = this.getClass().getMethod("vote", LockDecree.class); //$NON-NLS-1$
399                                
400                                MethodCall call = new MethodCall(method, new Object[] { decree });
401                                
402                                Collection<Rsp> responses = this.dispatcher.callRemoteMethods(addresses, call, GroupRequest.GET_ALL, timeout).values();
403                                
404                                for (Rsp response: responses)
405                                {
406                                        Object value = response.wasReceived() ? response.getValue() : false;
407                                        
408                                        map.get((value != null) ? value : false).add(response.getSender());
409                                }
410                        }
411                        catch (NoSuchMethodException e)
412                        {
413                                throw new IllegalStateException(e);
414                        }
415                }
416                
417                return map;
418        }
419}