上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:

线上hdfs sink的几个重要设置

1
2
3
4
5
6
7
8
hdfs.path = hdfs:
//xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval = 
60
hdfs.rollSize = 
0 
//想让文件只根据实际来roll
hdfs.rollCount = 
0
hdfs.batchSize = 
2000
hdfs.txnEventMax = 
2000
hdfs.fileType = DataStream 
hdfs.writeFormat = Text

这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.

1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式
上面的设置,其数据写入流程大概如下:

1
SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write

简单说下:

在HDFSEventSink中会实例化BucketWriter和HDFSWriter:

1
2
3
4
5
6
7
        
if 
(bucketWriter == 
null
) {
          
HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); 
//获取HDFSWriter 对象
....
          
bucketWriter = 
new 
BucketWriter(rollInterval , rollSize , rollCount ,
              
batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
              
suffix, codeC, compType, hdfsWriter, timedRollerPool,
              
proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); 
//根据HDFSWriter 对象获取BucketWriter对象

这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象

目前只支持3种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  
static 
final 
String SequenceFileType = 
"SequenceFile" 
;
  
static 
final 
String DataStreamType = 
"DataStream" 
;
  
static 
final 
String CompStreamType = 
"CompressedStream" 
;
....
  
public 
HDFSWriter getWriter(String fileType) 
throws 
IOException {
    
if 
(fileType.equalsIgnoreCase( SequenceFileType)) { 
//SequenceFile,sequencefile
      
return 
new 
HDFSSequenceFile();
    
else 
if 
(fileType.equalsIgnoreCase(DataStreamType)) { 
//DataStream
      
return 
new 
HDFSDataStream();
    
else 
if 
(fileType.equalsIgnoreCase(CompStreamType)) { 
//CompressedStream
      
return 
new 
HDFSCompressedDataStream();
    
else 
{
      
throw 
new 
IOException(
"File type " 
+ fileType + 
" not supported"
);
    
}

BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:

1)首先判断文件是否打开:

1
2
3
4
5
6
7
    
if 
(! isOpen) {
      
if
(idleClosed) {
        
throw 
new 
IOException(
"This bucket writer was closed due to idling and this handle " 
+
            
"is thus no longer valid"
);
      
}
      
open(); 
//如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录)
    
}

doOpen的主要步骤

a.设置两个文件名:

1
2
3
        
bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
          
+ fullFileName + inUseSuffix;
        
targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;

b.调用HDFSWriter.open方法打开bucketPath

1
2
3
4
5
6
7
8
9
10
11
12
         
if 
(codeC == 
null
) {
          
// Need to get reference to FS using above config before underlying
          
// writer does in order to avoid shutdown hook & IllegalStateExceptions
          
fileSystem = 
new 
Path(bucketPath ).getFileSystem(config);
          
LOG.info(
"Creating " 
+ bucketPath );
          
writer.open( bucketPath);
        
else 
{
          
// need to get reference to FS before writer does to avoid shutdown hook
          
fileSystem = 
new 
Path(bucketPath ).getFileSystem(config);
          
LOG.info(
"Creating " 
+ bucketPath );
          
writer.open( bucketPath, codeC , compType );
        
}

c.如果设置了rollInterval ,则执行计划任务调用close方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    
// if time-based rolling is enabled, schedule the roll
    
if 
(rollInterval > 
0
) {
      
Callable<Void> action = 
new 
Callable<Void>() {
        
public 
Void call() 
throws 
Exception {
          
LOG.debug(
"Rolling file ({}): Roll scheduled after {} sec elapsed." 
,
              
bucketPath, rollInterval );
          
try 
{
            
close();
          
catch
(Throwable t) {
            
LOG.error(
"Unexpected error" 
, t);
          
}
          
return 
null 
;
        
}
      
};
      
timedRollFuture = timedRollerPool.schedule(action, rollInterval ,
          
TimeUnit. SECONDS);
    
}

2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):

1
2
3
4
5
    
// check if it's time to rotate the file
    
if 
(shouldRotate()) {
      
close(); 
//close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中
      
open();
    
}

其中shouldRotate(基于数量和大小的roll方式):

1
2
3
4
5
6
7
8
9
10
11
12
  
private 
boolean 
shouldRotate() {
    
boolean 
doRotate = 
false
;
    
if 
(( rollCount > 
0
) && (rollCount <= eventCounter )) { 
//hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      
LOG.debug( 
"rolling: rollCount: {}, events: {}" 
, rollCount , eventCounter );
      
doRotate = 
true
;
    
}
    
if 
(( rollSize > 
0
) && ( rollSize <= processSize)) { 
//hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      
LOG.debug( 
"rolling: rollSize: {}, bytes: {}" 
, rollSize , processSize );
      
doRotate = 
true
;
    
}
    
return 
doRotate;
  
}

其中doClose主要的步骤

a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:

1
2
3
4
    
if 
(bucketPath != 
null 
&& fileSystem != 
null
) {
      
renameBucket(); 
// could block or throw IOException
      
fileSystem = 
null
;
    
}

其中renameBucket:

1
fileSystem.rename(srcPath, dstPath)

3)调用HDFSWriter.append方法写入Event

1
writer.append(event);

4) 更新计数器

1
2
3
4
    
// update statistics
    
processSize += event.getBody(). length;
    
eventCounter++;
    
batchCounter++;

5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs

1
2
3
    
if 
(batchCounter == batchSize) {
      
flush();
    
}

Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:

configure用于设置serializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  
public 
void 
configure(Context context) {
    
serializerType = context.getString( 
"serializer"
"TEXT" 
); 
//默认序列化方式为TEXT
    
useRawLocalFileSystem = context.getBoolean( 
"hdfs.useRawLocalFileSystem"
,
        
false
);
    
serializerContext =
        
new 
Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
    
logger.info( 
"Serializer = " 
+ serializerType + 
", UseRawLocalFileSystem = "
        
+ useRawLocalFileSystem);
  
}
append方法用于Event的写入,调用EventSerializer.write方法:
  
public 
void 
append(Event e) 
throws 
IOException {
    
// shun flumeformatter...
    
serializer.write(e); 
//调用EventSerializer.write方法写入Event
  
}

open方法主要步骤:

1)根据hdfs.append.support的设置(默认为false)打开或者新建文件

1
2
3
4
5
6
7
8
    
boolean 
appending = 
false
;
    
if 
(conf.getBoolean( 
"hdfs.append.support"
false 
) == 
true 
&& hdfs.isFile
            
(dstPath)) { 
//默认hdfs.append.support为false
      
outStream = hdfs.append(dstPath);
      
appending = 
true
;
    
else 
{
      
outStream = hdfs.create(dstPath); 
//如果不支持append,则创建文件
    
}

2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象

1
2
    
serializer = EventSerializerFactory.getInstance(
        
serializerType, serializerContext , outStream ); 
//实例化EventSerializer对象

3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常

1
2
3
4
5
6
    
if 
(appending && ! serializer.supportsReopen()) {
      
outStream.close();
      
serializer = 
null
;
      
throw 
new 
IOException(
"serializer (" 
+ serializerType +
          
") does not support append"
);
    
}

4)调用文件打开或者reopen之后的操作

1
2
3
4
5
6
    
if 
(appending) {
      
serializer.afterReopen();
    
else 
{
      
serializer.afterCreate();
    
}
  
}

这里hdfs.writeFormat的3种设置和对应的类:

1
2
3
  
TEXT(BodyTextEventSerializer.Builder. 
class
), 
//支持reopen
  
HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder. 
class
), 
//支持reopen
  
AVRO_EVENT(FlumeEventAvroEventSerializer.Builder. 
class
), 
// 不支持reopen

默认设置为TEXT,即BodyTextEventSerializer类:

1
2
3
4
5
6
7
8
9
10
  
private 
BodyTextEventSerializer(OutputStream out, Context ctx) { 
//构造方法
    
this
. appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT ); 
//默认为true
    
this
. out = out;
  
}
....
  
public 
void 
write(Event e) 
throws 
IOException { 
//write方法
    
out.write(e.getBody()); 
//java.io.OutputStream.write,只写Event的body
    
if 
(appendNewline) { 
//每一行之后增加一个回车
      
out.write(
'\n'
);
    
}