本文共 19750 字,大约阅读时间需要 65 分钟。
在地图服务领域,我们的选择其实是不多的.商业的首推arcgis server,开源的一般是mapserver和geoserver.这些专业地图服务其专业性很强,强大到可以满足几乎所有地图需求.不过正是因为有许多我们日常几乎用不到的专业功能,它们都十分的重,无论是从代码结构还是运行消耗(使用java编写的geoserver和arcgis server尤甚)上.
轻量级的地图服务也有不少,但几乎完全不被市场认可,因为对于大多数GIS人员,更需要的是拿来就用,拿来就一定能满足需求的集成型地图服务.不过实际情况是,专业性地理需求并没有那么大,而对于灵活性,与其他业务的联系性等等非专业性需求更大.我比较了解的geoserver,是基于geotools编写的开源的纯java的地图服务.看过源码,确实是个庞大的项目,想要对其进行修改确实需要对代码以及geotools深层的理解.因此,我需要一个麻雀虽小五脏俱全的地图服务,我找到了Tilestrata和tilestache,这次我先阅读Tilestrata的源码.
根据官方的描述,Tilestrata是一个可拔插的nodejs地图瓦片服务.它的配置完全基于代码:
var Tilestrata = require('Tilestrata');var disk = require('Tilestrata-disk');var sharp = require('Tilestrata-sharp');var mapnik = require('Tilestrata-mapnik');var dependency = require('Tilestrata-dependency');var strata = Tilestrata();//定义图层strata.layer('basemap').route('tile@2x.png').use(disk.cache({ dir: '/var/lib/tiles/basemap'})).use(mapnik({ pathname: '/path/to/map.xml',tileSize: 512,scale: 2})).route('tile.png').use(disk.cache({ dir: '/var/lib/tiles/basemap'})).use(dependency('basemap', 'tile@2x.png')).use(sharp(function(image, sharp) { return image.resize(256);}));//开始接受请求strata.listen(8080);
很显然,它从一开始面向的就是灵活的实际业务而非学术.它的设计思路是:
我主要带着两个思考去阅读Tilestrata的源码:
首先与以往一样,是madge生成的文件结构:
可以看出,Tilestrata的主体并不大,这正是其插件化的体现,整个Tilestrata并不包含任何实际功能,只是一个可以容纳插件,编排动作的框架.主要模块的导出,实际上并没有起什么具体作用.
Tileserver.js是整个服务的真正入口与承载者.
这里主要实现了四个功能:负载均衡属于可选项,且需要配合负载均衡服务,后面有机会会说,我们主要来看一下另外几个主要的部分:
tilestra是地图服务器,其图层,文件名(路由)和插件对应于一般服务的一级路由,二级路由和中间件.
对于地图服务器,图层是很普遍的,但一般的简单的地图服务器也就实现一个基础的png类型. 不过Tilestrata的文件名作为路由的模式使得我们可以自定格式,无论是png,jpg这种图片,还是png@2x这种应对高dpi环境的图片,甚至是json,pbf这种矢量格式都是可能的.TileServer.prototype.initialize = function (callback) { if (this.initialized) return callback(); var self = this; var done = function (err) { //这里使用了async.memoize我觉的应该是加速/防止二次初始化 self.initialize = async.memoize(TileServer.prototype.initialize.bind(self)); if (callback) callback(err); }; //遍历所有图层 async.each(_.values(this.layers), function (layer, callback) { //遍历每图层的每个路由 async.each(_.values(layer.routes), function (route, callback) { //初始化每个插件 route.handler._initialize(self, callback); }, function (err) { if (err) err = new Error('Unable to initialize "' + layer.name + '" layer: "' + (err.message || err) + '"'); callback(err); }); }, function (err) { if (!err) { self.start_time = Date.now(); self.initialized = true; } done(err); });};
Tilestrata使用了原生的http模块处理请求:
TileServer.prototype.listen = function (port) { var self = this; var args = Array.prototype.slice.call(arguments); this.http_port = port; var server = this.http_server = http.createServer(function (req, res) { //绑定处理请求 self._handleRequest(req, res); }); //初始化完成后监听端口 self.initialize(function (err) { //把参数apply到listen上,就可以像调用普通的httpserver的listen方法一样调用tileserer的listen方法 server.listen.apply(server, args); }); return server;};
请求处理是Tilestrata的核心了.在这部分,Tilestrata的各种插件承载了地图处理的所有用到的功能.
Tilestrata的插件有如下几种类型:因此,请求处理其实就是请求以及返回的结果如何在插件之间流转的过程.
请求到达服务,第一个进入最简单的路由判断:
TileServer.prototype._handleRequest = function (req, res, next) { //对非地图瓦片请求进行处理 if (req.url === '/health') { return route_health(req, res, this); } else if (req.url === '/profile' || req.url === '/profile?format=json') { return route_profile(req, res, this); } else if (req.url === '/robots.txt') { return route_robots(req, res, this); } //将求封装 var tilereq = TileRequest.parse(req.url, req.headers, req.method); //处理请求 this.serve(tilereq, { req: req, res: res }, function (status, buffer, headers) { if (next && status === 404) return next(); res.writeHead(status, headers); //响应用户 res.write(buffer); res.end(); });};
接着进入重头,完成请求在所有插件里的流转:
TileServer.prototype.serve = function (req, http, callback) { var self = this; var _method = req.method; if (req.method === 'HEAD') { _method = 'GET'; } //根据请求的图层,获取该图层所注册的所有插件 var handler = this._getTileHandler(req); //处理各种不匹配的情况 if (!handler) { return callback(404, BUFFER_NOTFOUND, { 'X-Powered-By': HEADER_XPOWEREDBY, 'Content-Length': BUFFER_NOTFOUND.length }); } else if (_method !== _method.toUpperCase() || !handler[_method]) { return callback(501, BUFFER_NOTIMPLEMENTED, { 'X-Powered-By': HEADER_XPOWEREDBY, 'Content-Length': BUFFER_NOTIMPLEMENTED.length }); } var result = { }; //串行执行 async.series([ //先执行请求钩子 function invokeRequestHooks(callback) { if (!http || !handler.requestHooks.length) return callback(); async.eachSeries(handler.requestHooks, function (hook, callback) { var __profile = self.profile(hook.id, req); hook.plugin.reqhook(self, req, http.req, http.res, function (err) { __profile(err); callback(err); }); }, callback); }, //进入其他插件的流转 function handleRequest(callback) { handler[_method](self, req, function (status, buffer, headers) { headers = headers || { }; headers['X-Powered-By'] = HEADER_XPOWEREDBY; if (status === 200) { headers['Cache-Control'] = 'max-age=60'; } result = { status: status, buffer: buffer, headers: headers }; callback(); }); }, //执行应答钩子 function invokeResponseHooks(callback) { if (!http || !handler.responseHooks.length) return callback(); async.eachSeries(handler.responseHooks, function (hook, callback) { var __profile = self.profile(hook.id, req); hook.plugin.reshook(self, req, http.req, http.res, result, function (err) { __profile(err); callback(err); }); }, callback); }, function finalizeResult(callback) { if (!result.headers) result.headers = { }; if (result.status !== 204) { result.headers['Content-Length'] = result.buffer ? result.buffer.length : 0; } // head request support if (req.method === 'HEAD') result.buffer = new Buffer([]); callback(); } ], function (err) { //构建错误的应答体 if (err) { var buffer = new Buffer(String(err.message || err), 'utf8'); log.error(null, 'Failed to serve ' + req.filename + ' for "' + req.layer + '" layer {x:' + req.x + ',y:' + req.y + ',z:' + req.z + '}'); log.error(null, err); callback(500, buffer, { 'Cache-Control': 'no-cache, no-store, must-revalidate', 'Pragma': 'no-cache', 'Expires': '0', 'X-Powered-By': HEADER_XPOWEREDBY, 'Content-Length': buffer.length }); } else { callback(result.status, result.buffer, result.headers); } });};
上面的处理流程单独把请求钩子插件和应答钩子插件独立于其他插件提出来,是因为这两个钩子不像其他插件一样具有互斥性(比如缓存插件与提供器插件),对于所有请求都应该执行,因此独立提取出来.
###指标检查 除了独立的健康检测,Tilestrata有profile机制,记录遇到的错误数和响应速度等运行指标 定义:TileServer.prototype.profile = function (plugin_id, req) { if (!PROFILING_ENABLED) return noop; var self = this; //记录调用的时间 var start = Date.now(); //处理完的回调 return function (err, _data) { //时间间隔 var dur = Date.now() - start; //生成标识key,其一般形式是'图层名::扩展名::插件id::放大级别' var filenameKey = req.hasFilename ? req.filename : '*' + req.filename.substring(1); var key = [req.layer, filenameKey, plugin_id, 'z' + req.z].join('::'); if (!self.profiles[key]) self.profiles[key] = { }; var data = self.profiles[key]; //记录内容:错误数,全部请求数,最大处理时长,最短处理时长,平均处理时长 data.errors = (data.errors || 0) + (err ? 1 : 0); data.dur_samples = (data.dur_samples || 0) + 1; data.dur_max = typeof data.dur_max === 'undefined' ? dur : Math.max(dur, data.dur_max); data.dur_min = typeof data.dur_min === 'undefined' ? dur : Math.min(dur, data.dur_min); data.dur_avg = ((data.dur_avg || 0) * (data.dur_samples - 1) + dur) / data.dur_samples; //需要统计所处理数据情况 if (_data) { if (typeof _data.hit === 'boolean') { data.hits = (data.hits || 0) + (_data.hit ? 1 : 0); data.misses = (data.misses || 0) + (_data.hit ? 0 : 1); } if (typeof _data.size === 'number') { data.size_samples = (data.size_samples || 0) + 1; data.size_max = typeof data.size_max === 'undefined' ? _data.size : Math.max(_data.size, data.size_max); data.size_min = typeof data.size_min === 'undefined' ? _data.size : Math.min(_data.size, data.size_min); data.size_avg = ((data.size_avg || 0) * (data.size_samples - 1) + _data.size) / data.size_samples; } } };};
调用:
//构建函数var __profile = self.profile(hook.id, req);hook.plugin.reqhook(self, req, http.req, http.res, function (err) { //调用 __profile(err); callback(err);});
Tilestrata的所有插件的运行状况都会经由profile方法的统计,在内存中维持一个字典对象.当需要查看时,可以调用Tileserver的getProfileData方法,将其转换成json对象,便于查看.这个方法很简单,只是将字符串形式的key结构化成对象,在此不再赘述.
状况统计也算是整个Tilestrata的重要功能了,不过调用的时间计数未采用高精度时间,或许误差会大一些,当然可能是为了性能做过考虑.这个模块主要负责检测,提取请求体中的参数.构建一个可信的标准的请求对象.
图层,也就是一级路由.其他诸如二级路由以及所有的插件,都是挂载在特定的图层下.
Tilelayer实现了两个功能:范围预判和接受插件挂载. 首先来看范围预判,这是一个地理服务的常用优化技巧:var TileLayer = module.exports = function(name, options) { //非法文件名检测 if (!name) throw new Error('The layer must be given name'); if (!/^[a-zA-Z0-9_\-]+$/.test(name)) { throw new Error('Invalid layer name "' + name + '" (must match /^[a-zA-Z0-9_\-]+$)'); } this.options = options || { }; this.name = name; this.routes = { }; //范围检测,对于规定了范围的图层,对于越界的坐标请求,直接pass var bbox = this.options.bbox; if (bbox) { if (Array.isArray(bbox[0])) { var bbox_count = bbox.length; this._isInBounds = function(req) { var req_bbox = tilebelt.tileToBBOX([req.x,req.y,req.z]); for (var i = 0; i < bbox_count; i++) { if (intersect(req_bbox, bbox[i])) return true; } return false; }; } else { this._isInBounds = function(req) { var req_bbox = tilebelt.tileToBBOX([req.x,req.y,req.z]); return intersect(req_bbox, bbox); }; } }};
一个图层可以挂载若干个文件名.如挂载png格式,jpg格式或其他格式,每种格式都有自己的插件.
这种模式与传统的中间件模式是不同的,它的中间件相当于是针对于特定的二级路由的中间件,而不是对所有请求一视同仁的.TileLayer.prototype.route = function(filename, options) { if (this.routes.hasOwnProperty(filename)) { if (options) throw new Error('Cannot pass options when layer is already registered'); return this.routes[filename].handler; } var handler = new TileRequestHandler(options); handler.layer = this.layer; //将route的this绑定到改图层上,用于链式调用 handler.route = this.route.bind(this); //将指定的文件名(路由)绑定到本layer上 this.routes[filename] = { filename: filename, handler: handler }; return handler;};
上面Tileserver里只实现了怎么处理请求,以及请求数据如何在插件里流转,但这些插件挂载到对应的图层上的实现,却是在本模块.
先来看挂载的例程strata.layer('basemap').route('tile@2x.png').use(disk.cache({ dir: '/var/lib/tiles/basemap'})) .use(mapnik({ pathname: '/path/to/map.xml',tileSize: 512,scale: 2}))
可以看出,插件挂载到对应的图层用的是use的链式调用方法:
TileRequestHandler.prototype.use = function (plugin) { //可以传入插件数组 if (Array.isArray(plugin)) { plugin.map(this.use.bind(this)); return this; } //对不同类型插件的注册 //值得注意的是,虽然Tilestrata可以任意添加插件,但插件的种类已经限定死了是这5种 if (plugin.get) return this._registerCache(plugin); if (plugin.serve) return this._registerProvider(plugin); if (plugin.transform) return this._registerTransform(plugin); if (plugin.reqhook) return this._registerRequestHook(plugin); if (plugin.reshook) return this._registerResponseHook(plugin); throw new Error('Invalid plugin');};//这里选择其中的provider插件为例,其他的插件都大同小异.TileRequestHandler.prototype._registerProvider = function (plugin) { if (!plugin) throw new Error('Falsy value passed to registerProvider()'); if (this.provider) throw new Error('There is a provider already registered to this layer'); if (typeof plugin.serve !== 'function') throw new Error('Attempted to register a provider with no serve() method'); //一种文件类型只会有一个提供器,因此这里的ID是固定的,对于可以在一个文件类型下存在多个的其他插件,这里的id就会是动态的 //如缓存的id就是 'cache#' + (this.caches.length); this.provider = { id: 'provider#0', plugin: plugin }; return this;};
在Tileserver.js的初始化中,插件是这样调用的:
route.handler._initialize(self, callback);
这个handle的初始化对应的就是如下的函数:
TileRequestHandler.prototype._invokeLifecycleMethod = function (server, plugin_method, skip_errors, callback) { var last_error; //回调函数包装器,可以控制初始化过程中的错误日志是否打印日志,同时利用闭包记录最后一个错误 var wrapcb = function (callback) { return function (err) { if (err && skip_errors) { last_error = err; log.warn('plugin', 'Failed to close: "' + err.message + '"'); return callback(err); } return callback(err); }; }; var self = this; //因为没有先后顺序,所以使用并行的流程控制 //只保留了provider和cache的初始化,其他的大同小异 async.parallel([ function invokeProvider(callback) { if (!self.provider || !self.provider.plugin[plugin_method]) return callback(); //约定插件拥有初始化函数,并接收传入server和初始化完成回调 //provider插件只能有一个 self.provider.plugin[plugin_method](server, wrapcb(callback)); }, function invokeCaches(callback) { if (!self.caches.length) return callback(); //cache插件可以有若干个,每一个都需要初始化 async.map(self.caches, function (cache, callback) { if (!cache.plugin[plugin_method]) return setImmediate(callback); cache.plugin[plugin_method](server, wrapcb(callback)); }, callback); } ], function (err) { //将回调插入当前事件循环的最后,确保每个插件都真正的初始化结束 setImmediate(function () { callback(err || last_error || null); }); });};
一切准备妥当,就进入了整个Tilestrata的核心,具体处理请求.
在TileServer模块中,我们看到它将流转过程中的请求钩子和应答钩子提出来做了单独处理,将其他插件对请求的处理夹在了中间:function invokeRequestHooks(callback) { ... },function handleRequest(callback) { handler[_method](self, req, function (status, buffer, headers) { headers = headers || { }; headers['X-Powered-By'] = HEADER_XPOWEREDBY; if (status === 200) { headers['Cache-Control'] = 'max-age=60'; } result = { status: status, buffer: buffer, headers: headers }; callback(); });},function invokeResponseHooks(callback) { ...}
在Tilestrata中,我们发现它其实只对GET方法做了默认响应(上面的_method一般是’GET’):
TileRequestHandler.prototype['GET'] = function (server, req, callback) { var self = this; //用once包装回调,避免多次对同一个请求的重复处理 var done = _.once(callback); var renderedHeaders; var renderedBuffer; var cacheHeaders; var backgroundRefresh = false; var waitForCache = req.headers.hasOwnProperty('x-Tilestrata-cachewait'); // 请求在插件中流转的流程 // 1) 从缓存中获取 // 2) 如果需要的话从提供者处生成 // 3) 执行转换 // 4) 执行回调 // 5) 如果需要的的话存入缓存 async.series([ function step_requestCache(callback) { var cacheHitBuffer; var cacheHitHeaders; if (!self.caches.length) return callback(); //如果请求头带有"x-Tilestrata-skipcache"则进行跳过缓存检测 //如果请求头中包含是'*',则对所有请求都不从缓存获取 //如果请求体头中是"[layer]/[file],[layer]/[file]'这种形式,则看是否与当前请求匹配 //匹配就跳过缓存 if (req.headers.hasOwnProperty('x-Tilestrata-skipcache')) { var list = req.headers['x-Tilestrata-skipcache']; if (list === '*') return callback(); var key = req.layer + '/' + req.filename; if (list.split(',').indexOf(key) > -1) { return callback(); } } //从缓存中获取对应瓦片 function fetcher(cache, callback) { var __profile = server.profile(cache.id + '.get', req); cache.plugin.get(server, req, function (err, buffer, headers, refresh) { cacheHitBuffer = buffer; cacheHitHeaders = headers; if (!err && buffer) { backgroundRefresh = refresh; //记录响应效率 __profile(null, { hit: true, size: buffer.length }); return callback('HIT'); } else { __profile(null, { hit: false }); callback();//忽略缓存获取时的错误 } }); } function complete() { if (cacheHitBuffer) { var headers = cacheHitHeaders || { }; headers['X-TileStrata-CacheHit'] = '1'; done(200, cacheHitBuffer, headers); //缓存刷新:虽然找到了缓存,但依旧进行后续的流程,相当于更新缓存 if (backgroundRefresh) callback(); //不刷新的话就不执行这个回调,async.series不再继续 } else { //未找到就进入后续流程 callback(); } } //缓存策略可以是并行获取或串行获取.并行的适用于多个相似的缓存器,谁最快就采用谁的 //串行适合于有优先级的缓存,一个找不到再寻找优先级别靠后的 if (self.cacheFetchMode === 'race') { async.each(self.caches, fetcher, complete); } else { async.eachSeries(self.caches, fetcher, complete); } }, //进入提供器插件,获取瓦片 function step_runProvider(callback) { if (!self.provider) { return done(404, BUFFER_NOPROVIDER, { }); } var __profile = server.profile(self.provider.id, req); self.provider.plugin.serve(server, req, function (err, buffer, headers) { if (err) { __profile(err); return done(err.statusCode || 500, new Buffer(err.message || err, 'utf8'), { }); } renderedHeaders = headers || { }; renderedBuffer = buffer; __profile(null, { size: buffer.length }); //这里我觉得直接执行callback()也没什么问题 setImmediate(callback); }); }, //进入转换插件 function step_applyTransforms(callback) { if (!self.transforms.length) return callback(); async.eachSeries(self.transforms, function (transform, callback) { var __profile = server.profile(transform.id, req); transform.plugin.transform(server, req, renderedBuffer, renderedHeaders, function (err, buffer, headers) { if (err) { __profile(err); return callback(err); } renderedBuffer = buffer; renderedHeaders = headers || { }; __profile(); callback(); }); }, function (err) { if (err) return done(500, new Buffer(err.message || err, 'utf8'), { }); callback(); }); }, function step_serveResult(callback) { renderedHeaders = renderedHeaders || { }; //因为应答头可能会被应答钩子插件修改,所以在缓存前需要创造一份拷贝体存入缓存 cacheHeaders = _.clone(renderedHeaders); //可以不等待缓存完毕直接返回结果 if (!waitForCache) done(200, renderedBuffer, renderedHeaders); callback(); }, //将瓦片写入缓存 function step_cacheResult(callback) { async.map(self.caches, function (cache, callback) { var __profile = server.profile(cache.id + '.set', req); cache.plugin.set(server, req, renderedBuffer, cacheHeaders, function (err) { __profile(err); //不让其中一个缓存写入出错影响其他的 callback(); }); }, function () { if (waitForCache) done(200, renderedBuffer, renderedHeaders); renderedBuffer = null; renderedHeaders = null; cacheHeaders = null; }); } ], function (err) { });};
然而这个get方法并不止这些,它其实是另有玄机的.在初始化整个TileRequestHandler时,我们可以看到对get函数进行了装饰:
var TileRequestHandler = module.exports = function (options) { options = options || { }; this.provider = null; this.caches = []; this.transforms = []; this.requestHooks = []; this.responseHooks = []; this.cacheFetchMode = options.cacheFetchMode || 'sequential'; //包装Get方法 this.GET = requestBatcher(this.GET);};
requestBatcher方法实现了一个非常有用的功能,类似于前端的’防抖’,处理极短时间内接收到的的相同请求:
function requestBatcher(fn) { var callbacks = { }; //包装Get方法,其接收的参数等同于GET方法 return function (server, req, callback) { var self = this; //获取一个KEY var key = [req.layer, req.z, req.x, req.y, req.filename, req.qs || '', req.headers && req.headers['x-Tilestrata-skipcache']].join(':'); //如果已经存在了相同的请求,就将回调放入等待数组中 if (callbacks.hasOwnProperty(key)) return callbacks[key].push(callback); //记录最初的请求 callbacks[key] = [callback]; fn.apply(self, [server, req, function () { var cbs = callbacks[key]; //清除这次防抖缓存 delete callbacks[key]; //当最初的请求执行完毕,开始执行回调时,等待数组中的回调共享最初请求的成果,一并执行 for (var i = 0, n = cbs.length; i < n; i++) { cbs[i].apply(null, arguments); } }]); };}
这个实现的核心就是,在一次请求时间内,对额外的相同请求共享计算成果减少不必要计算.
这个防抖的有效时间就是一次请求的处理时间,可能只有几百毫秒,但在极端情况下可以减少很多不必要的计算.
回到一开始提的几个思考:
首先是Tilestrata的插件架构.可以看出,插件本身对于Tilestrata来说地位是很低的.约定好了输入以及回调输出,其实根本上相当于一个黑盒函数.所有的流程控制,加载顺序,使用与否都交由Tilestrata本身来调度. 在某种意义上,这种插件是很不灵活的:限定了功能,限定了输入变量(传入server变量),限定了输出(特定格式的回调函数). 但是这种不灵活却是十分符合Tilestrata定位的:它是一个地图瓦片服务,而不是一个通用型的基础框架,它的目的是尽可能把跟地图瓦片相关的功能做完善而不是尽可能便于用户扩展. 同时,这种黑盒式的插件模式也让插件的编写变得简单多了. 第二,一个地图瓦片服务器究竟需要哪些要素,Tilestrata已经为我们展示了最基本的:其实因为web墨卡托坐标系成为通用的坐标系,在大多数情况下坐标转换没有存在的必要.
Tilestrata给我的很大感触就是细节的完善,从指标记录,到防抖,都是锦上添花的功能点,却能感觉到这是一个经过实际考验的商业级系统.尽管整个项目距离现在已经有2年的时间,许多es6或异步的高级特性没有用上,但整个系统的优雅与严谨还是十分值得学习的.
转载地址:http://fgqws.baihongyu.com/