flume-user mailing list archives

From Steve Johnson <st...@webninja.com>
Subject Some test findings and insight appreciated
Date Wed, 05 Sep 2012 16:37:40 GMT
Hi all, I wanted to share some of my test findings, concerns, etc.  First
off, I apologize for this being so verbose, but I feel I need to give a
little background on our setup and needs to show the big picture.  Please
ignore this if you're not interested; you've been warned...

But if you are, great, because I do have some questions to follow, and I'm
really looking forward to any constructive comments.

Prior to a few weeks back, I had zero experience with Flume.  I'd been
familiar with its existence for some time (about a year), but nothing more
than that.

My company generates about 8 billion log records per day, spread across 5
datacenters with about 200 servers in each location, so about 1.6 billion
per day in each cage.  We're growing and shooting to increase that to about
30 billion per day based on holiday traffic and our company's growth.
These log records are currently hourly-rotated logback (slf4j) logs from
our Java applications, containing tab-delimited ASCII data of various
widths.  There are probably 25 different log types we collect, generally
all the same format; some average record lengths of 50-60 bytes, while
others average 1k in width.

Right now, we collect them using a custom-built Java scheduling
application.  We have a machine dedicated to this at each DC.  This box
fires off hourly jobs (within minutes after log rotation) that pull all
the logs from the 200+ servers (some servers generate up to 10 different
log types per hour), uncompressed.  We used to pull directly to our
central location and initiate compression on the servers themselves, but
that generated CPU/IO spikes every hour that were causing performance
issues, so we put a remote machine in each node to handle local
collection.  They pull all the log files locally first, then compress,
then move them into a queue.  This happens across all 5 DCs in parallel.
We have another set of schedulers in our central location that then
collect from those remote nodes and pull the files locally; we then do
some ETL work and load the raw log data into our Greenplum warehouse for
nightly aggregations and analysis.

This is obviously becoming very cumbersome to maintain; right now we have
10 different schedulers running across 6 locations.  Also, to guarantee
we've fetched every log file, and to guarantee we haven't double-loaded
any raw data (this data has only a logrec that's maintained globally to
guarantee uniqueness, so removing dupes is a nightmare and we like to
avoid it), we have to track every file pickup for each hop (currently in
a PostgreSQL db) and then use that for validation, and also to make sure
we don't pull a rotated log again (logs stay archived on their original
servers for 7 days).

A couple of years back, when we had 1 or even 2 DCs with only about 30
servers in each, this wasn't so bad.  But as you can imagine, we're now
looking at over 80k files generated per day to track and manage.  When
things run smoothly it's great; when we have issues, it's a pain to dig
into.

So what are the requirements I'm looking at for a replacement of said
system?

1. Little or no custom configuration; it must be a drop-in-and-go
environment.  Right now, as we add/remove servers, I have to edit a lot of
db records to tell the schedulers which servers have which types of logs,
then replicate that out, reload the configs, and make sure log sources
have ssh keys installed, etc.
2. Must be able to compress data going between the Flume agents in remote
DCs and the Flume agents in our central location (bandwidth for this kind
of data is not cheap; right now, by gzipping the hourly logs locally before
we transfer, we get between about 6:1 and 10:1 compression depending on
the log type).  See the topology sketch after this list.
3. Must be able to handle the throughput.
4. Must be transactional and recoverable; many of these logs correlate
directly to revenue, and we must not lose data.
5. Scalable.
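
To make the topology I have in mind concrete, here's a rough sketch of the
two-tier layout I'm picturing (just an illustration, not a tested config;
the agent names, hostname, ports and paths are made up, and I'd still have
to confirm whether our Flume version supports compression on the avro hop
before checking off requirement 2):

# Tier 1: one collector agent per remote DC, forwarding to central
dcagent.sources = appLogs
dcagent.channels = fileChannel
dcagent.sinks = toCentral

dcagent.sources.appLogs.type = netcat
dcagent.sources.appLogs.bind = 0.0.0.0
dcagent.sources.appLogs.port = 6172
dcagent.sources.appLogs.channels = fileChannel

dcagent.channels.fileChannel.type = file
dcagent.channels.fileChannel.checkpointDir = /opt/dotomi/flume-data/checkpoint
dcagent.channels.fileChannel.dataDirs = /opt/dotomi/flume-data/data

dcagent.sinks.toCentral.type = avro
dcagent.sinks.toCentral.channel = fileChannel
dcagent.sinks.toCentral.hostname = central.example.com
dcagent.sinks.toCentral.port = 4545
# Newer Flume releases add a compression-type option on the avro sink and
# source; I'd need to verify whether 1.2.0 has anything equivalent.

# Tier 2: central agent receiving from all five DCs
central.sources = fromDCs
central.channels = fileChannel
central.sinks = fileSink

central.sources.fromDCs.type = avro
central.sources.fromDCs.bind = 0.0.0.0
central.sources.fromDCs.port = 4545
central.sources.fromDCs.channels = fileChannel

central.channels.fileChannel.type = file

central.sinks.fileSink.type = FILE_ROLL
central.sinks.fileSink.channel = fileChannel
central.sinks.fileSink.sink.directory = /opt/dotomi/flume-data/sink/central

The idea being that the per-DC collector replaces our remote pull boxes
and the central agent replaces the central schedulers.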

From reading the docs, I believe Flume is a possible solution.

Forward to today...

Flume Agent Config:
Version flume-ng 1.2.0:
JAVA_OPTS="-Xms1g -Xmx5g -Dcom.sun.management.jmxremote
-XX:MaxDirectMemorySize=5g"
This is running on a 16-core Intel Xeon 2.4GHz with 48GB RAM and local
drives running RAID5/XFS (not sure of the RPMs, but they're pretty fast).


Testing/Flume Setup:
testagent.sources = netcatSource
testagent.channels = memChannel
testagent.sinks = fileSink

testagent.sources.netcatSource.type = netcat
testagent.sources.netcatSource.channels = memChannel
testagent.sinks.fileSink.type = FILE_ROLL
testagent.sinks.fileSink.channel = memChannel
testagent.channels.memChannel.type = memory

testagent.sources.netcatSource.bind = 0.0.0.0
testagent.sources.netcatSource.port = 6172
testagent.sources.netcatSource.max-line-length = 65536
testagent.channels.memChannel.capacity = 4294967296
testagent.sinks.fileSink.sink.directory = /opt/dotomi/flume-data/sink/file_roll
testagent.sinks.fileSink.sink.rollInterval = 0

I took one of our production servers' hourly logs, one of the largest we
produce (this one has about 1.2 million rows in it for that hour, average
record length about 700 bytes, some creeping up to 4k; keep in mind, this
is one server in one cage out of 50 total).

I wrote a Perl script that opens a socket to the NetCat source port on the
agent, buffers about 10 log recs, and then sends them in batches of 10.  I
originally tried line-by-line, which was obviously super inefficient.  I
also attempted to buffer more (more on that below) but started dropping too
many events; I think it was causing buffering issues on the agent.  10
seemed to be the magic number for my setup.  I also started with a
FileChannel (recoverable) and a simple file_roll sink so I could verify the
output files.
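
For reference, here's roughly what that client does, sketched in Java since
we're a Java shop (the original is Perl; the host, port, batch size, and
sleep are just my test values, and this ignores the "OK" acks the netcat
source sends back by default):

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.net.Socket;

// Replays an hourly log file into the Flume netcat source in small batches.
public class NetcatReplay {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";   // agent host (test value)
        int port = 6172;             // matches netcatSource.port above
        int batchSize = 10;          // the "magic number" from my tests
        long sleepMillis = 5;        // pause between batches (illustrative)

        try (Socket sock = new Socket(host, port);
             BufferedWriter out = new BufferedWriter(
                     new OutputStreamWriter(sock.getOutputStream(), "UTF-8"));
             BufferedReader log = new BufferedReader(new FileReader(args[0]))) {

            StringBuilder batch = new StringBuilder();
            int pending = 0;
            String line;
            while ((line = log.readLine()) != null) {
                batch.append(line).append('\n');  // one event per newline
                if (++pending == batchSize) {
                    out.write(batch.toString());
                    out.flush();
                    batch.setLength(0);
                    pending = 0;
                    Thread.sleep(sleepMillis);
                }
            }
            if (pending > 0) {                    // flush the final partial batch
                out.write(batch.toString());
                out.flush();
            }
        }
    }
}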

I ended up having some trouble getting the FileChannel started.  I
eventually got it to start with some pretty narrow parameters, which made
my flow very slow.  When I tried to set higher capacity numbers, it would
either not start up, or start but nothing would flow.  I ended up moving to
a memory channel just to get my proof of concept moving and to get a test
of the framework first.  Also, since we're a Java shop, we're not opposed
to the idea of writing custom sources/sinks/channels where needed, assuming
the framework is sound.
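
For what it's worth, this is the shape of the FileChannel config I was
experimenting with (the directories and numbers here are placeholders, not
recommendations; capacity and transactionCapacity are the knobs I was
fumbling with):

testagent.channels.fileChannel.type = file
testagent.channels.fileChannel.checkpointDir = /opt/dotomi/flume-data/file-channel/checkpoint
testagent.channels.fileChannel.dataDirs = /opt/dotomi/flume-data/file-channel/data
# capacity is a count of events; transactionCapacity has to cover the
# largest batch a source or sink will put/take in one transaction
testagent.channels.fileChannel.capacity = 1000000
testagent.channels.fileChannel.transactionCapacity = 1000
testagent.channels.fileChannel.keep-alive = 3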

After some heavy tuning, I was able to get something that worked and
performed very well.  I was eventually able to push 200k of these log
events per second through.

To cut to the chase, here are some issues I had:

1. Data loss (this was brought up in another thread).  A little less than
half the time (about 40%) I would run the exact same test and it would drop
a very small number of events, 10 or fewer (out of about 500k events).
Other times it would pass every event through without issue.
2. Looking at my tunings, I was able to get about 60k events per second on
a single Flume instance with the above-mentioned tuning.  I decided to
crank everything up (double it, and then I even tried doubling that once
more).  This machine has 48GB and is doing nothing but this, so logically I
figured I could bump my OPTS to 10g instead of 5 and up my channel capacity
to 8g, allowing me to buffer more and, in theory, double my throughput.
That wasn't at all the case: by attempting to throw more at it (either by
lowering my sleep times between batches, or by using the same sleep times
but doubling my batch size from 10 lines to 20), things started flaking
out.  Basically, after about 200k lines went through, it just stopped
processing, with no warnings, no errors, nothing.
Here's where it gets interesting, though.  I then set up four Flume agents
on the same machine, all with the same configuration and startup params
back at the 4g range, all listening on different ports.  I started all 4
and then, in parallel (from another machine), ran my test script against
all four agents.  That's when I was able to get 200k through.  So by
running four of them with lower tunables, I was able to get the throughput
I couldn't get running one with 4x the tunables and startup options.
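
For clarity, each of the four agents was its own JVM with its own copy of
the config above, differing only in the netcat port it listens on, roughly
like this (the agent names and port numbers here are just my test values):

testagent2.sources.netcatSource.port = 6173
testagent3.sources.netcatSource.port = 6174
testagent4.sources.netcatSource.port = 6175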


Number 2 is something I can easily live with, but I would like to hear
some insight on what might be causing it.  Obviously the disks can keep up,
because all of the file_roll paths for all 4 agents are on the same drive.
And obviously I have the RAM to buffer accordingly.  But for some reason,
one agent with 2x or even 4x the juice starts getting flaky.

Number 1 is more concerning; that obviously will need to be solved.

In summary, I'm willing and ready to spend more time on this, but I wanted
to get some insight from the pros and developers here, and also make sure
I'm not crazy and maybe just trying to use this for more than it was
designed for.

Many thanks to anyone who stuck around to read this! :)


 Cheers

-- 
Steve Johnson
steve@webninja.com
