The previous post covered the implementation of HDFSEventSink. Here, starting from the hdfs sink configuration and following the call path, let's walk through how data actually gets written to HDFS inside the sink:
Several important settings of the hdfs sink in production:

```
hdfs.path = hdfs://xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval = 60
hdfs.rollSize = 0        // we want files to roll based on time only
hdfs.rollCount = 0
hdfs.batchSize = 2000
hdfs.txnEventMax = 2000
hdfs.fileType = DataStream
hdfs.writeFormat = Text
```
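For reference, in an actual flume.conf these keys are prefixed with the agent and sink names. A minimal sketch (the agent/sink/channel names a1, k1 and c1 are hypothetical; escape sequences like %{logtypename} and %Y%m%d are resolved from the event headers):

```
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://xxxxx/%{logtypename}/%Y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 2000
a1.sinks.k1.hdfs.txnEventMax = 2000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
```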
Two of these settings map directly to classes: hdfs.fileType decides the class used for the file stream, and hdfs.writeFormat decides the class used to serialize the data.

1) hdfs.fileType has three options: SequenceFile / DataStream / CompressedStream. DataStream can be thought of as an HDFS text file, SequenceFile is the default, and CompressedStream is the one to use when compression is enabled.
2) hdfs.writeFormat selects one of three serialization formats: TEXT writes only the Event body, HEADER_AND_TEXT writes both the Event header and body, and AVRO_EVENT uses Avro serialization.

With the settings above, the write path of a piece of data looks roughly like this:

```
SinkRunner.process -> SinkProcessor.process -> HDFSEventSink.process -> HDFSEventSink.append
  -> BucketWriter.append -> HDFSWriter.append -> HDFSDataStream.append
  -> BodyTextEventSerializer.write -> java.io.OutputStream.write
```
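To make the tail of that chain concrete, here is a small self-contained sketch that drives EventSerializerFactory -> BodyTextEventSerializer -> OutputStream directly. It only assumes flume-ng-core on the classpath; the class name SerializerChainDemo and the ByteArrayOutputStream stand-in for the HDFS stream are mine, not Flume's:

```
import java.io.ByteArrayOutputStream;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;

public class SerializerChainDemo {
  public static void main(String[] args) throws Exception {
    // stands in for the stream HDFSDataStream.open would get from hdfs.create(dstPath)
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // "TEXT" resolves to BodyTextEventSerializer, same as the default in HDFSDataStream.configure
    EventSerializer serializer =
        EventSerializerFactory.getInstance("TEXT", new Context(), out);
    serializer.afterCreate();  // what open() calls for a newly created file

    Event event = EventBuilder.withBody("hello hdfs sink".getBytes());
    serializer.write(event);   // only the body (plus a newline) ends up in the stream
    serializer.flush();

    System.out.print(out.toString());  // prints: hello hdfs sink
  }
}
```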
Briefly:

HDFSEventSink instantiates the BucketWriter together with an HDFSWriter:

```
if (bucketWriter == null) {
  HDFSWriter hdfsWriter = writerFactory.getWriter(fileType); // get the HDFSWriter object
  ....
  bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, batchSize,
      context, realPath, realName, inUsePrefix, inUseSuffix, suffix, codeC,
      compType, hdfsWriter, timedRollerPool, proxyTicket, sinkCounter,
      idleTimeout, idleCallback, lookupPath); // build the BucketWriter around the HDFSWriter object
```
The HDFSWriter object is obtained from org.apache.flume.sink.hdfs.HDFSWriterFactory's getWriter method, which returns the concrete org.apache.flume.sink.hdfs.HDFSWriter implementation matching the hdfs.fileType setting.

Only three types are currently supported:

```
static final String SequenceFileType = "SequenceFile";
static final String DataStreamType = "DataStream";
static final String CompStreamType = "CompressedStream";
....
public HDFSWriter getWriter(String fileType) throws IOException {
  if (fileType.equalsIgnoreCase(SequenceFileType)) {       // SequenceFile
    return new HDFSSequenceFile();
  } else if (fileType.equalsIgnoreCase(DataStreamType)) {  // DataStream
    return new HDFSDataStream();
  } else if (fileType.equalsIgnoreCase(CompStreamType)) {  // CompressedStream
    return new HDFSCompressedDataStream();
  } else {
    throw new IOException("File type " + fileType + " not supported");
  }
```
BucketWriter can be understood as a wrapper around the lower-level write operations; when data is written, it is BucketWriter.append that actually gets called, and append goes through the following steps:

1) First, check whether the file is open:

```
if (!isOpen) {
  if (idleClosed) {
    throw new IOException("This bucket writer was closed due to idling and this handle " +
        "is thus no longer valid");
  }
  // if not open, call open -> doOpen -> HDFSWriter.open to open bucketPath
  // (bucketPath is the temporary in-use path, e.g. the .tmp file; targetPath is the final path)
  open();
}
```
The main steps of doOpen:

a. Build the two file names:

```
bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
    + fullFileName + inUseSuffix;
targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
```
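For example, with the default inUsePrefix (empty) and inUseSuffix (.tmp), an event routed to .../accesslog/20240101/10 would first be written to .../accesslog/20240101/10/FlumeData.1704103200000.tmp (bucketPath) and only show up as .../accesslog/20240101/10/FlumeData.1704103200000 (targetPath) once the file is closed and renamed; the directory and file names here are purely illustrative.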
b. Call HDFSWriter.open to open bucketPath:
```
if (codeC == null) {
  // Need to get reference to FS using above config before underlying
  // writer does in order to avoid shutdown hook & IllegalStateExceptions
  fileSystem = new Path(bucketPath).getFileSystem(config);
  LOG.info("Creating " + bucketPath);
  writer.open(bucketPath);
} else {
  // need to get reference to FS before writer does to avoid shutdown hook
  fileSystem = new Path(bucketPath).getFileSystem(config);
  LOG.info("Creating " + bucketPath);
  writer.open(bucketPath, codeC, compType);
}
```
c. If rollInterval is set, schedule a task that calls close; with rollInterval = 60 as configured above, every file is closed (and renamed from its .tmp name) at most 60 seconds after it is opened:
```
// if time-based rolling is enabled, schedule the roll
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        close();
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
}
```
2) Check whether the file needs to roll (the hdfs.rollSize or hdfs.rollCount threshold has been reached):

```
// check if it's time to rotate the file
if (shouldRotate()) {
  // close calls flush + doClose; flush calls doFlush, and doFlush calls
  // HDFSWriter.sync to sync the data to hdfs
  close();
  open();
}
```
Inside shouldRotate (count- and size-based rolling; note that with rollSize = 0 and rollCount = 0 as configured above, both checks are skipped and it always returns false, so only the timed roll from rollInterval applies):

```
private boolean shouldRotate() {
  boolean doRotate = false;

  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    // hdfs.rollCount > 0 and the number of processed events has reached hdfs.rollCount
    LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
    doRotate = true;
  }

  if ((rollSize > 0) && (rollSize <= processSize)) {
    // hdfs.rollSize > 0 and the number of processed bytes has reached hdfs.rollSize
    LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
    doRotate = true;
  }

  return doRotate;
}
```
The main steps of doClose:

a. Call HDFSWriter.close.
b. Call renameBucket to rename the tmp file to its final name:

```
if (bucketPath != null && fileSystem != null) {
  renameBucket(); // could block or throw IOException
  fileSystem = null;
}
```
Inside renameBucket:
```
fileSystem.rename(srcPath, dstPath)
```
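Put together, renameBucket essentially strips the in-use suffix by renaming bucketPath to targetPath on the same FileSystem. A simplified paraphrase (not the exact source, which differs a bit between Flume versions; bucketPath, targetPath, fileSystem and LOG are the BucketWriter fields seen earlier):

```
private void renameBucket() throws IOException {
  if (bucketPath.equals(targetPath)) {
    return;                              // nothing to rename if no in-use prefix/suffix is configured
  }
  Path srcPath = new Path(bucketPath);   // e.g. .../FlumeData.1704103200000.tmp
  Path dstPath = new Path(targetPath);   // e.g. .../FlumeData.1704103200000
  if (fileSystem.exists(srcPath)) {      // could block
    LOG.info("Renaming " + srcPath + " to " + dstPath);
    fileSystem.rename(srcPath, dstPath); // could block or throw IOException
  }
}
```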
3) Call HDFSWriter.append to write the Event:
```
writer.append(event);
```
4) Update the counters:
```
// update statistics
processSize += event.getBody().length;
eventCounter++;
batchCounter++;
```
5) Check whether a flush is needed (hdfs.batchSize has been reached) and batch-write the data to hdfs; with batchSize = 2000 as configured above, data is synced to HDFS (flush -> doFlush -> HDFSWriter.sync) once per 2000 events rather than on every append:
```
if (batchCounter == batchSize) {
  flush();
}
```
When an Event is written, BucketWriter.append calls the append method of the concrete org.apache.flume.sink.hdfs.HDFSWriter implementation, in this case HDFSDataStream. The main methods of HDFSDataStream:

configure sets up the serializer:

```
public void configure(Context context) {
  serializerType = context.getString("serializer", "TEXT"); // the default serializer is TEXT
  useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem", false);
  serializerContext =
      new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
  logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
      + useRawLocalFileSystem);
}
```

append writes an Event by calling EventSerializer.write:

```
public void append(Event e) throws IOException {
  // shun flumeformatter...
  serializer.write(e); // write the Event via EventSerializer.write
}
```
The main steps of the open method:

1) Depending on hdfs.append.support (false by default), either append to or create the file:

```
boolean appending = false;
if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
  // hdfs.append.support defaults to false
  outStream = hdfs.append(dstPath);
  appending = true;
} else {
  outStream = hdfs.create(dstPath); // if append is not supported, create the file
}
```
2) Create the EventSerializer object via EventSerializerFactory.getInstance:

```
serializer = EventSerializerFactory.getInstance(
    serializerType, serializerContext, outStream); // instantiate the EventSerializer
```
3) If we are appending (hdfs.append.support is true) but the EventSerializer does not support reopen, an exception is thrown:
```
if (appending && !serializer.supportsReopen()) {
  outStream.close();
  serializer = null;
  throw new IOException("serializer (" + serializerType +
      ") does not support append");
}
```
4) Run the serializer's post-create or post-reopen hook:
```
  if (appending) {
    serializer.afterReopen();
  } else {
    serializer.afterCreate();
  }
}
```
The three hdfs.writeFormat settings and their corresponding classes:

```
TEXT(BodyTextEventSerializer.Builder.class),                     // supports reopen
HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), // supports reopen
AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),         // does not support reopen
```
The default is TEXT, i.e. the BodyTextEventSerializer class:

```
private BodyTextEventSerializer(OutputStream out, Context ctx) { // constructor
  this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); // defaults to true
  this.out = out;
}
....
public void write(Event e) throws IOException { // the write method
  out.write(e.getBody()); // java.io.OutputStream.write, writes only the Event body
  if (appendNewline) {    // append a newline after each record
    out.write('\n');
  }
}
```
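To close the loop, here is a minimal custom serializer sketch showing the contract that the three builders above implement. It is not part of Flume: the class name UpperCaseBodySerializer is hypothetical, and only the EventSerializer/Builder interfaces and the Event/Context types come from flume-ng-core:

```
import java.io.IOException;
import java.io.OutputStream;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

public class UpperCaseBodySerializer implements EventSerializer {

  private final OutputStream out;

  private UpperCaseBodySerializer(Context context, OutputStream out) {
    this.out = out;
  }

  @Override
  public void afterCreate() throws IOException { /* write a file header here if needed */ }

  @Override
  public void afterReopen() throws IOException { /* called instead of afterCreate when appending */ }

  @Override
  public void write(Event event) throws IOException {
    out.write(new String(event.getBody()).toUpperCase().getBytes()); // body only, upper-cased
    out.write('\n');
  }

  @Override
  public void flush() throws IOException { /* nothing buffered in this sketch */ }

  @Override
  public void beforeClose() throws IOException { /* write a trailer here if needed */ }

  @Override
  public boolean supportsReopen() {
    return true; // safe here because no header is written; Avro-style formats return false
  }

  // The factory always goes through a Builder, exactly like the BodyTextEventSerializer.Builder
  // referenced by the TEXT entry above.
  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new UpperCaseBodySerializer(context, out);
    }
  }
}
```

Judging from the configure code shown earlier (context.getString("serializer", "TEXT")), such a class would be wired in by setting the sink's serializer property to the fully qualified name of its Builder.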