一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

C#使用SqlDependency实现数据库数据变化监听

时间:2026-06-11 08:38:54 编辑:袖梨 来源:一聚教程网

一、核心原理与架构

┌─────────────────────────────────────────────────────────────┐
│                    SqlDependency 监听架构                   │
├─────────────────────────────────────────────────────────────┤
│  应用程序      │  SqlDependency   │  SQL Server Service     │
│                │                   │  Broker (SSB)           │
│  • 注册监听   │  • 订阅查询变更   │  • 消息队列              │
│  • 事件处理   │  • 触发 OnChange  │  • 通知传递              │
│  • 业务逻辑   │  • 自动重连       │  • 事务支持              │
└─────────────────────────────────────────────────────────────┘

二、数据库环境配置

2.1 启用 Service Broker

-- 1. 检查数据库是否启用 Service Broker
SELECT name, is_broker_enabled 
FROM sys.databases 
WHERE name = 'YourDatabaseName';
-- 2. 启用 Service Broker(需要单用户模式)
ALTER DATABASE YourDatabaseName SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
ALTER DATABASE YourDatabaseName SET ENABLE_BROKER;
ALTER DATABASE YourDatabaseName SET MULTI_USER;
-- 3. 创建队列和服务的权限
USE YourDatabaseName;
GRANT CREATE CONTRACT TO [YourUserName];
GRANT CREATE SERVICE TO [YourUserName];
GRANT CREATE QUEUE TO [YourUserName];
GRANT REFERENCES ON CONTRACT::[DEFAULT] TO [YourUserName];

2.2 创建测试表

CREATE TABLE [dbo].[MonitorTable] (
    [Id] INT IDENTITY(1,1) PRIMARY KEY,
    [Name] NVARCHAR(100) NOT NULL,
    [Value] DECIMAL(18,2) NOT NULL,
    [UpdateTime] DATETIME DEFAULT GETDATE()
);
-- 插入测试数据
INSERT INTO MonitorTable (Name, Value) VALUES ('Test1', 100.50);
INSERT INTO MonitorTable (Name, Value) VALUES ('Test2', 200.75);

三、完整源码实现

3.1 数据库连接管理 (DatabaseManager.cs)

using System;
using System.Configuration;
using System.Data.SqlClient;

namespace DatabaseMonitor
{
    public static class DatabaseManager
    {
        private static string _connectionString;
        
        static DatabaseManager()
        {
            // 从配置文件读取连接字符串
            _connectionString = ConfigurationManager.ConnectionStrings["DefaultConnection"]?.ConnectionString;
            
            if (string.IsNullOrEmpty(_connectionString))
            {
                // 默认连接字符串
                _connectionString = @"Data Source=(local);Initial Catalog=YourDatabaseName;Integrated Security=True;";
            }
        }
        
        public static string ConnectionString => _connectionString;
        
        /// <summary>
        /// 测试数据库连接
        /// </summary>
        public static bool TestConnection()
        {
            try
            {
                using (var connection = new SqlConnection(_connectionString))
                {
                    connection.Open();
                    return connection.State == System.Data.ConnectionState.Open;
                }
            }
            catch
            {
                return false;
            }
        }
        
        /// <summary>
        /// 启用 SQL Server Service Broker
        /// </summary>
        public static void EnableServiceBroker()
        {
            try
            {
                using (var connection = new SqlConnection(_connectionString))
                {
                    connection.Open();
                    var command = new SqlCommand(@"
                        IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = DB_NAME() AND is_broker_enabled = 1)
                        BEGIN
                            ALTER DATABASE CURRENT SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
                        END", connection);
                    command.ExecuteNonQuery();
                }
            }
            catch (Exception ex)
            {
                throw new Exception($"启用 Service Broker 失败: {ex.Message}");
            }
        }
    }
}

3.2 数据库变化监听器 (DatabaseChangeListener.cs)

using System;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;

namespace DatabaseMonitor
{
    /// <summary>
    /// 数据库变化监听器
    /// </summary>
    public class DatabaseChangeListener : IDisposable
    {
        private SqlConnection _connection;
        private SqlCommand _command;
        private SqlDependency _dependency;
        private bool _isListening;
        private string _tableName;
        private string _connectionString;
        
        // 事件定义
        public event EventHandler<DatabaseChangeEventArgs> OnDatabaseChanged;
        public event EventHandler<string> OnError;
        public event EventHandler OnStarted;
        public event EventHandler OnStopped;
        
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="tableName">要监听的表名</param>
        /// <param name="connectionString">连接字符串</param>
        public DatabaseChangeListener(string tableName, string connectionString = null)
        {
            _tableName = tableName;
            _connectionString = connectionString ?? DatabaseManager.ConnectionString;
            _isListening = false;
        }
        
