WideTableMultiDimSQLParser 解析说明:

2022-03-09  本文已影响0人  光剑书架上的书

WideTableMultiDimSQLParser 解析说明

1.ClickHouse 数组交并差运算

--交 t[1] ∩ t[2] : arrayIntersect(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                               t
         ) t;

--并 t[1] ∪ t[2]: arrayConcat(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

--差 t[1]-t[2] : arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                                                                             t
         ) t;

--并
select length(arrayDistinct(t.res))
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;


ClickHouse 解析器生成结果示例(第一行为集合运算表达式,其后为各标签对应的子查询):

(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))

2.Hive 数组交并差运算:

select
    array_intersect(array(1, 2), array(2, 3)) i,
    array_union(array(1, 2), array(2, 3)) u,
    array_except(array(1, 2), array(2, 3)) e;

Hive 解析器生成结果示例(第一行为集合运算表达式,其后为各标签对应的子查询):

(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))  

附源码


/**
 * One flattened filter condition extracted from a KunLun expression tree.
 *
 * @property kexprId id of the expression node the condition was taken from
 * @property tagCode table (tag) code of the condition
 * @property tagOptionCode field (tag option) code of the condition
 * @property conditionExpr generated sub-query text for this condition
 * @property index 1-based position of the condition in the flattened list; -1 until it is assigned
 */
data class TagIdx(
    var kexprId: Int,
    var tagCode: String,
    var tagOptionCode: String,
    var conditionExpr: String,
    var index: Int,
)

/**
 * Tells whether [e] is a leaf of the expression tree, i.e. whether `CollectionUtils.isEmpty`
 * reports that it has no sub-expressions.
 */
fun isLeafNode(e: KunLunExpression): Boolean {
    return CollectionUtils.isEmpty(e.subExpression)
}

/**
 * Flattens the rule expression of [requestDTO] into a list of filter conditions and numbers them.
 *
 * The tree is walked by [parseTagIdx]; afterwards every entry receives its 1-based position in
 * the list as `index`.
 *
 * @param requestDTO request carrying the root expression
 * @param tableMappingMap table mappings keyed by tag (table) code, forwarded to [parseTagIdx]
 * @return the flattened, numbered conditions in traversal order
 */
fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val rootExpression: KunLunExpression = requestDTO.expression
    val flattened = mutableListOf<TagIdx>()

    // Recursively flatten the rule expression into a plain list of conditions.
    parseTagIdx(rootExpression, flattened, tableMappingMap)

    // The index is the list position plus one, so it is 1-based.
    for ((position, entry) in flattened.withIndex()) {
        entry.index = position + 1
    }
    return flattened
}


/**
 * Walks [kexpr] depth-first (node before children) and appends one [TagIdx] to [tagIdxList] for
 * every node that carries a field condition.
 *
 * Each appended entry holds a sub-query of the form
 * `select collect_set(<target>) from <db>.<table> where ( <dimension filter> ) and ( <tag filter> )`.
 * The dimension filter is ` 1=1 ` when the condition has no dimension conditions; otherwise the
 * dimension equalities are joined with `and`. The `index` of every entry is left at -1 for the
 * caller to assign.
 *
 * NOTE(review): the aggregate is hard-coded to `collect_set` although this function is shared by
 * the ClickHouse and Hive parsers, and the ClickHouse examples in this file use
 * `groupUniqArray` — confirm which one the ClickHouse path should emit.
 *
 * @param kexpr expression node to process
 * @param tagIdxList output list that receives the generated conditions
 * @param tableMappingMap table mappings keyed by tag (table) code; the first mapping of a code is used
 * @throws IllegalArgumentException when a tag code has no mapping or a field is not mapped
 */
