PhoenixMapReduceUtil provides several utility methods to set the input and output configuration parameters for the job.

    When a Phoenix table is the source for the Map Reduce job, we can provide a SELECT query, or pass a table name and the specific columns to import. To retrieve data from the table within the mapper class, we need a class that implements DBWritable, which we pass as an argument to the PhoenixMapReduceUtil.setInput method. The custom DBWritable class provides an implementation of readFields(ResultSet rs) that allows us to retrieve the columns for each row. This custom DBWritable class forms the input value to the mapper class.

    Similarly, when writing to a Phoenix table, we use the PhoenixMapReduceUtil.setOutput method to set the output table and the columns.

    The output key class for the job should always be NullWritable, and the output value class should be the custom DBWritable class, which implements the write(PreparedStatement stmt) method to set the column values to upsert.
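    For the example below, a minimal sketch of such a class might look like the following. It assumes the STOCK and STOCK_STATS tables defined in the setup section; the field names and accessor methods are illustrative, not a fixed API.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.Array;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class StockWritable implements DBWritable, Writable {

        private String stockName;
        private int year;
        private double[] recordings;
        private double maxPrice;

        // Called once per row of the select query; the column labels match the STOCK table
        @Override
        public void readFields(ResultSet rs) throws SQLException {
            stockName = rs.getString("STOCK_NAME");
            year = rs.getInt("RECORDING_YEAR");
            final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
            recordings = (double[]) recordingsArray.getArray();
        }

        // Called when upserting into STOCK_STATS; the parameter order follows the columns
        // passed to PhoenixMapReduceUtil.setOutput ("STOCK_NAME,MAX_RECORDING")
        @Override
        public void write(PreparedStatement pstmt) throws SQLException {
            pstmt.setString(1, stockName);
            pstmt.setDouble(2, maxPrice);
        }

        // The Writable methods are left empty in this sketch; the class is only read from
        // Phoenix and written back to Phoenix, never serialized during the shuffle.
        @Override
        public void readFields(DataInput input) throws IOException { }

        @Override
        public void write(DataOutput output) throws IOException { }

        public String getStockName() { return stockName; }

        public int getYear() { return year; }

        public double[] getRecordings() { return recordings; }

        public void setStockName(String stockName) { this.stockName = stockName; }

        public void setMaxPrice(double maxPrice) { this.maxPrice = maxPrice; }
    }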

    Setup

    a) STOCK
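    A plausible definition for the source table, inferred from the select query and the mapper below (the column names come from the query; the types and primary key are assumptions), is:

    CREATE TABLE IF NOT EXISTS STOCK (STOCK_NAME VARCHAR NOT NULL, RECORDING_YEAR INTEGER NOT NULL, RECORDINGS_QUARTER DOUBLE ARRAY[] CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR));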

    b) STOCK_STATS

    CREATE TABLE IF NOT EXISTS STOCK_STATS (STOCK_NAME VARCHAR NOT NULL, MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME));

    Sample Data
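    A few illustrative rows for the STOCK table; the stock names and quarterly prices below are made-up values:

    UPSERT INTO STOCK VALUES ('AAPL', 2009, ARRAY[85.88, 91.04, 88.50, 90.30]);
    UPSERT INTO STOCK VALUES ('AAPL', 2010, ARRAY[201.21, 211.09, 218.33, 225.01]);
    UPSERT INTO STOCK VALUES ('CSCO', 2009, ARRAY[17.32, 19.25, 18.10, 20.12]);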

    Job Configuration

    final Configuration configuration = HBaseConfiguration.create();
    final Job job = Job.getInstance(configuration, "phoenix-mr-job");

    // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
    final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

    // StockWritable is the DBWritable class that enables us to process the result of the above query
    PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);

    // Set the target Phoenix table and the columns
    PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

    job.setMapperClass(StockMapper.class);
    // StockReducer aggregates the per-stock prices and emits a StockWritable (see Stock Reducer below)
    job.setReducerClass(StockReducer.class);
    job.setOutputFormatClass(PhoenixOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(StockWritable.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.waitForCompletion(true);

    Stock Mapper

    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

        private Text stock = new Text();
        private DoubleWritable price = new DoubleWritable();

        @Override
        protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
            final String stockName = stockWritable.getStockName();
            final double[] recordings = stockWritable.getRecordings();
            double maxPrice = Double.MIN_VALUE;
            for (double recording : recordings) {
                if (maxPrice < recording) {
                    maxPrice = recording;
                }
            }
            stock.set(stockName);
            price.set(maxPrice);
            context.write(stock, price);
        }

    }

    Stock Reducer
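    A sketch of a reducer consistent with the job configuration above: it reduces the Text/DoubleWritable pairs emitted by the mapper to a NullWritable/StockWritable pair holding the maximum price per stock, using the setters from the StockWritable sketch above.

    public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
            // Track the largest max-price value seen across all years for this stock
            double maxPrice = Double.MIN_VALUE;
            for (DoubleWritable recording : recordings) {
                if (maxPrice < recording.get()) {
                    maxPrice = recording.get();
                }
            }
            final StockWritable stock = new StockWritable();
            stock.setStockName(key.toString());
            stock.setMaxPrice(maxPrice);
            // PhoenixOutputFormat upserts the value into STOCK_STATS; the key is ignored
            context.write(NullWritable.get(), stock);
        }

    }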

    Packaging & Running

    • Ensure phoenix-[version]-client.jar is in the classpath of your Map Reduce job jar.
    • To run the job, use the hadoop jar command with the necessary arguments, for example as shown below.
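    A typical invocation might look like the following; the jar name, driver class, and path are placeholders rather than fixed names:

    HADOOP_CLASSPATH=/path/to/phoenix-[version]-client.jar hadoop jar stock-mr-job.jar com.example.mr.StockMaxPriceDriver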