type = 'TYPE ' + type
source.voidcmd(type)
target.voidcmd(type)
- sourcehost, sourceport = parse227(source.sendcmd('PASV'))
+ # Don't trust the IPv4 address the source server advertises in its PASV
+ # reply: a malicious source could otherwise point the target's data
+ # connection at an arbitrary host (SSRF). A caller that needs the old
+ # behavior can set trust_server_pasv_ipv4_address on the source FTP
+ # object. See FTP.makepasv(), which applies the same rule.
+ untrusted_host, sourceport = parse227(source.sendcmd('PASV'))
+ if source.trust_server_pasv_ipv4_address:
+ sourcehost = untrusted_host
+ else:
+ sourcehost = source.sock.getpeername()[0]
target.sendport(sourcehost, sourceport)
# RFC 959: the user must "listen" [...] BEFORE sending the
# transfer request.
except ImportError:
ssl = None
-from unittest import TestCase, skipUnless
+from unittest import mock, TestCase, skipUnless
from test import support
from test.support import requires_subprocess
from test.support import threading_helper
ftp.close()
+class TestFtpcpSecurity(TestCase):
+ """ftpcp() must not trust the host a source server advertises in PASV.
+
+ A malicious source server can otherwise redirect the target server's
+ data connection to an arbitrary host:port (SSRF), so ftpcp() uses the
+ source server's actual peer address instead, the same as FTP.makepasv().
+ """
+
+ def _make_pair(self, *, advertised_host, real_host, trust=False):
+ source = mock.Mock(spec=ftplib.FTP)
+ source.trust_server_pasv_ipv4_address = trust
+ source.sock.getpeername.return_value = (real_host, 21)
+ # PASV replies give the host as comma-separated octets, not dotted.
+ advertised = advertised_host.replace('.', ',')
+ source.sendcmd.side_effect = lambda cmd: (
+ f'227 Entering Passive Mode ({advertised},1,2).'
+ if cmd == 'PASV' else '150 ok')
+ target = mock.Mock(spec=ftplib.FTP)
+ target.sendcmd.return_value = '150 ok'
+ return source, target
+
+ def test_ftpcp_ignores_untrusted_pasv_host(self):
+ source, target = self._make_pair(advertised_host='10.0.0.5',
+ real_host='198.51.100.7')
+ ftplib.ftpcp(source, 'a', target, 'b')
+ target.sendport.assert_called_once_with('198.51.100.7', 258)
+
+ def test_ftpcp_trust_server_pasv_ipv4_address(self):
+ source, target = self._make_pair(advertised_host='10.0.0.5',
+ real_host='198.51.100.7', trust=True)
+ ftplib.ftpcp(source, 'a', target, 'b')
+ target.sendport.assert_called_once_with('10.0.0.5', 258)
+
+
class MiscTestCase(TestCase):
def test__all__(self):
not_exported = {