Thanks Mohit for your prompt response. 

Sorry, I didn't explain the full configuration in my previous email.

So this is what I'm trying to do:

1) Read a constant data flow from a message queue and write it into HDFS.
2) Rolling is configured by interval (1 hour), e.g. hdfs.rollInterval = 3600
3) The number of events written to the file before flushing to HDFS is set to 100, e.g. hdfs.batchSize = 100
4) Appending is enabled at a lower level, e.g. =true.

Snippets from Flume source:

    if (conf.getBoolean("", false) == true && hdfs.isFile(dstPath)) {
      outStream = hdfs.append(dstPath);
    } else {
      outStream = hdfs.create(dstPath);
    }

5) Now, all configurations for appending data into HDFS are in place.
6) I tested the Flume agent and could see the file hdfs://test/data/input/event1.tmp being written into HDFS.
7) When I ran hdfs dfs -cat hdfs://test/data/input/event1.tmp, I could see all the data being appended to the file, e.g. 500+ events.
8) However, when I ran a simple MR job to read the folder hdfs://test/data/input (please note I didn't pass the filename), it only picked up the first 100 events, although the file had 500+ events.

So it would appear that Flume is in fact appending data into HDFS, but the MR job is failing to pick up everything. Perhaps a block caching or partition issue? Has anyone come across this?
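
For reference, the sink settings described in points 2 and 3 would look roughly like the fragment below. This is only a sketch: the agent and sink names (agent1, hdfsSink) are placeholders, the rollSize and rollCount lines are an assumption about how interval-only rolling would be set up, and the append setting from point 4 is left out.

```properties
# Placeholder agent and sink names; not taken from the actual setup.
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://test/data/input
# Roll the file once an hour (point 2).
agent1.sinks.hdfsSink.hdfs.rollInterval = 3600
# Assumption: disable size- and count-based rolling so only the interval applies.
agent1.sinks.hdfsSink.hdfs.rollSize = 0
agent1.sinks.hdfsSink.hdfs.rollCount = 0
# Flush to HDFS after every 100 events (point 3).
agent1.sinks.hdfsSink.hdfs.batchSize = 100
```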

On 19 January 2015 at 13:49, Mohit Durgapal <> wrote:
But why do you want your MR job to read from the .tmp file? .tmp means it is a temporary file, i.e. its state is not well defined (at least not to the user), and hence you're not supposed to read from it. Your MR job should only consider files that do not end with .tmp. Also, there's a very high probability that the MR job will not find the same .tmp file when it actually tries to fetch the file's contents from the NameNode, as Flume removes the .tmp after flushing the contents completely to a file. In that case it will throw a file-does-not-exist error. That's something I have faced. If you want files at shorter intervals, you can reduce the ...hdfs.rollCount or hdfs.rollInterval properties of your Flume agent to have files generated more frequently, but reading from the .tmp files seems like a bad idea to me.

Just in case anyone's interested:

What we did was add a filter for .tmp files using a PathFilter like the one below:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
public class InputFilter implements PathFilter {
    public boolean accept(Path p) {
        String name = p.getName();
        return !name.endsWith(".tmp"); // skip files Flume is still writing
    }
}

We then included it in the MR job's driver code.
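
If you want to check the rule without a cluster, here is a minimal plain-Java sketch of the same suffix check applied to a directory listing. The class name TmpFilterSketch and the file names are made up for illustration, and it deliberately avoids Hadoop classes so it runs on its own. In a real driver the PathFilter class itself is normally registered with FileInputFormat.setInputPathFilter(job, InputFilter.class).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the PathFilter above, using plain file names.
public class TmpFilterSketch {
    // Same rule as the PathFilter: reject anything still carrying the .tmp suffix.
    static boolean accept(String fileName) {
        return !fileName.endsWith(".tmp");
    }

    // Keep only the names the rule accepts, preserving their order.
    static List<String> finishedFiles(List<String> names) {
        List<String> kept = new ArrayList<>();
        for (String name : names) {
            if (accept(name)) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up listing: one rolled file and two files still being written.
        List<String> listing = Arrays.asList("event1.tmp", "event0", "event2.tmp");
        System.out.println(finishedFiles(listing)); // prints [event0]
    }
}
```

With a filter like this in place, the job only sees files that have already been rolled, so it never reads a file whose length is still changing.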


On Mon, Jan 19, 2015 at 6:59 PM, Raj Kumar <> wrote:
Hello guys!

I'm new to Flume and this group, so please be patient with me :-)

I have a Flume agent that streams data into an HDFS sink (appending to the same file), which I can "hdfs dfs -cat" and see in HDFS. However, when I run a MapReduce job on that file (.tmp), it only picks up the first batch that was flushed (batchSize = 100) into HDFS. The rest is not picked up, although I can cat the file and see it. When I run the MapReduce job after the file is rolled (closed), it picks up all the data.

Do you know why the MR job is failing to find the rest of the batches even though they exist?

Best regards,