004 PySpark 对应于 Scala Spark 的实现（参见 0…）

2019-11-30  本文已影响0人  逸章
image.png
>>> from decimal import Decimal
>>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
>>> acTransRDD = sc.parallelize(acTransList)
>>> goodTransRecords = acTransRDD.filter(lambda trans: Decimal(trans.split(",")[1]) > 0).filter(lambda trans: (trans.split(",")[0]).startswith('SB') == True)
>>> highValueTransRecords = goodTransRecords.filter(lambda trans: Decimal(trans.split(",")[1]) > 1000)
>>> badAmountLambda = lambda trans: Decimal(trans.split(",")[1]) <= 0
>>> badAcNoLambda = lambda trans: (trans.split(",")[0]).startswith('SB') == False
>>> badAmountRecords = acTransRDD.filter(badAmountLambda)
>>> badAccountRecords = acTransRDD.filter(badAcNoLambda)
>>> badTransRecords = badAmountRecords.union(badAccountRecords)
>>> acTransRDD.collect()
['SB10001,1000', 'SB10002,1200', 'SB10003,8000', 'SB10004,400', 'SB10005,300', 'SB10006,10000', 'SB10007,500', 'SB10008,56', 'SB10009,30', 'SB10010,7000', 'CR10001,7000', 'SB10002,-10']
>>> goodTransRecords.collect()
['SB10001,1000', 'SB10002,1200', 'SB10003,8000', 'SB10004,400', 'SB10005,300', 'SB10006,10000', 'SB10007,500', 'SB10008,56', 'SB10009,30', 'SB10010,7000']
>>> highValueTransRecords.collect()
['SB10002,1200', 'SB10003,8000', 'SB10006,10000', 'SB10010,7000']
>>> badAccountRecords.collect()
['CR10001,7000']
>>> badAmountRecords.collect()
['SB10002,-10']
>>> badTransRecords.collect()
['SB10002,-10', 'CR10001,7000']
>>> sumAmounts = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a+b)
>>> maxAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a > b else b)
>>> sumAmounts
Decimal('28486')
>>> maxAmount
Decimal('10000')
>>> minAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a < b else b)
>>> minAmount
Decimal('30')
>>> combineAllElements = acTransRDD.flatMap(lambda trans: trans.split(","))
>>> combineAllElements.collect()
['SB10001', '1000', 'SB10002', '1200', 'SB10003', '8000', 'SB10004', '400', 'SB10005', '300', 'SB10006', '10000', 'SB10007', '500', 'SB10008', '56', 'SB10009', '30', 'SB10010', '7000', 'CR10001', '7000', 'SB10002', '-10']
>>> allGoodAccountNos = combineAllElements.filter(lambda trans: trans.startswith('SB') == True)
>>> allGoodAccountNos.distinct().collect()
['SB10007', 'SB10010', 'SB10003', 'SB10006', 'SB10002', 'SB10005', 'SB10009', 'SB10001', 'SB10004', 'SB10008']
>>> 
image.png
上一篇下一篇

猜你喜欢

热点阅读