View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.util.Arrays;
21  import java.util.List;
22  
23  import org.apache.logging.log4j.LogManager;
24  import org.apache.logging.log4j.core.appender.AbstractManager;
25  import org.apache.logging.log4j.core.appender.ManagerFactory;
26  import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
27  import org.apache.logging.log4j.util.PropertiesUtil;
28  import org.zeromq.ZMQ;
29  
30  /**
31   * Manager for publishing messages via JeroMq.
32   *
33   * @since 2.6
34   */
35  public class JeroMqManager extends AbstractManager {
36  
37      /**
38       * System property to enable shutdown hook.
39       */
40      public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
41  
42      /**
43       * System property to control JeroMQ I/O thread count.
44       */
45      public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
46  
47      private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
48      private static final ZMQ.Context CONTEXT;
49  
50      static {
51          LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
52  
53          final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
54          LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
55          CONTEXT = ZMQ.context(ioThreads);
56  
57          final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
58              SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
59          if (enableShutdownHook) {
60              ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
61                  @Override
62                  public void run() {
63                      CONTEXT.close();
64                  }
65              });
66          }
67      }
68  
69      private final ZMQ.Socket publisher;
70  
71      private JeroMqManager(final String name, final JeroMqConfiguration config) {
72          super(name);
73          publisher = CONTEXT.socket(ZMQ.PUB);
74          publisher.setAffinity(config.affinity);
75          publisher.setBacklog(config.backlog);
76          publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
77          if (config.identity != null) {
78              publisher.setIdentity(config.identity);
79          }
80          publisher.setIPv4Only(config.ipv4Only);
81          publisher.setLinger(config.linger);
82          publisher.setMaxMsgSize(config.maxMsgSize);
83          publisher.setRcvHWM(config.rcvHwm);
84          publisher.setReceiveBufferSize(config.receiveBufferSize);
85          publisher.setReceiveTimeOut(config.receiveTimeOut);
86          publisher.setReconnectIVL(config.reconnectIVL);
87          publisher.setReconnectIVLMax(config.reconnectIVLMax);
88          publisher.setSendBufferSize(config.sendBufferSize);
89          publisher.setSendTimeOut(config.sendTimeOut);
90          publisher.setSndHWM(config.sndHwm);
91          publisher.setTCPKeepAlive(config.tcpKeepAlive);
92          publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
93          publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
94          publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
95          publisher.setXpubVerbose(config.xpubVerbose);
96          for (final String endpoint : config.endpoints) {
97              publisher.bind(endpoint);
98          }
99          LOGGER.debug("Created JeroMqManager with {}", config);
100     }
101 
102     public boolean send(final byte[] data) {
103         return publisher.send(data);
104     }
105 
106     @Override
107     protected void releaseSub() {
108         publisher.close();
109     }
110 
111     public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
112                                                  final boolean delayAttachOnConnect, final byte[] identity,
113                                                  final boolean ipv4Only, final long linger, final long maxMsgSize,
114                                                  final long rcvHwm, final long receiveBufferSize,
115                                                  final int receiveTimeOut, final long reconnectIVL,
116                                                  final long reconnectIVLMax, final long sendBufferSize,
117                                                  final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
118                                                  final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
119                                                  final long tcpKeepAliveInterval, final boolean xpubVerbose,
120                                                  final List<String> endpoints) {
121         return getManager(name, FACTORY,
122             new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
123                 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
124                 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
125                 endpoints));
126     }
127 
128     public static ZMQ.Context getContext() {
129         return CONTEXT;
130     }
131 
132     private static class JeroMqConfiguration {
133         private final long affinity;
134         private final long backlog;
135         private final boolean delayAttachOnConnect;
136         private final byte[] identity;
137         private final boolean ipv4Only;
138         private final long linger;
139         private final long maxMsgSize;
140         private final long rcvHwm;
141         private final long receiveBufferSize;
142         private final int receiveTimeOut;
143         private final long reconnectIVL;
144         private final long reconnectIVLMax;
145         private final long sendBufferSize;
146         private final int sendTimeOut;
147         private final long sndHwm;
148         private final int tcpKeepAlive;
149         private final long tcpKeepAliveCount;
150         private final long tcpKeepAliveIdle;
151         private final long tcpKeepAliveInterval;
152         private final boolean xpubVerbose;
153         private final List<String> endpoints;
154 
155         private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
156                                     final byte[] identity, final boolean ipv4Only, final long linger,
157                                     final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
158                                     final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
159                                     final long sendBufferSize, final int sendTimeOut, final long sndHwm,
160                                     final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
161                                     final long tcpKeepAliveInterval, final boolean xpubVerbose,
162                                     final List<String> endpoints) {
163             this.affinity = affinity;
164             this.backlog = backlog;
165             this.delayAttachOnConnect = delayAttachOnConnect;
166             this.identity = identity;
167             this.ipv4Only = ipv4Only;
168             this.linger = linger;
169             this.maxMsgSize = maxMsgSize;
170             this.rcvHwm = rcvHwm;
171             this.receiveBufferSize = receiveBufferSize;
172             this.receiveTimeOut = receiveTimeOut;
173             this.reconnectIVL = reconnectIVL;
174             this.reconnectIVLMax = reconnectIVLMax;
175             this.sendBufferSize = sendBufferSize;
176             this.sendTimeOut = sendTimeOut;
177             this.sndHwm = sndHwm;
178             this.tcpKeepAlive = tcpKeepAlive;
179             this.tcpKeepAliveCount = tcpKeepAliveCount;
180             this.tcpKeepAliveIdle = tcpKeepAliveIdle;
181             this.tcpKeepAliveInterval = tcpKeepAliveInterval;
182             this.xpubVerbose = xpubVerbose;
183             this.endpoints = endpoints;
184         }
185 
186         @Override
187         public String toString() {
188             return "JeroMqConfiguration{" +
189                 "affinity=" + affinity +
190                 ", backlog=" + backlog +
191                 ", delayAttachOnConnect=" + delayAttachOnConnect +
192                 ", identity=" + Arrays.toString(identity) +
193                 ", ipv4Only=" + ipv4Only +
194                 ", linger=" + linger +
195                 ", maxMsgSize=" + maxMsgSize +
196                 ", rcvHwm=" + rcvHwm +
197                 ", receiveBufferSize=" + receiveBufferSize +
198                 ", receiveTimeOut=" + receiveTimeOut +
199                 ", reconnectIVL=" + reconnectIVL +
200                 ", reconnectIVLMax=" + reconnectIVLMax +
201                 ", sendBufferSize=" + sendBufferSize +
202                 ", sendTimeOut=" + sendTimeOut +
203                 ", sndHwm=" + sndHwm +
204                 ", tcpKeepAlive=" + tcpKeepAlive +
205                 ", tcpKeepAliveCount=" + tcpKeepAliveCount +
206                 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
207                 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
208                 ", xpubVerbose=" + xpubVerbose +
209                 ", endpoints=" + endpoints +
210                 '}';
211         }
212     }
213 
214     private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
215         @Override
216         public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
217             return new JeroMqManager(name, data);
218         }
219     }
220 }