org.jgroups.blocks

Class TransactionalHashtable

Implemented Interfaces:
MessageListener, ReplicationReceiver

public class TransactionalHashtable
extends HashMap
implements ReplicationReceiver, MessageListener

Hashtable which replicates its contents to all members of the group. Reads and writes can optionally be forced to acquire locks (r/w locks) to ensure total serializability between replicas. The update modes can be chosen per method and are (in order of cost)
  1. asynchronous (non-blocking) updates
  2. synchronous (blocking) updates (optionally with a timeout)
  3. synchronous (blocking) updates with locking
This class needs to have a state transfer protocol present in the protocol stack used (e.g. pbcast.STATE_TRANSFER).
Author:
Bela Ban Nov 2002

Nested Class Summary

static class
TransactionalHashtable.Data
Class used to transport updates to all replicas
static interface
TransactionalHashtable.Notification

Field Summary

protected boolean
auto_commit
protected Channel
channel
protected boolean
default_sync_repl
protected long
default_sync_repl_timeout
protected String
groupname
protected Address
local_addr
protected long
lock_acquisition_timeout
protected long
lock_lease_timeout
protected Log
log
protected List
notifs
protected String
properties
protected ReplicationManager
repl_mgr
protected HashMap
row_locks
protected long
state_timeout
protected RWLock
table_lock
protected int
transaction_mode

Constructor Summary

TransactionalHashtable(String groupname, String properties, long state_timeout)
TransactionalHashtable(String groupname, String properties, long state_timeout, Map m)
TransactionalHashtable(String groupname, String properties, long state_timeout, int initialCapacity)
TransactionalHashtable(String groupname, String properties, long state_timeout, int initialCapacity, float loadFactor)

Method Summary

void
addNotifier(TransactionalHashtable.Notification n)
void
begin()
Starts a new transaction and associates it with the current thread.
void
begin(int transaction_mode)
Starts a new transaction and associates it with the current thread.
protected void
checkResults(RspList rsps)
Checks whether responses from members contain exceptions or timeouts.
void
clear()
Replicates the update to all members.
void
clear(boolean synchronous, long timeout)
Replicates the update to all members.
void
clear(long sync_timeout, long lock_acquisition_timeout, long lock_lease_timeout, boolean commit)
Replicated the update to all members, and use locks at each member to ensure serializability.
Object
clone()
void
commit()
Commits all modifications done in the current transaction (kept in temporary storage) to the hashtable.
void
commit(Xid transaction)
Commit the modifications to the locally replicated data and release all locks.
boolean
containsKey(Object key)
boolean
containsValue(Object value)
Set
entrySet()
Object
get(Object key)
boolean
getAutoCommit()
static Xid
getCurrentTransaction()
Returns the transaction associated with the current thread.
long
getDefaultSyncReplTimeout()
long
getLockAcquisitionTimeout()
long
getLockLeaseTimeout()
byte[]
getState()
TODO: use read lock on entire hashmap while making copy
int
getTransactionMode()
protected Object
handleClear(Xid transaction, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks)
protected Object
handlePut(Serializable key, Serializable value, Xid transaction, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks)
protected Object
handlePutAll(Map map, Xid transaction, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks)
protected Object
handleRemove(Serializable key, Xid transaction, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks)
protected void
initChannel(String groupname, String properties, long state_timeout)
boolean
isDefaultSyncRepl()
Set
keySet()
void
lclear()
Object
lput(Object key, Object value)
void
lputAll(Map m)
Object
lremove(Object key, Object value)
static void
main(args[] )
Object
put(Object key, Object value)
Replicates the update to all members.
Object
put(Object key, Object value, boolean synchronous, long timeout)
Replicates the update to all members.
Object
put(Object key, Object value, long sync_timeout, long lock_acquisition_timeout, long lock_lease_timeout, boolean commit)
Replicates the update to all members, and use locks at each member to ensure serializability.
void
putAll(Map m)
Replicates the update to all members.
void
putAll(Map m, boolean synchronous, long timeout)
Replicates the update to all members.
void
putAll(Map m, long sync_timeout, long lock_acquisition_timeout, long lock_lease_timeout, boolean commit)
Replicated the update to all members, and use locks at each member to ensure serializability.
void
receive(Message msg)
Object
receive(Xid transaction, byte[] buf, byte[] lock_info, long lock_acquisition_timeout, long lock_lease_timeout, boolean use_locks)
Receives an update.
Object
remove(Object key)
Replicates the update to all members.
Object
remove(Object key, boolean synchronous, long timeout)
Replicates the update to all members.
Object
remove(Object key, long sync_timeout, long lock_acquisition_timeout, long lock_lease_timeout, boolean commit)
Replicated the update to all members, and use locks at each member to ensure serializability.
void
rollback()
Discards all changes done within the current transaction.
void
rollback(Xid transaction)
Discard all modifications and release all locks.
void
setAutoCommit(boolean b)
void
setDefaultSyncRepl(boolean b)
Sets the default replication mode.
void
setDefaultSyncReplTimeout(long timeout)
void
setLockAcquisitionTimeout(long l)
void
setLockLeaseTimeout(long l)
void
setMembershipListener(MembershipListener ml)
void
setState(byte[] state)
TODO: use write lock on entire hashmap to set state
void
setTransactionMode(int m)
void
stop()
Leaves the group.
Collection
values()

