测试例子:
字段名(Kafka中的信息) |
类型 |
|
userId |
long |
用户id |
itemId |
long |
商品id |
categoryId |
long |
类别id |
behaviorType |
String |
pv 商品详情页,pv等价于点击| buy 商品购买 |cart 将商品加入购物车| fav 收藏商品 |
timestamp |
long |
时间戳 |
转换
过滤:排除pv的用户行为
将behaviorType
转换成:buy->商品购买 cart->将商品加入购物车 fav->收藏商品
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| SELECT `userId` AS 'user_id', `itemId` AS 'item_id', `categoryId` AS 'category_id', CASE WHEN `behaviorType` == 'buy' THEN '商品购买' CASE WHEN `behaviorType` == 'cart' THEN '将商品加入购物车' CASE WHEN `behaviorType` == 'fav' THEN '收藏商品' ELSE '未知' END AS 'behavior_type', `timestamp` AS 'timestamp', 1 AS 'times' FROM 'record' FOR 'dwd_user_behavior' KEY BY `timestamp` as 'timestamp',`userId` as 'user_id' PARTITION `userId` UNIQUE BY `userId`,`itemId`,`categoryId`,`timestamp` OPERATION 'OPERATION_INSERT'
|
转换定义:DefRecordTransform
1 2 3 4 5 6 7 8 9 10 11 12
| { "defRecordTransformId": "ada151ce-1113-430a-88eb-0f3880ad1a9f", "defName": "cwx-user-behavior", "defDescription": "cwx-user-behavior", "topicName": "user_behavior", "recordFilter": "`behaviorType` != 'pv'", "transformDef": "SELECT\n\t`userId` AS 'user_id',\n\t`itemId` AS 'item_id',\n\t`categoryId` AS 'category_id',\nCASE\n\tWHEN `behaviorType` == 'buy' THEN\n\t'商品购买'\nCASE\n\tWHEN `behaviorType` == 'cart' THEN\n\t'将商品加入购物车'\nCASE\n\tWHEN `behaviorType` == 'fav' THEN\n\t'收藏商品' ELSE '未知' \n\tEND AS 'behavior_type',\n\t`timestamp` AS 'timestamp',\n\t1 AS 'times' \nFROM\n\t'record' \nFOR 'dwd_user_behavior' \nKEY BY `timestamp` as 'timestamp',`userId` as 'user_id'\nPARTITION `userId` \nUNIQUE BY `userId`,`itemId`,`categoryId`,`timestamp`\nOPERATION 'OPERATION_INSERT'", "sourceInfo": "user_behavior", "targetInfo": "dwd_user_behavior", "chainType": 0, "transformType": 3 }
|
OperateContext:
1 2 3 4 5 6 7 8 9 10
| { "topic": "user_behavior", "record": { "userId": 179, "itemId": 146, "categoryId": 5, "behaviorType": "fav", "timestamp": 1620715517527 } }
|
OperateResult:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| [ { "times": 1, "category_id": 5, "user_id": 179, "item_id": 146, "behavior_type": "收藏商品", "_key": { "user_id": 179, "timestamp": 1620715517527 }, "_m": { "p": "179", "i": "17914651620715517527", "n": "dwd_user_behavior", "o": 0 }, "timestamp": 1620715517527 } ]
|
计算算子定义
转换定义:
1 2 3 4 5 6 7 8 9 10 11 12
| SELECT `user_id` AS 'user_id', `item_id` AS 'item_id', `category_id` AS 'category_id', `behavior_type` AS 'behavior_type', `sum` AS 'user_behavior_num', `timestamp` AS 'timestamp' FROM 'record' FOR 'dws_user_behavior' KEY BY `user_id` AS 'user_id' PARTITION `user_id` UNIQUE BY `user_id`,`item_id`,`category_id`
|
结果:
OperateContext:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| { "record": { "times": 1, "category_id": 96, "user_id": 81, "item_id": 19, "behavior_type": "商品购买", "_key": { "user_id": 81, "timestamp": 1620715203646 }, "_m": { "p": "81", "i": "8119961620715203646", "n": "dwd_user_behavior", "o": 0 }, "timestamp": 1620715203646 } }
|
Map:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| { "record": { "times": 1, "category_id": 96, "user_id": 81, "item_id": 19, "behavior_type": "商品购买", "_key": { "user_id": 81, "timestamp": 1620715203646 }, "_m": { "p": "81", "i": "8119961620715203646", "n": "dwd_user_behavior", "o": 0 }, "timestamp": 1620715203646 }, "_m": { "p": "s|dws_user_behavior|null" }, "dws_user_behavior": { "sum": [ 1 ] } }
|
OperateResult:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| [ { "category_id": 96, "user_id": 81, "item_id": 19, "behavior_type": "商品购买", "_key": { "user_id": 81 }, "_m": { "p": "81", "i": "811996", "n": "dws_user_behavior" }, "user_behavior_num": 1, "timestamp": 1620715203646 } ]
|
计算定义;
1 2 3 4 5 6 7 8
| SELECT `user_id` AS 'user_id', sum(`times`) AS 'sum' FROM 'record' GROUP BY `userId` FOR 'dws_user_behavior'
|
OperateContext:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| { "record": { "times": 1, "category_id": 96, "user_id": 81, "item_id": 19, "behavior_type": "商品购买", "_key": { "user_id": 81, "timestamp": 1620715203646 }, "_m": { "p": "81", "i": "8119961620715203646", "n": "dwd_user_behavior", "o": 0 }, "timestamp": 1620715203646 } }
|
OperateResult:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [ { "code": "dws_user_behavior", "user_id": 81, "sum": [ 1 ], "groupBy": [ null ], "_key": {}, "_m": {} } ]
|
过滤: