View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.io.Serializable;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.logging.log4j.core.Appender;
25  import org.apache.logging.log4j.core.Filter;
26  import org.apache.logging.log4j.core.Layout;
27  import org.apache.logging.log4j.core.LogEvent;
28  import org.apache.logging.log4j.core.appender.AbstractAppender;
29  import org.apache.logging.log4j.core.config.Node;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.config.plugins.Plugin;
32  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33  import org.apache.logging.log4j.core.config.plugins.PluginElement;
34  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36  import org.apache.logging.log4j.core.layout.PatternLayout;
37  import org.apache.logging.log4j.util.Strings;
38  
39  /**
40   * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
41   * <p>
42   * Requires the JeroMQ jar (LGPL as of 0.3.5)
43   * </p>
44   */
45  // TODO
46  // Some methods are synchronized because a ZMQ.Socket is not thread-safe
47  // Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
48  // some issue on threads owning certain resources as opposed to others.
49  @Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
50  public final class JeroMqAppender extends AbstractAppender {
51  
52      private static final int DEFAULT_BACKLOG = 100;
53  
54      private static final int DEFAULT_IVL = 100;
55  
56      private static final int DEFAULT_RCV_HWM = 1000;
57  
58      private static final int DEFAULT_SND_HWM = 1000;
59  
60      private final JeroMqManager manager;
61      private final List<String> endpoints;
62      private int sendRcFalse;
63      private int sendRcTrue;
64  
65      private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
66              final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
67              final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
68              final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
69              final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
70              final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
71              final long tcpKeepAliveInterval, final boolean xpubVerbose) {
72          super(name, filter, layout, ignoreExceptions);
73          this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
74              linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
75              sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
76              tcpKeepAliveInterval, xpubVerbose, endpoints);
77          this.endpoints = endpoints;
78      }
79  
80      // The ZMQ.Socket class has other set methods that we do not cover because
81      // they throw unsupported operation exceptions.
82      @PluginFactory
83      public static JeroMqAppender createAppender(
84              // @formatter:off
85              @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
86              @PluginElement("Layout") Layout<?> layout,
87              @PluginElement("Filter") final Filter filter,
88              @PluginElement("Properties") final Property[] properties,
89              // Super attributes
90              @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
91              // ZMQ attributes; defaults picked from zmq.Options.
92              @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
93              @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
94              @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
95              @PluginAttribute(value = "identity") final byte[] identity,
96              @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
97              @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
98              @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
99              @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
100             @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
101             @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
102             @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
103             @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
104             @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
105             @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
106             @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
107             @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
108             @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
109             @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
110             @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
111             @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
112             // @formatter:on
113     ) {
114         if (layout == null) {
115             layout = PatternLayout.createDefaultLayout();
116         }
117         List<String> endpoints;
118         if (properties == null) {
119             endpoints = new ArrayList<>(0);
120         } else {
121             endpoints = new ArrayList<>(properties.length);
122             for (final Property property : properties) {
123                 if ("endpoint".equalsIgnoreCase(property.getName())) {
124                     final String value = property.getValue();
125                     if (Strings.isNotEmpty(value)) {
126                         endpoints.add(value);
127                     }
128                 }
129             }
130         }
131         LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
132                 name, filter, layout, ignoreExceptions, endpoints);
133         return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
134                 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
135                 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
136                 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
137     }
138 
139     @Override
140     public synchronized void append(final LogEvent event) {
141         final Layout<? extends Serializable> layout = getLayout();
142         final byte[] formattedMessage = layout.toByteArray(event);
143         if (manager.send(getLayout().toByteArray(event))) {
144             sendRcTrue++;
145         } else {
146             sendRcFalse++;
147             LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
148         }
149     }
150 
151     @Override
152     public void stop() {
153         manager.release();
154         super.stop();
155     }
156 
157     // not public, handy for testing
158     int getSendRcFalse() {
159         return sendRcFalse;
160     }
161 
162     // not public, handy for testing
163     int getSendRcTrue() {
164         return sendRcTrue;
165     }
166 
167     // not public, handy for testing
168     void resetSendRcs() {
169         sendRcTrue = sendRcFalse = 0;
170     }
171 
172     @Override
173     public String toString() {
174         return "JeroMqAppender{" +
175             "name=" + getName() +
176             ", state=" + getState() +
177             ", manager=" + manager +
178             ", endpoints=" + endpoints +
179             '}';
180     }
181 }