简书FM ——十万个冷热知识阿里云

如何在MaxCompute中利用bitmap进行数据处理?

2020-03-11  本文已影响0人  阿里云技术

很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。

本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。

import com.aliyun.odps.OdpsException;

import com.aliyun.odps.data.Record;

import com.aliyun.odps.data.TableInfo;

import com.aliyun.odps.mapred.JobClient;

import com.aliyun.odps.mapred.MapperBase;

import com.aliyun.odps.mapred.ReducerBase;

import com.aliyun.odps.mapred.conf.JobConf;

import com.aliyun.odps.mapred.utils.InputUtils;

import com.aliyun.odps.mapred.utils.OutputUtils;

import com.aliyun.odps.mapred.utils.SchemaUtils;

import org.roaringbitmap.RoaringBitmap;

import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.DataOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.nio.ByteBuffer;

import java.util.Base64;

import java.util.Iterator;

public class bitmapDemo2

{

    public static class BitMapper extends MapperBase {

        Record key;

        Record value;

        @Override

        public void setup(TaskContext context) throws IOException {

            key = context.createMapOutputKeyRecord();

            value = context.createMapOutputValueRecord();

        }

        @Override

        public void map(long recordNum, Record record, TaskContext context)

                throws IOException

        {

            RoaringBitmap mrb=new RoaringBitmap();

            long AID=0;

            {

                {

                    {

                        {

                            AID=record.getBigint("id");

                            mrb.add((int) AID);

                            //获取key

                            key.set(new Object[] {record.getString("active_date")});

                        }

                    }

                }

            }

            ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());

            mrb.serialize(new DataOutputStream(new OutputStream(){

                ByteBuffer mBB;

                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}

                public void close() {}

                public void flush() {}

                public void write(int b) {

                    mBB.put((byte) b);}

                public void write(byte[] b) {mBB.put(b);}

                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}

            }.init(outbb)));

            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());

            value.set(new Object[] {serializedstring});

            context.write(key, value);

        }

    }

    public static class BitReducer extends ReducerBase {

        private Record result = null;

        public void setup(TaskContext context) throws IOException {

            result = context.createOutputRecord();

        }

        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {

            long fcount = 0;

            RoaringBitmap rbm=new RoaringBitmap();

            while (values.hasNext())

            {

                Record val = values.next();

                ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));

                ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);

                RoaringBitmap p= new RoaringBitmap(irb);

                rbm.or(p);

            }

            ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());

            rbm.serialize(new DataOutputStream(new OutputStream(){

                ByteBuffer mBB;

                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}

                public void close() {}

                public void flush() {}

                public void write(int b) {

                    mBB.put((byte) b);}

                public void write(byte[] b) {mBB.put(b);}

                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}

            }.init(outbb)));

            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());

            result.set(0, key.get(0));

            result.set(1, serializedstring);

            context.write(result);

        }

    }

    public static void main( String[] args ) throws OdpsException

    {

        System.out.println("begin.........");

        JobConf job = new JobConf();

        job.setMapperClass(BitMapper.class);

        job.setReducerClass(BitReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));

        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

        InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);

//        +------------+-------------+

//        | id        | active_date |

//        +------------+-------------+

//        | 1          | 20190729    |

//        | 2          | 20190729    |

//        | 3          | 20190730    |

//        | 4          | 20190801    |

//        | 5          | 20190801    |

//        +------------+-------------+

        OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);

//        +-------------+------------+

//        | active_date | bit_map    |

//        +-------------+------------+

//        20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D

//        20190730,OjAAAAEAAAAAAAAAEAAAAAMA

//        20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D

        JobClient.runJob(job);

    }

}

对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。

上一篇 下一篇

猜你喜欢

热点阅读