Ví dụ tuyệt vời được hiển thị trên blog này , Tôi đã thử nó và nó diễn ra rất tốt. Tôi trích dẫn những phần quan trọng nhất của mã.
Đầu tiên, bạn phải tạo một lớp đại diện cho dữ liệu mà bạn muốn lưu trữ. Lớp phải triển khai giao diện DBWille:
public class DBOutputWritable implements Writable, DBWritable
{
private String name;
private int count;
public DBOutputWritable(String name, int count) {
this.name = name;
this.count = count;
}
public void readFields(DataInput in) throws IOException { }
public void readFields(ResultSet rs) throws SQLException {
name = rs.getString(1);
count = rs.getInt(2);
}
public void write(DataOutput out) throws IOException { }
public void write(PreparedStatement ps) throws SQLException {
ps.setString(1, name);
ps.setInt(2, count);
}
}
Tạo các đối tượng của lớp đã xác định trước đó trong Bộ giảm tốc của bạn:
public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
try {
ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Cuối cùng, bạn phải định cấu hình kết nối với DB của mình (đừng quên thêm trình kết nối db của bạn trên classpath) và đăng ký các kiểu dữ liệu đầu vào / đầu ra của trình liên kết và trình giảm tốc của bạn.
public class Main
{
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf,
"com.mysql.jdbc.Driver", // driver class
"jdbc:mysql://localhost:3306/testDb", // db url
"user", // username
"password"); //password
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class); // your mapper - not shown in this example
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example
job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example
job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
job.setOutputValueClass(NullWritable.class); // reducer's VALUEOUT
job.setInputFormatClass(...);
job.setOutputFormatClass(DBOutputFormat.class);
DBInputFormat.setInput(...);
DBOutputFormat.setOutput(
job,
"output", // output table name
new String[] { "name", "count" } //table columns
);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}