import os
from pathlib import Path
import contextlib
from zipfile import ZipFile, ZIP_DEFLATED

import pytest

import pyogrio
import pyogrio.raw
from pyogrio.util import vsi_path, get_vsi_path_or_buffer

# GeoPandas is an optional dependency: record whether it can be imported so
# that the dataframe-based tests below can be skipped when it is missing.
try:
    import geopandas  # NOQA

    has_geopandas = True
except ImportError:
    has_geopandas = False


@contextlib.contextmanager
def change_cwd(path):
    """Temporarily make ``path`` the current working directory.

    The directory that was current on entry is restored on exit, whether the
    managed block finishes normally or raises.

    Parameters
    ----------
    path : str or path-like
        Directory to switch into; converted with ``str()`` before use.
    """
    original_dir = os.getcwd()
    target_dir = str(path)
    os.chdir(target_dir)
    try:
        yield
    finally:
        # always go back, even if the body of the ``with`` block failed
        os.chdir(original_dir)


@pytest.mark.parametrize(
    "path, expected",
    [
        # local file paths that should be passed through as is
        ("data.gpkg", "data.gpkg"),
        ("/home/user/data.gpkg", "/home/user/data.gpkg"),
        (r"C:\User\Documents\data.gpkg", r"C:\User\Documents\data.gpkg"),
        ("file:///home/user/data.gpkg", "/home/user/data.gpkg"),
        ("/home/folder # with hash/data.gpkg", "/home/folder # with hash/data.gpkg"),
        # cloud URIs
        ("https://testing/data.gpkg", "/vsicurl/https://testing/data.gpkg"),
        ("s3://testing/data.gpkg", "/vsis3/testing/data.gpkg"),
        ("gs://testing/data.gpkg", "/vsigs/testing/data.gpkg"),
        ("az://testing/data.gpkg", "/vsiaz/testing/data.gpkg"),
        ("adl://testing/data.gpkg", "/vsiadls/testing/data.gpkg"),
        ("adls://testing/data.gpkg", "/vsiadls/testing/data.gpkg"),
        ("hdfs://testing/data.gpkg", "/vsihdfs/testing/data.gpkg"),
        ("webhdfs://testing/data.gpkg", "/vsiwebhdfs/testing/data.gpkg"),
        # archives
        ("zip://data.zip", "/vsizip/data.zip"),
        ("tar://data.tar", "/vsitar/data.tar"),
        ("gzip://data.gz", "/vsigzip/data.gz"),
        ("tar://./my.tar!my.geojson", "/vsitar/./my.tar/my.geojson"),
        (
            "zip://home/data/shapefile.zip!layer.shp",
            "/vsizip/home/data/shapefile.zip/layer.shp",
        ),
        # combined schemes
        ("zip+s3://testing/shapefile.zip", "/vsizip/vsis3/testing/shapefile.zip"),
        (
            "zip+https://s3.amazonaws.com/testing/shapefile.zip",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/shapefile.zip",
        ),
        # auto-prefix zip files
        ("test.zip", "/vsizip/test.zip"),
        ("/a/b/test.zip", "/vsizip//a/b/test.zip"),
        ("a/b/test.zip", "/vsizip/a/b/test.zip"),
        # archives using ! notation should be prefixed by vsizip
        ("test.zip!item.shp", "/vsizip/test.zip/item.shp"),
        ("test.zip!/a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("test.zip!a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("/vsizip/test.zip/a/b/item.shp", "/vsizip/test.zip/a/b/item.shp"),
        ("zip:///test.zip/a/b/item.shp", "/vsizip//test.zip/a/b/item.shp"),
        # auto-prefix remote zip files
        (
            "https://s3.amazonaws.com/testing/test.zip",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/test.zip",
        ),
        (
            "https://s3.amazonaws.com/testing/test.zip!/a/b/item.shp",
            "/vsizip/vsicurl/https://s3.amazonaws.com/testing/test.zip/a/b/item.shp",
        ),
        ("s3://testing/test.zip", "/vsizip/vsis3/testing/test.zip"),
        (
            "s3://testing/test.zip!a/b/item.shp",
            "/vsizip/vsis3/testing/test.zip/a/b/item.shp",
        ),
    ],
)
def test_vsi_path(path, expected):
    """Check that ``vsi_path`` maps each input path or URI to ``expected``.

    The parametrized cases cover plain local paths, cloud URIs, archive
    schemes, combined schemes, and automatic ``/vsizip/`` prefixing of local
    and remote ``.zip`` files (with and without the ``!`` member separator).
    """
    assert vsi_path(path) == expected


def test_vsi_path_unknown():
    """A URI with an unrecognized scheme is returned by ``vsi_path`` unchanged."""
    unknown_uri = "s4://test/data.geojson"
    converted = vsi_path(unknown_uri)
    assert converted == "s4://test/data.geojson"


def test_vsi_handling_read_functions(naturalearth_lowres_vsi):
    """Each non-dataframe read entry point must apply the path handling.

    A ``zip://`` path would otherwise fail, so it is used as the probe for
    ``pyogrio.raw.read``, ``pyogrio.read_info`` and ``pyogrio.read_bounds``.
    """
    archive, _ = naturalearth_lowres_vsi
    zip_uri = f"zip://{archive}"

    raw_result = pyogrio.raw.read(zip_uri)
    assert len(raw_result[2]) == 177

    info = pyogrio.read_info(zip_uri)
    assert info["features"] == 177

    bounds_result = pyogrio.read_bounds(zip_uri)
    assert len(bounds_result[0]) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_vsi_handling_read_dataframe(naturalearth_lowres_vsi):
    """``read_dataframe`` must accept a ``zip://`` path as well."""
    archive, _ = naturalearth_lowres_vsi
    zip_uri = f"zip://{archive}"

    frame = pyogrio.read_dataframe(zip_uri)
    assert len(frame) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_path_absolute(data_dir):
    """Read the shapefile under ``data_dir`` via a pathlib path and a str path."""
    shapefile = data_dir / "naturalearth_lowres/naturalearth_lowres.shp"

    # first the pathlib path itself, then its string form
    for candidate in (shapefile, str(shapefile)):
        frame = pyogrio.read_dataframe(candidate)
        assert len(frame) == 177


def test_path_relative(data_dir):
    """Read via a path relative to the working directory (set to ``data_dir``)."""
    relative_shp = "naturalearth_lowres/naturalearth_lowres.shp"

    with change_cwd(data_dir):
        raw_result = pyogrio.raw.read(relative_shp)
        assert len(raw_result[2]) == 177

        info = pyogrio.read_info(relative_shp)
        assert info["features"] == 177

        bounds_result = pyogrio.read_bounds(relative_shp)
        assert len(bounds_result[0]) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_path_relative_dataframe(data_dir):
    """``read_dataframe`` must resolve a path relative to the working directory."""
    relative_shp = "naturalearth_lowres/naturalearth_lowres.shp"

    with change_cwd(data_dir):
        frame = pyogrio.read_dataframe(relative_shp)
        assert len(frame) == 177


def test_uri_local_file(data_dir):
    """A ``file://`` URI pointing at a local shapefile must be readable."""
    shapefile = data_dir / "naturalearth_lowres/naturalearth_lowres.shp"
    file_uri = "file://" + str(shapefile)

    raw_result = pyogrio.raw.read(file_uri)
    assert len(raw_result[2]) == 177

    info = pyogrio.read_info(file_uri)
    assert info["features"] == 177

    bounds_result = pyogrio.read_bounds(file_uri)
    assert len(bounds_result[0]) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_uri_local_file_dataframe(data_dir):
    """``read_dataframe`` must accept a ``file://`` URI to a local shapefile."""
    shapefile = data_dir / "naturalearth_lowres/naturalearth_lowres.shp"
    file_uri = "file://" + str(shapefile)

    frame = pyogrio.read_dataframe(file_uri)
    assert len(frame) == 177


def test_zip_path(naturalearth_lowres_vsi):
    """Read a zipped dataset through absolute and relative zip locations.

    The fixture supplies the archive path and its vsizip form; both are read
    with ``raw.read``, ``read_info`` and ``read_bounds``, followed by a
    ``zip://`` URI that is relative to the archive's parent directory.
    """
    archive, vsi_archive = naturalearth_lowres_vsi
    absolute_uri = "zip://" + str(archive)

    # absolute zip path first, then the absolute vsizip path
    for location in (absolute_uri, vsi_archive):
        raw_result = pyogrio.raw.read(location)
        assert len(raw_result[2]) == 177

        info = pyogrio.read_info(location)
        assert info["features"] == 177

        bounds_result = pyogrio.read_bounds(location)
        assert len(bounds_result[0]) == 177

    # relative zip path, resolved against the archive's own directory
    relative_uri = "zip://" + archive.name
    with change_cwd(archive.parent):
        raw_result = pyogrio.raw.read(relative_uri)
        assert len(raw_result[2]) == 177

        info = pyogrio.read_info(relative_uri)
        assert info["features"] == 177

        bounds_result = pyogrio.read_bounds(relative_uri)
        assert len(bounds_result[0]) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_zip_path_dataframe(naturalearth_lowres_vsi):
    """``read_dataframe`` must handle absolute and relative zip locations."""
    archive, vsi_archive = naturalearth_lowres_vsi
    absolute_uri = "zip://" + str(archive)

    # absolute zip path first, then the absolute vsizip path
    for location in (absolute_uri, vsi_archive):
        frame = pyogrio.read_dataframe(location)
        assert len(frame) == 177

    # relative zip path, resolved against the archive's own directory
    with change_cwd(archive.parent):
        relative_uri = "zip://" + archive.name
        frame = pyogrio.read_dataframe(relative_uri)
        assert len(frame) == 177


@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_detect_zip_path(tmp_path, naturalearth_lowres):
    """Zip archives are detected from the path and members can be selected.

    Builds a zip holding one shapefile at the archive root (``test1``, CAN)
    and one under ``/a/b/`` (``test2``, PER), then reads it via a bare path,
    the ``!`` member notation, the ``zip://`` scheme and a ``/vsizip/`` path.
    """
    # create a zipfile with 2 shapefiles in a set of subdirectories
    countries = pyogrio.read_dataframe(
        naturalearth_lowres, where="iso_a3 in ('CAN', 'PER')"
    )
    canada = countries.loc[countries.iso_a3 == "CAN"]
    pyogrio.write_dataframe(canada, tmp_path / "test1.shp")
    peru = countries.loc[countries.iso_a3 == "PER"]
    pyogrio.write_dataframe(peru, tmp_path / "test2.shp")

    archive = tmp_path / "test.zip"
    with ZipFile(
        archive, mode="w", compression=ZIP_DEFLATED, compresslevel=5
    ) as zip_out:
        for ext in ["dbf", "prj", "shp", "shx"]:
            # test1 parts go to the archive root
            root_member = f"test1.{ext}"
            zip_out.write(tmp_path / root_member, root_member)

            # test2 parts are stored under /a/b/
            nested_member = f"test2.{ext}"
            zip_out.write(tmp_path / nested_member, f"/a/b/{nested_member}")

    # defaults to the first shapefile found, at lowest subdirectory
    frame = pyogrio.read_dataframe(archive)
    assert frame.iso_a3[0] == "CAN"

    # selecting a shapefile from within the zip requires the "!" archive specifier
    frame = pyogrio.read_dataframe(f"{archive}!test1.shp")
    assert frame.iso_a3[0] == "CAN"

    frame = pyogrio.read_dataframe(f"{archive}!/a/b/test2.shp")
    assert frame.iso_a3[0] == "PER"

    # specifying zip:// scheme should also work
    frame = pyogrio.read_dataframe(f"zip://{archive}!/a/b/test2.shp")
    assert frame.iso_a3[0] == "PER"

    # specifying /vsizip/ should also work, but the path must already be in
    # GDAL-ready format without the "!" archive specifier
    frame = pyogrio.read_dataframe(f"/vsizip/{archive}/a/b/test2.shp")
    assert frame.iso_a3[0] == "PER"


@pytest.mark.network
def test_url():
    """Read a remote shapefile over HTTPS with the non-dataframe entry points."""
    remote_shp = "https://raw.githubusercontent.com/geopandas/pyogrio/main/pyogrio/tests/fixtures/naturalearth_lowres/naturalearth_lowres.shp"

    raw_result = pyogrio.raw.read(remote_shp)
    assert len(raw_result[2]) == 177

    info = pyogrio.read_info(remote_shp)
    assert info["features"] == 177

    bounds_result = pyogrio.read_bounds(remote_shp)
    assert len(bounds_result[0]) == 177


@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_url_dataframe():
    """Read a remote shapefile over HTTPS with ``read_dataframe``.

    Marked ``network`` like the other tests in this module that fetch remote
    data, so it is selected or deselected together with them.
    """
    url = "https://raw.githubusercontent.com/geopandas/pyogrio/main/pyogrio/tests/fixtures/naturalearth_lowres/naturalearth_lowres.shp"

    assert len(pyogrio.read_dataframe(url)) == 177


@pytest.mark.network
def test_url_with_zip():
    """Read a remote zip archive through the combined ``zip+https`` scheme."""
    remote_zip = "zip+https://s3.amazonaws.com/fiona-testing/coutwildrnp.zip"

    raw_result = pyogrio.raw.read(remote_zip)
    assert len(raw_result[2]) == 67

    info = pyogrio.read_info(remote_zip)
    assert info["features"] == 67

    bounds_result = pyogrio.read_bounds(remote_zip)
    assert len(bounds_result[0]) == 67


@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_url_with_zip_dataframe():
    """``read_dataframe`` must accept the combined ``zip+https`` scheme."""
    remote_zip = "zip+https://s3.amazonaws.com/fiona-testing/coutwildrnp.zip"
    frame = pyogrio.read_dataframe(remote_zip)
    assert len(frame) == 67


@pytest.fixture
def aws_env_setup(monkeypatch):
    """Set ``AWS_NO_SIGN_REQUEST=YES`` in the environment via ``monkeypatch``."""
    variable, setting = "AWS_NO_SIGN_REQUEST", "YES"
    monkeypatch.setenv(variable, setting)


@pytest.mark.network
def test_uri_s3(aws_env_setup):
    """Read a zip archive from S3 through the combined ``zip+s3`` scheme."""
    s3_zip = "zip+s3://fiona-testing/coutwildrnp.zip"

    raw_result = pyogrio.raw.read(s3_zip)
    assert len(raw_result[2]) == 67

    info = pyogrio.read_info(s3_zip)
    assert info["features"] == 67

    bounds_result = pyogrio.read_bounds(s3_zip)
    assert len(bounds_result[0]) == 67


@pytest.mark.network
@pytest.mark.skipif(not has_geopandas, reason="GeoPandas not available")
def test_uri_s3_dataframe(aws_env_setup):
    """``read_dataframe`` must accept the combined ``zip+s3`` scheme."""
    s3_zip = "zip+s3://fiona-testing/coutwildrnp.zip"
    frame = pyogrio.read_dataframe(s3_zip)
    assert len(frame) == 67


def test_get_vsi_path_or_buffer_obj_to_string():
    """A ``pathlib.Path`` input comes back as its string form."""
    gpkg_path = Path("/tmp/test.gpkg")
    converted = get_vsi_path_or_buffer(gpkg_path)
    assert converted == str(gpkg_path)


def test_get_vsi_path_or_buffer_fixtures_to_string(tmp_path):
    """A path built from the ``tmp_path`` fixture comes back as its string form."""
    gpkg_path = tmp_path / "test.gpkg"
    converted = get_vsi_path_or_buffer(gpkg_path)
    assert converted == str(gpkg_path)


@pytest.mark.parametrize(
    "raw_path", ["/vsimem/test.shp.zip", "/vsizip//vsimem/test.shp.zip"]
)
def test_vsimem_path_exception(raw_path):
    """``vsi_path`` must reject paths that contain ``/vsimem/``.

    Only the exception type is checked. The previous ``match=""`` pattern
    matched every message, so it asserted nothing about the error text.
    """
    with pytest.raises(ValueError):
        vsi_path(raw_path)
