GIS后端

How it works(8) GDAL2Mbtiles源码阅读

2019-03-01  本文已影响2人  默而识之者

vips.py

最重要的切图模块,使用的是libvips这一快速轻量的c++模块的py绑定pyvips,这也是g2m里最大的一个模块.
切图主要分为两部分:

瓦片分割

瓦片分割是g2m的核心功能.基本流程是:

  1. 获取图片(直接使用原片或重采样后的图片)
  2. 从图片中裁取固定大小
  3. 计算该部分所属行列号,进行存储.

因为实际使用中,基本不可能只取某个特定的级别,因此对于低缩放级别的向下采样和超出地图本身分辨率的向上采样就无法避免.
对于需要重采样的缩放级别,g2m在采样前会先对整图进行仿射变换,再直接在瓦片上裁切,而不是裁切后再进行重采样,因为这样的效果会相当不好.
切图的整体流程如下:

# 先进行原片切割
if min_resolution <= self.resolution <= max_resolution:
          self.slice_native(tiles, fill_borders=fill_borders)
# 需要的话进行向下采样
if 0 <= min_resolution < self.resolution:
    self.slice_downsample(tiles=tiles,
                          min_resolution=min_resolution,
                          max_resolution=max_resolution,
                          fill_borders=fill_borders)
# 需要的话进行向上采样
if self.resolution < max_resolution:
    self.slice_upsample(tiles=tiles,
                        min_resolution=min_resolution,
                        max_resolution=max_resolution,
                        fill_borders=fill_borders)

重采样

重采样就需要对整个图片进行按级别缩放,也就是图片的仿射变换.
以向下采样为例(向上采样思路是完全一样的):

def slice_downsample(self, tiles, min_resolution, max_resolution=None,
                         fill_borders=None):
        """
        这里我们假设一个原片A,A的分辨率级别是18级,我们要裁切12级到20级的级别
        """
        # 对于向下采样,最大的级别也应该比原图级别小一级
        # 这里,最大级别变为17级
        if max_resolution is None or max_resolution >= self.resolution:
            max_resolution = self.resolution - 1

        with LibVips.disable_warnings():
            # 将图片向下采样到原片级别下的最小预定级别
            # 这里,A被向下重采样1级
            tiles = tiles.downsample(levels=(self.resolution - max_resolution),)
            for res in reversed(list(range(min_resolution, max_resolution + 1))):
        # 裁切重采样后的A
                tiles._slice()
        # 切完本层,就对下一层进行重采样,直到最小级别
                if res > min_resolution:
                    tiles = tiles.downsample(levels=1)
                    
    def downsample(self, levels=1):
        """
     向下采样图片,返回一个重采样后的图片
        """
        image = self.image
        offset = self.offset
    # 永远继承自原片的缩放级别
        parent = self._parent if self._parent is not None else self
        parent_resolution = parent.resolution
        parent_size = VImageAdapter(parent.image).BufferSize()
        # 因为限定了level为1,所以这个循环只会执行1次
        for res in reversed(list(range(self.resolution - levels, self.resolution))):
           # 每下降一级,像素偏移的X,Y也减半(向上采样的话,像素偏移翻倍)
            offset /= 2.0
            # 进行仿射变换,每下降一级,图片变为原来的四分之一(向上采样的话,每一级变为原来的四倍)
            shrunk = VImageAdapter(image).shrink_affine(xscale=0.5, yscale=0.5)
            # 将变换后的图像修正到正确的位置
            image = VImageAdapter(shrunk).tms_align(tile_width=self.tile_width,
                                     tile_height=self.tile_height,
                                     offset=offset)
            offset = offset.floor()
            # VIPS有一个特性:他会在你渲染时,自动向下重采样几个数量级(这里IMAGE_BUFFER_INTERVAL是4个)到内存中,以便加快速度.
            # 因此,如果想持续加速,就需要在原片向下采样4级后,主动进行重采样到缓存中.
            if parent_resolution - res >= self.IMAGE_BUFFER_INTERVAL:
                if parent_size < self.IMAGE_BUFFER_MEMORY_THRESHOLD:
                    # 对于过小的图片(这里IMAGE_BUFFER_MEMORY_THRESHOLD是1M),没有必要缓存了
                    continue
        # 将图片写入缓存中(超过1M写入内存,超过1G则写入磁盘)
                image = self.write_buffer(image=image, resolution=res)
                parent_resolution = res
                parent_size = VImageAdapter(image).BufferSize()
    # 对于进行过如上缓存的图片重新设定父级
        if parent_resolution < parent.resolution:
            parent = None
      # 因为限制了level为1,所以下面这种情况不会发生.
      # 当level不为1,可能会出现最终的缩放级别与缓存的缩放级别不一致,需要重新生成
            if parent_resolution != res:
                image = self.write_buffer(image=image, resolution=res)
    # 重新生成图像
        result = self.__class__(image=image,
                                storage=self.storage,
                                tile_width=self.tile_width,
                                tile_height=self.tile_height,
                                offset=offset,
                                resolution=res)
        # 绑定父图像
        result._parent = parent
        return result