fun parseTagIdx(kexpr: KunLunExpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
    val fieldCondition = kexpr.fieldCondition
    if (null != fieldCondition) {
        // Dimension filter: every tag table has its own dimensions, and the physical column
        // (not the logical field code) is what is actually filtered on.
        val dimConditionList = fieldCondition.dimConditionList
        val dimFilter = if (CollectionUtils.isEmpty(dimConditionList)) {
            " 1=1 "
        } else {
            // Each element is rendered as " col = value " and elements are glued with "and ",
            // which reproduces the spacing of the generated SQL exactly.
            dimConditionList.joinToString(separator = "and ") { dimField ->
                val dimTagCode = dimField.tableCode
                val dimMapping = firstTableMapping(tableMappingMap, dimTagCode)
                val dimPhysicalField = physicalFieldOf(dimMapping, dimTagCode, dimField.fieldCode)
                val singleValue = parseFieldValue(dimField, dimPhysicalField.fieldType)[0]?.sqlCondition
                " ${dimPhysicalField.columnCode} = $singleValue "
            }
        }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val tableMapping = firstTableMapping(tableMappingMap, tagCode)
        val physicalField = physicalFieldOf(tableMapping, tagCode, fieldCode)

        val targetFieldCode = tableMapping.targetField.columnCode
        val dbName = tableMapping.physicDBName
        val tableName = tableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalField.columnCode, physicalField.fieldType)

        val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        // index is a placeholder here; the caller numbers the entries afterwards.
        tagIdxList.add(TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1))
    }

    // Recurse into the child expressions, if any.
    kexpr.subExpression?.forEach {
        parseTagIdx(it, tagIdxList, tableMappingMap)
    }
}

/** Returns the first mapping registered for [tableCode], failing with a descriptive message when there is none. */
private fun firstTableMapping(tableMappingMap: Map<String, List<KTableMapping>>, tableCode: String): KTableMapping =
    tableMappingMap[tableCode]?.firstOrNull()
        ?: throw IllegalArgumentException("no table mapping found for tag code [$tableCode]")

/** Returns the physical (destination) field that [fieldCode] maps to in [mapping], failing when it is not mapped. */
private fun physicalFieldOf(mapping: KTableMapping, tableCode: String, fieldCode: String) =
    (mapping.fields.firstOrNull { it.srcField.columnCode == fieldCode }
        ?: throw IllegalArgumentException("field [$fieldCode] is not mapped for tag code [$tableCode]")).dstField

/**
 * Builds the SQL predicate ` <physicalField> <operator> <value(s)> ` for one field condition.
 *
 * Supported operators: LIKE, EQUAL, GREATER_EQUAL_THAN, LESS_THAN, LESS_EQUAL_THAN, GREATER_THAN,
 * BETWEEN (uses the first two values), IN and NOT_IN (use all values as `(v1,v2,...)`).
 *
 * @param fieldCondition condition providing the operator and the raw values
 * @param physicalField physical column name placed on the left-hand side
 * @param fieldValueType value type used by [parseFieldValue] to render the values
 * @return the predicate text, padded with spaces
 * @throws IllegalArgumentException when there are no values, or BETWEEN has fewer than two values
 * @throws IllegalStateException when the operator is not supported
 */
fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }

    // Multi-value form: (1,2,3,4)
    val listValue = fv.joinToString(separator = ",", prefix = "(", postfix = ")") { "${it?.sqlCondition}" }
    // Single-value forms.
    val singleValue = fv[0]?.sqlCondition
    // NOTE(review): `qlCondition` is read as-is from FieldValue; presumably the unquoted form
    // of the value — confirm against FieldValue.
    val singleValueNoQuote = fv[0]?.qlCondition

    val conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> "  like '%${singleValueNoQuote}%' "
        ArithmeticOperatorEnum.EQUAL -> "    = ${singleValue} "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> "    >= ${singleValue} "
        ArithmeticOperatorEnum.LESS_THAN -> "    < ${singleValue} "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> "    <= ${singleValue} "
        ArithmeticOperatorEnum.GREATER_THAN -> "    > ${singleValue} "
        ArithmeticOperatorEnum.BETWEEN -> {
            // Fail with a clear message instead of an index error when the upper bound is missing.
            if (fv.size < 2) {
                throw IllegalArgumentException("BETWEEN requires two values but got ${fv.size}")
            }
            "    between ${fv[0]?.sqlCondition} and ${fv[1]?.sqlCondition} "
        }
        ArithmeticOperatorEnum.IN -> "    in ${listValue} "
        ArithmeticOperatorEnum.NOT_IN -> "    not in ${listValue} "

        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }

    return " $physicalField $conditionExpr "
}

