EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.app.net]

COVERAGE SUMMARY FOR SOURCE FILE [ServerDataSocket.java]

nameclass, %method, %block, %line, %
ServerDataSocket.java100% (4/4)86%  (19/22)70%  (376/535)79%  (67.5/85)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ServerDataSocket$1100% (1/1)100% (2/2)54%  (86/158)60%  (13.8/23)
call (): Void 100% (1/1)51%  (74/146)58%  (12.8/22)
ServerDataSocket$1 (ServerDataSocket, DataSocket, String): void 100% (1/1)100% (12/12)100% (1/1)
     
class ServerDataSocket100% (1/1)77%  (10/13)72%  (223/310)85%  (46.7/55)
access$500 (): Log 0%   (0/1)0%   (0/2)0%   (0/1)
getName (): String 0%   (0/1)0%   (0/3)0%   (0/1)
isClosed (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
run (): void 100% (1/1)24%  (4/17)60%  (3/5)
run0 (): void 100% (1/1)65%  (92/142)90%  (20.6/23)
close (): void 100% (1/1)71%  (40/56)82%  (8.2/10)
<static initializer> 100% (1/1)100% (7/7)100% (2/2)
ServerDataSocket (String, int, Factory): void 100% (1/1)100% (51/51)100% (10/10)
access$100 (ServerDataSocket, String): void 100% (1/1)100% (4/4)100% (1/1)
access$200 (ServerDataSocket): Factory 100% (1/1)100% (3/3)100% (1/1)
access$300 (ServerDataSocket): String 100% (1/1)100% (3/3)100% (1/1)
access$400 (ServerDataSocket): boolean 100% (1/1)100% (3/3)100% (1/1)
setName (String): void 100% (1/1)100% (16/16)100% (2/2)
     
class ServerDataSocket$Processor$Status100% (1/1)100% (4/4)100% (48/48)100% (2/2)
<static initializer> 100% (1/1)100% (34/34)100% (2/2)
ServerDataSocket$Processor$Status (String, int): void 100% (1/1)100% (5/5)100% (1/1)
valueOf (String): ServerDataSocket$Processor$Status 100% (1/1)100% (5/5)100% (1/1)
values (): ServerDataSocket$Processor$Status [] 100% (1/1)100% (4/4)100% (1/1)
     
class ServerDataSocket$SDSThreadFactory100% (1/1)100% (3/3)100% (19/19)100% (5/5)
ServerDataSocket$SDSThreadFactory (): void 100% (1/1)100% (3/3)100% (1/1)
ServerDataSocket$SDSThreadFactory (ServerDataSocket$1): void 100% (1/1)100% (3/3)100% (1/1)
newThread (Runnable): Thread 100% (1/1)100% (13/13)100% (4/4)

1package org.jtoolkit.essence.app.net;
2 
3import org.apache.commons.logging.Log;
4import static org.apache.commons.logging.LogFactory.getLog;
5import org.jetbrains.annotations.NotNull;
6import static org.jtoolkit.essence.app.Main.registerComponent;
7import static org.jtoolkit.essence.app.Main.unregisterComponent;
8import org.jtoolkit.essence.app.Container;
9import org.jtoolkit.essence.concurrency.ThreadSafe;
10import org.jtoolkit.essence.concurrency.Concurrency;
11import org.jtoolkit.essence.data.Transaction;
12import static org.jtoolkit.essence.data.Transaction.closeOnRollbackCurrent;
13import static org.jtoolkit.essence.data.Transaction.start;
14import org.jtoolkit.essence.utils.Closeable;
15import org.jtoolkit.essence.utils.Factory;
16import org.jtoolkit.essence.utils.Named;
17import org.jtoolkit.essence.utils.IOUtils;
18 
19import javax.management.ObjectInstance;
20import java.io.IOException;
21import java.net.ServerSocket;
22import java.net.Socket;
23import java.util.LinkedHashMap;
24import java.util.Map;
25import java.util.WeakHashMap;
26import java.util.concurrent.*;
27 
28/*
29   Copyright 2006 Peter Lawrey
30 
31   Licensed under the Apache License, Version 2.0 (the "License");
32   you may not use this file except in compliance with the License.
33   You may obtain a copy of the License at
34 
35       http://www.apache.org/licenses/LICENSE-2.0
36 
37   Unless required by applicable law or agreed to in writing, software
38   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
39   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
40   See the License for the specific language governing permissions and
41   limitations under the License.
42*/
43 
44/**
45 * This acts a Server for accepting NetObject connections.
46 *
47 * @author Peter Lawrey
48 */
49@ThreadSafe(Concurrency.SYNCHRONIZED)
50public class ServerDataSocket implements Named, Closeable, Runnable {
51    private static final Log LOG = getLog(ServerDataSocket.class);
52    private static final String SERVER_THREAD = "svr$";
53    private static final DataSocket[] NO_DATA_SOCKETS = new DataSocket[0];
54 
55    private final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(true);
56    private final ExecutorService executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, workQueue, new SDSThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
57    private final String name;
58    private final ServerSocket ss;
59    private final Factory<DataSocket, Processor> procFactory;
60    private final Map<DataSocket, Void> dsSet = new WeakHashMap<DataSocket, Void>();
61    private boolean closed = false;
62 
63    public ServerDataSocket(@NotNull String name, int port, @NotNull Factory<DataSocket, Processor> procFactory) throws IOException {
64        this.name = name;
65        this.procFactory = procFactory;
66        ss = new ServerSocket(port);
67        executor.execute(this);
68    }
69 
70    public void close() {
71        if (LOG.isDebugEnabled()) LOG.debug(name + ": Closing server port " + ss.getLocalPort());
72        closed = true;
73        try {
74            ss.close();
75        } catch (IOException ignored) {
76            // ignored.
77        }
78        DataSocket[] sockets = dsSet.keySet().toArray(NO_DATA_SOCKETS);
79        for (DataSocket ds : sockets)
80            ds.close();
81        executor.shutdownNow();
82    }
83 
84    @NotNull public String getName() {
85        return name;
86    }
87 
88    public boolean isClosed() {
89        return closed;
90    }
91 
92    public void run() {
93        try {
94            run0();
95        } catch (Throwable t) {
96            LOG.error(name + ": Threw a fatal error ", t);
97        }
98    }
99 
100    private void run0() {
101        setName("acceptor");
102        if (LOG.isDebugEnabled()) LOG.debug(name + ": Listening on server port " + ss.getLocalPort());
103        Map<String, String> properties = new LinkedHashMap<String, String>();
104        properties.put(Container.DEFAULT_CONTAINER_NAME, procFactory.getName());
105 
106        int counter = 0;
107        while (!closed) {
108            Transaction t = null;
109            try {
110                Socket s = ss.accept();
111                t = start("Waiting to establish connection");
112                closeOnRollbackCurrent(s);
113//                f = Threads.closeOnTimeout("Waiting to establish connection", s, Timeout.parse(connectionTimeOut));
114                final DataSocket ds = new DataSocket(name, String.valueOf(counter), s, properties);
115                final String dsName = ds.otherName + '-' + counter++;
116                dsSet.put(ds, null);
117                Callable<Void> runnable = new Callable<Void>() {
118                    public Void call() {
119                        ObjectInstance objectInstance = null;
120                        try {
121                            setName(ds.otherName);
122                            Processor processor = procFactory.acquire(ds);
123                            objectInstance = registerComponent(name + ":Type=serversocket-connections,Name=" + dsName, processor, ds.toString());
124                            while (!closed) {
125//                                Transaction t = new Transaction(null, "Waiting for heartbeat", Timeout.parse(connectionTimeOut));
126//                                closeOnRollbackCurrent(ds);
127                                Processor.Status status = Processor.Status.FINISHED;
128                                try {
129                                    status = processor.onMessage(ds);
130                                } catch (IOException e) {
131                                    LOG.warn(name + ": Unexpected error from " + ds.otherName, e);
132                                } finally {
133                                    if (status == Processor.Status.FINISHED) {
134                                        Thread.interrupted();
135                                        IOUtils.close(ds);
136                                    }
137                                }
138                                // more output but not more input.
139                                if (status == Processor.Status.MORE_OUTPUT || status == Processor.Status.FINISHED)
140                                    break;
141                            }
142                        } catch (Throwable e) {
143                            LOG.error(name+": Unexpected exception", e);
144                        } finally {
145                            Thread.interrupted();
146                                unregisterComponent(objectInstance);
147                            setName("freed");
148                        }
149                        return null;
150                    }
151                };
152                if (LOG.isDebugEnabled()) LOG.debug(name+": Accepted connection " + s);
153                executor.submit(runnable);
154                t.commit();
155            } catch (IOException e) {
156                if (!closed)
157                    LOG.error(name+": Got an error while attempting to accept socket connection", e);
158            } finally{
159                if (t != null) t.complete();
160            }
161        }
162    }
163 
164    private void setName(String text) {
165        Thread.currentThread().setName(SERVER_THREAD + name + '$' + text);
166    }
167 
168    public interface Processor {
169        enum Status {
170            MORE_INPUT, MORE_OUTPUT, FINISHED
171        }
172 
173        /**
174         * Process messages fro the DataSocket.
175         *
176         * @return true if there are more messages.
177         */
178        public Status onMessage(DataSocket ds) throws IOException;
179    }
180 
181    private static class SDSThreadFactory implements ThreadFactory {
182        /**
183         * @noinspection CallToThreadSetPriority
184         */
185        public Thread newThread(Runnable r) {
186            Thread t = new Thread(r);
187            t.setDaemon(true);
188            // middle to low priority.
189            t.setPriority((Thread.NORM_PRIORITY + Thread.MIN_PRIORITY) / 2);
190            return t;
191        }
192    }
193}

[all classes][org.jtoolkit.essence.app.net]
EMMA 2.0.5312 (C) Vladimir Roubtsov