如何进行数据库中间件MyCAT源码分析

这篇文章将为大家详细讲解有关如何进行数据库中间件 MyCAT 源码分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

创新互联专注于企业网络营销推广、网站重做改版、宁化网站定制设计、自适应品牌网站建设、H5场景定制商城网站建设、集团公司官网建设、外贸营销网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为宁化等各大城市提供网站开发制作服务。

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识

  2. 查询操作

  3. 插入操作

2. 主流程

如何进行数据库中间件 MyCAT 源码分析

MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB  Server。

MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL  Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

如何进行数据库中间件 MyCAT 源码分析

Java数据库连接,(Java Database  Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun  Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB  的访问。可能这样说法有些抽象,看个类图压压惊。

如何进行数据库中间件 MyCAT 源码分析

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

如何进行数据库中间件 MyCAT 源码分析

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.java public MongoData query() throws MongoSQLException {    if (!(statement instanceof SQLSelectStatement)) {        //return null;        throw new IllegalArgumentException("not a query sql statement");    }    MongoData mongo = new MongoData();    DBCursor c = null;    SQLSelectStatement selectStmt = (SQLSelectStatement) statement;    SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();    int icount = 0;    if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {        MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();         BasicDBObject fields = new BasicDBObject();         // 显示(返回)的字段        for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {            //System.out.println(item.toString());            if (!(item.getExpr() instanceof SQLAllColumnExpr)) {                if (item.getExpr() instanceof SQLAggregateExpr) {                    SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();                    if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)                        icount = 1;                        mongo.setField(getExprFieldName(expr), Types.BIGINT);                    }                    fields.put(getExprFieldName(expr), 1);                } else {                    fields.put(getFieldName(item), 1);                }            }         }         // 表名        SQLTableSource table = mysqlSelectQuery.getFrom();        DBCollection coll = this._db.getCollection(table.toString());        mongo.setTable(table.toString());         // WHERE        SQLExpr expr = mysqlSelectQuery.getWhere();        DBObject query = parserWhere(expr);         // GROUP BY        SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();        BasicDBObject gbkey = new BasicDBObject();        if (groupby != null) {            for (SQLExpr gbexpr : groupby.getItems()) {                if (gbexpr instanceof SQLIdentifierExpr) {                    String name = ((SQLIdentifierExpr) gbexpr).getName();                    gbkey.put(name, Integer.valueOf(1));                }            }            icount = 2;        }         // SKIP / LIMIT        int limitoff = 0;        int limitnum = 0;        if (mysqlSelectQuery.getLimit() != null) {            limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());            limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());        }        if (icount == 1) { // COUNT(*)            mongo.setCount(coll.count(query));        } else if (icount == 2) { // MapReduce            BasicDBObject initial = new BasicDBObject();            initial.put("num", 0);            String reduce = "function (obj, prev) { " + "  prev.num++}";            mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));        } else {            if ((limitoff > 0) || (limitnum > 0)) {                c = coll.find(query, fields).skip(limitoff).limit(limitnum);            } else {                c = coll.find(query, fields);            }             // order by            SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();            if (orderby != null) {                BasicDBObject order = new BasicDBObject();                for (int i = 0; i < orderby.getItems().size(); i++) {                    SQLSelectOrderByItem orderitem = orderby.getItems().get(i);                    order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));                }                c.sort(order);                // System.out.println(order);            }        }        mongo.setCursor(c);    }    return mongo; }

2、查询条件

// MongoSQLParser.java private void parserWhere(SQLExpr aexpr, BasicDBObject o) {    if (aexpr instanceof SQLBinaryOpExpr) {        SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;        SQLExpr exprL = expr.getLeft();        if (!(exprL instanceof SQLBinaryOpExpr)) {            if (expr.getOperator().getName().equals("=")) {                o.put(exprL.toString(), getExpValue(expr.getRight()));            } else {                String op = "";                if (expr.getOperator().getName().equals("<")) {                    op = "$lt";                } else if (expr.getOperator().getName().equals("<=")) {                    op = "$lte";                } else if (expr.getOperator().getName().equals(">")) {                    op = "$gt";                } else if (expr.getOperator().getName().equals(">=")) {                    op = "$gte";                } else if (expr.getOperator().getName().equals("!=")) {                    op = "$ne";                } else if (expr.getOperator().getName().equals("<>")) {                    op = "$ne";                }                parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));            }        } else {            if (expr.getOperator().getName().equals("AND")) {                parserWhere(exprL, o);                parserWhere(expr.getRight(), o);            } else if (expr.getOperator().getName().equals("OR")) {                orWhere(exprL, expr.getRight(), o);            } else {                throw new RuntimeException("Can't identify the operation of  of where");            }        }    } }  private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {    BasicDBObject xo = new BasicDBObject();    BasicDBObject yo = new BasicDBObject();    parserWhere(exprL, xo);    parserWhere(exprR, yo);    ob.put("$or", new Object[]{xo, yo}); }

3、解析 MongoDB 数据

// MongoResultSet.java public MongoResultSet(MongoData mongo, String schema) throws SQLException {    this._cursor = mongo.getCursor();    this._schema = schema;    this._table = mongo.getTable();    this.isSum = mongo.getCount() > 0;    this._sum = mongo.getCount();    this.isGroupBy = mongo.getType();     if (this.isGroupBy) {        dblist = mongo.getGrouyBys();        this.isSum = true;    }    if (this._cursor != null) {        select = _cursor.getKeysWanted().keySet().toArray(new String[0]);        // 解析 fields        if (this._cursor.hasNext()) {            _cur = _cursor.next();            if (_cur != null) {                if (select.length == 0) {                    SetFields(_cur.keySet());                }                _row = 1;            }        }        // 设置 fields 类型        if (select.length == 0) {            select = new String[]{"_id"};            SetFieldType(true);        } else {            SetFieldType(false);        }    } else {        SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};        SetFieldType(mongo.getFields());    } }
  • 当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.java private void ouputResultSet(ServerConnection sc, String sql)        throws SQLException {    ResultSet rs = null;    Statement stmt = null;     try {        stmt = con.createStatement();        rs = stmt.executeQuery(sql);         // header        List fieldPks = new LinkedList<>();        ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);        int colunmCount = fieldPks.size();        ByteBuffer byteBuf = sc.allocate();        ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();        headerPkg.fieldCount = fieldPks.size();        headerPkg.packetId = ++packetId;        byteBuf = headerPkg.write(byteBuf, sc, true);        byteBuf.flip();        byte[] header = new byte[byteBuf.limit()];        byteBuf.get(header);        byteBuf.clear();        List fields = new ArrayList(fieldPks.size());        for (FieldPacket curField : fieldPks) {            curField.packetId = ++packetId;            byteBuf = curField.write(byteBuf, sc, false);            byteBuf.flip();            byte[] field = new byte[byteBuf.limit()];            byteBuf.get(field);            byteBuf.clear();            fields.add(field);        }        // header eof        EOFPacket eofPckg = new EOFPacket();        eofPckg.packetId = ++packetId;        byteBuf = eofPckg.write(byteBuf, sc, false);        byteBuf.flip();        byte[] eof = new byte[byteBuf.limit()];        byteBuf.get(eof);        byteBuf.clear();        this.respHandler.fieldEofResponse(header, fields, eof, this);         // row        while (rs.next()) {            RowDataPacket curRow = new RowDataPacket(colunmCount);            for (int i = 0; i < colunmCount; i++) {                int j = i + 1;                if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {                    curRow.add(rs.getBytes(j));                } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||                        fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte                    // ensure that do not use scientific notation format                    BigDecimal val = rs.getBigDecimal(j);                    curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));                } else {                    curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));                }            }            curRow.packetId = ++packetId;            byteBuf = curRow.write(byteBuf, sc, false);            byteBuf.flip();            byte[] row = new byte[byteBuf.limit()];            byteBuf.get(row);            byteBuf.clear();            this.respHandler.rowResponse(row, this);        }        fieldPks.clear();        // row eof        eofPckg = new EOFPacket();        eofPckg.packetId = ++packetId;        byteBuf = eofPckg.write(byteBuf, sc, false);        byteBuf.flip();        eof = new byte[byteBuf.limit()];        byteBuf.get(eof);        sc.recycle(byteBuf);        this.respHandler.rowEofResponse(eof, this);    } finally {        if (rs != null) {            try {                rs.close();            } catch (SQLException e) {            }        }        if (stmt != null) {            try {                stmt.close();            } catch (SQLException e) {            }        }    } }  // MongoResultSet.java @Override public String getString(String columnLabel) throws SQLException {    Object x = getObject(columnLabel);    if (x == null) {        return null;    }    return x.toString(); }

当返回字段值是 Object 时,返回该对象.toString()。例如:

mysql> select * from user order by _id asc; +--------------------------+------+-------------------------------+ | _id                      | name | profile                       | +--------------------------+------+-------------------------------+ | 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

如何进行数据库中间件 MyCAT 源码分析

// MongoSQLParser.java public int executeUpdate() throws MongoSQLException {    if (statement instanceof SQLInsertStatement) {        return InsertData((SQLInsertStatement) statement);    }    if (statement instanceof SQLUpdateStatement) {        return UpData((SQLUpdateStatement) statement);    }    if (statement instanceof SQLDropTableStatement) {        return dropTable((SQLDropTableStatement) statement);    }    if (statement instanceof SQLDeleteStatement) {        return DeleteDate((SQLDeleteStatement) statement);    }    if (statement instanceof SQLCreateTableStatement) {        return 1;    }    return 1; }  private int InsertData(SQLInsertStatement state) {    if (state.getValues().getValues().size() == 0) {        throw new RuntimeException("number of  columns error");    }    if (state.getValues().getValues().size() != state.getColumns().size()) {        throw new RuntimeException("number of values and columns have to match");    }    SQLTableSource table = state.getTableSource();    BasicDBObject o = new BasicDBObject();    int i = 0;    for (SQLExpr col : state.getColumns()) {        o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));        i++;    }    DBCollection coll = this._db.getCollection(table.toString());    coll.insert(o);    return 1; }

关于如何进行数据库中间件 MyCAT 源码分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


文章名称:如何进行数据库中间件MyCAT源码分析
网页URL:http://myzitong.com/article/gppddj.html