在其中使用了两个关键的方法:仿射变换和修正偏移.

仿射变换

图片的放大与缩小就涉及到仿射变换了:

def shrink_affine(self, xscale, yscale, output_size=None):
    return self._scale(
        xscale=xscale, yscale=yscale, output_size=output_size, interpolate='bilinear'
    )
    
 def _scale(self, xscale, yscale, output_size, interpolate):
        if output_size is None:
            if XY(x=xscale, y=yscale) > XY(x=1.0, y=1.0):
              # 放大后图像的长宽
                output_width = int(ceil(self.image.width * xscale))
                output_height = int(ceil(self.image.height * yscale))
            else:
              # 缩小后的长宽
                output_width = int(floor(self.image.width * xscale))
                output_height = int(floor(self.image.height * yscale))
        else:
            output_width, output_height = output_size

     # 因为不进行变形,所以b,c为0.变换矩阵如下:
        # [[xscale, 0],
        # [ 0, yscale]]
        a, b, c, d = xscale, 0, 0, yscale
        if interpolate == 'near':
          # 最小近邻法时没有偏移
            offset_x = offset_y = 0
        else:
            # 对齐拐角
            offset_x = (a - 1) / 2
            offset_y = (d - 1) / 2

        output_x, output_y = 0, 0
        return self.affine(a=a, b=b, c=c, d=d, dx=offset_x, dy=offset_y,
                           ox=output_x, oy=output_y,
                           ow=output_width, oh=output_height,
                           interpolate=interpolate)
    
def affine(self, a, b, c, d, dx, dy, ox, oy, ow, oh,
           interpolate='bilinear'):
    if interpolate == 'near':
        interpolate = 'nearest'

    if interpolate == 'bilinear' or interpolate == 'nearest':
        interpolate = Interpolate.new(interpolate)

    image = self.image.affine(
        [a, b, c, d],
        interpolate=interpolate,
        oarea=[ox, oy, ow, oh],
        odx=dx, ody=dy,
        idx=0, idy=0
    )
    # 将输出图像和原始图像关联起来,避免原始图像在垃圾回收中在c++库中被销毁
    image.__inputref = self.image
    return image

修正偏移

好吧我并不很明确什么时候会发生如下的偏移,似乎是当瓦片的像素坐标不是整数时会发生.

   def tms_align(self, tile_width, tile_height, offset):
        # 修正缩放后的偏移
        x = int(round(offset.x * tile_width)) % tile_width
        y = int(round(self.image.height - offset.y * tile_height)) % tile_height
        tiles_x = ceil((self.image.width + x / 2) / tile_width)
        tiles_y = ceil((self.image.height + y / 2) / tile_height)
        width = int(tiles_x * tile_width)
        height = int(tiles_y * tile_height)
        if width == self.image.width and height == self.image.height:
            # 非重采样模式不改变图像
            assert x == y == 0
            return self.image

        # 将图像镶嵌在一个纠正偏移后的新图像上
        return self.image.embed(
            x, y, width, height, background=[0, 0, 0, 0] # Transparent
        )

