support sni filter

This commit is contained in:
lulin 2024-03-11 19:28:06 +08:00
parent 33f528a72d
commit 1f6679ccdb
12 changed files with 981 additions and 44 deletions

357
Cargo.lock generated
View File

@ -65,12 +65,30 @@ version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1"
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -93,6 +111,47 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "defmt"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3939552907426de152b3c2c6f51ed53f98f448babd26f28694c95f5906194595"
dependencies = [
"bitflags 1.3.2",
"defmt-macros",
]
[[package]]
name = "defmt-macros"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18bdc7a7b92ac413e19e95240e75d3a73a8d8e78aa24a594c22cbb4d44b4bbda"
dependencies = [
"defmt-parser",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "defmt-parser"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4a5fefe330e8d7f31b16a318f9ce81000d8e35e69b93eae154d16d2278f70f"
dependencies = [
"thiserror",
]
[[package]]
name = "enum_primitive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180"
dependencies = [
"num-traits 0.1.43",
]
[[package]]
name = "env_filter"
version = "0.1.0"
@ -116,12 +175,42 @@ dependencies = [
"log",
]
[[package]]
name = "getrandom"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "hash32"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
dependencies = [
"byteorder",
]
[[package]]
name = "heapless"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad"
dependencies = [
"hash32",
"stable_deref_trait",
]
[[package]]
name = "humantime"
version = "2.1.0"
@ -169,12 +258,24 @@ version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9106e1d747ffd48e6be5bb2d97fa706ed25b144fbee4d5c02eae110cd8d6badd"
[[package]]
name = "managed"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"
[[package]]
name = "memchr"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "nfq"
version = "0.2.5"
@ -190,7 +291,7 @@ version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags",
"bitflags 2.4.2",
"cfg-if",
"libc",
]
@ -201,6 +302,94 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65"
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nom-derive"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff943d68b88d0b87a6e0d58615e8fa07f9fd5a1319fa0a72efc1f62275c79a7"
dependencies = [
"nom",
"nom-derive-impl",
"rustversion",
]
[[package]]
name = "nom-derive-impl"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0b9a93a84b0d3ec3e70e02d332dc33ac6dfac9cde63e17fcb77172dededa62"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "num-traits"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31"
dependencies = [
"num-traits 0.2.18",
]
[[package]]
name = "num-traits"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
[[package]]
name = "phf"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_codegen"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb1c3a8bc4dd4e5cfce29b44ffc14bedd2ee294559a294e2a4d4c9e9a6a13cd"
dependencies = [
"phf_generator",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6"
dependencies = [
"phf_shared",
"rand",
]
[[package]]
name = "phf_shared"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
dependencies = [
"siphasher",
]
[[package]]
name = "pnet_base"
version = "0.34.0"
@ -219,7 +408,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn",
"syn 2.0.52",
]
[[package]]
@ -243,6 +432,36 @@ dependencies = [
"pnet_macros_support",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn 1.0.109",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.78"
@ -261,6 +480,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "regex"
version = "1.10.3"
@ -304,8 +553,25 @@ dependencies = [
"pnet_packet",
"serde",
"serde_json",
"smoltcp",
"tls-parser",
]
[[package]]
name = "rusticata-macros"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
"nom",
]
[[package]]
name = "rustversion"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "ryu"
version = "1.0.17"
@ -329,7 +595,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.52",
]
[[package]]
@ -343,6 +609,45 @@ dependencies = [
"serde",
]
[[package]]
name = "siphasher"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "smoltcp"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a1a996951e50b5971a2c8c0fa05a381480d70a933064245c4a223ddc87ccc97"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"cfg-if",
"defmt",
"heapless",
"libc",
"log",
"managed",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.52"
@ -354,6 +659,40 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "tls-parser"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "409206e2de64edbf7ea99a44ac31680daf9ef1a57895fb3c5bd738a903691be0"
dependencies = [
"enum_primitive",
"nom",
"nom-derive",
"phf",
"phf_codegen",
"rusticata-macros",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
@ -366,6 +705,18 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"

View File

@ -5,12 +5,15 @@ edition = "2021"
[dependencies]
lru_time_cache = "0.11.11"
nfq = "0.2.5"
nfq = {version = "0.2.5", features=["ct"]}
pnet_packet = "0.34.0"
anyhow="1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
iptables = "0.5.1"
ctrlc = "3.4.2"
#ctrlc = {version = "3.4.2", features = ["termination"]}
ctrlc = {version = "3.4.2"}
log = "0.4.21"
env_logger = "0.11.3"
smoltcp = "0.11.0"
tls-parser = "0.11.0"

5
config.json Normal file
View File

@ -0,0 +1,5 @@
{
"tcp_stream_filter": {
"port_range":[3090,3090]
}
}

12
iptables Normal file
View File

@ -0,0 +1,12 @@
// 所有tcp连接已经标记直接return
iptables -t mangle -A PREROUTING -m connmark --mark 2 -j RETURN
// 进入队列
iptables -t mangel -A PREROUTING -p tcp --dport {}:{} -m conntrack --ctstate NEW -j NFQUEUE --queue-bypass --queue-num 1
// 所有fwmark标记为1的标记为2
iptables -t mangle -A PREROUTING -m mark --mark 0x1 -j CONNMARK --set-mark 0x2
iptables -t mangle -A PREROUTING -m mark --mark 3 -j LOG --log-prefix "FWMark 3 Packet: "

View File

@ -6,8 +6,14 @@ pub struct IPNumberStrict {
pub max_ip_number: usize,
pub port_range: (u16, u16),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpStreamFilter {
pub port_range: (u16, u16),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub ip_num_strict: IPNumberStrict,
pub ip_num_strict: Option<IPNumberStrict>,
pub tcp_stream_filter: Option<TcpStreamFilter>,
}

View File

@ -2,12 +2,13 @@ mod config;
mod nfq_mng;
mod nfq_proc;
mod processor;
mod tcp_assember;
use std::path::PathBuf;
use config::Config;
use nfq_mng::NFQMng;
use nfq_proc::NfqRunBuilder;
use processor::ip_num_strict::IpNumStrict;
use processor::{ip_num_strict::IpNumStrict, log_tcp_stream::TcpStreamLogger};
fn main() -> anyhow::Result<()> {
env_logger::init();
let cfg_file: PathBuf = std::env::args()
@ -16,24 +17,85 @@ fn main() -> anyhow::Result<()> {
.parse()?;
let config: Config = serde_json::from_reader(std::fs::File::open(&cfg_file)?)?;
let mut ipt_mng = NFQMng::new();
ipt_mng.add__nfq_rule(
"mangle",
"PREROUTING",
&format!(
"-p tcp --dport {}:{} -m conntrack --ctstate NEW",
config.ip_num_strict.port_range.0, config.ip_num_strict.port_range.1
),
0,
);
ipt_mng.run()?;
let mut builder = NfqRunBuilder::new();
builder.add_processor(
0,
Box::new(IpNumStrict::new(
config.ip_num_strict.max_ip_number,
config.ip_num_strict.time_thr_sec,
)),
);
ipt_mng.new_chain("mangle", "PROC");
if let Some(ref ip_num_strict) = config.ip_num_strict {
ipt_mng.add__nfq_rule(
"mangle",
"PROC",
&format!(
"-p tcp --dport {}:{} -m conntrack --ctstate NEW",
ip_num_strict.port_range.0, ip_num_strict.port_range.1
),
0,
);
builder.add_processor(
0,
false,
Box::new(IpNumStrict::new(
config.tcp_stream_filter.is_some(),
ip_num_strict.max_ip_number,
ip_num_strict.time_thr_sec,
)),
);
}
if let Some(ref stream_filter) = config.tcp_stream_filter {
// 给Conn打上标记
ipt_mng.insert_rule(
"mangle",
"INPUT",
"-m mark --mark 3 -j CONNMARK --set-mark 3",
1,
);
ipt_mng.insert_rule(
"mangle",
"FORWARD",
"-m mark --mark 3 -j CONNMARK --set-mark 3",
1,
);
ipt_mng.insert_rule(
"mangle",
"INPUT",
"-m mark --mark 2 -j CONNMARK --set-mark 2",
1,
);
ipt_mng.insert_rule(
"mangle",
"FORWARD",
"-m mark --mark 2 -j CONNMARK --set-mark 2",
1,
);
// 遇到Conn标记为3的跳过入queue
ipt_mng.add__comm_rule("mangle", "PROC", "-p tcp -m connmark --mark 3 -j RETURN");
// 标记为2的执行Rst
ipt_mng.insert_rule(
"filter",
"INPUT",
"-p tcp -m connmark --mark 2 -j REJECT --reject-with tcp-reset",
1,
);
ipt_mng.insert_rule(
"filter",
"FORWARD",
"-p tcp -m connmark --mark 2 -j REJECT --reject-with tcp-reset",
1,
);
ipt_mng.add__nfq_rule(
"mangle",
"PROC",
&format!(
"-p tcp --dport {}:{}",
stream_filter.port_range.0, stream_filter.port_range.1
),
1,
);
builder.add_processor(1, true, Box::new(TcpStreamLogger::new()));
}
ipt_mng.add__comm_rule("mangle", "PREROUTING", "-p tcp -j PROC");
ipt_mng.run()?;
builder.run()?;
Ok(())
}

View File

@ -10,6 +10,7 @@ struct Rule {
tbl: String,
chain: String,
rule: String,
insert: Option<i32>,
}
pub struct NFQMng {
rules: Arc<Mutex<Vec<Rule>>>,
@ -27,6 +28,7 @@ impl NFQMng {
tbl: tbl.to_owned(),
chain: chain.to_owned(),
rule: rule_str,
insert: None,
});
}
pub fn add__comm_rule(&mut self, tbl: &str, chain: &str, para: &str) {
@ -34,6 +36,21 @@ impl NFQMng {
tbl: tbl.to_owned(),
chain: chain.to_owned(),
rule: para.to_owned(),
insert: None,
});
}
pub fn new_chain(&self, tbl: &str, chain: &str) {
let ips = iptables::new(false).unwrap();
ips.new_chain(tbl, chain);
}
pub fn insert_rule(&mut self, tbl: &str, chain: &str, para: &str, pos: i32) {
self.rules.lock().unwrap().push(Rule {
tbl: tbl.to_owned(),
chain: chain.to_owned(),
rule: para.to_owned(),
insert: Some(pos),
});
}
pub fn run(self) -> anyhow::Result<()> {
@ -59,9 +76,16 @@ impl NFQMng {
for r in self.rules.lock().unwrap().iter() {
match ips.exists(&r.tbl, &r.chain, &r.rule) {
Ok(false) => {
if let Err(e) = ips.insert(&r.tbl, &r.chain, &r.rule, 1) {
println!("setup iptables rule: {r:?} failed: {e:?}");
has_failed = true;
if let Some(idx) = r.insert {
if let Err(e) = ips.insert(&r.tbl, &r.chain, &r.rule, idx) {
println!("setup insert iptables rule: {r:?} failed: {e:?}");
has_failed = true;
}
} else {
if let Err(e) = ips.append(&r.tbl, &r.chain, &r.rule) {
println!("setup append iptables rule: {r:?} failed: {e:?}");
has_failed = true;
}
}
}
Ok(true) => {}

View File

@ -1,17 +1,25 @@
use std::{collections::HashMap, net::Ipv4Addr};
use anyhow::anyhow;
use nfq::{Queue, Verdict};
use pnet_packet::{ipv4::Ipv4Packet, tcp::TcpPacket, udp::UdpPacket, Packet};
use nfq::{Message, Queue, Verdict};
use pnet_packet::{
ipv4::Ipv4Packet,
tcp::{TcpFlags, TcpPacket},
udp::UdpPacket,
Packet,
};
pub struct MessageData {
pub from_ip: Ipv4Addr,
pub dst_ip: Ipv4Addr,
pub src_port: u16,
pub dst_port: u16,
pub tcp_flags: Option<u8>,
pub tcp_ack: Option<u32>,
pub ct: Option<(nfq::conntrack::State, u32)>
}
pub trait Processor {
fn run<'a>(&mut self, msg: &MessageData, orig_msg: &Ipv4Packet<'a>) -> Verdict;
fn run<'a>(&mut self, msg: &MessageData, ipv4_pkt: &Ipv4Packet<'a>) -> (Verdict, Option<u32>);
fn message_type(&self) -> MessageType;
}
@ -20,7 +28,7 @@ pub enum MessageType {
Udp,
}
pub struct NfqRunBuilder {
processor: HashMap<u16, (MessageType, Vec<Box<dyn Processor + Send + Sync + 'static>>)>,
processor: HashMap<u16, (MessageType, Vec<Box<dyn Processor + Send + Sync + 'static>>, bool)>,
}
impl NfqRunBuilder {
@ -32,50 +40,80 @@ impl NfqRunBuilder {
pub fn add_processor(
&mut self,
que_idx: u16,
ct: bool,
processor: Box<dyn Processor + Send + Sync + 'static>,
) {
self.processor
.entry(que_idx)
.or_insert_with(|| (processor.message_type(), Vec::new()))
.or_insert_with(|| (processor.message_type(), Vec::new(), ct))
.1
.push(processor);
}
pub fn run(self) -> anyhow::Result<()> {
let mut handlers = Vec::new();
for (idx, (ty, mut all_processor)) in self.processor.into_iter() {
for (idx, (ty, mut all_processor, ct)) in self.processor.into_iter() {
let parse2 = match ty {
MessageType::Tcp => move |ip_pkt: &Ipv4Packet| {
let tcp_pkt = TcpPacket::new(ip_pkt.payload())
.ok_or(anyhow!("can't parse tcp packet"))?;
Ok::<_, anyhow::Error>((tcp_pkt.get_source(), tcp_pkt.get_destination()))
let flags = tcp_pkt.get_flags();
let ack = if (flags & TcpFlags::ACK) != 0 {
Some(tcp_pkt.get_acknowledgement())
} else {
None
};
Ok::<_, anyhow::Error>((
tcp_pkt.get_source(),
tcp_pkt.get_destination(),
Some(tcp_pkt.get_flags()),
ack,
))
},
MessageType::Udp => move |ip_pkt: &Ipv4Packet| {
let tcp_pkt = UdpPacket::new(ip_pkt.payload())
let udp_pkt = UdpPacket::new(ip_pkt.payload())
.ok_or(anyhow!("can't parse udp packet"))?;
Ok::<_, anyhow::Error>((tcp_pkt.get_source(), tcp_pkt.get_destination()))
Ok::<_, anyhow::Error>((
udp_pkt.get_source(),
udp_pkt.get_destination(),
None,
None,
))
},
};
let h = std::thread::spawn(move || {
let mut queue = Queue::open()?;
queue.bind(idx)?;
if ct {
queue.set_recv_conntrack(idx, ct)?;
}
loop {
let mut msg = queue.recv()?;
let ct = msg.get_conntrack().map(|ct| (ct.get_state(), ct.get_id()));
let mut act = Verdict::Accept;
if let Some(ip_packet) = pnet_packet::ipv4::Ipv4Packet::new(msg.get_payload()) {
let from_ip = ip_packet.get_source();
let dst_ip = ip_packet.get_destination();
let (src_port, dst_port) = parse2(&ip_packet)?;
let (src_port, dst_port, tcp_flags, tcp_ack) = parse2(&ip_packet)?;
let msg_data = MessageData {
from_ip,
dst_ip,
src_port,
dst_port,
tcp_flags,
tcp_ack,
ct
};
log::trace!("ip from: {from_ip:?}:{src_port} to {dst_ip:?}:{dst_port}");
for p in all_processor.iter_mut() {
let cur_rst = p.run(&msg_data, &ip_packet);
let (cur_rst, mark) = p.run(&msg_data, &ip_packet);
if let Some(m) = mark {
msg.set_nfmark(m);
act = cur_rst;
break;
}
match cur_rst {
Verdict::Drop | Verdict::Stop => {
Verdict::Drop | Verdict::Stop | Verdict::Queue(_) => {
act = cur_rst;
break;
}

View File

@ -1,18 +1,20 @@
use std::{collections::HashMap, net::Ipv4Addr};
use nfq::Verdict;
use nfq::{Message, Verdict};
use pnet_packet::ipv4::Ipv4Packet;
use crate::nfq_proc::{MessageData, MessageType, Processor};
pub struct IpNumStrict {
has_stream_check: bool,
ip_number: usize,
time_thr: u64,
data: HashMap<u16, lru_time_cache::LruCache<Ipv4Addr, ()>>,
}
impl IpNumStrict {
pub fn new(ip_number: usize, time_thr: u64) -> Self {
pub fn new(has_stream_check: bool, ip_number: usize, time_thr: u64) -> Self {
Self {
has_stream_check,
ip_number,
time_thr,
data: Default::default(),
@ -20,8 +22,12 @@ impl IpNumStrict {
}
}
impl Processor for IpNumStrict {
fn run<'a>(&mut self, msg: &MessageData, _orig_msg: &Ipv4Packet<'a>) -> Verdict {
let mut act = Verdict::Accept;
fn run<'a>(&mut self, msg: &MessageData, _ipv4_pkt: &Ipv4Packet<'a>,) -> (Verdict,Option<u32>) {
let mut act = if self.has_stream_check {
Verdict::Queue(1)
} else {
Verdict::Accept
};
let from_ip = msg.from_ip;
let dst_ip = msg.dst_ip;
let src_port = msg.src_port;
@ -49,7 +55,7 @@ impl Processor for IpNumStrict {
// 已经存在,直接放行
}
}
act
(act, None)
}
fn message_type(&self) -> MessageType {
MessageType::Tcp

View File

@ -0,0 +1,150 @@
use std::{borrow::BorrowMut, net::Ipv4Addr};
use lru_time_cache::Entry;
use nfq::{conntrack::State, Message, Verdict};
use pnet_packet::{
ipv4::Ipv4Packet,
tcp::{TcpFlags, TcpPacket},
Packet,
};
use tls_parser::{
parse_tls_extension, parse_tls_plaintext, TlsExtension, TlsMessageHandshake, TlsRecordType,
};
use crate::{
nfq_proc::{MessageData, MessageType, Processor},
tcp_assember::MonSocket,
};
pub struct TcpStreamLogger {
streams: lru_time_cache::LruCache<u32, MonSocket>,
}
impl TcpStreamLogger {
pub fn new() -> Self {
Self {
streams: lru_time_cache::LruCache::with_expiry_duration(
std::time::Duration::from_secs(60),
),
}
}
}
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord, Clone)]
struct Tuple {
src_ip: Ipv4Addr,
dst_ip: Ipv4Addr,
src_port: u16,
dst_port: u16,
}
impl Tuple {
fn new(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, src_port: u16, dst_port: u16) -> Self {
Self {
src_ip,
dst_ip,
src_port,
dst_port,
}
}
}
fn check_stream(data: &[u8], tup: &Tuple) -> anyhow::Result<bool> {
if let Ok((_, p)) = parse_tls_plaintext(data) {
match p.hdr.record_type {
TlsRecordType::Handshake => {
if let Some(first) = p.msg.first() {
match first {
tls_parser::TlsMessage::Handshake(h) => match h {
TlsMessageHandshake::ClientHello(c) => {
log::warn!("tup: {tup:?} client hello detected");
if let Some((_, e)) =
c.ext.and_then(|e| parse_tls_extension(e).ok())
{
match e {
TlsExtension::SNI(v) => {
for (_, s) in v {
log::warn!(
"sni: {:?}",
String::from_utf8(s.to_vec())
);
return Ok(false);
}
}
_ => {}
}
}
}
o => {
log::debug!("other handshake type: {o:?}");
}
},
o => {
log::debug!("other message type: {o:?}");
}
}
}
}
o => {
log::debug!("get tls record type: {o}");
}
}
}
Ok(true)
}
impl Processor for TcpStreamLogger {
fn run<'a>(&mut self, msg: &MessageData, ipv4_pkt: &Ipv4Packet<'a>) -> (Verdict, Option<u32>) {
let act = Verdict::Accept;
let (ct_state, ct_id) = if let Some(d) = msg.ct.as_ref() {
(&d.0, d.1)
} else {
return (act, None);
};
let from_ip = msg.from_ip;
let dst_ip = msg.dst_ip;
let src_port = msg.src_port;
let dst_port = msg.dst_port;
let socket_entry = if matches!(ct_state, State::New) {
match self.streams.entry(ct_id) {
Entry::Occupied(e) => {
// 可能是重传的?
Some(e.into_mut())
}
Entry::Vacant(e) => {
log::debug!(
"new conn from_ip: {from_ip:?}:{src_port} to {dst_ip:?}:{dst_port}"
);
let socket = MonSocket::new();
Some(e.insert(socket))
}
}
} else {
self.streams.get_mut(&ct_id)
};
let tup = Tuple::new(from_ip, dst_ip, src_port, dst_port);
log::trace!("proc tuple: {tup:?}");
let mut mark = None;
if let Some(socket_entry) = socket_entry {
let socket = socket_entry;
socket.feed_packet(TcpPacket::new(ipv4_pkt.payload()).unwrap());
match socket.get_data() {
Some(s) => {
// log::debug!("tuple:{:?}, data: {:?}", tup.clone(), String::from_utf8(s));
if let Ok(false) = check_stream(&s, &tup) {
// log::debug!("mark 2");
mark = Some(2);
} else {
// log::debug!("mark 3");
mark = Some(3);
}
self.streams.remove(&ct_id);
}
None => {
// *socket_entry = Some(socket);
}
}
}
(act, mark)
}
fn message_type(&self) -> MessageType {
MessageType::Tcp
}
}

View File

@ -1,2 +1,3 @@
pub mod ip_num_strict;
pub mod log_tcp_stream;
use super::nfq_proc::Processor;

279
src/tcp_assember.rs Normal file
View File

@ -0,0 +1,279 @@
use std::time::Instant;
use pnet_packet::{
tcp::{TcpFlags, TcpPacket},
Packet,
};
use smoltcp::{
storage::{Assembler, RingBuffer},
wire::TcpControl,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum State {
Closed,
Listen,
SynSent,
SynReceived,
Established,
}
pub struct MonSocket {
create_time: Instant,
state: State,
remote_seq_no: usize,
assember: Assembler,
rx_buffer: RingBuffer<'static, u8>,
}
impl MonSocket {
pub fn new() -> Self {
Self {
create_time: Instant::now(),
state: State::Listen,
remote_seq_no: 0,
rx_buffer: RingBuffer::new(vec![0;4096]),
assember: Assembler::new()
}
}
pub fn is_closed(&self) -> bool {
self.state == State::Closed
}
pub fn feed_packet(&mut self, pkt: TcpPacket) {
if self.state == State::Closed {
return;
}
let window_start = self.remote_seq_no + self.rx_buffer.len();
let window_end = self.remote_seq_no + self.rx_buffer.capacity();
let segment_start = pkt.get_sequence() as usize;
let segment_end = pkt.get_sequence() as usize + pkt.payload().len();
let (payload, payload_offset) = match self.state {
// In LISTEN and SYN-SENT states, we have not yet synchronized with the remote end.
State::Listen | State::SynSent => (&[][..], 0),
_ => {
// https://www.rfc-editor.org/rfc/rfc9293.html#name-segment-acceptability-tests
let segment_in_window = match (
segment_start == segment_end,
window_start == window_end,
) {
(true, _) if segment_end == window_start - 1 => {
// log::debug!(
// "received a keep-alive or window probe packet, will send an ACK"
// );
false
}
(true, true) => {
if window_start == segment_start {
true
} else {
// log::debug!(
// "zero-length segment not inside zero-length window, will send an ACK."
// );
false
}
}
(true, false) => {
if window_start <= segment_start && segment_start < window_end {
true
} else {
// log::debug!("zero-length segment not inside window, will send an ACK.");
false
}
}
(false, true) => {
// log::debug!(
// "non-zero-length segment with zero receive window, will only send an ACK"
// );
false
}
(false, false) => {
if (window_start <= segment_start && segment_start < window_end)
|| (window_start < segment_end && segment_end <= window_end)
{
true
} else {
// log::debug!(
// "segment not in receive window ({}..{} not intersecting {}..{}), will send challenge ACK",
// segment_start,
// segment_end,
// window_start,
// window_end
// );
false
}
}
};
if segment_in_window {
let overlap_start = window_start.max(segment_start);
let overlap_end = window_end.min(segment_end);
// the checks done above imply this.
debug_assert!(overlap_start <= overlap_end);
(
&pkt.payload()[overlap_start - segment_start..overlap_end - segment_start],
overlap_start - window_start,
)
} else {
// full state
self.state = State::Closed;
return;
}
}
};
let flg = pkt.get_flags();
let mut control = match (
(flg & TcpFlags::SYN) != 0,
(flg & TcpFlags::FIN) != 0,
(flg & TcpFlags::RST) != 0,
(flg & TcpFlags::PSH) != 0,
) {
(false, false, false, false) => TcpControl::None,
(false, false, false, true) => TcpControl::Psh,
(true, false, false, _) => TcpControl::Syn,
(false, true, false, _) => TcpControl::Fin,
(false, false, true, _) => TcpControl::Rst,
_ => return,
};
control = control.quash_psh();
if control == TcpControl::Fin && window_start != segment_start {
log::trace!("ignoring FIN because we don't have full data yet. window_start={} segment_start={}", window_start, segment_start);
control = TcpControl::None;
}
match (self.state, control) {
// RSTs are not accepted in the LISTEN state.
(State::Listen, TcpControl::Rst) => return,
// RSTs in SYN-RECEIVED flip the socket back to the LISTEN state.
(State::SynReceived, TcpControl::Rst) => {
log::trace!("received RST");
// self.tuple = None;
self.state = State::Listen;
return;
}
// RSTs in any other state close the socket.
(_, TcpControl::Rst) => {
log::trace!("received RST");
self.state = State::Closed;
return;
}
// SYN packets in the LISTEN state change it to SYN-RECEIVED.
(State::Listen, TcpControl::Syn) => {
log::trace!("received SYN");
self.remote_seq_no = pkt.get_sequence() as usize + 1;
self.state = State::SynReceived;
}
// ACK packets in the SYN-RECEIVED state change it to ESTABLISHED.
(State::SynReceived, TcpControl::None) => {
self.state = State::Established;
}
// FIN packets in the SYN-RECEIVED state change it to CLOSE-WAIT.
// It's not obvious from RFC 793 that this is permitted, but
// 7th and 8th steps in the "SEGMENT ARRIVES" event describe this behavior.
(State::SynReceived, TcpControl::Fin) => {
self.remote_seq_no += 1;
self.state = State::Closed;
}
// SYN|ACK packets in the SYN-SENT state change it to ESTABLISHED.
(State::SynSent, TcpControl::Syn) => {
log::trace!("received SYN|ACK");
self.remote_seq_no = pkt.get_sequence() as usize + 1;
self.state = State::Established;
}
// ACK packets in ESTABLISHED state reset the retransmit timer,
// except for duplicate ACK packets which preserve it.
(State::Established, TcpControl::None) => {}
// FIN packets in ESTABLISHED state indicate the remote side has closed.
(State::Established, TcpControl::Fin) => {
self.remote_seq_no += 1;
self.state = State::Closed;
}
_ => {
log::debug!("unexpected packet {:?}", pkt);
return;
}
}
let payload_len = payload.len();
if payload_len == 0 {
return;
}
// Try adding payload octets to the assembler.
let Ok(contig_len) = self
.assember
.add_then_remove_front(payload_offset, payload_len)
else {
log::debug!(
"assembler: too many holes to add {} octets at offset {}",
payload_len,
payload_offset
);
return;
};
// Place payload octets into the buffer.
log::trace!(
"rx buffer: receiving {} octets at offset {}",
payload_len,
payload_offset
);
let len_written = self.rx_buffer.write_unallocated(payload_offset, payload);
debug_assert!(len_written == payload_len);
if contig_len != 0 {
// Enqueue the contiguous data octets in front of the buffer.
log::trace!(
"rx buffer: enqueueing {} octets (now {})",
contig_len,
self.rx_buffer.len() + contig_len
);
self.rx_buffer.enqueue_unallocated(contig_len);
}
}
pub fn get_data(&mut self) -> Option<Vec<u8>> {
if self.state == State::Closed {
return Some(self.rx_buffer.dequeue_many(self.rx_buffer.len()).to_vec());
} else {
None
}
}
pub fn get_force(mut self) -> Option<Vec<u8>> {
if self.rx_buffer.is_empty() {
None
} else {
Some(self.rx_buffer.dequeue_many(self.rx_buffer.len()).to_vec())
}
}
}
#[cfg(test)]
mod test {
use smoltcp::storage::{Assembler, RingBuffer};
#[test]
fn test_ringbuf() {
let mut buf = vec![0 as u8; 100];
let mut ring_buf = RingBuffer::new(buf);
assert_eq!(ring_buf.write_unallocated(0, &vec![1; 10]), 10);
assert_eq!(ring_buf.write_unallocated(20, &vec![3; 10]), 10);
let mut assember = Assembler::new();
let s = assember.add_then_remove_front(0, 10).unwrap();
assert_eq!(s, 10);
let s = assember.add_then_remove_front(20, 10).unwrap();
assert_eq!(s, 0);
assert_eq!(ring_buf.write_unallocated(10, &vec![2; 10]), 10);
let s = assember.add_then_remove_front(10, 10).unwrap();
assert_eq!(s, 20);
}
}