Field Details

auto_commit

protected boolean auto_commit

channel

protected Channel channel

default_sync_repl

protected boolean default_sync_repl

default_sync_repl_timeout

protected long default_sync_repl_timeout

groupname

protected String groupname

local_addr

protected Address local_addr

lock_acquisition_timeout

protected long lock_acquisition_timeout

lock_lease_timeout

protected long lock_lease_timeout

log

protected Log log

notifs

protected List notifs

properties

protected String properties

repl_mgr

protected ReplicationManager repl_mgr

row_locks

protected HashMap row_locks

state_timeout

protected long state_timeout

table_lock

protected RWLock table_lock

transaction_mode

protected int transaction_mode

Constructor Details

TransactionalHashtable

public TransactionalHashtable(String groupname,
                              String properties,
                              long state_timeout)
            throws Exception

TransactionalHashtable

public TransactionalHashtable(String groupname,
                              String properties,
                              long state_timeout,
                              Map m)
            throws Exception

TransactionalHashtable

public TransactionalHashtable(String groupname,
                              String properties,
                              long state_timeout,
                              int initialCapacity)
            throws Exception

TransactionalHashtable

public TransactionalHashtable(String groupname,
                              String properties,
                              long state_timeout,
                              int initialCapacity,
                              float loadFactor)
            throws Exception

Method Details

addNotifier

public void addNotifier(TransactionalHashtable.Notification n)

begin

public void begin()
            throws Exception
Starts a new transaction and associates it with the current thread. Reuses the transaction if the current thread already has a transaction.

begin

public void begin(int transaction_mode)
            throws Exception
Starts a new transaction and associates it with the current thread. Reuses the transaction if the current thread already has a transaction.
Parameters:
transaction_mode - Mode in which the transaction should run. Possible values are Xid.DIRTY_READS, Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE

checkResults

protected void checkResults(RspList rsps)
            throws LockingException,
                   TimeoutException
Checks whether responses from members contain exceptions or timeouts. Throws an exception if that is the case

clear

public void clear()
Replicates the update to all members. Depending on the value of default_sync_repl_timeout the update will be sent synchronously or asynchronously

clear

public void clear(boolean synchronous,
                  long timeout)
Replicates the update to all members. Depending on the value of the parameters the update will be synchronous or asynchronous.
Parameters:
synchronous - If true the update will be synchronous, ie. the caller will block until all responses have been received. If timeout is 0, we will block indefinitely (until all responses have been received), otherwise the call is guaranteed to return after at most timeout milliseconds. If false, the call will be asynchronous.
timeout - The number of milliseconds to wait for a synchronous call. 0 means to wait forever. This parameter is not used if synchronous is false.

clear

public void clear(long sync_timeout,
                  long lock_acquisition_timeout,
                  long lock_lease_timeout,
                  boolean commit)
            throws LockingException,
                   TimeoutException
Replicated the update to all members, and use locks at each member to ensure serializability. When a lock in a member cannot be acquired, a LockingException will be thrown. Typically the caller will then abort the transaction (releasing all locks) and retry.

This call can be one of many inside the same transaction, or it may be the only one. In the first case, the caller is responsible to call commit() or rollback() once the transaction is done. In the latter case, the transaction can be committed by setting commit to true.
A transaction (Xid) is always associated with the current thread. If this call is invoked, and there is no transaction associated with the current thread, a default transaction will be created. Otherwise the current transaction will be used.

