EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.app.net]

COVERAGE SUMMARY FOR SOURCE FILE [NetObject.java]

nameclass, %method, %block, %line, %
NetObject.java100% (6/6)77%  (27/35)43%  (643/1509)61%  (172.9/283)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class NetObject$EventListenerRunnable100% (1/1)67%  (2/3)20%  (58/284)52%  (14.6/28)
getClosingError (Exception): String 0%   (0/1)0%   (0/19)0%   (0/1)
run (): void 100% (1/1)19%  (49/256)48%  (11.6/24)
NetObject$EventListenerRunnable (NetObject, DataSocket): void 100% (1/1)100% (9/9)100% (3/3)
     
class NetObject$NetListenerSet100% (1/1)50%  (4/8)41%  (130/320)43%  (28.8/67)
add (Event$Listener): void 0%   (0/1)0%   (0/6)0%   (0/3)
disableListening (): void 0%   (0/1)0%   (0/54)0%   (0/12)
getIntrMesg (String): String 0%   (0/1)0%   (0/19)0%   (0/1)
remove (Event$Listener): void 0%   (0/1)0%   (0/9)0%   (0/3)
enableListening (): void 100% (1/1)53%  (111/211)54%  (22/41)
removeCallback (Callback): void 100% (1/1)78%  (7/9)92%  (2.8/3)
NetObject$NetListenerSet (NetObject): void 100% (1/1)100% (6/6)100% (1/1)
addCallback (Callback): void 100% (1/1)100% (6/6)100% (3/3)
     
class NetObject$ServerProcessor100% (1/1)70%  (7/10)44%  (191/433)62%  (53/86)
doVisit (DataSocket): Object 0%   (0/1)0%   (0/35)0%   (0/6)
getCmdError (DataSocket, char): String 0%   (0/1)0%   (0/14)0%   (0/1)
getFlushError (DataSocket, Exception): String 0%   (0/1)0%   (0/18)0%   (0/1)
doCmd (DataSocket, char): ServerDataSocket$Processor$Status 100% (1/1)28%  (36/128)40%  (9.2/23)
doCmd2 (DataSocket, char): Object 100% (1/1)40%  (10/25)50%  (3/6)
doListen (DataSocket): Object 100% (1/1)52%  (30/58)70%  (7/10)
doCall (DataSocket): Object 100% (1/1)52%  (25/48)86%  (6/7)
onMessage (DataSocket): ServerDataSocket$Processor$Status 100% (1/1)66%  (23/35)76%  (9.8/13)
doVisit0 (String, Visitor): Object 100% (1/1)92%  (55/60)93%  (13/14)
NetObject$ServerProcessor (Named$Source): void 100% (1/1)100% (12/12)100% (5/5)
     
class NetObject100% (1/1)100% (10/10)54%  (230/427)75%  (66.5/89)
visit (Visitor): Object 100% (1/1)42%  (140/333)65%  (40.9/63)
finalize (): void 100% (1/1)75%  (6/8)90%  (2.7/3)
<static initializer> 100% (1/1)86%  (12/14)92%  (1.8/2)
NetObject (String, String, Factory): void 100% (1/1)100% (30/30)100% (10/10)
call (String, Object []): Object 100% (1/1)100% (12/12)100% (3/3)
close (): void 100% (1/1)100% (7/7)100% (3/3)
getListenerSet (): Event$ListenerSet 100% (1/1)100% (3/3)100% (1/1)
getName (): String 100% (1/1)100% (13/13)100% (1/1)
isClosed (): boolean 100% (1/1)100% (3/3)100% (1/1)
setTimeOut (String): void 100% (1/1)100% (4/4)100% (2/2)
     
class NetObject$ServerProcessor$1100% (1/1)100% (2/2)65%  (20/31)67%  (6/9)
onCallback (Event): void 100% (1/1)50%  (11/22)62%  (5/8)
NetObject$ServerProcessor$1 (NetObject$ServerProcessor, DataSocket): void 100% (1/1)100% (9/9)100% (1/1)
     
