import csv
import re
import sys
from time import gmtime, strftime
from typing import Any, Dict, List, Optional
from pystratum_middle.exception.ResultException import ResultException
from pystratum_mssql.MsSqlConnector import MsSqlConnector
[docs]class MsSqlDataLayer:
"""
Class for connecting to a SQL Server instance and executing SQL statements. Also, a parent class for classes with
static wrapper methods for executing stored procedures and functions.
"""
# ------------------------------------------------------------------------------------------------------------------
_suppress_superfluous_messages = True
"""
If set superfluous messages like below will be suppressed:
* "Warning: Null value is eliminated by an aggregate or other SET operation."
* The module ... depends on the missing object .... The module will still be created; however, it cannot run
successfully until the object exists.
:type: bool
"""
line_buffered = True
"""
If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
sys.stdout is buffered by default).
:type: bool
"""
# ------------------------------------------------------------------------------------------------------------------
def __init__(self, connector: MsSqlConnector):
"""
Object constructor.
"""
self.__connector: MsSqlConnector = connector
"""
The object for connecting to a MySQL instance.
"""
self.__conn: Optional[Any] = None
"""
The connection between Python and the MySQL instance.
"""
# ------------------------------------------------------------------------------------------------------------------
[docs] def autocommit(self, status: bool) -> None:
"""
Sets auto commit mode.
See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.autocommit.
:param bool status: True: Auto commit on. False: Auto commit off.
"""
self.__conn.autocommit(status)
# ------------------------------------------------------------------------------------------------------------------
[docs] def commit(self) -> None:
"""
Commits the current transaction.
See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.commit.
"""
self.__conn.commit()
# ------------------------------------------------------------------------------------------------------------------
[docs] def connect(self) -> None:
"""
Connects to a MS SQL Server instance.
"""
self.__conn = self.__connector.connect()
# ------------------------------------------------------------------------------------------------------------------
[docs] def disconnect(self) -> None:
"""
Disconnects from the MS SQL Server instance.
See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.close.
"""
self.__conn = None
self.__connector.disconnect()
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_csv(self, sql: str, filename: str, dialect: str = 'unix', encoding: str = 'utf-8') -> int:
file = open(filename, 'w', encoding=encoding)
csv_file = csv.writer(file, dialect=dialect)
# Run the query.
cursor = self.__conn.cursor(as_dict=False)
cursor.execute(sql)
# Store all rows in CSV format in the file.
n = 0
for row in cursor:
csv_file.writerow(row)
n += 1
# Close the CSV file and the cursor.
file.close()
cursor.close()
return n
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_log(self, sql: str, *params) -> int:
"""
Executes a query with log statements. Returns the number of lines in the log.
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: int
"""
cursor = self.__conn.cursor()
cursor.execute(sql, params)
n = 0
next_set = True
while next_set:
stamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
for row in cursor:
print(stamp, end='')
for field in row:
print(' %s' % field, end='')
print('', flush=self.line_buffered)
n += 1
next_set = cursor.nextset()
cursor.close()
return n
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_none(self, sql: str, *params) -> None:
"""
Executes a query that does not select any rows.
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: None
"""
cursor = self.__conn.cursor()
cursor.execute(sql, *params)
cursor.close()
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_row0(self, sql, *params) -> Optional[Dict[str, Any]]:
"""
Executes a query that selects 0 or 1 row. Returns the selected row or None.
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: None|dict[str,*]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, *params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n == 1:
return rows[0]
elif n == 0:
return None
else:
raise ResultException('0 or 1', n, sql)
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_row1(self, sql: str, *params) -> Dict[str, Any]:
"""
Executes a query that selects 1 row. Returns the selected row.
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: dict[str,*]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, *params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n != 1:
raise ResultException('1', n, sql)
return rows[0]
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
"""
Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows
are selected).
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: list[dict[str,*]]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, *params)
rows = cursor.fetchall()
cursor.close()
return rows
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_singleton0(self, sql: str, *params) -> Any:
"""
Executes a query that selects 0 or 1 row with 1 column. Returns the value of selected column or None.
:param str sql: The SQL statement.
:param iterable params: The parameters.
:rtype: *
"""
cursor = self.__conn.cursor()
cursor.execute(sql, *params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n == 1:
return rows[0][0]
elif n == 0:
return None
else:
raise ResultException('0 or 1', n, sql)
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_singleton1(self, sql: str, *params) -> Any:
"""
Executes a query that selects 1 row with 1 column. Returns the value of the selected column.
:param str sql:The SQL statement.
:param iterable params: The parameters.
:rtype: *
"""
cursor = self.__conn.cursor()
cursor.execute(sql, *params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n != 1:
raise ResultException('1', n, sql)
return rows[0][0]
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_none(self, sql: str, *params) -> None:
"""
Executes a stored routine that does not select any rows.
:param str sql: The SQL calling the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: None
"""
cursor = self.__conn.cursor()
cursor.execute(sql, params)
cursor.close()
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
"""
Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.
:param str sql: The SQL call the the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: None|dict[str,*]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n == 1:
return rows[0]
elif n == 0:
return None
else:
raise ResultException('0 or 1', n, sql)
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
"""
Executes a stored procedure that selects 1 row. Returns the selected row.
:param str sql: The SQL calling the the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: dict[str,*]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n != 1:
raise ResultException('1', n, sql)
return rows[0]
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
"""
Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
are selected).
:param str sql: The SQL calling the the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: list[dict[str,*]]
"""
cursor = self.__conn.cursor(as_dict=True)
cursor.execute(sql, params)
rows = cursor.fetchall()
cursor.close()
return rows
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_singleton0(self, sql: str, *params) -> Any:
"""
Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.
:param str sql: The SQL calling the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: *
"""
cursor = self.__conn.cursor()
cursor.execute(sql, params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n == 1:
return rows[0][0]
elif n == 0:
return None
else:
raise ResultException('0 or 1', n, sql)
# ------------------------------------------------------------------------------------------------------------------
[docs] def execute_sp_singleton1(self, sql: str, *params) -> Any:
"""
Executes a stored routine with designation type "table", i.e a stored routine that is expected to select 1 row
with 1 column.
:param str sql: The SQL calling the the stored procedure.
:param iterable params: The parameters for the stored procedure.
:rtype: * The value of the selected column.
"""
cursor = self.__conn.cursor()
cursor.execute(sql, params)
rows = cursor.fetchall()
cursor.close()
n = len(rows)
if n != 1:
raise ResultException('1', n, sql)
return rows[0][0]
# ------------------------------------------------------------------------------------------------------------------
[docs] def rollback(self) -> None:
"""
Rolls back the current transaction.
See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.rollback.
"""
self.__conn.rollback()
# ------------------------------------------------------------------------------------------------------------------
[docs] @staticmethod
def stratum_msg_handler(msgstate: str, severity: int, srvname: str, procname: str, line: int, msgtext: bin) -> None:
"""
Custom message handler suppressing some superfluous messages.
"""
if severity > 0:
print("Error at line %d: %s" % (line, msgtext.decode("utf-8")), file=sys.stderr)
else:
msg = msgtext.decode("utf-8")
# Suppress bogus messages if flag is set.
if MsSqlDataLayer._suppress_superfluous_messages:
# @todo Make this method more flexible by using two lists. One with strings and one on regex to
# suppress.
if msg == 'Warning: Null value is eliminated by an aggregate or other SET operation.':
return
if re.match(
"^The module \'.*\' depends on the missing object \'.*\'. The module will still be created; "
"however, it cannot run successfully until the object exists.$",
msg):
return
print(msg)
# ----------------------------------------------------------------------------------------------------------------------