Parameters:
sync_timeout - Max number of milliseconds to wait for all responses. Note that this needs to be higher than lock_acquisition_timeout. 0 means to wait forever for all responses.
lock_acquisition_timeout - Number of milliseonds to wait until a lock becomes available. Needs to be lower than sync_timeout. 0 means to wait forever. 0 will block forever in case of deadlocks. Once we have deadlock detection in place, this parameter may be deprecated.
lock_lease_timeout - Number of milliseonds until a lock is released automatically (if not released before). Not currently used.
commit - If true the transaction will be committed after this call if the call was successful.
Throws:
LockingException - Throw when one or more of the members failed acquiring the lock within lock_acquisition_timeout milliseconds
TimeoutException - Thrown when one or more of the members didn't send a response. LockingExceptions take precedence over TimeoutExceptions, e.g. if we have both locking and timeout exceptions, a LockingException will be thrown.

clone

public Object clone()

commit

public void commit()
Commits all modifications done in the current transaction (kept in temporary storage) to the hashtable. Releases all locks acquired by the current transaction.

commit

public void commit(Xid transaction)
Commit the modifications to the locally replicated data and release all locks. If the receive() call already applied the changes, then this method is a nop.
Specified by:
commit in interface ReplicationReceiver

containsKey

public boolean containsKey(Object key)

containsValue

public boolean containsValue(Object value)

entrySet

public Set entrySet()

get

public Object get(Object key)

getAutoCommit

public boolean getAutoCommit()

getCurrentTransaction

public static Xid getCurrentTransaction()
Returns the transaction associated with the current thread.
Returns:
Xid The current transaction. Null if no transaction is associated.

getDefaultSyncReplTimeout

public long getDefaultSyncReplTimeout()

getLockAcquisitionTimeout

public long getLockAcquisitionTimeout()

getLockLeaseTimeout

public long getLockLeaseTimeout()

getState

public byte[] getState()
TODO: use read lock on entire hashmap while making copy
Specified by:
getState in interface MessageListener

getTransactionMode

public int getTransactionMode()

handleClear

protected Object handleClear(Xid transaction,
                             long lock_acquisition_timeout,
                             long lock_lease_timeout,
                             boolean use_locks)
            throws LockingException,
                   UpdateException

handlePut

protected Object handlePut(Serializable key,
                           Serializable value,
                           Xid transaction,
                           long lock_acquisition_timeout,
                           long lock_lease_timeout,
                           boolean use_locks)
            throws LockingException,
                   UpdateException

handlePutAll

protected Object handlePutAll(Map map,
                              Xid transaction,
                              long lock_acquisition_timeout,
                              long lock_lease_timeout,
                              boolean use_locks)
            throws LockingException,
                   UpdateException

handleRemove

protected Object handleRemove(Serializable key,
                              Xid transaction,
                              long lock_acquisition_timeout,
                              long lock_lease_timeout,
                              boolean use_locks)
            throws LockingException,
                   UpdateException

initChannel

protected void initChannel(String groupname,
                           String properties,
                           long state_timeout)
            throws Exception

isDefaultSyncRepl

public boolean isDefaultSyncRepl()

keySet

public Set keySet()

lclear

public void lclear()
            throws LockingException,
                   TimeoutException

lput

public Object lput(Object key,
                   Object value)
            throws LockingException,
                   TimeoutException

lputAll

public void lputAll(Map m)
            throws LockingException,
                   TimeoutException

lremove

public Object lremove(Object key,
                      Object value)
            throws LockingException,
                   TimeoutException

main

public static void main(args[] )

put

public Object put(Object key,
                  Object value)
Replicates the update to all members. Depending on the value of default_sync_repl_timeout the update will be sent synchronously or asynchronously
Parameters:
key - The key to be set. Needs to be serializable. Can be null.
value - The value to be set. Needs to be serializable. Can be null.
Returns:
Object The previous value associated with the given key, or null if none was associated

put

public Object put(Object key,
                  Object value,
                  boolean synchronous,
                  long timeout)
Replicates the update to all members. Depending on the value of the parameters the update will be synchronous or asynchronous.
Parameters:
key - The key to be set. Needs to be serializable. Can be null.
value - The value to be set. Needs to be serializable. Can be null.
synchronous - If true the update will be synchronous, ie. the caller will block until all responses have been received. If timeout is 0, we will block indefinitely (until all responses have been received), otherwise the call is guaranteed to return after at most timeout milliseconds. If false, the call will be asynchronous.
timeout - The number of milliseconds to wait for a synchronous call. 0 means to wait forever. This parameter is not used if synchronous is false.
Returns:
Object The previous value associated with the given key, or null if none was associated

put

public Object put(Object key,
                  Object value,
                  long sync_timeout,
                  long lock_acquisition_timeout,
                  long lock_lease_timeout,
                  boolean commit)
            throws LockingException,
                   TimeoutException
