威尼斯手机平台-电子正规官网登录首页

热门关键词: 威尼斯手机平台,威尼斯登录首页,威尼斯正规官网
威尼斯官网有类似 Go 语言的协程操作方式,(文/开源中国)    
分类:威尼斯官网

 Swoole 4.4 正式版已发布,该版本包含大量更新,详细信息如下:

什么是 Swoft ?

Swoft 是一款基于 Swoole 扩展实现的 PHP 微服务协程框架。Swoft 能像 Go 一样,内置协程网络服务器及常用的协程客户端且常驻内存,不依赖传统的 PHP-FPM。有类似 Go 语言的协程操作方式,有类似 Spring Cloud 框架灵活的注解、强大的全局依赖注入容器、完善的服务治理、灵活强大的 AOP、标准的 PSR 规范实现等等。

Swoft 通过长达三年的积累和方向的探索,把 Swoft 打造成 PHP 界的 Spring Cloud, 它是 PHP 高性能框架和微服务治理的最佳选择。

Netty笔记——技术点汇总,netty笔记汇总

向下不兼容改动

  • PHP官方保持一致, 不再支持PHP7.0 (@matyhtf)
  • 移除Serialize模块, 在单独的 ext-serialize 扩展中维护. 废弃原因: 由于PHP内核频繁变更, 导致无法实现稳定可用的模块, 与php serialize相比没有太大差异化定位
  • 移除PostgreSQL模块,在单独的 ext-postgresql 扩展中维护. 废弃原因: PostgreSQL使用了异步回调方式实现协程调度, 不符合目前内核协程化的统一规划。另外PostgreSQL目前用户量非常低, 并且缺少必要的单元测试, 无法保证质量
  • Runtime::enableCoroutine不再会自动兼容协程内外环境, 一旦开启, 则一切阻塞操作必须在协程内调用 (@matyhtf)
  • 由于引入了全新的协程MySQL客户端驱动, 底层设计更加规范, 但有一些小的向下不兼容的变化

    • fetch/nextResult优化为按需读取, 会产生IO调度
    • 启动defer特性时, statement发出的的请求, 需要使用statement->recv接收
    • 启动defer/fetch_mode特性时, 如有未接收完的数据, 将无法发起新的请求
    • 与异步不同, connected属性不再会实时基于事件更新, 而是在IO操作失败后更新

Swoft v2.0.7

2.0.7 在 2.0.6 上继续扬帆,已在大量的生产业务中使用,得到很多用户的肯定和支持。正式版本我们做了许多改进和优化,拥有了更好的性能。

  • 新增 Http Session 功能组件,提供http会话管理, 支持多种存储驱动
  • 增强 TCP server 请求支持添加全局或对应的方法中间件
  • 增强 Websocket server 消息请求支持添加全局或对应的方法中间件

目录

· Linux网络IO模型

    · 文件描述符

    · 阻塞IO模型

    · 非阻塞IO模型

    · IO复用模型

    · 信号驱动IO模型

    · 异步IO模型

· BIO编程

· 伪异步IO编程

· NIO编程

    · Buffer和Channel

    · 深入Buffer

    · Selector

· AIO编程

· 四种IO编程对比及选择Netty的原因

· Netty入门

    · 开发与部署

    · Hello World

· 粘包/拆包问题

    · 问题及其解决

    · LineBasedFrameDecoder

    · DelimiterBasedFrameDecoder

    · FixedLengthFrameDecoder

· Java序列化问题

    · 问题描述及其解决

· HTTP协议开发

    · Netty HTTP

    · 文件服务器

· WebSocket协议开发

    · 问题及其解决

    · 原理(过程)

    · 开发

· Netty架构

    · 逻辑架构

    · 高性能

    · 可靠性

    · 可定制性

    · 可扩展性

· 私有协议栈开发


 

废弃警告

  • 将废弃Buffer模块,废弃原因:可替代性强,使用率低,可用PHP字符串、fopen("memory")代替。
  • 将废弃Lock模块,废弃原因:在协程模式下加锁可能存在问题,可使用chan实现协程版本的锁
  • 由于引入了stream_socket_pair协程化, 建议开启hook时, 如有单独配置需求, 请使用SWOOLE_HOOK_STREAM_FUNCTION常量而不是SWOOLE_HOOK_STREAM_SELECT

Http Session

通过 Composer 安装 swoft/session 组件

  • 在项目 composer.json 所在目录执行 composer require swoft/session
  • 将 SwoftHttpSessionSessionMiddleware 中间件加入到全局中间件

在配置文件 app/bean.php 里:

    'httpDispatcher'    => [
        // Add global http middleware
        'middlewares'      => [
            SwoftHttpSessionSessionMiddleware::class,
        ],
    ],

默认是基于本地文件驱动,保存在 runtime/sessions 目录

更在驱动只需要配置对应 handler 类,例如配置 Redis 驱动:

'sessionHandler' => [
    'class'    => RedisHandler::class,
    // Config redis pool
    'redis' => bean('redis.pool')
],

Linux网络IO模型

