Monitor and Manage Flink SQL Statements in Confluent Cloud for Apache Flink

You start a stream-processing app on Confluent Cloud for Apache Flink® by running a SQL statement. Once a statement is running, you can monitor its progress in the Confluent Cloud Console. You can also set up integrations with monitoring services like Prometheus and Datadog.

View and monitor statements in Cloud Console

Cloud Console shows details about your statements on the Flink page.

  1. If you don’t currently have any running statements, run a SQL query like INSERT INTO FROM SELECT in the Flink SQL shell or in a workspace.

  2. Log in to the Confluent Cloud Console.

  3. Navigate to the Environments page.

  4. Click the tile that has the environment where your Flink compute pools are provisioned.

  5. Click Flink, and on the Flink page, click Flink statements.

    The Statements list opens.

  6. Use the Filter options on the page to find the statements you want to view.

  7. The following information is available in the Flink statements table to help you monitor your statements.

    • Flink Statement Name: The name of the statement. The name is populated automatically when the statement is submitted. You can set the name by using the SET command.
    • Status: The current state of the statement. These are the possible status values:
      • Pending: The statement has been submitted, and Flink is preparing to start running it.
      • Running: Flink is actively running the statement.
      • Completed: The statement has completed all of its work.
      • Deleting: The statement is being deleted.
      • Failed: The statement has encountered an error and is no longer running.
      • Degraded: The statement appears unhealthy. For example, no transactions have been committed for a long time, or the statement has restarted frequently.
      • Stopping: The statement is about to be stopped.
      • Stopped: The statement has been stopped and is no longer running.
    • Statement Type: The type of SQL function that is used in the statement.
    • Created: When the statement started running. If you stop and resume the statement, the Created date still shows when the statement was first submitted.
    • Messages Behind: The consumer lag of the statement. An indicator also shows whether the backlog is increasing, decreasing, or holding steady. Ideally, Messages Behind should be as close to zero as possible: a low, close-to-zero consumer lag is the best sign that your statement is running smoothly and keeping up with all of its inputs, while a growing consumer lag indicates a problem.
    • Messages in: The number of messages read per minute, which represents the rate at which records are read. A watermark is also shown for the messages read; it is the minimum watermark across the source(s) in the query.
    • Messages out: The number of messages written per minute, which represents the rate at which records are written. A watermark is also shown for the messages written; it is the minimum watermark across the sink(s) in the query.
    • Account: The user account or service account that the statement runs with.
  8. When you click a statement, a detailed side panel opens. The panel provides more granular information about the statement, showing how messages are read from sources and written to sinks. The watermarks for each individual source and sink table appear in this panel, along with the statement’s catalog, database, local time zone, and Scaling status.

    The SQL Content section shows the code used to generate the statement.

    The panel also contains interactive graphs of the statement’s performance over time. There are charts for # Messages behind, Messages in per minute, and Messages out per minute.
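The guidance above for reading the Messages Behind metric can be sketched as a small health check. This is an illustrative helper only, not part of any Confluent API; the status names come from the table above, but the return labels and the two-sample minimum are assumptions.

```python
# Illustrative health check based on the Statements list fields:
# statement status plus recent "Messages Behind" (consumer lag) samples.
def statement_health(status: str, lag_samples: list[int]) -> str:
    """Summarize health from the statement status and recent consumer-lag readings."""
    if status in ("FAILED", "DEGRADED"):
        return "unhealthy"
    if status != "RUNNING" or len(lag_samples) < 2:
        return "unknown"
    # A low, close-to-zero lag means the statement is keeping up with its inputs.
    if lag_samples[-1] == 0:
        return "healthy"
    # A consistently growing lag indicates a problem.
    if all(b > a for a, b in zip(lag_samples, lag_samples[1:])):
        return "falling behind"
    return "keeping up"

print(statement_health("RUNNING", [120, 340, 900]))  # lag growing every sample
```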

Manage statements in Cloud Console

Cloud Console provides actions for managing your statements on the Flink page.

  1. In the statement list, click the checkbox next to one of your statements to select it.

  2. Click Actions.

    A menu opens, showing options for managing the statement’s status. You can select Stop statement, Resume statement, or Delete statement.
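The Stop and Resume actions can also be driven programmatically by updating the statement resource. The sketch below only builds the updated request body; the `spec.stopped` field is an assumption based on the shape of the Confluent Cloud Flink REST API, so verify the exact resource schema in the REST API reference before relying on it.

```python
import copy

def with_stopped(statement: dict, stopped: bool) -> dict:
    """Return a copy of a statement resource with spec.stopped set.

    Assumption: stopping/resuming is expressed by updating a boolean
    "stopped" field in the statement's spec, as in the Flink REST API.
    """
    updated = copy.deepcopy(statement)
    updated.setdefault("spec", {})["stopped"] = stopped
    return updated

# Hypothetical statement resource, trimmed to the fields used here.
stmt = {"name": "my-insert-statement", "spec": {"statement": "INSERT INTO ...", "stopped": False}}
stop_body = with_stopped(stmt, True)     # body for a "stop" update
resume_body = with_stopped(stmt, False)  # body for a "resume" update
```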

Flink metrics integrations

Confluent Cloud for Apache Flink supports metrics integrations with services like Prometheus and Datadog.

  1. If you don’t currently have any running statements, run a SQL query like INSERT INTO FROM SELECT in the Flink SQL shell or in a workspace.

  2. Log in to the Confluent Cloud Console.

  3. Open the Administration menu and select Metrics to open the Metrics integration page.

  4. In the Explore available metrics section, click the Metric dropdown.

  5. Scroll until you find the Flink compute pool and Flink statement metrics, for example, Messages behind. This list doesn’t include all available metrics. For a full list of available metrics, see Metrics API Reference.

  6. Click the Resource dropdown and select the corresponding compute pool or statement that you want to monitor.

    A graph displays, showing the most recent data for the selected Flink metric.

  7. Click New integration to export your metrics to a monitoring service. For more information, see Integrate with third-party monitoring.
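If you query metrics programmatically instead of through the Cloud Console, a request to the Confluent Cloud Metrics API takes a JSON body like the one this sketch builds. The metric name (`io.confluent.flink/pending_records`, i.e. messages behind) and the `resource.compute_pool.id` filter label are assumptions; confirm the exact names in the Metrics API Reference.

```python
# Sketch of a Metrics API query body for one Flink metric, filtered to a
# compute pool. Metric and label names are assumptions -- check the
# Metrics API Reference before using them.
def build_metrics_query(metric: str, compute_pool_id: str, interval: str) -> dict:
    """Build a query payload for the Metrics API's query endpoint."""
    return {
        "aggregations": [{"metric": metric}],
        "filter": {"field": "resource.compute_pool.id", "op": "EQ", "value": compute_pool_id},
        "granularity": "PT1M",    # one data point per minute
        "intervals": [interval],  # ISO-8601 interval, e.g. "2024-01-01T00:00:00Z/PT1H"
    }

query = build_metrics_query(
    "io.confluent.flink/pending_records",  # assumed name of the "messages behind" metric
    "lfcp-abc123",                         # hypothetical compute pool ID
    "2024-01-01T00:00:00Z/PT1H",
)
```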

Error handling and recovery

Confluent Cloud for Apache Flink classifies exceptions that occur during the runtime of a statement into two categories: USER and SYSTEM exceptions.

  • USER: Exceptions are classified as USER if they fall under the user’s responsibility. Examples include deserialization or arithmetic exceptions. Usually, the root cause is related to the data or the query. USER exceptions are forwarded to the user in Statement.status.statusDetails.
  • SYSTEM: Exceptions are classified as SYSTEM if they fall under Confluent’s responsibility. Examples include exceptions during checkpointing or networking. Usually, the root cause is related to the infrastructure.

Furthermore, Confluent Cloud for Apache Flink classifies exceptions as “recoverable” (or “transient”) or “non-recoverable” (or “permanent”). SYSTEM exceptions are always classified as recoverable. Usually, USER exceptions are classified as non-recoverable. For example, a division-by-zero or a deserialization exception can’t be solved by restarting the underlying Flink job, because the same input message is replayed and leads to the same exception again.

Some USER exceptions are classified as recoverable, for example, the deletion of a statement’s input or output topic, or the deletion of the access rights to these topics.

If a non-recoverable exception occurs, the Flink statement moves into the FAILED state, and the underlying Flink job is cancelled. FAILED statements do not consume any CFUs. FAILED statements can be resumed, like STOPPED statements with exactly-once semantics, but in most cases, some change to the query or data is required so that the statement doesn’t transition immediately into the FAILED state again. For more information on the available options for evolving statements, see Schema and Statement Evolution.

Note

Confluent is actively working on additional options for handling non-recoverable exceptions, like skipping the offending message or sending it to a dead-letter-queue automatically. If you’re interested in providing feedback or feature requests, contact Support or your account manager.

Degraded statements¶

If a recoverable exception occurs, the statement stays in the RUNNING state and the underlying Flink job is restarted. If the job restarts repeatedly or doesn’t recover within 10 minutes, the statement moves to the DEGRADED state. DEGRADED statements continue to consume CFUs.

If the DEGRADED state is caused by a USER exception, then the error message is shown in Statement.status.statusDetails.

If no exception is shown in the Statement.status.statusDetails, then the DEGRADED state is caused by a SYSTEM exception. In this case, contact Support.
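The triage rule above can be expressed as a small helper: a USER error shows up in statusDetails, and its absence points to a SYSTEM cause. This is a sketch, assuming a status dictionary shaped like the Statement.status object described above; the function name and return strings are illustrative.

```python
def classify_degraded(status: dict) -> str:
    """Suggest a next step for a DEGRADED statement, based on whether
    Statement.status.statusDetails carries a USER error message."""
    if status.get("phase") != "DEGRADED":
        return "not degraded"
    details = status.get("statusDetails")
    if details:
        # A USER exception: the root cause is in the data or the query.
        return f"USER exception, fix the query or data: {details}"
    # No error message shown: a SYSTEM exception on Confluent's side.
    return "SYSTEM exception: contact Confluent Support"
```

For example, a status of `{"phase": "DEGRADED", "statusDetails": "DeserializationException: …"}` points back at the input data, while a bare `{"phase": "DEGRADED"}` is a support case.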

Notifications¶

Confluent Cloud for Apache Flink integrates with Notifications for Confluent Cloud. The following notifications are available for Flink statements. They apply only to background Data Manipulation Language (DML) statements like INSERT INTO, EXECUTE STATEMENT SET, or CREATE TABLE AS.

  • Statement failure: This notification is triggered when a statement transitions from RUNNING to FAILED. A statement transitions to FAILED only on non-recoverable exceptions, which Confluent classifies as USER exceptions, as opposed to SYSTEM exceptions, which are always recoverable.
  • Statement degraded: This notification is triggered when a statement transitions from RUNNING to DEGRADED.
  • Statement returned to pending: This notification is triggered when a statement transitions from RUNNING to PENDING. This may happen if the compute pool doesn’t have enough resources to keep all statements running with their minimum resource requirements.
  • Statement stuck in pending: This notification is triggered when a newly submitted statement stays in PENDING for a long time. The time period for a statement to be considered stuck in the PENDING state depends on the cloud provider that’s running your Flink statements:
    • AWS: 10 minutes
    • Azure: 30 minutes
    • Google Cloud: 10 minutes
  • Statement auto-stopped: This notification is triggered when a statement moves into STOPPED because the compute pool it is using was deleted by a user.
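If you mirror these notifications in your own alerting job, the per-cloud PENDING timeouts listed above can be captured in a lookup table. This is a minimal sketch; the dictionary keys and function are illustrative, and the values come directly from the list above.

```python
from datetime import timedelta

# Time after which a newly submitted statement in PENDING is considered
# stuck, per cloud provider (values from the notification description above).
STUCK_PENDING_THRESHOLD = {
    "AWS": timedelta(minutes=10),
    "AZURE": timedelta(minutes=30),
    "GCP": timedelta(minutes=10),
}

def is_stuck_pending(cloud: str, pending_for: timedelta) -> bool:
    """Return True if a statement has been PENDING longer than the
    provider-specific threshold."""
    return pending_for >= STUCK_PENDING_THRESHOLD[cloud]
```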

Best practices for alerting¶

Use the Metrics API and Notifications for Confluent Cloud to monitor your compute pools and statements over time. You should monitor and configure alerts for the following conditions:

  • Per compute pool
    • Alert on exhausted compute pools by comparing the current CFUs (io.confluent.flink/compute_pool_utilization/current_cfus) to the maximum CFUs of the pool (io.confluent.flink/compute_pool_utilization/cfu_limit).
    • The Statement stuck in pending and Statement returned to pending notifications also indicate compute-pool exhaustion.
  • Per statement
    • Alert on statement failures (see Notifications).
    • Alert on statement degradation (see Notifications).
    • Alert on statements returning to PENDING (see Notifications).
    • Alert on an increase of “Messages Behind”/“Consumer Lag” (metric name: io.confluent.flink/pending_records) over an extended period of time, for example, longer than 10 minutes; the right threshold depends on your workload. Note that Confluent Cloud for Apache Flink does not appear as a consumer in the regular consumer lag monitoring feature in Confluent Cloud, because it uses the assign() method.
    • (Optional) Alert on an increase of the difference between the output (io.confluent.flink/current_output_watermark_ms) and input watermark (io.confluent.flink/current_input_watermark_ms). The input watermark corresponds to the time up to which the input data is complete, and the output watermark corresponds to the time up to which the output data is complete. This difference can be considered a measure of the amount of data that’s currently “in flight”. Depending on the logic of the statement, different patterns are expected. For example, for a tumbling event-time window, expect an increasing difference until the window is fired, at which point the difference drops to zero and starts increasing again.
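The alert conditions above can be evaluated over metric samples fetched from the Metrics API. The following sketch shows one way to express them; the function names and thresholds are illustrative, and samples are assumed to arrive at one-minute granularity.

```python
def pool_exhausted(current_cfus: float, cfu_limit: float, ratio: float = 0.9) -> bool:
    """Compute-pool alert: current CFUs approaching the pool's CFU limit.
    The 0.9 ratio is an illustrative threshold."""
    return current_cfus >= ratio * cfu_limit

def lag_growing(pending_records_samples: list, window_minutes: int = 10) -> bool:
    """Statement alert: 'Messages behind' strictly increasing over the
    whole window. Samples are one per minute, oldest first."""
    window = pending_records_samples[-window_minutes:]
    return len(window) >= window_minutes and all(
        later > earlier for earlier, later in zip(window, window[1:])
    )

def watermark_inflight_ms(input_wm_ms: int, output_wm_ms: int) -> int:
    """Event time currently 'in flight' between the input and output
    watermarks, in milliseconds."""
    return input_wm_ms - output_wm_ms
```

A monitoring job would feed these functions with the metric values returned by Metrics API queries and fire an alert when a condition holds for several consecutive evaluations.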

Related content¶

  • Video: How to work with a paused stream
  • Statements
  • Queries
  • Flink SQL Shell Quick Start
  • Flink SQL Shell

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.
