Created
August 16, 2023 09:25
-
-
Save rcoup/5f4b49149ca26084eed4dccfd1b12895 to your computer and use it in GitHub Desktop.
GDAL VSI Python file wrappers and associated IO functions, and tests.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Tests for VSI-File | |
Copyright Koordinates Limited | |
License MIT | |
""" | |
import io | |
import logging | |
import os | |
import shutil | |
import stat | |
import tempfile | |
import uuid | |
import zipfile | |
from collections import namedtuple | |
import pytest | |
from osgeo import gdal | |
from . import vsi | |
# Payloads written to, and expected back from, the fixtures and S3 objects below.
LOCALFILE_CONTENT = b"line one\nline two"  # 17 bytes
OTHER_CONTENT = b"".join([b"some.test.file\n"] * 10)

# S3 bucket names used by the @pytest.mark.internet tests.
TEST_DATA_BUCKET = "vsi-test-data-bucket"
NONEXISTANT_BUCKET = "i.do.not.exist.bucket"
PRIVATE_BUCKET = "a.private.bucket"  # access via credentials
NO_ACCESS_BUCKET = "another.private.bucket"  # no access
WRITE_BUCKET = "test.writing.bucket"  # write access for test run
@pytest.fixture(autouse=True)
def vsireset():
    """
    Autouse fixture run around every test in this module.

    Turns the "gdal" logger up to DEBUG and clears GDAL's Curl cache before the
    test. Afterwards it restores the logger's *previous* level, instead of
    forcing INFO, so logging configuration doesn't leak out of these tests.
    """
    gdal_logger = logging.getLogger("gdal")
    previous_level = gdal_logger.level
    # turn up logging for VSI tests
    gdal_logger.setLevel(logging.DEBUG)
    try:
        # clear the Curl cache to avoid weird side-effects
        gdal.VSICurlClearCache()
        yield
    finally:
        gdal_logger.setLevel(previous_level)
@pytest.fixture()
def localfile():
    """Yield an open NamedTemporaryFile holding LOCALFILE_CONTENT, rewound to offset 0."""
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(LOCALFILE_CONTENT)
        # Flush so code that re-opens the file by name (tests use .name) sees the data.
        tmp.flush()
        tmp.seek(0)
        yield tmp
@pytest.fixture()
def localzip():
    """Yield a ``.zip`` NamedTemporaryFile containing a single entry, ``file.txt``."""
    with tempfile.NamedTemporaryFile(suffix=".zip") as archive_file:
        # The archive must be closed (central directory written) before yielding.
        with zipfile.ZipFile(archive_file, "w") as archive:
            archive.writestr("file.txt", b"hey there good lookin'\n")
        yield archive_file
def touch_file(fname, times=None, makedirs=False):
    """
    Create ``fname`` if it does not exist, then set its access/modification
    times via ``os.utime``: to now, or to ``times`` (an ``(atime, mtime)`` pair)
    if given.

    If ``makedirs`` is true, any missing parent directories are created first.
    """
    if makedirs:
        parent = os.path.dirname(fname)
        # os.makedirs(exist_ok=True) creates missing parents and tolerates existing
        # ones. A bare filename has dirname "", which os.makedirs would reject.
        if parent:
            os.makedirs(parent, exist_ok=True)
    with open(fname, "a+"):
        os.utime(fname, times)
def test_vsi_open_local(localfile):
    """
    vsi.open() accepts the usual modes on existing and new local files, and
    raises OSError for a missing file in read modes.

    Every handle is closed via ``with`` so descriptors don't leak between tests.
    """
    # Succeed on existing file
    for mode in ("rb", "r"):
        with vsi.open(localfile.name, mode) as f:
            assert f
    # Fail on non-existing file
    with pytest.raises(OSError):
        vsi.open("/no-such-file")
    with pytest.raises(OSError):
        vsi.open("/no-such-file", "r+")
    with vsi.open(localfile.name, "w") as f:
        assert f
    with tempfile.TemporaryDirectory() as tempdir:
        for name, mode in (("test.file.w", "w"), ("test.file.a", "a")):
            with vsi.open(os.path.join(tempdir, name), mode) as f:
                assert f
    for mode in ("a", "r+"):
        with vsi.open(localfile.name, mode) as f:
            assert f
def test_vsi_read_local(localfile):
    """Binary reads return bytes, honour the requested size, and return b"" at EOF."""
    # Use a context manager so the handle is closed even if an assertion fails.
    with vsi.open(localfile.name, mode="rb") as f:
        assert f.read() == LOCALFILE_CONTENT
        assert f.seek(0) == 0
        assert f.read(5) == b"line "
        assert f.read(5) == b"one\nl"
        assert f.read() == b"ine two"
        # reads at EOF keep returning empty bytes rather than raising
        assert f.read(1) == b""
        assert f.read(1) == b""
        assert isinstance(f.read(), bytes)
def test_vsi_seek(localfile):
    """seek() honours all three ``whence`` values and returns the new absolute offset."""
    with vsi.open(localfile.name, mode="rb") as f:
        # Seek relative to start of file (repeating it must land in the same place)
        assert f.seek(5, os.SEEK_SET) == 5
        assert f.seek(5, os.SEEK_SET) == 5
        # Seek relative to current position
        assert f.seek(10, os.SEEK_CUR) == 15
        # Seek relative to end of file (17 - 5 == 12)
        assert f.seek(-5, os.SEEK_END) == len(LOCALFILE_CONTENT) - 5
def test_vsi_write(localfile):
    """Bytes (including NULs) written via vsi.open round-trip through the local filesystem."""
    payload = b"aaaaa\x00bbbbb"

    # Overwrite an existing file, closing explicitly.
    writer = vsi.open(localfile.name, "wb")
    writer.write(payload)
    writer.close()
    with open(localfile.name, "rb") as reader:
        assert reader.read() == payload

    # Create a new file, closing via the context manager.
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "test.file")
        with vsi.open(target, "wb") as writer:
            writer.write(payload)
        with open(target, "rb") as reader:
            assert reader.read() == payload
def test_vsi_close(localfile):
    """close() is idempotent, and I/O on a closed handle raises ValueError."""
    handle = vsi.open(localfile.name)
    # Calling close() twice must be a silent no-op the second time.
    for _ in range(2):
        handle.close()
    # ValueError (not IOError) is what io.RawIOBase raises for read() on a
    # closed file; seek()/tell() use the same exception for consistency.
    with pytest.raises(ValueError):
        handle.read(1)
    with pytest.raises(ValueError):
        handle.seek(1)
    with pytest.raises(ValueError):
        handle.tell()
def test_vsi_exists(localfile):
    """vsi.exists() is true for a real file and false for a made-up path."""
    present, missing = localfile.name, "/blah blah blah"
    assert vsi.exists(present)
    assert not vsi.exists(missing)
def test_vsi_stat(localfile):
    """vsi.stat()/getsize()/getmtime() agree with the local file's real metadata."""
    name = localfile.name
    expected_size = len(LOCALFILE_CONTENT)  # 17
    result = vsi.stat(name)
    assert isinstance(result.st_mtime, int)
    # 0o100600 == regular file (S_IFREG) with permissions 0600
    assert result.st_mode == 0o100600
    assert result.st_size == expected_size
    assert vsi.getsize(name) == expected_size
    assert vsi.getmtime(name) == int(os.path.getmtime(name))
def test_vsi_isdir(localfile, localzip):
    """vsi.isdir() across missing paths, local files/dirs and /vsizip/ archives."""
    cases = [
        ("/doesnotexist", False),
        ("/doesnotexist.zip", False),
        ("/vsizip/doesnotexist.zip", False),
        ("/vsizip/doesnotexist.zip/file.txt", False),
        (localfile.name, False),
        (os.path.dirname(localfile.name), True),
        (localzip.name, False),
        # the root of an archive opened through /vsizip/ is a directory...
        ("/vsizip/%s" % localzip.name, True),
        # ...and the entries inside it are not
        ("/vsizip/%s/file.txt" % localzip.name, False),
    ]
    for path, expected in cases:
        assert bool(vsi.isdir(path)) == expected, path
def test_vsi_isfile(localfile, localzip):
    """vsi.isfile() across missing paths, local files/dirs and /vsizip/ archives."""
    cases = [
        ("/doesnotexist", False),
        ("/doesnotexist.zip", False),
        ("/vsizip/doesnotexist.zip", False),
        ("/vsizip/doesnotexist.zip/file.txt", False),
        (localfile.name, True),
        (os.path.dirname(localfile.name), False),
        (localzip.name, True),
        # through /vsizip/ the archive itself is a directory, its entries are files
        ("/vsizip/%s" % localzip.name, False),
        ("/vsizip/%s/file.txt" % localzip.name, True),
    ]
    for path, expected in cases:
        assert bool(vsi.isfile(path)) == expected, path
