| 1 | package org.jtoolkit.essence.data.impl; |
| 2 | |
| 3 | import org.jetbrains.annotations.NotNull; |
| 4 | import org.jetbrains.annotations.Nullable; |
| 5 | import org.jtoolkit.essence.app.pojo.impl.DataValueClass; |
| 6 | import org.jtoolkit.essence.app.pojo.impl.PojoContext; |
| 7 | import org.jtoolkit.essence.concurrency.Callback; |
| 8 | import static org.jtoolkit.essence.concurrency.Threads.CONTROL_THREAD; |
| 9 | import static org.jtoolkit.essence.concurrency.Threads.createSingleSES; |
| 10 | import org.jtoolkit.essence.data.Event; |
| 11 | import org.jtoolkit.essence.data.Predicate; |
| 12 | import org.jtoolkit.essence.data.Store; |
| 13 | import org.jtoolkit.essence.data.Cluster; |
| 14 | import static org.jtoolkit.essence.data.Transaction.include; |
| 15 | import static org.jtoolkit.essence.data.Transaction.isTransactional; |
| 16 | import org.jtoolkit.essence.utils.Pair; |
| 17 | import static org.jtoolkit.essence.utils.StringUtils.NEW_LINE; |
| 18 | |
| 19 | import javax.cache.Cache; |
| 20 | import javax.cache.CacheEntry; |
| 21 | import javax.cache.CacheListener; |
| 22 | import javax.cache.CacheStatistics; |
| 23 | import java.util.*; |
| 24 | import java.util.concurrent.ConcurrentMap; |
| 25 | import java.util.concurrent.ScheduledExecutorService; |
| 26 | |
| 27 | /* |
| 28 | Copyright 2006 Peter Lawrey |
| 29 | |
| 30 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 31 | you may not use this file except in compliance with the License. |
| 32 | You may obtain a copy of the License at |
| 33 | |
| 34 | http://www.apache.org/licenses/LICENSE-2.0 |
| 35 | |
| 36 | Unless required by applicable law or agreed to in writing, software |
| 37 | distributed under the License is distributed on an "AS IS" BASIS, |
| 38 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 39 | See the License for the specific language governing permissions and |
| 40 | limitations under the License. |
| 41 | */ |
| 42 | |
| 43 | /** |
| 44 | * @author Peter Lawrey |
| 45 | */ |
| 46 | @SuppressWarnings({"ClassWithTooManyMethods"}) |
| 47 | public abstract class AbstractStore<K, V> implements Store<K, V> { |
| 48 | private static final Object storeMonitorLock = new Object(); |
| 49 | /** |
| 50 | * @noinspection StaticNonFinalField |
| 51 | */ |
| 52 | private static ScheduledExecutorService storeMonitor = null; |
| 53 | |
| 54 | protected final String name; |
| 55 | @Nullable protected final DataValueClass<K> keyClass; |
| 56 | @NotNull protected final DataValueClass<V> valueClass; |
| 57 | @NotNull protected final Event.ListenerSet listenerSet; |
| 58 | @NotNull private final ReadMode readMode; |
| 59 | @NotNull protected final PersistMode persistMode; |
| 60 | protected final boolean isQueue; |
| 61 | private final MapView mapView = new MapView(); |
| 62 | private final Queue<V> queueView = new QueueView(); |
| 63 | |
| 64 | protected final Callback<Map<String, Map>> changeCallback = new Callback<Map<String, Map>>() { |
| 65 | public void onCallback(@NotNull Map<String, Map> changes) throws Exception { |
| 66 | Map changes2 = changes.remove(name); |
| 67 | for (Map.Entry<K, V> entry : (Set<Map.Entry<K, V>>) changes2.entrySet()) { |
| 68 | K key = entry.getKey(); |
| 69 | V value = entry.getValue(); |
| 70 | if (value == null) |
| 71 | doRemove(key, false, null); |
| 72 | else |
| 73 | put0(key, value); |
| 74 | } |
| 75 | } |
| 76 | }; |
| 77 | |
| 78 | protected boolean closed = false; |
| 79 | |
| 80 | protected AbstractStore(@NotNull String name, @NotNull CollectionType collectionType, @Nullable DataValueClass<K> keyClass, |
| 81 | @NotNull DataValueClass<V> valueClass, @NotNull Event.ListenerSet listenerSet, @NotNull ReadMode readMode, |
| 82 | @NotNull PersistMode persistMode) { |
| 83 | this.name = name; |
| 84 | this.keyClass = keyClass; |
| 85 | this.valueClass = valueClass; |
| 86 | this.listenerSet = listenerSet; |
| 87 | this.readMode = readMode; |
| 88 | this.persistMode = persistMode; |
| 89 | isQueue = collectionType == CollectionType.ANY || collectionType == CollectionType.QUEUE; |
| 90 | } |
| 91 | |
| 92 | private long counter = 0; |
| 93 | |
| 94 | public Cluster getCluster() { |
| 95 | return null; |
| 96 | } |
| 97 | |
| 98 | private K getKey(V value) { |
| 99 | K key; |
| 100 | try { |
| 101 | if (keyClass == null) { |
| 102 | key = (K) Long.valueOf(counter++); |
| 103 | } else { |
| 104 | key = keyClass.build(valueClass.asMap2(value), PojoContext.EMPTY); |
| 105 | } |
| 106 | } catch (InstantiationException e) { |
| 107 | throw new IllegalArgumentException("Unable to derive a key from value " + value, e.getCause()); |
| 108 | } |
| 109 | return key; |
| 110 | } |
| 111 | |
| 112 | protected void checkKey(@NotNull Object key) throws IllegalArgumentException { |
| 113 | Class clazz = key.getClass(); |
| 114 | if (keyClass != null && keyClass.getType() != clazz && keyClass.isNotAssignableFrom(clazz)) |
| 115 | throw new IllegalArgumentException("Cannot cast key " + key + ' ' + clazz + " to " + keyClass); |
| 116 | |
| 117 | if (key instanceof Number) { |
| 118 | Number num = (Number) key; |
| 119 | if (counter <= num.longValue()) |
| 120 | counter = num.longValue() + 1; |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | protected boolean add(@NotNull V v) { |
| 125 | V prev = doPut(getKey(v), v, true, true); |
| 126 | return !v.equals(prev); |
| 127 | } |
| 128 | |
| 129 | protected abstract V doPut(K key, V value, boolean ifAbsert, boolean ifPresent); |
| 130 | |
| 131 | protected abstract void put0(K key, V value); |
| 132 | |
| 133 | protected abstract boolean containsKey(@NotNull Object key); |
| 134 | |
| 135 | protected abstract boolean containsValue(@NotNull Object value); |
| 136 | |
| 137 | protected abstract void clear(); |
| 138 | |
| 139 | public void close() { |
| 140 | closed = true; |
| 141 | } |
| 142 | |
| 143 | @Nullable protected abstract V first(); |
| 144 | |
| 145 | @Nullable protected abstract K firstKey(); |
| 146 | |
| 147 | @Nullable protected abstract V get(Object key); |
| 148 | |
| 149 | private Map<K, V> getAll(Collection<K> keys) { |
| 150 | Map<K, V> ret = new LinkedHashMap<K, V>(); |
| 151 | for (K key : keys) |
| 152 | ret.put(key, get(key)); |
| 153 | return ret; |
| 154 | } |
| 155 | |
| 156 | @NotNull public String getName() { |
| 157 | return name; |
| 158 | } |
| 159 | |
| 160 | @NotNull public Event.ListenerSet getListenerSet() { |
| 161 | return listenerSet; |
| 162 | } |
| 163 | |
| 164 | @NotNull public Cache<K, V> getCacheView() { |
| 165 | return mapView; |
| 166 | } |
| 167 | |
| 168 | @NotNull public ConcurrentMap<K, V> getMapView() { |
| 169 | return mapView; |
| 170 | } |
| 171 | |
| 172 | @NotNull public Queue<V> getQueueView() { |
| 173 | return queueView; |
| 174 | } |
| 175 | |
| 176 | @NotNull protected ReadMode getReadMode() { |
| 177 | return readMode; |
| 178 | } |
| 179 | |
| 180 | public boolean isClosed() { |
| 181 | return closed; |
| 182 | } |
| 183 | |
| 184 | protected abstract Set<K> keySet(); |
| 185 | |
| 186 | @Nullable protected abstract V removeFirst(); |
| 187 | |
| 188 | public Map<K, V> selectMatching(Predicate<Map.Entry<K, V>> predicate) { |
| 189 | Map<K, V> matching = new LinkedHashMap<K, V>(); |
| 190 | for (Map.Entry<K, V> value : asMap().entrySet()) |
| 191 | if (predicate.isSatisfiedBy(value)) |
| 192 | matching.put(value.getKey(), value.getValue()); |
| 193 | return matching; |
| 194 | } |
| 195 | |
| 196 | public int removeMatching(Predicate<Map.Entry<K, V>> predicate) { |
| 197 | try { |
| 198 | int count = 0; |
| 199 | for (Map.Entry<K, V> value : asMap().entrySet()) |
| 200 | if (predicate.isSatisfiedBy(value)) { |
| 201 | if (doRemove(value.getKey(), false, null) != null) |
| 202 | count++; |
| 203 | } |
| 204 | return count; |
| 205 | } catch (RuntimeException e) { |
| 206 | throw e; |
| 207 | } catch (Exception e) { |
| 208 | throw new IllegalStateException(e); |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | protected abstract int getSize(); |
| 213 | |
| 214 | protected abstract Collection<V> values(); |
| 215 | |
| 216 | protected abstract Collection<V> orderedValues(); |
| 217 | |
| 218 | protected boolean canWrite() { |
| 219 | return persistMode != PersistMode.NONE && persistMode != PersistMode.READ_ONLY; |
| 220 | } |
| 221 | |
| 222 | private void checkOpen() { |
| 223 | if (closed) throw new IllegalStateException(name + " closed"); |
| 224 | } |
| 225 | |
| 226 | protected void checkWrite() { |
| 227 | checkOpen(); |
| 228 | if (persistMode == PersistMode.READ_ONLY) |
| 229 | throw new UnsupportedOperationException(name + ": Collection is read only"); |
| 230 | } |
| 231 | |
| 232 | protected void checkValue(@NotNull Object value) throws IllegalArgumentException { |
| 233 | Class clazz = value.getClass(); |
| 234 | if (valueClass.isNotAssignableFrom(clazz)) |
| 235 | throw new IllegalArgumentException("Cannot cast value " + value + ' ' + clazz + " to " + valueClass); |
| 236 | } |
| 237 | |
| 238 | // Called from NetObject on construction. |
| 239 | @SuppressWarnings({"UnusedDeclaration"}) |
| 240 | public Pair<Class<K>, Class<V>> getClasses() { |
| 241 | return new Pair<Class<K>, Class<V>>(keyClass == null ? null : keyClass.getType(), valueClass.getType()); |
| 242 | } |
| 243 | |
| 244 | protected static ScheduledExecutorService getStoreMonitor() { |
| 245 | synchronized (storeMonitorLock) { |
| 246 | if (storeMonitor == null) |
| 247 | storeMonitor = createSingleSES(CONTROL_THREAD + "tab-monitor"); |
| 248 | return storeMonitor; |
| 249 | } |
| 250 | } |
| 251 | |
| 252 | protected void notifyUpdate(@NotNull K key, @Nullable V newValue) { |
| 253 | if (!listenerSet.isEmpty()) |
| 254 | try { |
| 255 | listenerSet.onCallback(new StoreEvent<K, V>(true, key, newValue)); |
| 256 | } catch (Exception e) { |
| 257 | throw new IllegalStateException("Listener set threw an exception processing event key="+key+", newValue="+newValue, e); |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | private void doClear() { |
| 262 | checkWrite(); |
| 263 | if (isTransactional()) { |
| 264 | Map<String, Map> mapOfKeys = new LinkedHashMap<String, Map>(); |
| 265 | Map<K, V> map2 = new LinkedHashMap<K, V>(); |
| 266 | for (K key : keySet()) |
| 267 | map2.put(key, null); |
| 268 | mapOfKeys.put(name, map2); |
| 269 | include(changeCallback, mapOfKeys, null); |
| 270 | return; |
| 271 | } |
| 272 | for (K key : keySet()) { |
| 273 | } |
| 274 | clear(); |
| 275 | } |
| 276 | |
| 277 | @Nullable protected abstract V doRemove(@NotNull K key, boolean byValue, @Nullable V value); |
| 278 | |
| 279 | static boolean equals2(Object value1, Object value2) { |
| 280 | return value1 == null ? value2 == null : value1.equals(value2); |
| 281 | } |
| 282 | |
| 283 | final class MapView extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Cache<K, V> { |
| 284 | public void addListener(@NotNull CacheListener<K> cacheListener) { |
| 285 | listenerSet.add(new ClusterImpl.CacheListenerWrapper<K, V>(AbstractStore.this, cacheListener)); |
| 286 | } |
| 287 | |
| 288 | public void evict() { |
| 289 | } |
| 290 | |
| 291 | public Map<K, V> getAll(@NotNull Collection<K> ks) { |
| 292 | return AbstractStore.this.getAll(ks); |
| 293 | } |
| 294 | |
| 295 | public CacheEntry getCacheEntry(@NotNull K k) { |
| 296 | return null; |
| 297 | } |
| 298 | |
| 299 | public CacheStatistics getCacheStatistics() { |
| 300 | return null; |
| 301 | } |
| 302 | |
| 303 | public void load(@NotNull K k) { |
| 304 | } |
| 305 | |
| 306 | public void loadAll(@NotNull Collection<K> ks) { |
| 307 | } |
| 308 | |
| 309 | @Nullable public V peek(@NotNull K key) { |
| 310 | return get(key); |
| 311 | } |
| 312 | |
| 313 | public void removeListener(@NotNull CacheListener<K> cacheListener) { |
| 314 | listenerSet.remove(new ClusterImpl.CacheListenerWrapper<K, V>(AbstractStore.this, cacheListener)); |
| 315 | } |
| 316 | |
| 317 | public boolean containsKey(@NotNull Object key) { |
| 318 | try { |
| 319 | checkKey(key); |
| 320 | boolean ret = AbstractStore.this.containsKey(key); |
| 321 | if (ret) { |
| 322 | } |
| 323 | return ret; |
| 324 | } catch (IllegalArgumentException ignored) { |
| 325 | return false; |
| 326 | } |
| 327 | } |
| 328 | |
| 329 | public boolean containsValue(@NotNull Object value) { |
| 330 | try { |
| 331 | checkValue(value); |
| 332 | return AbstractStore.this.containsValue(value); |
| 333 | } catch (IllegalArgumentException ignored) { |
| 334 | return false; |
| 335 | } |
| 336 | } |
| 337 | |
| 338 | public void clear() { |
| 339 | doClear(); |
| 340 | } |
| 341 | |
| 342 | public Set<Entry<K, V>> entrySet() { |
| 343 | return AbstractStore.this.asMap().entrySet(); |
| 344 | } |
| 345 | |
| 346 | @Nullable public V get(@NotNull Object key) { |
| 347 | // allow get even if not open. |
| 348 | // try { |
| 349 | // checkKey(key); |
| 350 | return AbstractStore.this.get(key); |
| 351 | // } catch (IllegalArgumentException ignored) { |
| 352 | // return null; |
| 353 | // } |
| 354 | } |
| 355 | |
| 356 | public Set<K> keySet() { |
| 357 | return AbstractStore.this.keySet(); |
| 358 | } |
| 359 | |
| 360 | @Nullable public V put(@NotNull K key, @NotNull V value) { |
| 361 | return doPut(key, value, true, true); |
| 362 | } |
| 363 | |
| 364 | public V putIfAbsent(K key, V value) { |
| 365 | return doPut(key, value, true, false); |
| 366 | } |
| 367 | |
| 368 | @Nullable public V remove(@NotNull Object key) { |
| 369 | return doRemove((K) key, false, null); |
| 370 | } |
| 371 | |
| 372 | public boolean remove(Object key, Object value) { |
| 373 | return equals2(doRemove((K) key, true, (V) value), value); |
| 374 | } |
| 375 | |
| 376 | public boolean replace(K key, V oldValue, V newValue) { |
| 377 | return false; |
| 378 | } |
| 379 | |
| 380 | public V replace(K key, V value) { |
| 381 | return doPut(key, value, false, true); |
| 382 | } |
| 383 | |
| 384 | public int size() { |
| 385 | return AbstractStore.this.getSize(); |
| 386 | } |
| 387 | |
| 388 | public Collection<V> values() { |
| 389 | return AbstractStore.this.values(); |
| 390 | } |
| 391 | } |
| 392 | |
| 393 | final class QueueView extends AbstractQueue<V> { |
| 394 | public boolean add(@NotNull V value) { |
| 395 | return AbstractStore.this.add(value); |
| 396 | } |
| 397 | |
| 398 | public void clear() { |
| 399 | doClear(); |
| 400 | } |
| 401 | |
| 402 | public boolean contains(@NotNull Object value) { |
| 403 | checkValue(value); |
| 404 | return AbstractStore.this.containsValue(value); |
| 405 | } |
| 406 | |
| 407 | public Iterator<V> iterator() { |
| 408 | return orderedValues().iterator(); |
| 409 | } |
| 410 | |
| 411 | public boolean offer(@NotNull V value) { |
| 412 | return AbstractStore.this.add(value); |
| 413 | } |
| 414 | |
| 415 | @Nullable public V peek() { |
| 416 | return AbstractStore.this.first(); |
| 417 | } |
| 418 | |
| 419 | @Nullable public V poll() { |
| 420 | checkWrite(); |
| 421 | K key = firstKey(); |
| 422 | return key == null ? null : doRemove(key, false, null); |
| 423 | } |
| 424 | |
| 425 | @Nullable public V remove() { |
| 426 | checkWrite(); |
| 427 | if (isTransactional()) { |
| 428 | K key = firstKey(); |
| 429 | if (key == null) throw new NoSuchElementException(); |
| 430 | V v = doRemove(key, false, null); |
| 431 | if (v == null) throw new NoSuchElementException(); |
| 432 | return v; |
| 433 | } |
| 434 | V v = AbstractStore.this.removeFirst(); |
| 435 | if (v == null) |
| 436 | throw new NoSuchElementException(); |
| 437 | return v; |
| 438 | } |
| 439 | |
| 440 | public int size() { |
| 441 | return AbstractStore.this.getSize(); |
| 442 | } |
| 443 | } |
| 444 | |
| 445 | public String toString() { |
| 446 | Map<K, V> map = asMap(); |
| 447 | StringBuilder sb = new StringBuilder(); |
| 448 | sb.append(name).append(", size=").append(map.size()).append(NEW_LINE); |
| 449 | for (Map.Entry<K, V> entry : map.entrySet()) |
| 450 | sb.append(entry.getKey()).append('=').append(entry.getValue()).append(NEW_LINE); |
| 451 | return sb.toString(); |
| 452 | } |
| 453 | |
| 454 | @Override protected void finalize() throws Throwable { |
| 455 | super.finalize(); |
| 456 | if (!closed) close(); |
| 457 | } |
| 458 | } |