        /// <summary>
        /// 开始监听
        /// </summary>
        public void Start()
        {
            try
            {
                if (_isListening)
                {
                    Stop();
                }
                
                // 确保 Service Broker 已启用
                DatabaseManager.EnableServiceBroker();
                
                // 创建连接和命令
                _connection = new SqlConnection(_connectionString);
                _command = new SqlCommand(GetSelectQuery(), _connection);
                
                // 设置命令通知
                SqlDependency.Start(_connectionString);
                
                // 创建依赖
                _dependency = new SqlDependency(_command);
                _dependency.OnChange += OnDependencyChange;
                
                // 打开连接并执行命令
                _connection.Open();
                _command.ExecuteReader(CommandBehavior.CloseConnection);
                
                _isListening = true;
                OnStarted?.Invoke(this, EventArgs.Empty);
                
                Debug.WriteLine($"开始监听表: {_tableName}");
            }
            catch (Exception ex)
            {
                OnError?.Invoke(this, $"启动监听失败: {ex.Message}");
                Stop();
            }
        }
        
        /// <summary>
        /// 停止监听
        /// </summary>
        public void Stop()
        {
            try
            {
                if (_dependency != null)
                {
                    _dependency.OnChange -= OnDependencyChange;
                    _dependency = null;
                }
                
                if (_command != null)
                {
                    _command.Dispose();
                    _command = null;
                }
                
                if (_connection != null)
                {
                    if (_connection.State == ConnectionState.Open)
                    {
                        _connection.Close();
                    }
                    _connection.Dispose();
                    _connection = null;
                }
                
                _isListening = false;
                OnStopped?.Invoke(this, EventArgs.Empty);
                
                Debug.WriteLine($"停止监听表: {_tableName}");
            }
            catch (Exception ex)
            {
                OnError?.Invoke(this, $"停止监听失败: {ex.Message}");
            }
        }
        
        /// <summary>
        /// 依赖变化事件处理
        /// </summary>
        private void OnDependencyChange(object sender, SqlNotificationEventArgs e)
        {
            try
            {
                Debug.WriteLine($"检测到数据库变化: {e.Type}, {e.Info}, {e.Source}");
                
                // 重新注册监听(SqlDependency 只能触发一次)
                if (_isListening)
                {
                    // 先停止当前监听
                    Stop();
                    
                    // 触发变化事件
                    var changeArgs = new DatabaseChangeEventArgs
                    {
                        TableName = _tableName,
                        ChangeType = GetChangeType(e.Type),
                        NotificationType = e.Type.ToString(),
                        Info = e.Info.ToString(),
                        Source = e.Source.ToString(),
                        ChangeTime = DateTime.Now
                    };
                    
                    OnDatabaseChanged?.Invoke(this, changeArgs);
                    
                    // 重新启动监听
                    Start();
                }
            }
            catch (Exception ex)
            {
                OnError?.Invoke(this, $"处理变化事件失败: {ex.Message}");
            }
        }
        
        /// <summary>
        /// 获取 SELECT 查询语句
        /// </summary>
        private string GetSelectQuery()
        {
            // 注意:SqlDependency 对查询有严格要求
            // 1. 不能使用 *,必须指定具体列名
            // 2. 不能使用 TOP
            // 3. 不能使用 DISTINCT
            // 4. 表名必须使用两节名称 [Schema].[TableName]
            // 5. 不能使用子查询
            return $"SELECT Id, Name, Value, UpdateTime FROM dbo.{_tableName}";
        }
        
        /// <summary>
        /// 获取变化类型
        /// </summary>
        private DatabaseChangeType GetChangeType(SqlNotificationType notificationType)
        {
            switch (notificationType)
            {
                case SqlNotificationType.Change:
                    return DatabaseChangeType.Update;
                case SqlNotificationType.Subscribe:
                    return DatabaseChangeType.Subscribe;
                case SqlNotificationType.Unknown:
                default:
                    return DatabaseChangeType.Unknown;
            }
        }
        
        /// <summary>
        /// 获取当前表数据
        /// </summary>
        public DataTable GetCurrentData()
        {
            var dataTable = new DataTable();
            
            try
            {
                using (var connection = new SqlConnection(_connectionString))
                using (var command = new SqlCommand(GetSelectQuery(), connection))
                {
                    connection.Open();
                    using (var adapter = new SqlDataAdapter(command))
                    {
                        adapter.Fill(dataTable);
                    }
                }
            }
            catch (Exception ex)
            {
                OnError?.Invoke(this, $"获取数据失败: {ex.Message}");
            }
            
            return dataTable;
        }
        