新特性

  • 新增Library, 使用纯PHP编写内核功能而非C/C++, 提供了以下功能

    • 新增高质量PHP模块CoroutineWaitGroup (@twose)
    • 使用PHP代码实现CURL的hook, 一键使CURL协程化, 目前为实验特性, 需特别调用Runtime::enableCoroutine(SWOOLE_HOOK_CURL)来开启 (@matyhtf) (@Yurunsoft)
    • 使用PHP代码实现exec/shell_exec的协程化 (#2657) (@Yurunsoft)
    • 开启RuntimeHook时, 将替换函数array_walkarray_walk_recursive为swoole实现的版本, 解决原生函数不可重入的问题, 但会造成无法遍历object (@matyhtf) (@twose)
  • 新增协程抢占式调度器, 可防止协程占用CPU时间过长导致其它协程饿死, 通过php.ini配置swoole.enable_preemptive_scheduler = On 开启, 相关例子详见preemptive_scheduler (@shiguangqi)

  • 新增Timer::list()返回TimerIterator, 可遍历所有定时器, TimerclearAll清除所有定时器, Timerinfo(int $id)获取定时器信息, Timer::stats()获取全局定时器状态 (#2498) (@twose)
  • 新增 CoSocket的两个方法getOption 和 setOption (9d13c29) (@matyhtf)
  • 新增 ProcessPool$master_pid 属性和 shutdown方法 (a1d6eaa) (@matyhtf)
  • 新增ProcessPool的构造方法的第四个参数, 为true时底层将自动在onWorkerStart回调开启协程 (8ceb32cd) (@matyhtf)
  • 新增stream_socket_pair协程化支持 (#2546) (@matyhtf)
  • 新增HttpServerstatic_handler_locations设置, 可以设定静态文件路径 (@matyhtf)
  • 新增CoHttpClient->setBasciAuth方法, 用于自动发送Authorization头 (#2542) (@hongbshi)
  • 新增 CoHttp2Client->ping方法 (40041f6) (@shiguangqi)
  • 新增hook_flags配置项,用于取代Runtime::enableCoroutine()函数调用

Websocket消息中间件

  • 全局中间件

配置于 app/bean.php:

    /** @see SwoftWebSocketServerWsMessageDispatcher */
    'wsMsgDispatcher' => [
        'middlewares' => [
            AppWebSocketMiddlewareGlobalWsMiddleware::class
        ],
    ],
  • 作用于控制器的
/**
 * Class HomeController
 *
 * @WsController(middlewares={DemoMiddleware::class})
 */
class TestController
{}

文件描述符

  1. Linux内核将所有外部设备视为文件来操作。

  2. 对一个文件的读写操作会调用内核提供的系统命令,返回一个file descripter(fd,文件描述符)。

  3. 对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符)。

增强

  • 全新的协程MySQL客户端驱动, 底层全面协程化 (#2538) (@twose)

    • 底层使用C++和协程的编程模式(同步阻塞写法, 异步性能)
    • 支持SSL连接 (connect时配置 ['ssl' => true]即可, 暂不支持证书等配置)
    • 支持超大数据发送 (无上限, 底层自动拼包, 上限为MySQL服务器配置上限)
    • 支持超大数据接收
    • 支持fetch按行读取 (现在的fetch为按需读取, 未fetch的数据不会耗费用户内存) (#2106)
    • 支持nextResult按需读取 (同上)
    • 客户端close后, 客户端持有的statements自动转为不可用状态, 避免边界问题
    • 优化掉了一些不必要的内存拷贝(协议解析时)
    • date相关类型小数精度支持
    • 错误代码和信息与PDO/mysqli保持一致
  • CoRedis兼容模式, 通过$redis->set(['compatibility_mode' => true])开启, 可使得hmGet/hGetAll/zRange/zRevRange/zRangeByScore/zRevRangeByScore等方法返回结果和phpredis保持一致 (#2529) (@caohao-php)

  • 默认允许有100K个协程同时存在 (c69d320b) (@twose)
  • 支持bailout机制 (协程内发生致命错误时能正确退出进程) (#2579) (@twose)
  • Server发生错误时会根据情况展示友好的400/404/503界面而不是没有任何输出 (@matyhtf) (f3f2be9d)
  • Server默认开启异步安全重启特性和超大数据发送的自动协程调度功能 (#2555) (9d4a4c47) (@matyhtf)
  • ServeronFinish回调支持自动协程环境 (@twose)
  • Http客户端默认开启websocket_mask, 不再会出现莫名其妙连不上websocket的问题 (c02f4f85) (@twose)
  • 不再允许在协程外使用Channel的调度操作 (519b6043) (@twose)
  • WebSocket握手失败时切断连接 (#2510) (@twose)
  • Linux下父进程异常退出时底层会自动发送信号杀死子进程 (4b833a3d) (@matyhtf)
  • Socket->recv的数据长度不足时回收末尾无用的内存 (642a3552) (@twose)
  • 浮点数计算误差优化 (#2572) (@tangl163)
  • 所有内置类都 禁止克隆/禁止序列化/禁止删除底层定义的属性 (f9c974b8) (@twose)
  • Server->binduid超过UINT32_MAX时会产生警告并返回
  • 兼容PHP7.4 (#2506) (@twose)

TCP 请求中间件

  • 全局中间件

配置于 app/bean.php:

    /** @see SwoftTcpServerTcpDispatcher */
    'tcpDispatcher' => [
        'middlewares' => [
            AppTcpMiddlewareGlobalTcpMiddleware::class
        ],
    ],
  • 作用于控制器的
/**
 * Class DemoController
 *
 * @TcpController(middlewares={DemoMiddleware::class})
 */
class DemoController
{
    // ....
}

阻塞IO模型

  1. 最常用的IO模型。

  2. 默认的IO模型。

  3. 以socket接口为例说明阻塞IO模型。

威尼斯官网 1

修复

  • 修复ProcessPoolgetProcess问题 (#2522) (@matyhtf)
  • 修复某些特殊情况下异常被忽略的问题(VM陷入了事件循环而没有机会检查异常) (@twose)
  • 修复定时器在进程fork后产生的内存泄漏 (8f3abee7) (@twose)
  • 修复非Linux系统编译时timezone的问题 (#2584) (@devnexen)
  • 修复enable_coroutinetask_enable_coroutine一开一关的问题 (#2585) (@matyhtf)
  • 修复Http2的trailer方法不输出值为空的头 (#2578) (@twose)
  • 修复CoHttpClient->setCookies在特殊情况下的内存错误 (#2644) (@Yurunsoft)
  • 修复#2639 (#2656) (@mabu233)
  • 修复arginfo_swoole_process_pool_getProcess (#2658) (@mabu233)
  • 修复static_handler不支持软链接 (@matyhtf)
  • 修复OSX下卡死 (22504dd4) (@matyhtf)
  • 修复启用SSLtask进程使用Server->getClientInfo出错 (#2639) (@matyhtf)
  • 修复多协程操作同一个Socket的非法操作BUG (#2661) (@twose)

更多

  • GitHub: 

  • Gitee: 

  • 官网:https://www.swoft.org

  • 文档:

非阻塞IO模型

  1. 一般轮训检查内核数据是否就绪。

  2. 如果内核数据未就绪,则直接返回一个EWOULDBLOCK错误。

威尼斯官网 2

协程调度器?

  • 新增SwooleCoroutineScheduler调度器类作为cli命令行脚本的入口,取代go() + SwooleEvent::wait()的方式
  • 增加SwooleCoroutineRun函数,提供对SwooleCoroutineScheduler的封装
  • go() + SwooleEvent::wait()的运行方式可能被废除

更新记录

升级提示:

  • SwooleWebSocketServer::push 第四个参数 $finish 在 swoole 4.4.12 后改为了 int 类型。
  • tcp server 的 TcpServerEvent::CONNECT 事件参数保持跟receive, close一致。 $fd, $server 互换位置。

修复(Fixed)

  • 修复 config 注入时,没有找到值也会使用对应类型的默认值覆盖属性,导致属性默认值被覆盖 d84d50a7
  • 修复 ws server 中使用message调度时,没有过滤空数据,导致多发出一个响应。避免方法swoft-cloud/swoft#1002 d84d50a7
  • 修复 tcp server 中使用message调度时,没有过滤空数据,导致多发出一个响应。07a01ba1
  • 修复 独立使用console组件时缺少 swoft/stdlib 库依赖 c569c81a
  • 修复 ArrayHelper::get 传入key为 integer 时,报参数错误 a44dcad
  • 修复 console 渲染使用table,有int值时,计算宽度报类型错误 74a835ab
  • 修复 error 组件中用户无法自定义设置默认的错误处理级别 4c78aeb
  • 修复 启用和禁用 组件设置 isEnable() 不生效的问题 da8c51e56
  • 修复 在 cygwin 环境使用 uniqid() 方法必须将第二个参数设置为 true c7f688f
  • 修复 在 cygwin 环境不能够设置进程title而导致报错 c466f6a
  • 修复 使用 http response->delCookie() 无法删除浏览器的cookie数据问题 8eb9241
  • 修复 ws server消息调度时,接收到的ext数据不一定是数组导致报错 ff45b35
  • 修复 日志文件按时间拆分问题c195413
  • 修复 日志 JSON 格式小问题a3fc6b9
  • 修复 rpc 服务提供者 getList 调用两次问题fd03e71
  • 修复 redis cluster 不支持 auth 参数7a678f
  • 修复 模型查询 json 类型, 不支持 array 6023a9
  • 修复 redis multi 操作没有及时是否连接 e5f698
  • 修复 redis 不支持 expireAtgeoRadius 749241
  • 修复 crontab 时间戳检测偏差问题 eb08a46

更新(Update):

  • 更新 console 在渲染 help信息之前也会发出事件 ConsoleEvent::SHOW_HELP_BEFORE d3f7bc3
  • 简化和统一 http, ws, tcp, rpc server管理命令逻辑 f202c826
  • 更新 ws 和 tcp Connection类添加 newFromArray 和 toArray 方法,方便通过第三方存储(redis)时导出信息和恢复连接 a8b0b7c
  • 优化 server 添加统一的 swoole pipe message 事件处理,在 ws, tcp 中使用swoft事件来处理进程间消息 1c51a8c

增强(Enhancement)

  • 现在 tcp 请求支持添加全局或对应的方法中间件,流程和使用跟http中间件类似。仅当使用系统调度时有用 6b593877
  • 现在 websocket message 请求支持添加全局或对应的方法中间件,流程和使用跟http中间件类似。仅当使用系统调度时有用 9739815
  • 事件管理允许设置 destroyAfterFire 在每次事件调度后清理事件中携带的数据 50bf43d3
  • 数据库错误异常新增 code 返回fd306f4
  • 协程文件操作 writeFile 新增写失败异常08c4244
  • RPC 新增参数验证8646fc5

(文/开源中国)    

IO复用模型

1. Linux提供select/poll,进程传递一个或多个fd给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮助进程同时检测多个fd是否就绪。

2. select/poll存在支持fd数量有限、线性轮训等问题,应采用基于事件驱动方式的epoll代替(当有fd就绪时,立即回调函数)。

威尼斯官网 3

内核

  • 持续的底层代码质量优化工作 (@swoole)
  • 更多的单元测试, 并使用了基于 webmozart/assert 二次开发而来的断言库 swoole/assert (@twose)
  • 补全内存申请失败检测 (b19bebac) (5a1ddad3) (@matyhtf)
  • 彻底废除Windows支持计划
  • 将协程的一些功能整理划分到SystemScheduler模块, 废除util模块
  • CoHttp2Client底层协程化 (f64874c3) (@matyhtf)
  • 底层全面缓存了开发者注册的函数信息, 调用回调时速度更快 (@twose)

信号驱动IO模型

进程先系统调用sigaction执行一个非阻塞的信号处理函数,进程继续运行。当数据就绪时,为该进程生成一个SIGIO信号,通知进程调用recvfrom读取数据。

威尼斯官网 4

实验性内容

  • 可能在5.0新增的CoServerCoHttpServer
  • CURL Hook(暂时不支持curl_multi

(文/开源中国)    

异步IO模型

  1. 进程告知内核启动某个操作,并在内核完成整个操作后再通知进程。

2. 与信号驱动IO模型区别:信号驱动IO模型只通知数据就绪;异步IO模型通知操作已完成。

威尼斯官网 5

BIO编程

1. 有一个独立的Acceptor线程负责监听客户端连接,接收到连接后为每个客户端创建一个新的线程进行链路处理,处理完之后,通过输出流返回给客户端,线程销毁。

2. 问题:服务端线程个数与客户端并发访问数1:1关系。当客户端并发访问量越来越大时,系统会发生线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或僵死。

威尼斯官网 6

伪异步IO编程

1. 当新客户端接入时,将客户端Socket封装成一个Task(实现Runnable接口)投递到线程池中进行处理。

2. 好处:由于可以设置线程池队列的大小和最大线程数,所以资源占用是可控的,客户端并发数量增加不会导致资源耗尽、宕机。

3. 问题:底层通信依然采用同步阻塞模型,无法从根本上解决应答消息缓慢或网络传输较慢时,长时间阻塞线程的问题。

威尼斯官网 7

NIO编程

Buffer和Channel

  1. BIO是面向流的,一次处理一个字节;NIO是面向块的,以块的形式处理数据。

  2. BIO的java.io.*已经使用NIO重新实现过。

3. Buffer缓冲区存放着准备要写入或读出的数据。通常是一个字节数组,但也可以是其他类型的数组或不是数组。

  1. Buffer类型:

    a) ByteBuffer(常用)

    b) CharBuffer

    c) ShortBuffer

    d) IntBuffer

    e) LongBuffer

    f) FloatBuffer

    g) DoubleBuffer

5. Channel通道是双向的,可通过它读取或写入数据。所有的数据都要通过Buffer来处理,永远不会将数据直接写入Channel。

威尼斯官网 8

 6. 写文件示例。

威尼斯官网 9

 1 import java.io.FileOutputStream;
 2 import java.io.IOException;
 3 import java.io.UnsupportedEncodingException;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 import java.util.Random;
 7 import java.util.UUID;
 8 
 9 public class Test {
10     
11     private static byte[] getRandomData() {
12         int randomLength = new Random().nextInt(100);
13         StringBuilder data = new StringBuilder();
14         for (int index = 0; index < randomLength; index++) {
15             data.append(UUID.randomUUID().toString());
16         }
17         return data.toString().getBytes();
18     }
19     
20     public static void main(String[] args) {
21         FileOutputStream fileOutputStream = null;
22         try {
23             fileOutputStream = new FileOutputStream("D:/test.txt");
24             FileChannel fileChannel = fileOutputStream.getChannel();
25             ByteBuffer byteBuffer = null;
26             for (int index = 0; index < 1000; index++) {
27                 byte[] data = getRandomData();
28                 if (byteBuffer == null) {
29                     byteBuffer = ByteBuffer.wrap(data);
30                 } else if (data.length > byteBuffer.capacity()) {
31                     if (byteBuffer.position() > 0) {
32                         byteBuffer.flip();
33                         fileChannel.write(byteBuffer);
34                         byteBuffer.clear();
35                     }
36                     byteBuffer = ByteBuffer.wrap(data);
37                 } else if (data.length > byteBuffer.remaining()) {
38                     byteBuffer.flip();
39                     fileChannel.write(byteBuffer);
40                     byteBuffer.clear();
41                 }
42                 
43                 byteBuffer.put(data);
44             }
45             byteBuffer.flip();
46             fileChannel.write(byteBuffer);
47             byteBuffer.clear();
48             
49         } catch (IOException e) {
50             e.printStackTrace();
51         } finally {
52             if (fileOutputStream != null) {
53                 try {
54                     fileOutputStream.close();
55                 } catch (IOException e) {
56                     e.printStackTrace();
57                 }
58             }
59         }
60     }
61     
62 }

View Code

  1. 读文件示例。

威尼斯官网 10

 1 import java.io.FileInputStream;
 2 import java.io.IOException;
 3 import java.nio.ByteBuffer;
 4 import java.nio.channels.FileChannel;
 5 
 6 public class Test {
 7 
 8     public static void main(String[] args) {
 9         FileInputStream fileInputStream = null;
10         try {
11             fileInputStream = new FileInputStream("D:/test.txt");
12             FileChannel fileChannel = fileInputStream.getChannel();
13             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
14             while (fileChannel.read(byteBuffer) > 0) {
15                 byteBuffer.flip();
16                 while (byteBuffer.hasRemaining()) {
17                     System.out.print((char) byteBuffer.get());
18                 }
19                 byteBuffer.clear();
20             }
21             
22         } catch (IOException e) {
23             e.printStackTrace();
24         } finally {
25             if (fileInputStream != null) {
26                 try {
27                     fileInputStream.close();
28                 } catch (IOException e) {
29                     e.printStackTrace();
30                 }
31             }
32         }
33     }
34 
35 }

View Code

  1. 复制文件示例。

威尼斯官网 11

 1 import java.io.IOException;
 2 import java.io.RandomAccessFile;
 3 import java.nio.ByteBuffer;
 4 import java.nio.channels.FileChannel;
 5 
 6 public class Test {
 7 
 8     public static void main(String[] args) {
 9         RandomAccessFile sourceFile = null;
10         RandomAccessFile targetFile = null;
11         try {
12             sourceFile = new RandomAccessFile("D:/test.txt", "r");
13             targetFile = new RandomAccessFile("D:/test.txt.bak", "rw");
14             FileChannel sourceFileChannel = sourceFile.getChannel();
15             FileChannel targetFileChannel = targetFile.getChannel();
16             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
17             while (sourceFileChannel.read(byteBuffer) > 0) {
18                 byteBuffer.flip();
19                 targetFileChannel.write(byteBuffer);
20                 byteBuffer.clear();
21             }
22             
23         } catch (IOException e) {
24             e.printStackTrace();
25         }
26     }
27 
28 }

View Code

威尼斯官网,深入Buffer

  1. Buffer可以理解成数组,它通过以下3个值描述状态:

    a) position:下一个元素的位置;

    b) limit:可读取或写入的元素总数,position总是小于或者等于limit;

    c) capacity:Buffer最大容量,limit总是小于或者等于capacity。

  1. 以读、写举例说明Buffer。

    a) 创建一个8字节的ByteBuffer。position=0,limit=8,capacity=8。

威尼斯官网 12

     b) 读取3个字节。position=3,limit=8,capacity=8。

威尼斯官网 13

     c) 读取2个字节。position=5,limit=8,capacity=8。

威尼斯官网 14

     d) 执行flip()。position=0,limit=5,capacity=8。

威尼斯官网 15

    e) 写入4个字节。position=4,limit=5,capacity=8。

威尼斯官网 16

     f) 写入1个字节。position=5,limit=5,capacity=8。

威尼斯官网 17

    g) 执行clear()。position=0,limit=8,capacity=8。

威尼斯官网 18

 3. 创建ByteBuffer的两种方法:

    a) 创建固定大小的Buffer。

ByteBuffer.allocate(capacity)

    b) 将数组及其内容包装成Buffer。

byte array[] = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(array);

Selector

  1. Selector即IO复用模型中的多路复用器。

  2. JDK使用了epoll。

AIO编程

  1. AIO也称NIO2.0,是异步IO模型。

  2. JDK 7时在java.nio.channels包下新增了4个异步Channel。

    a) AsynchronousSocketChannel

    b) AsynchronousServerSocketChannel

    c) AsynchronousFileChannel

    d) AsynchronousDatagramChannel

  1. 使用Future写文件:异步执行,阻塞Future.get(),直到取得结果。