/**
 * Converts the raw values of [fieldCondition] into typed [FieldValue] objects.
 *
 * The concrete [FieldValue] class is chosen from [fieldValueType] (STRING, LONG or DOUBLE) and
 * the values are created through `FieldValue.create`.
 *
 * Both validation failures are raised with `throw ExceptionHelper.bizError(...)`, the same way
 * the rest of this file uses that helper. The original called the helper without `throw`, which
 * would skip the validation if the helper only builds the exception.
 *
 * @param fieldCondition condition whose `values` are converted
 * @param fieldValueType declared type of the values
 * @return the typed values, in the order of `fieldCondition.values`
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }

    // Pick the FieldValue implementation that matches the declared value type.
    val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
        KFieldValueType.STRING -> StringFieldValue::class.java
        KFieldValueType.LONG -> LongFieldValue::class.java
        KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
        else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}





/**
 * Recursively visits [expression] and collects what its leaves refer to.
 *
 * For every leaf node: when its field condition has a non-empty `objectSetId`, that id is added
 * to [objectSetList]; otherwise a [TagBaseField] holding the condition's table code and field
 * code is added to [tagBaseFieldList]. Non-leaf nodes only forward to their children.
 *
 * @param expression node to visit
 * @param tagBaseFieldList output list for tag references
 * @param objectSetList output list for object-set ids
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {

    // Inner node: descend into every child and stop.
    if (!isLeafNode(expression)) {
        expression.subExpression.forEach { child ->
            recurExtractTagCodeAndObjectSet(child, tagBaseFieldList, objectSetList)
        }
        return
    }

    // Leaf node: the recursion ends here.
    val condition = expression.fieldCondition

    // An object set takes precedence over a tag reference.
    if (StringUtils.isNotEmpty(condition.objectSetId)) {
        objectSetList.add(condition.objectSetId)
        return
    }

    // Otherwise record the tag (table code + field code).
    val tagReference = TagBaseField()
    tagReference.tableCode = condition.tableCode
    tagReference.fieldCode = condition.fieldCode
    tagBaseFieldList.add(tagReference)
}

/**
 * Shared helpers that resolve the table mappings referenced by a KunLun expression.
 */
@Service
class CommonParseUtils {

    /**
     * Collects the tags referenced by the expression of [requestDTO], resolves their table
     * mappings and groups the mappings by table code.
     *
     * Object-set ids are collected during the traversal as well but are not used here.
     *
     * @param tenant tenant whose id is passed to the mapping lookup
     * @param requestDTO request carrying the expression and the driver type
     * @return table mappings grouped by their `tableCode`
     */
    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags and object sets referenced by the expression.
        val tagBaseFieldList: MutableList<TagBaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.getExpression(), tagBaseFieldList, objectSetList)
        // Metadata lookup.
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Returns the physical-table mappings for all tags referenced by a KunLun expression and
     * verifies that every referenced tag and tag option exists in them.
     *
     * The mapping source is still a placeholder (an empty list, see the TODO), so any non-empty
     * [tagBaseFieldList] currently fails the tag-code check.
     *
     * @param tenantId tenant id (not used yet by the placeholder lookup)
     * @param tagBaseFieldList referenced tags; an empty list yields an empty result
     * @param driverType driver type (not used yet by the placeholder lookup)
     * @return the resolved mappings
     */
    fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagBaseFieldList)) {
            return emptyList()
        }

        // Fetch the mappings.
        // TODO metadata: kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()

        val tagCodeTableMapping = kTableMappings.stream().collect(Collectors.toMap({ obj: KTableMapping -> obj.tableCode }, Function.identity()))

        // Validate every referenced tag and tag option.
        for (tagBaseField in tagBaseFieldList) {
            val kTableMapping = tagCodeTableMapping[tagBaseField.tableCode] ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagBaseField.tableCode))
            // The option exists when at least one mapped source field has the requested code.
            // (The previous noneMatch inverted this check and rejected valid options.)
            val existsTagOption = kTableMapping.fields.any { it.srcField.columnCode == tagBaseField.fieldCode }
            if (!existsTagOption) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagBaseField.fieldCode))
            }
        }
        return kTableMappings
    }

}



