복붙노트

[HADOOP] Mysql에서 Pig로 테이블 데이터를 읽는 방법

HADOOP

Mysql에서 Pig로 테이블 데이터를 읽는 방법

누구나 돼지가 DBStorage를 지원한다는 것을 알고 있습니다. 하지만 그것들은 Pig에서 mysql 로의로드 결과만을 지원한다.

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');

하지만 저에게 mysql에서 테이블을 읽는 방법을 보여주십시오.

data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');

여기 내 코드가있다.

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    private ArrayList mProtoTuple = null;
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // TODO Auto-generated method stub
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // TODO Auto-generated method stub
        boolean next = false;

        try {
            next = result.next();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        if (!next)
            return null;
        int numColumns = 0;
        // Get result set meta data
        ResultSetMetaData rsmd;
        try {
            rsmd = result.getMetaData();
            numColumns = rsmd.getColumnCount();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        for (int i = 0; i < numColumns; i++) {

            try {
                Object field = result.getObject(i);

                switch (DataType.findType(field)) {
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot store a non-flat tuple "
                            + "using DbStorage");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                }

            } catch (Exception ee) {
                throw new RuntimeException(ee);
            }
        }

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {

        con = null;
        if (query == null) {
            throw new IOException("SQL Insert command not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // TODO Auto-generated method stub

        //TextInputFormat.setInputPaths(job, location);

    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

    }

}

나는 UDF를 쓰기 위해 여러 번 노력하지만 성공하지는 않습니다 .....

해결법

  1. ==============================

    1.DBStorage는 결과를 데이터베이스에 저장하는 기능 만 지원합니다.

    DBStorage는 결과를 데이터베이스에 저장하는 기능 만 지원합니다.

    MySQL에서 데이터를로드하려면 sqoop (데이터베이스에서 HDFS로 데이터를 복사하는) 프로젝트를 살펴 보거나 mysql 덤프를 수행 한 다음 HDFS로 파일을 복사 할 수 있습니다. 두 가지 방법 모두 약간의 상호 작용이 필요하며 돼지 내부에서 직접 사용할 수 없습니다.

    세 번째 옵션은 Pig LoadFunc를 작성하는 것입니다 (UDF를 작성하려고했습니다). 너무 어렵지 않아야합니다. DBStorage (드라이버, 연결 자격 증명 및 실행될 SQL 쿼리)와 동일한 옵션을 전달해야하며 일부 결과 집합 메타 데이터 검사를 사용하여 자동으로 스키마를 생성 할 수도 있습니다.

  2. from https://stackoverflow.com/questions/10942739/a-way-to-read-table-data-from-mysql-to-pig by cc-by-sa and MIT license