        /// <summary>
        /// 是否正在监听
        /// </summary>
        public bool IsListening => _isListening;
        
        /// <summary>
        /// 表名
        /// </summary>
        public string TableName => _tableName;
        
        public void Dispose()
        {
            Stop();
            SqlDependency.Stop(_connectionString);
        }
    }
    
    /// <summary>
    /// 数据库变化类型
    /// </summary>
    public enum DatabaseChangeType
    {
        Unknown,
        Insert,
        Update,
        Delete,
        Subscribe
    }
    
    /// <summary>
    /// 数据库变化事件参数
    /// </summary>
    public class DatabaseChangeEventArgs : EventArgs
    {
        public string TableName { get; set; }
        public DatabaseChangeType ChangeType { get; set; }
        public string NotificationType { get; set; }
        public string Info { get; set; }
        public string Source { get; set; }
        public DateTime ChangeTime { get; set; }
    }
}

3.3 多表监听器管理器 (MonitorManager.cs)

using System;
using System.Collections.Generic;
using System.Linq;

namespace DatabaseMonitor
{
    /// <summary>
    /// 数据库监听器管理器
    /// </summary>
    public class MonitorManager : IDisposable
    {
        private readonly List<DatabaseChangeListener> _listeners;
        private readonly object _lockObject = new object();
        
        public MonitorManager()
        {
            _listeners = new List<DatabaseChangeListener>();
        }
        
        /// <summary>
        /// 添加监听表
        /// </summary>
        public void AddTable(string tableName)
        {
            lock (_lockObject)
            {
                if (_listeners.Any(l => l.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase)))
                {
                    return;
                }
                
                var listener = new DatabaseChangeListener(tableName);
                listener.OnDatabaseChanged += OnDatabaseChanged;
                listener.OnError += OnError;
                listener.OnStarted += OnListenerStarted;
                listener.OnStopped += OnListenerStopped;
                
                _listeners.Add(listener);
            }
        }
        
        /// <summary>
        /// 移除监听表
        /// </summary>
        public void RemoveTable(string tableName)
        {
            lock (_lockObject)
            {
                var listener = _listeners.FirstOrDefault(l => 
                    l.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
                
                if (listener != null)
                {
                    listener.Dispose();
                    _listeners.Remove(listener);
                }
            }
        }
        
        /// <summary>
        /// 启动所有监听器
        /// </summary>
        public void StartAll()
        {
            lock (_lockObject)
            {
                foreach (var listener in _listeners)
                {
                    listener.Start();
                }
            }
        }
        
        /// <summary>
        /// 停止所有监听器
        /// </summary>
        public void StopAll()
        {
            lock (_lockObject)
            {
                foreach (var listener in _listeners)
                {
                    listener.Stop();
                }
            }
        }
        
        /// <summary>
        /// 获取所有监听表
        /// </summary>
        public List<string> GetMonitoredTables()
        {
            lock (_lockObject)
            {
                return _listeners.Select(l => l.TableName).ToList();
            }
        }
        
        /// <summary>
        /// 获取表当前数据
        /// </summary>
        public DataTable GetTableData(string tableName)
        {
            lock (_lockObject)
            {
                var listener = _listeners.FirstOrDefault(l => 
                    l.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
                
                return listener?.GetCurrentData();
            }
        }
        
        // 事件转发
        public event EventHandler<DatabaseChangeEventArgs> OnDatabaseChanged;
        public event EventHandler<string> OnError;
        public event EventHandler<string> OnListenerStarted;
        public event EventHandler<string> OnListenerStopped;
        
        private void OnDatabaseChanged(object sender, DatabaseChangeEventArgs e)
        {
            OnDatabaseChanged?.Invoke(sender, e);
        }
        
        private void OnError(object sender, string message)
        {
            OnError?.Invoke(sender, message);
        }
        
        private void OnListenerStarted(object sender, EventArgs e)
        {
            var listener = sender as DatabaseChangeListener;
            OnListenerStarted?.Invoke(sender, listener?.TableName ?? "Unknown");
        }
        
        private void OnListenerStopped(object sender, EventArgs e)
        {
            var listener = sender as DatabaseChangeListener;
            OnListenerStopped?.Invoke(sender, listener?.TableName ?? "Unknown");
        }
        
        public void Dispose()
        {
            StopAll();
            lock (_lockObject)
            {
                foreach (var listener in _listeners)
                {
                    listener.Dispose();
                }
                _listeners.Clear();
            }
        }
    }
}

3.4 主程序 (Program.cs)

using System;
using System.Data;
using System.Threading;

namespace DatabaseMonitor
{
    class Program
    {
        private static MonitorManager _monitorManager;
        private static bool _isRunning = true;
        