/**
 * ClickHouse SQL generator for wide-table, multi-dimension tag expressions.
 *
 * Every flattened condition becomes one array-producing sub-query; the sub-queries are gathered
 * in an `array(...)` aliased `t`, and the logical expression is rendered as array set
 * operations over `t[1]`, `t[2]`, ...
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimation (count) SQL for the expression of [requestDTO].
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse the KunLun expression.
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }


    /**
     * Builds the selection (export) SQL for the expression of [requestDTO].
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        val csvFile = ""
        // Parse the KunLun expression.
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }


    /**
     * Renders the logical expression of [requestDTO] as ClickHouse array set operations over
     * the 1-based array slots assigned by [tagOptionConditions].
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }


    /**
     * Recursively renders [kunLunExpression].
     *
     * Leaf operands are rendered as `t[index]` and nested expressions are rendered recursively.
     * All operands are separated by commas, including after a nested expression that is not
     * the last operand. AND maps to `arrayIntersect`, OR to `arrayConcat`. EXCEPT maps the first
     * operand through `multiIf`, replacing every element found in the `arrayIntersect` of all
     * operands by NULL. The first operand is used as rendered, so it may itself be a nested
     * expression.
     *
     * @param exprMap flattened conditions grouped by expression id
     * @param kunLunExpression node to render
     * @return the rendered expression, or an empty string for a leaf node
     * @throws IllegalArgumentException when the logic operator is not AND, OR or EXCEPT
     * @throws IllegalStateException when a leaf has no flattened condition
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node: nothing to combine
            return ""
        }

        // Reject unsupported operators before touching the operands.
        val logic = kunLunExpression.logic
        if (logic != LogicOperatorEnum.AND && logic != LogicOperatorEnum.OR && logic != LogicOperatorEnum.EXCEPT) {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        val operands = subExpression.map { e ->
            if (isLeafNode(e)) {
                val tagIdx = exprMap[e.tfId]?.firstOrNull()
                    ?: throw IllegalStateException("no flattened condition found for expression id ${e.tfId}")
                "t[${tagIdx.index}]"
            } else {
                // Nested group: render it recursively.
                genWhereClause(exprMap, e)
            }
        }
        val joined = operands.joinToString(",")

        return when (logic) {
            LogicOperatorEnum.AND -> "(arrayIntersect($joined))"
            LogicOperatorEnum.OR -> "(arrayConcat($joined))"
            // EXCEPT: keep the elements of the first operand that are not in the intersection.
            else -> "(arrayMap(x->multiIf(x not in arrayIntersect($joined), x, NULL), ${operands.first()}))"
        }
    }


    /**
     * Renders the sub-queries that fill the `array(...)`, one per line. Every line except the
     * last one ends with a comma:
     *
     *     (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
     *     (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
     *     (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     *
     * @return the lines, or an empty string when there are no conditions
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        if (tagIdxs.isEmpty()) {
            return ""
        }
        return tagIdxs.joinToString(separator = ", \n", postfix = "  \n") { "(${it.conditionExpr})" }
    }


    /**
     * Count template. Example of the generated statement:
     *
     *     select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
     *     from (
     *         select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
     *         array(
     *         (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
     *         (select groupUniqArray(UserID) from hits_v1 where Age > 18),
     *         (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
     *         ) t
     *     ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""


    /**
     * Selection template: same shape as the count template, but expands the distinct, non-null
     * result with `arrayJoin` and writes it to an output file in CSV format.
     *
     * NOTE(review): [csvFile] is accepted but not interpolated; the output location is the
     * literal 'tos:///xxx' — confirm whether the parameter should be used once callers pass a
     * real path.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
INTO OUTFILE 'tos:///xxx' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""


}


/**
 * Manual smoke run of [WideTableMultiDimCHSQLParser].
 *
 * Builds the expression `EXCEPT(f1, f2, AND(f4, f5, f6))` over three tag tables (t1, t2, t3),
 * registers a hand-written mapping for each of them, and prints the rendered set expression
 * followed by the array sub-query lines.
 *
 * The set expression recorded for this run in the original post is:
 *
 *     (arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
 *
 * It is followed by six sub-query lines, one per flattened condition (f1..f6), each of the
 * shape `(select <aggregate>(user_id) from <db>.<table> where ( <dimension filter> ) and ( <tag filter> ))`.
 */
