Node.js高级编程

2022-03-08  本文已影响0人  lowpoint

Nodejs可以做什么

nodejs架构.png

Natives modules

Nodejs更适用于IO密集型高并发请求

Nodejs异步IO

Nodejs单线程(主线程是单线程)

使用JS实现高效可伸缩的高性能Web服务

Nodejs应用场景

Nodejs实现Api服务

//server.ts
import express from 'express'
import { DataStore } from './data'

const app = express()

app.get('/', (req, res) => {
  // res.end('ddd')
  res.json(DataStore.list)
})

app.listen(8080, () => {
  console.log('服务开启')
})

Nodejs全局对象

process.memoryUsage() //内存使用情况
process.cpuUsage() //cpu使用情况
process.cwd() //当前工作目录
process.versions //node信息
process.arch //操作系统信息
process.env //当前node环境
process.env.PATH //当前配置的系统环境变量
process.env,USERPROFILE //管理员目录 mac下(HOME)
process.platform //系统平台
process.argv //启动参数
process.pid //程序运行id
process.uptime() //运行消耗时间

//事件
process.on('beforeExit',(code)=>{
  //即将退出
})

process.on('exit',(code)=>{
  //退出 只能执行同步代码
})

process.exit() //程序主动退出

process.stdout //标准输出
process.stdin //标准输入

path模块

常用api

全局变量Buffer

Buffer缓冲区,Buffer让JavaScript 可以操作二进制
IO行为操作的就是二进制数据
流操作配合管道实现数据分段传输
Nodejs中Buffer是一片内存空间

自定义Buffer之split
实现对Buffer拆分

ArrayBuffer.prototype.split = function (sep) {
  let len = Buffer.from(sep).length
  let ret = []
  let start = 0
  let offset = 0

  while (offset = this.indexOf(sep, start) !== -1) {
    ret.push(this.slice(start, offset))
    start = offset + len
  }
  ret.push(this.slice(start))
  return ret
}
let buf = '吃面,吃米,吃肉'
let arr = buf.split('吃')

FS

FS是内置的核心模块,提供文件系统操作的API

fs模块.png
权限位,标识符,文件描述符

文件读写与拷贝操作
文件操作API

const fs = require('fs')
const path = require('path')

fs.readFile(path.resolve('package.json'), 'utf8', (err, data) => {
  console.log(data)
})
fs.writeFile('data.txt', 'abcdefg', (err) => {

})

文件打开与关闭
readFile,writeFile将文件一次性读取与写入,对于大体积的文件不合理。需要一种边读边写的操作方式

const fs = require('fs')
const path = require('path')

fs.open(path.resolve('data.txt'), 'r', (err, fd) => {
  console.log(fd) //3
  fs.close(fd, err => {
    console.log('关闭成功')
  })
})

大文件读写操作

const fs = require('fs')
const path = require('path')

//read:读操作就是将数据从磁盘文件中写入到 buffer 中
let buf = Buffer.alloc(10)

fs.open('data.txt', 'r', (err, rfd) => {
  //read 将磁盘文件写入缓冲区
  /**
   * fs.read(fd,buf,offset,len,pos,cb)
   * fd 定位当前被打开的文件
   * buf 用于表示当前缓冲区
   * offset 从buffer哪一个位置开始写
   * len 当前次写入长度
   * pos 当前从文件的哪个位置开始读
   */
  fs.read(rfd, buf, 0, 3, 0, (err, readBytes, data) => {

  })

  //write 将缓冲区内容写入磁盘文件中
  /**
   * fs.write(fd,buf,offset,len,pos,cb)
   * fd 当前操作的文件
   * buf 用于表示当前缓冲区
   * offset 从buffer哪一个位置开始读数据
   * len 当前次写入长度
   * pos 当前从文件的哪个位置开始写
   */
  fs.write(rfd,buf,0,3,0,(err,written,buffer)=>{
    fs.close(rfd)
  })

})

文件拷贝自定义实现

const fs = require('fs')

//将a.txt 写入 b.txt中
// 打开 a 文件 利用read将数据保存在buffer中
// 打开 b 文件 用write将buffer中数据写入b文件中
let buf = Buffer.alloc(10)

fs.open('a.txt', 'r', (err, rfd) => {
  fs.open('b.txt', 'w', (err, wfd) => {
    fs.read(rfd, buf, 0, 10, 0, (err, readBytes, data) => {
      fs.write(wfd, buf, 0, 10, 0, (werr, written) => {
        console.log('写入成功')
        // fs.close(rfd)
        // fs.close(wfd)
      })
    })
  })
})

优化处理

const fs = require('fs')

//将a.txt 写入 b.txt中
// 打开 a 文件 利用read将数据保存在buffer中
// 打开 b 文件 用write将buffer中数据写入b文件中
let buf = Buffer.alloc(10)
const bufSize = buf.length
let readOffset = 0

fs.open('a.txt', 'r', (err, rfd) => {
  fs.open('b.txt', 'w', (err, wfd) => {
    function next() {
      fs.read(rfd, buf, 0, bufSize, readOffset, (err, readBytes, data) => {
        if (!readBytes) {
          //内容读取完
          fs.close(rfd, () => { })
          fs.close(wfd, () => { })
          return
        }
        readOffset += readBytes
        fs.write(wfd, buf, 0, readBytes, (werr, written) => {
          next()
        })
      })
    }
    next()
  })
})

FS目录操作API(异步)

创建目录(同步)

const fs = require('fs')
const path = require('path')

//接收a/b/c路径格式,使用/将路径拆分,放入数组['a','b','c']
//对数组遍历,拿到每一项,与前一项拼接 / 
//判断一个当前对拼接后的路径是否具有可操作的权限,有则存在,没有则创建
function mkdirSync(dirpath) {
  let pathArr = dirpath.split('/')
  for (let i = 1; i <= pathArr.length; i++) {
    let dir = pathArr.slice(0, i).join(path.sep)
    try {
      fs.accessSync(dir)
    } catch (error) {
      fs.mkdirSync(dir)
    }
  }
}
mkdirSync('a/b/c')

创建目录(异步)

const fs = require('fs')
const path = require('path')

function mkDir(dirPath, cb) {
  let pathArr = dirPath.split('/')
  let index = 1

  function next() {
    if (index > pathArr.length) return cb && cb()
    let current = pathArr.slice(0, index++).join('/')
    fs.access(current, (err) => {
      if (err) {
        fs.mkdir(current, next)
      } else {
        next()
      }
    })
  }
  next()
}

mkDir('a/b/c', () => {
  console.log('创建成功')
})

promise写法

const fs = require('fs')
const path = require('path')
const { promisify } = require('util')

const access = promisify(fs.access)
const mkdir = promisify(fs.mkdir)

async function mkDir(dirPath, cb) {
  let pathArr = dirPath.split('/')
  for (let index = 1; index <= pathArr.length; index++) {
    let current = pathArr.slice(0, index).join('/')
    try {
      await access(current)
    } catch (error) {
      await mkdir(current)
    }
  }

  cb && cb()
}

mkDir('a/b/c', () => {
  console.log('success')
})

目录删除异步实现

const fs = require('fs')
const path = require('path')

//定义一个函数,接收一个路径,删除操作
//判断当前路径是否为一个文件,是则直接删除当前文件
//如果传入的是一个目录 则需继续读取目录中内容 然后执行删除文件操作
function removeDir(dirPath, cb) {
  //判断dirPath类型
  fs.stat(dirPath, (err, statObj) => {
    //判断是否是目录
    if (statObj.isDirectory()) {
      fs.readdir(dirPath, (derr, files) => {
        let dirs = files.map(item => {
          return path.join(dirPath, item)
        })
        let index = 0
        function next() {
          if (index === dirs.length) return fs.rmdir(dirPath, cb)
          let current = dirs[index++]
          removeDir(current, next)
        }
        next()
      })
    } else {
      fs.unlink(dirPath, cb)
    }
  })
}
removeDir('a', () => {
  console.log('delete all')
})

CommonJS规范

CommonJS规范主要应用于Nodejs
CommonJS规范起初是为了弥补JS语言模块化缺陷
CommonJS规范定义模块的加载是同步完成

Nodejs与CommonJS

模块分类与加载流程

模块分类

加载流程

缓存优先原则

核心模块Events

nodejs事件管理 通过EventEmitter类实现事件统一管理

events与EventEmitter

const EventEmitter = require('events')

const ev = new EventEmitter()

ev.on('test', () => {
  console.log('1')
})

ev.on('test', () => {
  console.log('2')
})

ev.emit('test') //1 2

ev.once('one', () => {
  
})

ev.off('one',()=>{})

ev.on('args',(...args)=>{
  console.log(args)
})
ev.emit('args','1','2') //[1,2]

发布订阅模式

定义对象间一对多的依赖关系


发布订阅.png

订阅者将想要订阅的事件监听注册在调度中心,事件被触发的时候,发布者将事件发布在调度中心,之后调度中心统一调度之前订阅者注册的事件

发布订阅要素

class Pubsub {
  constructor() {
    this._events = {}
  }
  //订阅
  subscribe(event, cb) {
    if (this._events[event]) {//已经存在 添加监听操作
      this._events[event].push(cb)
    } else {
      this._events[event] = [cb]
    }
  }
  //发布
  publish(event, ...args) {
    const evns = this._events[event]
    if (evns && evns.length) {
      evns.forEach(cb => cb(...args))
    }
  }
}

let pub = new Pubsub()
pub.subscribe('test', (a, b) => {
  console.log(a, b)
})
pub.subscribe('test', (a, b) => {
  console.log('2', a, b)
})
pub.publish('test', 1, 2)

EventEmitter类模拟实现


function MyEvent() {
  //缓存订阅者信息
  this._events = Object.create(null)
}

MyEvent.prototype.on = function (event, cb) {
  //判断当前事件是否存在
  if (this._events[event]) {
    this._events[event].push(cb)
  } else {
    this._events[event] = [cb]
  }
}

MyEvent.prototype.emit = function (event, ...args) {
  let emits = this._events[event]
  if (emits && emits.length) {
    emits.forEach(cb => cb(...args))
  }
}

MyEvent.prototype.off = function (event, cb) {
  let events = this._events[event]
  if (events) {
    this._events = events.filters(item => item !== cb && item.link !== cb)
  }
}

MyEvent.prototype.once = function (event, cb) {
  let foo = function (...args) {
    cb.call(this, ...args)
    this.off(event, foo)
  }
  foo.link = cb
  this.on(event, foo)
}

let ev = new MyEvent()
let fn = function (data, d) {
  console.log('1', data, d)
}
ev.on('test', fn)
ev.on('test', (data) => {
  console.log('2', data)
})
ev.emit('test', '参数', '参数2')

浏览器中的Eventloop

完整事件环执行顺序

Nodejs事件循环机制

在浏览器下有两个任务队列,宏任务,微任务
在nodejs中有六个事件队列。

  1. timers:执行setTimeout与setInterval回调

  2. pending callbacks:执行系统操作的回调,例如tcp udp

  3. idle,prepare:只在系统内部进行使用

  4. poll:执行I/O相关的回调

  5. check:执行setImmediate中的回调

  6. close callbacks:执行close事件的回调

    定时器:本阶段执行已经被 setTimeout() 和 setInterval() 的调度回调函数。
    待定回调:执行延迟到下一个循环迭代的 I/O 回调。
    idle, prepare:仅系统内部使用。
    轮询:检索新的 I/O 事件;执行与 I/O 相关的回调(几乎所有情况下,除了关闭的回调函数,那些由计时器和 setImmediate() 调度的之外),其余情况 node 将在适当的时候在此阻塞。
    检测:setImmediate() 回调函数在这里执行。
    关闭的回调函数:一些关闭的回调函数,如:socket.on('close', ...)。

nodejs事件循环机制.png
Nodejs完整事件环

Node与浏览器事件环不同

Stream模块

Nodejs诞生之初就是为了提高IO性能
文件操作系统和网络模块实现了流接口
Nodejs中的流就是处理流式数据的抽象接口
流处理数据的优势

Nodejs内置了stream,它实现了流操作对象

nodejs中流的分类

const { Readable } = require('stream')

//自定义类 继承Readable
class MyRead extends Readable {
  constructor(source) {
    super()
    this.source = source
  }
  _read() {
    let data = this.source.shift() || null
    this.push(data)
  }
}

//模拟底层数据
let source = ['one', 'two', 'three']
let mr = new MyRead(source)

//手动调用read()方式
mr.on('readable', () => {
  let data = null
  //需要主动调用read从中读取数据 传参 每次读几个字节 当读到null时说明读取完毕
  while ((data = mr.read(2)) !== null) {
    console.log(data.toString())
  }
})

//流动模式
mr.on('data', (chunk) => {
  console.log(chunk.toString())
})

可写流
用于消费数据的流

const fs = require('fs')

//创建一个可读流
let rs = fs.createReadStream('data.txt')
rs.setEncoding('utf-8')

//创建一个可写流
let ws = fs.createWriteStream('data1.txt')

//监听事件调用方法完成具体数据消费
rs.on('data', chunk => {
  //数据写入
  ws.write(chunk)
})

自定义可写流

const { Writable } = require('stream')

class MyWrite extends Writable {
  constructor() {
    super()
  }
  _write(chunk, en, done) {//chunk:数据 en:设置编码集 done:回调
    process.stdout.write(chunk.toString() + '->')
    process.nextTick(done)//在write执行完后执行回调
  }
}