def test_vsi_listdir():
    """vsi.listdir() raises for missing dirs and lists only immediate children."""
    with pytest.raises(OSError):
        vsi.listdir("/doesnotexist")
    with tempfile.TemporaryDirectory() as root:
        assert vsi.listdir(root) == []
        subdir = os.path.join(root, "foo")
        os.mkdir(subdir)
        touch_file(os.path.join(subdir, "bar"))
        touch_file(os.path.join(root, "baz"))
        assert set(vsi.listdir(root)) == {"foo", "baz"}
        assert vsi.listdir(subdir) == ["bar"]
def test_vsi_listdir_unicode():
    """Non-ASCII filenames are listed correctly and returned as str."""
    with tempfile.TemporaryDirectory() as root:
        for name in ("ಠ_ಠ", "bob"):
            touch_file(os.path.join(root, name))
        listing = vsi.listdir(root)
        assert set(listing) == {"bob", "ಠ_ಠ"}
        # names must come back decoded (str), not as bytes
        first, second = listing[0], listing[1]
        assert isinstance(first, str)
        assert isinstance(second, str)
def test_vsi_listdir_recursive():
    """
    vsi.listdir_recursive() raises OSError for a missing directory and
    otherwise returns every entry below the root as a root-relative path.
    """
    # Exercise listdir_recursive's own error path; the plain-listdir case is
    # covered by test_vsi_listdir.
    with pytest.raises(OSError):
        vsi.listdir_recursive("/doesnotexist")
    with tempfile.TemporaryDirectory() as tempdir:
        assert vsi.listdir_recursive(tempdir) == []
        os.mkdir(os.path.join(tempdir, "foo"))
        touch_file(os.path.join(tempdir, "foo", "bar"))
        touch_file(os.path.join(tempdir, "baz"))
        assert set(vsi.listdir_recursive(tempdir)) == {"foo", "foo/bar", "baz"}
def test_vsi_walk():
    """vsi.walk() mirrors os.walk output, whether or not it delegates to os.walk."""
    for use_os_walk_if_possible in (True, False):
        options = {"use_os_walk_if_possible": use_os_walk_if_possible}
        with pytest.raises(OSError):
            list(vsi.walk("/doesnotexist", **options))
        with tempfile.TemporaryDirectory() as root:
            assert list(vsi.walk(root, **options)) == [(root, [], [])]
            touch_file(os.path.join(root, "foo"))
            os.mkdir(os.path.join(root, "bar"))
            touch_file(os.path.join(root, "bar", "blah"))
            expected = [
                (root, ["bar"], ["foo"]),
                (os.path.join(root, "bar"), [], ["blah"]),
            ]
            assert list(vsi.walk(root, **options)) == expected
def test_vsi_copy(localfile):
    """vsi.copy() between two file objects copies the full content."""
    with tempfile.NamedTemporaryFile() as dest:
        vsi.copy(localfile, dest)
        for handle in (localfile, dest):
            handle.seek(0)
        assert localfile.read() == dest.read()
def test_vsi_copy_max_size_ok(localfile):
    """A max_size comfortably above the source size doesn't raise."""
    limit = 1000
    with tempfile.NamedTemporaryFile() as dest:
        vsi.copy(localfile, dest, max_size=limit)
def test_vsi_copy_max_size_exceeded(localfile):
    """A max_size below the source size makes vsi.copy() raise IOError."""
    limit = 1  # far smaller than LOCALFILE_CONTENT
    with tempfile.NamedTemporaryFile() as dest, pytest.raises(IOError):
        vsi.copy(localfile, dest, max_size=limit)
def test_vsi_copy_fd(localfile):
    """vsi.copy() leaves caller-supplied file objects open on either side."""
    # file object -> file object
    with tempfile.NamedTemporaryFile() as dest_file:
        vsi.copy(localfile, dest_file)
        assert not localfile.closed
        assert not dest_file.closed
    # file object -> path
    with tempfile.TemporaryDirectory() as scratch:
        vsi.copy(localfile, os.path.join(scratch, "foo"))
        assert not localfile.closed
    # path -> file object
    with tempfile.NamedTemporaryFile() as dest_file:
        vsi.copy(localfile.name, dest_file)
        assert not dest_file.closed
def test_vsi_smartcopy_local(localfile):
    """vsi.smart_copy() copies local file objects and paths, creating missing parent dirs."""
    with tempfile.NamedTemporaryFile() as dest_file:
        vsi.smart_copy(localfile, dest_file)
        localfile.seek(0)
        dest_file.seek(0)
        assert localfile.read() == dest_file.read()
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "foo")
        vsi.smart_copy(localfile.name, target)
        with open(localfile.name, "rb") as src_fh, open(target, "rb") as dst_fh:
            assert src_fh.read() == dst_fh.read() == LOCALFILE_CONTENT
        # the parent directory "bar" doesn't exist yet
        nested = os.path.join(scratch, "bar/bob")
        vsi.smart_copy(localfile.name, nested)
        assert os.path.isfile(nested)
def test_vsi_smartcopy_fd(localfile):
    """vsi.smart_copy() leaves caller-supplied file objects open on either side."""
    with tempfile.NamedTemporaryFile() as dest_file:
        vsi.smart_copy(localfile, dest_file)
        assert not localfile.closed
        assert not dest_file.closed
    with tempfile.TemporaryDirectory() as scratch:
        vsi.smart_copy(localfile, os.path.join(scratch, "foo"))
        assert not localfile.closed
    with tempfile.NamedTemporaryFile() as dest_file:
        vsi.smart_copy(localfile.name, dest_file)
        assert not dest_file.closed
def test_vsi_remove():
    """vsi.remove() deletes a local file and raises OSError once it's gone."""
    with tempfile.TemporaryDirectory() as scratch:
        victim = os.path.join(scratch, "the.file")
        touch_file(victim)
        vsi.remove(victim)
        assert not os.path.exists(victim)
        with pytest.raises(OSError):
            vsi.remove(victim)
# Bundle yielded by the ``s3bucket`` fixture: the boto3 handles plus the
# per-run key prefix and its equivalent /vsis3/ path.
WorkBucket = namedtuple("WorkBucket", "client bucket bucket_name prefix vsi_path")
@pytest.fixture(scope="session")
def s3bucket():
    """
    Session fixture giving tests a unique prefix inside WRITE_BUCKET.

    Skips the requesting test when boto3 finds no AWS credentials. On teardown,
    every object written under the run's prefix is deleted.
    """
    import boto3

    if boto3.session.Session().get_credentials() is None:
        pytest.skip("No AWS Credentials")
    bucket_prefix = f"vsi-tests-{uuid.uuid4()}"
    vsi_path = f"/vsis3/{WRITE_BUCKET}/{bucket_prefix}"
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(WRITE_BUCKET)
    try:
        yield WorkBucket(
            client=s3,  # NB: a boto3 *resource*, despite the field name
            bucket=bucket,
            bucket_name=WRITE_BUCKET,
            prefix=bucket_prefix,
            vsi_path=vsi_path,
        )
    finally:
        # clean up everything this run wrote under its prefix
        objects = bucket.objects.filter(Prefix=bucket_prefix)
        objects.delete()
@pytest.mark.internet
def test_vsis3_copy_up(s3bucket, localfile):
    """vsi.copy() uploads a local file object to an S3 path."""
    vsi.copy(localfile, os.path.join(s3bucket.vsi_path, "test.up"))
    key = os.path.join(s3bucket.prefix, "test.up")
    with io.BytesIO() as downloaded:
        s3bucket.bucket.download_fileobj(key, downloaded)
        assert downloaded.getvalue() == LOCALFILE_CONTENT
@pytest.mark.internet
def test_vsis3_copy_down(s3bucket):
    """vsi.copy() downloads an S3 object (uploaded directly via boto3) to a local path."""
    key = os.path.join(s3bucket.prefix, "test.down")
    s3bucket.bucket.upload_fileobj(io.BytesIO(OTHER_CONTENT), key)
    source = os.path.join(s3bucket.vsi_path, "test.down")
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "test.down.local")
        vsi.copy(source, target)
        with open(target, "rb") as fh:
            assert fh.read() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_copy_up_perm_denied(localfile):
    """Uploading to a bucket we can't write to raises and leaves no object behind."""
    dest = f"/vsis3/{TEST_DATA_BUCKET}/vsi-tests/perm.denied"
    with pytest.raises(IOError):
        vsi.copy(localfile, dest)
    assert not vsi.exists(dest)
