1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.apache.logging.log4j.core.Appender;
25 import org.apache.logging.log4j.core.Filter;
26 import org.apache.logging.log4j.core.Layout;
27 import org.apache.logging.log4j.core.LogEvent;
28 import org.apache.logging.log4j.core.appender.AbstractAppender;
29 import org.apache.logging.log4j.core.config.Node;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.config.plugins.Plugin;
32 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33 import org.apache.logging.log4j.core.config.plugins.PluginElement;
34 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36 import org.apache.logging.log4j.core.layout.PatternLayout;
37 import org.apache.logging.log4j.util.Strings;
38
39
40
41
42
43
44
45
46
47
48
49 @Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
50 public final class JeroMqAppender extends AbstractAppender {
51
52 private static final int DEFAULT_BACKLOG = 100;
53
54 private static final int DEFAULT_IVL = 100;
55
56 private static final int DEFAULT_RCV_HWM = 1000;
57
58 private static final int DEFAULT_SND_HWM = 1000;
59
60 private final JeroMqManager manager;
61 private final List<String> endpoints;
62 private int sendRcFalse;
63 private int sendRcTrue;
64
65 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
66 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
67 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
68 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
69 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
70 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
71 final long tcpKeepAliveInterval, final boolean xpubVerbose) {
72 super(name, filter, layout, ignoreExceptions);
73 this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
74 linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
75 sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
76 tcpKeepAliveInterval, xpubVerbose, endpoints);
77 this.endpoints = endpoints;
78 }
79
80
81
82 @PluginFactory
83 public static JeroMqAppender createAppender(
84
85 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
86 @PluginElement("Layout") Layout<?> layout,
87 @PluginElement("Filter") final Filter filter,
88 @PluginElement("Properties") final Property[] properties,
89
90 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
91
92 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
93 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
94 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
95 @PluginAttribute(value = "identity") final byte[] identity,
96 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
97 @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
98 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
99 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
100 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
101 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
102 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
103 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
104 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
105 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
106 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
107 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
108 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
109 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
110 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
111 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
112
113 ) {
114 if (layout == null) {
115 layout = PatternLayout.createDefaultLayout();
116 }
117 List<String> endpoints;
118 if (properties == null) {
119 endpoints = new ArrayList<>(0);
120 } else {
121 endpoints = new ArrayList<>(properties.length);
122 for (final Property property : properties) {
123 if ("endpoint".equalsIgnoreCase(property.getName())) {
124 final String value = property.getValue();
125 if (Strings.isNotEmpty(value)) {
126 endpoints.add(value);
127 }
128 }
129 }
130 }
131 LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
132 name, filter, layout, ignoreExceptions, endpoints);
133 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
134 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
135 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
136 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
137 }
138
139 @Override
140 public synchronized void append(final LogEvent event) {
141 final Layout<? extends Serializable> layout = getLayout();
142 final byte[] formattedMessage = layout.toByteArray(event);
143 if (manager.send(getLayout().toByteArray(event))) {
144 sendRcTrue++;
145 } else {
146 sendRcFalse++;
147 LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
148 }
149 }
150
151 @Override
152 public void stop() {
153 manager.release();
154 super.stop();
155 }
156
157
158 int getSendRcFalse() {
159 return sendRcFalse;
160 }
161
162
163 int getSendRcTrue() {
164 return sendRcTrue;
165 }
166
167
168 void resetSendRcs() {
169 sendRcTrue = sendRcFalse = 0;
170 }
171
172 @Override
173 public String toString() {
174 return "JeroMqAppender{" +
175 "name=" + getName() +
176 ", state=" + getState() +
177 ", manager=" + manager +
178 ", endpoints=" + endpoints +
179 '}';
180 }
181 }