https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/
Flink 1.10
Clusters & Deployment
FileSystems should be loaded via Plugin Architecture (FLINK-11956)
The s3-hadoop and s3-presto filesystems no longer use class relocations and must be loaded through plugins, but they now integrate seamlessly with all credential providers. Other filesystems are strongly recommended to be used only as plugins, as relocations will continue to be removed.
Flink Client respects Classloading Policy (FLINK-13749)
The Flink client now also respects the configured classloading policy, i.e., parent-first or child-first classloading. Previously, only cluster components such as the job manager or task manager supported this setting. As a result, users might observe different behaviour in their programs. In that case, they should explicitly configure parent-first classloading (classloader.resolve-order: parent-first), which was the previous, hard-coded behaviour of the client.
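To illustrate what child-first means, here is a minimal sketch of a child-first class loader in Java. It is not Flink's implementation. The class name is hypothetical, and the hard-coded "java." prefix stands in for the patterns that are always loaded parent-first.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Minimal child-first class loader (illustrative sketch, not Flink's code).
 * Classes are looked up in this loader's own URLs first; the parent is only
 * consulted if they are not found there. Names starting with "java." always
 * go to the parent (parent-first).
 */
final class ChildFirstClassLoaderSketch extends URLClassLoader {

    ChildFirstClassLoaderSketch(URL[] userJars, ClassLoader parent) {
        super(userJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // Reuse a class this loader has already defined.
            Class<?> clazz = findLoadedClass(name);
            if (clazz == null) {
                if (name.startsWith("java.")) {
                    // Core JDK classes: always delegate to the parent.
                    clazz = super.loadClass(name, false);
                } else {
                    try {
                        // Child-first: search the user code URLs before the parent.
                        clazz = findClass(name);
                    } catch (ClassNotFoundException notInUserCode) {
                        // Not in user code: fall back to normal parent delegation.
                        clazz = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(clazz);
            }
            return clazz;
        }
    }
}
```

With parent-first loading, the order of the two lookups is simply reversed. This is why a dependency bundled in the user jar can shadow one on the cluster classpath under child-first, but not under parent-first.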
Enable spreading out Tasks evenly across all TaskManagers (FLINK-12122)
When FLIP-6 was rolled out with Flink 1.5.0, we changed how slots are allocated from TaskManagers (TMs). Instead of allocating slots evenly from all registered TMs, Flink tended to exhaust one TM before using another one. To use a scheduling strategy that is closer to the pre-FLIP-6 behaviour, where Flink tries to spread the workload across all currently available TMs, set cluster.evenly-spread-out-slots: true in flink-conf.yaml.
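The difference between the two allocation behaviours can be sketched with a toy model. The class and method names are hypothetical. Picking the least-utilized TaskManager is an assumption about how "spreading out" is approximated. Flink's actual slot matching is more involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of slot allocation across TaskManagers (hypothetical and simplified). */
final class SlotSpreadingSketch {

    /**
     * Returns the index of the TaskManager chosen for each slot request.
     * evenlySpread == false: take the first TaskManager that still has a free slot.
     * evenlySpread == true: take the least-utilized TaskManager (used / total slots);
     * ties go to the lower index.
     */
    static List<Integer> allocate(int[] slotsPerTm, int requests, boolean evenlySpread) {
        int[] used = new int[slotsPerTm.length];
        List<Integer> assignment = new ArrayList<>();
        for (int r = 0; r < requests; r++) {
            int chosen = -1;
            for (int tm = 0; tm < slotsPerTm.length; tm++) {
                if (used[tm] >= slotsPerTm[tm]) {
                    continue; // this TaskManager is full
                }
                boolean lessUtilized = chosen != -1 && evenlySpread
                        && utilization(used, slotsPerTm, tm) < utilization(used, slotsPerTm, chosen);
                if (chosen == -1 || lessUtilized) {
                    chosen = tm;
                }
                if (!evenlySpread) {
                    break; // first fit: stop at the first TaskManager with capacity
                }
            }
            if (chosen == -1) {
                throw new IllegalStateException("No free slot for request " + r);
            }
            used[chosen]++;
            assignment.add(chosen);
        }
        return assignment;
    }

    private static double utilization(int[] used, int[] total, int tm) {
        return (double) used[tm] / total[tm];
    }
}
```

Consider three TaskManagers with two slots each. Three requests land on TaskManagers 0, 0, 1 with the default behaviour and on 0, 1, 2 with spreading enabled.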
Directory Structure Change for highly available Artifacts (FLINK-13633)
All highly available artifacts stored by Flink will now be stored under HA_STORAGE_DIR/HA_CLUSTER_ID, where HA_STORAGE_DIR is configured by high-availability.storageDir and HA_CLUSTER_ID is configured by high-availability.cluster-id.
Resources and JARs shipped via --yarnship will be ordered in the Classpath (FLINK-13127)
When using the --yarnship command line option, resource directories and jar files will be added to the classpath in lexicographical order, with resource directories appearing first.
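Here is a minimal sketch of the ordering rule. It assumes plain String comparison as the lexicographical order, so, for example, upper-case names sort before lower-case ones. The class and method names are hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative ordering of --yarnship entries (a sketch of the rule, not Flink's code). */
final class YarnShipClasspathSketch {

    /** Resource directories first, then jar files; each group sorted lexicographically. */
    static List<String> order(List<String> resourceDirs, List<String> jarFiles) {
        List<String> dirs = new ArrayList<>(resourceDirs);
        List<String> jars = new ArrayList<>(jarFiles);
        Collections.sort(dirs); // String natural order (UTF-16 code units)
        Collections.sort(jars);
        List<String> classpath = new ArrayList<>(dirs);
        classpath.addAll(jars);
        return classpath;
    }

    /** Joins the entries with the platform path separator (":" on Unix, ";" on Windows). */
    static String toClasspathString(List<String> entries) {
        return String.join(File.pathSeparator, entries);
    }
}
```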
Removal of -yn/--yarncontainer Command Line Options (FLINK-12362)
The Flink CLI no longer supports the deprecated command line options -yn/--yarncontainer, which were used to specify the number of containers to start on YARN. This option has been deprecated since the introduction of FLIP-6. All Flink users are advised to remove this command line option.
Removal of -yst/--yarnstreaming Command Line Options (FLINK-14957)
The Flink CLI no longer supports the deprecated command line options -yst/--yarnstreaming, which were used to disable eager pre-allocation of memory. All Flink users are advised to remove this command line option.
Mesos Integration will reject expired Offers faster (FLINK-14029)
Flink’s Mesos integration now rejects all expired offers instead of only 4. This improves the situation where Fenzo holds on to a lot of expired offers without giving them back to the Mesos resource manager.
Scheduler Rearchitecture (FLINK-14651)
Flink’s scheduler was refactored with the goal of making scheduling strategies customizable in the future. Using the legacy scheduler is discouraged, as it will be removed in a future release. However, users who experience issues related to scheduling can fall back to the legacy scheduler for the time being by setting jobmanager.scheduler to legacy in their flink-conf.yaml. Note, however, that using the legacy scheduler with the Pipelined Region Failover Strategy enabled has the following caveats:
- Exceptions that caused a job to restart will not be shown on the job overview page of the Web UI (FLINK-15917). However, exceptions that cause a job to fail (e.g., when all restart attempts are exhausted) will still be shown.
- The uptime metric will not be reset after restarting a job due to a task failure (FLINK-15918).
Note that in the default flink-conf.yaml, the Pipelined Region Failover Strategy is already enabled. That is, users who want to use the legacy scheduler and cannot accept the aforementioned caveats should make sure that jobmanager.execution.failover-strategy is set to full or not set at all.
Java 11 Support (FLINK-10725)
Beginning with this release, Flink can be compiled and run with Java 11. All Java 8 artifacts can also be used with Java 11. This means that users who want to run Flink with Java 11 do not have to compile Flink themselves.
When starting Flink with Java 11, the following warnings may be logged:
```
WARNING: An illegal reflective access operation has occurred
```
These warnings are considered harmless and will be addressed in future Flink releases.
Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka 0.8–0.11 have not been tested with Java 11 because the respective projects did not provide Java 11 support at the time of the Flink 1.10.0 release.
Memory Management
New Task Executor Memory Model (FLINK-13980)
With FLIP-49, a new memory model has been introduced for the task executor. New configuration options have been introduced to control the memory consumption of the task executor process. This affects all types of deployments: standalone, YARN, Mesos, and the new active Kubernetes integration. The memory model of the job manager process has not been changed yet but it is planned to be updated as well.
If you try to reuse your previous Flink configuration without any adjustments, the new memory model can result in differently computed memory parameters for the JVM and, thus, performance changes.
Please check the user documentation for more details.
Deprecation and breaking changes
The following options have been removed and have no effect anymore:
Deprecated/removed config option | Note |
---|---|
taskmanager.memory.fraction | See the description of the new option taskmanager.memory.managed.fraction; it has different semantics, so the value of the deprecated option usually has to be adjusted |
taskmanager.memory.off-heap | Support for on-heap managed memory has been removed, leaving off-heap managed memory as the only possibility |
taskmanager.memory.preallocate | Pre-allocation is no longer supported, and managed memory is always allocated lazily |
The following options, if used, are interpreted as other new options in order to maintain backwards compatibility where it makes sense:
Deprecated config option | Interpreted as |
---|---|
taskmanager.heap.size | taskmanager.memory.flink.size for standalone deployments; taskmanager.memory.process.size for containerized deployments |
taskmanager.memory.size | taskmanager.memory.managed.size |
taskmanager.network.memory.min | taskmanager.memory.network.min |
taskmanager.network.memory.max | taskmanager.memory.network.max |
taskmanager.network.memory.fraction | taskmanager.memory.network.fraction |
The container cut-off configuration options, containerized.heap-cutoff-ratio and containerized.heap-cutoff-min, no longer have any effect on task executor processes, but they keep the same semantics for the JobManager process.
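The backwards-compatibility mapping in the table above can be expressed as a simple lookup. This only illustrates the table and is not Flink's configuration code. The class and method names are hypothetical, and options that were removed outright (such as taskmanager.memory.fraction) simply return null.

```java
import java.util.HashMap;
import java.util.Map;

/** Lookup of how deprecated TaskManager memory keys are interpreted (illustrative sketch). */
final class LegacyMemoryKeySketch {

    private static final Map<String, String> RENAMED = new HashMap<>();

    static {
        RENAMED.put("taskmanager.memory.size", "taskmanager.memory.managed.size");
        RENAMED.put("taskmanager.network.memory.min", "taskmanager.memory.network.min");
        RENAMED.put("taskmanager.network.memory.max", "taskmanager.memory.network.max");
        RENAMED.put("taskmanager.network.memory.fraction", "taskmanager.memory.network.fraction");
    }

    /** Returns the new key a deprecated key is interpreted as, or null if there is none. */
    static String interpretedAs(String deprecatedKey, boolean containerized) {
        if ("taskmanager.heap.size".equals(deprecatedKey)) {
            // The only mapping that depends on the deployment type.
            return containerized ? "taskmanager.memory.process.size" : "taskmanager.memory.flink.size";
        }
        return RENAMED.get(deprecatedKey);
    }
}
```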
RocksDB State Backend Memory Control (FLINK-7289)
Together with the introduction of the new Task Executor Memory Model, the memory consumption of the RocksDB state backend will be limited by the total amount of Flink Managed Memory, which can be configured via taskmanager.memory.managed.size or taskmanager.memory.managed.fraction. Furthermore, users can tune RocksDB’s write/read memory ratio (state.backend.rocksdb.memory.write-buffer-ratio, by default 0.5) and the reserved memory fraction for indices/filters (state.backend.rocksdb.memory.high-prio-pool-ratio, by default 0.1). More details and advanced configuration options can be found in the Flink user documentation.
Fine-grained Operator Resource Management (FLINK-14058)
The config options table.exec.resource.external-buffer-memory, table.exec.resource.hash-agg.memory, table.exec.resource.hash-join.memory, and table.exec.resource.sort.memory have been deprecated. Beginning with Flink 1.10, these config options are interpreted as weight hints instead of absolute memory requirements. Flink chooses sensible default weight hints, which should not be adjusted by users.
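To make the difference between weight hints and absolute sizes concrete, the sketch below splits a hypothetical memory budget in proportion to per-operator weights. It is an assumption-based illustration, not the planner's actual algorithm. The operator names and the budget are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Splits a memory budget in proportion to weights (hypothetical helper). */
final class WeightedMemorySketch {

    static Map<String, Long> distribute(long budgetBytes, Map<String, Integer> weights) {
        long totalWeight = 0;
        for (int w : weights.values()) {
            if (w <= 0) {
                throw new IllegalArgumentException("weights must be positive");
            }
            totalWeight += w;
        }
        Map<String, Long> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            // Integer division rounds down, so the shares never exceed the budget.
            shares.put(e.getKey(), budgetBytes * e.getValue() / totalWeight);
        }
        return shares;
    }
}
```

Only the ratios matter, so doubling every weight leaves the result unchanged. This is why the old absolute values no longer translate directly into bytes.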
Table API & SQL
Rename of ANY Type to RAW Type (FLINK-14904)
The identifier raw is a reserved keyword now and must be escaped with backticks when used as a SQL field or function name.
Rename of Table Connector Properties (FLINK-14649)
Some indexed properties for table connectors have been flattened and renamed for a better user experience when writing DDL statements. This affects the Kafka Connector properties connector.properties and connector.specific-offsets. Furthermore, the Elasticsearch Connector property connector.hosts is affected. The aforementioned old properties are deprecated and will be removed in future versions. Please consult the Table Connectors documentation for the new property names.
Methods for interacting with temporary Tables & Views (FLINK-14490)
The methods registerTable()/registerDataStream()/registerDataSet() have been deprecated in favor of createTemporaryView(), which better adheres to the corresponding SQL term.
The scan() method has been deprecated in favor of the from() method.
The methods registerTableSource()/registerTableSink() have been deprecated in favor of ConnectTableDescriptor#createTemporaryTable(). Whereas the deprecated methods take an instance of a TableSource or TableSink class, the ConnectTableDescriptor approach expects only a set of string properties describing it. This in turn makes it possible to reliably store those definitions in catalogs.
The method insertInto(String path, String... pathContinued) has been removed in favor of insertInto(String path).
All the newly introduced methods accept a String identifier, which will be parsed into a 3-part identifier (catalog, database, and object name). The parser supports quoting the identifier. It also requires escaping any reserved SQL keywords.
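The following sketch shows how a String identifier could be split into a catalog, database, and object name, with backtick quoting and defaults for missing leading parts. It is a hypothetical helper for illustration only, since Flink parses identifiers with its SQL parser. It does not check for reserved keywords, and the default names in the usage are example values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Resolves a possibly partial, possibly quoted table path (hypothetical sketch). */
final class TablePathSketch {

    /** Splits on unquoted dots; inside backticks a doubled backtick is a literal one. */
    static List<String> split(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '`') {
                if (quoted && i + 1 < path.length() && path.charAt(i + 1) == '`') {
                    current.append('`'); // escaped backtick
                    i++;
                } else {
                    quoted = !quoted; // opening or closing quote
                }
            } else if (c == '.' && !quoted) {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated backtick in: " + path);
        }
        parts.add(current.toString());
        return parts;
    }

    /** Fills missing leading parts from the current catalog and database. */
    static List<String> resolve(String path, String currentCatalog, String currentDatabase) {
        List<String> p = split(path);
        switch (p.size()) {
            case 1: return Arrays.asList(currentCatalog, currentDatabase, p.get(0));
            case 2: return Arrays.asList(currentCatalog, p.get(0), p.get(1));
            case 3: return p;
            default: throw new IllegalArgumentException("Expected at most 3 parts: " + path);
        }
    }
}
```

For example, resolving `my.table` (quoted) against the example defaults default_catalog and default_database yields [default_catalog, default_database, my.table], because the dot inside the backticks is not treated as a separator.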
Removal of ExternalCatalog API (FLINK-13697)
The deprecated ExternalCatalog API has been dropped. This includes:
- ExternalCatalog (and all dependent classes, e.g., ExternalTable)
- SchematicDescriptor, MetadataDescriptor, StatisticsDescriptor
Users are advised to use the new Catalog API.
Configuration
Introduction of Type Information for ConfigOptions (FLINK-14493)
Getters of org.apache.flink.configuration.Configuration now throw an IllegalArgumentException if the configured value cannot be parsed into the required type. In previous Flink releases, the default value was returned in such cases.
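The behavioural change can be sketched with a small stand-in for a typed getter. The class is hypothetical and only contrasts the old lenient fallback with the new strict parsing. It is not the Configuration API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in contrasting lenient (pre-1.10) and strict (1.10) integer getters. */
final class StrictConfigSketch {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
        values.put(key, value);
    }

    /** Old behaviour: an unparseable value silently falls back to the default. */
    int getIntegerLenient(String key, int defaultValue) {
        String raw = values.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** New behaviour: an unparseable value is reported as an IllegalArgumentException. */
    int getInteger(String key, int defaultValue) {
        String raw = values.get(key);
        if (raw == null) {
            return defaultValue; // absent keys still use the default
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Could not parse value '" + raw + "' for key '" + key + "'", e);
        }
    }
}
```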
Increase of default Restart Delay (FLINK-13884)
The default restart delay for all shipped restart strategies, i.e., fixed-delay and failure-rate, has been raised to 1 s (from originally 0 s).
Simplification of Cluster-Level Restart Strategy Configuration (FLINK-13921)
Previously, if the user had set restart-strategy.fixed-delay.attempts or restart-strategy.fixed-delay.delay but had not configured the option restart-strategy, the cluster-level restart strategy would have been fixed-delay. Now the cluster-level restart strategy is determined only by the config option restart-strategy and by whether checkpointing is enabled. See “Task Failure Recovery” for details.
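The resolution order described above can be sketched as follows. The class is hypothetical and returns a description instead of a real strategy. The fallback to fixed-delay with Integer.MAX_VALUE attempts when checkpointing is enabled is an assumption based on the “Task Failure Recovery” documentation, which remains the authoritative source.

```java
import java.util.Map;

/** Sketch of the cluster-level restart strategy decision (hypothetical, simplified). */
final class RestartStrategyResolutionSketch {

    static String resolve(Map<String, String> clusterConfig, boolean checkpointingEnabled) {
        String explicit = clusterConfig.get("restart-strategy");
        if (explicit != null) {
            return explicit; // an explicitly configured strategy always wins
        }
        // restart-strategy.fixed-delay.* options are deliberately ignored here:
        // since 1.10 they no longer select fixed-delay on their own.
        return checkpointingEnabled
                ? "fixed-delay (attempts=" + Integer.MAX_VALUE + ")"
                : "none";
    }
}
```

A configuration that only sets restart-strategy.fixed-delay.attempts therefore resolves to no restarts when checkpointing is disabled.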
Disable memory-mapped BoundedBlockingSubpartition by default (FLINK-14952)
The config option taskmanager.network.bounded-blocking-subpartition-type has been renamed to taskmanager.network.blocking-shuffle.type. Moreover, the default value of the aforementioned config option has been changed from auto to file. The reason is that TaskManagers running on YARN with auto could easily exceed the memory budget of their container, because the memory used by memory-mapped files was not accounted for correctly.
Removal of non-credit-based Network Flow Control (FLINK-14516)
The non-credit-based network flow control code was removed along with the configuration option taskmanager.network.credit-model. Flink will now always use credit-based flow control.
Removal of HighAvailabilityOptions#HA_JOB_DELAY (FLINK-13885)
The configuration option high-availability.job.delay has been removed since it is no longer used.
State
Enable Background Cleanup of State with TTL by default (FLINK-14898)
Background cleanup of expired state with TTL is activated by default now for all state backends shipped with Flink. Note that the RocksDB state backend implements background cleanup by employing a compaction filter. This has the caveat that even if a Flink job does not store state with TTL, a minor performance penalty during compaction is incurred. Users who experience noticeable performance degradation during RocksDB compaction can disable the TTL compaction filter by setting the config option state.backend.rocksdb.ttl.compaction.filter.enabled to false.
Deprecation of StateTtlConfig#Builder#cleanupInBackground() (FLINK-15606)
StateTtlConfig#Builder#cleanupInBackground() has been deprecated because the background cleanup of state with TTL is already enabled by default.
Timers are stored in RocksDB by default when using RocksDBStateBackend (FLINK-15637)
The default timer store has been changed from Heap to RocksDB for the RocksDB state backend to support asynchronous snapshots for timer state and better scalability, with less than 5% performance cost. Users who find the performance decline critical can set state.backend.rocksdb.timer-service.factory to HEAP in flink-conf.yaml to restore the old behavior.
Removal of StateTtlConfig#TimeCharacteristic (FLINK-15605)
StateTtlConfig#TimeCharacteristic has been removed in favor of StateTtlConfig#TtlTimeCharacteristic.
New efficient Method to check if MapState is empty (FLINK-13034)
We have added a new method MapState#isEmpty(), which enables users to check whether a map state is empty. The new method is 40% faster than mapState.keys().iterator().hasNext() when using the RocksDB state backend.
RocksDB Upgrade (FLINK-14483)
We have again released our own RocksDB build (FRocksDB) which is based on RocksDB version 5.17.2 with several feature backports for the Write Buffer Manager to enable limiting RocksDB’s memory usage. The decision to release our own RocksDB build was made because later RocksDB versions suffer from a performance regression under certain workloads.
RocksDB Logging disabled by default (FLINK-15068)
Logging in RocksDB (e.g., logging related to flush, compaction, memtable creation, etc.) has been disabled by default to prevent disk space from being filled up unexpectedly. Users who need to enable logging should implement their own RocksDBOptionsFactory that creates DBOptions instances with InfoLogLevel set to INFO_LEVEL.
Improved RocksDB Savepoint Recovery (FLINK-12785)
In previous Flink releases, users could encounter an OutOfMemoryError when restoring from a RocksDB savepoint containing large KV pairs. For that reason, we introduced a configurable memory limit in the RocksDBWriteBatchWrapper with a default value of 2 MB. RocksDB’s WriteBatch will flush before the consumed memory limit is reached. If needed, the limit can be tuned via the state.backend.rocksdb.write-batch-size config option in flink-conf.yaml.
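Here is a minimal sketch of the idea behind the limit: buffer writes and flush before the buffered bytes would exceed a threshold. The class is hypothetical, and the threshold plays the role of state.backend.rocksdb.write-batch-size. RocksDB’s own per-record overhead is ignored, and an entry larger than the limit is still accepted on its own.

```java
import java.util.ArrayList;
import java.util.List;

/** Write batch that flushes before exceeding a byte limit (hypothetical sketch). */
final class SizeLimitedWriteBatchSketch {
    private final long maxBytes;
    private final List<byte[][]> pending = new ArrayList<>();
    private long pendingBytes;
    private int flushes;

    SizeLimitedWriteBatchSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(byte[] key, byte[] value) {
        long entryBytes = (long) key.length + value.length;
        // Flush first if this entry would push the batch over the limit.
        if (!pending.isEmpty() && pendingBytes + entryBytes > maxBytes) {
            flush();
        }
        pending.add(new byte[][] {key, value});
        pendingBytes += entryBytes;
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // A real implementation would hand the batch to the database here.
        pending.clear();
        pendingBytes = 0;
        flushes++;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    int flushes() {
        return flushes;
    }
}
```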
PyFlink
Python 2 Support dropped (FLINK-14469)
Beginning with this release, PyFlink does not support Python 2. This is because Python 2 reached end of life on January 1, 2020, and several third-party projects that PyFlink depends on are also dropping Python 2 support.
Monitoring
InfluxdbReporter skips Inf and NaN (FLINK-12147)
The InfluxdbReporter now silently skips values that are unsupported by InfluxDB, such as Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, Double.NaN, etc.
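The filtering can be sketched as follows. The helper and the metric names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Drops metric values that InfluxDB cannot store (illustrative sketch). */
final class FiniteMetricFilterSketch {

    static Map<String, Double> dropUnsupported(Map<String, Double> metrics) {
        Map<String, Double> reportable = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : metrics.entrySet()) {
            Double v = e.getValue();
            // Double.isFinite is false for NaN and for both infinities.
            if (v != null && Double.isFinite(v)) {
                reportable.put(e.getKey(), v);
            }
        }
        return reportable;
    }
}
```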
Connectors
Kinesis Connector License Change (FLINK-12847)
flink-connector-kinesis is now licensed under the Apache License, Version 2.0, and its artifacts will be deployed to Maven central as part of the Flink releases. Users no longer need to build the Kinesis connector from source themselves.
Miscellaneous Interface Changes
ExecutionConfig#getGlobalJobParameters() cannot return null anymore (FLINK-9787)
ExecutionConfig#getGlobalJobParameters has been changed to never return null. Conversely, ExecutionConfig#setGlobalJobParameters(GlobalJobParameters) will not accept null values anymore.
Change of contract in MasterTriggerRestoreHook interface (FLINK-14344)
Implementations of MasterTriggerRestoreHook#triggerCheckpoint(long, long, Executor) must be non-blocking now. Any blocking operation should be executed asynchronously, e.g., using the given executor.
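The sketch below returns a CompletableFuture immediately and runs the slow work on the supplied executor, instead of blocking the caller. The interface only mirrors the shape of MasterTriggerRestoreHook#triggerCheckpoint. All names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical interface with the same shape as MasterTriggerRestoreHook#triggerCheckpoint. */
interface CheckpointHookSketch<T> {
    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor);
}

/** Returns immediately and performs the slow work on the supplied executor. */
final class NonBlockingHookSketch implements CheckpointHookSketch<String> {

    @Override
    public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        // Calling slowExternalSnapshot() directly here would block the caller.
        return CompletableFuture.supplyAsync(() -> slowExternalSnapshot(checkpointId), executor);
    }

    /** Stand-in for a blocking call to an external system (hypothetical). */
    private static String slowExternalSnapshot(long checkpointId) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "snapshot-" + checkpointId;
    }
}
```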
Client- and Server-Side Separation of HA Services (FLINK-13750)
The HighAvailabilityServices have been split up into client-side ClientHighAvailabilityServices and cluster-side HighAvailabilityServices. When implementing custom high availability services, users should follow this separation by overriding the factory method HighAvailabilityServicesFactory#createClientHAServices(Configuration). Moreover, HighAvailabilityServices#getWebMonitorLeaderRetriever() should no longer be implemented since it has been deprecated.
Deprecation of HighAvailabilityServices#getWebMonitorLeaderElectionService() (FLINK-13977)
Implementations of HighAvailabilityServices should implement HighAvailabilityServices#getClusterRestEndpointLeaderElectionService() instead of HighAvailabilityServices#getWebMonitorLeaderElectionService().
Interface Change in LeaderElectionService (FLINK-14287)
LeaderElectionService#confirmLeadership(UUID, String) now takes an additional second argument, which is the address under which the leader will be reachable. All custom LeaderElectionService implementations will need to be updated accordingly.
Deprecation of Checkpoint Lock (FLINK-14857)
The method org.apache.flink.streaming.runtime.tasks.StreamTask#getCheckpointLock() is deprecated now. Users should use MailboxExecutor to run actions that require synchronization with the task’s thread (e.g., collecting output produced by an external thread). The methods MailboxExecutor#yield() or MailboxExecutor#tryYield() can be used for actions that need to give up control to other actions temporarily, e.g., if the current operator is blocked. The MailboxExecutor can be accessed by using YieldingOperatorFactory (see AsyncWaitOperator for an example usage).
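The idea behind replacing the checkpoint lock with a mailbox can be sketched in plain Java. Other threads enqueue actions, and only the task thread runs them, so state owned by the task needs no lock. MiniMailbox is a hypothetical class loosely modelled on MailboxExecutor and is not Flink's API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy single-threaded mailbox (hypothetical; illustrates the idea only). */
final class MiniMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    /** Safe to call from any thread: the action runs later on the task thread. */
    void execute(Runnable action) {
        mails.add(action);
    }

    /** Runs at most one pending action on the calling thread; true if one was run. */
    boolean tryYield() {
        Runnable next = mails.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    /** Runs all currently pending actions; returns how many were run. */
    int drain() {
        int processed = 0;
        while (tryYield()) {
            processed++;
        }
        return processed;
    }
}
```

An external thread that produces records calls execute() instead of taking a lock. The task thread picks those actions up between its own work, so output is only ever touched by one thread.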
Deprecation of OptionsFactory and ConfigurableOptionsFactory interfaces (FLINK-14926)
The interfaces OptionsFactory and ConfigurableOptionsFactory have been deprecated in favor of RocksDBOptionsFactory and ConfigurableRocksDBOptionsFactory, respectively.
Flink 1.9
Known shortcomings or limitations for new features
New Table / SQL Blink planner
Flink 1.9.0 provides support for two planners for the Table API, namely Flink’s original planner and the new Blink planner. The original planner maintains the same behaviour as in previous releases, while the new Blink planner is still considered experimental and has the following limitations:
- The Blink planner cannot be used with BatchTableEnvironment, and therefore Table programs run with the planner cannot be transformed to DataSet programs. This is by design and will also not be supported in the future. Therefore, if you want to run a batch job with the Blink planner, please use the new TableEnvironment. For streaming jobs, both StreamTableEnvironment and TableEnvironment work.
- Implementations of StreamTableSink should implement the consumeDataStream method instead of emitDataStream if they are used with the Blink planner. Both methods work with the original planner. This is by design to make the returned DataStreamSink accessible to the planner.
- Due to a bug where transformations are not cleared on execution, TableEnvironment instances should not be reused across multiple SQL statements when using the Blink planner.
- Table.flatAggregate is not supported.
- Session and count windows are not supported when running batch jobs.
- The Blink planner only supports the new Catalog API and does not support ExternalCatalog, which is now deprecated.
Related issues:
- FLINK-13708: Transformations should be cleared because a table environment could execute multiple job
- FLINK-13473: Add GroupWindowed FlatAggregate support to stream Table API (Blink planner), i.e, align with Flink planner
- FLINK-13735: Support session window with Blink planner in batch mode
- FLINK-13736: Support count window with Blink planner in batch mode
SQL DDL
In Flink 1.9.0, the community also added a preview of SQL DDL, but only for batch-style DDL. Therefore, streaming-related concepts such as watermarks are not supported yet.
Related issues:
- FLINK-13661: Add a stream specific CREATE TABLE SQL DDL
- FLINK-13568: DDL create table doesn’t allow STRING data type
Java 9 support
Since Flink 1.9.0, Flink can be compiled and run on Java 9. Note that certain components interacting with external systems (connectors, filesystems, metric reporters, etc.) may not work, since the respective projects may have skipped Java 9 support.
Related issues:
Memory management
In Flink 1.9.0 and prior versions, the managed memory fraction of the TaskManager is controlled by taskmanager.memory.fraction, with 0.7 as the default value. However, this can sometimes cause OOMs, because the default value of the JVM parameter NewRatio is 2, which means that the old generation occupies only 2/3 (about 0.67) of the heap memory. If you run into this case, please manually change taskmanager.memory.fraction to a lower value.
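The arithmetic behind this advice can be checked with a quick calculation. It simplifies by treating the managed memory fraction as a fraction of the whole heap. The class is hypothetical.

```java
/** Back-of-the-envelope check of managed memory against the old generation (sketch). */
final class OldGenHeadroomSketch {

    /** With -XX:NewRatio=n the old generation is n/(n+1) of the heap. */
    static double oldGenFraction(int newRatio) {
        return (double) newRatio / (newRatio + 1);
    }

    /** True if long-lived managed memory alone would not fit into the old generation. */
    static boolean managedMemoryExceedsOldGen(double managedFraction, int newRatio) {
        return managedFraction > oldGenFraction(newRatio);
    }
}
```

With the default NewRatio of 2, a fraction of 0.7 exceeds the roughly 0.667 old-generation share, while a lower value such as 0.6 fits.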
Related issues:
Deprecations and breaking changes
Scala expression DSL for Table API moved to flink-table-api-scala
Since 1.9.0, the implicit conversions for the Scala expression DSL for the Table API have been moved to flink-table-api-scala. This requires users to update the imports in their Table programs.
Users of pure Table programs should define their imports like:
```scala
import org.apache.flink.table.api._
```
Users of the DataStream API should define their imports like:
```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
```
Related issues:
Failover strategies
As a result of completing fine-grained recovery (FLIP-1), Flink will now attempt to restart only those tasks that are connected to failed tasks through a pipelined connection. By default, the region failover strategy is used.
Users who were not using a restart strategy or have already configured a failover strategy should not be affected. Moreover, users who already enabled the region failover strategy, along with a restart strategy that enforces a certain number of restarts or introduces a restart delay, will see changes in behavior. The region failover strategy now correctly respects constraints that are defined by the restart strategy.
Streaming users who were not using a failover strategy may be affected if their jobs are embarrassingly parallel or contain multiple independent jobs. In this case, only the failed parallel pipeline or affected jobs will be restarted.
Batch users may be affected if their job contains blocking exchanges (which usually happens for shuffles) or if the ExecutionMode was set to BATCH or BATCH_FORCED via the ExecutionConfig.
Overall, users should see an improvement in performance.
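How tasks group into pipelined regions can be sketched with a union-find over the pipelined exchanges. This is a simplified, hypothetical model. It only computes the region of a task and ignores any additional regions that Flink’s region failover may restart, for example consumers of results that have to be recomputed.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Groups tasks into pipelined regions with union-find (hypothetical sketch). */
final class PipelinedRegionSketch {

    /** A data exchange between two tasks (indices 0..numTasks-1). */
    static final class Exchange {
        final int producer;
        final int consumer;
        final boolean pipelined;

        Exchange(int producer, int consumer, boolean pipelined) {
            this.producer = producer;
            this.consumer = consumer;
            this.pipelined = pipelined;
        }
    }

    private static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    /** Returns all tasks in the same pipelined region as the given task. */
    static Set<Integer> regionOf(int numTasks, List<Exchange> exchanges, int task) {
        int[] parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) {
            parent[i] = i;
        }
        for (Exchange e : exchanges) {
            if (e.pipelined) { // blocking exchanges separate regions
                int a = find(parent, e.producer);
                int b = find(parent, e.consumer);
                parent[a] = b;
            }
        }
        Set<Integer> region = new TreeSet<>();
        int root = find(parent, task);
        for (int i = 0; i < numTasks; i++) {
            if (find(parent, i) == root) {
                region.add(i);
            }
        }
        return region;
    }
}
```

In an embarrassingly parallel job with two independent pipelines 0 → 1 and 2 → 3, a failure of task 1 only affects the region {0, 1}.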
Related issues:
- FLINK-13223: Set jobmanager.execution.failover-strategy to region in default flink-conf.yaml
- FLINK-13060: FailoverStrategies should respect restart constraints
Job termination via CLI
With the support of graceful job termination with savepoints for semantic correctness (FLIP-34), a few changes related to job termination have been made to the CLI.
From now on, the stop command with no further arguments stops the job with a savepoint targeted at the default savepoint location (as configured via the state.savepoints.dir property in the job configuration), or at a location explicitly specified using the -p option. Please make sure to configure the savepoint path using one of these options.
Since job terminations are now always accompanied by a savepoint, stopping jobs is expected to take longer now.
Related issues:
- FLINK-13123: Align Stop/Cancel Commands in CLI and REST Interface and Improve Documentation
- FLINK-11458: Add TERMINATE/SUSPEND Job with Savepoint
Network stack
A few changes in the network stack, related to the switch of StreamTask’s threading model to a mailbox-based approach, require close attention to some related configuration:
- Due to changes in the lifecycle management of result partitions, partition requests as well as re-triggers will now happen sooner. Therefore, some jobs with long deployment times and large state might start failing more frequently with PartitionNotFound exceptions than in previous versions. If that’s the case, users should increase the value of taskmanager.network.request-backoff.max in order to have the same effective partition request timeout as prior to 1.9.0.
- To avoid a potential deadlock, a timeout has been added for how long a task will wait for assignment of exclusive memory segments. The default timeout is 30 seconds and is configurable via taskmanager.network.memory.exclusive-buffers-request-timeout-ms. For some previously working deployments, this default timeout might be too low and have to be increased.
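A capped exponential backoff shows why raising taskmanager.network.request-backoff.max lengthens the effective partition request timeout. The doubling-and-capping rule and the millisecond values in the usage note are assumptions for illustration, not a description of Flink’s exact retry logic. The class is hypothetical.

```java
/**
 * Total time spent waiting under a capped exponential backoff (hypothetical sketch).
 * Assumed rule: start at the initial backoff, double after each failed attempt,
 * cap at the maximum, and give up once a wait at the maximum has also failed.
 */
final class PartitionRequestBackoffSketch {

    static long totalWaitMs(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        long total = 0;
        long current = initialMs;
        while (true) {
            total += current;
            if (current >= maxMs) {
                return total; // already waited the maximum backoff: give up
            }
            current = Math.min(current * 2, maxMs);
        }
    }
}
```

Under these assumptions, an initial backoff of 100 ms with a maximum of 10 000 ms waits 22 700 ms in total. Raising the maximum to 20 000 ms extends this to 45 500 ms.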
Please also note that several network I/O metrics have had their scope changed. See the 1.9 metrics documentation for which metrics are affected. In 1.9.0, these metrics will still be available under their previous scopes, but this may no longer be the case in future versions.
Related issues:
- FLINK-13013: Make sure that SingleInputGate can always request partitions
- FLINK-12852: Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
- FLINK-12555: Introduce an encapsulated metric group layout for shuffle API and deprecate old one
AsyncIO
Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining behaviour of the operator has been changed so that it is never chained after another operator. This should not be problematic for migrating from older version snapshots as long as a uid was assigned to the operator. If a uid was not assigned to the operator, please see the instructions here for a possible workaround.
Related issues:
Connectors and Libraries
Introduced KafkaSerializationSchema to fully replace KeyedSerializationSchema
The universal FlinkKafkaProducer (in flink-connector-kafka) supports a new KafkaSerializationSchema that will fully replace KeyedSerializationSchema in the long run. This new schema allows directly generating Kafka ProducerRecords for sending to Kafka, therefore enabling the user to use all available Kafka features (in the context of Kafka records).
Dropped connectors and libraries
- The Elasticsearch 1 connector has been dropped and will no longer receive patches. Users may continue to use the connector from a previous series (like 1.8) with newer versions of Flink. It is being dropped because it is used significantly less than more recent versions (Elasticsearch versions 2.x and 5.x are downloaded 4 to 5 times more) and hasn’t seen any development for over a year.
- The older Python APIs for batch and streaming have been removed and will no longer receive new patches. A new API is being developed based on the Table API as part of FLINK-12308: Support python language in Flink Table API. Existing users may continue to use these older APIs with future versions of Flink by copying both the flink-streaming-python and flink-python jars into the /lib directory of the distribution and the corresponding start scripts pyflink-stream.sh and pyflink.sh into the /bin directory of the distribution.
- The older machine learning libraries have been removed and will no longer receive new patches. This is due to efforts towards a new Table-based machine learning library (FLIP-39). Users can still use the 1.8 version of the legacy library if their projects still rely on it.
Related issues:
- FLINK-11693: Add KafkaSerializationSchema that directly uses ProducerRecord
- FLINK-12151: Drop Elasticsearch 1 connector
- FLINK-12903: Remove legacy flink-python APIs
- FLINK-12308: Support python language in Flink Table API
- FLINK-12597: Remove the legacy flink-libraries/flink-ml
MapR dependency removed
The dependency on MapR vendor-specific artifacts has been removed by changing the MapR filesystem connector to work purely based on reflection. This does not introduce any regression in the support for the MapR filesystem. The decision to remove hard dependencies on the MapR artifacts was made due to very flaky access to the secure https endpoint of the MapR artifact repository, which affected the build stability of Flink.
Related issues:
- FLINK-12578: Use secure URLs for Maven repositories
- FLINK-13499: Remove dependency on MapR artifact repository
StateDescriptor interface change
The state serializer in StateDescriptor has been changed from protected to private access. Subclasses should use the StateDescriptor#getSerializer() method as the only means to obtain the wrapped state serializer.
Related issues:
Web UI dashboard
The web frontend of Flink has been updated to use the latest Angular version (7.x). The old frontend remains available in Flink 1.9.x, but will be removed in a later Flink release once the new frontend is considered stable.
Related issues:
Flink 1.8
State
Continuous incremental cleanup of old Keyed State with TTL
We introduced TTL (time-to-live) for keyed state in Flink 1.6 (FLINK-9510). This feature made expired keyed state entries inaccessible and cleaned them up when they were accessed. In addition, state would now also be cleaned up when writing a savepoint/checkpoint.
Flink 1.8 introduces continuous cleanup of old entries for both the RocksDB state backend (FLINK-10471) and the heap state backend (FLINK-10473). This means that old entries (according to the TTL setting) are continuously cleaned up.
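Continuous incremental cleanup can be sketched as a map that, on every access, inspects only a bounded number of entries and drops the expired ones. The class is hypothetical and only loosely models the heap backend’s approach. The RocksDB backend relies on a compaction filter instead.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** TTL map with bounded incremental cleanup on each access (hypothetical sketch). */
final class IncrementalTtlCleanupSketch {
    private final long ttlMs;
    private final int cleanupSize;
    // Iteration order follows the last write, because put() re-inserts the key.
    private final LinkedHashMap<String, Long> lastWriteMs = new LinkedHashMap<>();

    IncrementalTtlCleanupSketch(long ttlMs, int cleanupSize) {
        this.ttlMs = ttlMs;
        this.cleanupSize = cleanupSize;
    }

    void put(String key, long nowMs) {
        lastWriteMs.remove(key);
        lastWriteMs.put(key, nowMs);
        cleanupSome(nowMs);
    }

    /** Expired entries are never visible, even if they have not been cleaned up yet. */
    boolean isPresent(String key, long nowMs) {
        cleanupSome(nowMs);
        Long ts = lastWriteMs.get(key);
        return ts != null && nowMs - ts < ttlMs;
    }

    int storedEntries() {
        return lastWriteMs.size();
    }

    /** Inspects at most cleanupSize entries and removes those that have expired. */
    private void cleanupSome(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastWriteMs.entrySet().iterator();
        for (int checked = 0; checked < cleanupSize && it.hasNext(); checked++) {
            if (nowMs - it.next().getValue() >= ttlMs) {
                it.remove();
            }
        }
    }
}
```

With a cleanup size of 1, an expired entry may stay stored for a few more accesses but is never returned. This is the trade-off between per-access overhead and how quickly storage is reclaimed.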
New Support for Schema Migration when restoring Savepoints
With Flink 1.7.0 we added support for changing the schema of state when using the AvroSerializer (FLINK-10605). With Flink 1.8.0 we made great progress migrating all built-in TypeSerializers to a new serializer snapshot abstraction that theoretically allows schema migration. Of the serializers that come with Flink, we now support schema migration for the PojoSerializer (FLINK-11485) and the Java EnumSerializer (FLINK-11334), as well as for Kryo in limited cases (FLINK-11323).
Savepoint compatibility
Savepoints from Flink 1.2 that contain a Scala TraversableSerializer are not compatible with Flink 1.8 anymore because of an update in this serializer (FLINK-11539). You can get around this restriction by first upgrading to a version between Flink 1.3 and Flink 1.7 and then updating to Flink 1.8.
RocksDB version bump and switch to FRocksDB (FLINK-10471)
We needed to switch to a custom build of RocksDB called FRocksDB because we needed certain changes in RocksDB to support continuous state cleanup with TTL. The used build of FRocksDB is based on the upgraded version 5.17.2 of RocksDB. For Mac OS X, RocksDB version 5.17.2 is supported only for OS X version >= 10.13. See also: https://github.com/facebook/rocksdb/issues/4862.
Maven Dependencies
Changes to bundling of Hadoop libraries with Flink (FLINK-11266)
Convenience binaries that include Hadoop are no longer released.
If a deployment relies on flink-shaded-hadoop2 being included in flink-dist, then you must manually download a pre-packaged Hadoop jar from the optional components section of the download page and copy it into the /lib directory. Alternatively, a Flink distribution that includes Hadoop can be built by packaging flink-dist and activating the include-hadoop Maven profile.
As Hadoop is no longer included in flink-dist by default, specifying -DwithoutHadoop when packaging flink-dist no longer impacts the build.
Configuration
TaskManager configuration (FLINK-11716)
TaskManagers now bind to the host IP address instead of the hostname by default. This behaviour can be controlled by the configuration option taskmanager.network.bind-policy. If your Flink cluster experiences inexplicable connection problems after upgrading, try setting taskmanager.network.bind-policy: name in your flink-conf.yaml to return to the pre-1.8 behaviour.
Table API
Deprecation of direct Table constructor usage (FLINK-11447)
Flink 1.8 deprecates direct usage of the constructor of the Table class in the Table API. This constructor was previously used to perform a join with a lateral table. You should now use table.joinLateral() or table.leftOuterJoinLateral() instead.
This change is necessary for converting the Table class into an interface, which will make the API more maintainable and cleaner in the future.
Introduction of new CSV format descriptor (FLINK-9964)
This release introduces a new format descriptor for CSV files that is compliant with RFC 4180. The new descriptor is available as org.apache.flink.table.descriptors.Csv. For now, this can only be used together with the Kafka connector. The old descriptor is available as org.apache.flink.table.descriptors.OldCsv for use with file system connectors.
Deprecation of static builder methods on TableEnvironment (FLINK-11445)
In order to separate the API from the actual implementation, the static methods TableEnvironment.getTableEnvironment() are deprecated. You should now use Batch/StreamTableEnvironment.create() instead.
Change in the Maven modules of Table API (FLINK-11064)
Users who previously had a flink-table dependency need to update their dependencies to flink-table-planner and the correct flink-table-api-* dependency, depending on whether Java or Scala is used: either flink-table-api-java-bridge or flink-table-api-scala-bridge.
Change to External Catalog Table Builders (FLINK-11522)
ExternalCatalogTable.builder() is deprecated in favour of ExternalCatalogTableBuilder().
Change to naming of Table API connector jars (FLINK-11026)
The naming scheme for the Kafka/Elasticsearch 6 SQL jars has been changed. In Maven terms, they no longer have the sql-jar qualifier, and the artifactId is now prefixed with flink-sql instead of flink, e.g., flink-sql-connector-kafka....
Change to how Null Literals are specified (FLINK-11785)
Null literals in the Table API need to be defined with nullOf(type) instead of Null(type) from now on. The old approach is deprecated.
Connectors
Introduction of a new KafkaDeserializationSchema that gives direct access to ConsumerRecord (FLINK-8354)
For the Flink KafkaConsumers, we introduced a new KafkaDeserializationSchema that gives direct access to the Kafka ConsumerRecord. This subsumes the KeyedDeserializationSchema functionality, which is deprecated but still available for now.
FlinkKafkaConsumer will now filter restored partitions based on topic specification (FLINK-10342)
Starting from Flink 1.8.0, the FlinkKafkaConsumer always filters out restored partitions whose topics are no longer among the topics it is configured to subscribe to in the restored execution. This behaviour did not exist in previous versions of the FlinkKafkaConsumer. If you wish to retain the previous behaviour, please use the disableFilterRestoredPartitionsWithSubscribedTopics() configuration method on the FlinkKafkaConsumer.
Consider this example: you had a Kafka consumer that was consuming from topic A, you took a savepoint, then changed your Kafka consumer to consume from topic B instead, and then restarted your job from the savepoint. Before this change, your consumer would consume from both topic A and topic B, because the state recorded that it had been consuming from topic A. With the change, your consumer only consumes from topic B after the restore, because the topics stored in state are filtered against the configured topics.
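The restore-time filtering can be modeled in plain Java. This is a minimal sketch under stated assumptions, not the connector's implementation: the Partition record and the filterRestored method are hypothetical, offsets are plain longs, and the filterEnabled = false branch stands in for calling disableFilterRestoredPartitionsWithSubscribedTopics(). It only covers partitions restored from state; discovering the partitions of a newly subscribed topic such as B is a separate step that is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the FlinkKafkaConsumer restore-time partition filter (FLINK-10342). */
final class RestoreFilterSketch {

    /** Hypothetical stand-in for a Kafka topic partition. */
    record Partition(String topic, int partition) {}

    /**
     * Keeps only restored partitions whose topic is still subscribed.
     * With filtering disabled, every restored partition is kept (pre-1.8 behaviour).
     */
    static Map<Partition, Long> filterRestored(Map<Partition, Long> restoredOffsets,
                                               Set<String> subscribedTopics,
                                               boolean filterEnabled) {
        Map<Partition, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Partition, Long> e : restoredOffsets.entrySet()) {
            if (!filterEnabled || subscribedTopics.contains(e.getKey().topic())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // State taken while the job was consuming topic "A".
        Map<Partition, Long> restored = new LinkedHashMap<>();
        restored.put(new Partition("A", 0), 42L);

        // The restarted job subscribes to topic "B" only.
        Set<String> subscribed = Set.of("B");

        System.out.println(filterRestored(restored, subscribed, true));  // {}
        System.out.println(filterRestored(restored, subscribed, false)); // {Partition[topic=A, partition=0]=42}
    }
}
```

With filtering on, the restored offset for topic A is dropped; with it off, the consumer keeps reading A alongside B, which matches the pre-1.8 behaviour.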
Miscellaneous Interface changes
The canEqual() method was dropped from the TypeSerializer interface (FLINK-9803)
The canEqual() methods are usually used to make proper equality checks across hierarchies of types. The TypeSerializer actually doesn’t require this property, so the method has been removed.
Removal of the CompositeSerializerSnapshot utility class (FLINK-11073)
The CompositeSerializerSnapshot utility class has been removed. You should now use CompositeTypeSerializerSnapshot instead for snapshots of composite serializers that delegate serialization to multiple nested serializers. Please see the Flink documentation for instructions on using CompositeTypeSerializerSnapshot.
Memory management
In Flink 1.8.0 and prior versions, the managed memory fraction of the TaskManager is controlled by taskmanager.memory.fraction, with 0.7 as the default value. However, this can sometimes cause OOMs because the default value of the JVM parameter NewRatio is 2, which means the old generation occupies only 2/3 (roughly 0.67) of the heap memory. If you run into this case, please manually change taskmanager.memory.fraction to a lower value.
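The arithmetic behind this note can be checked with a few lines of Java. This is a back-of-the-envelope sketch, not Flink code: it assumes the old:young generation ratio equals NewRatio:1 and, for simplicity, compares taskmanager.memory.fraction against the whole heap, although Flink applies the fraction to the heap left after the network buffers are subtracted, so the real managed memory is somewhat smaller. The class and method names are hypothetical.

```java
import java.util.Locale;

/** Hypothetical check: does the managed memory fraction fit into the old generation? */
final class ManagedMemoryCheck {

    /** Share of the heap given to the old generation for -XX:NewRatio=n (old:young = n:1). */
    static double oldGenShare(int newRatio) {
        if (newRatio < 1) {
            throw new IllegalArgumentException("NewRatio must be >= 1");
        }
        return (double) newRatio / (newRatio + 1);
    }

    /** True if the fraction (simplified as a share of the whole heap) fits into the old generation. */
    static boolean fitsInOldGen(double managedFraction, int newRatio) {
        return managedFraction < oldGenShare(newRatio);
    }

    public static void main(String[] args) {
        int newRatio = 2; // the default value mentioned in the note above
        System.out.println(String.format(Locale.ROOT, "old generation share: %.3f", oldGenShare(newRatio))); // 0.667
        System.out.println("fraction 0.7 fits: " + fitsInOldGen(0.7, newRatio)); // false
        System.out.println("fraction 0.6 fits: " + fitsInOldGen(0.6, newRatio)); // true
    }
}
```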
Flink 1.7
Scala 2.12 support
When using Scala 2.12 you might have to add explicit type annotations in places where they were not required when using Scala 2.11. This is an excerpt from the TransitiveClosureNaive.scala example in the Flink code base that shows the changes that could be required.
Previous code:
```scala
val terminate = prevPaths
```
With Scala 2.12 you have to change it to:
```scala
val terminate = prevPaths
```
The reason for this is that Scala 2.12 changes how lambdas are implemented: they now use the SAM-interface-based lambda support introduced in Java 8. This makes some method calls ambiguous, because both Scala-style lambdas and SAMs are now candidates for methods where it was previously clear which method would be invoked.
State evolution
Before Flink 1.7, serializer snapshots were implemented as a TypeSerializerConfigSnapshot (which is now deprecated and will eventually be removed, to be fully replaced by the new TypeSerializerSnapshot interface introduced in 1.7). Moreover, the responsibility for serializer schema compatibility checks lived within the TypeSerializer, implemented in the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method.
To be future-proof and to have the flexibility to migrate your state serializers and schema, it is highly recommended to migrate from the old abstractions. Details and migration guides can be found in the Flink documentation.
Removal of the legacy mode
Flink no longer supports the legacy mode. If you depend on this, then please use Flink 1.6.x.
Savepoints being used for recovery
Savepoints are now used during recovery. Previously, when using an exactly-once sink, one could run into duplicate output data if a failure occurred after a savepoint was taken but before the next checkpoint was completed. As a consequence, savepoints are no longer exclusively under the control of the user: a savepoint should be neither moved nor deleted unless a newer checkpoint or savepoint has been taken.
MetricQueryService runs in separate thread pool
The metric query service now runs in its own ActorSystem. Consequently, it needs to open a new port for the query services to communicate with each other. The query service port can be configured in flink-conf.yaml.
Granularity of latency metrics
The default granularity for latency metrics has been modified. To restore the previous behavior, users have to explicitly set the granularity to subtask.
Latency marker activation
Latency metrics are now disabled by default, which will affect all jobs that do not explicitly set the latencyTrackingInterval via ExecutionConfig#setLatencyTrackingInterval. To restore the previous default behavior, users have to configure the latency interval in flink-conf.yaml.
Relocation of Hadoop’s Netty dependency
We now also relocate Hadoop’s Netty dependency from io.netty to org.apache.flink.hadoop.shaded.io.netty. You can now bundle your own version of Netty into your job but may no longer assume that io.netty is present in the flink-shaded-hadoop2-uber-*.jar file.
Local recovery fixed
With the improvements to Flink’s scheduling, recoveries no longer require more slots than before when local recovery is enabled. Consequently, we encourage users to enable local recovery in flink-conf.yaml.
Support for multi slot TaskManagers
Flink now properly supports TaskManagers with multiple slots. Consequently, TaskManagers can now be started with an arbitrary number of slots and it is no longer recommended to start them with a single slot.
StandaloneJobClusterEntrypoint generates JobGraph with fixed JobID
The StandaloneJobClusterEntrypoint, which is launched by the script standalone-job.sh and used for the job-mode container images, now starts all jobs with a fixed JobID. Thus, in order to run a cluster in HA mode, one needs to set a different cluster id for each job/cluster.
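Why distinct cluster ids matter can be illustrated with a toy key scheme. This sketch assumes, purely for illustration, that HA metadata is namespaced as <cluster id>/<job id>; Flink's actual HA layout may differ in its details, the all-zero FIXED_JOB_ID is a placeholder rather than a value taken from StandaloneJobClusterEntrypoint, and haKey and keysAreUnique are hypothetical helpers. The cluster id itself is set with high-availability.cluster-id.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration: job-mode clusters sharing a fixed JobID need distinct cluster ids. */
final class HaKeySketch {

    // Placeholder for the fixed JobID (assumption, not the value Flink uses).
    static final String FIXED_JOB_ID = "00000000000000000000000000000000";

    /** Hypothetical HA key: metadata namespaced by cluster id, then job id. */
    static String haKey(String clusterId, String jobId) {
        return clusterId + "/" + jobId;
    }

    /** True if every job-mode cluster in the list gets its own HA key. */
    static boolean keysAreUnique(String... clusterIds) {
        Set<String> seen = new HashSet<>();
        for (String clusterId : clusterIds) {
            if (!seen.add(haKey(clusterId, FIXED_JOB_ID))) {
                return false; // two clusters would read and write the same HA metadata
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(keysAreUnique("default", "default")); // false: collision
        System.out.println(keysAreUnique("job-a", "job-b"));     // true: one namespace per cluster
    }
}
```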
Scala shell does not work with Scala 2.12
Flink’s Scala shell does not work with Scala 2.12. Therefore, the module flink-scala-shell is not being released for Scala 2.12.
See FLINK-10911 for more details.
Limitations of failover strategies
Flink’s non-default failover strategies are still a highly experimental feature that comes with a set of limitations. You should only use this feature if you are executing a stateless streaming job. In all other cases, it is highly recommended to remove the config option jobmanager.execution.failover-strategy from your flink-conf.yaml or set it to "full".
In order to avoid future problems, this feature has been removed from the documentation until it is fixed. See FLINK-10880 for more details.
SQL over window preceding clause
The over window preceding clause is now optional. It defaults to UNBOUNDED if not specified.
OperatorSnapshotUtil writes v2 snapshots
Snapshots created with OperatorSnapshotUtil are now written in the savepoint format v2.
SBT projects and the MiniClusterResource
If you have an sbt project that uses the MiniClusterResource, you now have to add the flink-runtime test-jar dependency explicitly:
```scala
libraryDependencies += "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests"
```
The reason for this is that the MiniClusterResource has been moved from flink-test-utils to flink-runtime. The module flink-test-utils correctly has a test-jar dependency on flink-runtime. However, sbt does not properly pull in transitive test-jar dependencies, as described in a known sbt issue. Consequently, it is necessary to specify the test-jar dependency explicitly.
Flink 1.6
Changed Configuration Default Values
The default value of the slot idle timeout slot.idle.timeout is set to the default value of the heartbeat timeout (50 s).
Changed Elasticsearch 5.x Sink API
Previous APIs in the Flink Elasticsearch 5.x Sink’s RequestIndexer interface have been deprecated in favor of new signatures. When adding requests to the RequestIndexer, the requests now must be of type IndexRequest, DeleteRequest, or UpdateRequest, instead of the base ActionRequest.
Limitations of failover strategies
Flink’s non-default failover strategies are still a highly experimental feature that comes with a set of limitations. You should only use this feature if you are executing a stateless streaming job. In all other cases, it is highly recommended to remove the config option jobmanager.execution.failover-strategy from your flink-conf.yaml or set it to "full".
In order to avoid future problems, this feature has been removed from the documentation until it is fixed. See FLINK-10880 for more details.
Flink 1.5
These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.4 and Flink 1.5. Please read these notes carefully if you are planning to upgrade your Flink version to 1.5.
Update Configuration for Reworked Job Deployment
Flink’s reworked cluster and job deployment component improves the integration with resource managers and enables dynamic resource allocation. One result of these changes is that you no longer have to specify the number of containers when submitting applications to YARN and Mesos. Flink will automatically determine the number of containers from the parallelism of the application.
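As a rough illustration of how a container count can follow from the parallelism, the sketch below assumes that all operators share the default slot sharing group, so a job needs as many slots as its highest operator parallelism, and that each container runs one TaskManager with slotsPerTaskManager slots. It is a hypothetical estimate, not Flink's resource manager logic.

```java
/** Hypothetical estimate of TaskManager containers needed for a job. */
final class ContainerEstimate {

    /** Ceiling of maxParallelism / slotsPerTaskManager: a partly used TaskManager still needs a whole container. */
    static int containersNeeded(int maxParallelism, int slotsPerTaskManager) {
        if (maxParallelism < 1 || slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        return (maxParallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        System.out.println(containersNeeded(8, 1)); // 8 single-slot TaskManagers
        System.out.println(containersNeeded(8, 3)); // 3 TaskManagers, the last one partly used
    }
}
```

With single-slot TaskManagers, as recommended for 1.5, the estimate reduces to one container per parallel subtask of the widest operator.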
Although the deployment logic was completely reworked, we aimed not to change the previous behavior unnecessarily, to enable a smooth transition. Nonetheless, there are a few options that you should update in your conf/flink-conf.yaml or know about.
- The allocation of TaskManagers with multiple slots is not fully supported yet. Therefore, we recommend configuring TaskManagers with a single slot, i.e., set taskmanager.numberOfTaskSlots: 1
- If you observe any problems with the new deployment mode, you can always switch back to the pre-1.5 behavior by configuring mode: legacy.
Please report any problems or possible improvements that you notice to the Flink community, either by posting to a mailing list or by opening a JIRA issue.
Note: We plan to remove the legacy mode in the next release.
Update Configuration for Reworked Network Stack
The changes to the networking stack for credit-based flow control and improved latency affect the configuration of network buffers. In a nutshell, the networking stack can require more memory to run applications. Hence, you might need to adjust the network configuration of your Flink setup.
There are two ways to address job submissions that fail due to a lack of network buffers.
- Reduce the number of buffers per channel, i.e., taskmanager.network.memory.buffers-per-channel, or
- Increase the amount of TaskManager memory that is used by the network stack, i.e., increase taskmanager.network.memory.fraction and/or taskmanager.network.memory.max.
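The effect of both options can be estimated with a short calculation. This sketch assumes that network memory is the configured fraction of TaskManager memory clamped between a minimum and taskmanager.network.memory.max, that each input channel needs buffers-per-channel exclusive buffers plus a fixed number of floating buffers per input gate, and that buffers are 32 KiB memory segments. The numbers in main (0.1, 64 MiB, 1 GiB, 2 buffers per channel, 8 floating buffers) are illustrative, so check the documentation for your version's actual defaults; the class and method names are hypothetical.

```java
/** Hypothetical back-of-the-envelope network buffer estimate. */
final class NetworkBufferEstimate {

    static final long KIB = 1024L;
    static final long MIB = 1024L * KIB;

    /** Network memory in bytes: fraction of TaskManager memory, clamped to [minBytes, maxBytes]. */
    static long networkMemoryBytes(long tmMemoryBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (tmMemoryBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Buffers needed by one input gate: exclusive buffers per channel plus floating buffers. */
    static long buffersPerGate(int channels, int buffersPerChannel, int floatingPerGate) {
        return (long) channels * buffersPerChannel + floatingPerGate;
    }

    public static void main(String[] args) {
        long segmentSize = 32 * KIB;
        // Illustrative 4 GiB TaskManager with fraction 0.1, clamped to [64 MiB, 1 GiB].
        long network = networkMemoryBytes(4096 * MIB, 0.1, 64 * MIB, 1024 * MIB);
        long available = network / segmentSize;
        // One input gate with 1000 channels: lowering buffers-per-channel shrinks the demand.
        System.out.println("available buffers: " + available);
        System.out.println("required (2 per channel): " + buffersPerGate(1000, 2, 8));
        System.out.println("required (1 per channel): " + buffersPerGate(1000, 1, 8));
    }
}
```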
Please consult the section about network buffer configuration in the Flink documentation for details. In case you experience issues with the new credit-based flow control mode, you can disable flow control by setting taskmanager.network.credit-model: false.
Note: We plan to remove the old model and this configuration in the next release.
Hadoop Classpath Discovery
We removed the automatic Hadoop classpath discovery via the Hadoop binary. If you want Flink to pick up the Hadoop classpath, you have to export HADOOP_CLASSPATH. On cloud environments and most Hadoop distributions you would do:
```shell
export HADOOP_CLASSPATH=`hadoop classpath`
```
Breaking Changes of the REST API
In an effort to harmonize, extend, and improve the REST API, a few handlers and return values were changed.
- The jobs overview handler is now registered under /jobs/overview (before: /joboverview) and returns a list of job details instead of the pre-grouped view of running, finished, cancelled and failed jobs.
- The REST API to cancel a job was changed.
- The REST API to cancel a job with savepoint was changed.
Please check the REST API documentation for details.
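For example, a client that previously polled /joboverview now needs to request /jobs/overview. The sketch below uses the JDK 11+ java.net.http client; the host and the port 8081 are assumptions about your setup, the class and method names are hypothetical, and the response is returned as raw JSON without parsing.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for the renamed jobs overview endpoint. */
final class JobsOverviewClient {

    /** Builds the 1.5+ overview URI (before 1.5 the path was /joboverview). */
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/jobs/overview");
    }

    /** Fetches the raw JSON list of job details. */
    static String fetchOverview(String host, int port) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, port)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Requires a running Flink cluster reachable at localhost:8081 (assumption).
        System.out.println(fetchOverview("localhost", 8081));
    }
}
```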
Kafka Producer Flushes on Checkpoint by Default
The Flink Kafka Producer now flushes on checkpoints by default. Prior to version 1.5, this behaviour was disabled by default and users had to explicitly call setFlushOnCheckpoints(true) on the producer to enable it.
Updated Kinesis Dependency
The Kinesis dependencies of Flink’s Kinesis connector have been updated to the following versions.
```xml
<aws.sdk.version>1.11.319</aws.sdk.version>
```
Limitations of failover strategies
Flink’s non-default failover strategies are still a highly experimental feature that comes with a set of limitations. You should only use this feature if you are executing a stateless streaming job. In all other cases, it is highly recommended to remove the config option jobmanager.execution.failover-strategy from your flink-conf.yaml or set it to "full".
In order to avoid future problems, this feature has been removed from the documentation until it is fixed. See FLINK-10880 for more details.