EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.data.impl]

COVERAGE SUMMARY FOR SOURCE FILE [ClusterImpl.java]

nameclass, %method, %block, %line, %
ClusterImpl.java100% (9/9)74%  (86/116)64%  (2146/3334)72%  (422.1/589)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ClusterImpl$ClusterCommitCallback100% (1/1)100% (3/3)43%  (92/215)55%  (16.6/30)
completeChangesToOtherServers (Map, long): void 100% (1/1)33%  (56/170)40%  (7.6/19)
onCallback (Map): void 100% (1/1)77%  (30/39)80%  (8/10)
ClusterImpl$ClusterCommitCallback (ClusterImpl): void 100% (1/1)100% (6/6)100% (1/1)
     
class ClusterImpl$MapView100% (1/1)43%  (13/30)49%  (234/479)54%  (53.1/99)
addListener (CacheListener): void 0%   (0/1)0%   (0/14)0%   (0/2)
entrySet (): Set 0%   (0/1)0%   (0/4)0%   (0/1)
evict (): void 0%   (0/1)0%   (0/1)0%   (0/1)
getCacheStatistics (): CacheStatistics 0%   (0/1)0%   (0/2)0%   (0/1)
getListenerSet (): Event$ListenerSet 0%   (0/1)0%   (0/4)0%   (0/1)
load (Object): void 0%   (0/1)0%   (0/1)0%   (0/1)
loadAll (Collection): void 0%   (0/1)0%   (0/1)0%   (0/1)
peek (Object): Object 0%   (0/1)0%   (0/4)0%   (0/1)
putIfAbsent (Object, Object): Object 0%   (0/1)0%   (0/6)0%   (0/1)
remove (Object, Object): boolean 0%   (0/1)0%   (0/8)0%   (0/1)
removeListener (CacheListener): void 0%   (0/1)0%   (0/14)0%   (0/2)
removeMatching (Predicate): int 0%   (0/1)0%   (0/29)0%   (0/6)
replace (Object, Object): Object 0%   (0/1)0%   (0/7)0%   (0/1)
replace (Object, Object, Object): boolean 0%   (0/1)0%   (0/9)0%   (0/1)
replace0 (Object, Object, boolean, Object): Object 0%   (0/1)0%   (0/72)0%   (0/13)
selectMatching (Predicate): Map 0%   (0/1)0%   (0/5)0%   (0/1)
visit (Visitor): Object 0%   (0/1)0%   (0/29)0%   (0/8)
remove0 (Object, boolean, Object): Object 100% (1/1)79%  (64/81)91%  (15.4/17)
clear (): void 100% (1/1)80%  (36/45)97%  (11.6/12)
put0 (Object, Object, boolean): Object 100% (1/1)88%  (68/77)93%  (13.1/14)
ClusterImpl$MapView (ClusterImpl, String, BackingStore): void 100% (1/1)100% (12/12)100% (4/4)
containsKey (Object): boolean 100% (1/1)100% (5/5)100% (1/1)
get (Object): Object 100% (1/1)100% (5/5)100% (1/1)
getAll (Collection): Map 100% (1/1)100% (5/5)100% (1/1)
getCacheEntry (Object): CacheEntry 100% (1/1)100% (5/5)100% (1/1)
getName (): String 100% (1/1)100% (14/14)100% (1/1)
keySet (): Set 100% (1/1)100% (4/4)100% (1/1)
put (Object, Object): Object 100% (1/1)100% (6/6)100% (1/1)
remove (Object): Object 100% (1/1)100% (6/6)100% (1/1)
size (): int 100% (1/1)100% (4/4)100% (1/1)
     
class ClusterImpl$QueueView100% (1/1)88%  (7/8)68%  (80/118)69%  (18.6/27)
offer (Object): boolean 0%   (0/1)0%   (0/34)0%   (0/8)
peek (): Object 100% (1/1)85%  (11/13)92%  (2.8/3)
poll (): Object 100% (1/1)92%  (22/24)97%  (5.8/6)
ClusterImpl$QueueView (ClusterImpl, String, BackingStore): void 100% (1/1)100% (12/12)100% (4/4)
clear (): void 100% (1/1)100% (12/12)100% (3/3)
getName (): String 100% (1/1)100% (14/14)100% (1/1)
iterator (): Iterator 100% (1/1)100% (5/5)100% (1/1)
size (): int 100% (1/1)100% (4/4)100% (1/1)
     
