1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.util.Arrays;
21 import java.util.List;
22
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.core.appender.AbstractManager;
25 import org.apache.logging.log4j.core.appender.ManagerFactory;
26 import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
27 import org.apache.logging.log4j.util.PropertiesUtil;
28 import org.zeromq.ZMQ;
29
30
31
32
33
34
35 public class JeroMqManager extends AbstractManager {
36
37
38
39
40 public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
41
42
43
44
45 public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
46
47 private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
48 private static final ZMQ.Context CONTEXT;
49
50 static {
51 LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
52
53 final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
54 LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
55 CONTEXT = ZMQ.context(ioThreads);
56
57 final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
58 SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
59 if (enableShutdownHook) {
60 ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
61 @Override
62 public void run() {
63 CONTEXT.close();
64 }
65 });
66 }
67 }
68
69 private final ZMQ.Socket publisher;
70
71 private JeroMqManager(final String name, final JeroMqConfiguration config) {
72 super(name);
73 publisher = CONTEXT.socket(ZMQ.PUB);
74 publisher.setAffinity(config.affinity);
75 publisher.setBacklog(config.backlog);
76 publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
77 if (config.identity != null) {
78 publisher.setIdentity(config.identity);
79 }
80 publisher.setIPv4Only(config.ipv4Only);
81 publisher.setLinger(config.linger);
82 publisher.setMaxMsgSize(config.maxMsgSize);
83 publisher.setRcvHWM(config.rcvHwm);
84 publisher.setReceiveBufferSize(config.receiveBufferSize);
85 publisher.setReceiveTimeOut(config.receiveTimeOut);
86 publisher.setReconnectIVL(config.reconnectIVL);
87 publisher.setReconnectIVLMax(config.reconnectIVLMax);
88 publisher.setSendBufferSize(config.sendBufferSize);
89 publisher.setSendTimeOut(config.sendTimeOut);
90 publisher.setSndHWM(config.sndHwm);
91 publisher.setTCPKeepAlive(config.tcpKeepAlive);
92 publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
93 publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
94 publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
95 publisher.setXpubVerbose(config.xpubVerbose);
96 for (final String endpoint : config.endpoints) {
97 publisher.bind(endpoint);
98 }
99 LOGGER.debug("Created JeroMqManager with {}", config);
100 }
101
102 public boolean send(final byte[] data) {
103 return publisher.send(data);
104 }
105
106 @Override
107 protected void releaseSub() {
108 publisher.close();
109 }
110
111 public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
112 final boolean delayAttachOnConnect, final byte[] identity,
113 final boolean ipv4Only, final long linger, final long maxMsgSize,
114 final long rcvHwm, final long receiveBufferSize,
115 final int receiveTimeOut, final long reconnectIVL,
116 final long reconnectIVLMax, final long sendBufferSize,
117 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
118 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
119 final long tcpKeepAliveInterval, final boolean xpubVerbose,
120 final List<String> endpoints) {
121 return getManager(name, FACTORY,
122 new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
123 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
124 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
125 endpoints));
126 }
127
128 public static ZMQ.Context getContext() {
129 return CONTEXT;
130 }
131
132 private static class JeroMqConfiguration {
133 private final long affinity;
134 private final long backlog;
135 private final boolean delayAttachOnConnect;
136 private final byte[] identity;
137 private final boolean ipv4Only;
138 private final long linger;
139 private final long maxMsgSize;
140 private final long rcvHwm;
141 private final long receiveBufferSize;
142 private final int receiveTimeOut;
143 private final long reconnectIVL;
144 private final long reconnectIVLMax;
145 private final long sendBufferSize;
146 private final int sendTimeOut;
147 private final long sndHwm;
148 private final int tcpKeepAlive;
149 private final long tcpKeepAliveCount;
150 private final long tcpKeepAliveIdle;
151 private final long tcpKeepAliveInterval;
152 private final boolean xpubVerbose;
153 private final List<String> endpoints;
154
155 private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
156 final byte[] identity, final boolean ipv4Only, final long linger,
157 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
158 final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
159 final long sendBufferSize, final int sendTimeOut, final long sndHwm,
160 final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
161 final long tcpKeepAliveInterval, final boolean xpubVerbose,
162 final List<String> endpoints) {
163 this.affinity = affinity;
164 this.backlog = backlog;
165 this.delayAttachOnConnect = delayAttachOnConnect;
166 this.identity = identity;
167 this.ipv4Only = ipv4Only;
168 this.linger = linger;
169 this.maxMsgSize = maxMsgSize;
170 this.rcvHwm = rcvHwm;
171 this.receiveBufferSize = receiveBufferSize;
172 this.receiveTimeOut = receiveTimeOut;
173 this.reconnectIVL = reconnectIVL;
174 this.reconnectIVLMax = reconnectIVLMax;
175 this.sendBufferSize = sendBufferSize;
176 this.sendTimeOut = sendTimeOut;
177 this.sndHwm = sndHwm;
178 this.tcpKeepAlive = tcpKeepAlive;
179 this.tcpKeepAliveCount = tcpKeepAliveCount;
180 this.tcpKeepAliveIdle = tcpKeepAliveIdle;
181 this.tcpKeepAliveInterval = tcpKeepAliveInterval;
182 this.xpubVerbose = xpubVerbose;
183 this.endpoints = endpoints;
184 }
185
186 @Override
187 public String toString() {
188 return "JeroMqConfiguration{" +
189 "affinity=" + affinity +
190 ", backlog=" + backlog +
191 ", delayAttachOnConnect=" + delayAttachOnConnect +
192 ", identity=" + Arrays.toString(identity) +
193 ", ipv4Only=" + ipv4Only +
194 ", linger=" + linger +
195 ", maxMsgSize=" + maxMsgSize +
196 ", rcvHwm=" + rcvHwm +
197 ", receiveBufferSize=" + receiveBufferSize +
198 ", receiveTimeOut=" + receiveTimeOut +
199 ", reconnectIVL=" + reconnectIVL +
200 ", reconnectIVLMax=" + reconnectIVLMax +
201 ", sendBufferSize=" + sendBufferSize +
202 ", sendTimeOut=" + sendTimeOut +
203 ", sndHwm=" + sndHwm +
204 ", tcpKeepAlive=" + tcpKeepAlive +
205 ", tcpKeepAliveCount=" + tcpKeepAliveCount +
206 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
207 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
208 ", xpubVerbose=" + xpubVerbose +
209 ", endpoints=" + endpoints +
210 '}';
211 }
212 }
213
214 private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
215 @Override
216 public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
217 return new JeroMqManager(name, data);
218 }
219 }
220 }