Replicates the update to all members, and use locks at each member to ensure serializability. When a lock in a member cannot be acquired, a LockingException will be thrown. Typically the caller will then abort the transaction (releasing all locks) and retry.

This call can be one of many inside the same transaction, or it may be the only one. In the first case, the caller is responsible to call commit() or rollback() once the transaction is done. In the latter case, the transaction can be committed by setting commit to true.
A transaction (Xid) is always associated with the current thread. If this call is invoked, and there is no transaction associated with the current thread, a default transaction will be created. Otherwise the current transaction will be used.

Parameters:
key - The key to be set. Needs to be serializable. Can be null.
value - The value to be set. Needs to be serializable. Can be null.
sync_timeout - Max number of milliseconds to wait for all responses. Note that this needs to be higher than lock_acquisition_timeout. 0 means to wait forever for all responses.
lock_acquisition_timeout - Number of milliseonds to wait until a lock becomes available. Needs to be lower than sync_timeout. 0 means to wait forever. 0 will block forever in case of deadlocks. Once we have deadlock detection in place, this parameter may be deprecated.
lock_lease_timeout - Number of milliseonds until a lock is released automatically (if not released before). Not currently used.
commit - If true the transaction will be committed after this call if the call was successful.
Returns:
Object The previous value associated with the given key, or null if none was associated
Throws:
LockingException - Throw when one or more of the members failed acquiring the lock within lock_acquisition_timeout milliseconds
TimeoutException - Thrown when one or more of the members didn't send a response. LockingExceptions take precedence over TimeoutExceptions, e.g. if we have both locking and timeout exceptions, a LockingException will be thrown.

putAll

public void putAll(Map m)
Replicates the update to all members. Depending on the value of default_sync_repl_timeout the update will be sent synchronously or asynchronously
Parameters:
m - The map to be set. All entries need to be serializable. Cannot be null.

putAll

public void putAll(Map m,
                   boolean synchronous,
                   long timeout)
Replicates the update to all members. Depending on the value of the parameters the update will be synchronous or asynchronous.
Parameters:
m - The map to be set. All entries need to be serializable. Cannot be null.
synchronous - If true the update will be synchronous, ie. the caller will block until all responses have been received. If timeout is 0, we will block indefinitely (until all responses have been received), otherwise the call is guaranteed to return after at most timeout milliseconds. If false, the call will be asynchronous.
timeout - The number of milliseconds to wait for a synchronous call. 0 means to wait forever. This parameter is not used if synchronous is false.

putAll

public void putAll(Map m,
                   long sync_timeout,
                   long lock_acquisition_timeout,
                   long lock_lease_timeout,
                   boolean commit)
            throws LockingException,
                   TimeoutException
Replicated the update to all members, and use locks at each member to ensure serializability. When a lock in a member cannot be acquired, a LockingException will be thrown. Typically the caller will then abort the transaction (releasing all locks) and retry.

This call can be one of many inside the same transaction, or it may be the only one. In the first case, the caller is responsible to call commit() or rollback() once the transaction is done. In the latter case, the transaction can be committed by setting commit to true.
A transaction (Xid) is always associated with the current thread. If this call is invoked, and there is no transaction associated with the current thread, a default transaction will be created. Otherwise the current transaction will be used.

Parameters:
m - The map to be set. All entries need to be serializable. Cannot be null.
sync_timeout - Max number of milliseconds to wait for all responses. Note that this needs to be higher than lock_acquisition_timeout. 0 means to wait forever for all responses.
lock_acquisition_timeout - Number of milliseonds to wait until a lock becomes available. Needs to be lower than sync_timeout. 0 means to wait forever. 0 will block forever in case of deadlocks. Once we have deadlock detection in place, this parameter may be deprecated.
lock_lease_timeout - Number of milliseonds until a lock is released automatically (if not released before). Not currently used.
commit - If true the transaction will be committed after this call if the call was successful.
Throws:
LockingException - Throw when one or more of the members failed acquiring the lock within lock_acquisition_timeout milliseconds
TimeoutException - Thrown when one or more of the members didn't send a response. LockingExceptions take precedence over TimeoutExceptions, e.g. if we have both locking and timeout exceptions, a LockingException will be thrown.

receive

public void receive(Message msg)
Specified by:
receive in interface MessageListener

receive

public Object receive(Xid transaction,
                      byte[] buf,
                      byte[] lock_info,
                      long lock_acquisition_timeout,
                      long lock_lease_timeout,
                      boolean use_locks)
            throws LockingException,
                   UpdateException