class NetObject$ServerProcessorFactory100% (1/1)100% (2/2)100% (14/14)100% (4/4)
NetObject$ServerProcessorFactory (Named$Source): void 100% (1/1)100% (8/8)100% (3/3)
acquire (DataSocket): ServerDataSocket$Processor 100% (1/1)100% (6/6)100% (1/1)

1package org.jtoolkit.essence.app.net;
2 
3import org.apache.commons.logging.Log;
4import static org.apache.commons.logging.LogFactory.getLog;
5import org.jetbrains.annotations.NotNull;
6import org.jetbrains.annotations.Nullable;
7import org.jtoolkit.essence.ConfigProperties;
8import org.jtoolkit.essence.app.Container;
9import static org.jtoolkit.essence.app.pojo.DatableUtils.*;
10import org.jtoolkit.essence.concurrency.*;
11import org.jtoolkit.essence.data.Event;
12import org.jtoolkit.essence.data.Transaction;
13import static org.jtoolkit.essence.data.Transaction.start;
14import org.jtoolkit.essence.data.Visitor;
15import org.jtoolkit.essence.data.VisitorException;
16import org.jtoolkit.essence.data.impl.CallVisitor;
17import org.jtoolkit.essence.data.impl.ListenerSetImpl;
18import org.jtoolkit.essence.utils.Factory;
19import org.jtoolkit.essence.utils.IOUtils;
20import org.jtoolkit.essence.utils.Named;
21import org.jtoolkit.essence.utils.StringUtils;
22 
23import java.io.*;
24import java.lang.reflect.InvocationTargetException;
25import java.net.SocketException;
26import java.util.Arrays;
27import java.util.NoSuchElementException;
28 
29/*
30   Copyright 2006 Peter Lawrey
31 
32   Licensed under the Apache License, Version 2.0 (the "License");
33   you may not use this file except in compliance with the License.
34   You may obtain a copy of the License at
35 
36       http://www.apache.org/licenses/LICENSE-2.0
37 
38   Unless required by applicable law or agreed to in writing, software
39   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
40   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41   See the License for the specific language governing permissions and
42   limitations under the License.
43*/
44 
45/**
46 * Create a proxy object from a remote object over a DataSocket (using a factory).
47 *
48 * @author Peter Lawrey
49 */
50@ThreadSafe(Concurrency.CONCURRENT_READ_WRITE)
51@SuppressWarnings({"unchecked", "UnnecessaryFullyQualifiedName"})
52public class NetObject<V> implements Visitor.Visitable<V>, org.jtoolkit.essence.utils.Closeable, Named {
53    static final Log LOG = getLog(NetObject.class);
54    static final boolean callDebug = Boolean.getBoolean(ConfigProperties.ESSENCE_NETOBJECT_DEBUG) || LOG.isDebugEnabled();
55 
56    private static final char HEARTBEAT = 'H'; // HeartBeat
57    private static final char VISIT = 'V'; // Visit
58    private static final char CALL = 'C'; // Call
59    private static final char LISTEN = 'L'; // Listen
60    private static final char SUCCESS = 'Y'; // Yes
61    final Event.ListenerSet listenerSet = new NetListenerSet();
62    final String connectName;
63    final String componentName;
64    final Factory<String, DataSocket> factory;
65    boolean closed = false;
66    @SuppressWarnings({"FieldHasSetterButNoGetter"})
67    String timeOut = Timeout.NEVER_STR;
68    // set in synchronised context to block mutliple concurrent attempts to open, and non synchronised for close to allow close even if blocked.
69    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
70    DataSocket listeningDS = null;
71    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
72    Thread listeningThread = null;
73 
74    /**
75     * Create a proxy object from a remote object over a DataSocket (using a factory).
76     */
77    public NetObject(@NotNull String connectName, @NotNull String componentName, @NotNull Factory<String, DataSocket> factory) {
78        this.connectName = connectName;
79        this.componentName = componentName;
80        this.factory = factory;
81    }
82 
83    /**
84     * Do a remote method call.
85     */
86    @Nullable public <R> R call(@NotNull String method, Object... args) throws VisitorException {
87        if (Transaction.isTransactional())
88            Transaction.closeOnRollbackCurrent(this);
89        return visit(new CallVisitor<V, R>(method, args));
90    }
91 
92    public boolean isClosed() {
93        return closed;
94    }
95 
96    /**
97     * Close this net object.
98     */
99    public void close() {
100        IOUtils.close(factory);
101        closed = true;
102    }
103 
104    @NotNull public String getName() {
105        return connectName + '.' + componentName;
106    }
107 
108    public Event.ListenerSet getListenerSet() {
109        return listenerSet;
110    }
111 
112    /**
113     * @param timeOut to perform call/visit operations.
114     */
115    public void setTimeOut(String timeOut) {
116        this.timeOut = timeOut;
117    }
118 
119    /**
120     * Apply a Visitor to this object atomically.
121     */
122    @SuppressWarnings({"OverlyLongMethod"})
123    public <R> R visit(@NotNull Visitor<V, R> visitor) throws VisitorException {
124        if (closed)
125            throw new IllegalStateException(CLOSED);
126        Timeout timeout = Timeout.parse(timeOut);
127        int count = 0;
128        do {
129            DataSocket ds = null;
130            Transaction t = null;
131            String threadName = Thread.currentThread().getName();
132            try {
133                t = start("Applying Visitor", timeout);
134                ds = factory.acquire(connectName);
135                Transaction.closeOnRollbackCurrent(ds);
136                DataOutputStream out = ds.getOut();
137                if (visitor instanceof CallVisitor) {
138                    CallVisitor callVisitor = (CallVisitor) visitor;
139                    if (callDebug)
140                        LOG.info(threadName + ": Call " + connectName + ' ' + componentName + ' ' + callVisitor.method + ' ' + StringUtils.truncate(Arrays.asList(callVisitor.args)));
141                    out.writeChar(CALL);
142                    out.writeUTF(componentName);
143                    out.writeUTF(callVisitor.method);
144                    writeObjects(out, callVisitor.args);
145                } else {
146                    if (callDebug)
147                        LOG.info(threadName + ": Visit " + connectName + ' ' + componentName + ' ' + StringUtils.truncate(visitor));
148                    out.writeChar(VISIT);
149                    out.writeUTF(componentName);
150                    writeObject(out, visitor);
151                }
152                out.flush();
153                DataInput in = ds.getIn();
154                Object returnValue = readObject(in);
155                factory.release(ds);
156                ds = null;
157                if (callDebug)
158                    LOG.info(threadName + ": ... result " + connectName + ' ' + componentName + "' '" + StringUtils.truncate(returnValue) + '\'');
159                try {
160                    t.commit();
161                } catch (IllegalStateException ignored) {
162                    // ignored
163                }
164                if (returnValue instanceof RuntimeException)
165                    throw (RuntimeException) returnValue;
166                if (returnValue instanceof InvocationTargetException)
167                    throw new VisitorException(((Throwable) returnValue).getCause());
168                if (returnValue instanceof Throwable) {
169                    LOG.warn(getName() + ": Exception " + returnValue + " thrown calling " + componentName + '\n' + StringUtils.truncate(visitor));
170//                    CallVisitor callVisitor = (CallVisitor) visitor;
171//                    Map<String, Map> map = (Map<String, Map>) callVisitor.args[2];
172//                    Object[] objects = MapArray.asArray(map.values().iterator().next());
173                    throw new VisitorException((Throwable) returnValue);
174                }
175                //noinspection unchecked
176                return (R) returnValue;
177            } catch (NotSerializableException e) {
178                throw new VisitorException(e);
179            } catch (IOException e) {
180                // noinspection CallToThreadYield
181                Thread.yield();
182                if (closed) throw new IllegalStateException(CLOSED);
183                if (count > 1)
184                    LOG.warn(threadName + ": Retrying after error sending visitor " + visitor + ' ', e);
185            } catch (InterruptedException e) {
186                if (!closed)
187                    throw new VisitorException(connectName + ": Interrupted sending visitor " + visitor, e);
188                Thread.currentThread().interrupt();
189                break;
190            } finally {
191                // if it wasn't released.
192                IOUtils.close(ds);
193                Transaction.complete(t);
194            }
195            // noinspection CallToThreadYield
196            Thread.yield();
197            if (closed) throw new IllegalStateException(CLOSED);
198            count++;
199        } while (timeout.hasNotTimedOut());
200        String mesg = connectName + ": Timeout attempting to send " + visitor;
201        throw new VisitorException(new Timeout.TimeoutException(mesg));
202    }
203 
204    public static class ServerProcessorFactory extends Factory.AbstractFactory<DataSocket, ServerDataSocket.Processor> {
205        final Named.Source container;
206 
207        public ServerProcessorFactory(@NotNull Named.Source container) {
208            super(container.getName());
209            this.container = container;
210        }
211 
212        @NotNull public ServerDataSocket.Processor acquire(DataSocket description) {
213            return new ServerProcessor(container);
214        }
215    }
216 
217    static class ServerProcessor implements ServerDataSocket.Processor {
218        final Named.Source container;
219        boolean sendReply = false;
220        Status status = Status.MORE_INPUT;
221 
222        ServerProcessor(Named.Source container) {
223            this.container = container;
224        }
225 
226        public Status onMessage(DataSocket ds) throws IOException {
227            DataInputStream in = ds.getIn();
228 
229            sendReply = false;
230            char cmd;
231            try {
232                cmd = in.readChar();
233                if (cmd == DataSocket.BYE) return Status.FINISHED;
234            } catch (SocketException ignored) {
235                return Status.FINISHED;
236            } catch (EOFException ignored) {
237                return Status.FINISHED;
238            } catch (IOException e) {
239                if (e.getMessage().contains("closed")) return Status.FINISHED;
240                throw e;
241            }
242 
243            return doCmd(ds, cmd);
244        }
245 
246        private Status doCmd(DataSocket ds, char cmd) throws IOException {
247            DataOutputStream out = ds.getOut();
248            try {
249                sendReply = true;
250                Object reply;
251                try {
252                    reply = doCmd2(ds, cmd);
253                } catch (Exception e) {
254                    if (LOG.isInfoEnabled()) LOG.info(getCmdError(ds, cmd), e);
255                    reply = e;
256                } catch (Throwable e) {
257                    LOG.error(getCmdError(ds, cmd), e);
258                    reply = e;
259                }
260                if (callDebug && (cmd == CALL || cmd == VISIT))
261                    LOG.info(Thread.currentThread().getName() + ": ... reply to process cmd=" + cmd + ' ' + StringUtils.truncate(reply));
262                if (sendReply) {
263                    writeObject(out, reply);
264                }
265                return status;
266            } finally {
267                try {
268                    out.flush();
269                } catch (SocketException e) {
270                    if (sendReply && LOG.isDebugEnabled())
271                        LOG.debug(getFlushError(ds, e));
272                } catch (IOException e) {
273                    if (sendReply) LOG.warn(getFlushError(ds, e));
274                }
275            }
276        }
277 
278        private String getFlushError(DataSocket ds, Exception e) {
279            return container.getName() + ": DataSocket " + ds.otherName + " failed to flush " + e;
280        }
281 
282        private static String getCmdError(DataSocket ds, char cmd) {
283            return ds.otherName + ": cmd=" + cmd + " threw ";
284        }
285 
286        private Object doCmd2(DataSocket ds, char cmd) throws IOException {
287            switch (cmd) {
288                case HEARTBEAT:
289                    return "Hello";
290                case VISIT:
291                    return doVisit(ds);
292                case CALL:
293                    return doCall(ds);
294                case LISTEN:
295                    return doListen(ds);
296                default:
297                    return "Unknown command " + cmd;
298            }
299        }
300 
301        private Object doListen(final DataSocket ds) throws IOException {
302            String componentName = ds.getIn().readUTF();
303            Object component = container.getValue(componentName);
304            if (component instanceof Event.Listenable) {
305                ((Event.Listenable) component).getListenerSet().addCallback(new Callback<Event>() {
306                    public void onCallback(@NotNull Event event) throws IllegalStateException {
307                        try {
308                            DataOutputStream out = ds.getOut();
309                            writeObject(out, event);
310                            out.flush();
311                        } catch (IOException e1) {
312                            if (status != Status.FINISHED)
313                                throw new IllegalStateException(e1);
314                        }
315                    }
316                });
317                status = Status.MORE_OUTPUT;
318                // reply get jumbled with the events freom the listener so the first event is the reply.
319                sendReply = false;
320                return SUCCESS;
321            } else if (component != null) {
322                return Container.COMPONENT + componentName + " is not an Event.Listenable is " + component.getClass().getName();
323            } else {
324                return Container.COMPONENT + componentName + " not found.";
325            }
326        }
327 
328        private Object doVisit(DataSocket ds) throws IOException {
329            DataInputStream in = ds.getIn();
330            String componentName = in.readUTF();
331            Visitor<Object, Object> visitor = (Visitor<Object, Object>) readObject(in);
332 
333            if (callDebug)
334                LOG.info(Thread.currentThread().getName() + ": processing visit " + componentName + ' ' + StringUtils.truncate(visitor));
335            return doVisit0(componentName, visitor);
336        }
337 
338        private Object doCall(DataSocket ds) throws IOException {
339            DataInputStream in = ds.getIn();
340            String componentName = in.readUTF();
341            String method = in.readUTF();
342            Object[] args = readObjects(in, 0, Integer.MAX_VALUE);
343            if (callDebug)
344                LOG.info(Thread.currentThread().getName() + ": processing call " + componentName + ' ' + method + ' ' + StringUtils.truncate(Arrays.asList(args)));
345            return doVisit0(componentName, new CallVisitor<Object, Object>(method, args));
346        }
347 
348        private Object doVisit0(String componentName, Visitor<Object, Object> visitor) {
349            Object component = container.getValue(componentName);
350            if (component == null)
351                throw new NoSuchElementException(componentName);
352            try {
353                Object reply;
354                if (component instanceof Visitor.Visitable)
355                    reply = ((Visitor.Visitable) component).visit(visitor);
356                else
357                    reply = visitor.visit(component);
358                return reply;
359            } catch (VisitorException e) {
360                Throwable reply = e.getCause();
361                if (reply instanceof InvocationTargetException)
362                    reply = e.getCause();
363                if (LOG.isInfoEnabled())
364                    LOG.info(componentName + ": ... got visitor " + StringUtils.truncate(visitor) + ", reply=" + StringUtils.truncate(e));
365                return reply;
366            }
367        }
368    }
369 
370    private class EventListenerRunnable implements Runnable {
371        private final DataSocket ds;
372 
373        EventListenerRunnable(DataSocket ds) {
374            this.ds = ds;
375        }
376 
377        public void run() {
378            if (LOG.isDebugEnabled())
379                LOG.debug(componentName + " Listener started, listenerSet.isEmpty()=" + listenerSet.isEmpty());
380            listenerSet.notifyReset(true);
381            try {
382                while (!closed) {
383                    DataInputStream in = ds.getIn();
384                    Event event = (Event) readObject(in);
385                    if (LOG.isDebugEnabled())
386                        LOG.debug(componentName + " ... heard event" + event);
387                    listenerSet.onCallback(event);
388                }
389            } catch (SocketException e) {
390                if (!closed && LOG.isDebugEnabled())
391                    LOG.debug(getClosingError(e));
392            } catch (EOFException e) {
393                if (!closed && LOG.isDebugEnabled())
394                    LOG.debug(getClosingError(e));
395            } catch (Throwable e) {
396                if (!closed)
397                    LOG.error(connectName + ": Error reading event for " + componentName, e);
398            } finally {
399                IOUtils.close(ds);
400                listenerSet.notifyReset(false);
401                if (LOG.isDebugEnabled())
402                    LOG.debug(componentName + " Listener finished, listenerSet.isEmpty()=" + listenerSet.isEmpty());
403            }
404        }
405 
406        private String getClosingError(Exception e) {
407            return connectName + ": Closing connection after failing to read events for " + componentName + ' ' + e;
408        }
409    }
410 
411    class NetListenerSet extends ListenerSetImpl {
412 
413        public void addCallback(@NotNull Callback<Event> callback) {
414            super.addCallback(callback);
415            enableListening();
416        }
417 
418        public void add(@NotNull Event.Listener l) {
419            super.add(l);
420            enableListening();
421        }
422 
423        public void removeCallback(@NotNull Callback<Event> callback) {
424            super.removeCallback(callback);
425            if (isEmpty()) disableListening();
426        }
427 
428        public void remove(@NotNull Event.Listener l) {
429            super.remove(l);
430            if (isEmpty()) disableListening();
431        }
432 
433        private void enableListening() {
434            if (closed)
435                throw new IllegalStateException(CLOSED);
436            if (listeningThread != null && listeningThread.isAlive())
437                return;
438            Timeout timeout = Timeout.parse(timeOut);
439            synchronized (this) {
440                do {
441                    DataSocket ds = null;
442                    Transaction t = start("Enable listening", timeout);
443                    try {
444                        ds = factory.acquire(connectName);
445                        t.closeOnRollback(ds);
446 
447                        DataOutputStream out = ds.getOut();
448                        out.writeChar(LISTEN);
449                        out.writeUTF(componentName);
450                        out.flush();
451                        DataInputStream in = ds.getIn();
452                        Object ret = readObject(in);
453                        if (ret instanceof RuntimeException)
454                            throw (RuntimeException) ret;
455                        if (ret instanceof Throwable)
456                            throw new RuntimeException((Throwable) ret);
457                        if (!(ret instanceof Event))
458                            LOG.warn(componentName + ": Unexpected reply " + ret);
459 
460                        listeningDS = ds;
461                        listeningThread = new Thread(new EventListenerRunnable(ds), "lstn$" + connectName + '$' + componentName);
462                        listeningThread.setDaemon(true);
463                        listeningThread.start();
464                        t.commit();
465                        return;
466                    } catch (IOException e) {
467                        LOG.warn(getIntrMesg("IO Exception"), e);
468                    } catch (InterruptedException e) {
469                        LOG.warn(getIntrMesg(Threads.INTERRUPTED) + ' ' + e);
470                        break;
471                    } finally {
472                        t.complete();
473                    }
474                    IOUtils.close(ds);
475                    try {
476                        wait(2500);
477                    } catch (InterruptedException e) {
478                        throw new IllegalStateException(getIntrMesg(Threads.INTERRUPTED), e);
479                    }
480                } while (!timeout.hasNotTimedOut());
481            }
482            throw new Timeout.TimeoutException(getIntrMesg("Timed out"));
483        }
484 
485        private String getIntrMesg(String text) {
486            return connectName + ": " + text + " attempting to listen to " + componentName;
487        }
488 
489        private void disableListening() {
490            IOUtils.close(listeningDS);
491            listeningDS = null;
492            if (listeningThread != null) {
493                listeningThread.interrupt();
494                try {
495                    Timeout.parse(timeOut).join(listeningThread);
496                    if (listeningThread.isAlive())
497                        LOG.error(connectName + ": Unable to stop listening thread for " + componentName);
498                } catch (InterruptedException ignored) {
499                    Thread.currentThread().interrupt();
500                }
501                listeningThread = null;
502            }
503        }
504    }
505 
506    @SuppressWarnings({"FinalizeDeclaration"})
507    @Override protected void finalize() throws Throwable {
508        super.finalize();
509        if (!closed) close();
510    }
511}

[all classes][org.jtoolkit.essence.app.net]
EMMA 2.0.5312 (C) Vladimir Roubtsov