import re
from pystratum_backend.StratumStyle import StratumStyle
from pystratum_common.exception.LoaderException import LoaderException
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
from pystratum_mssql.helper.MsSqlDataTypeHelper import MsSqlDataTypeHelper
from pystratum_mssql.MsSqlMetadataDataLayer import MsSqlMetadataDataLayer
class MsSqlRoutineLoaderHelper(RoutineLoaderHelper):
    """
    Class for loading a single stored routine into a SQL Server instance from a (pseudo) SQL file.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self,
                 io: StratumStyle,
                 dl: MsSqlMetadataDataLayer,
                 routine_filename,
                 routine_file_encoding,
                 pystratum_old_metadata,
                 replace_pairs,
                 rdbms_old_metadata):
        """
        Object constructor.

        :param StratumStyle io: The output decorator.
        :param MsSqlMetadataDataLayer dl: The metadata layer used for querying and modifying the SQL Server instance.
        :param str routine_filename: The filename of the source of the stored routine.
        :param str routine_file_encoding: The encoding of the source file.
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
        :param dict[str,str] replace_pairs: A map from placeholders to their actual values.
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server.
        """
        RoutineLoaderHelper.__init__(self,
                                     io,
                                     routine_filename,
                                     routine_file_encoding,
                                     pystratum_old_metadata,
                                     replace_pairs,
                                     rdbms_old_metadata)

        self._routine_base_name: str = ''
        """
        The name of the stored routine without schema name.
        """

        self._routines_schema_name: str = ''
        """
        The name of the schema of the stored routine.
        """

        self._dl: MsSqlMetadataDataLayer = dl
        """
        The metadata layer.
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _drop_routine(self) -> None:
        """
        Drops the stored routine if it exists, i.e. if there is old RDBMS metadata for it.

        :raises Exception: If the routine type in the old RDBMS metadata is not one of 'P', 'FN' or 'TF'.
        """
        if not self._rdbms_old_metadata:
            return

        # The type code may be padded with whitespace, hence the strip.
        type_code = self._rdbms_old_metadata['routine_type'].strip()
        if type_code == 'P':
            routine_type = 'procedure'
        elif type_code in ('FN', 'TF'):
            routine_type = 'function'
        else:
            raise Exception("Unknown routine type '{0}'.".format(self._rdbms_old_metadata['routine_type']))

        self._dl.drop_stored_routine(routine_type,
                                     self._rdbms_old_metadata['schema_name'],
                                     self._routine_base_name)

    # ------------------------------------------------------------------------------------------------------------------
    def _get_bulk_insert_table_columns_info(self) -> None:
        """
        Gets the column names and column types of the current table for bulk insert.

        Not implemented for SQL Server.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_data_type_helper(self) -> DataTypeHelper:
        """
        Returns a data type helper object for SQL Server.

        :rtype: DataTypeHelper
        """
        return MsSqlDataTypeHelper()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_name(self) -> None:
        """
        Extracts the schema name, the base name, and the type (i.e. procedure or function) of the stored routine from
        the source of the stored routine.

        :raises LoaderException: If the schema qualified name in the source differs from the routine name, or if no
                                 routine type is known after scanning the source.
        """
        pattern = re.compile(r"create\s+(procedure|function)\s+(?:(\w+)\.([a-zA-Z0-9_]+))", re.IGNORECASE)
        match = pattern.search(self._routine_source_code)

        if match:
            routine_type, schema_name, base_name = match.groups()
            self._routine_type = routine_type.lower()
            self._routines_schema_name = schema_name
            self._routine_base_name = base_name

            if self._routine_name != (schema_name + '.' + base_name):
                raise LoaderException(
                    'Stored routine name {0}.{1} does not match filename in file {2}'.format(schema_name,
                                                                                           base_name,
                                                                                           self._source_filename))

        if not self._routine_type:
            raise LoaderException('Unable to find the stored routine name and type in file {0}'.
                                  format(self._source_filename))

    # ------------------------------------------------------------------------------------------------------------------
    def _get_routine_parameters_info(self) -> None:
        """
        Retrieves information about the stored routine parameters from the meta data of SQL Server.

        Each parameter with a non-empty name is appended to the list of parameters of the stored routine.
        """
        routine_parameters = self._dl.get_routine_parameters(self._routines_schema_name,
                                                             self._routine_base_name)

        for routine_parameter in routine_parameters:
            if not routine_parameter['parameter_name']:
                # Entries without a name are skipped (presumably the return value of a function; verify against
                # the data layer).
                continue

            self._parameters.append({'name': routine_parameter['parameter_name'][1:],  # Drop the first character.
                                     'data_type': routine_parameter['type_name'],
                                     'numeric_precision': routine_parameter['precision'],
                                     'numeric_scale': routine_parameter['scale'],
                                     'data_type_descriptor': routine_parameter['type_name']})

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine(self, line: str) -> bool:
        """
        Returns True if a line is the start of the code of the stored routine.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        return re.match(r'^\s*create\s+(procedure|function)', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
        """
        Returns True if a line is the start of the body of the stored routine, i.e. the line starts with the keyword
        AS.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        # The word boundary prevents lines that merely start with "as" (e.g. a column named "asset_id" in a
        # "returns @t table (...)" clause) from being taken for the AS keyword.
        return re.match(r'^\s*as\b', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _load_routine_file(self) -> None:
        """
        Loads the stored routine into the SQL Server instance.

        Placeholders and magic constants are substituted line by line. When the routine already exists and its
        designation is unchanged the "create" statement is turned into an "alter" statement; when the routine exists
        but the designation differs (or there is no old PyStratum metadata) the routine is dropped first.

        :raises LoaderException: If the routine must be altered but no "create procedure|function" is found in the
                                 source after substitution.
        """
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))

        self._set_magic_constants()

        routine_source = []
        for line_number, line in enumerate(self._routine_source_code_lines, start=1):
            self._replace['__LINE__'] = "'%d'" % line_number
            new_line = line
            for search, replace in self._replace.items():
                # Placeholders are literal text: escape them so characters such as "." and "%" are not interpreted
                # as a regular expression, and replace every occurrence regardless of its case. A function is used
                # as replacement so backslashes in the value are inserted verbatim.
                new_line = re.sub(re.escape(search),
                                  lambda _match, text=replace: text,
                                  new_line,
                                  flags=re.IGNORECASE)
            routine_source.append(new_line)
        routine_source = "\n".join(routine_source)

        self._unset_magic_constants()

        if self._rdbms_old_metadata:
            if self._pystratum_old_metadata and self._pystratum_old_metadata['designation'] == \
                    self._pystratum_metadata['designation']:
                # Same designation: alter the existing routine rather than dropping and recreating it.
                pattern = re.compile(r'(create\s+(procedure|function))', re.IGNORECASE)
                matches = pattern.findall(routine_source)
                if matches:
                    routine_source = routine_source.replace(matches[0][0], 'alter %s' % matches[0][1])
                else:
                    raise LoaderException("Unable to find the stored routine type in modified source of file '{0}'".
                                          format(self._source_filename))
            else:
                self._drop_routine()

        self._dl.execute_none(routine_source)
        self._dl.commit()

    # ------------------------------------------------------------------------------------------------------------------
    def _must_reload(self) -> bool:
        """
        Returns True if the source file must be loaded or reloaded. Otherwise returns False.

        The source must be (re)loaded when there is no old PyStratum metadata, the modification time of the source
        differs from the recorded timestamp, a recorded placeholder is missing or has a different value, or the
        routine is not known in the old RDBMS metadata.

        :rtype: bool
        """
        if not self._pystratum_old_metadata:
            return True

        if self._pystratum_old_metadata['timestamp'] != self._m_time:
            return True

        old_replace = self._pystratum_old_metadata['replace']
        if old_replace:
            for key, value in old_replace.items():
                # The replace pairs are looked up by the lower case placeholder.
                lookup_key = key.lower()
                if lookup_key not in self._replace_pairs or self._replace_pairs[lookup_key] != value:
                    return True

        if not self._rdbms_old_metadata:
            return True

        return False

    # ------------------------------------------------------------------------------------------------------------------
    def _update_metadata(self) -> None:
        """
        Updates the metadata of the stored routine.
        """
        # Update general metadata.
        RoutineLoaderHelper._update_metadata(self)

        # Update SQL Server specific metadata: the schema name and the routine name without schema.
        self._pystratum_metadata['schema_name'] = self._routines_schema_name
        self._pystratum_metadata['routine_base_name'] = self._routine_base_name
# ----------------------------------------------------------------------------------------------------------------------