威尼斯官网 19

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.file.Path;
 5 import java.nio.file.Paths;
 6 import java.nio.file.StandardOpenOption;
 7 import java.util.ArrayList;
 8 import java.util.List;
 9 import java.util.Random;
10 import java.util.UUID;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.Future;
13 
14 public class Test {
15     
16     private static byte[] getRandomData() {
17         int randomLength = new Random().nextInt(100);
18         StringBuilder data = new StringBuilder();
19         for (int index = 0; index < randomLength; index++) {
20             data.append(UUID.randomUUID().toString());
21         }
22         return data.append('n').toString().getBytes();
23     }
24 
25     public static void main (String [] args) {
26         Path file = Paths.get("D:/test.txt");
27         AsynchronousFileChannel asynchronousFileChannel = null;
28         try {
29             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);
30             List<Future<Integer>> futures = new ArrayList<>();
31             for (int index = 0; index < 10; index++) {
32                 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());
33                 Future<Integer> future = asynchronousFileChannel.write(byteBuffer, 0);
34                 futures.add(future);
35             }
36             for (Future<Integer> future : futures) {
37                 Integer length = null;
38                 try {
39                     length = future.get();
40                 } catch (InterruptedException | ExecutionException e) {
41                     e.printStackTrace();
42                 }
43                 System.out.println("Bytes written: " + length);
44             }
45             
46         } catch (IOException e) {
47             e.printStackTrace();
48         } finally {
49             if (asynchronousFileChannel != null) {
50                 try {
51                     asynchronousFileChannel.close();
52                 } catch (IOException e) {
53                     e.printStackTrace();
54                 }
55             }
56         }
57     }
58 }

