=== Logical Configurations

Manually configuring nodes in Flume is manageable for a small number of nodes, but becomes burdensome for an operator as demands inevitably grow. Ideally, the operator should only have to assign a role to a particular machine. Because configuration management is centralized via the Master, the Master potentially has all the information necessary to intelligently create a node topology and isolate flows of data from each other.

To explain how this is done, this section introduces the concept of a *logical node*. To manage communications between logical nodes, it introduces *logical sources* and *logical sinks*. Finally, to isolate different groups of nodes, it introduces the concept of a *flow*, which allows you to group agents and collectors into separate and isolated groups.

==== Logical Nodes

The logical node abstraction allows each JVM instance (a physical node) to contain multiple logical nodes. This allows many source-sink combinations to be processed on many threads of execution within a single JVM instance. Each logical node has a name that may be completely different from its physical name or hostname.

You now need new operations that enable you to spawn a new node, map logical nodes to physical nodes, and decommission existing logical nodes.

NOTE: The following commands are entered via the "raw command" web page on the Master's web interface. You might prefer using the Flume command shell (described in a later section) for these operations. The same commands can be entered in the web interface, or at the command shell by prefixing each command with 'exec'.

Suppose that initially you know you want an agent-collector topology, but you don't yet know the names of the exact machines. For now, you can specify the configuration of the logical nodes without specifying any physical machine names:

----
agent1 : _source_ | autoBEChain ;
collector1 : autoCollectorSource | collectorSink("hdfs://....") ;
----

Later you learn that host1 is the name of the agent1 machine and host2 is the name of the collector machine. You can 'map' the logical nodes onto the physical Flume instances on host1 and host2 by issuing the following map commands:

----
map host1 agent1
map host2 collector1
----

Afterwards, the node status table should display a new row of information for each logical node. Each logical node reports its own execution state, configuration, and heartbeat. There is also a new entry in the logical node mapping table showing that each logical node has been placed on the specified physical node. To configure the nodes' sources and sinks, use exactly the same mechanisms described in the previous sections.
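For example, the same setup can be driven from the Flume shell by prefixing each command with 'exec'. This is only a sketch: the tail path and the HDFS URI are illustrative placeholders, not values from a real cluster.

----
exec config agent1 'tail("/var/log/app.log")' autoBEChain
exec config collector1 autoCollectorSource 'collectorSink("hdfs://namenode/flume/")'
exec map host1 agent1
exec map host2 collector1
----

Once host1 and host2 heartbeat against the Master, the logical nodes start with these configurations.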
You can also remove a logical node by using the 'decommission' command. Suppose you no longer need agent1 and want to "turn it off". You can do so by entering the following command:

----
decommission agent1
----

This terminates the logical node's thread and removes both its configuration and the mapping between the logical node and the physical node.

You can also move a logical node from one physical node to another by first unmapping it and then mapping it onto another physical node. In this scenario, you move collector1 from host2 to host3:

----
unmap host2 collector1
----

At this point, the logical node mapping is removed, and collector1 is not active anywhere. You can then map collector1 onto host3 by using the map command:

----
map host3 collector1
----

NOTE: Logical nodes are not templates -- if you want several instances of the same source/sink pair on a particular physical node, you need a separate logical node for each instance. When administering many logical nodes, it is often useful to write a script that generates the configurations and unique individual logical node names. Incorporating part of the host name into the logical node name is a common pattern.

==== Logical Sources and Logical Sinks

WARNING: The logical sources and logical sinks feature does not currently work when using multiple masters.

In the previous example, we used two abstractions under the covers that allow the specification of a graph topology for communications 'without having to use physical hostnames and ports'. These abstractions -- the *logical source* and *logical sink* -- allow you to create a different graph topology without having to know the physical machines until the nodes are mapped.

Suppose you have two nodes producing data and sending it to a consumer:

----
dataProducer1 : console | logicalSink("dataConsumer") ;
dataProducer2 : console | logicalSink("dataConsumer") ;
dataConsumer : logicalSource | console ;
----

Note that in this example, the destination argument is the *logical name* of the node and not a specific host/port combination. To implement these features, there is a generalized mechanism where logical configurations entered by users are 'translated' by the Master into physical configurations. When the logical nodes get mapped to physical nodes:

----
map host1 dataProducer1
map host2 dataProducer2
map host3 dataConsumer
----

and after the Master learns the host names (the host1, host2, and host3 machines heartbeat against the Master), the Master has enough information to translate the configurations to use physical hostnames and ports. A possible translation replaces the logicalSource with an rpcSource and the logicalSinks with rpcSinks:

----
dataProducer1 : console | rpcSink("host3",56789) ;
dataProducer2 : console | rpcSink("host3",56789) ;
dataConsumer : rpcSource(56789) | console ;
----

In fact, auto agents and collectors are another example of *translated sources and sinks*. The translation converts auto*Chain sinks and collectorSource into a configuration that uses logicalSinks and logicalSources, which in turn are translated into physical rpcSource and rpcSink instances.

TIP: Translations are powerful and can be fairly smart: if new collectors are added, they become new failover options, and if collectors are removed, they are automatically replaced by other failover nodes.
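To make this concrete, consider an agent whose flow contains two collectors. As a hedged sketch (the node names, host names, tail path, and port are illustrative, and the Master's actual translator output may differ in detail), its autoBEChain might first become a failover chain of logical sinks:

----
agent1 : tail("/var/log/app.log") | < logicalSink("collectorA") ? logicalSink("collectorB") > ;
----

which, once collectorA and collectorB are mapped, would be translated again into physical RPC sinks:

----
agent1 : tail("/var/log/app.log") | < rpcSink("hostA",35853) ? rpcSink("hostB",35853) > ;
----

If a third collector joined the flow, a retranslation could add it to the chain; if one were decommissioned, it would drop out.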
==== Flow Isolation

WARNING: The automatic flow isolation feature does not currently work when using multiple masters.

What happens if you want to collect different kinds of data from the same physical node? For example, suppose you wanted to collect httpd logs as well as syslog logs from the same physical machine. Suppose also that you want to write all of the syslog data from the cluster into one directory tree, and all of the httpd logs from the cluster into another.

One approach is to tag all the data with source information and then push all the data down the same pipe. This could then be followed by some post-processing that demultiplexes (demuxes) the data into different buckets. Another approach is to keep the two sets of data logically isolated from each other the entire time and avoid post-processing. Flume supports both approaches, but enables the latter, lower-latency approach by introducing the concept of grouping nodes into *flows*. Because logical nodes allow multiple nodes on a single JVM, you can have a node for each different kind of produced data.

The following example shows how flows can be used. Start by having six logical nodes in the system:

----
fooNode1 : fooSrc | autoBEChain ;
barNode1 : barSrc | autoBEChain ;
fooNode2 : fooSrc | autoBEChain ;
barNode2 : barSrc | autoBEChain ;
fooConsumer : autoCollectorSource | collectorSink("hdfs://nn/foodir") ;
barConsumer : autoCollectorSource | collectorSink("hdfs://nn/bardir") ;
----

In this scenario, there are two physical machines that produce both kinds of data -- foo data and bar data. You want to send the data to a single collector machine that collects both foo data and bar data and writes each to a different HDFS directory. You could then map the logical nodes onto physical nodes:

----
map host1 fooNode1
map host1 barNode1
map host2 fooNode2
map host2 barNode2
map host3 fooConsumer
map host3 barConsumer
----

["graphviz", "single-flow.png"]
.Flume Flows: Single Flow
---------------------------------------------------------------------
digraph single_flow {
  label="A single flume flow";
  rankdir=LR;
  node [shape=record];
  HDFS;
  node [shape=Mrecord]
  edge [style="bold", weight=10]
  fooNode1 -> fooConsumer -> HDFS;
  fooNode1 -> barConsumer -> HDFS;
  fooNode2 -> fooConsumer;
  fooNode2 -> barConsumer;
  barNode1 -> fooConsumer;
  barNode1 -> barConsumer;
  barNode2 -> fooConsumer;
  barNode2 -> barConsumer;
  subgraph cluster_host1 { label="host1"; fooNode1; barNode1; }
  subgraph cluster_host2 { label="host2"; fooNode2; barNode2; }
  subgraph cluster_host3 { label="host3"; fooConsumer; barConsumer; }
  node [shape=none]
  "agent tier" -> "collector tier" -> "storage tier"
}
---------------------------------------------------------------------

This setup essentially instantiates the first approach: it mixes foo and bar data together, because the translation of autoBEChain sees two collectorSources that the Master considers equivalent. Foo data will likely be sent to barConsumer, and bar data will likely be sent to fooConsumer.

What you really want is to separate the sources of information into logically isolated streams of data. For this, Flume provides a grouping abstraction called a *flow*. A flow groups particular logical nodes together so that the different logical data types remain isolated. More concretely, it allows a different failover chain for each kind of data in the Flume cluster. The auto*Chain based agents only send data to collectors in the same flow group, which isolates data so that it only flows to nodes within the group.

Currently, the compact form of the configuration language does not allow you to specify flows. Instead, you must add an extra argument to the config command to specify a flow.
This example shows the commands that would be entered in the Flume shell without flow group information. In this case, all of the nodes are in the same flow:

----
exec config fooNode1 fooSrc autoBEChain
exec config barNode1 barSrc autoBEChain
exec config fooNode2 fooSrc autoBEChain
exec config barNode2 barSrc autoBEChain
exec config fooConsumer autoCollectorSource 'collectorSink("hdfs://nn/foodir")'
exec config barConsumer autoCollectorSource 'collectorSink("hdfs://nn/bardir")'
----

You can instead specify flows by adding an extra flow-name parameter after the node name. In this example there are two flows: flowfoo, which contains fooNode1, fooNode2, and fooConsumer; and flowbar, which contains barNode1, barNode2, and barConsumer.

----
exec config fooNode1 flowfoo fooSrc autoBEChain
exec config barNode1 flowbar barSrc autoBEChain
exec config fooNode2 flowfoo fooSrc autoBEChain
exec config barNode2 flowbar barSrc autoBEChain
exec config fooConsumer flowfoo autoCollectorSource 'collectorSink("hdfs://nn/foodir")'
exec config barConsumer flowbar autoCollectorSource 'collectorSink("hdfs://nn/bardir")'
----

["graphviz", "multi-flow.png"]
.Flume Flows: Multiple Flows
---------------------------------------------------------------------
digraph multi_flow {
  label="Multiple isolated flume flows";
  rankdir=LR;
  node [shape=record];
  HDFS;
  node [shape=Mrecord]
  edge [style="bold", weight=10]
  fooNode1 -> fooConsumer -> HDFS;
  fooNode2 -> fooConsumer;
  barNode1 -> barConsumer -> HDFS;
  barNode2 -> barConsumer;
  subgraph cluster_host1 { label="host1"; fooNode1; barNode1; }
  subgraph cluster_host2 { label="host2"; fooNode2; barNode2; }
  subgraph cluster_host3 { label="host3"; fooConsumer; barConsumer; }
  node [shape=none]
  "agent tier" -> "collector tier" -> "storage tier"
}
---------------------------------------------------------------------

With these commands, the data from fooNode1 and fooNode2 is sent only to fooConsumer, and the data from barNode1 and barNode2 is sent only to barConsumer. Data from one node is not mixed with data from other nodes unless they are explicitly connected.

TIP: In practice, it is a good idea to use different node names and different flow IDs for different kinds of data. When node names are reused, the default behavior is to attempt to recover from failures, on the assumption that leftover data from a crashed execution or from a previous source/sink configuration version is still the same kind of data.

==== Section Summary

This section introduced logical nodes, logical sources, logical sinks, and flows, and showed how these abstractions automatically deal with several manageability problems:

* Logical nodes remove the restriction of one source/sink pair per physical node.
* Logical sources and logical sinks remove the need to know physical host names and ports when specifying a topology.
* Flows allow multiple sets of isolated data streams within the same cluster.

The translation mechanism can be quite powerful. When coupled with metrics information, it could be used to perform automated dynamic configuration changes. A possible example would be to automatically commission or decommission collectors to match diurnal traffic and load patterns.
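Today such a change is made by hand. As a sketch (the node and host names here are illustrative), commissioning an additional collector for flowfoo is just another config plus map; the translation mechanism then offers it to the flowfoo agents as a new failover option:

----
exec config fooConsumer2 flowfoo autoCollectorSource 'collectorSink("hdfs://nn/foodir")'
exec map host4 fooConsumer2
----

Decommissioning it later with 'decommission fooConsumer2' would likewise remove it from the agents' failover chains.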