Upgrade libp2p 0.19.0 (#169)
* chore: upgrade the libp2p to 0.18.(0|1) * refactor: fix most of the swarm with todos mainly leaving the multiple connections case to panic. * feat: add ability to configure listening addr the original random listening address is still in place. * add test case for the panic with two connections * fix: allow multiple connections in p2p/swarm * doc: Connection and the any connecting address * chore: cleanup libp2p patching examples * refactor: rename SwarmApi::stats perhaps roundtrip_times is more readable. * chore: upgrade to libp2p 0.19 * fix: awaiting for binding new listening addresses the initial implementation didn't actually wait for anything other than asking the swarm to start listening, but now the await will complete only after a listening address has actually been bound. this required some nasty matching on the multiaddr protocol parts which will hopefully ever be used by test code but ... it works now, at least a bit better. * fix test on macos, fix unspecified addr handling
This commit is contained in:
parent
ad4601a37d
commit
7799d888f6
231
Cargo.lock
generated
231
Cargo.lock
generated
@ -188,14 +188,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-tls"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ce6977f57fa68da77ffe5542950d47e9c23d65f5bc7cb0a9f8700996913eec7"
|
||||
checksum = "95fd83426b89b034bf4e9ceb9c533c2f2386b813fd3dcae0a425ec6f1837d78a"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"rustls",
|
||||
"webpki",
|
||||
"webpki-roots 0.17.0",
|
||||
"webpki-roots 0.19.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -238,15 +238,6 @@ version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b20b618342cf9891c292c4f5ac2cde7287cc5c87e87e9c769d617793607dec1"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
|
||||
dependencies = [
|
||||
"byteorder 1.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.11.0"
|
||||
@ -302,25 +293,13 @@ dependencies = [
|
||||
"libp2p-core",
|
||||
"libp2p-swarm",
|
||||
"log 0.4.8",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"thiserror",
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blake2"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94cb07b0da6a73955f8fb85d24c466778e70cda767a568229b104f0264089330"
|
||||
dependencies = [
|
||||
"byte-tools",
|
||||
"crypto-mac",
|
||||
"digest",
|
||||
"opaque-debug",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blake2-rfc"
|
||||
version = "0.2.18"
|
||||
@ -505,7 +484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b08bd59ac7b32410f32cb71c027c454e61c7826d166e281866560106f4a55956"
|
||||
dependencies = [
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
@ -1370,7 +1349,7 @@ dependencies = [
|
||||
"libp2p",
|
||||
"log 0.4.8",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"rand 0.7.3",
|
||||
@ -1390,7 +1369,7 @@ dependencies = [
|
||||
"futures 0.3.4",
|
||||
"ipfs",
|
||||
"libipld 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1406,7 +1385,7 @@ dependencies = [
|
||||
"libipld 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
|
||||
"log 0.4.8",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"openssl",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
@ -1521,7 +1500,7 @@ dependencies = [
|
||||
"libipld-base 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
|
||||
"libipld-macro 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
@ -1541,7 +1520,7 @@ dependencies = [
|
||||
"libipld-base 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libipld-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
@ -1552,7 +1531,7 @@ source = "git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766
|
||||
dependencies = [
|
||||
"cid",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
@ -1563,7 +1542,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8c4f844ea2e357f53cb53a2dd28a1dd4ab9d61d75d121cb8468b1a12b621970"
|
||||
dependencies = [
|
||||
"cid",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
@ -1573,7 +1552,7 @@ version = "0.1.0"
|
||||
source = "git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9#b2286c53c13f3eeec2a3766387f2926838e8e4c9"
|
||||
dependencies = [
|
||||
"libipld-base 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1583,7 +1562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fbc55b3b493e909833fb7af7f5deed4e994e2c625c648777711f8791a02ebb5"
|
||||
dependencies = [
|
||||
"libipld-base 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"multihash",
|
||||
"multihash 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1598,8 +1577,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p"
|
||||
version = "0.16.2"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ec214d189b57e4412f079ac5a1442578d06b12ca7282ba4696104cc92ab96c1"
|
||||
dependencies = [
|
||||
"bytes 0.5.4",
|
||||
"futures 0.3.4",
|
||||
@ -1625,8 +1605,8 @@ dependencies = [
|
||||
"libp2p-wasm-ext",
|
||||
"libp2p-websocket",
|
||||
"libp2p-yamux",
|
||||
"multihash 0.11.1",
|
||||
"parity-multiaddr",
|
||||
"parity-multihash",
|
||||
"parking_lot 0.10.0",
|
||||
"pin-project",
|
||||
"smallvec 1.2.0",
|
||||
@ -1635,21 +1615,23 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-core"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "80a6000296bdbff540b6c00ef82108ef23aa68d195b9333823ea491562c338d7"
|
||||
dependencies = [
|
||||
"asn1_der",
|
||||
"bs58",
|
||||
"ed25519-dalek",
|
||||
"either",
|
||||
"fnv",
|
||||
"futures 0.3.4",
|
||||
"futures-timer 3.0.2",
|
||||
"lazy_static",
|
||||
"libsecp256k1",
|
||||
"log 0.4.8",
|
||||
"multihash 0.11.1",
|
||||
"multistream-select",
|
||||
"parity-multiaddr",
|
||||
"parity-multihash",
|
||||
"parking_lot 0.10.0",
|
||||
"pin-project",
|
||||
"prost",
|
||||
@ -1667,8 +1649,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-core-derive"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "67f0d915bee5d457a6d113377101e1f06e86a4286778aa4c6939553e9a4d7033"
|
||||
dependencies = [
|
||||
"quote 1.0.3",
|
||||
"syn 1.0.17",
|
||||
@ -1676,8 +1659,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-deflate"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "975c847575ef9b3d63f9c11d465e9a9b0ea940cfa408b93cc6981bbc3b1bac40"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"futures 0.3.4",
|
||||
@ -1686,8 +1670,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-dns"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cc186d9a941fd0207cf8f08ef225a735e2d7296258f570155e525f6ee732f87"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"libp2p-core",
|
||||
@ -1696,8 +1681,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-floodsub"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6dd8cc558e0edde2d4a423d017efd6b36c1b6bf97f4304c83076895c5edaed8"
|
||||
dependencies = [
|
||||
"cuckoofilter",
|
||||
"fnv",
|
||||
@ -1712,8 +1698,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-gossipsub"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce48659363fe765c09d77eb5b2248e04362557b11bba3701f05879ad34919ccd"
|
||||
dependencies = [
|
||||
"base64 0.11.0",
|
||||
"byteorder 1.3.4",
|
||||
@ -1736,8 +1723,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-identify"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a455af71c59473444eba05e83dbaa20262bdbd9b4154f22389510fbac16f201"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"libp2p-core",
|
||||
@ -1751,8 +1739,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-kad"
|
||||
version = "0.16.2"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41d6c1d5100973527ae70d82687465b17049c1b717a7964de38b8e65000878ff"
|
||||
dependencies = [
|
||||
"arrayvec 0.5.1",
|
||||
"bytes 0.5.4",
|
||||
@ -1763,7 +1752,7 @@ dependencies = [
|
||||
"libp2p-core",
|
||||
"libp2p-swarm",
|
||||
"log 0.4.8",
|
||||
"parity-multihash",
|
||||
"multihash 0.11.1",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"rand 0.7.3",
|
||||
@ -1777,8 +1766,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-mdns"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5bc788d92111802cb0c92d2e032fa6f46333aaeb5650c2f37b5d3eba78cabe6"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"data-encoding",
|
||||
@ -1798,8 +1788,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-mplex"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4095bce2100f840883f1f75dbd010c966ee4ad323ae9f82026396da5cf6cce68"
|
||||
dependencies = [
|
||||
"bytes 0.5.4",
|
||||
"fnv",
|
||||
@ -1813,8 +1804,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-noise"
|
||||
version = "0.16.2"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84fd504e27b0eadd451e06b67694ef714bd8374044e7db339bb0cdb83755ddf4"
|
||||
dependencies = [
|
||||
"curve25519-dalek",
|
||||
"futures 0.3.4",
|
||||
@ -1833,8 +1825,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-ping"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82930c36490008b1ef2f26c237a2c205c38ef6edc263738d0528b842740ab09f"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"libp2p-core",
|
||||
@ -1847,8 +1840,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-plaintext"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad28fe7beaa3e516ee8ba2af8c4f6820f269afa60d661831e879f2afea64f4a0"
|
||||
dependencies = [
|
||||
"bytes 0.5.4",
|
||||
"futures 0.3.4",
|
||||
@ -1864,8 +1858,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-pnet"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dabaa2194e1ce3c51cd78d734dd4c81dc5c7b150b309cbf9029df044034ac261"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"log 0.4.8",
|
||||
@ -1877,8 +1872,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-secio"
|
||||
version = "0.16.1"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22e30b873276846181fa9c04126653678c2797cb1556361d01b7b7fd6bf24682"
|
||||
dependencies = [
|
||||
"aes-ctr",
|
||||
"ctr",
|
||||
@ -1906,12 +1902,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-swarm"
|
||||
version = "0.16.1"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4a8101a0e0d5f04562137a476bf5f5423cd5bdab2f7e43a75909668e63cb102"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"libp2p-core",
|
||||
"log 0.4.8",
|
||||
"rand 0.7.3",
|
||||
"smallvec 1.2.0",
|
||||
"void",
|
||||
"wasm-timer",
|
||||
@ -1919,8 +1917,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-tcp"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4462bd96b97cac3f3a83b1b343ad3c3460cebbc8d929c040b1520c30e3611e08"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"futures 0.3.4",
|
||||
@ -1929,12 +1928,14 @@ dependencies = [
|
||||
"ipnet",
|
||||
"libp2p-core",
|
||||
"log 0.4.8",
|
||||
"socket2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-uds"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69660d235449bb2d99333b9892c9176d06fd2b380490cb8213feb5b015678cf1"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"futures 0.3.4",
|
||||
@ -1944,8 +1945,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-wasm-ext"
|
||||
version = "0.16.2"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f59fdbb5706f2723ca108c088b1c7a37f735a8c328021f0508007162627e9885"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"js-sys",
|
||||
@ -1957,8 +1959,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-websocket"
|
||||
version = "0.16.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "085fbe4c05c4116c2164ab4d5a521eb6e00516c444f61b3ee9f68c7b1e53580b"
|
||||
dependencies = [
|
||||
"async-tls",
|
||||
"bytes 0.5.4",
|
||||
@ -1977,8 +1980,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-yamux"
|
||||
version = "0.16.2"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b305d3a8981e68f11c0e17f2d11d5c52fae95e0d7c283f9e462b5b2dab413b2"
|
||||
dependencies = [
|
||||
"futures 0.3.4",
|
||||
"libp2p-core",
|
||||
@ -2226,6 +2230,21 @@ dependencies = [
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multihash"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae32179a9904ccc6e063de8beee7f5dd55fae85ecb851ca923d55722bc28cf5d"
|
||||
dependencies = [
|
||||
"blake2b_simd",
|
||||
"blake2s_simd",
|
||||
"digest",
|
||||
"sha-1",
|
||||
"sha2",
|
||||
"sha3",
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multimap"
|
||||
version = "0.8.0"
|
||||
@ -2252,14 +2271,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "multistream-select"
|
||||
version = "0.7.0"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "74cdcf7cfb3402881e15a1f95116cb033d69b33c83d481e1234777f5ef0c3d2c"
|
||||
dependencies = [
|
||||
"bytes 0.5.4",
|
||||
"futures 0.1.29",
|
||||
"futures 0.3.4",
|
||||
"log 0.4.8",
|
||||
"pin-project",
|
||||
"smallvec 1.2.0",
|
||||
"tokio-io",
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
@ -2366,14 +2386,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parity-multiaddr"
|
||||
version = "0.7.3"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "12ca96399f4a01aa89c59220c4f52ac371940eb4e53e3ce990da796f364bdf69"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"bs58",
|
||||
"byteorder 1.3.4",
|
||||
"data-encoding",
|
||||
"parity-multihash",
|
||||
"multihash 0.11.1",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
"static_assertions",
|
||||
@ -2381,20 +2402,6 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parity-multihash"
|
||||
version = "0.2.3"
|
||||
source = "git+https://github.com/koivunej/rust-libp2p.git?branch=v0.16.2-with-floodsub-opts#7314b7dccdc457c574d823cdf7a9cbaf97d6273d"
|
||||
dependencies = [
|
||||
"blake2",
|
||||
"bytes 0.5.4",
|
||||
"rand 0.7.3",
|
||||
"sha-1",
|
||||
"sha2",
|
||||
"sha3",
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parity-send-wrapper"
|
||||
version = "0.1.0"
|
||||
@ -2986,11 +2993,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.16.0"
|
||||
version = "0.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b25a18b1bf7387f0145e7f8324e700805aade3842dd3db2e74e4cdeb4677c09e"
|
||||
checksum = "c0d4a31f5d68413404705d6982529b0e11a9aacd4839d1d6222ee3b8cb4015e1"
|
||||
dependencies = [
|
||||
"base64 0.10.1",
|
||||
"base64 0.11.0",
|
||||
"log 0.4.8",
|
||||
"ring",
|
||||
"sct",
|
||||
@ -3232,9 +3239,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.3.11"
|
||||
version = "0.3.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
|
||||
checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
@ -4013,18 +4020,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "0.17.0"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a262ae37dd9d60f60dd473d1158f9fbebf110ba7b6a5051c8160460f6043718b"
|
||||
checksum = "91cd5736df7f12a964a5067a12c62fa38e1bd8080aff1f80bc29be7c80d19ab4"
|
||||
dependencies = [
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "0.18.0"
|
||||
version = "0.19.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91cd5736df7f12a964a5067a12c62fa38e1bd8080aff1f80bc29be7c80d19ab4"
|
||||
checksum = "f8eff4b7516a57307f9349c64bf34caa34b940b66fed4b2fb3136cb7386e5739"
|
||||
dependencies = [
|
||||
"webpki",
|
||||
]
|
||||
|
12
Cargo.toml
12
Cargo.toml
@ -20,7 +20,7 @@ dirs = "2.0.2"
|
||||
domain = { git = "https://github.com/nlnetlabs/domain", rev="084964", features = ["resolv"] }
|
||||
futures = { version = "0.3.4", features = ["compat", "io-compat"] }
|
||||
libipld = { git = "https://github.com/ipfs-rust/rust-ipld", rev = "b2286c53c13f3eeec2a3766387f2926838e8e4c9", features = ["dag-pb", "dag-json"] }
|
||||
libp2p = "0.16.2"
|
||||
libp2p = "0.19.0"
|
||||
log = "0.4.8"
|
||||
multibase = "0.8.0"
|
||||
multihash = "0.10.1"
|
||||
@ -44,8 +44,8 @@ members = [ "bitswap", "http", "examples" ]
|
||||
[patch.crates-io]
|
||||
ctr = { git = "https://github.com/koivunej/stream-ciphers.git", branch = "ctr128-64to128" }
|
||||
|
||||
# these are needed for the floodsub local originated messages to be seen by subscribers
|
||||
libp2p = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
|
||||
libp2p-core = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
|
||||
libp2p-swarm = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
|
||||
libp2p-floodsub = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
|
||||
# at least these libp2p components need to be patched if you want to use a local version
|
||||
# libp2p = { path = "../libp2p" }
|
||||
# libp2p-core = { path = "../libp2p" }
|
||||
# libp2p-swarm = { path = "../libp2p" }
|
||||
# libp2p-floodsub = { path = "../libp2p" }
|
||||
|
@ -14,8 +14,8 @@ async-trait = "0.1.29"
|
||||
fnv = "1.0.6"
|
||||
futures = "0.3.4"
|
||||
libipld = "0.1.0"
|
||||
libp2p-core = "0.16.0"
|
||||
libp2p-swarm = "0.16.1"
|
||||
libp2p-core = "0.19.0"
|
||||
libp2p-swarm = "0.19.0"
|
||||
log = "0.4.8"
|
||||
multihash = "0.10.1"
|
||||
prost = "0.6.1"
|
||||
|
@ -13,9 +13,11 @@ use fnv::FnvHashSet;
|
||||
use futures::task::Context;
|
||||
use futures::task::Poll;
|
||||
use libipld::cid::Cid;
|
||||
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId};
|
||||
use libp2p_swarm::protocols_handler::{IntoProtocolsHandler, OneShotHandler, ProtocolsHandler};
|
||||
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||
use libp2p_swarm::{
|
||||
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
|
||||
};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
/// Network behaviour that handles sending and receiving IPFS blocks.
|
||||
@ -94,10 +96,11 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
debug!("bitswap: connect");
|
||||
if self.target_peers.insert(peer_id.clone()) {
|
||||
debug!(" queuing dial_peer to {}", peer_id.to_base58());
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::DialPeer { peer_id });
|
||||
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
||||
peer_id,
|
||||
condition: DialPeerCondition::Disconnected,
|
||||
});
|
||||
}
|
||||
debug!("");
|
||||
}
|
||||
|
||||
/// Sends a block to the peer.
|
||||
@ -111,10 +114,12 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
.expect("Peer not in ledger?!");
|
||||
let message = ledger.send_block(block);
|
||||
debug!(" queuing block for {}", peer_id.to_base58());
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id,
|
||||
event: message,
|
||||
});
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event: message,
|
||||
handler: NotifyHandler::Any,
|
||||
});
|
||||
debug!("");
|
||||
}
|
||||
|
||||
@ -127,10 +132,12 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
message.want_block(cid, *priority);
|
||||
}
|
||||
debug!(" queuing wanted blocks");
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id,
|
||||
event: message,
|
||||
});
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event: message,
|
||||
handler: NotifyHandler::Any,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,10 +149,12 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
for (peer_id, ledger) in self.connected_peers.iter_mut() {
|
||||
let message = ledger.want_block(&cid, priority);
|
||||
debug!(" queuing want for {}", peer_id.to_base58());
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id: peer_id.to_owned(),
|
||||
event: message,
|
||||
});
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: peer_id.to_owned(),
|
||||
event: message,
|
||||
handler: NotifyHandler::Any,
|
||||
});
|
||||
}
|
||||
self.wanted_blocks.insert(cid, priority);
|
||||
debug!("");
|
||||
@ -163,7 +172,11 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
let peer_id = peer_id.to_owned();
|
||||
debug!(" queuing cancel for {}", peer_id.to_base58());
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event });
|
||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler: NotifyHandler::Any,
|
||||
});
|
||||
}
|
||||
}
|
||||
self.wanted_blocks.remove(cid);
|
||||
@ -185,25 +198,19 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
|
||||
debug!("bitswap: inject_connected");
|
||||
debug!(" peer_id: {}", peer_id.to_base58());
|
||||
debug!(" connected_point: {:?}", cp);
|
||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||
debug!("bitswap: inject_connected {}", peer_id);
|
||||
let ledger = Ledger::new();
|
||||
self.connected_peers.insert(peer_id.clone(), ledger);
|
||||
self.send_want_list(peer_id);
|
||||
debug!("");
|
||||
self.send_want_list(peer_id.clone());
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
|
||||
debug!("bitswap: inject_disconnected {:?}", cp);
|
||||
debug!(" peer_id: {}", peer_id.to_base58());
|
||||
debug!(" connected_point: {:?}", cp);
|
||||
debug!("");
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
debug!("bitswap: inject_disconnected {:?}", peer_id);
|
||||
//self.connected_peers.remove(peer_id);
|
||||
}
|
||||
|
||||
fn inject_node_event(&mut self, source: PeerId, event: InnerMessage) {
|
||||
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, event: InnerMessage) {
|
||||
debug!("bitswap: inject_node_event");
|
||||
debug!("{:?}", event);
|
||||
let message = match event {
|
||||
@ -232,7 +239,7 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
.process_want(source.clone(), cid.to_owned(), *priority);
|
||||
}
|
||||
// TODO: Remove cancelled `Want` events from the queue.
|
||||
// TODO: Remove cancelled blocks from `SendEvent`.
|
||||
// TODO: Remove cancelled blocks from `NotifyHandler`.
|
||||
debug!("");
|
||||
}
|
||||
|
||||
@ -242,19 +249,33 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
{
|
||||
// TODO concat messages to same destination to reduce traffic.
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
if let NetworkBehaviourAction::SendEvent { peer_id, event } = event {
|
||||
if let NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler,
|
||||
} = event
|
||||
{
|
||||
match self.connected_peers.get_mut(&peer_id) {
|
||||
None => {
|
||||
debug!(" requeueing send event to {}", peer_id.to_base58());
|
||||
debug!(" requeueing send event to {}", peer_id);
|
||||
// FIXME: I wonder if this should be
|
||||
self.events
|
||||
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event })
|
||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler,
|
||||
})
|
||||
}
|
||||
Some(ref mut ledger) => {
|
||||
// FIXME: this is a bit early to update stats as the block hasn't been sent
|
||||
// to anywhere at this point.
|
||||
ledger.update_outgoing_stats(&event);
|
||||
debug!(" send_message to {}", peer_id.to_base58());
|
||||
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
|
||||
debug!(" send_message to {}", peer_id);
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
311
src/lib.rs
311
src/lib.rs
@ -17,10 +17,11 @@ use futures::stream::Fuse;
|
||||
pub use libipld::cid::Cid;
|
||||
use libipld::cid::Codec;
|
||||
pub use libipld::ipld::Ipld;
|
||||
pub use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, PublicKey};
|
||||
pub use libp2p::core::{connection::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey};
|
||||
pub use libp2p::identity::Keypair;
|
||||
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
@ -245,6 +246,8 @@ enum IpfsEvent {
|
||||
PubsubSubscribed(OneshotSender<Vec<String>>),
|
||||
WantList(Option<PeerId>, OneshotSender<Vec<(Cid, bitswap::Priority)>>),
|
||||
BitswapStats(OneshotSender<BitswapStats>),
|
||||
AddListeningAddress(Multiaddr, Channel<Multiaddr>),
|
||||
RemoveListeningAddress(Multiaddr, Channel<()>),
|
||||
Exit,
|
||||
}
|
||||
|
||||
@ -298,6 +301,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
|
||||
repo_events: repo_events.fuse(),
|
||||
from_facade: receiver.fuse(),
|
||||
swarm,
|
||||
listening_addresses: HashMap::new(),
|
||||
};
|
||||
|
||||
let UninitializedIpfs {
|
||||
@ -538,6 +542,45 @@ impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
Ok(rx.await?)
|
||||
}
|
||||
|
||||
/// Add a given multiaddr as a listening address. Will fail if the address is unsupported, or
|
||||
/// if it is already being listened on. Currently will invoke `Swarm::listen_on` internally,
|
||||
/// keep the ListenerId for later `remove_listening_address` use in a HashMap.
|
||||
///
|
||||
/// The returned future will resolve on the first bound listening address when this is called
|
||||
/// with `/ip4/0.0.0.0/...` or anything similar which will bound through multiple concrete
|
||||
/// listening addresses.
|
||||
///
|
||||
/// Trying to add an unspecified listening address while any other listening address adding is
|
||||
/// in progress will result in error.
|
||||
///
|
||||
/// Returns the bound multiaddress, which in the case of original containing an ephemeral port
|
||||
/// has now been changed.
|
||||
pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<Multiaddr, Error> {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::AddListeningAddress(addr, tx))
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
/// Stop listening on a previously added listening address. Fails if the address is not being
|
||||
/// listened to.
|
||||
///
|
||||
/// The removal of all listening addresses added through unspecified addresses is not supported.
|
||||
pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::RemoveListeningAddress(addr, tx))
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
/// Exit daemon.
|
||||
pub async fn exit_daemon(mut self) {
|
||||
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
|
||||
@ -556,6 +599,121 @@ struct IpfsFuture<Types: SwarmTypes> {
|
||||
swarm: TSwarm<Types>,
|
||||
repo_events: Fuse<Receiver<RepoEvent>>,
|
||||
from_facade: Fuse<Receiver<IpfsEvent>>,
|
||||
listening_addresses: HashMap<Multiaddr, (ListenerId, Option<Channel<Multiaddr>>)>,
|
||||
}
|
||||
|
||||
impl<Types: SwarmTypes> IpfsFuture<Types> {
|
||||
/// Completes the adding of listening address by matching the new listening address `addr` to
|
||||
/// the `self.listening_addresses` so that we can detect even the multiaddresses with ephemeral
|
||||
/// ports.
|
||||
fn complete_listening_address_adding(&mut self, addr: Multiaddr) {
|
||||
let maybe_sender = match self.listening_addresses.get_mut(&addr) {
|
||||
// matching a non-ephemeral is simpler
|
||||
Some((_, maybe_sender)) => maybe_sender.take(),
|
||||
None => {
|
||||
// try finding an ephemeral binding on the same prefix
|
||||
let mut matching_keys = self
|
||||
.listening_addresses
|
||||
.keys()
|
||||
.filter(|right| could_be_bound_from_ephemeral(0, &addr, right))
|
||||
.cloned();
|
||||
|
||||
let first = matching_keys.next();
|
||||
|
||||
if let Some(first) = first {
|
||||
let second = matching_keys.next();
|
||||
|
||||
match (first, second) {
|
||||
(first, None) => {
|
||||
if let Some((id, maybe_sender)) =
|
||||
self.listening_addresses.remove(&first)
|
||||
{
|
||||
self.listening_addresses.insert(addr.clone(), (id, None));
|
||||
maybe_sender
|
||||
} else {
|
||||
unreachable!("We found a matching ephemeral key already, it must be in the listening_addresses")
|
||||
}
|
||||
}
|
||||
(first, Some(second)) => {
|
||||
// this is more complicated, but we are guarding
|
||||
// against this in the from_facade match below
|
||||
unreachable!(
|
||||
"More than one matching [{}, {}] and {:?} for {}",
|
||||
first,
|
||||
second,
|
||||
matching_keys.collect::<Vec<_>>(),
|
||||
addr
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// this case is hit when user asks for /ip4/0.0.0.0/tcp/0 for example, the
|
||||
// libp2p will bound to multiple addresses but we will not get access in 0.19
|
||||
// to their ListenerIds.
|
||||
|
||||
let first = self
|
||||
.listening_addresses
|
||||
.iter()
|
||||
.filter(|(addr, _)| starts_unspecified(addr))
|
||||
.filter(|(could_have_ephemeral, _)| {
|
||||
could_be_bound_from_ephemeral(1, &addr, could_have_ephemeral)
|
||||
})
|
||||
// finally we want to make sure we only match on addresses which are yet to
|
||||
// be reported back
|
||||
.filter(|(_, (_, maybe_sender))| maybe_sender.is_some())
|
||||
.map(|(addr, _)| addr.to_owned())
|
||||
.next();
|
||||
|
||||
if let Some(first) = first {
|
||||
let (id, maybe_sender) = self
|
||||
.listening_addresses
|
||||
.remove(&first)
|
||||
.expect("just filtered this key out");
|
||||
self.listening_addresses.insert(addr.clone(), (id, None));
|
||||
log::trace!("guessing the first match for {} to be {}", first, addr);
|
||||
maybe_sender
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(sender) = maybe_sender {
|
||||
let _ = sender.send(Ok(addr));
|
||||
}
|
||||
}
|
||||
|
||||
fn start_add_listener_address(&mut self, addr: Multiaddr, ret: Channel<Multiaddr>) {
|
||||
use libp2p::Swarm;
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
if starts_unspecified(&addr)
|
||||
&& self
|
||||
.listening_addresses
|
||||
.values()
|
||||
.filter(|(_, maybe_sender)| maybe_sender.is_some())
|
||||
.count()
|
||||
> 0
|
||||
{
|
||||
let _ = ret.send(Err(format_err!("Cannot start listening to unspecified address when there are pending specified addresses awaiting")));
|
||||
return;
|
||||
}
|
||||
|
||||
match self.listening_addresses.entry(addr) {
|
||||
Entry::Occupied(oe) => {
|
||||
let _ = ret.send(Err(format_err!("Already adding a possibly ephemeral multiaddr, wait first one to resolve before adding next: {}", oe.key())));
|
||||
}
|
||||
Entry::Vacant(ve) => match Swarm::listen_on(&mut self.swarm, ve.key().to_owned()) {
|
||||
Ok(id) => {
|
||||
ve.insert((id, Some(ret)));
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = ret.send(Err(Error::from(e)));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
@ -566,8 +724,7 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
use libp2p::{swarm::SwarmEvent, Swarm};
|
||||
|
||||
// begin by polling the swarm so that initially it'll first have chance to bind listeners
|
||||
// and such. TODO: this no longer needs to be a swarm event but perhaps we should
|
||||
// consolidate logging of these events here, if necessary?
|
||||
// and such.
|
||||
|
||||
let mut done = false;
|
||||
|
||||
@ -582,19 +739,12 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
Poll::Pending => break,
|
||||
}
|
||||
};
|
||||
// as a swarm event was returned, we need to do at least one more round to fully
|
||||
// exhaust the swarm before possibly causing the swarm to do more work by popping
|
||||
// off the events from Ipfs and ... this looping goes on for a while.
|
||||
done = false;
|
||||
match inner {
|
||||
SwarmEvent::Behaviour(()) => {}
|
||||
SwarmEvent::Connected(_peer_id) => {}
|
||||
SwarmEvent::Disconnected(_peer_id) => {}
|
||||
SwarmEvent::NewListenAddr(_addr) => {}
|
||||
SwarmEvent::ExpiredListenAddr(_addr) => {}
|
||||
SwarmEvent::UnreachableAddr {
|
||||
peer_id: _peer_id,
|
||||
address: _address,
|
||||
error: _error,
|
||||
} => {}
|
||||
SwarmEvent::StartConnect(_peer_id) => {}
|
||||
if let SwarmEvent::NewListenAddr(addr) = inner {
|
||||
self.complete_listening_address_adding(addr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -623,7 +773,7 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
}
|
||||
IpfsEvent::Connections(ret) => {
|
||||
let connections = self.swarm.connections();
|
||||
ret.send(Ok(connections)).ok();
|
||||
ret.send(Ok(connections.collect())).ok();
|
||||
}
|
||||
IpfsEvent::Disconnect(addr, ret) => {
|
||||
if let Some(disconnector) = self.swarm.disconnect(addr) {
|
||||
@ -676,6 +826,24 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
let wantlist = self.swarm.bitswap().local_wantlist();
|
||||
let _ = ret.send((stats, peers, wantlist).into());
|
||||
}
|
||||
IpfsEvent::AddListeningAddress(addr, ret) => {
|
||||
self.start_add_listener_address(addr, ret);
|
||||
}
|
||||
IpfsEvent::RemoveListeningAddress(addr, ret) => {
|
||||
let removed = if let Some((id, _)) = self.listening_addresses.remove(&addr)
|
||||
{
|
||||
Swarm::remove_listener(&mut self.swarm, id).map_err(|_: ()| {
|
||||
format_err!(
|
||||
"Failed to remove previously added listening address: {}",
|
||||
addr
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Err(format_err!("Address was not listened to before: {}", addr))
|
||||
};
|
||||
|
||||
let _ = ret.send(removed);
|
||||
}
|
||||
IpfsEvent::Exit => {
|
||||
// FIXME: we could do a proper teardown
|
||||
return Poll::Ready(());
|
||||
@ -778,15 +946,126 @@ mod node {
|
||||
}
|
||||
}
|
||||
|
||||
// Checks if the multiaddr starts with ip4 or ip6 unspecified address, like 0.0.0.0
|
||||
fn starts_unspecified(addr: &Multiaddr) -> bool {
|
||||
use libp2p::core::multiaddr::Protocol;
|
||||
|
||||
match addr.iter().next() {
|
||||
Some(Protocol::Ip4(ip4)) if ip4.is_unspecified() => true,
|
||||
Some(Protocol::Ip6(ip6)) if ip6.is_unspecified() => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn could_be_bound_from_ephemeral(
|
||||
skip: usize,
|
||||
bound: &Multiaddr,
|
||||
may_have_ephemeral: &Multiaddr,
|
||||
) -> bool {
|
||||
use libp2p::core::multiaddr::Protocol;
|
||||
|
||||
if bound.len() != may_have_ephemeral.len() {
|
||||
// no zip_longest in std
|
||||
false
|
||||
} else {
|
||||
// this is could be wrong at least in the future; /p2p/peerid is not a
|
||||
// valid suffix but I could imagine some kind of ws or webrtc could
|
||||
// give us issues in the long future?
|
||||
bound
|
||||
.iter()
|
||||
.skip(skip)
|
||||
.zip(may_have_ephemeral.iter().skip(skip))
|
||||
.all(|(left, right)| match (right, left) {
|
||||
(Protocol::Tcp(0), Protocol::Tcp(x))
|
||||
| (Protocol::Udp(0), Protocol::Udp(x))
|
||||
| (Protocol::Sctp(0), Protocol::Sctp(x)) => {
|
||||
assert_ne!(x, 0, "cannot have bound to port 0");
|
||||
true
|
||||
}
|
||||
(Protocol::Memory(0), Protocol::Memory(x)) => {
|
||||
assert_ne!(x, 0, "cannot have bound to port 0");
|
||||
true
|
||||
}
|
||||
(right, left) => right == left,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_std::task;
|
||||
use libipld::ipld;
|
||||
use libp2p::build_multiaddr;
|
||||
use multihash::Sha2_256;
|
||||
|
||||
const MDNS: bool = false;
|
||||
|
||||
#[test]
|
||||
fn unspecified_multiaddrs() {
|
||||
assert!(starts_unspecified(&build_multiaddr!(
|
||||
Ip4([0, 0, 0, 0]),
|
||||
Tcp(1u16)
|
||||
)));
|
||||
assert!(starts_unspecified(&build_multiaddr!(
|
||||
Ip6([0, 0, 0, 0, 0, 0, 0, 0]),
|
||||
Tcp(1u16)
|
||||
)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn localhost_multiaddrs_are_not_unspecified() {
|
||||
assert!(!starts_unspecified(&build_multiaddr!(
|
||||
Ip4([127, 0, 0, 1]),
|
||||
Tcp(1u16)
|
||||
)));
|
||||
assert!(!starts_unspecified(&build_multiaddr!(
|
||||
Ip6([0, 0, 0, 0, 0, 0, 0, 1]),
|
||||
Tcp(1u16)
|
||||
)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bound_ephemerals() {
|
||||
assert!(could_be_bound_from_ephemeral(
|
||||
0,
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))
|
||||
));
|
||||
assert!(could_be_bound_from_ephemeral(
|
||||
1,
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))
|
||||
));
|
||||
assert!(could_be_bound_from_ephemeral(
|
||||
1,
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))
|
||||
));
|
||||
assert!(could_be_bound_from_ephemeral(
|
||||
1,
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))
|
||||
));
|
||||
|
||||
assert!(!could_be_bound_from_ephemeral(
|
||||
0,
|
||||
&build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))
|
||||
));
|
||||
assert!(could_be_bound_from_ephemeral(
|
||||
1,
|
||||
&build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))
|
||||
));
|
||||
|
||||
assert!(!could_be_bound_from_ephemeral(
|
||||
1,
|
||||
&build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(55555u16)),
|
||||
&build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(44444u16))
|
||||
));
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_put_and_get_block() {
|
||||
let options = IpfsOptions::inmemory_with_generated_keys(MDNS);
|
||||
|
@ -58,18 +58,22 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<KademliaEvent>
|
||||
for Behaviour<TSwarmTypes>
|
||||
{
|
||||
fn inject_event(&mut self, event: KademliaEvent) {
|
||||
use libp2p::kad::{GetProvidersError, GetProvidersOk};
|
||||
use libp2p::kad::{GetProvidersError, GetProvidersOk, QueryResult};
|
||||
|
||||
match event {
|
||||
KademliaEvent::Discovered { peer_id, ty, .. } => {
|
||||
log::trace!("kad: Discovered peer {} {:?}", peer_id.to_base58(), ty);
|
||||
self.add_peer(peer_id);
|
||||
}
|
||||
KademliaEvent::GetProvidersResult(Ok(GetProvidersOk {
|
||||
key,
|
||||
providers,
|
||||
closest_peers,
|
||||
})) => {
|
||||
KademliaEvent::QueryResult {
|
||||
result:
|
||||
QueryResult::GetProviders(Ok(GetProvidersOk {
|
||||
key,
|
||||
providers,
|
||||
closest_peers,
|
||||
})),
|
||||
..
|
||||
} => {
|
||||
// FIXME: really wasteful to run this through Vec
|
||||
let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58();
|
||||
if providers.is_empty() {
|
||||
@ -82,7 +86,10 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<KademliaEvent>
|
||||
}
|
||||
}
|
||||
}
|
||||
KademliaEvent::GetProvidersResult(Err(GetProvidersError::Timeout { key, .. })) => {
|
||||
KademliaEvent::QueryResult {
|
||||
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, .. })),
|
||||
..
|
||||
} => {
|
||||
// FIXME: really wasteful to run this through Vec
|
||||
let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58();
|
||||
warn!("kad: timed out get providers query for {}", cid);
|
||||
@ -113,13 +120,13 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<PingEvent> for Behavi
|
||||
peer,
|
||||
result: Result::Ok(PingSuccess::Pong),
|
||||
} => {
|
||||
log::trace!("ping: pong from {}", peer.to_base58());
|
||||
log::trace!("ping: pong from {}", peer);
|
||||
}
|
||||
PingEvent {
|
||||
peer,
|
||||
result: Result::Err(PingFailure::Timeout),
|
||||
} => {
|
||||
log::trace!("ping: timeout to {}", peer.to_base58());
|
||||
log::trace!("ping: timeout to {}", peer);
|
||||
self.remove_peer(&peer);
|
||||
}
|
||||
PingEvent {
|
||||
@ -202,8 +209,8 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
|
||||
addrs
|
||||
}
|
||||
|
||||
pub fn connections(&self) -> Vec<Connection> {
|
||||
self.swarm.connections().cloned().collect()
|
||||
pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
|
||||
self.swarm.connections()
|
||||
}
|
||||
|
||||
pub fn connect(&mut self, addr: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
|
||||
|
@ -7,8 +7,11 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use libp2p::floodsub::{Floodsub, FloodsubEvent, FloodsubMessage, FloodsubOptions, Topic};
|
||||
use libp2p::core::{
|
||||
connection::{ConnectedPoint, ConnectionId, ListenerId},
|
||||
Multiaddr, PeerId,
|
||||
};
|
||||
use libp2p::floodsub::{Floodsub, FloodsubConfig, FloodsubEvent, FloodsubMessage, Topic};
|
||||
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
|
||||
|
||||
/// Currently a thin wrapper around Floodsub, perhaps supporting both Gossipsub and Floodsub later.
|
||||
@ -125,12 +128,12 @@ impl Pubsub {
|
||||
/// top of the floodsub.
|
||||
pub fn new(peer_id: PeerId) -> Self {
|
||||
let (tx, rx) = channel::unbounded();
|
||||
let mut opts = FloodsubOptions::new(peer_id);
|
||||
opts.subscribe_local_messages = true;
|
||||
let mut config = FloodsubConfig::new(peer_id);
|
||||
config.subscribe_local_messages = true;
|
||||
Pubsub {
|
||||
streams: HashMap::new(),
|
||||
peers: HashMap::new(),
|
||||
floodsub: Floodsub::from_options(opts),
|
||||
floodsub: Floodsub::from_config(config),
|
||||
unsubscriptions: (tx, rx),
|
||||
}
|
||||
}
|
||||
@ -243,20 +246,41 @@ impl NetworkBehaviour for Pubsub {
|
||||
self.floodsub.addresses_of_peer(peer_id)
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
|
||||
self.floodsub.inject_connected(peer_id, cp)
|
||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||
self.floodsub.inject_connected(peer_id)
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
|
||||
self.floodsub.inject_disconnected(peer_id, cp)
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
self.floodsub.inject_disconnected(peer_id)
|
||||
}
|
||||
|
||||
fn inject_node_event(
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
connection_id: &ConnectionId,
|
||||
connected_point: &ConnectedPoint,
|
||||
) {
|
||||
self.floodsub
|
||||
.inject_connection_established(peer_id, connection_id, connected_point)
|
||||
}
|
||||
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
connection_id: &ConnectionId,
|
||||
connected_point: &ConnectedPoint,
|
||||
) {
|
||||
self.floodsub
|
||||
.inject_connection_closed(peer_id, connection_id, connected_point)
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
connection: ConnectionId,
|
||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
) {
|
||||
self.floodsub.inject_node_event(peer_id, event)
|
||||
self.floodsub.inject_event(peer_id, connection, event)
|
||||
}
|
||||
|
||||
fn inject_addr_reach_failure(
|
||||
@ -269,6 +293,26 @@ impl NetworkBehaviour for Pubsub {
|
||||
.inject_addr_reach_failure(peer_id, addr, error)
|
||||
}
|
||||
|
||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||
self.floodsub.inject_dial_failure(peer_id)
|
||||
}
|
||||
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.floodsub.inject_new_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.floodsub.inject_expired_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
self.floodsub.inject_new_external_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
|
||||
self.floodsub.inject_listener_error(id, err)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
@ -363,11 +407,19 @@ impl NetworkBehaviour for Pubsub {
|
||||
NetworkBehaviourAction::DialAddress { address } => {
|
||||
return Poll::Ready(NetworkBehaviourAction::DialAddress { address });
|
||||
}
|
||||
NetworkBehaviourAction::DialPeer { peer_id } => {
|
||||
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id });
|
||||
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
|
||||
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition });
|
||||
}
|
||||
NetworkBehaviourAction::SendEvent { peer_id, event } => {
|
||||
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
|
||||
NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler,
|
||||
} => {
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
event,
|
||||
handler,
|
||||
});
|
||||
}
|
||||
NetworkBehaviourAction::ReportObservedAddr { address } => {
|
||||
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address });
|
||||
|
159
src/p2p/swarm.rs
159
src/p2p/swarm.rs
@ -1,6 +1,6 @@
|
||||
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
|
||||
use core::task::{Context, Poll};
|
||||
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
|
||||
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
|
||||
use libp2p::swarm::protocols_handler::{
|
||||
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
|
||||
};
|
||||
@ -8,19 +8,27 @@ use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
/// A description of currently active connection.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Connection {
|
||||
/// The connected peer.
|
||||
pub peer_id: PeerId,
|
||||
/// Any connecting address of the peer as peers can have multiple connections to
|
||||
pub address: Multiaddr,
|
||||
/// Latest ping report on any of the connections
|
||||
pub rtt: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Disconnected will use banning to disconnect a node. Disconnecting a single peer connection is
|
||||
/// not supported at the moment.
|
||||
pub struct Disconnector {
|
||||
peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl Disconnector {
|
||||
pub fn disconnect<T: NetworkBehaviour>(self, swarm: &mut Swarm<T>) {
|
||||
pub fn disconnect<T: NetworkBehaviour>(self, swarm: &mut Swarm<T>)
|
||||
where <<<T as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: std::clone::Clone
|
||||
{
|
||||
Swarm::ban_peer_id(swarm, self.peer_id.clone());
|
||||
Swarm::unban_peer_id(swarm, self.peer_id);
|
||||
}
|
||||
@ -34,8 +42,9 @@ pub struct SwarmApi {
|
||||
events: VecDeque<NetworkBehaviourAction>,
|
||||
peers: HashSet<PeerId>,
|
||||
connect_registry: SubscriptionRegistry<Multiaddr, Result<(), String>>,
|
||||
connections: HashMap<Multiaddr, Connection>,
|
||||
connected_peers: HashMap<PeerId, Multiaddr>,
|
||||
connections: HashMap<Multiaddr, PeerId>,
|
||||
roundtrip_times: HashMap<PeerId, Duration>,
|
||||
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
|
||||
}
|
||||
|
||||
impl SwarmApi {
|
||||
@ -55,16 +64,27 @@ impl SwarmApi {
|
||||
self.peers.remove(peer_id);
|
||||
}
|
||||
|
||||
pub fn connections(&self) -> impl Iterator<Item = &Connection> {
|
||||
self.connections.iter().map(|(_, conn)| conn)
|
||||
pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
|
||||
self.connected_peers
|
||||
.iter()
|
||||
.filter_map(move |(peer, conns)| {
|
||||
let rtt = self.roundtrip_times.get(peer).cloned();
|
||||
|
||||
if let Some(any) = conns.first() {
|
||||
Some(Connection {
|
||||
peer_id: peer.clone(),
|
||||
address: any.clone(),
|
||||
rtt,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_rtt(&mut self, peer_id: &PeerId, rtt: Duration) {
|
||||
if let Some(addr) = self.connected_peers.get(peer_id) {
|
||||
if let Some(mut conn) = self.connections.get_mut(addr) {
|
||||
conn.rtt = Some(rtt);
|
||||
}
|
||||
}
|
||||
// FIXME: this is for any connection
|
||||
self.roundtrip_times.insert(peer_id.clone(), rtt);
|
||||
}
|
||||
|
||||
pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
|
||||
@ -77,15 +97,28 @@ impl SwarmApi {
|
||||
|
||||
pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
|
||||
log::trace!("disconnect {}", address);
|
||||
self.connections.remove(&address);
|
||||
let peer_id = self
|
||||
.connections
|
||||
.get(&address)
|
||||
.map(|conn| conn.peer_id.clone());
|
||||
if let Some(peer_id) = &peer_id {
|
||||
self.connected_peers.remove(peer_id);
|
||||
// FIXME: closing a single specific connection would be allowed for ProtocolHandlers
|
||||
let peer_id = self.connections.remove(&address);
|
||||
|
||||
if let Some(peer_id) = peer_id {
|
||||
// wasted some time wondering if the peer should be removed here or not; it should. the
|
||||
// API is a bit ackward since we can't tolerate the Disconnector::disconnect **not**
|
||||
// being called.
|
||||
//
|
||||
// there are currently no events being fired from the closing of connections to banned
|
||||
// peer, so we need to modify the accounting even before the banning happens.
|
||||
self.mark_disconnected(&peer_id);
|
||||
Some(Disconnector { peer_id })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
peer_id.map(|peer_id| Disconnector { peer_id })
|
||||
}
|
||||
|
||||
fn mark_disconnected(&mut self, peer_id: &PeerId) {
|
||||
for address in self.connected_peers.remove(peer_id).into_iter().flatten() {
|
||||
self.connections.remove(&address);
|
||||
}
|
||||
self.roundtrip_times.remove(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,41 +133,66 @@ impl NetworkBehaviour for SwarmApi {
|
||||
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
log::trace!("addresses_of_peer {}", peer_id);
|
||||
if let Some(addr) = self.connected_peers.get(peer_id).cloned() {
|
||||
vec![addr]
|
||||
} else {
|
||||
Default::default()
|
||||
}
|
||||
self.connected_peers
|
||||
.get(peer_id)
|
||||
.cloned()
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
|
||||
log::trace!("inject_connected {} {:?}", peer_id.to_string(), cp);
|
||||
let addr = match cp {
|
||||
ConnectedPoint::Dialer { address } => address,
|
||||
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
|
||||
};
|
||||
let conn = Connection {
|
||||
peer_id: peer_id.clone(),
|
||||
address: addr.clone(),
|
||||
rtt: None,
|
||||
};
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
_id: &ConnectionId,
|
||||
cp: &ConnectedPoint,
|
||||
) {
|
||||
// TODO: could be that the connection is not yet fully established at this point
|
||||
log::trace!("inject_connected {} {:?}", peer_id, cp);
|
||||
let addr = connection_point_addr(cp);
|
||||
self.peers.insert(peer_id.clone());
|
||||
self.connected_peers.insert(peer_id, addr.clone());
|
||||
self.connections.insert(addr.clone(), conn);
|
||||
let connections = self.connected_peers.entry(peer_id.clone()).or_default();
|
||||
|
||||
connections.push(addr.clone());
|
||||
|
||||
self.connections.insert(addr.clone(), peer_id.clone());
|
||||
self.connect_registry.finish_subscription(&addr, Ok(()));
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
|
||||
log::trace!("inject_disconnected {} {:?}", peer_id.to_string(), cp);
|
||||
let addr = match cp {
|
||||
ConnectedPoint::Dialer { address } => address,
|
||||
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
|
||||
};
|
||||
self.connected_peers.remove(peer_id);
|
||||
self.connections.remove(&addr);
|
||||
fn inject_connected(&mut self, _peer_id: &PeerId) {
|
||||
// we have at least one fully open connection and handler is running
|
||||
}
|
||||
|
||||
fn inject_node_event(&mut self, _peer_id: PeerId, _event: void::Void) {}
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
_id: &ConnectionId,
|
||||
cp: &ConnectedPoint,
|
||||
) {
|
||||
log::trace!("inject_connection_closed {} {:?}", peer_id, cp);
|
||||
let closed_addr = connection_point_addr(cp);
|
||||
let became_empty = if let Some(connections) = self.connected_peers.get_mut(peer_id) {
|
||||
if let Some(index) = connections.iter().position(|addr| addr == closed_addr) {
|
||||
connections.swap_remove(index);
|
||||
}
|
||||
connections.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if became_empty {
|
||||
self.connected_peers.remove(peer_id);
|
||||
}
|
||||
self.connections.remove(closed_addr);
|
||||
// FIXME: should be an error
|
||||
self.connect_registry
|
||||
.finish_subscription(closed_addr, Ok(()));
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
// in rust-libp2p 0.19 this at least will not be invoked for a peer we boot by banning it.
|
||||
log::trace!("inject_disconnected: {}", peer_id);
|
||||
self.mark_disconnected(peer_id);
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
|
||||
|
||||
fn inject_addr_reach_failure(
|
||||
&mut self,
|
||||
@ -160,6 +218,13 @@ impl NetworkBehaviour for SwarmApi {
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr {
|
||||
match cp {
|
||||
ConnectedPoint::Dialer { address } => address,
|
||||
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -69,3 +69,57 @@ fn connect_two_nodes() {
|
||||
jh.await;
|
||||
});
|
||||
}
|
||||
|
||||
/// More complicated one to the above; first node will have two listening addresses and the second
|
||||
/// one should dial both of the addresses, resulting in two connections.
|
||||
#[test]
|
||||
fn connect_two_nodes_with_two_connections_doesnt_panic() {
|
||||
const MDNS: bool = false;
|
||||
|
||||
task::block_on(async move {
|
||||
let node_a = ipfs::Node::new(MDNS).await;
|
||||
let node_b = ipfs::Node::new(MDNS).await;
|
||||
|
||||
node_a
|
||||
.add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let addresses = node_a.addrs_local().await.unwrap();
|
||||
assert_eq!(
|
||||
addresses.len(),
|
||||
2,
|
||||
"there should had been two local addresses, found {:?}",
|
||||
addresses
|
||||
);
|
||||
|
||||
for addr in addresses {
|
||||
node_b.connect(addr).await.unwrap();
|
||||
}
|
||||
|
||||
// not too sure on this, since there'll be a single peer but two connections; the return
|
||||
// type is `Vec<Connection>` but it's peer with any connection.
|
||||
let mut peers = node_a.peers().await.unwrap();
|
||||
assert_eq!(
|
||||
peers.len(),
|
||||
1,
|
||||
"there should had been one peer, found {:?}",
|
||||
peers
|
||||
);
|
||||
|
||||
// sadly we are unable to currently verify that there exists two connections for the node_b
|
||||
// peer..
|
||||
|
||||
node_a
|
||||
.disconnect(peers.remove(0).address)
|
||||
.await
|
||||
.expect("failed to disconnect peer_b at peer_a");
|
||||
|
||||
let peers = node_a.peers().await.unwrap();
|
||||
assert!(
|
||||
peers.is_empty(),
|
||||
"node_b was still connected after disconnect: {:?}",
|
||||
peers
|
||||
);
|
||||
});
|
||||
}
|
||||
|
116
tests/multiple_listening_addresses.rs
Normal file
116
tests/multiple_listening_addresses.rs
Normal file
@ -0,0 +1,116 @@
|
||||
use async_std::task;
|
||||
|
||||
#[test]
|
||||
fn multiple_consecutive_ephemeral_listening_addresses() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
|
||||
let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
|
||||
|
||||
let first = node.add_listening_address(target.clone()).await.unwrap();
|
||||
assert_ne!(target, first);
|
||||
|
||||
let second = node.add_listening_address(target.clone()).await.unwrap();
|
||||
assert_ne!(target, second);
|
||||
assert_ne!(first, second);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
|
||||
let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
|
||||
|
||||
let first = node.add_listening_address(target.clone());
|
||||
let second = node.add_listening_address(target);
|
||||
|
||||
let (first, second) = futures::future::join(first, second).await;
|
||||
|
||||
// one should succeed and other should fail
|
||||
assert_eq!(first.is_ok(), second.is_err());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
|
||||
// it doesnt work on mac os x as 127.0.0.2 is not enabled by default.
|
||||
let first =
|
||||
node.add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)));
|
||||
let second =
|
||||
node.add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 2]), Tcp(0u16)));
|
||||
|
||||
let (first, second) = futures::future::join(first, second).await;
|
||||
|
||||
// both should succeed
|
||||
first.unwrap();
|
||||
second.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn adding_unspecified_addr_resolves_with_first() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
// there is no test in trying to match this with others as ... that would be quite
|
||||
// perilous.
|
||||
node.add_listening_address(libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16)))
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn listening_for_multiple_unspecified_addresses() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
// there is no test in trying to match this with others as ... that would be quite
|
||||
// perilous.
|
||||
let target = libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16));
|
||||
let first = node.add_listening_address(target.clone());
|
||||
let second = node.add_listening_address(target);
|
||||
|
||||
let (first, second) = futures::future::join(first, second).await;
|
||||
|
||||
// the other should be denied because there is a pending incomplete when trying to listen
|
||||
// on unspecified address
|
||||
assert_eq!(first.is_ok(), second.is_err());
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_listening_address() {
|
||||
const MDNS: bool = false;
|
||||
task::block_on(async move {
|
||||
let node = ipfs::Node::new(MDNS).await;
|
||||
|
||||
let unbound = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
|
||||
let first = node.add_listening_address(unbound.clone()).await.unwrap();
|
||||
|
||||
// the book keeping changes from matching the unbound address to the bound one returned
|
||||
// from the future.
|
||||
node.remove_listening_address(unbound.clone())
|
||||
.await
|
||||
.unwrap_err();
|
||||
node.remove_listening_address(first).await.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn remove_listening_address_before_completing() {
|
||||
// TODO: cannot test this before we have a way of getting between the queue used to communicate
|
||||
// with the IpfsFuture (or better yet, construct one ourselves here in the test) to make sure
|
||||
// we can push a IpfsEvent::AddListenerAddress followed by an IpfsEvent::RemoveListenerAddress
|
||||
// "immediatedly".
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user