feat: propagate ipfs to filters, local id query

when query parsing fails, go-ipfs returns 500 and the
v0::ResponseMessage for which this adds a builder. Perhaps we'll need a
custom query parsing filter and an Ser/Deserialize implementation for Cids,
Multiaddrs, PeerIds and whatnot, which luckily might all have FromStr
already.

the current structure in the big handler of id needs refactoring, and a
way to handle the rejections.
This commit is contained in:
Joonas Koivunen 2020-03-12 14:07:02 +02:00 committed by Joonas Koivunen
parent cb664e11f8
commit 9e44c069d3
5 changed files with 243 additions and 52 deletions

View File

@ -65,6 +65,10 @@ fn create(
}
if profiles.len() != 1 || profiles[0] != "test" {
// profiles are expected to be (comma separated) "test" as there are no bootstrap peer
// handling yet. the conformance test cases seem to init `go-ipfs` in this profile where
// it does not have any bootstrap nodes, and multi node tests later call swarm apis to
// dial the nodes together.
return Err(InitializationError::InvalidProfiles(profiles));
}

View File

@ -1,4 +1,3 @@
use serde::Serialize;
use std::num::NonZeroU16;
use std::path::PathBuf;
use structopt::StructOpt;
@ -59,7 +58,7 @@ fn main() {
let config_path = home.join("config");
match opts {
let keypair = match opts {
Options::Init { bits, profile } => {
println!("initializing IPFS node at {:?}", home);
@ -118,6 +117,11 @@ fn main() {
eprintln!("please run: 'ipfs init'");
std::process::exit(1);
}
std::fs::File::open(config_path)
.map_err(config::LoadingError::ConfigurationFileOpening)
.and_then(config::load)
.unwrap()
}
};
@ -129,8 +133,22 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");
rt.block_on(async move {
let opts: IpfsOptions<ipfs::TestTypes> = IpfsOptions::new(
home.clone().into(),
keypair,
Vec::new());
let (ipfs, task) = UninitializedIpfs::new(opts)
.await
.start()
.await
.expect("Initialization failed");
tokio::spawn(task);
let api_link_file = home.join("api");
let (addr, server) = serve(home, ());
let (addr, server) = serve(&ipfs);
let api_multiaddr = format!("/ip4/{}/tcp/{}", addr.ip(), addr.port());
@ -152,12 +170,13 @@ fn main() {
.await
.map_err(|e| eprintln!("Failed to truncate {:?}: {}", api_link_file, e));
}
ipfs.exit_daemon().await;
});
}
fn serve(
_home: PathBuf,
_options: (),
fn serve<Types: IpfsTypes>(
ipfs: &Ipfs<Types>,
) -> (std::net::SocketAddr, impl std::future::Future<Output = ()>) {
use tokio::stream::StreamExt;
use warp::Filter;
@ -179,7 +198,10 @@ fn serve(
// the http libraries seem to prefer POST method
let api = shutdown
.or(warp::path("id").and(with_ipfs(&ipfs)).and_then(id_query))
.or(warp::path!("id")
.and(with_ipfs(ipfs))
.and(warp::query::<v0::id::Query>())
.and_then(v0::id::identity))
// Placeholder paths
// https://docs.rs/warp/0.2.2/warp/macro.path.html#path-prefixes
.or(warp::path!("add").and_then(not_implemented))
@ -237,43 +259,11 @@ async fn not_implemented() -> Result<impl warp::Reply, std::convert::Infallible>
Ok(warp::http::StatusCode::NOT_IMPLEMENTED)
}
// NOTE: go-ipfs accepts an -f option for format (unsure if same with Accept: request header),
// unsure on what values -f takes, since `go-ipfs` seems to just print the value back? With plain http
// requests the `f` or `format` is ignored. Perhaps it's a cli functionality. This should return a
// json body, which does get pretty formatted by the cli.
//
// FIXME: Reference has argument `arg: PeerId` which does get processed.
//
// https://docs.ipfs.io/reference/api/http/#api-v0-id
async fn id_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
) -> Result<impl warp::Reply, std::convert::Infallible> {
// the ids are from throwaway go-ipfs init -p test instance
let response = IdResponse {
id: "QmdNmxF88uyUzm8T7ps8LnCuZJzPnJvgUJxpKGqAMuxSQE",
public_key: "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC+z3lZB1KQxNtahCLOQFFdsUrQ/XT3XLe/09sODTbGGp5mUjylR6hNQijCHFtYY7DcuAKPeyxNbdcOPmyC85gb1a1UB+nT3DiHPlM3AspnVFXDDv1u
kZZ6Fgfs8amaWAWgx5KBRE49GjaG65+wVqtMwoALPa655bpsvaJX5JEeKe8hb8bLNup0O5Tpl3ThQ+ADXLJGmu/tWFI8SdE10xZY6hQG446B0/sL3f4HWqRbrlYrV8Ac2LyU3ZXynQ0yScqO4pXDDXoKZSI44xQNPuWQA9Y9IBWel/cbzTNjWMxapuyjoT9gmFRi52IFAl0RH9X85jFa6FYRkM+h81AiVjD/AgMB
AAE=",
addresses: vec!["/ip4/127.0.0.1/tcp/8002/ipfs/QmdNmxF88uyUzm8T7ps8LnCuZJzPnJvgUJxpKGqAMuxSQE"],
agent_version: "rust-ipfs/0.0.1",
protocol_version: "ipfs/0.1.0",
};
Ok(warp::reply::json(&response))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct IdResponse {
// PeerId
#[serde(rename = "ID")]
id: &'static str,
// looks like Base64
public_key: &'static str,
// Multiaddrs
addresses: Vec<&'static str>,
// Multiaddr alike <agent_name>/<version>, like rust-ipfs/0.0.1
agent_version: &'static str,
// Multiaddr alike ipfs/0.1.0 ... not sure if there are plans to bump this anytime soon
protocol_version: &'static str,
/// Clones the handle to the filters
fn with_ipfs<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl warp::Filter<Extract = (Ipfs<T>,), Error = std::convert::Infallible> + Clone {
use warp::Filter;
let ipfs = ipfs.clone();
warp::any().map(move || ipfs.clone())
}

View File

@ -1,3 +1,47 @@
pub mod version;
// pub mod id;
use serde::Serialize;
use std::borrow::Cow;
pub mod id;
pub mod swarm;
pub mod version;
/// The common responses apparently returned by the go-ipfs HTTP api on errors.
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct MessageResponse {
message: Cow<'static, str>,
code: usize,
r#type: MessageKind,
}
impl MessageResponse {
fn to_json_reply(&self) -> warp::reply::Json {
warp::reply::json(self)
}
}
#[derive(Debug, Clone, Serialize)]
pub enum MessageKind {
Error,
}
impl MessageKind {
// FIXME: haven't found a spec for these codes yet
pub fn with_code(self, code: usize) -> MessageResponseBuilder {
MessageResponseBuilder(self, code)
}
}
#[derive(Debug, Clone)]
pub struct MessageResponseBuilder(MessageKind, usize);
impl MessageResponseBuilder {
pub fn with_message<S: Into<Cow<'static, str>>>(self, message: S) -> MessageResponse {
let Self(kind, code) = self;
MessageResponse {
message: message.into(),
code,
r#type: kind,
}
}
}

89
http/src/v0/id.rs Normal file
View File

@ -0,0 +1,89 @@
use super::MessageKind;
use ipfs::{Ipfs, IpfsTypes, PeerId};
use serde::{Deserialize, Serialize};
// NOTE: go-ipfs accepts an -f option for format (unsure if same with Accept: request header),
// unsure on what values -f takes, since `go-ipfs` seems to just print the value back? With plain http
// requests the `f` or `format` is ignored. Perhaps it's a cli functionality. This should return a
// json body, which does get pretty formatted by the cli.
//
// FIXME: Reference has argument `arg: PeerId` which does get processed.
//
// https://docs.ipfs.io/reference/api/http/#api-v0-id
pub async fn identity<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: Query,
) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
use multibase::Base::Base64Pad;
use std::str::FromStr;
if let Some(peer_id) = query.arg {
match PeerId::from_str(&peer_id) {
Ok(_) => {
// FIXME: probably find this peer, if a match, use identify protocol
return Ok(Box::new(warp::http::StatusCode::NOT_IMPLEMENTED));
}
Err(_) => {
// FIXME: we need to customize the query deserialization error
return Ok(Box::new(warp::reply::with_status(
MessageKind::Error
.with_code(0)
.with_message("invalid peer id")
.to_json_reply(),
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)));
}
}
}
match ipfs.identity().await {
Ok((public_key, addresses)) => {
let id = public_key.clone().into_peer_id().to_string();
let public_key = Base64Pad.encode(public_key.into_protobuf_encoding());
let response = Response {
id,
public_key,
addresses: addresses.into_iter().map(|addr| addr.to_string()).collect(),
agent_version: "rust-ipfs/0.1.0",
protocol_version: "ipfs/0.1.0",
};
// TODO: investigate how this could be avoided, perhaps by making the ipfs::Error a
// Reject
Ok(Box::new(warp::reply::json(&response)))
}
Err(e) => Ok(Box::new(warp::reply::with_status(
MessageKind::Error
.with_code(0)
.with_message(e.to_string())
.to_json_reply(),
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
))),
}
}
/// Query string of /api/v0/id?arg=peerid&format=notsure
#[derive(Debug, Deserialize)]
pub struct Query {
// the peer id to query
arg: Option<String>,
// this does not seem to be reacted to by go-ipfs
format: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
// PeerId
#[serde(rename = "ID")]
id: String,
// looks like Base64
public_key: String,
// Multiaddrs
addresses: Vec<String>,
// Multiaddr alike <agent_name>/<version>, like rust-ipfs/0.0.1
agent_version: &'static str,
// Multiaddr alike ipfs/0.1.0 ... not sure if there are plans to bump this anytime soon
protocol_version: &'static str,
}

View File

@ -9,10 +9,15 @@ extern crate log;
use async_std::path::PathBuf;
pub use bitswap::Block;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use futures::sink::SinkExt;
pub use libipld::cid::Cid;
use libipld::cid::Codec;
pub use libipld::ipld::Ipld;
pub use libp2p::{identity::Keypair, Multiaddr, PeerId};
pub use libp2p::{
identity::{Keypair, PublicKey},
Multiaddr, PeerId,
};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
@ -90,10 +95,40 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
.field("ipfs_log", &self.ipfs_log)
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.finish()
}
}
use std::borrow::Borrow;
#[derive(Clone)]
struct DebuggableKeypair<I: Borrow<Keypair>>(I);
impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let kind = match self.borrow() {
Keypair::Ed25519(_) => "Ed25519",
Keypair::Rsa(_) => "Rsa",
Keypair::Secp256k1(_) => "Secp256k1",
};
write!(fmt, "Keypair::{}", kind)
}
}
impl<I: Borrow<Keypair>> Borrow<Keypair> for DebuggableKeypair<I> {
fn borrow(&self) -> &Keypair {
self.0.borrow()
}
}
impl<I: Borrow<Keypair>> DebuggableKeypair<I> {
fn get(&self) -> &Keypair {
self.borrow()
}
}
impl<Types: IpfsTypes> IpfsOptions<Types> {
pub fn new(ipfs_path: PathBuf, keypair: Keypair, bootstrap: Vec<(Multiaddr, PeerId)>) -> Self {
Self {
@ -175,13 +210,16 @@ pub struct Ipfs<Types: IpfsTypes> {
repo: Arc<Repo<Types>>,
dag: IpldDag<Types>,
ipns: Ipns<Types>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
}
/// Events used internally to communicate with the swarm, which is executed in the the background
/// task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Debug)]
enum IpfsEvent {
/// Request background task to return the listened and external addresses
GetAddresses(OneshotSender<Vec<Multiaddr>>),
Exit,
}
@ -190,6 +228,7 @@ pub struct UninitializedIpfs<Types: IpfsTypes> {
repo: Arc<Repo<Types>>,
dag: IpldDag<Types>,
ipns: Ipns<Types>,
keys: Keypair,
moved_on_init: Option<(Receiver<RepoEvent>, TSwarm<Types>)>,
}
@ -197,6 +236,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
/// Configures a new UninitializedIpfs with from the given options.
pub async fn new(options: IpfsOptions<Types>) -> Self {
let repo_options = RepoOptions::<Types>::from(&options);
let keys = options.secio_key_pair().clone();
let (repo, repo_events) = create_repo(repo_options);
let swarm_options = SwarmOptions::<Types>::from(&options);
let swarm = create_swarm(swarm_options, repo.clone()).await;
@ -207,6 +247,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
repo,
dag,
ipns,
keys,
moved_on_init: Some((repo_events, swarm)),
}
}
@ -234,7 +275,11 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
};
let UninitializedIpfs {
repo, dag, ipns, ..
repo,
dag,
ipns,
keys,
..
} = self;
Ok((
@ -242,6 +287,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
repo,
dag,
ipns,
keys: DebuggableKeypair(keys),
to_task,
},
fut,
@ -304,9 +350,19 @@ impl<Types: IpfsTypes> Ipfs<Types> {
Ok(())
}
pub async fn identity(&self) -> Result<(PublicKey, Vec<Multiaddr>), Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetAddresses(tx))
.await?;
let addresses = rx.await?;
Ok((self.keys.get().public(), addresses))
}
/// Exit daemon.
pub async fn exit_daemon(mut self) {
use futures::sink::SinkExt;
// ignoring the error because it'd mean that the background task would had already been
// dropped
let _ = self.to_task.send(IpfsEvent::Exit).await;
@ -330,6 +386,7 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use futures::Stream;
use libp2p::Swarm;
loop {
// FIXME: this can probably be rewritten as a async { loop { select! { ... } } } once
// libp2p uses std::future ... I couldn't figure out way to wrap it as compat,
@ -347,6 +404,13 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
};
match inner {
IpfsEvent::GetAddresses(ret) => {
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());