@pytest.mark.internet
def test_vsis3_copy_down_not_found(s3bucket):
    """Copying a missing S3 object to a local path raises OSError."""
    source = f"/vsis3/{TEST_DATA_BUCKET}/vsi-tests/not.found"
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "test.down.local")
        with pytest.raises(OSError):
            vsi.copy(source, target)
@pytest.mark.internet
def test_vsis3_copy_down_perm_denied(s3bucket):
    """Copying from a bucket we have no access to raises OSError."""
    # some file that exists but we'll never have access to
    source = f"/vsis3/{NO_ACCESS_BUCKET}/test.file"
    with tempfile.TemporaryDirectory() as scratch:
        target = os.path.join(scratch, "test.down.local")
        # Comes across as not-found, since the .exists() call fails.
        with pytest.raises(OSError):
            vsi.copy(source, target)
@pytest.mark.internet
def test_vsis3_smartcopy_up(s3bucket, localfile):
    """vsi.smart_copy() uploads a local file object to S3."""
    vsi.smart_copy(localfile, os.path.join(s3bucket.vsi_path, "sm.test.up"))
    key = os.path.join(s3bucket.prefix, "sm.test.up")
    with io.BytesIO() as downloaded:
        s3bucket.bucket.download_fileobj(key, downloaded)
        assert downloaded.getvalue() == LOCALFILE_CONTENT
@pytest.mark.internet
def test_vsis3_smartcopy_down(s3bucket):
    """vsi.smart_copy() downloads from S3, creating missing local parent directories."""
    key = os.path.join(s3bucket.prefix, "sm.test.down")
    s3bucket.bucket.upload_fileobj(io.BytesIO(OTHER_CONTENT), key)
    source = os.path.join(s3bucket.vsi_path, "sm.test.down")
    with tempfile.TemporaryDirectory() as scratch:
        flat = os.path.join(scratch, "smm.test.down.local")
        vsi.smart_copy(source, flat)
        with open(flat, "rb") as fh:
            assert fh.read() == OTHER_CONTENT
        # "sm/test/" doesn't exist yet
        nested = os.path.join(scratch, "sm/test/down.local")
        vsi.smart_copy(source, nested)
        assert os.path.isfile(nested)
@pytest.mark.internet
def test_vsis3_smartcopy_server(s3bucket):
    """vsi.smart_copy() between two S3 paths produces an identical object."""
    src_key = os.path.join(s3bucket.prefix, "sm.test.server")
    s3bucket.bucket.upload_fileobj(io.BytesIO(OTHER_CONTENT), src_key)
    vsi.smart_copy(
        os.path.join(s3bucket.vsi_path, "sm.test.server"),
        os.path.join(s3bucket.vsi_path, "sm.test.server.dest"),
    )
    dest_key = os.path.join(s3bucket.prefix, "sm.test.server.dest")
    with io.BytesIO() as downloaded:
        s3bucket.bucket.download_fileobj(dest_key, downloaded)
        assert downloaded.getvalue() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_write(s3bucket, localfile):
    """Bytes written through vsi.open() end up in the S3 object after close."""
    writer = vsi.open(os.path.join(s3bucket.vsi_path, "write.up"), "wb")
    writer.write(OTHER_CONTENT)
    writer.close()
    key = os.path.join(s3bucket.prefix, "write.up")
    with io.BytesIO() as downloaded:
        s3bucket.bucket.download_fileobj(key, downloaded)
        assert downloaded.getvalue() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_read(s3bucket):
    """vsi.open() reads back an object uploaded directly with boto3."""
    key = os.path.join(s3bucket.prefix, "read.down")
    s3bucket.bucket.upload_fileobj(io.BytesIO(OTHER_CONTENT), key)
    reader = vsi.open(os.path.join(s3bucket.vsi_path, "read.down"), "rb")
    assert reader.read() == OTHER_CONTENT
    reader.close()
@pytest.mark.internet
def test_vsis3_remove(s3bucket):
    """vsi.remove() deletes an S3 object and raises OSError once it's gone."""
    target = os.path.join(s3bucket.vsi_path, "write.up")
    with vsi.open(target, "wb") as writer:
        writer.write(OTHER_CONTENT)
    assert vsi.exists(target)
    vsi.remove(target)
    assert not vsi.exists(target)
    with pytest.raises(OSError):
        vsi.remove(target)
def test_mkdir():
    """vsi.mkdir() locally: default/explicit permissions, existing dir, trailing slash."""
    # os.umask() can only be read by setting it, so set it and put it straight back
    current_umask = os.umask(0)
    os.umask(current_umask)
    default_perms = "%o" % (0o777 & ~current_umask)

    def perms_of(p):
        return "%o" % stat.S_IMODE(os.stat(p).st_mode)

    with tempfile.TemporaryDirectory() as root:
        target = os.path.join(root, "bob")
        vsi.mkdir(target)
        assert os.path.isdir(target)
        assert perms_of(target) == default_perms
        # like os.mkdir, an existing directory is an error
        with pytest.raises(OSError):
            vsi.mkdir(target)
        target = os.path.join(root, "jim")
        vsi.mkdir(target, 0o700)
        assert os.path.isdir(target)
        assert perms_of(target) == "700"
        target = os.path.join(root, "dave") + "/"
        vsi.mkdir(target)
        assert os.path.isdir(target)
@pytest.mark.internet
def test_vsis3_mkdir(s3bucket):
    """vsi.mkdir() works on S3 and, like os.mkdir, fails if the directory exists."""
    target = os.path.join(s3bucket.vsi_path, "mkdir.1")
    vsi.mkdir(target)
    assert vsi.isdir(target)
    with pytest.raises(OSError):
        vsi.mkdir(target)
def test_makedirs():
    """
    vsi.makedirs() on the local filesystem.

    Checks that it creates missing parents with the default (umask-derived)
    permissions, applies an explicit mode to the new directory, refuses to
    recreate an existing leaf, and accepts a trailing "/".

    Purely local, so it is not marked ``internet`` and doesn't request the
    ``s3bucket`` fixture (which would skip it when AWS credentials are absent).
    """
    # os.umask() can only be read by setting it, so set it and put it straight back
    umask = os.umask(0)
    os.umask(umask)
    exp_dir_perms = "%o" % (0o777 & ~umask)

    def perms(p):
        return "%o" % stat.S_IMODE(os.stat(p).st_mode)

    with tempfile.TemporaryDirectory() as tempdir:
        parent_path = os.path.join(tempdir, "bob")
        path = os.path.join(parent_path, "alex")
        vsi.makedirs(path)
        assert os.path.isdir(parent_path)
        assert os.path.isdir(path)
        assert perms(path) == exp_dir_perms
        assert perms(parent_path) == exp_dir_perms
        with pytest.raises(OSError):
            vsi.makedirs(path)
        path = os.path.join(parent_path, "jim")
        vsi.makedirs(path, 0o700)
        assert os.path.isdir(path)
        assert perms(path) == "700"
        assert perms(parent_path) == exp_dir_perms
        path = os.path.join(tempdir, "jim", "owen", "dave") + "/"
        vsi.makedirs(path)
        assert os.path.isdir(path)
@pytest.mark.internet
def test_vsis3_makedirs(s3bucket):
    """vsi.makedirs() creates a nested S3 directory chain and refuses to recreate the leaf."""
    leaf = os.path.join(s3bucket.vsi_path, "mkdir.2", "bob", "jim", "alex")
    parent, _ = os.path.split(leaf)
    vsi.makedirs(leaf)
    assert vsi.isdir(leaf)
    assert vsi.isdir(parent)
    with pytest.raises(OSError):
        vsi.makedirs(leaf)
def test_rmdir():
    """vsi.rmdir() locally: refuses non-empty dirs, removes empty ones, fails when missing."""
    with tempfile.TemporaryDirectory() as scratch:
        folder = os.path.join(scratch, "bob")
        vsi.mkdir(folder)
        inner = os.path.join(folder, "a.file")
        with vsi.open(inner, "w") as fh:
            fh.write("hi")
        # not empty yet
        with pytest.raises(OSError):
            vsi.rmdir(folder)
        vsi.remove(inner)
        assert not os.path.exists(inner)
        vsi.rmdir(folder)
        assert not os.path.exists(folder)
        with pytest.raises(OSError):
            vsi.rmdir(folder)