class ClusterImpl100% (1/1)84%  (52/62)68%  (1590/2344)76%  (305.9/402)
access$900 (ClusterImpl): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
getCacheView (String): Cache 0%   (0/1)0%   (0/4)0%   (0/1)
getConnectedHosts (): Set 0%   (0/1)0%   (0/4)0%   (0/1)
getNames (): Set 0%   (0/1)0%   (0/4)0%   (0/1)
getStatusFailedMesg (String, Exception): String 0%   (0/1)0%   (0/16)0%   (0/1)
isClosed (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
removeMatching (String, Predicate): int 0%   (0/1)0%   (0/26)0%   (0/4)
selectMatching (String, Predicate): Map 0%   (0/1)0%   (0/6)0%   (0/1)
throwUnableToCreate (String, Exception): ClusterStore 0%   (0/1)0%   (0/13)0%   (0/1)
warnExThrown (Exception): void 0%   (0/1)0%   (0/13)0%   (0/2)
reportEntriesUpdated (int, String, String): void 100% (1/1)10%  (3/29)47%  (1.4/3)
getBackingStoreOrError (String): BackingStore 100% (1/1)32%  (8/25)75%  (3/4)
getClusterCollectionData (String): ClusterComponentBuilder$ClusterCollectionData 100% (1/1)41%  (13/32)71%  (2.8/4)
connectionMonitor (): void 100% (1/1)41%  (93/227)58%  (16.9/29)
checkConnections (boolean): void 100% (1/1)49%  (196/397)68%  (39.6/58)
getValue0 (String): Object 100% (1/1)56%  (5/9)67%  (2/3)
acquireMaster (String): void 100% (1/1)63%  (138/218)77%  (26/34)
getClusterStore (String): ClusterStore 100% (1/1)65%  (28/43)55%  (6/11)
createBackingStore (String, ClusterComponentBuilder$ClusterCollectionData): B... 100% (1/1)69%  (40/58)43%  (3/7)
remotePerformChanges (Map): int 100% (1/1)73%  (51/70)72%  (13/18)
waitUntilAllMastersTried (String): void 100% (1/1)74%  (53/72)76%  (8.4/11)
clusterRemoveAll (String, Set): void 100% (1/1)76%  (51/67)86%  (12/14)
getBackingStore (String): BackingStore 100% (1/1)77%  (43/56)67%  (10/15)
getClusterHosts (): Set 100% (1/1)77%  (10/13)76%  (0.8/1)
evict (): void 100% (1/1)82%  (27/33)78%  (7/9)
equals2 (Object, Object): boolean 100% (1/1)83%  (10/12)83%  (0.8/1)
doEvict (): void 100% (1/1)84%  (47/56)97%  (12.7/13)
start (): void 100% (1/1)84%  (106/126)75%  (12.8/17)
getOtherHosts (): Set 100% (1/1)87%  (20/23)95%  (2.9/3)
getQueueView (String): Queue 100% (1/1)88%  (37/42)88%  (7/8)
getMapView (String): ClusterMap 100% (1/1)90%  (47/52)90%  (9/10)
remotePerformChanges (String, long, Map): void 100% (1/1)91%  (126/139)86%  (25.8/30)
ClusterImpl (String, String, Map, Map): void 100% (1/1)91%  (151/166)92%  (33.1/36)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
access$000 (ClusterImpl): void 100% (1/1)100% (3/3)100% (1/1)
access$100 (ClusterImpl): String 100% (1/1)100% (3/3)100% (1/1)
access$1000 (): Log 100% (1/1)100% (2/2)100% (1/1)
access$1100 (ClusterImpl): Map 100% (1/1)100% (3/3)100% (1/1)
access$1200 (ClusterImpl): boolean 100% (1/1)100% (3/3)100% (1/1)
access$1300 (ClusterImpl): void 100% (1/1)100% (3/3)100% (1/1)
access$200 (ClusterImpl): boolean 100% (1/1)100% (3/3)100% (1/1)
access$300 (ClusterImpl, String): void 100% (1/1)100% (4/4)100% (1/1)
access$400 (Object, Object): boolean 100% (1/1)100% (4/4)100% (1/1)
access$500 (ClusterImpl, String, Object, Object): void 100% (1/1)100% (6/6)100% (1/1)
access$600 (ClusterImpl, String, Object): void 100% (1/1)100% (5/5)100% (1/1)
access$700 (ClusterImpl): boolean 100% (1/1)100% (3/3)100% (1/1)
access$800 (ClusterImpl): String 100% (1/1)100% (3/3)100% (1/1)
cleanupMetaData (long): int 100% (1/1)100% (27/27)100% (6/6)
close (): void 100% (1/1)100% (10/10)100% (4/4)
clusterPut (String, Object, Object): void 100% (1/1)100% (7/7)100% (2/2)
clusterRemove (String, Object): void 100% (1/1)100% (7/7)100% (2/2)
createClusterStore (String): ClusterStore 100% (1/1)100% (6/6)100% (1/1)
getAvailableMasters (): Set 100% (1/1)100% (29/29)100% (5/5)
getListenerSet (): Event$ListenerSet 100% (1/1)100% (6/6)100% (2/2)
getListenerSet (String): Event$ListenerSet 100% (1/1)100% (8/8)100% (2/2)
getName (): String 100% (1/1)100% (3/3)100% (1/1)
getValue (String): Object 100% (1/1)100% (21/21)100% (5/5)
infoMastersAvailable (String): void 100% (1/1)100% (22/22)100% (2/2)
isAMaster (): boolean 100% (1/1)100% (6/6)100% (1/1)
isASlave (): boolean 100% (1/1)100% (6/6)100% (1/1)
remoteGetSnapshot (): Map 100% (1/1)100% (29/29)100% (4/4)
stop (): void 100% (1/1)100% (51/51)100% (10/10)
     
class ClusterImpl$CacheListenerWrapper100% (1/1)67%  (4/6)77%  (60/78)88%  (15/17)
equals (Object): boolean 0%   (0/1)0%   (0/14)0%   (0/1)
hashCode (): int 0%   (0/1)0%   (0/4)0%   (0/1)
ClusterImpl$CacheListenerWrapper (Store, CacheListener): void 100% (1/1)100% (9/9)100% (4/4)
onRemoved (Object, Object): void 100% (1/1)100% (5/5)100% (2/2)
onStatusEvent (boolean): void 100% (1/1)100% (41/41)100% (7/7)
onUpdate (Object, Object): void 100% (1/1)100% (5/5)100% (2/2)
     
class ClusterImpl$ConnectionMonitor100% (1/1)100% (2/2)83%  (19/23)88%  (7/8)
run (): void 100% (1/1)71%  (10/14)80%  (4/5)
ClusterImpl$ConnectionMonitor (ClusterImpl): void 100% (1/1)100% (9/9)100% (3/3)
     
class ClusterImpl$2100% (1/1)100% (1/1)87%  (41/47)87%  (0.9/1)
<static initializer> 100% (1/1)87%  (41/47)87%  (0.9/1)
     
class ClusterImpl$1100% (1/1)100% (2/2)100% (10/10)100% (3/3)
ClusterImpl$1 (ClusterImpl): void 100% (1/1)100% (6/6)100% (1/1)
run (): void 100% (1/1)100% (4/4)100% (2/2)
     
class ClusterImpl$ClusterCommitCallback$1100% (1/1)100% (2/2)100% (20/20)100% (3/3)
ClusterImpl$ClusterCommitCallback$1 (ClusterImpl$ClusterCommitCallback, Map, ... 100% (1/1)100% (12/12)100% (1/1)
run (): void 100% (1/1)100% (8/8)100% (2/2)

1package org.jtoolkit.essence.data.impl;
2 
3import org.apache.commons.logging.Log;
4import static org.apache.commons.logging.LogFactory.getLog;
5import org.jetbrains.annotations.NotNull;
6import org.jetbrains.annotations.Nullable;
7import org.jtoolkit.essence.app.net.DataSocketFactory;
8import org.jtoolkit.essence.app.net.NetObject;
9import org.jtoolkit.essence.app.net.ServerDataSocket;
10import org.jtoolkit.essence.app.Main;
11import org.jtoolkit.essence.concurrency.Callback;
12import static org.jtoolkit.essence.concurrency.Threads.createSingleSES;
13import org.jtoolkit.essence.concurrency.Timeout;
14import org.jtoolkit.essence.data.*;
15import static org.jtoolkit.essence.data.Transaction.*;
16import org.jtoolkit.essence.utils.*;
17import static org.jtoolkit.essence.utils.StringUtils.isSet;
18 
19import javax.cache.Cache;
20import javax.cache.CacheEntry;
21import javax.cache.CacheListener;
22import javax.cache.CacheStatistics;
23import java.io.IOException;
24import java.lang.ref.Reference;
25import java.lang.ref.WeakReference;
26import java.util.*;
27import java.util.concurrent.ConcurrentHashMap;
28import java.util.concurrent.Future;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.locks.Condition;
32import java.util.concurrent.locks.Lock;
33import java.util.concurrent.locks.ReentrantLock;
34 
35/*
36   Copyright 2006 Peter Lawrey
37 
38   Licensed under the Apache License, Version 2.0 (the "License");
39   you may not use this file except in compliance with the License.
40   You may obtain a copy of the License at
41 
42       http://www.apache.org/licenses/LICENSE-2.0
43 
44   Unless required by applicable law or agreed to in writing, software
45   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
46   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47   See the License for the specific language governing permissions and
48   limitations under the License.
49*/
50/**
51 * @author Peter Lawrey
52 */
53@SuppressWarnings({"unchecked", "ClassWithTooManyFields", "ClassWithTooManyMethods", "OverlyComplexClass", "OverlyCoupledClass"})
54public class ClusterImpl implements Cluster {
55    private static final Log LOG = getLog(ClusterImpl.class);
56    public static final long CHECK_PERIOD_MS = 1000;
57 
58    private final String name;
59    private final String logicalHostname;
60    private final Map<String, BackingStore> backingStores = new ConcurrentHashMap<String, BackingStore>(256, 0.5f, 16);
61    @Nullable
62    private final Map<String, ClusterComponentBuilder.ClusterData> clusterData;
63    private final Map<String, ClusterStore> clusterStores = new LinkedHashMap<String, ClusterStore>();
64    @Nullable
65    private final Map<String, ClusterComponentBuilder.ClusterCollectionData> collectionData;
66    private final Callback<Map<String, Map>> commitCallback = new ClusterCommitCallback();
67    private final Map<String, Object> componentMap = new LinkedHashMap<String, Object>();
68    private final Event.ListenerSet listenerSet = new ListenerSetImpl();
69    private final Map<String, NetObject<ClusterImpl>> otherServers = new ConcurrentHashMap<String, NetObject<ClusterImpl>>();
70    private final Map<String, Queue> queueViews = new LinkedHashMap<String, Queue>();
71    private final Map<String, ClusterMap> mapViews = new ConcurrentHashMap<String, ClusterMap>();
72 
73    private final Set<String> masters;
74    private final Set<String> slaves;
75    private final boolean distributedCluster;
76 
77    private final ScheduledExecutorService executor;
78 
79    private final ReentrantLock lock = new ReentrantLock();
80    private final Condition allMastersTriedCondition = lock.newCondition();
81    private boolean allMastersTried = false;
82    @Nullable private Future connectionMonitoring = null;
83    @Nullable private ServerDataSocket sds = null;
84    @Nullable private Factory dsf = null;
85    private boolean running = false;
86    private boolean closed = false;
87 
88    public ClusterImpl(@NotNull String name, @NotNull String logicalHostname, @Nullable Map<String, ClusterComponentBuilder.ClusterData> clusterData, @Nullable Map<String, ClusterComponentBuilder.ClusterCollectionData> collectionData) {
89        this.name = name;
90        this.logicalHostname = logicalHostname;
91        this.clusterData = clusterData;
92        this.collectionData = collectionData;
93        masters = new LinkedHashSet<String>();
94        slaves = new LinkedHashSet<String>();
95        if (clusterData == null || clusterData.isEmpty()) {
96            masters.add(logicalHostname);
97            distributedCluster = false;
98        } else {
99            for (ClusterComponentBuilder.ClusterData cd : clusterData.values()) {
100                if (cd.role == ClusterComponentBuilder.RoleType.MASTER)
101                    masters.add(cd.logicalHostname);
102                if (cd.role == ClusterComponentBuilder.RoleType.SLAVE)
103                    slaves.add(cd.logicalHostname);
104            }
105            distributedCluster = true;
106        }
107        if (isAMaster())
108            listenerSet.notifyReset(true);
109        executor = createSingleSES(logicalHostname + '$' + name);
110    }
111 
112    private void acquireMaster(String master) throws Timeout.TimeoutException {
113        Transaction t = null;
114        NetObject<ClusterImpl> netObject2 = null;
115        try {
116            t = Transaction.start(master, new Timeout(2000));
117            NetObject<ClusterImpl> netObject = otherServers.get(master);
118            if (netObject != null && !netObject.isClosed()) return;
119            if (LOG.isDebugEnabled()) LOG.debug(name + ": Connecting to " + master);
120            if (clusterData == null) throw new InternalError();
121            ClusterComponentBuilder.ClusterData cd = clusterData.get(master);
122            if (cd == null) throw new IllegalStateException("Could not find " + master + " in " + clusterData.keySet());
123            t.complete();
124 
125            t = Transaction.start(master, new Timeout(10 * 1000));
126            try {
127                netObject2 = new NetObject<ClusterImpl>(master, "", new DataSocketFactory(logicalHostname, cd.connectUrl));
128            } finally {
129                t.complete();
130            }
131 
132            // suck down data.
133            if (masters.contains(master)) {
134                Map<String, Collection<MetaData<Object, Object>>> allData = netObject2.call("remoteGetSnapshot");
135                int count = remotePerformChanges(allData);
136                reportEntriesUpdated(count, "bootstrap", master);
137            }
138            // push up data.
139            if (masters.contains(logicalHostname)) {
140                int count = ((Number) netObject2.call("remotePerformChanges", remoteGetSnapshot())).intValue();
141                reportEntriesUpdated(count, "bootstrap push", master);
142            }
143            otherServers.put(master, netObject2);
144            if (LOG.isDebugEnabled())
145                LOG.debug(Thread.currentThread().getName() + ": name=" + name + " adding to otherServers " + otherServers);
146 
147            netObject2 = null;  // so that close in finally doesn't close it.
148        } catch (VisitorException e) {
149            if (e.getCause() instanceof Timeout.TimeoutException)
150                throw (Timeout.TimeoutException) e.getCause();
151            throw e;
152        } catch (InterruptedException e) {
153            throw new Timeout.TimeoutException(e.toString());
154        } finally {
155            IOUtils.close(netObject2);
156            if (t != null) t.complete();
157        }
158    }
159 
160    private void reportEntriesUpdated(int count, String type, String master) {
161        if (count > 0 && LOG.isInfoEnabled()) {
162            LOG.info(logicalHostname + ": Perfomed " + type + " from " + master + " updated " + count + " entries.");
163        }
164    }
165 
166    private int cleanupMetaData(long time) {
167        int count = 0;
168        for (String key : backingStores.keySet()) {
169            BackingStore<Object, Object> store = getBackingStore(key);
170            count += store.expireMetaData(time);
171        }
172        return count;
173    }
174 
175    private void clusterPut(String collectionName, Object key, Object value) {
176        include(commitCallback, collectionName, key, value);
177    }
178 
179    void clusterRemoveAll(String collectionName, Set keys) {
180        if (keys.isEmpty()) return;
181        if (keys.size() == 1) {
182            clusterRemove(collectionName, keys.iterator().next());
183            return;
184        }
185        Map map1 = new LinkedHashMap();
186        for (Object key : keys) {
187            if (key == null) {
188                LOG.warn(name + ": Illegal key null ignored.", new Throwable("here"));
189                continue;
190            }
191            map1.put(key, null);
192        }
193 
194        Map<String, Map> map2 = new LinkedHashMap<String, Map>();
195        map2.put(collectionName, map1);
196        include(commitCallback, map2, null);
197    }
198 
199    private void clusterRemove(String collectionName, Object key) {
200        include(commitCallback, collectionName, key, null);
201    }
202 
203    private <K, V> BackingStore createBackingStore(String collectionName, ClusterComponentBuilder.ClusterCollectionData cd) throws IOException {
204        switch (cd.storeType) {
205            case FILE_SET:
206                return new FileSetBackingStore<K, V>(name + '.' + collectionName, cd, executor);
207            case JDBC:
208                break;
209            case MEMORY:
210                return new MemoryBackingStore<K, V>(name + '.' + collectionName, cd);
211            case NET:
212                break;
213            case SINGLE_FILE:
214                break;
215            case TABS:
216                break;
217        }
218        throw new IllegalArgumentException("Store type " + cd.storeType + " not supported.");
219    }
220 
221    private <K, V> ClusterStore<K, V> createClusterStore(String collectionName) {
222        return new ClusterStore<K, V>(collectionName, this);
223    }
224 
225    public Set<String> getAvailableMasters() {
226        Set<String> ret = new LinkedHashSet<String>();
227        for (Map.Entry<String, NetObject<ClusterImpl>> entry : otherServers.entrySet()) {
228            if (!entry.getValue().isClosed())
229                ret.add(entry.getKey());
230        }
231        return ret;
232    }
233 
234    <K, V> BackingStore<K, V> getBackingStore(String collectionName) throws IllegalArgumentException, IllegalStateException {
235        BackingStore store = backingStores.get(collectionName);
236        if (store != null)
237            return store;
238        synchronized (backingStores) {
239            BackingStore store2 = backingStores.get(collectionName);
240            if (store2 == null) {
241                ClusterComponentBuilder.ClusterCollectionData cd = getClusterCollectionData(collectionName);
242                try {
243                    backingStores.put(collectionName, store2 = createBackingStore(collectionName, cd));
244                } catch (IllegalArgumentException e) {
245                    throw e;
246                } catch (Exception e) {
247                    throwUnableToCreate(collectionName, e);
248                }
249            }
250            return store2;
251        }
252    }
253 
254    private ClusterComponentBuilder.ClusterCollectionData getClusterCollectionData(String collectionName) {
255        ClusterComponentBuilder.ClusterCollectionData cd = collectionData == null ? null : collectionData.get(collectionName);
256        if (cd == null)
257            throw new IllegalArgumentException("No data " + collectionName + " defined in cluster " + name);
258        return cd;
259    }
260 
261    public <K, V> ClusterStore<K, V> getClusterStore(String collectionName) throws IllegalArgumentException, IllegalStateException {
262//        waitUntilAllMastersTried(collectionName);
263        synchronized (clusterStores) {
264            ClusterStore store = clusterStores.get(collectionName);
265            if (store == null) {
266                try {
267                    clusterStores.put(collectionName, store = createClusterStore(collectionName));
268                } catch (IllegalArgumentException e) {
269                    throw e;
270                } catch (Exception e) {
271                    return throwUnableToCreate(collectionName, e);
272                }
273            }
274            return store;
275        }
276    }
277 
278    private static <K, V> ClusterStore<K, V> throwUnableToCreate(String collectionName, Exception e) {
279        throw new IllegalStateException("Unable to create " + collectionName, e);
280    }
281 
282    private Set<String> getClusterHosts() {
283        // slaves only connect to masters which distribute changes.
284        return clusterData == null || isASlave() ? masters : clusterData.keySet();
285    }
286 
287    public Set<String> getConnectedHosts() {
288        // Servers currently connected
289        return otherServers.keySet();
290    }
291 
292    private Set<String> getOtherHosts() {
293        // slaves only connect to masters which distribute changes.
294        Set<String> ret = new LinkedHashSet<String>(clusterData == null || isASlave() ? masters : clusterData.keySet());
295        ret.remove(logicalHostname);
296        return ret;
297    }
298 
299    @NotNull
300    public Event.ListenerSet getListenerSet() {
301        waitUntilAllMastersTried("listenerSet");
302        return listenerSet;
303    }
304 
305    @NotNull
306    public Event.ListenerSet getListenerSet(String collectionName) {
307        waitUntilAllMastersTried("listenerSet2");
308        return getBackingStore(collectionName).getListenerSet();
309    }
310 
311    public <K, V> ClusterMap<K, V> getMapView(String collectionName) {
312        waitUntilAllMastersTried(collectionName);
313        ClusterMap<K, V> map = mapViews.get(collectionName);
314        if (map != null) return map;
315 
316        synchronized (mapViews) {
317            ClusterMap<K, V> map2 = mapViews.get(collectionName);
318            if (map2 == null) {
319                BackingStore<K, V> store = getBackingStoreOrError(collectionName);
320                mapViews.put(collectionName, map2 = new MapView<K, V>(collectionName, store));
321            }
322            return map2;
323        }
324    }
325 
326    public <K, V> Cache<K, V> getCacheView(String collectionName) throws IllegalArgumentException, IllegalStateException {
327        return (Cache<K, V>) getMapView(collectionName);
328    }
329 
330    @NotNull
331    public String getName() {
332        return name;
333    }
334 
335    @NotNull
336    public Set<String> getNames() {
337        return componentMap.keySet();
338    }
339 
340    public <E> Queue<E> getQueueView(String collectionName) {
341        waitUntilAllMastersTried(collectionName);
342        synchronized (queueViews) {
343            Queue<E> queue = queueViews.get(collectionName);
344            if (queue == null) {
345                BackingStore<Object, E> store = getBackingStoreOrError(collectionName);
346                queueViews.put(collectionName, queue = new QueueView<Object, E>(collectionName, store));
347            }
348            return queue;
349        }
350    }
351 
352    private <K, V> BackingStore<K, V> getBackingStoreOrError(String collectionName) {
353        BackingStore<K, V> store = getBackingStore(collectionName);
354        if (store == null)
355            throw new IllegalArgumentException("No data " + collectionName + " defined in cluster " + name);
356        return store;
357    }
358 
359    private void waitUntilAllMastersTried(String collectionName) {
360        if (allMastersTried || connectionMonitoring == null) return;
361        RWLock.checkUnlocked("waitUntilAllMastersTried");
362        try {
363            lock.lock();
364            while (!allMastersTriedCondition.await(2000, TimeUnit.MILLISECONDS) && !allMastersTried)
365                if (LOG.isInfoEnabled())
366                    LOG.info(logicalHostname + ": .... Waiting for " + name + " to have allMastersTried to get " + collectionName + " allMasterTried=" + allMastersTried);
367        } catch (InterruptedException ignored) {
368            throw new IllegalStateException("Interupted waiting for " + collectionName);
369        } finally {
370            lock.unlock();
371        }
372    }
373 
374    public Object getValue(@NotNull String name) {
375        // don't wait or a deadlock will occur attempting to boostrap. waitUntilAllMastersTried(name);
376        Object obj = componentMap.get(name);
377        if (obj == null) {
378            obj = getValue0(name);
379            if (obj != null) componentMap.put(name, obj);
380        }
381        return obj;
382    }
383 
384    @Nullable
385    private Object getValue0(String name) {
386        if (name.length() == 0)
387            return this;
388        return getClusterStore(name);
389    }
390 
391    private boolean isAMaster() {
392        return masters.contains(logicalHostname);
393    }
394 
395    private boolean isASlave() {
396        return slaves.contains(logicalHostname);
397    }
398 
399    public <K, V> Map<K, V> selectMatching(String collectionName, Predicate<? super MetaData<K, V>> predicate) {
400        return (Map<K, V>) getBackingStore(collectionName).selectMatching((Predicate) predicate);
401    }
402 
403    public <K, V> int removeMatching(String collectionName, Predicate<? super MetaData<K, V>> predicate) {
404        Map<K, V> entries = (Map) getBackingStore(collectionName).selectMatching((Predicate) predicate);
405        for (Map.Entry<K, V> entry : entries.entrySet())
406            clusterRemove(collectionName, entry.getKey());
407        return entries.size();
408    }
409 
410    public void start() throws IllegalStateException, IllegalArgumentException {
411        if (closed) throw new IllegalStateException(CLOSED);
412        if (running) return;
413        if (clusterData != null && sds == null) {
414            for (ClusterComponentBuilder.ClusterData cd : clusterData.values())
415                if (cd.logicalHostname.equals(logicalHostname) && isSet(cd.connectUrl)) {
416                    int port = Integer.parseInt(cd.connectUrl.split(":")[1]);
417                    try {
418                        String serverName = logicalHostname + '$' + cd.cluster;
419                        sds = new ServerDataSocket(serverName, port, dsf = new NetObject.ServerProcessorFactory(this));
420                    } catch (IOException e) {
421                        throw new IllegalStateException("Could not start cluster server", e);
422                    }
423                    break;
424                }
425            for (BackingStore bs : backingStores.values())
426                bs.start();
427        }
428        running = true;
429        connectionMonitoring = executor.scheduleAtFixedRate(new ConnectionMonitor(this), 0, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
430        executor.scheduleAtFixedRate(new Runnable() {
431            public void run() {
432                doEvict();
433            }
434        }, CHECK_PERIOD_MS, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
435    }
436 
437    private void doEvict() {
438        for (Map.Entry<String, BackingStore> entry : backingStores.entrySet()) {
439            BackingStore store = entry.getValue();
440            Lock writeLock = store.getLock().writeLock();
441            Transaction t = null;
442            try {
443                writeLock.lock();
444                t = Transaction.start("eviction");
445 
446                Set collection = store.evict();
447 
448                clusterRemoveAll(entry.getKey(), collection);
449                t.commit();
450            } finally {
451                writeLock.unlock();
452                if (t != null) t.complete();
453            }
454        }
455    }
456 
457    public void evict() {
458        double total = 0;
459        for (BackingStore bs : backingStores.values()) {
460            double rating = bs.evictCandidate();
461            if (rating > 0)
462                total += rating;
463        }
464        if (total > 0)
465            doEvict();
466    }
467 
468    public void stop() {
469        running = false;
470        if (connectionMonitoring != null) connectionMonitoring.cancel(true);
471 
472        IOUtils.close(dsf);
473        IOUtils.close(sds);
474 
475        for (BackingStore bs : backingStores.values())
476            bs.stop();
477        for (NetObject netObject : otherServers.values())
478            IOUtils.close(netObject);
479        otherServers.clear();
480    }
481 
482    public boolean isClosed() {
483        return closed;
484    }
485 
486    public void close() {
487        closed = true;
488        stop();
489        executor.shutdownNow();
490    }
491 
492    /**
493     * Called remotely, this method takes a snapshot of all data.
494     *
495     * @return the snapshot.
496     */
497    @SuppressWarnings({"UnusedDeclaration"}) Map<String, Collection<MetaData<Object, Object>>> remoteGetSnapshot() {
498        Map<String, Collection<MetaData<Object, Object>>> ret = new LinkedHashMap<String, Collection<MetaData<Object, Object>>>();
499        for (Map.Entry<String, BackingStore> entry : backingStores.entrySet()) {
500            ret.put(entry.getKey(), ImmutableUtils.immutableWrap(entry.getValue().getAllData()));
501        }
502        return ret;
503    }
504 
505    /**
506     * perform a number of changes in batch.
507     */
508    void remotePerformChanges(@NotNull String source, long timeMS, @NotNull Map<String, Map> changes) throws InterruptedException {
509        Map<String, Collection<MetaData<Object, Object>>> changes2 = new LinkedHashMap<String, Collection<MetaData<Object, Object>>>();
510        for (Map.Entry<String, Map> entry : changes.entrySet()) {
511            String changedBS = entry.getKey();
512            BackingStore<Object, Object> store = getBackingStore(changedBS);
513            Map changesMap = entry.getValue();
514            Collection<MetaData<Object, Object>> changesCollection = store.performChanges(source, timeMS, changesMap);
515            if (!changesCollection.isEmpty())
516                changes2.put(changedBS, changesCollection);
517        }
518        //System.out.println(getName()+": source "+source+"| listenerSet="+listenerSet+"| changes "+ StringUtils.truncate(changes2));
519        // only send actual changes.
520        if (!changes2.isEmpty()) {
521            try {
522                listenerSet.onCallback(new ClusterEvent(name, changes2));
523            } catch (Exception e) {
524                warnExThrown(e);
525            }
526            evict();
527        }
528 
529        // notify the individual cluster stores.
530        for (Map.Entry<String, Map> entry : changes.entrySet()) {
531            String collection = entry.getKey();
532            ClusterStore cstore;
533            synchronized (clusterStores) {
534                cstore = clusterStores.get(collection);
535            }
536            if (cstore != null) {
537                Event.ListenerSet listenerSet = cstore.getListenerSet();
538                if (!listenerSet.isEmpty()) {
539                    for (Map.Entry entry2 : (Set<Map.Entry>) entry.getValue().entrySet())
540                        try {
541                            listenerSet.onCallback(new StoreEvent(entry2.getValue() != null, entry2.getKey(), entry2.getValue()));
542                        } catch (Exception e) {
543                            warnExThrown(e);
544                        }
545                }
546            }
547        }
548    }
549 
550    private void warnExThrown(Exception e) {
551        LOG.warn(getName() + ": Exception thrown by Callback listener", e);
552    }
553 
554    /**
555     * Perform a bootstrap of changes. e.g. from a snap shot.
556     */
557    private int remotePerformChanges(@NotNull Map<String, Collection<MetaData<Object, Object>>> changes) throws InterruptedException {
558        Map<String, Collection<MetaData<Object, Object>>> changes2 = new LinkedHashMap<String, Collection<MetaData<Object, Object>>>();
559        if (Thread.interrupted())
560            throw new InterruptedException();
561        int count = 0;
562        for (Map.Entry<String, Collection<MetaData<Object, Object>>> entry : changes.entrySet()) {
563            String changedBS = entry.getKey();
564            BackingStore<Object, Object> store = getBackingStore(changedBS);
565            Collection<MetaData<Object, Object>> changeCollection = store.performChanges(entry.getValue());
566            count += changeCollection.size();
567            if (!changeCollection.isEmpty())
568                changes2.put(changedBS, changeCollection);
569        }
570        // only send actual changes.
571        if (!changes2.isEmpty())
572            try {
573                listenerSet.onCallback(new ClusterEvent(name, changes2));
574            } catch (Exception e) {
575                warnExThrown(e);
576            }
577        return count;
578    }
579 
580    private static boolean equals2(Object value1, Object value2) {
581        return value1 == null ? value2 == null : value1.equals(value2);
582    }
583 
584    /**
585     * Validate arguments and pass to the approriate method.
586     */
587    final class MapView<K, V> extends AbstractMap<K, V> implements Named, ClusterMap<K, V> {
588        private final String collectionName;
589        private final BackingStore<K, V> store;
590 
591        MapView(@NotNull String collectionName, @NotNull BackingStore<K, V> store) {
592            this.collectionName = collectionName;
593            this.store = store;
594        }
595 
596        public void addListener(@NotNull CacheListener<K> cacheListener) {
597            store.getListenerSet().add(new CacheListenerWrapper<K, V>((Store) getClusterStore(collectionName), cacheListener));
598        }
599 
600        public void clear() {
601            store.checkReadOnly();
602 
603            // this is a significant event, which cluster does not have a specific operation for.
604            // so wait a moment for other threads to settle before doing this.
605            // noinspection CallToThreadYield
606            Thread.yield();
607 
608            Lock writeLock = store.getLock().writeLock();
609            Set<K> keys = store.getKeySet();
610            Transaction t = null;
611            try {
612                writeLock.lock();
613                t = Transaction.start("clear");
614 
615                clusterRemoveAll(collectionName, keys);
616                t.commit();
617            } finally {
618                writeLock.unlock();
619                if (t != null) t.complete();
620            }
621        }
622 
623        public boolean containsKey(Object key) {
624            return store.containsKey(key);
625        }
626 
627        public void evict() {
628        }
629 
630        public Event.ListenerSet getListenerSet() {
631            return store.getListenerSet();
632        }
633 
634        @Nullable public <R> R visit(Visitor<ClusterMap<K, V>, R> visitor) {
635            Lock writeLock = store.getLock().writeLock();
636            try {
637                //noinspection LockAcquiredButNotSafelyReleased
638                writeLock.lockInterruptibly();
639            } catch (InterruptedException e) {
640                Thread.currentThread().interrupt();
641                throw new VisitorException(e);
642            }
643            try {
644                return visitor.visit(this);
645            } finally {
646                writeLock.unlock();
647            }
648        }
649 
650        public Map<K, V> selectMatching(Predicate<? super MetaData<K, V>> predicate) {
651            return store.selectMatching(predicate);
652        }
653 
654        public int removeMatching(Predicate<? super MetaData<K, V>> predicate) {
655            Map<K, V> map = store.selectMatching(predicate);
656            int count = 0;
657            for (Entry<K, V> entry : map.entrySet()) {
658                if (remove(entry.getKey(), entry.getValue()))
659                    count++;
660            }
661            return count;
662        }
663 
664        @NotNull
665        public Set<Entry<K, V>> entrySet() {
666            return store.getEntrySet();
667        }
668 
669        public Map<K, V> getAll(@NotNull Collection<K> keys) {
670            return store.getAll((Collection) keys);
671        }
672 
673        public CacheEntry<K, V> getCacheEntry(@NotNull K key) {
674            return store.getMetaData(key);
675        }
676 
677        public CacheStatistics getCacheStatistics() {
678            return null;
679        }
680 
681        @Nullable public V get(Object key) {
682            return store.getValue((K) key);
683        }
684 
685        @NotNull public String getName() {
686            return name + '.' + collectionName;
687        }
688 
689        public Set<K> keySet() {
690            return store.getKeySet();
691        }
692 
693        public void load(@NotNull K key) {
694        }
695 
696        public void loadAll(@NotNull Collection<K> keys) {
697        }
698 
699        @Nullable public V peek(@NotNull K key) {
700            return get(key);
701        }
702 
703        @Nullable public V put(@NotNull K key, @NotNull V value) {
704            return put0(key, value, true);
705        }
706 
707        @Nullable private V put0(K key, V value, boolean override) {
708            store.checkReadOnly();
709            store.checkKey(key);
710            store.checkValue(value);
711            // make transaction so that the change can be pushed out after the store is unlocked.
712 
713            Transaction t = distributedCluster && !Transaction.isTransactional() ? Transaction.start("put") : null;
714            waitUntilAllMastersTried(getName());
715            Lock writeLock = store.getLock().writeLock();
716            try {
717                writeLock.lock();
718                V prev = store.getValue(key);
719                if ((prev == null || override) && !equals2(prev, value))
720                    clusterPut(collectionName, key, value);
721                if (t != null) t.commit();
722                return prev;
723            } finally {
724                writeLock.unlock();
725                // propegrate changes.
726                if (t != null) t.complete();
727            }
728        }
729 
730        @Nullable public V putIfAbsent(K key, V value) {
731            return put0(key, value, false);
732        }
733 
734        @Nullable public V remove(@Nullable Object key) {
735            return remove0((K) key, false, null);
736        }
737 
738        private V remove0(K key, boolean byValue, Object value) {
739            store.checkReadOnly();
740            if (key == null) return null;
741            try {
742                store.checkKey(key);
743            } catch (IllegalArgumentException ignored) {
744                return null;
745            }
746 
747            boolean transactional = distributedCluster && !Transaction.isTransactional();
748            Transaction t = transactional ? Transaction.start("remove") : null;
749            Lock exclusive = store.getLock().writeLock();
750            try {
751                exclusive.lock();
752                V prev = store.getValue(key);
753                if (prev != null || byValue && equals2(prev, value))
754                    clusterRemove(collectionName, key);
755                if (t != null) t.commit();
756                return prev;
757            } finally {
758                exclusive.unlock();
759                if (t != null) t.complete();
760            }
761        }
762 
763        public boolean remove(Object key, Object value) {
764            return equals2(remove0((K) key, true, value), value);
765        }
766 
767        public void removeListener(@NotNull CacheListener<K> cacheListener) {
768            store.getListenerSet().remove(new CacheListenerWrapper<K, V>((Store) getClusterStore(collectionName), cacheListener));
769        }
770 
771        public boolean replace(K key, V oldValue, V newValue) {
772            return equals2(replace0(key, newValue, true, oldValue), oldValue);
773        }
774 
775        @Nullable public V replace(K key, V value) {
776            return replace0(key, value, false, null);
777        }
778 
779        @Nullable private V replace0(K key, V value, boolean byValue, V oldValue) {
780            store.checkReadOnly();
781            store.checkKey(key);
782            store.checkValue(value);
783 
784            Transaction t = distributedCluster && !Transaction.isTransactional() ? Transaction.start("replace") : null;
785            Lock writeLock = store.getLock().writeLock();
786            try {
787                writeLock.lock();
788                V prev = store.getValue(key);
789                if (prev != null && (!byValue || equals2(prev, oldValue)))
790                    clusterPut(collectionName, key, value);
791                if (t != null) t.commit();
792                return prev;
793            } finally {
794                writeLock.unlock();
795                if (t != null) t.complete();
796            }
797        }
798 
799        public int size() {
800            return store.getSize();
801        }
802    }
803 
804    static final class CacheListenerWrapper<K, V> implements Store.StoreListener<K, V> {
805        private final Store<K, V> store;
806        private final CacheListener<K> cacheListener;
807 
808        CacheListenerWrapper(Store<K, V> store, CacheListener<K> cacheListener) {
809            this.store = store;
810            this.cacheListener = cacheListener;
811        }
812 
813        public void onStatusEvent(boolean online) throws IllegalStateException {
814            if (online) {
815                cacheListener.onClear();
816                for (K key : store.getMapView().keySet())
817                    cacheListener.onLoad(key);
818            } else {
819                for (K key : store.getMapView().keySet())
820                    cacheListener.onEvict(key);
821            }
822        }
823 
824        public void onUpdate(@NotNull K key, @NotNull V value) {
825            cacheListener.onPut(key);
826        }
827 
828        public void onRemoved(@NotNull K key, @Nullable V value) {
829            cacheListener.onRemove(key);
830        }
831 
832        public boolean equals(Object obj) {
833            return obj instanceof CacheListenerWrapper && cacheListener.equals(((CacheListenerWrapper) obj).cacheListener);
834        }
835 
836        public int hashCode() {
837            return cacheListener.hashCode();
838        }
839    }
840 
841    final class QueueView<K, E> extends AbstractQueue<E> implements Named {
842        private final String collectionName;
843        private final BackingStore<K, E> store;
844 
845        QueueView(@NotNull String collectionName, @NotNull BackingStore<K, E> store) {
846            this.collectionName = collectionName;
847            this.store = store;
848        }
849 
850        public void clear() {
851            store.checkReadOnly();
852            clusterRemoveAll(collectionName, store.getKeySet());
853        }
854 
855        @NotNull
856        public String getName() {
857            return name + '.' + collectionName;
858        }
859 
860        @NotNull
861        public Iterator<E> iterator() {
862            return store.getValues().iterator();
863        }
864 
865        public boolean offer(@NotNull E element) {
866            store.checkReadOnly();
867            store.checkValue(element);
868            K key = store.deriveKey(element); // checks element.
869            E prev = store.getValue(key);
870            if (prev != null && !prev.equals(element))
871                return false;
872            clusterPut(collectionName, key, element);
873            return true;
874        }
875 
876        @Nullable
877        public E poll() {
878            store.checkReadOnly();
879            K key = store.firstKey();
880            if (key == null) return null;
881            E prev = store.getValue(key);
882            clusterRemove(collectionName, key);
883            return prev;
884        }
885 
886        @Nullable
887        public E peek() {
888            K key = store.firstKey();
889            if (key == null) return null;
890            return store.getValue(key);
891        }
892 
893        public int size() {
894            return store.getSize();
895        }
896    }
897 
898    // called on a Transaction commit.
899    final class ClusterCommitCallback implements Callback<Map<String, Map>> {
900        public void onCallback(@NotNull final Map<String, Map> changes) throws IllegalStateException {
901            final long timeMS = System.currentTimeMillis();
902            if (isAMaster())
903                try {
904                    remotePerformChanges(logicalHostname, timeMS, changes);
905                } catch (InterruptedException e) {
906                    throw new IllegalStateException("Unable to commit changes", e);
907                }
908            else if (!isASlave())
909                throw new IllegalStateException("Unable to commit changes on a node which is neither a master nor a slave");
910            // called so this can be performed outside writeLock().
911            Transaction.runOnCompleteCurrent(new Runnable() {
912                public void run() {
913                    completeChangesToOtherServers(changes, timeMS);
914                }
915            });
916        }
917 
918        void completeChangesToOtherServers(Map<String, Map> changes, long timeMS) {
919            if (LOG.isDebugEnabled()) LOG.debug("Sending changes to " + otherServers.keySet() + " changes " + changes);
920 
921            for (String master : otherServers.keySet()) {
922                try {
923                    NetObject<ClusterImpl> netObject = otherServers.get(master);
924                    if (LOG.isDebugEnabled()) LOG.debug(name + ": Sending changes to " + master);
925                    netObject.call("remotePerformChanges", logicalHostname, timeMS, changes);
926                } catch (Timeout.TimeoutException te) {
927                    if (running) {
928                        String mesg = name + ": Unable to send change to " + master;
929                        if (LOG.isDebugEnabled())
930                            LOG.warn(mesg, te.getCause());
931                        else
932                            LOG.warn(mesg + ' ' + te.getCause());
933                    }
934                } catch (IllegalStateException ise) {
935                    if (running)
936                        LOG.warn(name + ": Unable to commit changes " + master + ' ' + ise);
937                } catch (Throwable throwable) {
938                    if (running)
939                        LOG.warn(name + ": " + Main.UNHANDLED_EXCEPTION + " attempting to perform changes at " + master, throwable);
940                }
941            }
942        }
943    }
944 
945    static final class ConnectionMonitor implements Runnable {
946        private final Reference<ClusterImpl> clusterRef;
947 
948        ConnectionMonitor(ClusterImpl clusterRef) {
949            this.clusterRef = new WeakReference<ClusterImpl>(clusterRef);
950        }
951 
952        public void run() {
953            ClusterImpl cluster = clusterRef.get();
954            if (cluster == null)
955                throw new IllegalStateException();
956            cluster.connectionMonitor();
957        }
958    }
959 
960    private void connectionMonitor() {
961        Set<String> connectedHosts = getOtherHosts();
962        try {
963            checkConnections(false);
964            if (!allMastersTried) {
965                int retries = 10;
966                ClusterComponentBuilder.ClusterData clusterData2 = clusterData.get(logicalHostname);
967                if (clusterData2 != null && clusterData2.retries != null)
968                    retries = clusterData2.retries;
969                for (int i = 1; i <= retries && running; i++) {
970                    if (otherServers.keySet().equals(connectedHosts)) {
971                        break;
972                    }
973                    if (i > 1) {
974                        Set<String> hosts = new LinkedHashSet<String>(connectedHosts);
975                        hosts.removeAll(otherServers.keySet());
976                        LOG.info(logicalHostname + ": Waiting for " + hosts + " to become available. " + i + " of " + retries + " retries");
977                    }
978                    Thread.sleep(250 * i);
979                    checkConnections(i == retries);
980                }
981            }
982        } catch (InterruptedException e) {
983            LOG.warn(logicalHostname + ": Connection Monitor interrupted", e);
984        } finally {
985            if (!allMastersTried)
986                try {
987                    allMastersTried = true;
988                    lock.lock();
989                    if (LOG.isInfoEnabled())
990                        if (otherServers.keySet().equals(connectedHosts))
991                            infoMastersAvailable("All");
992                        else
993                            infoMastersAvailable("Tried");
994                    allMastersTriedCondition.signalAll();
995                } finally {
996                    lock.unlock();
997                }
998        }
999    }
1000 
1001    private void infoMastersAvailable(String s) {
1002        LOG.info(logicalHostname + ": " + s + " masters " + otherServers.keySet() + " available.");
1003    }
1004 
1005    @SuppressWarnings({"OverlyComplexMethod"})
1006    private void checkConnections(boolean debug) {
1007        try {
1008            long start = System.currentTimeMillis();
1009            boolean allOkay = true;
1010            for (String server : getClusterHosts()) {
1011                if (server.equals(logicalHostname)) continue;
1012                NetObject<ClusterImpl> netObject = otherServers.get(server);
1013                String reason = "netObject is not connected";
1014                if (netObject != null) {
1015                    Transaction t = null;
1016                    try {
1017                        if (netObject.isClosed()) {
1018                            reason = "netObject was closed";
1019                        } else {
1020                            t = Transaction.start("Poll " + server, new Timeout(5 * CHECK_PERIOD_MS));
1021                            closeOnRollbackCurrent(netObject);
1022                            netObject.call("getName");
1023                            if (netObject.isClosed())
1024                                reason = "netObject was closed after call to getName";
1025                            else
1026                                reason = null;
1027                            t.commit();
1028                        }
1029                    } catch (Timeout.TimeoutException e) {
1030                        if (running && LOG.isDebugEnabled())
1031                            LOG.debug(getStatusFailedMesg(server, e));
1032                        reason = e + " " + e.getCause();
1033                        netObject.close();
1034                    } catch (Exception e) {
1035                        // noinspection CallToThreadYield
1036                        Thread.yield();
1037                        if (running && LOG.isInfoEnabled())
1038                            LOG.info(getStatusFailedMesg(server, e));
1039                        reason = e + " " + e.getCause();
1040                        netObject.close();
1041                    } finally {
1042                        if (t != null) t.complete();
1043                        if (reason != null) {
1044                            if (LOG.isDebugEnabled()) LOG.debug(name + ": Connection lost to " + server + ' ' + reason);
1045                            otherServers.remove(server);
1046                        }
1047                    }
1048                    if (reason == null) continue;
1049                }
1050                allOkay = false;
1051                try {
1052                    if (LOG.isDebugEnabled() || debug)
1053                        LOG.info(logicalHostname + ": connecting to " + server + " as " + reason);
1054                    acquireMaster(server);
1055                } catch (Timeout.TimeoutException e) {
1056                    if (!running) return;
1057                    if (LOG.isDebugEnabled()) LOG.debug(name + ": Unable to contact master " + server + ' ' + e);
1058                }
1059            }
1060            if (allOkay) {
1061                int count = cleanupMetaData(start);
1062                if (count > 0 && LOG.isDebugEnabled())
1063                    LOG.debug(logicalHostname + ": Cleaned up " + count + " old meta data entries for deleted keys.");
1064            }
1065        } catch (VisitorException e) {
1066            if (running) {
1067                if (e.getCause() instanceof InterruptedException) {
1068                    if (LOG.isDebugEnabled() || debug) LOG.info("Unexpected interrupt", e.getCause());
1069                } else if (LOG.isInfoEnabled()) {
1070                    LOG.info(Main.UNHANDLED_EXCEPTION, e.getCause());
1071                }
1072            }
1073        } catch (Throwable throwable) {
1074            // noinspection CallToThreadYield
1075            Thread.yield();
1076            if (running) LOG.error(name + ": " + Main.UNHANDLED_EXCEPTION, throwable);
1077        } finally {
1078            rollbackAll();
1079        }
1080    }
1081 
1082    private String getStatusFailedMesg(String server, Exception e) {
1083        return name + ": Failed to get status from " + server + ' ' + e;
1084    }
1085}

[all classes][org.jtoolkit.essence.data.impl]
EMMA 2.0.5312 (C) Vladimir Roubtsov