| 1 | package org.jtoolkit.essence.app.net; |
| 2 | |
| 3 | import org.apache.commons.logging.Log; |
| 4 | import static org.apache.commons.logging.LogFactory.getLog; |
| 5 | import org.jetbrains.annotations.NotNull; |
| 6 | import static org.jtoolkit.essence.app.Main.registerComponent; |
| 7 | import static org.jtoolkit.essence.app.Main.unregisterComponent; |
| 8 | import org.jtoolkit.essence.app.Container; |
| 9 | import org.jtoolkit.essence.concurrency.ThreadSafe; |
| 10 | import org.jtoolkit.essence.concurrency.Concurrency; |
| 11 | import org.jtoolkit.essence.data.Transaction; |
| 12 | import static org.jtoolkit.essence.data.Transaction.closeOnRollbackCurrent; |
| 13 | import static org.jtoolkit.essence.data.Transaction.start; |
| 14 | import org.jtoolkit.essence.utils.Closeable; |
| 15 | import org.jtoolkit.essence.utils.Factory; |
| 16 | import org.jtoolkit.essence.utils.Named; |
| 17 | import org.jtoolkit.essence.utils.IOUtils; |
| 18 | |
| 19 | import javax.management.ObjectInstance; |
| 20 | import java.io.IOException; |
| 21 | import java.net.ServerSocket; |
| 22 | import java.net.Socket; |
| 23 | import java.util.LinkedHashMap; |
| 24 | import java.util.Map; |
| 25 | import java.util.WeakHashMap; |
| 26 | import java.util.concurrent.*; |
| 27 | |
| 28 | /* |
| 29 | Copyright 2006 Peter Lawrey |
| 30 | |
| 31 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 32 | you may not use this file except in compliance with the License. |
| 33 | You may obtain a copy of the License at |
| 34 | |
| 35 | http://www.apache.org/licenses/LICENSE-2.0 |
| 36 | |
| 37 | Unless required by applicable law or agreed to in writing, software |
| 38 | distributed under the License is distributed on an "AS IS" BASIS, |
| 39 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 40 | See the License for the specific language governing permissions and |
| 41 | limitations under the License. |
| 42 | */ |
| 43 | |
| 44 | /** |
| 45 | * This acts a Server for accepting NetObject connections. |
| 46 | * |
| 47 | * @author Peter Lawrey |
| 48 | */ |
| 49 | @ThreadSafe(Concurrency.SYNCHRONIZED) |
| 50 | public class ServerDataSocket implements Named, Closeable, Runnable { |
| 51 | private static final Log LOG = getLog(ServerDataSocket.class); |
| 52 | private static final String SERVER_THREAD = "svr$"; |
| 53 | private static final DataSocket[] NO_DATA_SOCKETS = new DataSocket[0]; |
| 54 | |
| 55 | private final BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>(true); |
| 56 | private final ExecutorService executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, workQueue, new SDSThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); |
| 57 | private final String name; |
| 58 | private final ServerSocket ss; |
| 59 | private final Factory<DataSocket, Processor> procFactory; |
| 60 | private final Map<DataSocket, Void> dsSet = new WeakHashMap<DataSocket, Void>(); |
| 61 | private boolean closed = false; |
| 62 | |
| 63 | public ServerDataSocket(@NotNull String name, int port, @NotNull Factory<DataSocket, Processor> procFactory) throws IOException { |
| 64 | this.name = name; |
| 65 | this.procFactory = procFactory; |
| 66 | ss = new ServerSocket(port); |
| 67 | executor.execute(this); |
| 68 | } |
| 69 | |
| 70 | public void close() { |
| 71 | if (LOG.isDebugEnabled()) LOG.debug(name + ": Closing server port " + ss.getLocalPort()); |
| 72 | closed = true; |
| 73 | try { |
| 74 | ss.close(); |
| 75 | } catch (IOException ignored) { |
| 76 | // ignored. |
| 77 | } |
| 78 | DataSocket[] sockets = dsSet.keySet().toArray(NO_DATA_SOCKETS); |
| 79 | for (DataSocket ds : sockets) |
| 80 | ds.close(); |
| 81 | executor.shutdownNow(); |
| 82 | } |
| 83 | |
| 84 | @NotNull public String getName() { |
| 85 | return name; |
| 86 | } |
| 87 | |
| 88 | public boolean isClosed() { |
| 89 | return closed; |
| 90 | } |
| 91 | |
| 92 | public void run() { |
| 93 | try { |
| 94 | run0(); |
| 95 | } catch (Throwable t) { |
| 96 | LOG.error(name + ": Threw a fatal error ", t); |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | private void run0() { |
| 101 | setName("acceptor"); |
| 102 | if (LOG.isDebugEnabled()) LOG.debug(name + ": Listening on server port " + ss.getLocalPort()); |
| 103 | Map<String, String> properties = new LinkedHashMap<String, String>(); |
| 104 | properties.put(Container.DEFAULT_CONTAINER_NAME, procFactory.getName()); |
| 105 | |
| 106 | int counter = 0; |
| 107 | while (!closed) { |
| 108 | Transaction t = null; |
| 109 | try { |
| 110 | Socket s = ss.accept(); |
| 111 | t = start("Waiting to establish connection"); |
| 112 | closeOnRollbackCurrent(s); |
| 113 | // f = Threads.closeOnTimeout("Waiting to establish connection", s, Timeout.parse(connectionTimeOut)); |
| 114 | final DataSocket ds = new DataSocket(name, String.valueOf(counter), s, properties); |
| 115 | final String dsName = ds.otherName + '-' + counter++; |
| 116 | dsSet.put(ds, null); |
| 117 | Callable<Void> runnable = new Callable<Void>() { |
| 118 | public Void call() { |
| 119 | ObjectInstance objectInstance = null; |
| 120 | try { |
| 121 | setName(ds.otherName); |
| 122 | Processor processor = procFactory.acquire(ds); |
| 123 | objectInstance = registerComponent(name + ":Type=serversocket-connections,Name=" + dsName, processor, ds.toString()); |
| 124 | while (!closed) { |
| 125 | // Transaction t = new Transaction(null, "Waiting for heartbeat", Timeout.parse(connectionTimeOut)); |
| 126 | // closeOnRollbackCurrent(ds); |
| 127 | Processor.Status status = Processor.Status.FINISHED; |
| 128 | try { |
| 129 | status = processor.onMessage(ds); |
| 130 | } catch (IOException e) { |
| 131 | LOG.warn(name + ": Unexpected error from " + ds.otherName, e); |
| 132 | } finally { |
| 133 | if (status == Processor.Status.FINISHED) { |
| 134 | Thread.interrupted(); |
| 135 | IOUtils.close(ds); |
| 136 | } |
| 137 | } |
| 138 | // more output but not more input. |
| 139 | if (status == Processor.Status.MORE_OUTPUT || status == Processor.Status.FINISHED) |
| 140 | break; |
| 141 | } |
| 142 | } catch (Throwable e) { |
| 143 | LOG.error(name+": Unexpected exception", e); |
| 144 | } finally { |
| 145 | Thread.interrupted(); |
| 146 | unregisterComponent(objectInstance); |
| 147 | setName("freed"); |
| 148 | } |
| 149 | return null; |
| 150 | } |
| 151 | }; |
| 152 | if (LOG.isDebugEnabled()) LOG.debug(name+": Accepted connection " + s); |
| 153 | executor.submit(runnable); |
| 154 | t.commit(); |
| 155 | } catch (IOException e) { |
| 156 | if (!closed) |
| 157 | LOG.error(name+": Got an error while attempting to accept socket connection", e); |
| 158 | } finally{ |
| 159 | if (t != null) t.complete(); |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | private void setName(String text) { |
| 165 | Thread.currentThread().setName(SERVER_THREAD + name + '$' + text); |
| 166 | } |
| 167 | |
| 168 | public interface Processor { |
| 169 | enum Status { |
| 170 | MORE_INPUT, MORE_OUTPUT, FINISHED |
| 171 | } |
| 172 | |
| 173 | /** |
| 174 | * Process messages fro the DataSocket. |
| 175 | * |
| 176 | * @return true if there are more messages. |
| 177 | */ |
| 178 | public Status onMessage(DataSocket ds) throws IOException; |
| 179 | } |
| 180 | |
| 181 | private static class SDSThreadFactory implements ThreadFactory { |
| 182 | /** |
| 183 | * @noinspection CallToThreadSetPriority |
| 184 | */ |
| 185 | public Thread newThread(Runnable r) { |
| 186 | Thread t = new Thread(r); |
| 187 | t.setDaemon(true); |
| 188 | // middle to low priority. |
| 189 | t.setPriority((Thread.NORM_PRIORITY + Thread.MIN_PRIORITY) / 2); |
| 190 | return t; |
| 191 | } |
| 192 | } |
| 193 | } |