hadoop - Reading images from HDFS using MapReduce
Please help me with this code. I am trying to read images from HDFS using WholeFileInputFormat and WholeFileRecordReader. There are no compile-time errors, but at runtime the job fails with: cannot create an instance of the given class WholeFileInputFormat. I wrote the code following the comments on "How to read multiple image files as input from HDFS in map-reduce?". It contains 3 classes. How do I debug it, or is there another way to do this?
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Map2 extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {

        private Text input_image = new Text();
        private Text input_vector = new Text();

        @Override
        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Each map() call receives one whole image file as raw bytes.
            System.out.println("CorrelogramIndex method:");
            String featureString;
            int maximum_distance = 16;
            AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;

            byte[] identifier = value.getBytes();
            BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));
            // Extract the auto color correlogram feature vector with LIRE.
            AutoColorCorrelogram vd = new AutoColorCorrelogram(maximum_distance, mode);
            vd.extract(bimg);
            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();
            System.out.println("image: " + identifier + " " + featureString);
            System.out.println(" ------------- ");
            input_image.set(identifier);
            input_vector.set(featureString);
            output.collect(input_image, input_vector);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Concatenate all feature strings for the same image key.
            String out_vector = "";
            while (values.hasNext()) {
                out_vector += values.next().toString();
            }
            output.collect(key, new Text(out_vector));
        }
    }

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Map2.class);
        conf.setJobName("image_mapreduce");
        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);

        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                        break;
                    case "-r":
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                        break;
                    default:
                        other_args.add(args[i]);
                        break;
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing after " + args[i - 1]);
                return printUsage();
            }
        }

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Map2(), args);
        System.exit(res);
    }
}

-----------------------------------------------------------------------------------

// WholeFileInputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable>
        extends FileInputFormat<NullWritable, BytesWritable> {

    // @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    // @Override
    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException;
}

-------------------------------------------------------------------------------

// WholeFileRecordReader
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();
    }

    @Override
    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            // Read the entire file backing this split into one BytesWritable.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
        return value;
    }

    @Override
    public long getPos() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void close() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public float getProgress() throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}
WholeFileInputFormat is defined as abstract, so how do you expect the framework to create an instance of it?
Either make it non-abstract or subclass it with a concrete implementation.
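As an illustration only, not a tested drop-in fix: since the driver is written against the old org.apache.hadoop.mapred API (JobConf, conf.setInputFormat(...)), one way to make the class concrete is to drop the new-API createRecordReader/TaskAttemptContext method entirely and implement only the old-API getRecordReader. The constructor WholeFileRecordReader(FileSplit, JobConf) used below is an assumption; the question's record reader would need to be adapted to take those two arguments instead of initialize(InputSplit, TaskAttemptContext).

// Sketch of a concrete, old-API WholeFileInputFormat (assumptions noted above).
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Old-API signature: never split an image file across map tasks.
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    // Implementing this (instead of leaving it abstract) is what lets the
    // framework instantiate the format.
    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // Hypothetical constructor: the record reader must be adapted to
        // initialize itself from a FileSplit and JobConf.
        return new WholeFileRecordReader((FileSplit) split, job);
    }
}

With a concrete class like this, conf.setInputFormat(WholeFileInputFormat.class) can create the format reflectively, which is exactly the step that fails while the class (or its getRecordReader) is abstract.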