fun main() {
    val request = SQLQueryReqDTO()
    val mappings: HashMap<String, List<KTableMapping>> = hashMapOf()

    // Root node and its three direct children.
    val root = KunLunExpression()
    root.logic = LogicOperatorEnum.EXCEPT
    val leafF1 = KunLunExpression()
    val leafF2 = KunLunExpression()
    val groupF3 = KunLunExpression()

    // Dimension filters shared by several conditions; both are declared against tag table t1.
    val sharedDims = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )

    leafF1.fieldCondition = FieldCondition("", "t1", "f1", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    leafF2.fieldCondition = FieldCondition("", "t2", "f2", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    // The third child carries its own condition AND a nested AND group.
    groupF3.fieldCondition = FieldCondition("", "t2", "f3", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("333"))
    groupF3.logic = LogicOperatorEnum.AND

    val leafF4 = KunLunExpression()
    val leafF5 = KunLunExpression()
    val leafF6 = KunLunExpression()
    leafF4.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    leafF5.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    leafF6.fieldCondition = FieldCondition("", "t3", "f6", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    groupF3.subExpression = arrayListOf(leafF4, leafF5, leafF6)

    root.subExpression = arrayListOf(leafF1, leafF2, groupF3)
    request.expression = root

    // Constructor shapes noted in the original post:
    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)

    // A column that only has a code and a type.
    fun column(code: String, type: KFieldValueType) = KField(code, "", type, "")

    // Source and destination fields are separate instances with the same code and type.
    fun sameNameMapping(code: String, type: KFieldValueType) = KFieldMapping(column(code, type), column(code, type))

    mappings["t1"] = listOf(
        KTableMapping(
            "t1",
            "table1",
            "db1",
            column("user_id", KFieldValueType.STRING),
            KSource(0, "db1", "table1", "user_id"),
            listOf(
                sameNameMapping("f1", KFieldValueType.STRING),
                sameNameMapping("cate_id", KFieldValueType.STRING),
                sameNameMapping("shop_id", KFieldValueType.LONG),
            )
        )
    )

    mappings["t2"] = listOf(
        KTableMapping(
            "t2",
            "table2",
            "db2",
            column("user_id", KFieldValueType.STRING),
            KSource(0, "db2", "table2", "user_id"),
            listOf(
                sameNameMapping("f2", KFieldValueType.STRING),
                sameNameMapping("f3", KFieldValueType.LONG),
                sameNameMapping("cate_id", KFieldValueType.STRING),
                sameNameMapping("shop_id", KFieldValueType.LONG),
            )
        )
    )

    mappings["t3"] = listOf(
        KTableMapping(
            "t3",
            "table3",
            "db3",
            column("user_id", KFieldValueType.STRING),
            KSource(0, "db3", "table3", "user_id"),
            listOf(
                sameNameMapping("f4", KFieldValueType.STRING),
                sameNameMapping("f5", KFieldValueType.LONG),
                sameNameMapping("f6", KFieldValueType.LONG),
                sameNameMapping("cate_id", KFieldValueType.STRING),
                sameNameMapping("shop_id", KFieldValueType.LONG),
            )
        )
    )

    val parser = WideTableMultiDimCHSQLParser()
    val renderedExpr = parser.expr(request, mappings)
    val renderedLines = parser.arrayLines(request, mappings)

    println(renderedExpr)
    println(renderedLines)
}









/**
 * Hive SQL generator for wide-table, multi-dimension tag expressions.
 *
 * Every flattened condition becomes one array-producing sub-query; the sub-queries are gathered
 * in an `array(...)` aliased `t`, and the logical expression is rendered as array set
 * functions over `t[1]`, `t[2]`, ...
 *
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimHiveSQLParser {

    // Logger bound to this class (it previously pointed at the ClickHouse parser).
    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Builds the estimation (count) SQL for the expression of [requestDTO].
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse the KunLun expression.
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }


    /**
     * Builds the selection SQL for the expression of [requestDTO].
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)

        val csvFile = ""
        // Parse the KunLun expression.
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }


    /**
     * Renders the logical expression of [requestDTO] as array set functions over the 1-based
     * array slots assigned by [tagOptionConditions].
     */
    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }


    /**
     * Recursively renders [kunLunExpression].
     *
     * Leaf operands are rendered as `t[index]` and nested expressions are rendered recursively.
     * All operands are separated by commas, including after a nested expression that is not
     * the last operand. AND maps to `array_intersect`, OR to `array_union`, EXCEPT to
     * `array_except`.
     *
     * NOTE(review): the functions are emitted with as many arguments as there are operands;
     * the target engine may only accept two arrays per call — confirm.
     *
     * @param exprMap flattened conditions grouped by expression id
     * @param kunLunExpression node to render
     * @return the rendered expression, or an empty string for a leaf node
     * @throws IllegalArgumentException when the logic operator is not AND, OR or EXCEPT
     * @throws IllegalStateException when a leaf has no flattened condition
     */
    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node: nothing to combine
            return ""
        }

        // Resolve the set function first so unsupported operators are rejected up front.
        val logic = kunLunExpression.logic
        val setFunction = when (logic) {
            LogicOperatorEnum.AND -> "array_intersect"
            LogicOperatorEnum.OR -> "array_union"
            LogicOperatorEnum.EXCEPT -> "array_except"
            else -> throw IllegalArgumentException("logic $logic not supported!")
        }

        val operands = subExpression.map { e ->
            if (isLeafNode(e)) {
                val tagIdx = exprMap[e.tfId]?.firstOrNull()
                    ?: throw IllegalStateException("no flattened condition found for expression id ${e.tfId}")
                "t[${tagIdx.index}]"
            } else {
                // Nested group: render it recursively.
                genWhereClause(exprMap, e)
            }
        }

        return "($setFunction(${operands.joinToString(",")}))"
    }


    /**
     * Renders the sub-queries that fill the `array(...)`, one per line. Every line except the
     * last one ends with a comma:
     *
     *     (select collect_set(UserID) from db.hits_v1 where Sex = 1),
     *     (select collect_set(UserID) from db.hits_v1 where Age > 18),
     *     (select collect_set(UserID) from db.hits_v1 where RequestNum > 0)
     *
     * @return the lines, or an empty string when there are no conditions
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        if (tagIdxs.isEmpty()) {
            return ""
        }
        return tagIdxs.joinToString(separator = ", \n", postfix = "  \n") { "(${it.conditionExpr})" }
    }


    /**
     * Count template. Example of the generated statement:
     *
     *     select size(t.res) as cnt
     *     from (
     *         select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
     *         array(
     *         (select collect_set(UserID) from hits_v1 where Sex = 1),
     *         (select collect_set(UserID) from hits_v1 where Age > 18),
     *         (select collect_set(UserID) from hits_v1 where RequestNum > 0)
     *         ) t
     *     ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""


    /**
     * Selection template: same shape as the count template, but expands the result array into
     * rows with `explode`.
     *
     * [csvFile] is accepted for signature parity with the ClickHouse parser and is not used in
     * the generated statement.
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""

}


/**
 * Manual smoke run: builds a nested sample expression and table mappings for the tags `t1`, `t2` and `t3`,
 * then prints the results of `expr` and `arrayLines` from a `WideTableMultiDimHiveSQLParser` instance.
 *
 * Sample output kept from the original comment (a log line followed by the two printed values).
 * NOTE(review): the log line is labelled `WideTableMultiDimCHSQLParser` although this function instantiates
 * `WideTableMultiDimHiveSQLParser` — confirm which run it was captured from.
 *
WideTableMultiDimCHSQLParser - tagIdxList=[{"conditionExpr":"select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )","index":1,"kexprId":-316732738,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )","index":2,"kexprId":-316653905,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )","index":3,"kexprId":-315132611,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )","index":4,"kexprId":127438862,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )","index":5,"kexprId":127439854,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   )","index":6,"kexprId":-316668196,"tagCode":"t3","tagOptionCode":"f6"}]
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
 */