let mw = new MyWrite()

//手动调用write()方式
mw.write('test', 'utf-8', () => {
  console.log('done')
})

可写流数据

Duplex&&Tranform

自定义双工流

自定义转换流(中间可以对数据进行转换操作,可读与可写流之间是打通的)

//自定义双工流
const { Duplex } = require('stream')

class MyDuplex extends Duplex {
  constructor(source) {
    super()
    this.source = source
  }
  _read() {
    let data = this.source.shift() || null
    this.push(data)
  }
  _write(chunk, en, next) {
    process.stdout.write(chunk)
    process.nextTick(next)
  }
}

let source = ['1', '2', '3']
let md = new MyDuplex(source)

md.on('data', chunk => {
  console.log(chunk.toString())
})
md.write('sss', () => {
  console.log('done')
})
//自定义转换流
const { Transform } = require('stream')

class MyTransform extends Transform {
  constructor() {
    super()
  }
  _transform(chunk, en, cb) {
    this.push(chunk.toString().toUpperCase())
    cb(null)
  }
}

let mt = new MyTransform()

mt.write('a')
mt.on('data', chunk => {
  console.log(chunk.toString()) //A
})

文件可读流创建与消费

const fs = require('fs')

let rs = fs.createReadStream('data.txt', {
  flags: 'r',//方式 r:可读方式
  encoding: null,//数据流格式默认buffer
  fd: null,//文件标识符 默认3开始 0,1,2(标准输入,输出,错误占用)
  mode: 438,//权限位 十进制438
  autoClose: true,//自动关闭文件
  start: 0,//默认从哪个位置读取
  // end: 3,//文件结束位置
  highWaterMark: 4,//每次最多读多少字节数据 每次开辟的缓存区大小
})

//data方式
rs.on('data', chunk => {
  console.log(chunk.toString())
  rs.pause()//暂停模式
  rs.resume()//再次打开
})

//readable方式
rs.on('readable', () => {
  let data
  while ((data = rs.read(1)) !== null) {
    console.log(data.toString())
    // console.log('::::', rs._readableState.length)
  }
})

//可读流一些事件
//监听文件是否被打开 调用createReadStream便触发
rs.on('open',fd => { 
  //fd 当前文件操作符
})

//文件关闭 数据被消费完成后触发 在end后
rs.on('close',() => {

})

//数据被清空之后触发 在close之前
rs.on('end',() => {
  
})

//出错触发
rs.on('error',(err) => {
  
})

文件可写流

const fs = require('fs')

const ws = fs.createWriteStream('test.txt', {
  flags: 'w',
  mode: 438,
  fd: null,
  encoding: 'utf-8',
  start: 0,
  highWaterMark: 3
})

//写入数据
ws.write('abcd', () => {
  console.log('done')
})

//createWriteStream 就触发
ws.on('open', (fd) => {

})

ws.end('end') //表示写操作已经完成 end之后不能执行写操作 否则报错 进入error事件 可传参 将参数数据也写入

//close 在数据写入操作全部完成之后执行 也就是在 ws.end() 触发后
ws.on('close', () => {

})

//写入错误触发
ws.on('error', (err) => {

})

Node.js背压机制

nodejs的stream已经实现了可以保证数据平滑流动的背压机制(pipe方法)
文件读取速度大于写入速度,缓存大小有限。可能会导致内存溢出,GC频繁调用,其它进程变慢


数据读操作.png
数据写操作.png
//背压机制简单实现
const fs = require('fs')

let rs = fs.createReadStream('data.txt',{
  highWaterMark:4
})

let ws = fs.createWriteStream('copy.txt',{
  highWaterMark:1
})

let flag = true
rs.on('data',chunk=>{
  flag = ws.write(chunk)
  if(!flag){ //flag为false则当前缓存超限
    rs.pause() //读入暂停
  }
})

ws.on('drain',()=>{
  rs.resume() //当前缓存读取完 再次打开读入
})

文件可读流实现

//文件可读流实现
const fs = require('fs')
const EventEmitter = require('events')

class MyFileReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || 'r'
    this.mode = options.mode || 438
    this.autoClose = options.autoClose || true
    this.start = options.start || 0
    this.end = options.end
    this.highWaterMark = options.highWaterMark || 64 * 1024
    this.readOffset = 0

    this.open()
    this.on('newListener', type => {
      if (type === 'data') {
        this.read()
      }
    })
  }

  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }
  read() {
    if (typeof this.fd !== 'number') {
      return this.once('open', this.read) //为了先拿到读取的文件fd
    }

    let buf = Buffer.alloc(this.highWaterMark)

    let howMuchToRead = this.highWaterMark
    if (this.end) {
      howMuchToRead = Math.min(this.end - this.readOffset, this.highWaterMark)
    }

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      if (readBytes) {
        this.readOffset += readBytes
        this.emit('data', buf.slice(0, readBytes))
        this.read()
      } else {
        this.emit('end')
        this.close()
      }
    })

  }
  close() {
    fs.close(this.fd, () => {
      this.emit('close')
    })
  }
}

let rs = new MyFileReadStream('test.txt', {
  end: 6,
  highWaterMark: 3
})

// rs.on('open', fd => {
.
//   console.log(fd)
// })

rs.on('data', chunk => {
  console.log(chunk)
})

文件可写流实现

writeStream文件可写流简易实现

pipe方法

const fs = require('fs')

const rs = fs.createReadStream('data.txt')
const ws = fs.createWriteStream('copy.txt')

rs.pipe(ws)

创建TCP通信

Net模块实现了底层通信接口
通信过程

//server.js
const net = require('net')

//创建服务端实例
const server = net.createServer()

const PORT = 1111
const HOST = 'localhost'

server.listen(PORT, HOST)

server.on('listening', () => {
  console.log(`服务开启  ${HOST}:${PORT}`)
})

//接收消息 回消息
server.on('connection', (socket) => {
  socket.on('data', (chunk) => {
    const msg = chunk.toString()
    console.log(msg)
    //回消息
    socket.write(Buffer.from('回复'))
  })
})

server.on('close', () => {
  console.log('服务关闭')
})

server.on('error', (err) => {
  console.log(err)
})
//client.js
const net = require('net')

const client = net.createConnection({
  port: 1111,
  host: '127.0.0.1'
})

client.on('connect', () => {
  client.write('发送数据')
})

client.on('data', (chunk) => {
  console.log('接收数据:', chunk.toString())
})

client.on('error', (err) => {
  console.log('client error', err)
})

client.on('close', () => {
  console.log('client clsoe')
})

http协议

const http = require('http')
const url = require('url') //处理url路径

//创建服务端
let server = http.createServer((req, res) => {
  //请求路径获取
  let { pathname, query } = url.parse(req.url, true)
  //请求方式获取
  let method = req.method
  //版本号获取
  let httpVersion = req.httpVersion

  //请求头
  let hearders = req.headers

  //请求体获取 请求体是一个数据流 按照数据流处理
  let arr = []
  req.on('data', data => {
    arr.push(data)
  })
  req.on('end', () => {
    let allData = Buffer.concat(arr).toString()
  })


  //设置响应
  res.statusCode = 302 //设置状态码
  res.setHeader('Content-type', 'text/html;charset=utf-8')//设置响应头
  res.end('返回数据')

})

server.listen(8080, () => {
  console.log('服务开启')
})

客户端代理

//server.js
const http = require('http')
const url = require('url')
const querystring = require('querystring')

const server = http.createServer((req, res) => {
  let { pathname, query } = url.parse(req.url)
  let headers = req.headers //请求头

  //post
  let arr = []
  req.on('data', (data) => {
    arr.push(data)
  })
  req.on('end', () => {
    let endData = Buffer.concat(arr).toString //post请求数据
    if (headers['Content-type'] == 'application/json') {
      let d = JSON.parse(endData)
      d.age = 40
      res.end(JSON.stringify(d))
    } else if (headers['Content-type'] == 'application/x-www-form-urlencoded') {
      let fromdata = querystring(endData)
      res.end(JSON.stringify(fromdata))
    }
  })
})

server.listen(1111, () => {

})
//client.js
const http = require('http')

// let options = {
//   host: 'localhost',
//   port: 1111,
//   path: '/?a=1'
// }
// http.get(options, (res) => {

// })
let options = {
  host: 'localhost',
  port: 1111,
  path: '/?a=1',
  method: 'POST',
  headers: {
    'Content-type': 'application/json' //设置数据格式 json
    // 'Content-type': 'application/x-www-form-urlencoded' //form表单格式
  }
}
let req = http.request(options, (res) => {
  let arr = []
  req.on('data', (data) => {
    arr.push(data)
  })
  req.on('end', () => {
    let endData = Buffer.concat(arr).toString //post请求数据

  })
})
// req.end('post请求数据') //字符串格式
req.end('{"name":"jay"}') //json格式
// req.end('name=jay&age=40') //form表单格式

http静态服务

http静态服务实现httpServer

上一篇 下一篇

猜你喜欢

热点阅读