        static void Main(string[] args)
        {
            Console.WriteLine("====== 数据库变化监听程序 ======");
            Console.WriteLine($"启动时间: {DateTime.Now:yyyy-MM-dd HH:mm:ss}");
            Console.WriteLine("按 'Q' 键退出程序n");
            
            try
            {
                // 测试数据库连接
                if (!DatabaseManager.TestConnection())
                {
                    Console.WriteLine("❌ 数据库连接失败!");
                    Console.WriteLine("请检查连接字符串和数据库服务状态。");
                    Console.ReadKey();
                    return;
                }
                
                Console.WriteLine("✅ 数据库连接成功!");
                
                // 创建监听器管理器
                _monitorManager = new MonitorManager();
                
                // 注册事件
                _monitorManager.OnDatabaseChanged += OnDatabaseChanged;
                _monitorManager.OnError += OnError;
                _monitorManager.OnListenerStarted += OnListenerStarted;
                _monitorManager.OnListenerStopped += OnListenerStopped;
                
                // 添加要监听的表
                _monitorManager.AddTable("MonitorTable");
                // 可以继续添加其他表
                // _monitorManager.AddTable("AnotherTable");
                // _monitorManager.AddTable("ThirdTable");
                
                // 启动监听
                _monitorManager.StartAll();
                
                Console.WriteLine("n开始监听数据库变化...");
                Console.WriteLine("现在您可以尝试在数据库中插入、更新或删除数据。n");
                
                // 显示当前数据
                DisplayCurrentData();
                
                // 等待用户输入
                while (_isRunning)
                {
                    if (Console.KeyAvailable)
                    {
                        var key = Console.ReadKey(true);
                        if (key.Key == ConsoleKey.Q)
                        {
                            _isRunning = false;
                            break;
                        }
                        else if (key.Key == ConsoleKey.R)
                        {
                            DisplayCurrentData();
                        }
                        else if (key.Key == ConsoleKey.A)
                        {
                            AddTestData();
                        }
                    }
                    
                    Thread.Sleep(100);
                }
                
                // 停止监听
                Console.WriteLine("n正在停止监听...");
                _monitorManager.StopAll();
                _monitorManager.Dispose();
                
                Console.WriteLine("程序已退出。");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"❌ 程序异常: {ex.Message}");
                Console.WriteLine(ex.StackTrace);
            }
            
            Console.WriteLine("按任意键退出...");
            Console.ReadKey();
        }
        
        /// <summary>
        /// 数据库变化事件处理
        /// </summary>
        private static void OnDatabaseChanged(object sender, DatabaseChangeEventArgs e)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine($"n? 检测到数据库变化!");
            Console.WriteLine($"   表名: {e.TableName}");
            Console.WriteLine($"   时间: {e.ChangeTime:HH:mm:ss.fff}");
            Console.WriteLine($"   类型: {e.ChangeType}");
            Console.WriteLine($"   详情: {e.Info}");
            Console.ResetColor();
            
            // 这里可以添加您的业务逻辑
            // 例如:发送通知、更新缓存、同步到其他系统等
            HandleBusinessLogic(e);
        }
        
        /// <summary>
        /// 处理业务逻辑
        /// </summary>
        private static void HandleBusinessLogic(DatabaseChangeEventArgs e)
        {
            // 示例:根据不同的变化类型执行不同的操作
            switch (e.ChangeType)
            {
                case DatabaseChangeType.Insert:
                    Console.WriteLine("   ? 执行新增数据后的业务逻辑...");
                    break;
                case DatabaseChangeType.Update:
                    Console.WriteLine("   ? 执行更新数据后的业务逻辑...");
                    break;
                case DatabaseChangeType.Delete:
                    Console.WriteLine("   ? 执行删除数据后的业务逻辑...");
                    break;
            }
        }
        