@pytest.mark.internet
def test_vsis3_rmdir(s3bucket):
    """vsi.rmdir() on S3: refuses non-empty dirs, removes empty ones, fails when missing."""
    folder = os.path.join(s3bucket.vsi_path, "rmdir", "bob")
    vsi.mkdir(folder)
    inner = os.path.join(folder, "a.file")
    with vsi.open(inner, "w") as fh:
        fh.write("hi")
    # not empty yet
    with pytest.raises(OSError):
        vsi.rmdir(folder)
    vsi.remove(inner)
    assert not vsi.exists(inner)
    vsi.rmdir(folder)
    # on failure, report what is still stored under this run's prefix
    assert not vsi.exists(folder), [
        o.key for o in s3bucket.bucket.objects.filter(Prefix=s3bucket.prefix)
    ]
    with pytest.raises(OSError):
        vsi.rmdir(folder)
def test_rmtree():
    """vsi.rmtree() removes a populated local tree and raises OSError once it's gone."""
    with tempfile.TemporaryDirectory() as scratch:
        top = os.path.join(scratch, "bob")
        nested = os.path.join(top, "jim")
        vsi.makedirs(nested)
        for folder, name, text in ((top, "a.file", "hi"), (nested, "b.file", "ho")):
            with vsi.open(os.path.join(folder, name), "w") as fh:
                fh.write(text)
        vsi.rmtree(top)
        assert not os.path.exists(top)
        with pytest.raises(OSError):
            vsi.rmtree(top)
@pytest.mark.internet
def test_vsis3_rmtree(s3bucket):
    """vsi.rmtree() removes a populated S3 tree and raises OSError once it's gone."""
    top = os.path.join(s3bucket.vsi_path, "rmtree", "bob")
    nested = os.path.join(top, "jim")
    file_a = os.path.join(top, "a.file")
    file_b = os.path.join(nested, "b.file")
    vsi.makedirs(nested)
    for target, text in ((file_a, "hi"), (file_b, "ho")):
        with vsi.open(target, "w") as fh:
            fh.write(text)
    vsi.rmtree(top)
    # on failure, report what is still stored under this run's prefix
    assert not vsi.exists(top), [
        o.key for o in s3bucket.bucket.objects.filter(Prefix=s3bucket.prefix)
    ]
    assert not vsi.exists(nested)
    assert not vsi.exists(file_a)
    assert not vsi.exists(file_b)
    with pytest.raises(OSError):
        vsi.rmtree(top)
@pytest.mark.internet
def test_vsis3_cache(s3bucket, localfile):
    """
    S3 directory listings are cached. Uploads that bypass VSI stay invisible
    until vsi.clear_cache(), while writes made through vsi refresh the listing.
    """
    folder = os.path.join(s3bucket.vsi_path, "cache0")

    def entry_count():
        return len(vsi.listdir(folder))

    vsi.makedirs(folder)
    assert entry_count() == 0
    # add something directly to the bucket, bypassing VSI...
    s3bucket.bucket.upload_fileobj(
        io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "cache0", "a.file")
    )
    # ...so the cached (empty) listing is still what we see
    assert entry_count() == 0
    # check manual cache-clearing works
    vsi.clear_cache(folder)
    assert entry_count() == 1
    # vsi methods themselves should sort the cache out
    with vsi.open(os.path.join(folder, "b.file"), "w") as fh:
        fh.write("hey")
    assert entry_count() == 2
    # smart copy clears the cache automatically
    vsi.smart_copy(localfile, os.path.join(folder, "c.file"))
    assert entry_count() == 3
@pytest.mark.internet
def test_vsi_gdal_behaviour(s3bucket):
    """
    Characterisation test for the raw GDAL VSI Python bindings (not our wrappers).

    It pins down how GDAL itself reports errors for local files, S3 objects,
    Mkdir and /vsimem: sometimes by raising, sometimes via return codes, and
    sometimes via the VSI error state. The wrappers in the ``vsi`` module have
    to work around these quirks. The statement order matters because the
    assertions read GDAL's global VSI error state.
    """
    # VSI functions sometimes throw errors, other times use return codes. yay
    # https://trac.osgeo.org/gdal/ticket/7145
    if (gdal.GetConfigOption("CPL_DEBUG") or "NO").upper() in ("YES", "ON", "TRUE", "1"):
        pytest.skip("With CPL_DEBUG on we get segfaults ¯\\_(ツ)_/¯")
    assert gdal.GetUseExceptions()
    # VSIFOpenL signals failure only by returning None; the error state stays clear.
    assert gdal.VSIFOpenL("/non-existing", "r") is None
    assert gdal.VSIGetLastErrorNo() == 0
    # VSIFOpenExL has a flag to set error codes. VSIFOpenL doesn't set them.
    assert gdal.VSIFOpenExL("/non-existing", "r", 1) is None
    assert gdal.VSIGetLastErrorNo() == 1
    assert gdal.VSIGetLastErrorMsg() == "/non-existing: No such file or directory"
    gdal.VSIErrorReset()
    # VSIStatL likewise returns None without touching the error state.
    assert gdal.VSIStatL("/non-existing") is None
    assert gdal.VSIGetLastErrorNo() == 0
    with tempfile.TemporaryDirectory() as tempdir:
        # Local file round-trip (path passed to GDAL as bytes here).
        path = os.path.join(tempdir, "a.file").encode()
        fd = gdal.VSIFOpenExL(path, "w+", 1)
        assert fd is not None
        try:
            assert gdal.VSIFWriteL("a" * 1000, 1, 1000, fd) == 1000
            assert gdal.VSIFTellL(fd) == 1000
            # VSIFSeekL returns 0 on success, not the new offset.
            assert gdal.VSIFSeekL(fd, 500, os.SEEK_SET) == 0
            assert gdal.VSIFTellL(fd) == 500
            assert not gdal.VSIFEofL(fd)
        finally:
            assert gdal.VSIFCloseL(fd) == 0
            del fd
        assert gdal.VSIStatL(path) is not None
        assert gdal.VSIGetLastErrorNo() == 0
        fd = gdal.VSIFOpenExL(path, "r", 1)
        assert fd is not None
        try:
            # GDAL returns None if you ask for 0 bytes (!?)
            assert gdal.VSIFReadL(1, 0, fd) is None
            data = gdal.VSIFReadL(1, 100, fd)
            assert len(data) == 100
            assert data == b"a" * 100
            assert not gdal.VSIFEofL(fd)
            assert gdal.VSIFTellL(fd) == 100
            assert gdal.VSIFSeekL(fd, 995, os.SEEK_SET) == 0
            assert not gdal.VSIFEofL(fd)
            # a short read returns only what is left and sets EOF
            data = gdal.VSIFReadL(1, 2000, fd)
            assert len(data) == 5
            assert data == b"a" * 5
            assert gdal.VSIFEofL(fd)
            assert gdal.VSIFTellL(fd) == 1000
            # keep reading past the end of the file
            data = gdal.VSIFReadL(1, 2000, fd)
            assert len(data) == 0
            assert gdal.VSIFEofL(fd)
            # seek past the end of the file: succeeds, and clears the EOF flag
            assert gdal.VSIFSeekL(fd, 9999999, os.SEEK_SET) == 0
            assert gdal.VSIGetLastErrorNo() == 0
            assert gdal.VSIFTellL(fd) == 9999999
            assert gdal.VSIFEofL(fd) == 0  # ¯\_(ツ)_/¯
        finally:
            assert gdal.VSIFCloseL(fd) == 0
            del fd
        # this path is in a private bucket
        s3path = f"/vsis3/{PRIVATE_BUCKET}/vsi-tests/perm.denied"
        assert gdal.VSIStatL(s3path) is None
        # ...yet VSIFOpenExL still hands back a handle; the read just yields nothing.
        fd = gdal.VSIFOpenExL(s3path, "r", 1)
        assert fd is not None
        try:
            data = gdal.VSIFReadL(1, 100, fd)
            # huh, shouldn't this return a 403/404?
            assert gdal.VSIGetLastErrorNo() == 0
            assert len(data) == 0
        finally:
            assert gdal.VSIFCloseL(fd) == 0
        assert gdal.VSIGetLastErrorNo() == 0
        # this path doesn't exist
        s3path = f"/vsis3/{NONEXISTANT_BUCKET}/vsi-tests/nope"
        assert gdal.VSIStatL(s3path) is None
        fd = gdal.VSIFOpenExL(s3path, "r", 1)
        assert fd is not None
        try:
            data = gdal.VSIFReadL(1, 100, fd)
            # huh, shouldn't this return a 403/404?
            assert gdal.VSIGetLastErrorNo() == 0
            assert len(data) == 0
        finally:
            assert gdal.VSIFCloseL(fd) == 0
            del fd
        assert gdal.VSIGetLastErrorNo() == 0
        # small s3 writes happen (ie. error) at close time
        fd = gdal.VSIFOpenExL(s3path, "wb", 1)
        assert fd is not None
        assert gdal.VSIFWriteL("a", 1, 1, fd) == 1
        assert gdal.VSIGetLastErrorNo() == 0
        assert gdal.VSIFFlushL(fd) == 0  # ¯\_(ツ)_/¯
        assert gdal.VSIGetLastErrorNo() == 0
        # The failed upload only surfaces as a RuntimeError from VSIFCloseL...
        with pytest.raises(RuntimeError) as e:
            gdal.VSIFCloseL(fd)
        del fd
        # ...and the VSI error state stays clear.
        assert gdal.VSIGetLastErrorNo() == 0  # ¯\_(ツ)_/¯
        assert "DoSinglePartPUT" in str(e)
        # this path exists, we can write it
        s3path = os.path.join(s3bucket.vsi_path, "write.up")
        fd = gdal.VSIFOpenExL(s3path, "wb", 1)
        assert fd is not None
        assert gdal.VSIFWriteL("a" * 100, 1, 100, fd) == 100
        assert gdal.VSIGetLastErrorNo() == 0
        assert gdal.VSIFFlushL(fd) == 0
        assert gdal.VSIGetLastErrorNo() == 0
        assert gdal.VSIFCloseL(fd) == 0
        assert gdal.VSIGetLastErrorNo() == 0
        del fd
        assert gdal.VSIStatL(s3path) is not None
        fd = gdal.VSIFOpenExL(s3path, "r", 1)
        assert fd is not None
        try:
            data = gdal.VSIFReadL(1, 50, fd)
            # the object exists this time, so a normal read succeeds
            assert gdal.VSIGetLastErrorNo() == 0
            assert len(data) == 50
            assert data == b"a" * 50
        finally:
            assert gdal.VSIFCloseL(fd) == 0
            del fd
        assert gdal.VSIGetLastErrorNo() == 0
        # mkdir errors on paths with trailing /
        path = os.path.join(tempdir, "jim") + "/"
        with pytest.raises(RuntimeError) as e:
            gdal.Mkdir(path, 0o777)
        path = os.path.join(tempdir, "jim", "alex", "fred") + "/"
        with pytest.raises(RuntimeError) as e:
            gdal.MkdirRecursive(path, 0o777)
    # mkdirrecursive doesn't error on existing paths
    with tempfile.TemporaryDirectory() as tempdir:
        path = os.path.join(tempdir, "bob")
        assert gdal.MkdirRecursive(path, 0o777) == 0
        assert os.path.isdir(path)
        assert gdal.MkdirRecursive(path, 0o777) == 0
    # VSIRmdir has weird behaviour with vsimem
    assert gdal.MkdirRecursive("/vsimem/x/y/z", 0o777) == 0
    assert "x" in gdal.ReadDir("/vsimem")
    assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
    # err, this shouldn't succeed:
    assert gdal.Rmdir("/vsimem/x") == 0
    # the directory is gone...
    assert "x" not in (gdal.ReadDir("/vsimem") or [])
    # but the file still exists?!
    assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
    assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implements I/O functions using GDAL's VSI functions, so that they can | |