Receives an update. Handles the update depending on whether locks are to be used (use_locks):
  1. No locks: simply apply the update to the hashmap
  2. Use locks: lock the corresponding resource (e.g. entire table in case of clear(), or individual row in case of (remove()) and apply the update.If the lock cannot be acquired, throw a LockingException.
Specified by:
receive in interface ReplicationReceiver

remove

public Object remove(Object key)
Replicates the update to all members. Depending on the value of default_sync_repl_timeout the update will be sent synchronously or asynchronously
Parameters:
key - The key to be set. Needs to be serializable. Can be null.
Returns:
Object The previous value associated with the given key, or null if none was associated

remove

public Object remove(Object key,
                     boolean synchronous,
                     long timeout)
Replicates the update to all members. Depending on the value of the parameters the update will be synchronous or asynchronous.
Parameters:
key - The key to be set. Needs to be serializable. Can be null.
synchronous - If true the update will be synchronous, ie. the caller will block until all responses have been received. If timeout is 0, we will block indefinitely (until all responses have been received), otherwise the call is guaranteed to return after at most timeout milliseconds. If false, the call will be asynchronous.
timeout - The number of milliseconds to wait for a synchronous call. 0 means to wait forever. This parameter is not used if synchronous is false.
Returns:
Object The previous value associated with the given key, or null if none was associated

remove

public Object remove(Object key,
                     long sync_timeout,
                     long lock_acquisition_timeout,
                     long lock_lease_timeout,
                     boolean commit)
            throws LockingException,
                   TimeoutException
Replicated the update to all members, and use locks at each member to ensure serializability. When a lock in a member cannot be acquired, a LockingException will be thrown. Typically the caller will then abort the transaction (releasing all locks) and retry.

This call can be one of many inside the same transaction, or it may be the only one. In the first case, the caller is responsible to call commit() or rollback() once the transaction is done. In the latter case, the transaction can be committed by setting commit to true.
A transaction (Xid) is always associated with the current thread. If this call is invoked, and there is no transaction associated with the current thread, a default transaction will be created. Otherwise the current transaction will be used.

Parameters:
key - The key to be set. Needs to be serializable. Can be null.
sync_timeout - Max number of milliseconds to wait for all responses. Note that this needs to be higher than lock_acquisition_timeout. 0 means to wait forever for all responses.
lock_acquisition_timeout - Number of milliseonds to wait until a lock becomes available. Needs to be lower than sync_timeout. 0 means to wait forever. 0 will block forever in case of deadlocks. Once we have deadlock detection in place, this parameter may be deprecated.
lock_lease_timeout - Number of milliseonds until a lock is released automatically (if not released before). Not currently used.
commit - If true the transaction will be committed after this call if the call was successful.
Returns:
Object The previous value associated with the given key, or null if none was associated
Throws:
LockingException - Throw when one or more of the members failed acquiring the lock within lock_acquisition_timeout milliseconds
TimeoutException - Thrown when one or more of the members didn't send a response. LockingExceptions take precedence over TimeoutExceptions, e.g. if we have both locking and timeout exceptions, a LockingException will be thrown.

rollback

public void rollback()
Discards all changes done within the current transaction. Releases all locks acquired by the current transaction.

rollback

public void rollback(Xid transaction)
Discard all modifications and release all locks. If the receive() call already applied the changes, this method will not be able to rollback the modifications, but will only release the locks.
Specified by:
rollback in interface ReplicationReceiver

setAutoCommit

public void setAutoCommit(boolean b)

setDefaultSyncRepl

public void setDefaultSyncRepl(boolean b)
Sets the default replication mode. This will be used if the methods inherited from HashMap are used. However, if one of the methods provided by TransactionalHashtable are used, they will override the default mode.

setDefaultSyncReplTimeout

public void setDefaultSyncReplTimeout(long timeout)

setLockAcquisitionTimeout

public void setLockAcquisitionTimeout(long l)

setLockLeaseTimeout

public void setLockLeaseTimeout(long l)

setMembershipListener

public void setMembershipListener(MembershipListener ml)

setState

public void setState(byte[] state)
TODO: use write lock on entire hashmap to set state
Specified by:
setState in interface MessageListener

setTransactionMode

public void setTransactionMode(int m)

stop

public void stop()
Leaves the group. The instance is unusable after this call, ie. a new instance should be created. Behavior is undefined if the instance is still used after this call.

values

public Collection values()

Copyright B) 2001,2002 www.jgroups.com . All Rights Reserved.