from samba.dcerpc import security as sec
from samba import reparse_symlink
import samba.tests.libsmb
+import stat
class ReparsePoints(samba.tests.libsmb.LibsmbTests):
force_smb1=smb1)
return conn
+ def connection_posix(self):
+ share = samba.tests.env_get_var_value("SHARENAME", allow_missing=True)
+ if not share:
+ share = "posix_share"
+ conn = libsmb.Conn(
+ self.server_ip,
+ share,
+ self.lp,
+ self.creds,
+ force_smb1=True)
+ conn.smb1_posix()
+ return conn
+
def clean_file(self, conn, filename):
try:
conn.unlink(filename)
conn.delete_on_close(fd, 1)
conn.close(fd)
+ def test_fifo_reparse(self):
+ """Test FIFO reparse tag"""
+ filename = 'fifo'
+ smb2 = self.connection()
+ smb1 = self.connection_posix()
+
+ self.clean_file(smb2, filename)
+ smb1.mknod(filename, stat.S_IFIFO | 0o755)
+
+ fd = smb2.create(
+ filename,
+ DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE|sec.SEC_STD_DELETE,
+ CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
+ CreateDisposition=libsmb.FILE_OPEN)
+ smb2.delete_on_close(fd, 1)
+
+ info = smb2.qfileinfo(fd, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION);
+ self.assertEqual(info['tag'], libsmb.IO_REPARSE_TAG_NFS)
+
+ reparse = smb2.fsctl(fd, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)
+ (tag, ) = reparse_symlink.get(reparse)
+ self.assertEqual(tag, 'NFS_SPECFILE_FIFO')
+
if __name__ == '__main__':
import unittest
unittest.main()