EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.app.net]

COVERAGE SUMMARY FOR SOURCE FILE [DataSocketFactory.java]

nameclass, %method, %block, %line, %
DataSocketFactory.java100% (1/1)100% (7/7)78%  (422/541)85%  (87.7/103)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class DataSocketFactory100% (1/1)100% (7/7)78%  (422/541)85%  (87.7/103)
debugClosing (Object): void 100% (1/1)24%  (4/17)59%  (1.2/2)
acquire (String): DataSocket 100% (1/1)47%  (29/62)67%  (8/12)
DataSocketFactory (String, String): void 100% (1/1)82%  (72/88)93%  (14/15)
createDataSocket (String): DataSocket 100% (1/1)82%  (190/231)87%  (34.7/40)
close (): void 100% (1/1)87%  (80/92)90%  (19/21)
release (DataSocket): void 100% (1/1)91%  (43/47)82%  (9.8/12)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)

1package org.jtoolkit.essence.app.net;
2 
3import org.apache.commons.logging.Log;
4import org.apache.commons.logging.LogFactory;
5import org.jetbrains.annotations.NotNull;
6import org.jetbrains.annotations.Nullable;
7import org.jtoolkit.essence.app.Main;
8import org.jtoolkit.essence.concurrency.Concurrency;
9import org.jtoolkit.essence.concurrency.ThreadSafe;
10import org.jtoolkit.essence.concurrency.Timeout;
11import org.jtoolkit.essence.data.Transaction;
12import org.jtoolkit.essence.utils.Factory;
13import org.jtoolkit.essence.utils.IOUtils;
14 
15import javax.management.ObjectInstance;
16import java.io.DataOutputStream;
17import java.io.IOException;
18import java.net.InetSocketAddress;
19import java.net.Socket;
20import java.net.UnknownHostException;
21import java.util.*;
22import java.util.concurrent.ConcurrentHashMap;
23import java.util.concurrent.ConcurrentMap;
24import java.util.concurrent.LinkedBlockingQueue;
25 
26/*
27   Copyright 2006 Peter Lawrey
28 
29   Licensed under the Apache License, Version 2.0 (the "License");
30   you may not use this file except in compliance with the License.
31   You may obtain a copy of the License at
32 
33       http://www.apache.org/licenses/LICENSE-2.0
34 
35   Unless required by applicable law or agreed to in writing, software
36   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
37   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
38   See the License for the specific language governing permissions and
39   limitations under the License.
40*/
41 
42/**
43 * A Factory which creates DataSocket's
44 *
45 * @author Peter Lawrey
46 */
47@ThreadSafe(Concurrency.CONCURRENT_READ_WRITE)
48public class DataSocketFactory extends Factory.AbstractFactory<String, DataSocket> {
49    private static final Log LOG = LogFactory.getLog(DataSocketFactory.class);
50 
51    private final String containerName;
52    private final String[] hosts;
53    private final int[] ports;
54    private final ConcurrentMap<String, Queue<DataSocket>> dataSockets = new ConcurrentHashMap<String, Queue<DataSocket>>(256, 0.5f, 16);
55    private final Set<Socket> openningSockets = Collections.synchronizedSet(new LinkedHashSet<Socket>());
56 
57    private int last = 0;
58 
59    public DataSocketFactory(@NotNull String containerName, @NotNull String hostnamePorts) {
60        super(hostnamePorts);
61        this.containerName = containerName;
62        String[] hostnamePortsArr = hostnamePorts.split(",");
63        hosts = new String[hostnamePortsArr.length];
64        ports = new int[hostnamePortsArr.length];
65        for (int i = 0; i < hostnamePortsArr.length; i++) {
66            String[] parts = hostnamePortsArr[i].split(":");
67            if (parts.length != 2)
68                throw new IllegalArgumentException("Unable to decode hostanme:port '" + hostnamePortsArr[i] + '\'');
69            hosts[i] = parts[0];
70            ports[i] = Integer.valueOf(parts[1]);
71        }
72    }
73 
74    @NotNull public DataSocket acquire(@NotNull String description) throws InterruptedException {
75        Queue<DataSocket> dsq = dataSockets.get(description);
76        if (dsq != null) {
77            while (!dsq.isEmpty()) {
78                DataSocket socket = dsq.poll();
79                if (socket != null && !socket.isClosing())
80                    return socket;
81            }
82        }
83 
84        if (LOG.isDebugEnabled())
85            LOG.debug(containerName + ' ' + getName() + ": creating a new connection for " + description); // dataSockets=" + dataSockets, new Throwable("here"));
86 
87        try {
88            return createDataSocket(description);
89        } catch (UnknownHostException e) {
90            throw new IllegalStateException("Connecting to " + description, e);
91        }
92    }
93 
94    private DataSocket createDataSocket(String description) throws InterruptedException, UnknownHostException {
95        int count = 0;
96        while (!closed.get()) {
97            Exception lastException = null;
98            for (int i = 0; i < hosts.length; i++) {
99                Socket openningSocket = null;
100                try {
101//                        new Throwable(containerName+" creating "+description).printStackTrace();
102                    String host = hosts[last];
103                    int port = ports[last];
104                    Map<String, String> properties = new LinkedHashMap<String, String>();
105                    if (host.endsWith(".gz")) {
106                        host = host.substring(0, host.length() - 3);
107                        properties.put(DataSocket.IS_COMPRESSED, "true");
108                    }
109                    openningSocket = new Socket();
110                    openningSockets.add(openningSocket);
111                    Transaction.closeOnRollbackCurrent(openningSocket);
112                    openningSocket.connect(new InetSocketAddress(host, port));
113                    DataSocket dataSocket = new DataSocket(containerName, description, openningSocket, properties);
114                    String dsName = containerName + ":Type=socket-connections,Name=" + dataSocket.otherName;
115                    ObjectInstance objectInstance = Main.registerComponent(dsName, dataSocket, description + ' ' + hosts[last] + ':' + port);
116                    if (objectInstance != null)
117                        dataSocket.setObjectName(objectInstance.getObjectName());
118                    return dataSocket;
119                } catch (UnknownHostException uhe) {
120                    throw uhe;
121                } catch (IOException e) {
122                    if (count > 1)
123                        LOG.warn(containerName + ": Failure attempting to connect to " + hosts[i] + ':' + ports[i] + ' ' + e);
124                    lastException = e;
125                } finally {
126                    openningSockets.remove(openningSocket);
127                }
128                last = (last + 1) % hosts.length;
129            }
130            if (closed.get()) break;
131 
132            count++;
133            long millisLeft = Transaction.getMillisLeft(count * 250);
134            if (millisLeft < 2500) {
135                Timeout.TimeoutException ie = new Timeout.TimeoutException("Timed out on " + description);
136                ie.initCause(lastException);
137                throw ie;
138            }
139            Thread.sleep(2500);
140        }
141        throw new IllegalStateException("Factory closed.");
142    }
143 
144    public void release(@Nullable DataSocket dataSocket) {
145        if (dataSocket == null || dataSocket.isClosing()) return;
146        if (closed.get()) {
147            dataSocket.close();
148            return;
149        }
150        String name = dataSocket.getName();
151        Queue<DataSocket> dsq = dataSockets.get(name);
152        if (dsq == null) {
153            dataSockets.putIfAbsent(name, new LinkedBlockingQueue<DataSocket>());
154            dsq = dataSockets.get(name);
155        }
156        if (!dsq.contains(dataSocket))
157            dsq.add(dataSocket);
158    }
159 
160    public void close() {
161        super.close();
162        synchronized (openningSockets) {
163            for (Socket openningSocket : openningSockets) {
164                debugClosing(openningSocket);
165                IOUtils.close(openningSocket);
166            }
167            openningSockets.clear();
168        }
169        synchronized (dataSockets) {
170            for (Queue<DataSocket> dsq : dataSockets.values()) {
171                for (DataSocket ds : dsq) {
172                    DataOutputStream out = ds.getOut();
173                    if (out == null) continue;
174                    debugClosing(ds);
175                    try {
176                        out.writeChar(DataSocket.BYE);
177                    } catch (IOException ignored) {
178                        // ignored.
179                    }
180                    IOUtils.close(ds);
181                }
182            }
183            dataSockets.clear();
184        }
185    }
186 
187    private void debugClosing(Object socket) {
188        if (LOG.isDebugEnabled()) LOG.debug(name + ": Closing " + socket);
189    }
190}

[all classes][org.jtoolkit.essence.app.net]
EMMA 2.0.5312 (C) Vladimir Roubtsov