Requirements
Merge multiple small files into a single SequenceFile.
From the outside the SequenceFile is one file; internally it still holds each of the original small files.
The expected result is:
- key: the path-qualified file name of each small file
- value: the contents of each small file
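To make the scenario concrete, here is a minimal setup sketch (not part of the original steps): it writes a few small text files into src/main/resources/aai/, the input directory the driver in Step 5 reads from. The class name SampleInputGenerator and the file names and contents are arbitrary illustrations.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SampleInputGenerator {
    public static void main(String[] args) throws IOException {
        // Directory the driver in Step 5 uses as its input path
        Path inputDir = Paths.get("src/main/resources/aai");
        Files.createDirectories(inputDir);
        // Each small file written here becomes one key/value pair in the SequenceFile
        for (int i = 1; i <= 3; i++) {
            Path file = inputDir.resolve("small-" + i + ".txt");
            Files.write(file, ("content of small file " + i).getBytes(StandardCharsets.UTF_8));
        }
    }
}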
Step 1: Define a custom RecordReader class
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// One instance of this class is used per split (i.e. per small file)
public class FileCombineRecordReader extends RecordReader<Text, BytesWritable> {

    private FileSplit split;
    private Configuration cfg;
    private boolean isProcess = false;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
        this.split = (FileSplit) inputSplit;
        cfg = context.getConfiguration();
    }

    // Core logic: read one whole file and wrap it into a single key/value pair
    @Override
    public boolean nextKeyValue() throws IOException {
        if (!isProcess) {
            byte[] buf = new byte[(int) split.getLength()];   // 1. buffer sized to the split length
            Path path = split.getPath();                      // 2. path of the file
            FileSystem fs = path.getFileSystem(cfg);          // 3. file system for this path
            FSDataInputStream fis = fs.open(path);            // 4. open an input stream
            IOUtils.readFully(fis, buf, 0, buf.length);       // 5. read the whole file into the buffer
            key.set(split.getPath().toString());              // key = path + file name
            value.set(buf, 0, buf.length);                    // value = file contents
            IOUtils.closeStream(fis);                         // 6. close the stream (the FileSystem instance is cached and shared, so it is not closed here)
            isProcess = true;                                 // emit exactly one pair per file
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() {            // current key
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() { // current value
        return value;
    }

    @Override
    public float getProgress() {             // progress of this split
        return isProcess ? 1 : 0;
    }

    @Override
    public void close() {
    }
}
Step 2: Define a custom InputFormat class
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FileCombineInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;   // each original file is read as a whole and never split
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        // Create and initialize the custom RecordReader
        FileCombineRecordReader recordReader = new FileCombineRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
Step 3: Write the Mapper class
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FileCombineMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Pass each (file path, file contents) pair straight through
        context.write(key, value);
    }
}
Step 4: Write the Reducer class
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileCombineReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Write out every value for this key (normally exactly one per file)
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
Step 5: Write the driver class (FileCombineDriver)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FileCombineDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Input and output paths
        args = new String[2];
        args[0] = "src/main/resources/aai/";
        args[1] = "src/main/resources/aao";

        Configuration cfg = new Configuration();
        // Run in local mode (even if a core-site.xml is on the classpath, local mode is still used)
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg);
        job.setJarByClass(FileCombineDriver.class);
        job.setMapperClass(FileCombineMapper.class);
        job.setReducerClass(FileCombineReducer.class);

        // Use the custom FileCombineInputFormat for input and write the output as a SequenceFile
        job.setInputFormatClass(FileCombineInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.out.println(b);
    }
}
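After the job finishes, the merge can be checked by reading the SequenceFile back. The sketch below is a minimal verification example, assuming the job ran in local mode as configured above and that the single reducer's output landed in the default part file part-r-00000 under src/main/resources/aao; the class name SequenceFileVerifier is illustrative, and the path should be adjusted if your setup differs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileVerifier {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        cfg.set("fs.defaultFS", "file:///");   // read from the local file system, matching the driver
        // Assumed default name of the single reducer's output file
        Path output = new Path("src/main/resources/aao/part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(cfg, SequenceFile.Reader.file(output))) {
            Text key = new Text();                     // original file's path
            BytesWritable value = new BytesWritable(); // original file's bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}

Each line printed should pair one original file path with the size of that file's contents, matching the expected key/value layout from the requirements.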