EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.concurrency]

COVERAGE SUMMARY FOR SOURCE FILE [Threads.java]

nameclass, %method, %block, %line, %
Threads.java100% (3/3)78%  (21/27)59%  (360/608)62%  (89.4/144)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class Threads$EssenceScheduledThreadPoolExecutor100% (1/1)71%  (5/7)30%  (61/202)38%  (18/47)
submit (Runnable): Future 0%   (0/1)0%   (0/6)0%   (0/2)
submit (Runnable, Object): Future 0%   (0/1)0%   (0/7)0%   (0/2)
checkQueueSize (): void 100% (1/1)15%  (22/150)22%  (7/32)
Threads$EssenceScheduledThreadPoolExecutor (String, int, SesThreadFactory): void 100% (1/1)100% (17/17)100% (5/5)
execute (Runnable): void 100% (1/1)100% (6/6)100% (3/3)
submit (Callable): Future 100% (1/1)100% (6/6)100% (2/2)
toString (): String 100% (1/1)100% (10/10)100% (1/1)
     
class Threads100% (1/1)78%  (14/18)74%  (283/385)73%  (66.4/91)
Threads (): void 0%   (0/1)0%   (0/3)0%   (0/2)
access$000 (): Log 0%   (0/1)0%   (0/2)0%   (0/1)
dumpStack (Thread, StackTraceElement []): boolean 0%   (0/1)0%   (0/44)0%   (0/9)
isNotThreadSafe (Object): boolean 0%   (0/1)0%   (0/19)0%   (0/4)
shutdown (ExecutorService): void 100% (1/1)53%  (10/19)74%  (5.2/7)
getCurrentSES (): ScheduledExecutorService 100% (1/1)71%  (12/17)83%  (5/6)
onTimeout (Timeout, Runnable): Future 100% (1/1)83%  (10/12)80%  (1.6/2)
dumpStack (StringBuilder, StackTraceElement []): void 100% (1/1)84%  (94/112)73%  (14.7/20)
<static initializer> 100% (1/1)100% (12/12)100% (3/3)
acquireTimeoutSES (): ScheduledExecutorService 100% (1/1)100% (14/14)100% (3/3)
clearDefaultSES (): void 100% (1/1)100% (14/14)100% (5/5)
createDefaultSES (): void 100% (1/1)100% (11/11)100% (4/4)
createMultiSES (String, int, int): ScheduledExecutorService 100% (1/1)100% (27/27)100% (7/7)
createSingleSES (String): ScheduledExecutorService 100% (1/1)100% (5/5)100% (1/1)
createSingleSES (String, int): ScheduledExecutorService 100% (1/1)100% (5/5)100% (1/1)
dumpStacks (): void 100% (1/1)100% (44/44)100% (11/11)
getWeakRunnable (Runnable): Runnable 100% (1/1)100% (10/10)100% (2/2)
stackSnapshot (): String 100% (1/1)100% (15/15)100% (3/3)
     
class Threads$1100% (1/1)100% (2/2)76%  (16/21)83%  (5/6)
run (): void 100% (1/1)67%  (10/15)80%  (4/5)
Threads$1 (Reference): void 100% (1/1)100% (6/6)100% (1/1)

1package org.jtoolkit.essence.concurrency;
2 
3import org.apache.commons.logging.Log;
4import static org.apache.commons.logging.LogFactory.getLog;
5import org.jetbrains.annotations.NotNull;
6import org.jetbrains.annotations.Nullable;
7import org.jtoolkit.essence.app.pojo.impl.DataValueClass;
8import org.jtoolkit.essence.app.Main;
9import org.jtoolkit.essence.concurrency.impl.SesThreadFactory;
10import org.jtoolkit.essence.concurrency.impl.SesThreadGroup;
11import org.jtoolkit.essence.utils.ImmutableUtils;
12import org.jtoolkit.essence.utils.StringUtils;
13import static org.jtoolkit.essence.utils.StringUtils.NEW_LINE;
14 
15import java.lang.ref.Reference;
16import java.lang.ref.WeakReference;
17import java.util.ArrayList;
18import java.util.List;
19import java.util.Map;
20import java.util.concurrent.*;
21import java.util.concurrent.atomic.AtomicReference;
22 
23/*
24   Copyright 2006 Peter Lawrey
25 
26   Licensed under the Apache License, Version 2.0 (the "License");
27   you may not use this file except in compliance with the License.
28   You may obtain a copy of the License at
29 
30       http://www.apache.org/licenses/LICENSE-2.0
31 
32   Unless required by applicable law or agreed to in writing, software
33   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
34   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
35   See the License for the specific language governing permissions and
36   limitations under the License.
37*/
38 
39/**
40 * @author Peter Lawrey
41 */
42public class Threads {
43    private static final Log LOG = getLog(Threads.class);
44    private static final AtomicReference<ScheduledExecutorService> DEFAULT_SES = new AtomicReference<ScheduledExecutorService>();
45    private static final AtomicReference<ScheduledExecutorService> TIMEOUT_SES = new AtomicReference<ScheduledExecutorService>();
46 
47    public static final String CONTROL_THREAD = "ctrl$";
48    public static final String LOGICAL_PROCESS = "lp$";
49    private static final int HIGH_QUEUE_SIZE = 1024;
50    private static final int MEDIUM_QUEUE_SIZE = 256;
51    public static final String INTERRUPTED = "Interrupted";
52    private static final String DUMP_STACK_HEADER = "dumpStack: ";
53    private static final String SYSTEM_THERAD_GROUP = "system";
54    private static final String THREAD = "Thread ";
55 
56    private Threads() {
57    }
58 
59    /**
60     * Create a single threaded ScheduledExecutorService
61     *
62     * @param name of the thread in the SES
63     * @return the ScheduledExecutorService
64     */
65    @NotNull public static ScheduledExecutorService createSingleSES(@NotNull String name) {
66        return createMultiSES(name, 1, Thread.NORM_PRIORITY);
67    }
68 
69    @NotNull public static ScheduledExecutorService createSingleSES(@NotNull String name, int priority) {
70        return createMultiSES(name, 1, priority);
71    }
72 
73    @NotNull public static ScheduledExecutorService createMultiSES(@NotNull String name, int maxThreads, int priority) {
74        SesThreadFactory sesThreadFactory = new SesThreadFactory(name, priority);
75        ScheduledThreadPoolExecutor ses = new EssenceScheduledThreadPoolExecutor(name, maxThreads, sesThreadFactory);
76        sesThreadFactory.setSes(ses);
77        ses.setMaximumPoolSize(maxThreads);
78        ses.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
79        ses.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
80        return ses;
81    }
82 
83    /**
84     * Dump all thread stacks in a summarised form.
85     */
86    public static void dumpStacks() {
87        StringBuilder sb = new StringBuilder(DUMP_STACK_HEADER);
88        Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
89        for (Thread t : traces.keySet()) {
90            if (SYSTEM_THERAD_GROUP.equals(t.getThreadGroup().getName()))
91                continue;
92            StackTraceElement[] stes = traces.get(t);
93            sb.append(THREAD).append(t);
94            dumpStack(sb, stes);
95        }
96        LOG.warn(sb);
97    }
98 
99    private static void dumpStack(@NotNull StringBuilder sb, @Nullable StackTraceElement[] stes) {
100        try {
101            if (stes == null) return;
102            String firstSte = stes[0].toString();
103            if (firstSte.startsWith("java.net.PlainSocketImpl.socketAccept(")) {
104                sb.append("\tat ").append(firstSte).append(NEW_LINE);
105                return;
106            }
107            int end;
108            for (end = stes.length - 1; end >= 0; end--) {
109                String className = stes[end].getClassName();
110                if (className.equals(Thread.class.getName()) ||
111                        className.startsWith(Main.JAVA_LANG_REFLECT) ||
112                        className.startsWith(Main.JAVA_UTIL_CONCURRENT) ||
113                        className.startsWith(Main.JUNIT) ||
114                        className.startsWith(Main.COM_INTELLIJ_RT_EXECUTION) ||
115                        className.startsWith(Main.SUN_REFLECT))
116                    continue;
117                break;
118            }
119            if (end != 0)
120                sb.append(NEW_LINE);
121            for (int i = 0; i <= end; i++) {
122                String className = stes[i].getClassName();
123                if (className.equals(Thread.class.getName()) ||
124                        className.startsWith(Main.JAVA_UTIL_CONCURRENT) ||
125                        className.startsWith(Main.SUN_REFLECT))
126                    continue;
127                sb.append("\tat ").append(stes[i]).append(NEW_LINE);
128            }
129        } catch (Throwable ignored) {
130            sb.append(" got ").append(ignored);
131        }
132    }
133 
134    /**
135     * Log the threads stack dump.
136     *
137     * @param thread
138     * @param stackTraceElements
139     */
140    public static boolean dumpStack(@Nullable Thread thread, @Nullable StackTraceElement[] stackTraceElements) {
141        if (thread == null || stackTraceElements == null) return false;
142 
143        ThreadGroup threadGroup = thread.getThreadGroup();
144        if (threadGroup == null || SYSTEM_THERAD_GROUP.equals(threadGroup.getName())) return false;
145        StringBuilder sb = new StringBuilder(DUMP_STACK_HEADER);
146        sb.append(THREAD).append(thread);
147        dumpStack(sb, stackTraceElements);
148        if (LOG.isInfoEnabled()) LOG.info(sb);
149        else LOG.warn(sb);
150        return true;
151    }
152 
153    /**
154     * Create a default ScheduledExecutorService for processing callbacks for threaded outside the container.
155     */
156    public static void createDefaultSES() {
157        if (DEFAULT_SES.get() == null) {
158            DEFAULT_SES.compareAndSet(null, createSingleSES(CONTROL_THREAD + "defaultSES"));
159            // give the SES a chance to start on slower machines.
160            // noinspection CallToThreadYield
161            Thread.yield();
162        }
163    }
164 
165    /**
166     * Remove the default ScheduledExecutorService
167     */
168    public static void clearDefaultSES() {
169        ScheduledExecutorService ses = DEFAULT_SES.get();
170        if (ses == null) return;
171        ses.shutdownNow();
172        DEFAULT_SES.set(null);
173    }
174 
175    /**
176     * @return the ScheduledExecutorService for the current thread.
177     */
178    public static ScheduledExecutorService getCurrentSES() {
179        ScheduledExecutorService ret = SesThreadGroup.getSes();
180        if (ret == null)
181            ret = DEFAULT_SES.get();
182        if (ret == null)
183            throw new IllegalStateException("No ScheduledExecutorService is associated with this thread.  Use createDefaultSES()");
184        return ret;
185    }
186 
187    private static ScheduledExecutorService acquireTimeoutSES() {
188        if (TIMEOUT_SES.get() == null)
189            TIMEOUT_SES.compareAndSet(null, createSingleSES(CONTROL_THREAD + "timeout-SES", (Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2));
190        return TIMEOUT_SES.get();
191    }
192 
193    /**
194     * When a timeout occurs run the Runnable (unless cancelled).
195     */
196    @Nullable public static Future onTimeout(Timeout timeout, Runnable runnable) {
197        if (timeout.isNever()) return null;
198        return acquireTimeoutSES().schedule(runnable, timeout.getMillisLeft(), TimeUnit.MILLISECONDS);
199    }
200 
201    public static String stackSnapshot() {
202        StringBuilder ret = new StringBuilder(256);
203        dumpStack(ret, new Throwable("here").getStackTrace());
204        return ret.toString();
205    }
206 
207    public static boolean isNotThreadSafe(@NotNull Object component) {
208        Class<?> clazz = component.getClass();
209        if (ImmutableUtils.isImmutable(clazz)) return false;
210        ThreadSafe threadSafe = clazz.getAnnotation(ThreadSafe.class);
211        return threadSafe == null;
212    }
213 
214    @SuppressWarnings({"CallToSystemSetSecurityManager"})
215    public static void shutdown(ExecutorService executorService) {
216        SecurityManager sm = System.getSecurityManager();
217        System.setSecurityManager(null);
218        try {
219            executorService.shutdown();
220        } catch (Throwable ignored) {
221            // ignored
222        } finally {
223            System.setSecurityManager(sm);
224        }
225    }
226 
227    static class EssenceScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
228        private final String name;
229        private int lastSize = 0;
230        private static final int WARNING_PERIOD = 2000;
231 
232        EssenceScheduledThreadPoolExecutor(String name, int maxThreads, SesThreadFactory sesThreadFactory) {
233            super(maxThreads, sesThreadFactory, new CallerRunsPolicy());
234            this.name = name;
235        }
236 
237        @Override public void execute(Runnable command) {
238            checkQueueSize();
239            super.execute(command);
240        }
241 
242        @Override public Future<?> submit(Runnable task) {
243            checkQueueSize();
244            return super.submit(task);
245        }
246 
247        @Override public <T> Future<T> submit(Callable<T> task) {
248            checkQueueSize();
249            return super.submit(task);
250        }
251 
252        @Override public <T> Future<T> submit(Runnable task, T result) {
253            checkQueueSize();
254            return super.submit(task, result);
255        }
256 
257        private long lastQueueWarning = 0;
258 
259        private void checkQueueSize() {
260            int size = getQueue().size();
261            if (lastSize == size) return;
262 
263            if (size >= HIGH_QUEUE_SIZE) {
264                if (LOG.isInfoEnabled()) {
265                    long now = System.currentTimeMillis() / WARNING_PERIOD;
266                    if (lastQueueWarning != now) {
267                        List<Object> toPrint = new ArrayList<Object>();
268                        try {
269                            for (Object sft : getQueue().toArray()) {
270                                Object sync = DataValueClass.getField(sft, "sync");
271                                Object callable = DataValueClass.getField(sync, "callable");
272                                toPrint.add(DataValueClass.asMap(callable));
273                                if (toPrint.size() > 10)
274                                    break;
275                            }
276                        } catch (Exception e) {
277                            LOG.warn(name + ": Exception thrown extracting run queue data", e);
278                        }
279                        LOG.warn(name + ": Queue is reaching high levels size=" + size + ' ' + StringUtils.truncate(toPrint));
280                    }
281                    lastQueueWarning = now;
282                }
283                // noinspection CallToThreadYield
284                Thread.yield();
285            } else if (size >= MEDIUM_QUEUE_SIZE) {
286                if (LOG.isInfoEnabled()) {
287                    long now = System.currentTimeMillis() / WARNING_PERIOD;
288                    if (lastQueueWarning != now)
289                        LOG.info(name + ": Queue is getting long, size=" + size);
290                    lastQueueWarning = now;
291                }
292                if (size % 16 == 0)
293                    // noinspection CallToThreadYield
294                    Thread.yield();
295            } else if (size >= MEDIUM_QUEUE_SIZE / 2) {
296                if (size % 16 == 0)
297                    // noinspection CallToThreadYield
298                    Thread.yield();
299            }
300            lastSize = size;
301        }
302 
303        public String toString() {
304            return "ESTPE " + name;
305        }
306    }
307 
308    public static Runnable getWeakRunnable(Runnable run) {
309        final Reference<Runnable> run2 = new WeakReference<Runnable>(run);
310        return new Runnable() {
311            public void run() {
312                Runnable runnable = run2.get();
313                if (runnable == null)
314                    throw new IllegalStateException("Runnable cleaned up.");
315                runnable.run();
316            }
317        };
318    }
319}

[all classes][org.jtoolkit.essence.concurrency]
EMMA 2.0.5312 (C) Vladimir Roubtsov