{"doy": "dayofyear", "dow": "weekday", "milliseconds": "millisecond"},
)
- def get_select_precolumns(self, select, **kw):
- s = select._distinct and "DISTINCT " or ""
-
- if select._simple_int_limit and not select._offset:
- kw["literal_execute"] = True
- s += "TOP %s " % self.process(select._limit_clause, **kw)
-
- if select._offset:
- raise NotImplementedError("Sybase ASE does not support OFFSET")
- return s
-
def get_from_hint_text(self, table, text):
return text
def limit_clause(self, select, **kw):
- # Limit in sybase is after the select keyword
- return ""
+ text = ""
+ if select._limit_clause is not None:
+ text += " ROWS LIMIT " + self.process(select._limit_clause, **kw)
+ if select._offset_clause is not None:
+ if select._limit_clause is None:
+ text += " ROWS"
+ text += " OFFSET " + self.process(select._offset_clause, **kw)
+ return text
def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy.dialects import sybase
-from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % subst,
)
- def test_offset_not_supported(self):
- stmt = select([1]).offset(10)
- assert_raises_message(
- NotImplementedError,
- "Sybase ASE does not support OFFSET",
- stmt.compile,
- dialect=self.__dialect__,
+ def test_limit_offset(self):
+ stmt = select([1]).limit(5).offset(6)
+ assert stmt.compile().params == {"param_1": 5, "param_2": 6}
+ self.assert_compile(
+ stmt, "SELECT 1 ROWS LIMIT :param_1 OFFSET :param_2"
)
+ def test_offset(self):
+ stmt = select([1]).offset(10)
+ assert stmt.compile().params == {"param_1": 10}
+ self.assert_compile(stmt, "SELECT 1 ROWS OFFSET :param_1")
+
+ def test_limit(self):
+ stmt = select([1]).limit(5)
+ assert stmt.compile().params == {"param_1": 5}
+ self.assert_compile(stmt, "SELECT 1 ROWS LIMIT :param_1")
+
def test_delete_extra_froms(self):
t1 = sql.table("t1", sql.column("c1"))
t2 = sql.table("t2", sql.column("c1"))