View Code

4. 使用CompletionHandler写文件:异步执行,回调CompletionHandler。注意:示例中,由于不阻塞主线程,即异步任务是否结果主线程都会结束,有时会看不到结果,所以sleep 5秒。

威尼斯官网 20

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.channels.CompletionHandler;
 5 import java.nio.file.Path;
 6 import java.nio.file.Paths;
 7 import java.nio.file.StandardOpenOption;
 8 import java.util.Random;
 9 import java.util.UUID;
10 
11 public class Test {
12     
13     private static byte[] getRandomData() {
14         int randomLength = new Random().nextInt(100);
15         StringBuilder data = new StringBuilder();
16         for (int index = 0; index < randomLength; index++) {
17             data.append(UUID.randomUUID().toString());
18         }
19         return data.append('n').toString().getBytes();
20     }
21 
22     public static void main (String [] args) {
23         Path file = Paths.get("D:/test.txt");
24         AsynchronousFileChannel asynchronousFileChannel = null;
25         try {
26             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);
27             CompletionHandler<Integer, Object> completionHandler = new CompletionHandler<Integer, Object>() {
28                 @Override
29                 public void completed(Integer result, Object attachment) {
30                     System.out.println("Bytes written: " + result);
31                 }
32                 @Override
33                 public void failed(Throwable exc, Object attachment) {
34                 }
35             };
36             for (int index = 0; index < 10; index ++) {
37                 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());
38                 asynchronousFileChannel.write(byteBuffer, 0, null, completionHandler);
39             }
40             
41         } catch (IOException e) {
42             e.printStackTrace();
43         } finally {
44             if (asynchronousFileChannel != null) {
45                 try {
46                     asynchronousFileChannel.close();
47                 } catch (IOException e) {
48                     e.printStackTrace();
49                 }
50             }
51         }
52         try {
53             Thread.sleep(5000);
54         } catch (InterruptedException e) {
55             e.printStackTrace();
56         }
57     }
58 }

View Code

  1. 使用Future读文件:异步执行,阻塞Future.get(),直到取得结果。

威尼斯官网 21

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.file.Path;
 5 import java.nio.file.Paths;
 6 import java.nio.file.StandardOpenOption;
 7 import java.util.concurrent.ExecutionException;
 8 import java.util.concurrent.Future;
 9 
10 public class Test {
11 
12     public static void main (String [] args) {
13         Path file = Paths.get("D:/test.txt");
14         AsynchronousFileChannel asynchronousFileChannel = null;
15         try {
16             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
17             ByteBuffer byteBuffer = ByteBuffer.allocate(64);
18             int position = 0;
19             int length = 0;
20             do {
21                 Future<Integer> future = asynchronousFileChannel.read(byteBuffer, position);
22                 length = future.get();
23                 if (length > 0) {
24                     byteBuffer.flip();
25                     System.out.print(new String(byteBuffer.array()));
26                     byteBuffer.clear();
27                 }
28                 position += length;
29             } while (length > 0);
30             
31         } catch (IOException e) {
32             e.printStackTrace();
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         } catch (ExecutionException e) {
36             e.printStackTrace();
37         } finally {
38             if (asynchronousFileChannel != null) {
39                 try {
40                     asynchronousFileChannel.close();
41                 } catch (IOException e) {
42                     e.printStackTrace();
43                 }
44             }
45         }
46     }
47 }

View Code

6. 使用CompletionHandler读文件:异步执行,回调CompletionHandler。注意:示例中,由于不阻塞主线程,即异步任务是否结果主线程都会结束,有时会看不到结果,所以sleep 5秒。

威尼斯官网 22

 1 import java.io.IOException;
 2 import java.nio.ByteBuffer;
 3 import java.nio.channels.AsynchronousFileChannel;
 4 import java.nio.channels.CompletionHandler;
 5 import java.nio.file.Path;
 6 import java.nio.file.Paths;
 7 import java.nio.file.StandardOpenOption;
 8 
 9 public class Test {
10 
11     public static void main (String [] args) {
12         Path file = Paths.get("D:/test.txt");
13         AsynchronousFileChannel asynchronousFileChannel = null;
14         try {
15             asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
16             // 10个异步任务分别读取文件头64个字节,5秒后分别输出。
17             CompletionHandler<Integer, ByteBuffer> completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
18                 @Override
19                 public void completed(Integer result, ByteBuffer byteBuffer) {
20                     byteBuffer.flip();
21                     System.out.print(new String(byteBuffer.array()));
22                     byteBuffer.clear();
23                 }
24                 @Override
25                 public void failed(Throwable exc, ByteBuffer byteBuffer) {
26                 }
27             };
28             for (int index = 0; index < 10; index++) {
29                 ByteBuffer byteBuffer = ByteBuffer.allocate(64);
30                 asynchronousFileChannel.read(byteBuffer, byteBuffer.limit() * index, byteBuffer, completionHandler);
31             }
32             
33         } catch (IOException e) {
34             e.printStackTrace();
35         } finally {
36             if (asynchronousFileChannel != null) {
37                 try {
38                     asynchronousFileChannel.close();
39                 } catch (IOException e) {
40                     e.printStackTrace();
41                 }
42             }
43         }
44         try {
45             Thread.sleep(5000);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49     }
50 }

View Code

四种IO编程对比及选择Netty的原因

  1. 对比。

威尼斯官网 23

  1. 选择NIO框架Netty,而不选择JDK的NIO类库的理由。

    a) NIO类库和API繁杂。

    b) 需另具备Java多线程编程等技能。

    c) 可靠性不高,工作量和难度非常大。

    d) 臭名昭著的epoll Bug导致Selector空轮训。

Netty入门

开发与部署

  1. 开发环境:CLASSPATH中导入“netty-all-x.y.z.jar”即可。

  2. 打包部署:由于是非Web应用,构建成jar包部署即可。

Hello World

  1. 配置Maven的pom.xml文件。

威尼斯官网 24

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
</dependency>

View Code

  1. 时间服务器TimeServer

威尼斯官网 25

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 
10 public class TimeServer {
11     
12     public void bind(int port) throws Exception {
13         // 服务器NIO线程组线
14         EventLoopGroup bossGroup = new NioEventLoopGroup();
15         EventLoopGroup workerGroup = new NioEventLoopGroup();
16         try {
17             ServerBootstrap serverBootstrap = new ServerBootstrap();
18             serverBootstrap.group(bossGroup, workerGroup)
19                     .channel(NioServerSocketChannel.class)
20                     .option(ChannelOption.SO_BACKLOG, 1024)
21                     .childHandler(new ChildChannelHandler());
22             // 绑定端口,同步等待成功
23             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
24             // 等待服务器监听端口关闭
25             channelFuture.channel().closeFuture().sync();
26         } finally {
27             // 优雅退出,释放线程池资源
28             workerGroup.shutdownGracefully();
29             bossGroup.shutdownGracefully();
30         }
31     }
32     
33     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
34 
35         @Override
36         protected void initChannel(SocketChannel socketChannel) throws Exception {
37             socketChannel.pipeline().addLast(new TimeServerHandler());
38         }
39         
40     }
41     
42     public static void main(String[] args) throws Exception {
43         new TimeServer().bind(8080);
44     }
45     
46 }

View Code

  1. 时间服务器TimeServerHandler

威尼斯官网 26

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         ByteBuf reqBuf = (ByteBuf) msg;
13         byte[] req = new byte[reqBuf.readableBytes()];
14         reqBuf.readBytes(req);
15         String reqString = new String(req, "UTF-8");
16         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
17         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
18         ctx.write(respBuf);
19     }
20     
21     @Override
22     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
23         ctx.flush();
24     }
25 
26     @Override
27     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
28         ctx.close();
29     }
30 
31 }

View Code

  1. 时间客户端TimeClient

威尼斯官网 27

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 public class TimeClient {
11     
12     public void connect(String host, int port) throws Exception {
13         EventLoopGroup group = new NioEventLoopGroup();
14         try {
15             // 客户端NIO线程组
16             Bootstrap bootstrap = new Bootstrap();
17             bootstrap.group(group).channel(NioSocketChannel.class)
18                     .option(ChannelOption.TCP_NODELAY, true)
19                     .handler(new ChildChannelHandler());
20             // 发起异步连接操作
21             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
22             // 等待客户端链路关闭
23             channelFuture.channel().closeFuture().sync();
24             
25         } finally {
26             // 优雅退出,释放NIO线程组
27             group.shutdownGracefully();
28         }
29     }
30     
31     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
32 
33         @Override
34         protected void initChannel(SocketChannel socketChannel) throws Exception {
35             socketChannel.pipeline().addLast(new TimeClientHandler());
36         }
37         
38     }
39     
40     public static void main(String[] args) throws Exception {
41         new TimeClient().connect("127.0.0.1", 8080);
42     }
43     
44 }

View Code

  1. 时间客户端TimeClientHandler

威尼斯官网 28

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDER".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         ByteBuf respBuf = (ByteBuf) msg;
24         byte[] resp = new byte[respBuf.readableBytes()];
25         respBuf.readBytes(resp);
26         String respString = new String(resp, "UTF-8");
27         System.out.println(respString);
28     }
29     
30     @Override
31     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
32         ctx.close();
33     }
34     
35 }

View Code

粘包/拆包问题

问题及其解决

  1. TCP是一个“流协议”,是没有界限的一串数据。

2. TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP拆包发送,也可能封装多个

小包成大包发送。

  1. 业界主流协议的解决方案归纳:

    a) 消息定长。如每个报文的大小固定长度200字节,不足时空位补空格。

    b) 在包尾增加回车换行符进行分割。如FTP协议。

    c) 将消息分为消息头、消息体,消息头中包含消息总长度(或消息体长度)的字段。

    d) 更复杂的应用层协议。

  1. Netty提供了多种编码器用于解决粘包/拆包问题。

LineBasedFrameDecoder

  1. 原理:遍历ByteBuf中的可读字节,发现“n”或“rn”时就结束。

2. 支持携带结束符或不携带结束符两种编码方式;支持配置单行的最大长度(超过最大长度未发现换行符则抛出异常,同时忽略掉之前读到的异常码流)。

3. StringDecoder功能:将接受到的对象转成字符串,然后继续调用后面的Handler。

  1. 使用LineBasedFrameDecoder优化后的时间服务器。

    a) 时间服务器TimeServer

威尼斯官网 29

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class TimeServer {
13     
14     public void bind(int port) throws Exception {
15         // 服务器NIO线程组线
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .option(ChannelOption.SO_BACKLOG, 1024)
23                     .childHandler(new ChildChannelHandler());
24             // 绑定端口,同步等待成功
25             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
26             // 等待服务器监听端口关闭
27             channelFuture.channel().closeFuture().sync();
28         } finally {
29             // 优雅退出,释放线程池资源
30             workerGroup.shutdownGracefully();
31             bossGroup.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
40             socketChannel.pipeline().addLast(new StringDecoder());
41             socketChannel.pipeline().addLast(new TimeServerHandler());
42         }
43         
44     }
45     
46     public static void main(String[] args) throws Exception {
47         new TimeServer().bind(8080);
48     }
49     
50 }

View Code

    b) 时间服务器TimeServerHandler

威尼斯官网 30

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         String reqString = (String) msg;
13         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
14         respString += "n";
15         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
16         ctx.write(respBuf);
17     }
18     
19     @Override
20     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
21         ctx.flush();
22     }
23 
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
26         ctx.close();
27     }
28 
29 }

View Code

    c) 时间客户端TimeClient

威尼斯官网 31

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class TimeClient {
13     
14     public void connect(String host, int port) throws Exception {
15         EventLoopGroup group = new NioEventLoopGroup();
16         try {
17             // 客户端NIO线程组
18             Bootstrap bootstrap = new Bootstrap();
19             bootstrap.group(group).channel(NioSocketChannel.class)
20                     .option(ChannelOption.TCP_NODELAY, true)
21                     .handler(new ChildChannelHandler());
22             // 发起异步连接操作
23             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
24             // 等待客户端链路关闭
25             channelFuture.channel().closeFuture().sync();
26             
27         } finally {
28             // 优雅退出,释放NIO线程组
29             group.shutdownGracefully();
30         }
31     }
32     
33     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
34 
35         @Override
36         protected void initChannel(SocketChannel socketChannel) throws Exception {
37             socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
38             socketChannel.pipeline().addLast(new StringDecoder());
39             socketChannel.pipeline().addLast(new TimeClientHandler());
40         }
41         
42     }
43     
44     public static void main(String[] args) throws Exception {
45         new TimeClient().connect("127.0.0.1", 8080);
46     }
47     
48 }

View Code

    d) 时间客户端TimeClientHandler

威尼斯官网 32

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDERn".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         String respString = (String) msg;
24         System.out.println(respString);
25     }
26     
27     @Override
28     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
29         ctx.close();
30     }
31     
32 }

View Code

DelimiterBasedFrameDecoder

  1. 功能:以分隔符作为码流结束标识符的消息解码。

  2. 时间服务器TimeServer

威尼斯官网 33

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeServer {
15     
16     public void bind(int port) throws Exception {
17         // 服务器NIO线程组线
18         EventLoopGroup bossGroup = new NioEventLoopGroup();
19         EventLoopGroup workerGroup = new NioEventLoopGroup();
20         try {
21             ServerBootstrap serverBootstrap = new ServerBootstrap();
22             serverBootstrap.group(bossGroup, workerGroup)
23                     .channel(NioServerSocketChannel.class)
24                     .option(ChannelOption.SO_BACKLOG, 1024)
25                     .childHandler(new ChildChannelHandler());
26             // 绑定端口,同步等待成功
27             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
28             // 等待服务器监听端口关闭
29             channelFuture.channel().closeFuture().sync();
30         } finally {
31             // 优雅退出,释放线程池资源
32             workerGroup.shutdownGracefully();
33             bossGroup.shutdownGracefully();
34         }
35     }
36     
37     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
38 
39         @Override
40         protected void initChannel(SocketChannel socketChannel) throws Exception {
41             ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());
42             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
43             socketChannel.pipeline().addLast(new StringDecoder());
44             socketChannel.pipeline().addLast(new TimeServerHandler());
45         }
46         
47     }
48     
49     public static void main(String[] args) throws Exception {
50         new TimeServer().bind(8080);
51     }
52     
53 }

View Code

  1. 时间服务器TimeServerHandler

威尼斯官网 34

 1 import java.util.Date;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 public class TimeServerHandler extends ChannelHandlerAdapter {
 9 
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12         String reqString = (String) msg;
13         String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";
14         respString += "*&*";
15         ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());
16         ctx.write(respBuf);
17     }
18     
19     @Override
20     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
21         ctx.flush();
22     }
23 
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
26         ctx.close();
27     }
28 
29 }

View Code

  1. 时间客户端TimeClient

威尼斯官网 35

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeClient {
15     
16     public void connect(String host, int port) throws Exception {
17         EventLoopGroup group = new NioEventLoopGroup();
18         try {
19             // 客户端NIO线程组
20             Bootstrap bootstrap = new Bootstrap();
21             bootstrap.group(group).channel(NioSocketChannel.class)
22                     .option(ChannelOption.TCP_NODELAY, true)
23                     .handler(new ChildChannelHandler());
24             // 发起异步连接操作
25             ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
26             // 等待客户端链路关闭
27             channelFuture.channel().closeFuture().sync();
28             
29         } finally {
30             // 优雅退出,释放NIO线程组
31             group.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());
40             socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
41             socketChannel.pipeline().addLast(new StringDecoder());
42             socketChannel.pipeline().addLast(new TimeClientHandler());
43         }
44         
45     }
46     
47     public static void main(String[] args) throws Exception {
48         new TimeClient().connect("127.0.0.1", 8080);
49     }
50     
51 }

View Code

  1. 时间客户端TimeClientHandler

威尼斯官网 36

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5 
 6 public class TimeClientHandler extends ChannelHandlerAdapter {
 7 
 8     private final ByteBuf reqBuf;
 9     
10     public TimeClientHandler() {
11         byte[] req = "QUERY TIME ORDER*&*".getBytes();
12         reqBuf = Unpooled.buffer(req.length);
13         reqBuf.writeBytes(req);
14     }
15     
16     @Override
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         ctx.writeAndFlush(reqBuf);
19     }
20     
21     @Override
22     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
23         String respString = (String) msg;
24         System.out.println(respString);
25     }
26     
27     @Override
28     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
29         ctx.close();
30     }
31     
32 }

View Code

FixedLengthFrameDecoder

1. 原理:无论一次接受到多少数据包,它都会按照设置的固定长度解码,如果是半包消息,则缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。

  1. 回显服务器EchoServer

威尼斯官网 37

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.FixedLengthFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11 
12 public class EchoServer {
13     
14     public void bind(int port) throws Exception {
15         // 服务器NIO线程组线
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .option(ChannelOption.SO_BACKLOG, 1024)
23                     .childHandler(new ChildChannelHandler());
24             // 绑定端口,同步等待成功
25             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
26             // 等待服务器监听端口关闭
27             channelFuture.channel().closeFuture().sync();
28         } finally {
29             // 优雅退出,释放线程池资源
30             workerGroup.shutdownGracefully();
31             bossGroup.shutdownGracefully();
32         }
33     }
34     
35     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
36 
37         @Override
38         protected void initChannel(SocketChannel socketChannel) throws Exception {
39             socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
40             socketChannel.pipeline().addLast(new StringDecoder());
41             socketChannel.pipeline().addLast(new EchoServerHandler());
42         }
43         
44     }
45     
46     public static void main(String[] args) throws Exception {
47         new EchoServer().bind(8080);
48     }
49     
50 }

View Code

  1. 回显服务器EchoServerHandler

威尼斯官网 38

 1 import io.netty.channel.ChannelHandlerAdapter;
 2 import io.netty.channel.ChannelHandlerContext;
 3 
 4 public class EchoServerHandler extends ChannelHandlerAdapter {
 5 
 6     @Override
 7     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 8         System.out.println(msg);
 9     }
10     
11     @Override
12     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
13         ctx.close();
14     }
15 
16 }

View Code

  1. 使用telnet命令测试,当长度达到20个字符时,服务器打印。

Java序列化问题

问题描述及其解决

  1. 无法跨语言。Java序列化是Java语言内部的私有协议,其他语言并不支持。

2. 序列化后的码流太大。编码后的字节数组越大,存储的时候就越占空间,存储的硬件成本就越高,网络传输时更占带宽,导致系统的吞吐量降低。

  1. 序列化性能太低。编解码耗时长。

  2. 解决:编解码框架,如Google Protobuf、MessagePack。此处不深入展开。

HTTP协议开发

Netty HTTP

1. 由于HTTP协议的通用性,很多异构系统间的通信交互采用HTTP协议,如非常流行的HTTP

  • XML或者RESTful + JSON。
  1. 与Web容器相比,Netty开发HTTP的优势:轻量级;安全。

  2. 这里以文件服务器举例,至于HTTP + XML,此处不深入展开。

文件服务器

  1. 文件服务器HttpFileServer

威尼斯官网 39

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.EventLoopGroup;
 5 import io.netty.channel.nio.NioEventLoopGroup;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import io.netty.handler.codec.http.HttpObjectAggregator;
 9 import io.netty.handler.codec.http.HttpRequestDecoder;
10 import io.netty.handler.codec.http.HttpResponseEncoder;
11 import io.netty.handler.stream.ChunkedWriteHandler;
12 
13 public class HttpFileServer {
14     
15     public void run(int port, String folderPath) throws Exception {
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .childHandler(new ChannelInitializer<SocketChannel>() {
23                         
24                         @Override
25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
26                             socketChannel.pipeline().addLast(new HttpRequestDecoder());
27                             socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
28                             socketChannel.pipeline().addLast(new HttpResponseEncoder());
29                             socketChannel.pipeline().addLast(new ChunkedWriteHandler());
30                             socketChannel.pipeline().addLast(new HttpFileServerHandler(folderPath));
31                         }
32                         
33                     });
34             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
35             channelFuture.channel().closeFuture().sync();
36         } finally {
37             workerGroup.shutdownGracefully();
38             bossGroup.shutdownGracefully();
39         }
40     }
41     
42     public static void main(String[] args) throws Exception {
43         int port = 8080;
44         String folderPath = "E:/workspace";
45         new HttpFileServer().run(port, folderPath);
46     }
47 
48 }

View Code

  1. 文件服务器HttpFileServerHandler

威尼斯官网 40

  1 import io.netty.buffer.ByteBuf;
  2 import io.netty.buffer.Unpooled;
  3 import io.netty.channel.ChannelFutureListener;
  4 import io.netty.channel.ChannelHandlerContext;
  5 import io.netty.channel.SimpleChannelInboundHandler;
  6 import io.netty.handler.codec.http.DefaultFullHttpResponse;
  7 import io.netty.handler.codec.http.DefaultHttpResponse;
  8 import io.netty.handler.codec.http.FullHttpRequest;
  9 import io.netty.handler.codec.http.FullHttpResponse;
 10 import io.netty.handler.codec.http.HttpHeaders;
 11 import io.netty.handler.codec.http.HttpMethod;
 12 import io.netty.handler.codec.http.HttpResponse;
 13 import io.netty.handler.codec.http.HttpResponseStatus;
 14 import io.netty.handler.codec.http.HttpVersion;
 15 import io.netty.handler.stream.ChunkedFile;
 16 import io.netty.util.CharsetUtil;
 17 
 18 import java.io.File;
 19 import java.io.FileNotFoundException;
 20 import java.io.RandomAccessFile;
 21 import java.net.URLDecoder;
 22 
 23 public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 24     
 25     private String folderPath;
 26     
 27     public HttpFileServerHandler(String folderPath) {
 28         this.folderPath = folderPath;
 29     }
 30 
 31     @Override
 32     protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
 33         if (!req.getDecoderResult().isSuccess()) {
 34             sendStatus(ctx, HttpResponseStatus.BAD_REQUEST);
 35             return;
 36         }
 37         if (!HttpMethod.GET.equals(req.getMethod())) {
 38             sendStatus(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
 39             return;
 40         }
 41         String uri = req.getUri();
 42         File file = getFile(uri);
 43         if (file == null || file.isHidden() || !file.exists()) {
 44             sendStatus(ctx, HttpResponseStatus.NOT_FOUND);
 45             return;
 46         }
 47         try {
 48             if (file.isDirectory()) {
 49                 listFiles(ctx, file, uri);
 50             } else {
 51                 returnFile(ctx, req, file);
 52             }
 53         } catch (Exception e) {
 54             sendStatus(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
 55         }
 56     }
 57     
 58     private File getFile(String uri) throws Exception {
 59         uri = URLDecoder.decode(uri, "UTF-8");
 60         return new File(folderPath + uri);
 61     }
 62     
 63     private void listFiles(ChannelHandlerContext ctx, File folder, String uri) throws Exception {
 64         uri = uri.endsWith("/") ? uri : uri + "/";
 65         StringBuilder html = new StringBuilder("<h1>Index of ").append(URLDecoder.decode(uri, "UTF-8")).append("</h1><hr/><pre><a href="").append(uri).append("../">../</a>n");
 66         File[] subfiles = folder.listFiles();
 67         if (subfiles != null && subfiles.length > 0) {
 68             for (File subfile : subfiles) {
 69                 String name = subfile.getName();
 70                 html.append("<a href="").append(uri).append(name).append("">").append(name).append("</a>n");
 71             }
 72         }
 73         html.append("</pre><hr/>");
 74         FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
 75         resp.headers().add(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8");
 76         ByteBuf content = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);
 77         resp.content().writeBytes(content);
 78         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
 79     }
 80     
 81     private void returnFile(ChannelHandlerContext ctx, FullHttpRequest req, File file) throws Exception {
 82         
 83         RandomAccessFile randomAccessFile = null;
 84         try {
 85             randomAccessFile = new RandomAccessFile(file, "r");
 86             HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
 87             resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, randomAccessFile.length())
 88                     .set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
 89             if (HttpHeaders.Values.KEEP_ALIVE.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.CONNECTION))) {
 90                 resp.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 91             }
 92             ctx.write(resp);
 93             ctx.writeAndFlush(new ChunkedFile(randomAccessFile, 0, randomAccessFile.length(), 8192)).addListener(ChannelFutureListener.CLOSE);
 94             
 95         } catch (FileNotFoundException e) {
 96             sendStatus(ctx, HttpResponseStatus.NOT_FOUND);
 97         } finally {
 98             if (randomAccessFile != null) {
 99                 randomAccessFile.close();
100             }
101         }
102     }
103     
104     private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {
105         HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
106         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
107     }
108 
109 }

View Code

WebSocket协议开发

问题及其解决

  1. 轮训、Comet等服务器推送技术效率低下,大量消耗服务器带宽和资源。

  2. WebSocket的特点:

    a) 单一的TCP连接,全双工模式。

    b) 对代理、防火墙和路由器透明。

    c) 无头部信息、Cookie和身份验证。

    d) 无安全开销。

    e) 通过“ping/pong”帧保持链路激活。

    f) 服务器可以主动传递消息给客户端,客户端不再轮训。

原理(过程)

1. 浏览器向服务器发起一个HTTP请求(特别的头信息,Sec-WebSocket-Key是随机的),准备建立WebSocket连接。

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com

2. 服务器用Sec-WebSocket-Key加上魔幻字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,先SHA-1加密,再BASE-64编码,作为Sec-WebSocket-Accept返回浏览器。握手完成。

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat
  1. 服务器和浏览器可通过message方式进行通信。

4. 关闭消息带有一个状态码和一个可选的关闭原因,按协议要求发送一个Close控制帧,当对端接受到关闭控制帧指令时,主动关闭WebSocket连接。

开发

  1. 服务器WebSocketServer

威尼斯官网 41

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.EventLoopGroup;
 5 import io.netty.channel.nio.NioEventLoopGroup;
 6 import io.netty.channel.socket.SocketChannel;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import io.netty.handler.codec.http.HttpObjectAggregator;
 9 import io.netty.handler.codec.http.HttpRequestDecoder;
10 import io.netty.handler.codec.http.HttpResponseEncoder;
11 import io.netty.handler.stream.ChunkedWriteHandler;
12 
13 public class WebSocketServer {
14     
15     public void run(int port) throws Exception {
16         EventLoopGroup bossGroup = new NioEventLoopGroup();
17         EventLoopGroup workerGroup = new NioEventLoopGroup();
18         try {
19             ServerBootstrap serverBootstrap = new ServerBootstrap();
20             serverBootstrap.group(bossGroup, workerGroup)
21                     .channel(NioServerSocketChannel.class)
22                     .childHandler(new ChannelInitializer<SocketChannel>() {
23                         
24                         @Override
25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
26                             socketChannel.pipeline().addLast(new HttpRequestDecoder());
27                             socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
28                             socketChannel.pipeline().addLast(new HttpResponseEncoder());
29                             socketChannel.pipeline().addLast(new ChunkedWriteHandler());
30                             socketChannel.pipeline().addLast(new WebSocketServerHandler());
31                         }
32                         
33                     });
34             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
35             channelFuture.channel().closeFuture().sync();
36         } finally {
37             workerGroup.shutdownGracefully();
38             bossGroup.shutdownGracefully();
39         }
40     }
41     
42     public static void main(String[] args) throws Exception {
43         int port = 8080;
44         new WebSocketServer().run(port);
45     }
46     
47 }