fun main() {
    // Dimension filters reused by every condition below that passes them as its fourth argument.
    val sharedDims = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
    )

    // Innermost conditions; they become the sub-expressions of the f3 node.
    val f4Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    }
    val f5Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    }
    val f6Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t3", "f6", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    }

    // Direct children of the root expression.
    val f1Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t1", "f1", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    }
    val f2Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t2", "f2", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    }
    val f3Node = KunLunExpression().apply {
        fieldCondition = FieldCondition("", "t2", "f3", sharedDims, ArithmeticOperatorEnum.EQUAL, listOf("333"))
        logic = LogicOperatorEnum.AND
        subExpression = arrayListOf(f4Node, f5Node, f6Node)
    }

    // Root: EXCEPT over the three children.
    val root = KunLunExpression().apply {
        logic = LogicOperatorEnum.EXCEPT
        subExpression = arrayListOf(f1Node, f2Node, f3Node)
    }
    val requestDTO = SQLQueryReqDTO().apply { expression = root }

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)
    // NOTE(review): the KTableMapping signature above lists a leading `rowMapping` flag, but the calls below
    // pass six arguments starting with the table code — confirm which constructor is actually in use.

    // Builds a mapping whose source and destination fields share the same column code and value type.
    fun sameColumnMapping(columnCode: String, valueType: KFieldValueType) = KFieldMapping(
        KField(columnCode, "", valueType, ""), // srcField
        KField(columnCode, "", valueType, ""), // dstField
    )

    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()

    tableMappingMap["t1"] = listOf(
        KTableMapping(
            "t1",
            "table1",
            "db1",
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, "db1", "table1", "user_id"),
            listOf(
                sameColumnMapping("f1", KFieldValueType.STRING),
                sameColumnMapping("cate_id", KFieldValueType.STRING),
                sameColumnMapping("shop_id", KFieldValueType.LONG),
            ),
        ),
    )

    tableMappingMap["t2"] = listOf(
        KTableMapping(
            "t2",
            "table2",
            "db2",
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, "db2", "table2", "user_id"),
            listOf(
                sameColumnMapping("f2", KFieldValueType.STRING),
                sameColumnMapping("f3", KFieldValueType.LONG),
            ),
        ),
    )

    tableMappingMap["t3"] = listOf(
        KTableMapping(
            "t3",
            "table3",
            "db3",
            KField("user_id", "", KFieldValueType.STRING, ""),
            KSource(0, "db3", "table3", "user_id"),
            listOf(
                sameColumnMapping("f4", KFieldValueType.STRING),
                sameColumnMapping("f5", KFieldValueType.LONG),
                sameColumnMapping("f6", KFieldValueType.LONG),
            ),
        ),
    )

    // Compute both results before printing, so nothing is printed if either call fails.
    val parser = WideTableMultiDimHiveSQLParser()
    val exprText = parser.expr(requestDTO, tableMappingMap)
    val arrayLinesText = parser.arrayLines(requestDTO, tableMappingMap)

    println(exprText)
    println(arrayLinesText)
}
上一篇下一篇

猜你喜欢

热点阅读