        /// <summary>
        /// 错误事件处理
        /// </summary>
        private static void OnError(object sender, string message)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine($"n❌ 错误: {message}");
            Console.ResetColor();
        }
        
        /// <summary>
        /// 监听器启动事件
        /// </summary>
        private static void OnListenerStarted(object sender, string tableName)
        {
            Console.WriteLine($"✅ 开始监听表: {tableName}");
        }
        
        /// <summary>
        /// 监听器停止事件
        /// </summary>
        private static void OnListenerStopped(object sender, string tableName)
        {
            Console.WriteLine($"⏹️ 停止监听表: {tableName}");
        }
        
        /// <summary>
        /// 显示当前数据
        /// </summary>
        private static void DisplayCurrentData()
        {
            Console.WriteLine("n--- 当前表数据 ---");
            var data = _monitorManager.GetTableData("MonitorTable");
            
            if (data != null && data.Rows.Count > 0)
            {
                foreach (DataRow row in data.Rows)
                {
                    Console.WriteLine($"ID: {row["Id"]}, Name: {row["Name"]}, Value: {row["Value"]}, Time: {row["UpdateTime"]}");
                }
            }
            else
            {
                Console.WriteLine("表中暂无数据。");
            }
            Console.WriteLine("--------------------n");
        }
        
        /// <summary>
        /// 添加测试数据
        /// </summary>
        private static void AddTestData()
        {
            try
            {
                using (var connection = new SqlConnection(DatabaseManager.ConnectionString))
                using (var command = new SqlCommand(
                    "INSERT INTO MonitorTable (Name, Value) VALUES (@Name, @Value)", connection))
                {
                    command.Parameters.AddWithValue("@Name", $"Test_{DateTime.Now:HHmmss}");
                    command.Parameters.AddWithValue("@Value", new Random().Next(100, 1000));
                    
                    connection.Open();
                    int rows = command.ExecuteNonQuery();
                    
                    if (rows > 0)
                    {
                        Console.WriteLine("✅ 测试数据添加成功!");
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"❌ 添加测试数据失败: {ex.Message}");
            }
        }
    }
}

3.5 配置文件 (App.config)

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <connectionStrings>
    <add name="DefaultConnection" 
         connectionString="Data Source=(local);Initial Catalog=YourDatabaseName;Integrated Security=True;" 
         providerName="System.Data.SqlClient" />
  </connectionStrings>
  <appSettings>
    <add key="MonitorTables" value="MonitorTable,AnotherTable,ThirdTable"/>
    <add key="CheckInterval" value="5000"/>
    <add key="EnableLogging" value="true"/>
  </appSettings>
</configuration>

四、使用说明与注意事项

4.1 SqlDependency 使用限制

限制项说明解决方案
查询限制不能使用 *, TOP, DISTINCT明确指定列名
表名格式必须使用 [Schema].[TableName]使用 dbo.TableName
子查询不支持子查询使用简单查询
聚合函数不支持 COUNT(), SUM()查询基础表
临时表不支持临时表使用永久表

4.2 性能优化建议

  1. 合理选择监听表:只监听真正需要监控的表
  2. 优化查询语句:确保查询高效,避免全表扫描
  3. 及时释放资源:在不需要监听时及时停止
  4. 异常处理:完善异常处理,避免监听意外终止
  5. 连接池管理:复用数据库连接,避免频繁创建连接

4.3 生产环境部署

权限配置

C#使用SqlDependency实现监听数据库数据变化

-- 授予必要的权限
GRANT SUBSCRIBE QUERY NOTIFICATIONS TO [YourUser];
GRANT SELECT ON [dbo].[MonitorTable] TO [YourUser];

监控 Service Broker 队列

-- 查看队列状态
SELECT * FROM sys.transmission_queue;
SELECT * FROM sys.conversation_endpoints;

重启监听

// 定期重启监听,避免内存泄漏
Timer restartTimer = new Timer(callback =>
{
    monitorManager.StopAll();
    Thread.Sleep(1000);
    monitorManager.StartAll();
}, null, TimeSpan.FromHours(1), TimeSpan.FromHours(1));

4.4 替代方案

如果 SqlDependency 不能满足需求,可以考虑:

  1. SqlTableDependency:第三方库,功能更强大
  2. CDC (Change Data Capture):SQL Server 内置功能
  3. 触发器 + 消息队列:传统方案,可靠性高
  4. 轮询方式:简单但效率较低

以上就是C#使用SqlDependency实现监听数据库数据变化的详细内容,更多关于C#监听数据库数据变化的资料请关注本站其它相关文章!

您可能感兴趣的文章:
  • C#监听txt文档获取新数据方式
  • C#实现监听串口数据的方法详解
  • C#通过FileSystemWatcher监听文件的实战技巧
  • 基于C#实现串口监听与TCP转发功能

热门栏目