diff --git a/Cargo.lock b/Cargo.lock index 6a4d667..beb0825 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,12 +65,30 @@ version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cfg-if" version = "1.0.0" @@ -93,6 +111,47 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "defmt" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3939552907426de152b3c2c6f51ed53f98f448babd26f28694c95f5906194595" +dependencies = [ + "bitflags 1.3.2", + "defmt-macros", +] + +[[package]] +name = "defmt-macros" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bdc7a7b92ac413e19e95240e75d3a73a8d8e78aa24a594c22cbb4d44b4bbda" +dependencies = [ + "defmt-parser", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.52", +] + +[[package]] +name = "defmt-parser" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff4a5fefe330e8d7f31b16a318f9ce81000d8e35e69b93eae154d16d2278f70f" +dependencies = [ + "thiserror", +] + +[[package]] +name = "enum_primitive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180" +dependencies = [ + "num-traits 0.1.43", +] + [[package]] name = "env_filter" version = "0.1.0" @@ -116,12 +175,42 @@ dependencies = [ "log", ] +[[package]] +name = "getrandom" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "glob" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "humantime" version = "2.1.0" @@ -169,12 +258,24 @@ version = "0.11.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9106e1d747ffd48e6be5bb2d97fa706ed25b144fbee4d5c02eae110cd8d6badd" +[[package]] +name = "managed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" + [[package]] name = "memchr" version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "nfq" version = "0.2.5" @@ -190,7 +291,7 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags", + "bitflags 2.4.2", "cfg-if", "libc", ] @@ -201,6 +302,94 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom-derive" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff943d68b88d0b87a6e0d58615e8fa07f9fd5a1319fa0a72efc1f62275c79a7" +dependencies = [ + "nom", + "nom-derive-impl", + "rustversion", +] + +[[package]] +name = "nom-derive-impl" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b9a93a84b0d3ec3e70e02d332dc33ac6dfac9cde63e17fcb77172dededa62" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "num-traits" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31" +dependencies = [ + "num-traits 0.2.18", +] + +[[package]] +name = "num-traits" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +dependencies = [ + "autocfg", +] + +[[package]] +name = "phf" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb1c3a8bc4dd4e5cfce29b44ffc14bedd2ee294559a294e2a4d4c9e9a6a13cd" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher", +] + [[package]] name = "pnet_base" version = "0.34.0" @@ -219,7 +408,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.52", ] [[package]] @@ -243,6 +432,36 @@ dependencies = [ "pnet_macros_support", ] +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.78" @@ -261,6 +480,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "regex" version = "1.10.3" @@ -304,8 +553,25 @@ dependencies = [ "pnet_packet", "serde", "serde_json", + "smoltcp", + "tls-parser", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.17" @@ -329,7 +595,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -343,6 +609,45 @@ dependencies = [ "serde", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + +[[package]] +name = "smoltcp" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a1a996951e50b5971a2c8c0fa05a381480d70a933064245c4a223ddc87ccc97" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "cfg-if", + "defmt", + "heapless", + "libc", + "log", + "managed", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.52" @@ -354,6 +659,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + +[[package]] +name = "tls-parser" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409206e2de64edbf7ea99a44ac31680daf9ef1a57895fb3c5bd738a903691be0" +dependencies = [ + "enum_primitive", + "nom", + "nom-derive", + "phf", + "phf_codegen", + "rusticata-macros", +] + [[package]] name = "unicode-ident" version = "1.0.12" @@ -366,6 +705,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index d408357..62f919f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,12 +5,15 @@ edition = "2021" [dependencies] lru_time_cache = "0.11.11" -nfq = "0.2.5" +nfq = {version = "0.2.5", features=["ct"]} pnet_packet = "0.34.0" anyhow="1" serde = { version = "1", features = ["derive"] } serde_json = "1" iptables = "0.5.1" -ctrlc = "3.4.2" +#ctrlc = {version = "3.4.2", features = ["termination"]} +ctrlc = {version = "3.4.2"} log = "0.4.21" env_logger = "0.11.3" +smoltcp = "0.11.0" +tls-parser = "0.11.0" diff --git a/config.json b/config.json new file mode 100644 index 0000000..723bf17 --- /dev/null +++ b/config.json @@ -0,0 +1,5 @@ +{ + "tcp_stream_filter": { + "port_range":[3090,3090] + } +} \ No newline at end of file diff --git a/iptables b/iptables new file mode 100644 index 0000000..b2ee973 --- /dev/null +++ b/iptables @@ -0,0 +1,12 @@ + +// 所有tcp连接已经标记,直接return +iptables -t mangle -A PREROUTING -m connmark --mark 2 -j RETURN + +// 进入队列 +iptables -t mangel -A PREROUTING -p tcp --dport {}:{} -m conntrack --ctstate NEW -j NFQUEUE --queue-bypass --queue-num 1 + + +// 所有fwmark标记为1的,标记为2 +iptables -t mangle -A PREROUTING -m mark --mark 0x1 -j CONNMARK --set-mark 0x2 + +iptables -t mangle -A PREROUTING -m mark --mark 3 -j LOG --log-prefix "FWMark 3 Packet: " \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 4c4eb21..e0d55b4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,8 +6,14 @@ pub struct IPNumberStrict { pub max_ip_number: usize, pub port_range: (u16, u16), } + +#[derive(Debug, Serialize, Deserialize)] +pub struct TcpStreamFilter { + pub port_range: (u16, u16), +} #[derive(Debug, Serialize, Deserialize)] pub struct Config { - pub ip_num_strict: IPNumberStrict, + pub ip_num_strict: Option, + pub tcp_stream_filter: Option, } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 8e5cf0c..108565e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,12 +2,13 @@ mod config; mod nfq_mng; mod nfq_proc; mod processor; +mod tcp_assember; use std::path::PathBuf; use config::Config; use nfq_mng::NFQMng; use nfq_proc::NfqRunBuilder; -use processor::ip_num_strict::IpNumStrict; +use processor::{ip_num_strict::IpNumStrict, log_tcp_stream::TcpStreamLogger}; fn main() -> anyhow::Result<()> { env_logger::init(); let cfg_file: PathBuf = std::env::args() @@ -16,24 +17,85 @@ fn main() -> anyhow::Result<()> { .parse()?; let config: Config = serde_json::from_reader(std::fs::File::open(&cfg_file)?)?; let mut ipt_mng = NFQMng::new(); - ipt_mng.add__nfq_rule( - "mangle", - "PREROUTING", - &format!( - "-p tcp --dport {}:{} -m conntrack --ctstate NEW", - config.ip_num_strict.port_range.0, config.ip_num_strict.port_range.1 - ), - 0, - ); - ipt_mng.run()?; let mut builder = NfqRunBuilder::new(); - builder.add_processor( - 0, - Box::new(IpNumStrict::new( - config.ip_num_strict.max_ip_number, - config.ip_num_strict.time_thr_sec, - )), - ); + ipt_mng.new_chain("mangle", "PROC"); + if let Some(ref ip_num_strict) = config.ip_num_strict { + ipt_mng.add__nfq_rule( + "mangle", + "PROC", + &format!( + "-p tcp --dport {}:{} -m conntrack --ctstate NEW", + ip_num_strict.port_range.0, ip_num_strict.port_range.1 + ), + 0, + ); + builder.add_processor( + 0, + false, + Box::new(IpNumStrict::new( + config.tcp_stream_filter.is_some(), + ip_num_strict.max_ip_number, + ip_num_strict.time_thr_sec, + )), + ); + } + + if let Some(ref stream_filter) = config.tcp_stream_filter { + // 给Conn打上标记 + ipt_mng.insert_rule( + "mangle", + "INPUT", + "-m mark --mark 3 -j CONNMARK --set-mark 3", + 1, + ); + ipt_mng.insert_rule( + "mangle", + "FORWARD", + "-m mark --mark 3 -j CONNMARK --set-mark 3", + 1, + ); + ipt_mng.insert_rule( + "mangle", + "INPUT", + "-m mark --mark 2 -j CONNMARK --set-mark 2", + 1, + ); + ipt_mng.insert_rule( + "mangle", + "FORWARD", + "-m mark --mark 2 -j CONNMARK --set-mark 2", + 1, + ); + // 遇到Conn标记为3的跳过入queue + ipt_mng.add__comm_rule("mangle", "PROC", "-p tcp -m connmark --mark 3 -j RETURN"); + + // 标记为2的执行Rst + ipt_mng.insert_rule( + "filter", + "INPUT", + "-p tcp -m connmark --mark 2 -j REJECT --reject-with tcp-reset", + 1, + ); + ipt_mng.insert_rule( + "filter", + "FORWARD", + "-p tcp -m connmark --mark 2 -j REJECT --reject-with tcp-reset", + 1, + ); + + ipt_mng.add__nfq_rule( + "mangle", + "PROC", + &format!( + "-p tcp --dport {}:{}", + stream_filter.port_range.0, stream_filter.port_range.1 + ), + 1, + ); + builder.add_processor(1, true, Box::new(TcpStreamLogger::new())); + } + ipt_mng.add__comm_rule("mangle", "PREROUTING", "-p tcp -j PROC"); + ipt_mng.run()?; builder.run()?; Ok(()) } diff --git a/src/nfq_mng.rs b/src/nfq_mng.rs index 405daa3..4510b02 100644 --- a/src/nfq_mng.rs +++ b/src/nfq_mng.rs @@ -10,6 +10,7 @@ struct Rule { tbl: String, chain: String, rule: String, + insert: Option, } pub struct NFQMng { rules: Arc>>, @@ -27,6 +28,7 @@ impl NFQMng { tbl: tbl.to_owned(), chain: chain.to_owned(), rule: rule_str, + insert: None, }); } pub fn add__comm_rule(&mut self, tbl: &str, chain: &str, para: &str) { @@ -34,6 +36,21 @@ impl NFQMng { tbl: tbl.to_owned(), chain: chain.to_owned(), rule: para.to_owned(), + insert: None, + }); + } + + pub fn new_chain(&self, tbl: &str, chain: &str) { + let ips = iptables::new(false).unwrap(); + ips.new_chain(tbl, chain); + } + + pub fn insert_rule(&mut self, tbl: &str, chain: &str, para: &str, pos: i32) { + self.rules.lock().unwrap().push(Rule { + tbl: tbl.to_owned(), + chain: chain.to_owned(), + rule: para.to_owned(), + insert: Some(pos), }); } pub fn run(self) -> anyhow::Result<()> { @@ -59,9 +76,16 @@ impl NFQMng { for r in self.rules.lock().unwrap().iter() { match ips.exists(&r.tbl, &r.chain, &r.rule) { Ok(false) => { - if let Err(e) = ips.insert(&r.tbl, &r.chain, &r.rule, 1) { - println!("setup iptables rule: {r:?} failed: {e:?}"); - has_failed = true; + if let Some(idx) = r.insert { + if let Err(e) = ips.insert(&r.tbl, &r.chain, &r.rule, idx) { + println!("setup insert iptables rule: {r:?} failed: {e:?}"); + has_failed = true; + } + } else { + if let Err(e) = ips.append(&r.tbl, &r.chain, &r.rule) { + println!("setup append iptables rule: {r:?} failed: {e:?}"); + has_failed = true; + } } } Ok(true) => {} diff --git a/src/nfq_proc.rs b/src/nfq_proc.rs index a00c133..468261a 100644 --- a/src/nfq_proc.rs +++ b/src/nfq_proc.rs @@ -1,17 +1,25 @@ use std::{collections::HashMap, net::Ipv4Addr}; use anyhow::anyhow; -use nfq::{Queue, Verdict}; -use pnet_packet::{ipv4::Ipv4Packet, tcp::TcpPacket, udp::UdpPacket, Packet}; +use nfq::{Message, Queue, Verdict}; +use pnet_packet::{ + ipv4::Ipv4Packet, + tcp::{TcpFlags, TcpPacket}, + udp::UdpPacket, + Packet, +}; pub struct MessageData { pub from_ip: Ipv4Addr, pub dst_ip: Ipv4Addr, pub src_port: u16, pub dst_port: u16, + pub tcp_flags: Option, + pub tcp_ack: Option, + pub ct: Option<(nfq::conntrack::State, u32)> } pub trait Processor { - fn run<'a>(&mut self, msg: &MessageData, orig_msg: &Ipv4Packet<'a>) -> Verdict; + fn run<'a>(&mut self, msg: &MessageData, ipv4_pkt: &Ipv4Packet<'a>) -> (Verdict, Option); fn message_type(&self) -> MessageType; } @@ -20,7 +28,7 @@ pub enum MessageType { Udp, } pub struct NfqRunBuilder { - processor: HashMap>)>, + processor: HashMap>, bool)>, } impl NfqRunBuilder { @@ -32,50 +40,80 @@ impl NfqRunBuilder { pub fn add_processor( &mut self, que_idx: u16, + ct: bool, processor: Box, ) { self.processor .entry(que_idx) - .or_insert_with(|| (processor.message_type(), Vec::new())) + .or_insert_with(|| (processor.message_type(), Vec::new(), ct)) .1 .push(processor); } pub fn run(self) -> anyhow::Result<()> { let mut handlers = Vec::new(); - for (idx, (ty, mut all_processor)) in self.processor.into_iter() { + for (idx, (ty, mut all_processor, ct)) in self.processor.into_iter() { let parse2 = match ty { MessageType::Tcp => move |ip_pkt: &Ipv4Packet| { let tcp_pkt = TcpPacket::new(ip_pkt.payload()) .ok_or(anyhow!("can't parse tcp packet"))?; - Ok::<_, anyhow::Error>((tcp_pkt.get_source(), tcp_pkt.get_destination())) + let flags = tcp_pkt.get_flags(); + let ack = if (flags & TcpFlags::ACK) != 0 { + Some(tcp_pkt.get_acknowledgement()) + } else { + None + }; + + Ok::<_, anyhow::Error>(( + tcp_pkt.get_source(), + tcp_pkt.get_destination(), + Some(tcp_pkt.get_flags()), + ack, + )) }, MessageType::Udp => move |ip_pkt: &Ipv4Packet| { - let tcp_pkt = UdpPacket::new(ip_pkt.payload()) + let udp_pkt = UdpPacket::new(ip_pkt.payload()) .ok_or(anyhow!("can't parse udp packet"))?; - Ok::<_, anyhow::Error>((tcp_pkt.get_source(), tcp_pkt.get_destination())) + Ok::<_, anyhow::Error>(( + udp_pkt.get_source(), + udp_pkt.get_destination(), + None, + None, + )) }, }; let h = std::thread::spawn(move || { let mut queue = Queue::open()?; queue.bind(idx)?; + if ct { + queue.set_recv_conntrack(idx, ct)?; + } loop { let mut msg = queue.recv()?; + let ct = msg.get_conntrack().map(|ct| (ct.get_state(), ct.get_id())); let mut act = Verdict::Accept; if let Some(ip_packet) = pnet_packet::ipv4::Ipv4Packet::new(msg.get_payload()) { let from_ip = ip_packet.get_source(); let dst_ip = ip_packet.get_destination(); - let (src_port, dst_port) = parse2(&ip_packet)?; + let (src_port, dst_port, tcp_flags, tcp_ack) = parse2(&ip_packet)?; let msg_data = MessageData { from_ip, dst_ip, src_port, dst_port, + tcp_flags, + tcp_ack, + ct }; log::trace!("ip from: {from_ip:?}:{src_port} to {dst_ip:?}:{dst_port}"); for p in all_processor.iter_mut() { - let cur_rst = p.run(&msg_data, &ip_packet); + let (cur_rst, mark) = p.run(&msg_data, &ip_packet); + if let Some(m) = mark { + msg.set_nfmark(m); + act = cur_rst; + break; + } match cur_rst { - Verdict::Drop | Verdict::Stop => { + Verdict::Drop | Verdict::Stop | Verdict::Queue(_) => { act = cur_rst; break; } diff --git a/src/processor/ip_num_strict.rs b/src/processor/ip_num_strict.rs index f54b740..079c027 100644 --- a/src/processor/ip_num_strict.rs +++ b/src/processor/ip_num_strict.rs @@ -1,18 +1,20 @@ use std::{collections::HashMap, net::Ipv4Addr}; -use nfq::Verdict; +use nfq::{Message, Verdict}; use pnet_packet::ipv4::Ipv4Packet; use crate::nfq_proc::{MessageData, MessageType, Processor}; pub struct IpNumStrict { + has_stream_check: bool, ip_number: usize, time_thr: u64, data: HashMap>, } impl IpNumStrict { - pub fn new(ip_number: usize, time_thr: u64) -> Self { + pub fn new(has_stream_check: bool, ip_number: usize, time_thr: u64) -> Self { Self { + has_stream_check, ip_number, time_thr, data: Default::default(), @@ -20,8 +22,12 @@ impl IpNumStrict { } } impl Processor for IpNumStrict { - fn run<'a>(&mut self, msg: &MessageData, _orig_msg: &Ipv4Packet<'a>) -> Verdict { - let mut act = Verdict::Accept; + fn run<'a>(&mut self, msg: &MessageData, _ipv4_pkt: &Ipv4Packet<'a>,) -> (Verdict,Option) { + let mut act = if self.has_stream_check { + Verdict::Queue(1) + } else { + Verdict::Accept + }; let from_ip = msg.from_ip; let dst_ip = msg.dst_ip; let src_port = msg.src_port; @@ -49,7 +55,7 @@ impl Processor for IpNumStrict { // 已经存在,直接放行 } } - act + (act, None) } fn message_type(&self) -> MessageType { MessageType::Tcp diff --git a/src/processor/log_tcp_stream.rs b/src/processor/log_tcp_stream.rs new file mode 100644 index 0000000..047da61 --- /dev/null +++ b/src/processor/log_tcp_stream.rs @@ -0,0 +1,150 @@ +use std::{borrow::BorrowMut, net::Ipv4Addr}; + +use lru_time_cache::Entry; +use nfq::{conntrack::State, Message, Verdict}; +use pnet_packet::{ + ipv4::Ipv4Packet, + tcp::{TcpFlags, TcpPacket}, + Packet, +}; +use tls_parser::{ + parse_tls_extension, parse_tls_plaintext, TlsExtension, TlsMessageHandshake, TlsRecordType, +}; + +use crate::{ + nfq_proc::{MessageData, MessageType, Processor}, + tcp_assember::MonSocket, +}; + +pub struct TcpStreamLogger { + streams: lru_time_cache::LruCache, +} +impl TcpStreamLogger { + pub fn new() -> Self { + Self { + streams: lru_time_cache::LruCache::with_expiry_duration( + std::time::Duration::from_secs(60), + ), + } + } +} +#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord, Clone)] +struct Tuple { + src_ip: Ipv4Addr, + dst_ip: Ipv4Addr, + src_port: u16, + dst_port: u16, +} + +impl Tuple { + fn new(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, src_port: u16, dst_port: u16) -> Self { + Self { + src_ip, + dst_ip, + src_port, + dst_port, + } + } +} + +fn check_stream(data: &[u8], tup: &Tuple) -> anyhow::Result { + if let Ok((_, p)) = parse_tls_plaintext(data) { + match p.hdr.record_type { + TlsRecordType::Handshake => { + if let Some(first) = p.msg.first() { + match first { + tls_parser::TlsMessage::Handshake(h) => match h { + TlsMessageHandshake::ClientHello(c) => { + log::warn!("tup: {tup:?} client hello detected"); + if let Some((_, e)) = + c.ext.and_then(|e| parse_tls_extension(e).ok()) + { + match e { + TlsExtension::SNI(v) => { + for (_, s) in v { + log::warn!( + "sni: {:?}", + String::from_utf8(s.to_vec()) + ); + return Ok(false); + } + } + _ => {} + } + } + } + o => { + log::debug!("other handshake type: {o:?}"); + } + }, + o => { + log::debug!("other message type: {o:?}"); + } + } + } + } + o => { + log::debug!("get tls record type: {o}"); + } + } + } + Ok(true) +} +impl Processor for TcpStreamLogger { + fn run<'a>(&mut self, msg: &MessageData, ipv4_pkt: &Ipv4Packet<'a>) -> (Verdict, Option) { + let act = Verdict::Accept; + let (ct_state, ct_id) = if let Some(d) = msg.ct.as_ref() { + (&d.0, d.1) + } else { + return (act, None); + }; + let from_ip = msg.from_ip; + let dst_ip = msg.dst_ip; + let src_port = msg.src_port; + let dst_port = msg.dst_port; + let socket_entry = if matches!(ct_state, State::New) { + match self.streams.entry(ct_id) { + Entry::Occupied(e) => { + // 可能是重传的? + Some(e.into_mut()) + } + Entry::Vacant(e) => { + log::debug!( + "new conn from_ip: {from_ip:?}:{src_port} to {dst_ip:?}:{dst_port}" + ); + let socket = MonSocket::new(); + Some(e.insert(socket)) + } + } + } else { + self.streams.get_mut(&ct_id) + }; + let tup = Tuple::new(from_ip, dst_ip, src_port, dst_port); + log::trace!("proc tuple: {tup:?}"); + let mut mark = None; + if let Some(socket_entry) = socket_entry { + let socket = socket_entry; + socket.feed_packet(TcpPacket::new(ipv4_pkt.payload()).unwrap()); + match socket.get_data() { + Some(s) => { + // log::debug!("tuple:{:?}, data: {:?}", tup.clone(), String::from_utf8(s)); + if let Ok(false) = check_stream(&s, &tup) { + // log::debug!("mark 2"); + mark = Some(2); + } else { + // log::debug!("mark 3"); + mark = Some(3); + } + self.streams.remove(&ct_id); + } + None => { + // *socket_entry = Some(socket); + } + } + } + (act, mark) + } + fn message_type(&self) -> MessageType { + MessageType::Tcp + } +} diff --git a/src/processor/mod.rs b/src/processor/mod.rs index e7b6885..f4b93d4 100644 --- a/src/processor/mod.rs +++ b/src/processor/mod.rs @@ -1,2 +1,3 @@ pub mod ip_num_strict; +pub mod log_tcp_stream; use super::nfq_proc::Processor; \ No newline at end of file diff --git a/src/tcp_assember.rs b/src/tcp_assember.rs new file mode 100644 index 0000000..d7cb787 --- /dev/null +++ b/src/tcp_assember.rs @@ -0,0 +1,279 @@ +use std::time::Instant; + +use pnet_packet::{ + tcp::{TcpFlags, TcpPacket}, + Packet, +}; +use smoltcp::{ + storage::{Assembler, RingBuffer}, + wire::TcpControl, +}; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum State { + Closed, + Listen, + SynSent, + SynReceived, + Established, +} +pub struct MonSocket { + create_time: Instant, + state: State, + remote_seq_no: usize, + assember: Assembler, + rx_buffer: RingBuffer<'static, u8>, +} + +impl MonSocket { + pub fn new() -> Self { + Self { + create_time: Instant::now(), + state: State::Listen, + remote_seq_no: 0, + rx_buffer: RingBuffer::new(vec![0;4096]), + assember: Assembler::new() + } + } + pub fn is_closed(&self) -> bool { + self.state == State::Closed + } + pub fn feed_packet(&mut self, pkt: TcpPacket) { + if self.state == State::Closed { + return; + } + let window_start = self.remote_seq_no + self.rx_buffer.len(); + let window_end = self.remote_seq_no + self.rx_buffer.capacity(); + let segment_start = pkt.get_sequence() as usize; + let segment_end = pkt.get_sequence() as usize + pkt.payload().len(); + + let (payload, payload_offset) = match self.state { + // In LISTEN and SYN-SENT states, we have not yet synchronized with the remote end. + State::Listen | State::SynSent => (&[][..], 0), + _ => { + // https://www.rfc-editor.org/rfc/rfc9293.html#name-segment-acceptability-tests + let segment_in_window = match ( + segment_start == segment_end, + window_start == window_end, + ) { + (true, _) if segment_end == window_start - 1 => { + // log::debug!( + // "received a keep-alive or window probe packet, will send an ACK" + // ); + false + } + (true, true) => { + if window_start == segment_start { + true + } else { + // log::debug!( + // "zero-length segment not inside zero-length window, will send an ACK." + // ); + false + } + } + (true, false) => { + if window_start <= segment_start && segment_start < window_end { + true + } else { + // log::debug!("zero-length segment not inside window, will send an ACK."); + false + } + } + (false, true) => { + // log::debug!( + // "non-zero-length segment with zero receive window, will only send an ACK" + // ); + false + } + (false, false) => { + if (window_start <= segment_start && segment_start < window_end) + || (window_start < segment_end && segment_end <= window_end) + { + true + } else { + // log::debug!( + // "segment not in receive window ({}..{} not intersecting {}..{}), will send challenge ACK", + // segment_start, + // segment_end, + // window_start, + // window_end + // ); + false + } + } + }; + + if segment_in_window { + let overlap_start = window_start.max(segment_start); + let overlap_end = window_end.min(segment_end); + + // the checks done above imply this. + debug_assert!(overlap_start <= overlap_end); + + ( + &pkt.payload()[overlap_start - segment_start..overlap_end - segment_start], + overlap_start - window_start, + ) + } else { + // full state + self.state = State::Closed; + return; + } + } + }; + let flg = pkt.get_flags(); + let mut control = match ( + (flg & TcpFlags::SYN) != 0, + (flg & TcpFlags::FIN) != 0, + (flg & TcpFlags::RST) != 0, + (flg & TcpFlags::PSH) != 0, + ) { + (false, false, false, false) => TcpControl::None, + (false, false, false, true) => TcpControl::Psh, + (true, false, false, _) => TcpControl::Syn, + (false, true, false, _) => TcpControl::Fin, + (false, false, true, _) => TcpControl::Rst, + _ => return, + }; + control = control.quash_psh(); + if control == TcpControl::Fin && window_start != segment_start { + log::trace!("ignoring FIN because we don't have full data yet. window_start={} segment_start={}", window_start, segment_start); + control = TcpControl::None; + } + + match (self.state, control) { + // RSTs are not accepted in the LISTEN state. + (State::Listen, TcpControl::Rst) => return, + + // RSTs in SYN-RECEIVED flip the socket back to the LISTEN state. + (State::SynReceived, TcpControl::Rst) => { + log::trace!("received RST"); + // self.tuple = None; + self.state = State::Listen; + return; + } + + // RSTs in any other state close the socket. + (_, TcpControl::Rst) => { + log::trace!("received RST"); + self.state = State::Closed; + return; + } + + // SYN packets in the LISTEN state change it to SYN-RECEIVED. + (State::Listen, TcpControl::Syn) => { + log::trace!("received SYN"); + self.remote_seq_no = pkt.get_sequence() as usize + 1; + self.state = State::SynReceived; + } + + // ACK packets in the SYN-RECEIVED state change it to ESTABLISHED. + (State::SynReceived, TcpControl::None) => { + self.state = State::Established; + } + + // FIN packets in the SYN-RECEIVED state change it to CLOSE-WAIT. + // It's not obvious from RFC 793 that this is permitted, but + // 7th and 8th steps in the "SEGMENT ARRIVES" event describe this behavior. + (State::SynReceived, TcpControl::Fin) => { + self.remote_seq_no += 1; + self.state = State::Closed; + } + + // SYN|ACK packets in the SYN-SENT state change it to ESTABLISHED. + (State::SynSent, TcpControl::Syn) => { + log::trace!("received SYN|ACK"); + + self.remote_seq_no = pkt.get_sequence() as usize + 1; + + self.state = State::Established; + } + + // ACK packets in ESTABLISHED state reset the retransmit timer, + // except for duplicate ACK packets which preserve it. + (State::Established, TcpControl::None) => {} + + // FIN packets in ESTABLISHED state indicate the remote side has closed. + (State::Established, TcpControl::Fin) => { + self.remote_seq_no += 1; + self.state = State::Closed; + } + + _ => { + log::debug!("unexpected packet {:?}", pkt); + return; + } + } + let payload_len = payload.len(); + if payload_len == 0 { + return; + } + // Try adding payload octets to the assembler. + let Ok(contig_len) = self + .assember + .add_then_remove_front(payload_offset, payload_len) + else { + log::debug!( + "assembler: too many holes to add {} octets at offset {}", + payload_len, + payload_offset + ); + return; + }; + + // Place payload octets into the buffer. + log::trace!( + "rx buffer: receiving {} octets at offset {}", + payload_len, + payload_offset + ); + let len_written = self.rx_buffer.write_unallocated(payload_offset, payload); + debug_assert!(len_written == payload_len); + + if contig_len != 0 { + // Enqueue the contiguous data octets in front of the buffer. + log::trace!( + "rx buffer: enqueueing {} octets (now {})", + contig_len, + self.rx_buffer.len() + contig_len + ); + self.rx_buffer.enqueue_unallocated(contig_len); + } + } + pub fn get_data(&mut self) -> Option> { + if self.state == State::Closed { + return Some(self.rx_buffer.dequeue_many(self.rx_buffer.len()).to_vec()); + } else { + None + } + } + pub fn get_force(mut self) -> Option> { + if self.rx_buffer.is_empty() { + None + } else { + Some(self.rx_buffer.dequeue_many(self.rx_buffer.len()).to_vec()) + } + } +} + +#[cfg(test)] +mod test { + use smoltcp::storage::{Assembler, RingBuffer}; + + #[test] + fn test_ringbuf() { + let mut buf = vec![0 as u8; 100]; + let mut ring_buf = RingBuffer::new(buf); + assert_eq!(ring_buf.write_unallocated(0, &vec![1; 10]), 10); + assert_eq!(ring_buf.write_unallocated(20, &vec![3; 10]), 10); + let mut assember = Assembler::new(); + let s = assember.add_then_remove_front(0, 10).unwrap(); + assert_eq!(s, 10); + let s = assember.add_then_remove_front(20, 10).unwrap(); + assert_eq!(s, 0); + assert_eq!(ring_buf.write_unallocated(10, &vec![2; 10]), 10); + let s = assember.add_then_remove_front(10, 10).unwrap(); + assert_eq!(s, 20); + } +}