View Code

  1. 服务器WebSocketServerHandler

威尼斯官网 42

 1 import io.netty.channel.ChannelFutureListener;
 2 import io.netty.channel.ChannelHandlerContext;
 3 import io.netty.channel.SimpleChannelInboundHandler;
 4 import io.netty.handler.codec.http.DefaultHttpResponse;
 5 import io.netty.handler.codec.http.FullHttpRequest;
 6 import io.netty.handler.codec.http.HttpHeaders;
 7 import io.netty.handler.codec.http.HttpResponse;
 8 import io.netty.handler.codec.http.HttpResponseStatus;
 9 import io.netty.handler.codec.http.HttpVersion;
10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
11 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
12 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
13 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
14 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
15 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
16 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
17 
18 import java.util.Date;
19 
20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
21     
22     private WebSocketServerHandshaker handshaker;
23     
24     @Override
25     protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
26         // 传统HTTP
27         if (msg instanceof FullHttpRequest) {
28             handleHttpRequest(ctx, (FullHttpRequest) msg);
29         } else if (msg instanceof WebSocketFrame) {
30             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
31         }
32     }
33     
34     @Override
35     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
36         ctx.flush();
37     }
38     
39     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
40         if (!req.getDecoderResult().isSuccess()
41                 || !HttpHeaders.Values.WEBSOCKET.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.UPGRADE))) {
42             sendStatus(ctx, HttpResponseStatus.BAD_REQUEST);
43             return;
44         }
45         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/testws", null, false);
46         handshaker = wsFactory.newHandshaker(req);
47         if (handshaker == null) {
48             WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
49         } else {
50             handshaker.handshake(ctx.channel(), req);
51         }
52     }
53     
54     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
55         if (frame instanceof CloseWebSocketFrame) {
56             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
57             return;
58         }
59         if (frame instanceof PingWebSocketFrame) {
60             ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
61             return;
62         }
63         if (!(frame instanceof TextWebSocketFrame)) {
64             throw new UnsupportedOperationException();
65         }
66         String req = ((TextWebSocketFrame) frame).text();
67         ctx.channel().write(new TextWebSocketFrame("欢迎" + req + ",现在时刻" + new Date()));
68     }
69     
70     private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {
71         HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
72         ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
73     }
74     
75 }

View Code

  1. 浏览器websocketclient.html

威尼斯官网 43

 1 <script type="text/javascript">
 2 var socket;
 3 function initSocket() {
 4     if (socket) return;
 5     if (!window.WebSocket) window.WebSocket = window.MozWebSocket;
 6     if (!window.WebSocket) {
 7         alert('浏览器不支持WebSocket');
 8         return;
 9     }
10     socket = new WebSocket('ws://localhost:8080/testws');
11     socket.onmessage = function(event) {
12         alert(event.data);
13     };
14     socket.onopen = function(event) {
15         alert('WebSocket连接建立成功');
16     };
17     socket.onclose = function(event) {
18         alert('WebSocket连接已关闭');
19     };
20 }
21 
22 function sendMsg() {
23     initSocket();
24     if (socket && WebSocket && socket.readyState == WebSocket.OPEN) {
25         var msg = document.getElementById('msg').value;
26         socket.send(msg);
27     }
28 }
29 </script>
30 <input type="text" id="msg"/>
31 <input type="button" value="Send" onclick="sendMsg()"/>

View Code

Netty架构

逻辑架构

  1. Netty采用三层网络架构设计和开发。

威尼斯官网 44

2. Reactor通信调度层(第1层)。负责监听网络的读写和连接操作。将网络层的数据读取到内存缓存区,然后触发各种网络事件,如连接创建、连接激活、读事件、写事件等,将这些事件触发到Pipeline中,有Pipeline管理的责任链进行后续处理。

3. 责任链ChannelPipleline(第2层)。负责事件在责任链中的有序传播,同时动态地编排责任链。通常,由编解码Handler将外部协议消息转换成内部POJO对象,这样上层业务只需关心业务逻辑处理。

  1. 业务逻辑编排层Service ChannelHandler(第3层)。通常有两类:存储的业务逻辑编排和其他应用层协议插件,用于特定协议相关的会话和链路管理。

  2. 通常,开发者值需关系责任链和业务逻辑编排层。

高性能

Netty的高性能是如何实现的?

1. 采用异步非阻塞IO类库,基于Reactor模式实现,解决了传统同步阻塞IO模式下一个服务端无法平滑处理线性增长的客户端的问题。

2. TCP接收和发送缓冲区使用直接内存代替堆内存,避免内存复制,提升了IO读写性能。俗称“零拷贝”(Zero-Copy)。

3. 通过内存池方式循环利用ByteBuf,避免了频繁创建和销毁ByteBuf带来的性能损耗。

4. 可配置IO线程数、TCP参数等,为不同场景提供定制化的调优参数,满足不同的性能场景。

  1. 采用环形数组缓冲区实现无锁化并发编程,代替传统的线程安全容器和锁。

  2. 合理使用线程安全容器、原子类等,提升系统的并发处理能力。

7. 关键资源的处理使用单线程串行化方式,避免了多线程并发访问带来的锁竞争和额外的CPU资源消耗问题。

8. 通过引用计数器及时申请释放不再被引用的对象,细粒度的内存管理降低了GC频繁,减少了频繁GC带来的延时和CPU损耗。

可靠性

Netty的可靠性是如何实现的?

  1. 链路有效性检测。

    a) 长连接无需每次发送消息时创建链路,也无需在消息交互完成后关闭链路,因此相对短链接更高。

    b) 为保证长连接有效性,需要周期性心跳检测。一旦发现问题,可以及时关闭链路,重建TCP链接。

  1. 内存保护机制。

    a) 通过对象引用计数器对ByteBuf等内置对象进行细粒度的内存申请和释放,对非法对象引用进行检测和保护。

    b) 通过内存池方式循环利用ByteBuf,节省内存。

    c) 可设置内存容量上限,包括ByteBuf、线程池线程数等。

  1. 优雅停机。

    a) 当系统退出时,JVM通过注册的Shutdown Hook拦截到退出信号量,然后执行退出操作,释放相关模块的资源,将缓冲区的消息处理完成或清空,将待刷新的数据持久化到磁盘或数据库,完成后再退出。

    b) 需设置超时时间T,如果达到T后仍然没有退出,则通过“kill -9 pid”强杀进程。

可定制性

Netty的可定制性是如何实现的?

1. 责任链模式:ChannelPipeline基于责任链模式,便于业务逻辑的拦截、定制和扩展。

  1. 基于接口开发:关键类库都提供了接口或抽象类。

  2. 提供大量工厂类,重载工厂类可创建用户实现的对象。

  3. 提供大量系统参数供用户设置。

可扩展性

可定义私有协议栈。

私有协议栈开发

  1. 开发时编写的代码。

    a) 数据结构NettyMessage;

    b) 消息编解码器NettyMessageEncoder和NettyMessageDecoder;

    c) 握手认证Handler LoginAuthReqHanlder和LoginAuthRespHanlder;

    d) 心跳检测Handler HearBeatReqHanlder和HearBeatRespHanlder。

  1. 私有协议栈细节待补充。

 

作者:netoxi
出处:
本文版权归作者和博客园共有,欢迎转载,未经同意须保留此段声明,且在文章页面明显位置给出原文连接。欢迎指正与交流。

 

目录 · Linux网络IO模型 · 文件描述符 · 阻塞IO模型 · 非阻塞IO模型 · IO复用模型 · 信号驱动IO模型...

本文由威尼斯手机平台发布于威尼斯官网,转载请注明出处:威尼斯官网有类似 Go 语言的协程操作方式,(文/开源中国)    

上一篇:没有了 下一篇:没有了
猜你喜欢
热门排行
精彩图文