切割图像与存储

完成了图片的重采样,就可以对采样后的图片切割了.当然,对于原始缩放级别,不需要进行重采样,可以直接进入这一步.

def _slice(self):
        with LibVips.disable_warnings():
          # 因为处理好了缩放和偏移,所以裁切时就可以不用考虑其他干扰因素
            for y in range(0, self.image_height, self.tile_height):
                for x in range(0, self.image_width, self.tile_width):
                    out = self.image.extract_area(x, y,self.tile_width, self.tile_height)
                    # 计算瓦片的所在行列号
                    offset = XY(
                        x=int(x / self.tile_width + self.offset.x),
                        y=int((self.image_height - y) / self.tile_height +
                              self.offset.y - 1)
                    )
                    # 保存到存储中(可能是mbtiles也可能是本地文件)
                    self.storage.save(x=offset.x, y=offset.y,
                                      z=self.resolution,
                                      image=out)

在代码里一直出现的偏移(offset),其实就是该地图的左下角坐标对应的行列号.是以右与上为正方向的坐标系.但对vips处理图片,则是左上为原点,有与正下为正方向.因此在计算不同瓦片的行列号时,需要进行转换.

波段处理

虽然波段处理不是必要的,但g2m依然提供了功能,它对波段的处理采用了纯数学的方式.其基本流程是:

  1. 将图片转换为Numpy格式
  2. 利用NumExpr,运算表达式,完成波段处理
  3. 将Numpy格式的数据转回图像

其中比较特别的方法是g2m采用表达式进行颜色转换的计算.

def colorize(self, image, nodata=None):
    # 将数据转换为numpy格式
    data = numpy.frombuffer(buffer=image.write_to_memory(),
                            dtype=VImageAdapter(image).NumPyType())

    # 利用numexpr将色彩采样成标准rgba
    bands = self._colorize_bands(data=data, nodata=nodata)

    # 将波段融合到一个新的vips图像中
    images = [VImageAdapter.from_numpy_array(
        array=band, width=image.width, height=image.height, bands=1,
        format='uchar'
    ) for band in bands]
    return VImageAdapter.gbandjoin(bands=images)
    
def _colorize_bands(self, data, nodata=None):
  # 遍历rgba
  for band in 'rgba':
      expr = self._expression(band=band, nodata=nodata)
      if expr is None:
          # 没有表达式就图像整体置为背景色
          array = numpy.empty(shape=data.size, dtype=numpy.uint8)
          array.fill(self._background(band=band))
          yield array
      else:
          # 执行表达式
          yield numexpr.evaluate(self._expression(band=band,
                                                  nodata=nodata),
                                 local_dict=dict(n=data.copy()),
                                 global_dict={})
    
def _expression(self, band, nodata=None):
  # 不同的颜色策略使用不同的_clauses方法
    clauses = self._clauses(band=band, nodata=nodata)
  # 设置默认背景色
    result = str(getattr(self.BACKGROUND, band))
    # 拼接表达式
    for expression, true_value in clauses:
        result = 'where({expression}, {true}, {false})'.format(
            expression=expression, true=true_value, false=result
        )
    return result

图片的处理与用户定义的处理方法有关,这里,以特定颜色转换这种处理方法为例:

def _clauses(self, band, nodata=None):
  # 颜色表,获取在指定的波段,什么颜色会被替换为什么颜色.
  # 替换的规则形如
  # {
  #     0:rgba(255,0,0,255)
  # }
  # 对于r波段,原始图像为0,则会被替换为255
    colors = self._colors(band=band)
    background = self._background(band=band)
  # 将会生成形如[('n == 0',255)]的表达式,
  # numexpr模块会识别并在numpy里加速执行
    return [('n == {0!r}'.format(band_value),  
             color)                            
            for band_value, color in colors
            if band_value != nodata and color != background]
上一篇下一篇

猜你喜欢

热点阅读