mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-21 14:50:08 +03:00
B #-: Add tproxy fixes: socket leak + nft formatting (#3256)
- Ensure sockets are always closed properly. - Normalize and cleanup nft scripts (to prevent potential segfaults). (cherry picked from commit 8537add3656737a404c56e6e61ec4304ec639b3b)
This commit is contained in:
parent
a47839f1fa
commit
527d1edb78
@ -112,8 +112,10 @@ module VNMMAD
|
||||
|
||||
def glue_peers(task)
|
||||
@local_ep.accept do |client_peer|
|
||||
client_peer_address = client_peer.remote_address.inspect
|
||||
|
||||
$logger.debug(self) do
|
||||
"Accept #{client_peer.remote_address.inspect}"
|
||||
"Accept #{client_peer_address}"
|
||||
end
|
||||
|
||||
begin
|
||||
@ -125,13 +127,15 @@ module VNMMAD
|
||||
end
|
||||
|
||||
remote_ep.connect do |remote_peer|
|
||||
remote_peer_address = remote_peer.remote_address.inspect
|
||||
|
||||
client_stream, remote_stream = Async::IO::Stream.new(client_peer),
|
||||
Async::IO::Stream.new(remote_peer)
|
||||
|
||||
glue_streams(client_stream, remote_stream, task).wait
|
||||
ensure
|
||||
$logger.debug(self) do
|
||||
"Close #{remote_peer.remote_address.inspect}"
|
||||
"Close #{remote_peer_address}"
|
||||
end
|
||||
|
||||
remote_peer.close
|
||||
@ -146,7 +150,7 @@ module VNMMAD
|
||||
end
|
||||
ensure
|
||||
$logger.debug(self) do
|
||||
"Close #{client_peer.remote_address.inspect}"
|
||||
"Close #{client_peer_address}"
|
||||
end
|
||||
|
||||
client_peer.close
|
||||
@ -161,12 +165,26 @@ module VNMMAD
|
||||
stream2.write chunk
|
||||
stream2.flush
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.debug(self) do
|
||||
e.message
|
||||
end
|
||||
ensure
|
||||
stream1.close
|
||||
stream2.close
|
||||
end
|
||||
concurrent << subtask.async do
|
||||
while (chunk = stream2.read_partial)
|
||||
stream1.write chunk
|
||||
stream1.flush
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.debug(self) do
|
||||
e.message
|
||||
end
|
||||
ensure
|
||||
stream2.close
|
||||
stream1.close
|
||||
end
|
||||
concurrent.each(&:wait)
|
||||
end
|
||||
|
@ -103,19 +103,17 @@ module VNMMAD
|
||||
table bridge one_tproxy {
|
||||
chain ch_<%= brdev %> {
|
||||
type filter hook prerouting priority dstnat; policy accept;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
};
|
||||
flush chain bridge one_tproxy ch_<%= brdev %>;
|
||||
|
||||
table bridge one_tproxy {
|
||||
chain ch_<%= brdev %> {
|
||||
meta ibrname "<%= brdev %>" \\
|
||||
ip daddr 169.254.16.9 \\
|
||||
meta pkttype set host ether daddr set <%= veth_mac %> \\
|
||||
accept
|
||||
}
|
||||
}
|
||||
meta ibrname "<%= brdev %>" \
|
||||
ip daddr 169.254.16.9 \
|
||||
meta pkttype set host ether daddr set <%= veth_mac %> \
|
||||
accept;
|
||||
};
|
||||
};
|
||||
NFT
|
||||
|
||||
# The tproxy processes read their config from "ip one_tproxy ep_*" maps
|
||||
@ -125,18 +123,13 @@ module VNMMAD
|
||||
nft(ERB.new(<<~NFT).result(binding))
|
||||
table ip one_tproxy {
|
||||
map ep_<%= brdev %> {
|
||||
type inet_service : ipv4_addr \\
|
||||
. inet_service;
|
||||
}
|
||||
}
|
||||
|
||||
type inet_service : ipv4_addr . inet_service;
|
||||
};
|
||||
};
|
||||
flush map ip one_tproxy ep_<%= brdev %>;
|
||||
|
||||
<% endpoints.each do |ep| %>
|
||||
add element ip one_tproxy ep_<%= brdev %> {
|
||||
<%= ep[:service_port] %> : <%= ep[:remote_addr] %> \\
|
||||
. <%= ep[:remote_port] %>
|
||||
}
|
||||
add element ip one_tproxy ep_<%= brdev %> \
|
||||
{<%= ep[:service_port] %> : <%= ep[:remote_addr] %> . <%= ep[:remote_port] %>};
|
||||
<% end %>
|
||||
NFT
|
||||
end
|
||||
@ -147,11 +140,9 @@ module VNMMAD
|
||||
nft(ERB.new(<<~NFT).result(binding))
|
||||
table ip one_tproxy {
|
||||
map ep_<%= brdev %> {
|
||||
type inet_service : ipv4_addr \\
|
||||
. inet_service;
|
||||
}
|
||||
}
|
||||
|
||||
type inet_service : ipv4_addr . inet_service;
|
||||
};
|
||||
};
|
||||
delete map ip one_tproxy ep_<%= brdev %>;
|
||||
NFT
|
||||
|
||||
@ -159,9 +150,8 @@ module VNMMAD
|
||||
table bridge one_tproxy {
|
||||
chain ch_<%= brdev %> {
|
||||
type filter hook prerouting priority dstnat; policy accept;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
};
|
||||
delete chain bridge one_tproxy ch_<%= brdev %>;
|
||||
NFT
|
||||
|
||||
@ -234,7 +224,20 @@ module VNMMAD
|
||||
end
|
||||
|
||||
def self.nft(script, **opts)
|
||||
run(:nft, '-f-', **opts, :stdin_data => script)
|
||||
# Normalize nft scripts to avoid potential segfaults..
|
||||
pass1 = script.lines.each_with_object([]) do |line, a|
|
||||
line.sub!(/\\\n$/, ' ') # undo explicit line breaks
|
||||
|
||||
a << line
|
||||
end.join
|
||||
pass2 = pass1.lines.each_with_object([]) do |line, a|
|
||||
next if line =~ /^\s*$/ # ignore empty lines
|
||||
|
||||
line.gsub!(/([^ ])[ ]+/, '\1 ') # remove redundant spaces
|
||||
|
||||
a << line
|
||||
end.join
|
||||
run(:nft, '-f-', **opts, :stdin_data => pass2)
|
||||
end
|
||||
|
||||
def self.run_tproxy(cmd)
|
||||
|
Loading…
x
Reference in New Issue
Block a user