Many data systems rely on polling to refresh displayed lists, which delays content-status updates and prevents immediate feedback to users on the page. Shortening the client-side polling interval, however, places excessive load on the server and should be avoided.

To solve this problem, this article proposes an event subscription mechanism. This mechanism provides real-time updates to the client, eliminating the need for polling refresh and improving the user experience.

Terminologies and Context

This article introduces the following concepts:

  • Hub: An event aggregation center that receives events from producers and sends them to subscribers.
  • Buffer: An event buffer that caches events from producers and waits for the Hub to dispatch them to subscribers.
  • Filter: An event filter that only sends events meeting specified conditions to subscribers.
  • Broadcast: An event broadcaster that broadcasts the producer's events to all subscribers.
  • Observer: An event observer through which a subscriber receives its events.

The document discusses some common concepts such as:

  • Pub-Sub pattern: a messaging pattern in which senders (publishers) do not send messages directly to specific recipients (subscribers). Instead, published messages are grouped into categories, without the publisher knowing which subscribers (if any) exist; subscribers express interest in one or more categories and receive all messages in those categories.
  • Filter:
    • Topic-based filtering selects events by topic. Producers publish events to one or more topics, and subscribers subscribe to one or more topics; only events on subscribed topics are delivered. When a terminal client subscribes directly, however, the subscription scope is too broad, so this mode does not fit a common hierarchical structure.
    • Content-based filtering selects events by message content. Subscribers attach filter conditions to their subscriptions, and only events whose content matches those conditions are delivered. This mode fits a common hierarchical structure.

Functional Requirements

  • Client users can subscribe to events via gRPC streams, WebSocket, or Server-Sent Events.
  • Whenever a record's status changes (e.g. when the record is updated by an automation task) or when other collaborators operate on the same record simultaneously, an event will be triggered and pushed to the message center.
  • Events will be filtered using content filtering mode, ensuring that only events that meet the specified conditions are sent to subscribers.
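The content-filtering requirement can be sketched as a predicate factory. Below is a minimal Python sketch (the `make_filter` helper is hypothetical, and events are assumed to be plain dictionaries):

```python
def make_filter(**conditions):
    """Content-based filter: an event passes only when every
    key/value condition matches the event's attributes."""
    def predicate(event):
        return all(event.get(key) == value for key, value in conditions.items())
    return predicate

# Subscribe a client to file events of one record, as in "File(Record = 111)"
file_filter = make_filter(kind="File", record=111)
print(file_filter({"kind": "File", "record": 111, "status": "updated"}))  # True
```

Each subscriber carries its own predicate, so the Broadcast stage simply tests every event against every subscriber's filter.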

Architecture

flowchart TD
  Hub([Hub])
  Buffer0[\"Buffer drop oldest"/]
  Buffer1[\"Buffer1 drop oldest"/]
  Buffer2[\"Buffer2 drop oldest"/]
  Buffer3[\"Buffer3 drop oldest"/]
  Filter1[\"File(Record = 111)"/]
  Filter2[\"Workflow(Project = 222)"/]
  Filter3[\"File(Project = 333)"/]
  Broadcast((Broadcast))
  Client1(Client1)
  Client2(Client2)
  Client3(Client3)
  Hub --> Buffer0
  subgraph Server
    Buffer0 --> Broadcast
    Broadcast --> Filter1 --> Buffer1 --> Observer1
    Broadcast --> Filter2 --> Buffer2 --> Observer2
    Broadcast --> Filter3 --> Buffer3 --> Observer3
  end
  subgraph Clients
    Observer1 -.-> Client1
    Observer2 -.-> Client2
    Observer3 -.-> Client3
  end

High-Level Overview

flowchart TD
  Pipe#a[[...Pipe...]]
  Pipe#b[[...Pipe...]]
  subgraph Hub
  direction LR
  Event1((Event1))
	Event2((Event2))
  Event3((Event3))
  Event4((Event4))
  Event5((Event5))
  Event6((Event6))
  Event7((Event7))
  Event8((Event8))
  Event1 -.-> Event2 -.-> Event3 -.-> Event4 -.-> Event5 -.-> Event6 -.-> Event7 -.-> Event8
  end
	Pipe#a -.-> Event1
  Event8 -.-> Pipe#b
  subgraph Client1
  direction LR
  C1Subscribe((Start))
  C1Cancel((End))
  Event2 -.-> C1Listen2
  Event3 -.-> C1Listen3
  Event4 -.-> C1Listen4
  Event5 -.-> C1Listen5
  C1Subscribe -.-> C1Listen2 -.-> C1Listen3 -.-> C1Listen4 -.-> C1Listen5 -.-> C1Cancel
  end
  subgraph Client2
  direction LR
  C2Subscribe((Start))
  C2Cancel((End))
  Lag(("❌"))
  C2Subscribe -.-> C2Listen1 -- "Poor Network" ---> Lag --"Packet loss"---> C2Listen5 -.-> C2Listen6 -.-> C2Listen7 -.-> C2Listen8 -.-> C2Cancel
  Event1 -.-> C2Listen1
  Event5 -.-> C2Listen5
  Event6 -.-> C2Listen6
  Event7 -.-> C2Listen7
  Event8 -.-> C2Listen8
  end

Clients should follow these steps:

  • Upon entering the page, subscribe as needed.
  • Upon receiving a change event, debounce, re-request the list API, and re-render.
  • Upon leaving the page, cancel the subscription.

Servers should follow these steps:

  • Push events to each subscriber according to the client's filter.
  • When a client's message backlog grows too large, drop the oldest message from its buffer.
  • When a client cancels its subscription, stop broadcasting to that client.

Application / Component Level Design (LLD)

flowchart LR
  Server([Server])
  Client([Client: Web...])
  MQ[Kafka or other]
  Broadcast((Broadcast))
  subgraph ExternalHub
    direction LR
    Receiver --> MQ --> Sender
  end
  subgraph InMemoryHub
    direction LR
    Emit -.-> OnEach
  end
  Server -.-> Emit
  Sender --> Broadcast
  OnEach -.-> Broadcast
  Broadcast -.-> gRPC
  Broadcast -.-> gRPC
  Broadcast -.-> gRPC
  Server --  "if horizontal scaling is needed" --> Receiver
  gRPC --Stream--> Client

For a single-node server, a simple Hub can be implemented using an in-memory queue.
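A minimal sketch of such an in-memory Hub, assuming a threaded Python server (class and method names are illustrative):

```python
import queue
import threading

class InMemoryHub:
    """Single-node hub: producers emit events, and every
    subscriber receives them on its own queue."""
    def __init__(self):
        self._subscribers = []
        self._lock = threading.Lock()

    def subscribe(self):
        q = queue.Queue()
        with self._lock:
            self._subscribers.append(q)
        return q

    def emit(self, event):
        # Fan the event out to every subscriber's private queue.
        with self._lock:
            for q in self._subscribers:
                q.put(event)
```

Each subscriber draining its own queue keeps consumers isolated from one another, which is the same property the bounded Buffer relies on later.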

For multi-node servers, consider an external Hub implementation such as Kafka, another MQ, or Knative Eventing. The broadcasting logic is the same as on a single machine.

Failure Modes

Fast Producer-Slow Consumer

This is a common scenario that requires special attention. A publish-subscribe mechanism serving terminal clients cannot assume that clients consume messages in real time, yet message continuity must be preserved as far as possible. Clients may access our products over uncontrollable networks such as 4G or poor Wi-Fi, so the server-side message queue must not build up an unbounded backlog. When a client's consumption rate cannot keep up with the server's production rate, this article recommends a bounded Buffer with the OverflowStrategy.DropOldest strategy. Subscriptions are isolated per consumer, which prevents unpushed messages from accumulating on the server (a potential memory-leak risk).
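A minimal sketch of such a bounded drop-oldest buffer in Python (`DropOldestBuffer` is illustrative; `collections.deque` with `maxlen` evicts from the head on append):

```python
from collections import deque

class DropOldestBuffer:
    """Bounded per-subscriber buffer: when full, the oldest
    event is evicted instead of blocking the producer."""
    def __init__(self, capacity):
        self._events = deque(maxlen=capacity)  # deque drops from the head when full

    def publish(self, event):
        self._events.append(event)

    def drain(self):
        drained = list(self._events)
        self._events.clear()
        return drained
```

A slow consumer therefore sees only the newest `capacity` events, and the server's memory use stays bounded regardless of how far the client falls behind.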

Alternative Design

VMware published a very similar design in 2013, but used a Go RingChannel.

Summary

This document proposes an event subscription mechanism to address the delay in updating content status caused by polling refresh. Clients can subscribe to events through any long connection protocol, and events will be filtered based on specified conditions. To avoid having too many unpushed messages on the server, a bounded buffer with the OverflowStrategy.DropOldest strategy is used.

Implementing this in Reactive Streams is straightforward, but you can choose your preferred technology to do so.

Overview

In the previous post, we discussed how to implement a file tree in PostgreSQL using ltree. Now, let's talk about how to integrate version control management for the file tree.

Version control is a process for managing changes made to a file tree over time. This allows for the tracking of its history and the ability to revert to previous versions, making it an essential tool for file management.

With version control, users have access to the most up-to-date version of a file, and changes are tracked and documented in a systematic manner. This ensures that there is a clear record of what has been done, making it much easier to manage files and their versions.

Terminologies and Context

One flawed implementation stores all file metadata for every commit, recording even unchanged files as NO_CHANGE. This leads to significant write amplification. Avoiding NO_CHANGE rows when creating a new version significantly reduces that amplification.

This is good for querying but bad for writing. To fetch a specific version, the PostgreSQL engine only needs to scan the index with the condition file.version = ?, which is very cheap in modern database systems. However, creating a new version forces the engine to write \(N\) rows into the log table (where \(N\) is the number of current files). This causes a write peak in the database and is unacceptable.

In theory, we only need to write the changed files. If we can find a way to fetch an arbitrary version of each file in \(O(log(n))\) time, we can eliminate this unnecessary write amplification.
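The idea can be sketched in Python with a hypothetical in-memory model of the log table: for each file, binary-search its sorted revision list for the newest revision at or below the requested version, giving \(O(N \cdot log(M))\) overall:

```python
from bisect import bisect_right

def snapshot(file_logs, version):
    """Resolve each file's newest revision <= version by binary
    search over its sorted revision list: O(N * log(M))."""
    result = {}
    for name, revisions in file_logs.items():  # revisions: sorted (revision_id, blob) pairs
        ids = [rev_id for rev_id, _ in revisions]
        i = bisect_right(ids, version)
        if i:  # the file already existed at this version
            result[name] = revisions[i - 1][1]
    return result

logs = {
    "a.txt": [(1, "a1"), (5, "a5")],
    "b.txt": [(2, "b2"), (7, "b7")],
}
print(snapshot(logs, 4))  # {'a.txt': 'a1', 'b.txt': 'b2'}
```

The LATERAL query shown later performs exactly this per-file lookup inside the database, using the combined index in place of the in-memory binary search.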

Non Functional Requirements

Scalability

Consider the worst-case scenario: a file tree with more than 1,000 files that is committed to more than 10,000 times, where every commit changes every file. The log table then holds more than 10 million rows, which degrades write performance compared to the efficient implementation and is difficult to split into partitioned tables.

Suppose \(N\) is the number of files, and \(M\) is the number of commits. We need to ensure that the time complexity of fetching a snapshot of an arbitrary version is less than \(O(N\cdot log(M))\). This is theoretically possible.

Latency

In the worst case, the query can still respond in less than 100ms.

Architecture

Database Design

Illustration of data structures.


Tech Details

Subqueries appearing in FROM can be preceded by the key word LATERAL. This allows them to reference columns provided by preceding FROM items. (Without LATERAL, each subquery is evaluated independently and so cannot cross-reference any other FROM item.) — https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-LATERAL

PostgreSQL has a keyword called LATERAL. It can be used in a join to let the subquery reference columns of an outer table in its WHERE condition. By doing so, we can directly tell the query optimizer how to use the index. Since data in a combined index is stored in an ordered tree, finding the maximum value, or any arbitrary value, has a time complexity of \(O(log(n))\).

Finally, we obtain a time complexity of \(O(N \cdot log(M))\).

Performance

Result: Fetching an arbitrary version will be done in tens of milliseconds.

explain analyse
select f.record_id, f.filename, latest.revision_id
from files f
inner join lateral (
select *
from file_logs fl
where f.filename = fl.filename
and f.record_id = fl.record_id
-- and revision_id < 20000
order by revision_id desc
limit 1
) as latest
on f.record_id = 'f5c2049f-5a32-44f5-b0cc-b7e0531bf706';
Nested Loop  (cost=0.86..979.71 rows=1445 width=50) (actual time=0.040..18.297 rows=1445 loops=1)
  ->  Index Only Scan using files_pkey on files f  (cost=0.29..89.58 rows=1445 width=46) (actual time=0.019..0.174 rows=1445 loops=1)
        Index Cond: (record_id = 'f5c2049f-5a32-44f5-b0cc-b7e0531bf706'::uuid)
        Heap Fetches: 0
  ->  Memoize  (cost=0.57..0.65 rows=1 width=4) (actual time=0.012..0.012 rows=1 loops=1445)
        Cache Key: f.filename, f.record_id
        Cache Mode: binary
        Hits: 0  Misses: 1445  Evictions: 0  Overflows: 0  Memory Usage: 221kB
        ->  Subquery Scan on latest  (cost=0.56..0.64 rows=1 width=4) (actual time=0.012..0.012 rows=1 loops=1445)
              ->  Limit  (cost=0.56..0.63 rows=1 width=852) (actual time=0.012..0.012 rows=1 loops=1445)
                    ->  Index Only Scan Backward using file_logs_pk on file_logs fl  (cost=0.56..11.72 rows=158 width=852) (actual time=0.011..0.011 rows=1 loops=1445)
                          Index Cond: ((record_id = f.record_id) AND (filename = (f.filename)::text))
                          Heap Fetches: 0
Planning Time: 0.117 ms
Execution Time: 18.384 ms

Test Datasets

This dataset simulates the worst-case scenario of a table with 14.6 million rows. Specifically, it contains 14.45 million rows representing a situation in which 1,400 files are changed 10,000 times.

-- cnt: 14605858
select count(0) from file_logs;
-- cnt: 14451538
select count(0) from file_logs where record_id = 'f5c2049f-5a32-44f5-b0cc-b7e0531bf706';

Schema

create table public.file_logs
(
    file_key    ltree         not null,
    revision_id integer       not null,
    record_id   uuid          not null,
    filename    varchar(2048) not null,
    create_time timestamp,
    update_time timestamp,
    delete_time timestamp,
    blob_sha256 char(64),
    constraint file_logs_pk
        primary key (record_id, filename, revision_id)
);

alter table public.file_logs
    owner to postgres;

create table public.files
(
    record_id uuid          not null,
    filename  varchar(2048) not null,
    create_at timestamp     not null,
    primary key (record_id, filename)
);

alter table public.files
    owner to postgres;

Further Improvements

We can implement this using an intuitive approach in a graph database.

File tree version in graph database

Background

A file tree is a hierarchical structure used to organize files and directories on a computer. It allows users to easily navigate and access their files and folders, and is commonly used in operating systems and file management software.

Implementing file trees in a traditional RDBMS like MySQL can be a challenge due to the lack of support for hierarchical data structures. There are workarounds, such as the nested-set or materialized-path approaches. Alternatively, you could consider a database with built-in support for hierarchical data, such as MongoDB or Couchbase.

It is possible to implement a file tree in PostgreSQL using the ltree datatype provided by PostgreSQL. This datatype can help us build the hierarchy within the database.

TL;DR

Pros

  • Excellent performance!
  • No migration is needed for this, as no new columns will be added. Only a new expression index needs to be created.

Cons

  • An additional mechanism is needed to create virtual folder entities (only if you need to show the folder level).
  • There are limitations on file/folder name length (especially with non-ASCII characters).

Limitation

The maximum length of a file or directory name is limited. In the worst case, where non-ASCII characters (e.g. Chinese) and alphanumeric characters are interlaced, a name cannot exceed 33 characters. Even if all the characters are Chinese, a name cannot exceed 62 characters.

Based on the PostgreSQL documentation, a label path cannot exceed 65535 labels. In most cases this limit is sufficient; it is unlikely you would nest directories to such a depth.

select escape_filename_for_ltree(
'一0二0三0四0五0六0七0八0九0十0' ||
'一0二0三0四0五0六0七0八0九0十0' ||
'一0二0三0四0五0六0七0八0九0十0' ||
'一0二0'
); -- worst case len 34
select escape_filename_for_ltree(
'一二三四五六七八九十' ||
'一二三四五六七八九十' ||
'一二三四五六七八九十' ||
'一二三四五六七八九十' ||
'一二三四五六七八九十' ||
'一二三四五六七八九十' ||
'一二三'
); -- Chinese case len 63
[42622] ERROR: label string is too long Detail: Label length is 259, must be at most 255, at character 260. Where: PL/pgSQL function escape_filename_for_ltree(text) line 5 at SQL statement

How to use

Build expression index

CREATE INDEX idx_file_tree_filename ON files using gist (escape_filename_for_ltree(filename));

Example Query

explain analyse
select filename
from files
where escape_filename_for_ltree(filename) ~ 'ow.*{1}'
and record_id = '1666bad1-202c-496e-bb0e-9664ce3febcb';

Query Result

ow/ros_00000000_2022-03-02-12-55-19_330.bag
ow/ros_00011426_2022-08-15-19-24-11_0.bag
ow/ros_00019378_2022-08-12-18-40-06_0.bag
ow/ros_00011426_2022-08-15-19-24-11_0.bag
ow/ros_00011426_2022-08-15-19-24-11_0.bag.coscene-reserved-index

Query Explain

Bitmap Heap Scan on files  (cost=32.12..36.38 rows=1 width=28) (actual time=0.341..0.355 rows=8 loops=1)
  Recheck Cond: ((record_id = '1666bad1-202c-496e-bb0e-9664ce3febcb'::uuid) AND (escape_filename_for_ltree((filename)::text) <@ 'ow'::ltree))
  Heap Blocks: exact=3
  ->  BitmapAnd  (cost=32.12..32.12 rows=1 width=0) (actual time=0.323..0.324 rows=0 loops=1)
        ->  Bitmap Index Scan on idx_file_tree_record_id  (cost=0.00..4.99 rows=93 width=0) (actual time=0.051..0.051 rows=100 loops=1)
              Index Cond: (record_id = '1666bad1-202c-496e-bb0e-9664ce3febcb'::uuid)
        ->  Bitmap Index Scan on idx_file_tree_filename  (cost=0.00..26.88 rows=347 width=0) (actual time=0.253..0.253 rows=52 loops=1)
              Index Cond: (escape_filename_for_ltree((filename)::text) <@ 'ow'::ltree)
Planning Time: 0.910 ms
Execution Time: 0.599 ms

Explanation

PostgreSQL's LTREE data type restricts a label to a sequence of alphanumeric characters and underscores, with a maximum length of 255 characters. This gives us one special character, the underscore, that can serve as notation for our escape rules within a label.

Slashes (/) are replaced with dots (.), which needs no further explanation.

Initially, I attempted to encode all non-alphabetic characters into their Unicode hex form. However, after receiving advice from others, I found that base64 encoding is more efficient in terms of information entropy. Ultimately, I chose base62 encoding to guarantee that no illegal characters are produced while achieving the maximum possible information entropy.
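For illustration, here is a Python analogue of the base62_encode function given in the appendix (it treats the string's UTF-8 bytes as one big integer and converts it to base 62):

```python
ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

def base62_encode(text):
    """Interpret the UTF-8 bytes of `text` as one big integer,
    then convert that integer to base 62."""
    value = int.from_bytes(text.encode("utf-8"), "big")
    encoded = ""
    while value > 0:
        value, remainder = divmod(value, 62)
        encoded = ALPHABET[remainder] + encoded
    return encoded

print(base62_encode("a"))  # 1Z
```

Because the alphabet contains only [0-9A-Za-z], the output is always a legal LTREE label fragment.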

This is the final representation of the physical data that will be stored in the index of PostgreSQL.

select escape_filename_for_ltree('root/folder1/机器人仿真gazebo11-noetic集成ROS1/state.log');
-- result:
-- root.folder1._1hOBTVt5n7EhFWzIbUcjT_gazebo11_j_noetic_1Aw3qhY48_ROS1.state_k_log

Further

If you want to store an isolated file tree in the same table, one thing you need to do is prepend the isolation key as the first label of the ltree. For example:

select escape_filename_for_ltree('<put_user_id_in_there>' || '/' || '<path_to_file>');

By doing this, you will get the best query performance.

Summary

This document explains how to implement a file tree in PostgreSQL using the ltree datatype. The ltree datatype can help build the hierarchy within the database, and an expression index needs to be created. There are some limitations on the file/folder name length, but the performance is excellent. The document also provides PostgreSQL functions for escaping and encoding file/folder names.

Appendix: PostgreSQL Functions

Entry function (immutable is required)

CREATE OR REPLACE FUNCTION escape_filename_for_ltree(filename TEXT)
    RETURNS ltree AS
$$
DECLARE
    escaped_path ltree;
BEGIN
    select string_agg(escape_part(part), '.')
    into escaped_path
    from (select regexp_split_to_table as part
          from regexp_split_to_table(filename, '/')) as parts;

    return escaped_path;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

Util: Escape every part (folder or file)

create or replace function escape_part(part text) returns text as
$$
declare
    escaped_part text;
begin
    select string_agg(escaped, '')
    into escaped_part
    from (select case substring(sep, 1, 1) ~ '[0-9a-zA-Z]'
                     when true then sep
                     else '_' || base62_encode(sep) || '_'
                     end as escaped
          from (select split_string_by_alpha as sep
                from split_string_by_alpha(part)) as split) as escape;
    RETURN escaped_part;
end;
$$ language plpgsql immutable;

Util: Split a string into groups

Each group contains only alphabetic characters or non-alphabetic characters.

CREATE OR REPLACE FUNCTION split_string_by_alpha(input_str TEXT) RETURNS SETOF TEXT AS
$$
DECLARE
    split_str TEXT;
BEGIN
    IF input_str IS NULL OR input_str = '' THEN
        RETURN;
    END IF;

    WHILE input_str != ''
        LOOP
            split_str := substring(input_str from '[^0-9a-zA-Z]+|[0-9a-zA-Z]+');
            IF split_str != '' THEN
                RETURN NEXT split_str;
            END IF;
            input_str := substring(input_str from length(split_str) + 1);
        END LOOP;

    RETURN;
END;
$$ LANGUAGE plpgsql;

Util: base62 encode function

By using the base62_encode function, we can create a string that meets the requirements of LTREE and achieves maximum information entropy.

CREATE OR REPLACE FUNCTION base62_encode(data TEXT) RETURNS TEXT AS
$$
DECLARE
    ALPHABET CHAR(62)[] := ARRAY [
        '0','1','2','3','4','5','6','7','8','9',
        'A','B','C','D','E','F','G','H','I','J',
        'K','L','M','N','O','P','Q','R','S','T',
        'U','V','W','X','Y','Z','a','b','c','d',
        'e','f','g','h','i','j','k','l','m','n',
        'o','p','q','r','s','t','u','v','w','x',
        'y','z'
        ];
    BASE   BIGINT  := 62;
    result TEXT    := '';
    val    numeric := 0;
    bytes  bytea   := data::bytea;
    len    INT     := length(data::bytea);
BEGIN
    FOR i IN 0..(len - 1)
        LOOP
            val := (val * 256) + get_byte(bytes, i);
        END LOOP;

    WHILE val > 0
        LOOP
            result := ALPHABET[val % BASE + 1] || result;
            val := floor(val / BASE);
        END LOOP;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

Motivation

This month (August 2022), I had a debate with someone in a Rust group chat who insisted that Rust's Ownership mechanism is merely equivalent to Garbage Collection. I argued that Ownership also solves another problem that has plagued countless programmers: resource safety.

Definitions

Common resources fall into the following categories:

  • Files
    • Socket
      • TCP
      • HTTP
      • JDBC
    • File systems
    • Local
    • Remote (Redis, RDBMS)
  • Logical resources
    • Stream (possibly backed by a file)
    • Log blocks, or start/end markers of a long-lived connection
    • Temporary files that must be deleted

Resource management consists of three steps:

  1. Resource acquisition
  2. Resource use
  3. Resource release

These three events must happen strictly in this order.

Resource safety is concerned with:

  • Correct initialization before use
  • Guaranteed release after use
  • No further use after release

What does language-level (syntactic) resource management buy us? It gives the programmer a strong guarantee that, barring extreme situations (power failure, etc.), the resource release logic will always be executed.
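To make the guarantee concrete, here is a minimal Python sketch (the `Resource` class is hypothetical) showing that a structured construct runs the release step even when the body raises:

```python
class Resource:
    """Hypothetical resource whose release must always run."""
    def __init__(self, log):
        self.log = log
        log.append("acquire")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append("release")  # runs even if the body raised
        return False                # do not swallow the exception

events = []
try:
    with Resource(events) as res:
        events.append("use")
        raise RuntimeError("interrupted mid-use")
except RuntimeError:
    pass
print(events)  # ['acquire', 'use', 'release']
```

Acquire, use, and release happen strictly in order, and the release step is reached no matter how control leaves the block.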

A History of Resource Management

Prehistory

C

#include <stdio.h>
#include <stdlib.h>

int main()
{
    int num;
    FILE *fptr;

    if ((fptr = fopen("C:\\program.txt", "r")) == NULL) {
        printf("Error! opening file");

        // Program exits if the file pointer returns NULL.
        exit(1);
    }

    fscanf(fptr, "%d", &num);

    printf("Value of n=%d", num);
    fclose(fptr);

    return 0;
}

Early programming languages were not conscious of resource management; in an imperative coding style, programmers had to guarantee acquisition and release themselves. Software systems of that era were not complex, and practitioners were generally highly skilled, so resource management problems were not as prominent as they are today.

Language constructs

Later, some languages introduced exception mechanisms that let a program abort and jump out regardless of control-flow statements, which complicated resource management. Languages that support throwing exceptions typically use try {} catch {} finally {} to bound the scope of a resource: the try block marks where the resource is used, and the finally block is guaranteed to execute no matter how the program exits, so the resource is released correctly. Representative syntax:

FileReader fr = new FileReader(path);
BufferedReader br = new BufferedReader(fr);
try {
    return br.readLine();
} finally {
    br.close();
    fr.close();
}

Dispose pattern

This pattern is supported by most garbage-collected languages today, e.g. via Java's try keyword.

As far as I know, the only resource-safety facility in Java is try-with-resources, introduced in Java 7: a resource must implement AutoCloseable and can then be used safely inside try (...) { ... code block ... }. This also means asynchronous code gets no guarantee at all.

static String readFirstLineFromFile(String path) throws IOException {
    try (FileReader fr = new FileReader(path);
         BufferedReader br = new BufferedReader(fr)) {
        return br.readLine();
    }
}

Resource Monad pattern

A safer design pattern that emerged later: wrap the synchronous or asynchronous code that uses the resource in a block, and release the resource when the block finishes. This avoids manually closing the resource after every use.

public static <R> CompletionStage<R> use(Function<Resource, CompletionStage<R>> f) {
    return Resource.make()
            .thenCompose((res) -> f.apply(res)
                    .handle((r, e) -> {
                        res.close();
                        if (e != null) throw new RuntimeException(e);
                        return r;
                    })
            );
}

use((res) -> {
    System.out.println(res);
    // Run the business logic inside the CompletableFuture
    return CompletableFuture.failedStage(new Exception("error"));
}).handle((r, e) -> {
    if (e != null) {
        System.out.println(e.getMessage());
    }
    return r;
});

All of the schemes above share one flaw: during use, the resource variable can accidentally be shared with other code (a closure, a callback, an outer variable, or unintentionally sent into a queue or an HTTP response), as in the following Java code.

static HttpEntity fileEntity(String filename) throws IOException {
    try (FileReader fr = new FileReader(path);
         BufferedReader br = new BufferedReader(fr)) {
        return new HttpEntity(br);
    }
}

If the HttpEntity class does not consume the Reader at construction time, but only starts reading the byte stream once the HTTP body is transmitted, this will undoubtedly access an already-closed resource and may crash the application.

The correct version looks like this:


// pseudocode
static HttpEntity fileEntity(String filename) throws IOException {
    final FileReader fr = new FileReader(path);
    final BufferedReader br = new BufferedReader(fr);
    return new HttpEntity(br) {
        public void close() {
            br.close();
            fr.close();
        }
    };
}

RAII

C++

Move semantics make it possible to safely transfer resource ownership between objects, across scopes, and in and out of threads, while maintaining resource safety. — (since C++11)

void f()
{
    vector<string> vs(100); // not std::vector: valid() added
    if (!vs.valid()) {
        // handle error or exit
    }

    ifstream fs("foo"); // not std::ifstream: valid() added
    if (!fs.valid()) {
        // handle error or exit
    }

    // ...
} // destructors clean up as usual

C++ introduced the advanced concept of RAII, which nearly solves the resource-safety problem. But constrained by the era of its birth, early C++ kept resources safe only through lvalue references plus Clone (deep copy) semantics, so assignment frequently deep-copied whole objects and repeatedly constructed/destructed resources, wasting a great deal of work. C++11 added rvalue references, but you still have to implement Move (shallow copy) for them yourself. Moreover, C++ cannot detect a double move, nor the fact that the original variable remains usable after being moved from.

#include <iostream>
using namespace std;

class A {
public:
    A(const string& str, int* arr) : _str(str), _arr(arr) { cout << "parameter ctor" << endl; }
    A(A&& obj) : _str(std::move(obj._str)), _arr(std::move(obj._arr)) {
        obj._arr = nullptr;
        cout << "move ctor" << endl;
    }
    A& operator=(A&& rhs) {
        _str = std::move(rhs._str);
        _arr = std::move(rhs._arr);
        rhs._arr = nullptr;
        cout << "move assignment operation" << endl;
        return *this;
    }
    void print() {
        cout << _str << endl;
    }
    ~A() {
        delete[] _arr;
        cout << "dtor" << endl;
    }
private:
    string _str;
    int* _arr;
};

int main() {
    int* arr = new int[6]{1, 1, 4, 5, 1, 4};
    A a("Yajuu Senpai", std::move(arr)); // moving a raw pointer does nothing --> STUPID MOVE!!
    A b(std::move(a)); // move ctor

    cout << "print a: ";
    a.print(); // a has lost ownership --> CORRECT!!
    cout << "print b: ";
    b.print(); // b now owns the data --> CORRECT!!

    b = std::move(a); // double move

    cout << "print a: ";
    a.print(); // ???
    cout << "print b: ";
    b.print(); // ???
}

Rust

Rust inherits RAII from C++. Even when a resource is created in one place and used in another, Rust's move/borrow rules let you sleep soundly; this language-level guarantee makes even a Scala developer like me envious.

In Rust, if you move a value to someone else, they become responsible for dropping it; if you borrow it out, you remain responsible for the drop. The compiler checks lifetimes to ensure a value is never moved twice and that no borrow (&) outlives its owner. The division of responsibility is crystal clear: as long as you keep a clear head, there is no fear of leaks even in asynchronous code.

The programmer cannot call delete explicitly; they can only follow Rust's syntax, and the compiler inserts the drop of each variable at the correct position based on its lifetime, usually where it leaves its block scope.

Rust Drop

https://doc.rust-lang.org/rust-by-example/scope/raii.html

A synchronous example:

// illustrative pseudocode
struct Entity<'a> {
    connection: &'a Connection,
    file: File,
}

impl Drop for Entity<'_> {
    fn drop(&mut self) {
        self.file.write(EOF); // <- write a terminator before the file is closed
    }
}

let conn = makeConnection();
let file = openFile();
let entity = Entity {
    connection: &conn, // <- lend conn to entity as a borrow
    file: file,        // <- move ownership of file into entity
}; // <- from here on, file can no longer be accessed

fn send(entity: Entity) {
    // logic
    return;
    // <- the compiler inserts the drop of entity here
}

send(entity); // <- move ownership of entity into the send function
// <- the compiler inserts the drop of conn here
// <- entity and file were moved away, so they are not dropped again here

An asynchronous example:

fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move { // <- my_string is moved into this block scope
        // ...
        println!("{}", my_string); // <- my_string is visible here
        // <- the compiler inserts my_string's drop here
    }
    // <- my_string is no longer visible; this function has given up
    //    ownership, so no drop is inserted here
}

Resource escape via closure capture and resource escape via storing a reference in a class/struct are, at the language level, the same problem, so Rust can handle both the same way.

Summary

In theory, a garbage collector cannot solve the resource-safety problem. Some may argue:

"Just add destructors to Java: developers implement the release logic in the destructor, and the GC calls destroy()/dispose() automatically when it reclaims the memory. Problem solved, right?"

In practice this road leads nowhere. A GC follows different strategies depending on its algorithm; its collect/free actions are guaranteed to happen, but there is no guarantee of when, or in what order. GC performance is judged mainly by throughput rather than reclamation latency, and if there is no memory pressure, the GC prefers not to collect at all.

This behavior can have surprising consequences. For example, when business logic A finishes, the reference count of a file resource object has already reached zero, and logic B is notified that it may process the file; but when B tries to open it, the file is incomplete, because the system call that closes the file has not yet been executed by the GC 😵.

Rust offers a complete solution: it not only fixes a host of memory-safety problems, it also solves resource safety along the way. Built on the ownership mechanism and strict compiler checks, it forces programmers to write resource-safe code; all the programmer has to do is implement impl Drop for [...] correctly.

I believe the ownership and RAII model implemented by Rust is the most complete resource-management mechanism available today.

  • Compared with try-catch-finally, you need not worry about double release every time you use a resource (double delete is a common and painful problem in Java).
  • Compared with ResourceMonad, resources cannot escape.
  • It is a brand-new language, without the heavy historical baggage of C++.

Even if you do not code in Rust, you can still borrow its ideas: its syntax embodies best practices for resource management, and learning it helps you avoid incorrect patterns in other languages.

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

$ hexo new "My New Post"

More info: Writing

Run server

$ hexo server

More info: Server

Generate static files

$ hexo generate

More info: Generating

Deploy to remote sites

$ hexo deploy

More info: Deployment

Goals

  • Send event messages via HTTP POST.
  • Record success/error responses from the third-party reply.
  • Re-send messages after the third party replies with an error or is down.

Features

  • Delivery (HTTP POST)
  • Success / error recording
  • Re-send on third-party failure
  • Runs in a multi-process environment (e.g. Kubernetes)
  • Metrics for backlogged messages
  • Automatic record-archiving policy
  • Throttling to avoid sending too frequently after a burst of events

Results

Foreword

On April 17, 2020, I started a huge project: migrating a legacy Play Framework codebase from Future[T] to ZIO. Three months and tens of thousands of modified lines of code later, the migration was a complete success. In September, I drew on this experience for a talk titled "Introduction to ZIO" at the Chinese Scala Meetup. In that talk, I laid out an ambitious vision: use the abstraction power of ZIO's type parameter R to make code portable and more testable. Realizing that vision in a legacy project is not easy, with the main challenges being the coupling in legacy code and developers' ingrained habits. Today, that vision has been achieved, and in this post I will share the evolution that got us there.

What is R

A ZIO[R, E, A] value is an immutable value that lazily describes a workflow or job. The workflow requires some environment R, and may fail with an error of type E, or succeed with a value of type A.

The passage above comes from a comment in the ZIO source code; it states that R is the type of the environment the workflow requires. We can use this environment parameter to abstract over a computation's dependency on its environment. This sounds a lot like dependency injection, and indeed it is. The difference is that common inversion-of-control frameworks inject through class members and constructors (e.g. Spring autowire, guice, macwire), whereas ZIO provides an instance of the environment at runtime.

Example:

Spring @Autowired: Inject the dependent instance into the object as a member of the class

public class MovieRecommender {

    private final CustomerPreferenceDao customerPreferenceDao;

    @Autowired
    public MovieRecommender(CustomerPreferenceDao customerPreferenceDao) {
        this.customerPreferenceDao = customerPreferenceDao;
    }

    // ...
}
ZIO R: Treat the instance as part of the environment, and provide it on demand at runtime
object MovieRecommender {
  def recommend(): ZIO[CustomerPreferenceDao, Throwable, RecommendResult] = {
    ...
  }

  // ...
}

What are the problems caused by DI tools?

I want to ask the reader a question: How much does it cost to test a small feature in your software system?

See: Negative example

package meetup.di.legacy

import meetup.di.legacy.member.{Accountant, Baker, CandyMaker, Cashier, Decorator, HunanChef, Logistic, Manager, Security, SichuanChef, Waiter, Washer}
import meetup.di.legacy.tools.{Mixer, Oven, PipingTip, Turntable}
import meetup.di.legacy.utils.Demo

object djx314 extends App with Demo {
  val mixer = new Mixer()
  val oven = new Oven()
  val pipingTip = new PipingTip()
  val turntable = new Turntable()
  val baker = new Baker(oven, mixer)
  val decorator = new Decorator(mixer, pipingTip, turntable)
  val candyMaker = new CandyMaker
  val scChef = new SichuanChef
  val hnChef = new HunanChef
  val cashier = new Cashier
  val waiter = new Waiter
  val washer = new Washer
  val logistic = new Logistic
  val security = new Security
  val accountant = new Accountant
  val manager = new Manager
  val cs = new CakeShop(
    baker, decorator, candyMaker,
    scChef, hnChef, cashier,
    waiter, washer, logistic,
    security, accountant, manager
  )

  cs.simpleCake()
    .onComplete(println)
}

I just want to test one small piece of functionality, so why must I construct every dependent instance? Why so much preparation to test such a simple function? Because the function is a method of a class, and that class has too many constructor parameters, most of which are irrelevant to the function under test.

To make production code maximally portable and testable, one simple, practical rule is: do not put functions that are unrelated to the object (no use of this) in a class; move them into an object (in Java: mark the method static). Writing referentially transparent code also helps toward this goal.

However, this is hard to achieve in a legacy project, because most developers misuse dependency-injection frameworks, just like the negative example above. Even Spring contributors made the same mistake. See: Spring's sample project

Too many irrelevant dependencies are a huge obstacle to code portability, turning code into ad hoc logic that is difficult to test.

The whole system becomes like a ball made up of tangled strings and knots.

Current Situation

The solutions in the community regenerate the zio layer on every unsafeRun. That is very purely functional, but it does not suit a web service; consider connection pools, for example.

Problems encountered: connection pools and semaphores.

So I had to experiment on my own:

Evolution Stage 1

ZioController + PlayRunner

Evolution Stage 2

ZController + ZRunner


Evolution Stage 3

ZController + Runtime.Managed

trait ZController[Z, R[_], B] {
  def runtime: Runtime.Managed[Z]

  implicit class ActionBuilderOps(actionBuilder: ActionBuilder[R, B]) {
    def zio[E](zioActionBody: => ZIO[Z, Throwable, Result]): Action[AnyContent] = actionBuilder.async {
      runtime.unsafeRunToFuture(zioActionBody.resurrect)
    }

    def zio[E](zioActionBody: R[B] => ZIO[Z, Throwable, Result]): Action[B] = actionBuilder.async { req =>
      runtime.unsafeRunToFuture(zioActionBody(req).resurrect)
    }
  }
}

/**
 * A runtime that can be shutdown to release resources allocated to it.
 */
abstract class Managed[+R] extends Runtime[R] { /* ... */ }

Usage:

object ExampleController {
  case class Config(endpoint: String)
  def flow(endpoint: String, url: String, body: RawBuffer): ZIO[Has[WSClient], Throwable, String] = ???
}

class ExampleController(cc: ControllerComponents,
                        config: ExampleController.Config,
                        MAction: MemberAction)
                       (implicit val runtime: Runtime.Managed[Has[WSClient]],
                        val ec: ExecutionContext)
  extends AbstractController(cc) with ZController[Has[WSClient], MemberRequest, RawBuffer] {

  def handle(url: String): Action[RawBuffer] = MAction(parse.raw) zio { request: MemberRequest[RawBuffer] =>
    flow(config.endpoint, url, request.body)
      .map(name => Ok(name))
      .mapError(e => InternalServerError("Oh no!\n" + e.getMessage))
      .merge
  }
}

Conclusion

References

Dependency Injection Trade-offs