operate on virtual filesystem paths. | |
Tries to mirror the standard python functions as closely as possible, | |
but see the docstrings on the individual functions. | |
Copyright Koordinates Limited | |
License MIT | |
""" | |
from collections import defaultdict, namedtuple | |
from contextlib import contextmanager | |
import tempfile | |
import errno | |
import io | |
import logging | |
import os | |
import re | |
from django.utils.encoding import force_str | |
from osgeo import gdal | |
# Public API of this module. Note that a ``from vsi import *`` brings in
# ``open``, which shadows the builtin of the same name.
# ``isfile`` is listed because callers (including the tests) use vsi.isfile.
__all__ = (
    "copy",
    "exists",
    "getmtime",
    "getsize",
    "isdir",
    "isfile",
    "listdir",
    "listdir_recursive",
    "open",
    "remove",
    "smart_copy",
    "stat",
    "VSIFile",
    "walk",
    "mkdir",
    "makedirs",
    "rmdir",
    "rmtree",
    "clear_cache",
)
_VSI_CURL_PREFIX_RE = re.compile( | |
r"/vsi(s3|curl|gs|az|oss|swift|webhdfs|hdfs)(_streaming)?/" | |
) | |
L = logging.getLogger("vsi") | |
def _vsi_err_wrap(func, *args, **kwargs):
    """
    Call a GDAL VSI function that reports failure through a non-zero return
    code, or by raising RuntimeError when GDAL exceptions are enabled, and turn
    either kind of failure into an exception.

    Pass an ``exception_class`` kwarg to raise something other than IOError.

    Returns ``func``'s return value (0) on success. Raises ``exception_class``
    on failure. The VSI error state is reset before the call and again
    whenever anything is raised.
    """
    exception_class = kwargs.pop("exception_class", IOError)
    # clear any stale error code, so the one read below belongs to this call
    gdal.VSIErrorReset()
    try:
        try:
            ret = func(*args, **kwargs)
        except RuntimeError as e:
            raise exception_class(str(e)) from e
        if ret != 0:
            # Note: the error codes from VSIGetLastErrorNo() aren't
            # really mappable to IOError.errno in any way.
            # List is here: https://github.com/koordinates/gdal/blob/trunk-kx-build/gdal/port/cpl_vsi_error.h#L45-L55
            code = gdal.VSIGetLastErrorNo()
            msg = gdal.VSIGetLastErrorMsg()
            # The bindings normally return str here (calling .decode() on it
            # raised AttributeError); still tolerate bytes just in case.
            if isinstance(msg, bytes):
                msg = msg.decode("utf-8", "replace")
            if not msg:
                msg = "%s failed" % getattr(func, "__name__", repr(func))
            raise exception_class("%s [%d]" % (msg, code))
        return ret
    except BaseException:
        # clear any error code so it doesn't leak into the next VSI call
        gdal.VSIErrorReset()
        raise
class VSIFile(io.RawIOBase):
    """
    Raw (unbuffered) file object backed by a GDAL VSI file handle.

    This lets any path GDAL's VSI layer accepts (local paths, /vsizip/,
    /vsis3/, ...) be read and written like a normal Python file. ``open()`` in
    this module constructs one; presumably it then wraps it in the usual
    buffered/text layers.
    """

    # Class-level defaults: if __init__ raises before the handle is opened, the
    # half-built object reports itself as closed and close() is a no-op.
    _fd = None
    _closed = True

    def __init__(self, path, mode="r"):
        """
        Open ``path`` with ``mode``, which is passed straight to VSIFOpenExL.

        Raises OSError(ENOENT) when a read mode is requested and the path
        doesn't exist, or OSError with GDAL's message if the open fails.
        """
        super().__init__()
        # Assuming we're not in 'w' or 'a' mode, the file must exist.
        # Because stat calls are cached by VSI filesystems, whereas open() isn't,
        # we can optimise by doing a stat first.
        if "r" in mode and not exists(path):
            raise OSError(errno.ENOENT, "No such file or directory: %r" % path)
        # VSIFOpenExL has a flag to set error codes. VSIFOpenL doesn't set them.
        fd = gdal.VSIFOpenExL(str(path), mode, 1)
        if fd is None:
            code = gdal.VSIGetLastErrorNo()
            msg = gdal.VSIGetLastErrorMsg() or "Failed to open file: %s" % path
            # Note: the error codes from VSIGetLastErrorNo() aren't
            # really mappable to IOError.errno in any way.
            # List is here: https://github.com/koordinates/gdal/blob/trunk-kx-build/gdal/port/cpl_vsi_error.h#L45-L55
            gdal.VSIErrorReset()
            raise OSError("%s [%d]" % (msg, code))
        self._fd = fd
        self._closed = False
        self.mode = mode

    # Some methods to handle closing sanely. Weird that RawIOBase doesn't handle this for us (?)
    def _check_open(self):
        """Raise ValueError if the file has been closed."""
        if self._closed:
            # Maybe should be IOError? But RawIOBase.read() throws a ValueError,
            # so we use ValueError here too to be consistent.
            raise ValueError("I/O operation on closed file")

    def close(self):
        """Close the GDAL handle. Further calls are no-ops."""
        if not self._closed:
            try:
                _vsi_err_wrap(gdal.VSIFCloseL, self._fd)
            finally:
                # if VSIFCloseL() errors the fd is broken, we can't call it again
                self._closed = True

    def flush(self):
        """Flush pending writes via VSIFFlushL. Raises IOError on failure."""
        self._check_open()
        _vsi_err_wrap(gdal.VSIFFlushL, self._fd)

    @property
    def closed(self):
        return self._closed

    def readable(self):
        # Mirrors writable(): only read ("r") and update ("+") modes can read.
        return "r" in self.mode or "+" in self.mode

    def seek(self, offset, whence=os.SEEK_SET):
        """Move to ``offset`` relative to ``whence`` and return the new absolute position."""
        self._check_open()
        _vsi_err_wrap(gdal.VSIFSeekL, self._fd, offset, whence)
        # Have to return the new absolute position (!)
        try:
            return gdal.VSIFTellL(self._fd)
        except RuntimeError as e:
            raise OSError(str(e)) from e

    def seekable(self):
        return True

    def tell(self):
        """Return the current absolute position."""
        self._check_open()
        try:
            return gdal.VSIFTellL(self._fd)
        except RuntimeError as e:
            raise OSError(str(e)) from e

    def readinto(self, b):
        """
        Read into an existing writable buffer (memoryview, probably)
        This is the underlying implementation of read(), read1(), readall(), readlines(), etc.

        Returns the number of bytes read; 0 means EOF.
        """
        self._check_open()
        n = len(b)
        if n == 0:
            # GDAL returns None if you ask for 0 bytes (!?)
            # Luckily, reading 0 bytes is pretty easy!
            return 0
        try:
            bytez = gdal.VSIFReadL(1, n, self._fd)
        except RuntimeError as e:
            raise OSError(str(e)) from e
        if not bytez:
            # EOF (guards against a None return as well as b"")
            return 0
        num_bytes = len(bytez)
        b[:num_bytes] = bytez
        return num_bytes

    def writable(self):
        return "w" in self.mode or "a" in self.mode or "+" in self.mode

    def write(self, b):
        """
        Write b to the underlying raw stream, and return the number of bytes written.

        ``b`` may be any bytes-like object (bytes, bytearray, memoryview, ...).
        The return value can be less than len(b), depending on the specifics of
        the underlying raw stream.

        The caller may release or mutate b after this method returns,
        so the implementation only accesses b during the method call.
        """
        self._check_open()
        # memoryview(...).tobytes() accepts every bytes-like object. Plain
        # bytes/bytearray have no .tobytes(), and len() of a multi-byte-format
        # memoryview counts items rather than bytes.
        data = memoryview(b).tobytes()
        n = len(data)
        if n == 0:
            return 0
        try:
            return gdal.VSIFWriteL(data, 1, n, self._fd)
        except RuntimeError as e:
            raise OSError(str(e)) from e
def open(filename, mode="r", buffering=None, encoding=None, errors=None, newline=None):
    """
    Open a file using GDAL's VSI functions and return a file-like object.

    Intended as a drop-in replacement for __builtins__.open(). The same mode
    characters ("r", "w", "a", "b", "t", "+") and the same buffering, encoding,
    errors and newline arguments are accepted.

    buffering:
      * None or any negative value: default buffer size (8 KiB), like the
        builtin open(), whose default is -1.
      * 0: unbuffered; returns the raw VSIFile (binary modes only).
      * 1: line buffering (text modes).
      * larger values: buffer size in bytes.

    Binary modes return an io.BufferedReader / BufferedWriter / BufferedRandom
    (or the raw VSIFile when buffering=0). Text modes return an io.TextIOWrapper.

    Raises ValueError for invalid mode/argument combinations. These are checked
    before the file is opened, so an invalid call never creates or truncates a
    file. If wrapping the opened file fails, it is closed before re-raising.
    """
    # Much of this copied from https://www.python.org/dev/peps/pep-3116/#the-open-built-in-function
    # The "% (x,)" form keeps the messages working when x is itself a tuple.
    assert isinstance(filename, (str, int, os.PathLike)), (
        "filename: Expected Path, str, or int, got: %r" % (filename,)
    )
    assert isinstance(mode, str), "mode: Expected str, got: %r" % (mode,)
    assert buffering is None or isinstance(buffering, int), (
        "buffering: Expected int, got: %r" % (buffering,)
    )
    assert encoding is None or isinstance(encoding, str), (
        "encoding: Expected str, got: %r" % (encoding,)
    )
    assert newline in (None, "", "\n", "\r", "\r\n"), (
        "newline: Invalid newline char: %r" % (newline,)
    )
    modes = set(mode)
    if modes - set("arwb+t") or len(mode) > len(modes):
        raise ValueError("invalid mode: %r" % mode)
    reading = "r" in modes
    writing = "w" in modes
    binary = "b" in modes
    appending = "a" in modes
    updating = "+" in modes
    text = "t" in modes or not binary
    if text and binary:
        raise ValueError("can't have text and binary mode at once")
    if reading + writing + appending > 1:
        raise ValueError("can't have read/write/append mode at once")
    if not (reading or writing or appending):
        raise ValueError("must have exactly one of read/write/append mode")
    if binary and encoding is not None:
        raise ValueError("binary modes doesn't take an encoding arg")
    if binary and errors is not None:
        raise ValueError("binary modes doesn't take an errors arg")
    if binary and newline is not None:
        raise ValueError("binary modes doesn't take a newline arg")

    # Normalise "use the default" (None / negative, as in builtin open()) to -1.
    if buffering is None or buffering < 0:
        buffering = -1
    # Reject this *before* opening, so e.g. mode "w" doesn't truncate the file first.
    if buffering == 0 and not binary:
        raise ValueError("can't have unbuffered text I/O")

    raw = VSIFile(filename, mode=mode)
    result = raw
    try:
        line_buffering = buffering == 1 or (buffering == -1 and raw.isatty())
        if line_buffering or buffering == -1:
            buffering = 8 * 1024  # International standard buffer size
            # XXX Try setting it to fstat().st_blksize
        if buffering == 0:
            return raw
        if updating:
            result = io.BufferedRandom(raw, buffering)
        elif writing or appending:
            result = io.BufferedWriter(raw, buffering)
        else:
            result = io.BufferedReader(raw, buffering)
        if binary:
            return result
        return io.TextIOWrapper(result, encoding, errors, newline, line_buffering)
    except BaseException:
        # Don't leak the VSI handle. Closing a buffered wrapper closes raw too.
        result.close()
        raise
def _copyfileobj(fsrc, fdst, length=16 * 1024, max_size=None):
    """
    Like shutil.copyfileobj, but takes an extra max_size arg.

    Copies fsrc to fdst in chunks of at most `length` bytes.

    If max_size is given, the copy is aborted with OSError as soon as the
    source turns out to be larger than max_size bytes. A source of exactly
    max_size bytes is copied in full. The chunk that crosses the limit is not
    written, but earlier chunks are, so the dest file will be partially written.
    """
    copied = 0
    while True:
        buf = fsrc.read(length)
        if not buf:
            break
        copied += len(buf)
        # Check before writing, so data beyond the limit never reaches fdst.
        if max_size is not None and copied > max_size:
            raise OSError(f"Copy exceeded {max_size} bytes")
        fdst.write(buf)
def copy(src, dest, max_size=None, buffering=1 * 1024 * 1024):
    """
    Copy the contents of src to dest.

    Each of src and dest may be a path (str or os.PathLike) or an already-open
    file-like object (e.g. one returned by vsi.open()). Paths are opened via
    this module's VSI open(), so they may live on virtual/remote filesystems.
    For S3 objects, smart_copy() can be faster.

    Files opened here (the path arguments) are also closed here. File objects
    passed in are left open for the caller to close. dest is flushed after a
    successful copy, but some VSI backends do little work (or report errors)
    until close(), so a caller-owned dest may not be complete on return.

    buffering is the chunk size used for each read. If max_size is given, the
    copy is aborted with OSError when the copied size hits that limit (see
    _copyfileobj()). dest will still have been partially written.
    """
    src_owned = isinstance(src, (str, os.PathLike))
    dest_owned = isinstance(dest, (str, os.PathLike))

    # VSI open() for path arguments. File-like objects are used as-is.
    reader = open(src, "rb") if src_owned else src
    try:
        writer = open(dest, "wb") if dest_owned else dest
        try:
            _copyfileobj(reader, writer, length=buffering, max_size=max_size)
            # Flushing is arguably the caller's job, but forgetting it is an
            # easy mistake to make, so it's done here.
            writer.flush()
        finally:
            if dest_owned:
                writer.close()
    finally:
        if src_owned:
            reader.close()
def _readdir(gdal_readdir, path):
    """
    Shared implementation of listdir() and listdir_recursive().

    Calls gdal_readdir(path) (gdal.ReadDir or gdal.ReadDirRecursive) and
    normalises the result into a list of str names, with trailing slashes
    removed and without the POSIX '.' and '..' entries.

    If GDAL returns None but path is a directory, returns [].
    Raises OSError(ENOENT) if GDAL returns None and path is not a directory.
    """
    listing = gdal_readdir(path)
    if listing is None:
        if isdir(path):
            return []
        # Probably? No actual error code exposed AFAICT
        raise OSError(errno.ENOENT, "No such directory: %r" % path)
    names = []
    for entry in listing:
        # GDAL returns a *mixture* of bytes and unicode, and force_str() decodes
        # only as required. Decode *first*: bytes.rstrip("/") raises TypeError,
        # and b"." / b".." would not match the filter below.
        # GDAL also lists directories with a trailing slash sometimes. Strip that.
        name = force_str(entry).rstrip("/")
        # Never include `.` and `..`. Different VSI drivers are quite inconsistent
        # about whether they return these paths or not.
        # But os.listdir() doesn't, so we don't either.
        if name not in {".", ".."}:
            names.append(name)
    return names
def listdir(path):
    """
    Return the names of the entries in the directory at path.

    Works like os.listdir(), but on GDAL VSI paths. The POSIX entries '.'
    and '..' are never included. Raises OSError if the directory doesn't exist.
    """
    read_dir = gdal.ReadDir
    return _readdir(read_dir, path)
def listdir_recursive(path):
    """
    List every file and directory below path, including those in subfolders.

    There's no direct stdlib equivalent; a thin wrapper around os.walk() would
    give something similar. Entries are paths relative to path. The POSIX
    entries '.' and '..' are never included.
    Raises OSError if the directory doesn't exist.
    """
    try:
        entries = _readdir(gdal.ReadDirRecursive, path)
    except OSError:
        # HACK workaround: ReadDirRecursive returns None for an empty dir,
        # whereas ReadDir doesn't. So only re-raise if the directory isn't
        # actually an empty one.
        # TODO: ticket this. maybe related to:
        # * https://github.com/OSGeo/gdal/issues/1739
        # * https://trac.osgeo.org/gdal/ticket/7135
        if listdir(path) != []:
            raise
        entries = []
    return entries
def walk(root_dir, topdown=True, onerror=None, use_os_walk_if_possible=True):
    """
    Generator. An implementation of os.walk on top of GDAL's VSI functions.

    This calls listdir_recursive() but organises the results so that they
    look like the results of os.walk().

    Yields 3-tuples: (path, dirs, files)

    root_dir may be a str or an os.PathLike.

    This has some differences from os.walk():
      * modifying `dirs` in place has no effect on future iterations
      * errors from listdir_recursive and stat are not ignored by default. (see below!)
      * always follows symlinks (in os.walk the default is *not* to follow symlinks).
        Note that this can have nasty results if a symlink links to its parent dir.

    If use_os_walk_if_possible (the default), then this may actually just
    use os.walk() for improved performance, if the root filesystem
    isn't actually a GDAL virtual filesystem (root_dir doesn't start with "/vsi").

    For VSI filesystems, directories are yielded in sorted path order (reversed
    when topdown=False, so children come before their parents). The root is
    yielded exactly once, spelled as given, even if root_dir has a trailing slash.

    NOTE: the `onerror` callback is a bit tricky:
      * by default, all errors will be raised. This is *different* than os.walk() behaviour.
      * If onerror is given:
        * it will be ignored for VSI filesystems - errors are always raised.
        * it will be passed to os.walk() for normal filesystems.
    """
    root_dir = os.fspath(root_dir)
    if use_os_walk_if_possible and not root_dir.startswith("/vsi"):
        if onerror is None:
            # Raise all errors by default. (ie, same as for VSI filesystems)
            def onerror(exc):
                raise exc

        for p, dirs, files in os.walk(
            root_dir, topdown=topdown, followlinks=True, onerror=onerror
        ):
            # Ensure that modifying dirs doesn't affect the next iteration.
            dirs = dirs[:]
            yield p, dirs, files
        return

    # GDAL treats 'foo' and 'foo/' slightly differently.
    # If you *don't* put a trailing slash on, listdir_recursive()
    # will yield an entry for *both* the trailing-slash and the non-trailing-slash
    # versions of the root dir.
    relative_paths = listdir_recursive(os.path.join(root_dir, ""))

    # os.path.split() never leaves a trailing slash on the parent, so key the
    # root by its slash-stripped spelling. Otherwise "foo/" and "foo" would be
    # yielded as two separate directories. ("/" stays "/".)
    root_key = root_dir.rstrip("/") or root_dir

    dirs = defaultdict(list)
    files = defaultdict(list)
    all_dir_paths = {root_key}
    # We're assuming that isdir() calls here have been cached by GDAL, so shouldn't be too expensive.
    for rel_path in relative_paths:
        abs_path = os.path.join(root_key, rel_path)
        parent_dir, basename = os.path.split(abs_path)
        all_dir_paths.add(parent_dir)
        if isdir(abs_path):
            dirs[parent_dir].append(basename)
            all_dir_paths.add(abs_path)
        else:
            files[parent_dir].append(basename)

    # Yield an entry for *every* directory, including the root, including empty ones.
    for p in sorted(all_dir_paths, reverse=not topdown):
        # Report the root exactly as the caller spelled it, like os.walk() does.
        yield (root_dir if p == root_key else p), dirs[p], files[p]
# Minimal os.stat_result look-alike returned by stat(). Only these three
# fields exist; any other st_* attribute raises AttributeError.
VSIStatResult = namedtuple("VSIStatResult", "st_mode st_mtime st_size")
def stat(path, _flags=0):
    """
    Stat the given VSI path and return an os.stat-like result object.

    The return value's attributes correspond to members of the stat structure
    (https://docs.python.org/3/library/os.html#os.stat_result). HOWEVER only
    basic information is available: st_mode, st_mtime and st_size. Accessing
    any other attribute raises AttributeError.

    path may be a str or an os.PathLike (e.g. pathlib.Path).
    _flags is passed straight through to gdal.VSIStatL (e.g.
    gdal.VSI_STAT_SIZE_FLAG).

    Raises OSError(ENOENT) if GDAL returns no stat information for path.
    """
    if isinstance(path, os.PathLike):
        # Accept pathlib.Path etc., like exists()/isdir()/isfile() do.
        path = str(path)
    s = gdal.VSIStatL(path, _flags)
    if s is None:
        raise OSError(errno.ENOENT, "No such file or directory: %r" % path)
    return VSIStatResult(s.mode, s.mtime, s.size)
def exists(path):
    """
    Return True if path refers to an existing file or directory.

    path is converted with str(), so pathlib.Path objects are accepted.
    As with os.path.exists(), this may return False if the path can't be
    stat'ed (e.g. missing permissions), even if it physically exists.
    """
    stat_result = gdal.VSIStatL(str(path), gdal.VSI_STAT_EXISTS_FLAG)
    found = stat_result is not None
    return found
def isdir(path):
    """
    Return True if path is an existing directory.

    path may be a str or an os.PathLike. The root of a /vsizip/ archive
    (a /vsizip path ending in ".zip", case-insensitively) counts as a
    directory whenever it exists.
    """
    p = str(path) if isinstance(path, os.PathLike) else path
    if p.startswith("/vsizip") and p.rstrip("/").lower().endswith(".zip"):
        # HACK: workaround https://github.com/OSGeo/gdal/issues/1739
        return exists(p)
    stat_flags = gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG
    st = gdal.VSIStatL(p, stat_flags)
    if st is None:
        return False
    return bool(st.IsDirectory())
def isfile(path):
    """
    Return True if path exists and is not a directory.

    path may be a str or an os.PathLike. The root of a /vsizip/ archive
    (a /vsizip path ending in ".zip", case-insensitively) is never reported
    as a file.
    """
    p = str(path) if isinstance(path, os.PathLike) else path
    if p.startswith("/vsizip") and p.rstrip("/").lower().endswith(".zip"):
        # HACK: workaround https://github.com/OSGeo/gdal/issues/1739
        return False
    stat_flags = gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG
    st = gdal.VSIStatL(p, stat_flags)
    if st is None:
        return False
    return not st.IsDirectory()
def getsize(path):
    """
    Return the size of path in bytes.

    Only the size is requested from GDAL (VSI_STAT_SIZE_FLAG).
    Raises OSError if the file does not exist or is inaccessible.
    """
    result = stat(path, _flags=gdal.VSI_STAT_SIZE_FLAG)
    return result.st_size
def getmtime(path):
    """
    Return the last-modification time of path, in seconds since the epoch
    (see the time module).

    NOTE: this may not work on various non-local filesystems.
    Raises OSError if the file does not exist or is inaccessible.
    """
    result = stat(path)
    return result.st_mtime
def smart_copy(
    src, dest, max_size=None, buffering=1 * 1024 * 1024, s3_client=None, dir_mode=0o777
):
    """
    Similar to copy(), but for /vsis3/ file paths uses boto for:
    - multi-threaded/multi-part S3 uploads
    - multi-threaded/multi-part S3 downloads
    - server-side S3 copies

    Falls back to copy(), which uses VSI. Also uses VSI for chained
    (eg. /vsizip/vsis3/...) paths.

    Only str paths are recognised as S3 paths. If src & dest are file-like
    objects, they're treated as local(vsi).

    If src/dest are paths, the file will be opened & closed by this function. If
    they're file-like objects the caller will need to close them^. Note that some
    VSI functions don't do much work (or error) until close() is called, so don't
    assume a caller-owned dest file is ready on return from here.

    If dest is a path whose parent directory doesn't exist yet, makedirs() is
    called (with mode=dir_mode) to create the directory hierarchy.

    max_size is only honoured by the copy() fallback. The boto transfers ignore it.
    s3_client is an optional boto3 S3 client; one is created when needed if omitted.

    ^ Boto has a bug where it may close source files by itself when uploading: https://github.com/boto/s3transfer/issues/80
    """
    import boto3

    re_s3key = r"^/vsis3/(?P<bucket>[^/]+)/(?P<key>.+)$"
    m_src = re.match(re_s3key, src) if isinstance(src, str) else None
    m_dest = re.match(re_s3key, dest) if isinstance(dest, str) else None

    def _ensure_parent_dir(path):
        # Create the parent directory of a destination path if needed. Uses the
        # VSI-aware isdir(): os.path.isdir() is always False for e.g. /vsimem/
        # dirs, which made makedirs() fail with "File exists" on existing ones.
        dest_dir = os.path.split(path)[0]
        if dest_dir and not isdir(dest_dir):
            makedirs(dest_dir, mode=dir_mode)

    if m_src and m_dest:
        # S3 -> S3: server-side copy
        s3_client = s3_client or boto3.client("s3")
        copy_source = {"Bucket": m_src.group("bucket"), "Key": m_src.group("key")}
        s3_client.copy(copy_source, m_dest.group("bucket"), m_dest.group("key"))
        clear_cache(dest)
    elif m_src:
        # S3 -> local/vsi
        s3_client = s3_client or boto3.client("s3")
        dest_is_path = isinstance(dest, str)
        if dest_is_path:
            _ensure_parent_dir(dest)
            fdest = open(dest, mode="wb", buffering=buffering)
        else:
            fdest = dest
        try:
            s3_client.download_fileobj(m_src.group("bucket"), m_src.group("key"), fdest)
            fdest.flush()
        finally:
            # Track ownership separately: `fdest` is a file object here, so an
            # isinstance(..., str) check on it would never close what we opened.
            if dest_is_path:
                fdest.close()
    elif m_dest:
        # local/vsi -> S3
        s3_client = s3_client or boto3.client("s3")
        src_is_path = isinstance(src, str)
        fsrc = open(src, mode="rb", buffering=buffering) if src_is_path else src
        try:
            s3_client.upload_fileobj(fsrc, m_dest.group("bucket"), m_dest.group("key"))
            clear_cache(dest)
        finally:
            if src_is_path and not fsrc.closed:  # https://github.com/boto/s3transfer/issues/80
                fsrc.close()
    else:
        # fallback to vsi -> vsi
        if not isinstance(dest, str):
            return copy(src, dest, max_size=max_size, buffering=buffering)
        _ensure_parent_dir(dest)
        # Open dest ourselves (to honour `buffering`), so we must close it too:
        # copy() leaves file objects it was given open.
        fdest = open(dest, mode="wb", buffering=buffering)
        try:
            return copy(src, fdest, max_size=max_size, buffering=buffering)
        finally:
            fdest.close()
@contextmanager
def smart_download_to_local_path(path):
    """
    Contextmanager. Given a path, makes sure it's a local path and yields it.

    If it's a VSI path (starts with "/vsi"), it is copied with smart_copy() to
    a local temp file, and the temp file's name is yielded. The temp file keeps
    the source's file extension, in case consumers detect formats by extension
    (e.g. ".zip"). It is removed when the context exits.

    Otherwise path is yielded unchanged. path may be a str or an os.PathLike.
    """
    str_path = os.fspath(path)
    if str_path.startswith("/vsi"):
        suffix = os.path.splitext(str_path)[1]
        with tempfile.NamedTemporaryFile(suffix=suffix) as f:
            # Pass the str form: smart_copy() only recognises str /vsis3/ paths.
            smart_copy(str_path, f)
            yield f.name
    else:
        yield path
def remove(path):
    """
    Remove (delete) the file at path.

    path is converted with str(), so pathlib.Path objects work. The deletion
    goes through gdal.Unlink, with errors reported by _vsi_err_wrap() using
    exception_class=OSError.
    """
    target = str(path)
    _vsi_err_wrap(gdal.Unlink, target, exception_class=OSError)
def mkdir(path, mode=0o777):
    """
    Create a single directory named path, with numeric mode `mode`.

    The default mode is 0777 (octal). Parent directories are not created.
    Raises OSError (errno 17) if path already exists.
    """
    abs_path = os.path.abspath(path)  # also drops any trailing slash
    if not exists(abs_path):
        _vsi_err_wrap(gdal.Mkdir, abs_path, mode, exception_class=OSError)
        return
    raise OSError(17, "File exists: '%s'" % abs_path)
def makedirs(path, mode=0o777, exist_ok=False):
    """
    Recursive directory creation function. Like mkdir(), but makes all
    intermediate-level directories needed to contain the leaf directory.
    The default mode is 0777 (octal).

    If the leaf already exists, OSError (errno 17) is raised, unless
    exist_ok is True and the leaf is a directory (same semantics as
    os.makedirs()). If path exists but is not a directory, OSError is
    raised regardless of exist_ok. Creation errors from GDAL are reported
    via _vsi_err_wrap() with exception_class=OSError.
    """
    path = os.path.abspath(path)  # strips trailing slashes
    if exists(path):
        if exist_ok and isdir(path):
            return
        raise OSError(17, "File exists: '%s'" % path)
    _vsi_err_wrap(gdal.MkdirRecursive, path, mode, exception_class=OSError)
def rmdir(path):
    """
    Remove (delete) the directory path, which must be empty.

    Raises OSError if path isn't a directory, or if GDAL's Rmdir fails
    (e.g. because the directory is not empty). Use rmtree() to remove whole
    directory trees. The GDAL cache for path is cleared afterwards either way.

    Note: rmdir() has odd behaviour on /vsimem/, sometimes it'll error and
    remove anyway, although the deeper paths stay present.
    """
    if isdir(path):
        try:
            _vsi_err_wrap(gdal.Rmdir, path, exception_class=OSError)
        finally:
            # otherwise weird stuff happens
            clear_cache(path)
        return
    raise OSError("Path isn't a directory: %s" % path)
def rmtree(path, ignore_errors=True):
    """
    Delete an entire directory tree. path must point to a directory (but not a
    symbolic link to a directory); otherwise OSError is raised.

    If ignore_errors is true, errors raised by the removal itself are
    swallowed. This is the default because RmdirRecursive() seems to raise
    errors for no reason. The GDAL cache for path is cleared afterwards either way.

    Note: rmtree() has odd behaviour on /vsimem/, sometimes it'll error and
    remove anyway, although the deeper paths stay present.
    """
    is_directory = isdir(path)
    if not is_directory:
        raise OSError("Path isn't a directory: %s" % path)
    try:
        try:
            _vsi_err_wrap(gdal.RmdirRecursive, path, exception_class=OSError)
        except Exception:
            if not ignore_errors:
                raise
    finally:
        # otherwise weird stuff happens
        clear_cache(path)
def clear_cache(path):
    """
    Clear GDAL's curl cache if path matches _VSI_CURL_PREFIX_RE (presumably
    the curl-based /vsi prefixes such as /vsicurl/ or /vsis3/ - verify against
    the pattern's definition). Other paths are ignored.

    VSICurlClearCache() clears the whole cache, not just entries for path.
    """
    if not _VSI_CURL_PREFIX_RE.search(path):
        return
    # TODO: Use VSICurlPartialClearCache() when it's in the Python bindings
    L.debug("clear_cache")
    gdal.VSICurlClearCache()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment